From d5d67708ac912fa863a062e3469eb4ac17e84cdf Mon Sep 17 00:00:00 2001 From: zotlabs Date: Sun, 29 Jan 2017 14:45:25 -0800 Subject: Alter the queue so that each queue item stores the scheduled time of the next delivery. This keeps the query for queued items simple. We no longer group by posturl; as the queue update function will only keep one item per destination scheduled for shorter term processing. Others (multiple queued items for a single destination) will be scheduled for delivery far into the future and only delivered if the hub responds to the "active" or short term queue item. --- Zotlabs/Daemon/Queue.php | 23 ++++------------------- boot.php | 2 +- include/queue_fn.php | 46 +++++++++++++++++++++++++++++++++++++++++---- install/schema_mysql.sql | 2 ++ install/schema_postgres.sql | 2 ++ install/update.php | 21 +++++++++++++++++++-- 6 files changed, 70 insertions(+), 26 deletions(-) diff --git a/Zotlabs/Daemon/Queue.php b/Zotlabs/Daemon/Queue.php index 27306589d..74541867c 100644 --- a/Zotlabs/Daemon/Queue.php +++ b/Zotlabs/Daemon/Queue.php @@ -61,30 +61,15 @@ class Queue { // the site is permanently down, there's no reason to attempt delivery at all, or at most not more than once // or twice a day. - // FIXME: can we sort postgres on outq_priority and maintain the 'distinct' ? - // The order by max(outq_priority) might be a dodgy query because of the group by. - // The desired result is to return a sequence in the order most likely to be delivered in this run. - // If a hub has already been sitting in the queue for a few days, they should be delivered last; - // hence every failure should drop them further down the priority list. - - if(ACTIVE_DBTYPE == DBTYPE_POSTGRES) { - $prefix = 'DISTINCT ON (outq_posturl)'; - $suffix = 'ORDER BY outq_posturl'; - } else { - $prefix = ''; - $suffix = 'GROUP BY outq_posturl ORDER BY max(outq_priority)'; - } - $r = q("SELECT $prefix * FROM outq WHERE outq_delivered = 0 and (( outq_created > %s - INTERVAL %s and outq_updated < %s - INTERVAL %s ) OR ( outq_updated < %s - INTERVAL %s )) $suffix", - db_utcnow(), db_quoteinterval('12 HOUR'), - db_utcnow(), db_quoteinterval('15 MINUTE'), - db_utcnow(), db_quoteinterval('1 HOUR') + $r = q("SELECT * FROM outq WHERE outq_delivered = 0 and outq_scheduled < '%s' ", + db_utcnow() ); } if(! $r) return; - foreach($r as $rr) { - queue_deliver($rr); + foreach($r as $rv) { + queue_deliver($rv); } } } diff --git a/boot.php b/boot.php index adc5dc13d..69a08f0b3 100755 --- a/boot.php +++ b/boot.php @@ -52,7 +52,7 @@ define ( 'PLATFORM_NAME', 'hubzilla' ); define ( 'STD_VERSION', '2.1' ); define ( 'ZOT_REVISION', '1.2' ); -define ( 'DB_UPDATE_VERSION', 1187 ); +define ( 'DB_UPDATE_VERSION', 1188 ); define ( 'PROJECT_BASE', __DIR__ ); diff --git a/include/queue_fn.php b/include/queue_fn.php index 0950faf85..74dde5de2 100644 --- a/include/queue_fn.php +++ b/include/queue_fn.php @@ -2,9 +2,42 @@ function update_queue_item($id, $add_priority = 0) { logger('queue: requeue item ' . $id,LOGGER_DEBUG); - q("UPDATE outq SET outq_updated = '%s', outq_priority = outq_priority + %d WHERE outq_hash = '%s'", + $x = q("select outq_created, outq_posturl from outq where outq_hash = '%s' limit 1", + dbesc($id) + ); + if(! $x) + return; + + // Set all other records for this destination way into the future. + // The queue delivers by destination. We'll keep one queue item for + // this destination (this one) with a shorter delivery. If we succeed + // once, we'll try to deliver everything for that destination. + // The delivery will be set to at most once per hour, and if the + // queue item is less than 12 hours old, we'll schedule for fifteen + // minutes. + + $r = q("UPDATE outq SET outq_scheduled = '%s' WHERE outq_posturl = '%s'", + dbesc(datetime_convert('UTC','UTC','now + 5 days')), + dbesc($x[0]['outq_posturl']) + ); + + $since = datetime_convert('UTC','UTC',$x[0]['outq_created']); + + if($since < datetime_convert('UTC','UTC','now - 12 hour')) { + $next = datetime_convert('UTC','UTC','now + 1 hour'); + } + else { + $next = datetime_convert('UTC','UTC','now + 15 minutes'); + } + + q("UPDATE outq SET outq_updated = '%s', + outq_priority = outq_priority + %d, + outq_scheduled = '%s' + WHERE outq_hash = '%s'", + dbesc(datetime_convert()), intval($add_priority), + dbesc($next), dbesc($id) ); } @@ -33,8 +66,12 @@ function queue_set_delivered($id,$channel = 0) { logger('queue: set delivered ' . $id,LOGGER_DEBUG); $sql_extra = (($channel_id) ? " and outq_channel = " . intval($channel_id) . " " : ''); - q("update outq set outq_delivered = 1, outq_updated = '%s' where outq_hash = '%s' $sql_extra ", + // Set the next scheduled run date so far in the future that it will be expired + // long before it ever makes it back into the delivery chain. + + q("update outq set outq_delivered = 1, outq_updated = '%s', outq_scheduled = '%s' where outq_hash = '%s' $sql_extra ", dbesc(datetime_convert()), + dbesc(datetime_convert('UTC','UTC','now + 5 days')), dbesc($id) ); } @@ -44,8 +81,8 @@ function queue_set_delivered($id,$channel = 0) { function queue_insert($arr) { $x = q("insert into outq ( outq_hash, outq_account, outq_channel, outq_driver, outq_posturl, outq_async, outq_priority, - outq_created, outq_updated, outq_notify, outq_msg ) - values ( '%s', %d, %d, '%s', '%s', %d, %d, '%s', '%s', '%s', '%s' )", + outq_created, outq_updated, outq_scheduled, outq_notify, outq_msg ) + values ( '%s', %d, %d, '%s', '%s', %d, %d, '%s', '%s', '%s', '%s', '%s' )", dbesc($arr['hash']), intval($arr['account_id']), intval($arr['channel_id']), @@ -55,6 +92,7 @@ function queue_insert($arr) { intval(($arr['priority']) ? $arr['priority'] : 0), dbesc(datetime_convert()), dbesc(datetime_convert()), + dbesc(datetime_convert()), dbesc($arr['notify']), dbesc(($arr['msg']) ? $arr['msg'] : '') ); diff --git a/install/schema_mysql.sql b/install/schema_mysql.sql index bd7d9d79c..be5317722 100644 --- a/install/schema_mysql.sql +++ b/install/schema_mysql.sql @@ -898,6 +898,7 @@ CREATE TABLE IF NOT EXISTS `outq` ( `outq_delivered` tinyint(1) NOT NULL DEFAULT '0', `outq_created` datetime NOT NULL DEFAULT '0001-01-01 00:00:00', `outq_updated` datetime NOT NULL DEFAULT '0001-01-01 00:00:00', + `outq_scheduled` datetime NOT NULL DEFAULT '0001-01-01 00:00:00', `outq_notify` mediumtext NOT NULL, `outq_msg` mediumtext NOT NULL, `outq_priority` smallint(6) NOT NULL DEFAULT '0', @@ -907,6 +908,7 @@ CREATE TABLE IF NOT EXISTS `outq` ( KEY `outq_hub` (`outq_posturl`), KEY `outq_created` (`outq_created`), KEY `outq_updated` (`outq_updated`), + KEY `outq_scheduled` (`outq_scheduled`), KEY `outq_async` (`outq_async`), KEY `outq_delivered` (`outq_delivered`), KEY `outq_priority` (`outq_priority`) diff --git a/install/schema_postgres.sql b/install/schema_postgres.sql index f8e4ecf5f..e78425828 100644 --- a/install/schema_postgres.sql +++ b/install/schema_postgres.sql @@ -884,6 +884,7 @@ CREATE TABLE "outq" ( "outq_delivered" numeric(1) NOT NULL DEFAULT '0', "outq_created" timestamp NOT NULL DEFAULT '0001-01-01 00:00:00', "outq_updated" timestamp NOT NULL DEFAULT '0001-01-01 00:00:00', + "outq_scheduled" timestamp NOT NULL DEFAULT '0001-01-01 00:00:00', "outq_notify" text NOT NULL, "outq_msg" text NOT NULL, "outq_priority" smallint NOT NULL DEFAULT '0', @@ -894,6 +895,7 @@ create index "outq_channel" on outq ("outq_channel"); create index "outq_hub" on outq ("outq_posturl"); create index "outq_created" on outq ("outq_created"); create index "outq_updated" on outq ("outq_updated"); +create index "outq_scheduled" on outq ("outq_scheduled"); create index "outq_async" on outq ("outq_async"); create index "outq_delivered" on outq ("outq_delivered"); create index "outq_priority" on outq ("outq_priority"); diff --git a/install/update.php b/install/update.php index 34f23b049..87cf4ba60 100644 --- a/install/update.php +++ b/install/update.php @@ -1,6 +1,6 @@