From 724b8cc6a54c2e778a2d67b2177ecf373f615c64 Mon Sep 17 00:00:00 2001 From: Mario Date: Sun, 12 Feb 2023 10:41:23 +0000 Subject: port queue improvements from streams --- Zotlabs/Daemon/Queue.php | 77 +++++++++++++++++------------------------------- Zotlabs/Lib/Queue.php | 28 +++++++++++++----- 2 files changed, 48 insertions(+), 57 deletions(-) (limited to 'Zotlabs') diff --git a/Zotlabs/Daemon/Queue.php b/Zotlabs/Daemon/Queue.php index 3eb7d9d23..30f474a5c 100644 --- a/Zotlabs/Daemon/Queue.php +++ b/Zotlabs/Daemon/Queue.php @@ -7,79 +7,56 @@ use Zotlabs\Lib\Queue as LibQueue; class Queue { static public function run($argc, $argv) { - - require_once('include/items.php'); - require_once('include/bbcode.php'); - - if ($argc > 1) - $queue_id = $argv[1]; - else - $queue_id = EMPTY_STR; + $queue_id = ($argc > 1) ? $argv[1] : ''; logger('queue: start'); // delete all queue items more than 3 days old // but first mark these sites dead if we haven't heard from them in a month - $r = q("select outq_posturl from outq where outq_created < %s - INTERVAL %s", - db_utcnow(), db_quoteinterval('3 DAY') + $oldqItems = q("select outq_posturl from outq where outq_created < %s - INTERVAL %s", + db_utcnow(), + db_quoteinterval('3 DAY') ); - if ($r) { - foreach ($r as $rr) { - $h = parse_url($rr['outq_posturl']); - $desturl = $h['scheme'] . '://' . $h['host'] . (isset($h['port']) ? ':' . $h['port'] : ''); + + if ($oldqItems) { + foreach ($oldqItems as $qItem) { + $h = parse_url($qItem['outq_posturl']); + $site_url = $h['scheme'] . '://' . $h['host'] . (($h['port']) ? ':' . $h['port'] : ''); q("update site set site_dead = 1 where site_dead = 0 and site_url = '%s' and site_update < %s - INTERVAL %s", - dbesc($desturl), - db_utcnow(), db_quoteinterval('1 MONTH') + dbesc($site_url), + db_utcnow(), + db_quoteinterval('1 MONTH') ); } } + logger('Removing ' . count($oldqItems) . ' old queue entries'); q("DELETE FROM outq WHERE outq_created < %s - INTERVAL %s", - db_utcnow(), db_quoteinterval('3 DAY') + db_utcnow(), + db_quoteinterval('3 DAY') ); + $deliveries = []; + if ($queue_id) { - $r = q("SELECT * FROM outq WHERE outq_hash = '%s' LIMIT 1", + $qItems = q("SELECT * FROM outq WHERE outq_hash = '%s' LIMIT 1", dbesc($queue_id) ); + logger('queue deliver: ' . $qItems[0]['outq_hash'] . ' to ' . $qItems[0]['outq_posturl'], LOGGER_DEBUG); + LibQueue\Queue::deliver($qItems[0]); } else { - - // For the first 12 hours we'll try to deliver every 15 minutes - // After that, we'll only attempt delivery once per hour. - // This currently only handles the default queue drivers ('zot' or '') which we will group by posturl - // so that we don't start off a thousand deliveries for a couple of dead hubs. - // The zot driver will deliver everything destined for a single hub once contact is made (*if* contact is made). - // Other drivers will have to do something different here and may need their own query. - - // Note: this requires some tweaking as new posts to long dead hubs once a day will keep them in the - // "every 15 minutes" category. We probably need to prioritise them when inserted into the queue - // or just prior to this query based on recent and long-term delivery history. If we have good reason to believe - // the site is permanently down, there's no reason to attempt delivery at all, or at most not more than once - // or twice a day. - - $sqlrandfunc = db_getfunc('rand'); - - $r = q("SELECT *,$sqlrandfunc as rn FROM outq WHERE outq_delivered = 0 and outq_scheduled < %s order by rn limit 1", + $qItems = q("SELECT * FROM outq WHERE outq_delivered = 0 and outq_scheduled < %s ", db_utcnow() ); - while ($r) { - foreach ($r as $rv) { - LibQueue::deliver($rv); + if ($qItems) { + foreach ($qItems as $qItem) { + $deliveries[] = $qItem['outq_hash']; } - $r = q("SELECT *,$sqlrandfunc as rn FROM outq WHERE outq_delivered = 0 and outq_scheduled < %s order by rn limit 1", - db_utcnow() - ); + do_delivery($deliveries); } - } - if (!$r) - return; - - foreach ($r as $rv) { - LibQueue::deliver($rv); - } - - return; + } } + } diff --git a/Zotlabs/Lib/Queue.php b/Zotlabs/Lib/Queue.php index c3f9cda20..23691408a 100644 --- a/Zotlabs/Lib/Queue.php +++ b/Zotlabs/Lib/Queue.php @@ -65,16 +65,32 @@ class Queue { ); } - - static function remove($id,$channel_id = 0) { - logger('queue: remove queue item ' . $id,LOGGER_DEBUG); + public static function remove($id, $channel_id = 0) { + logger('queue: remove queue item ' . $id, LOGGER_DEBUG); $sql_extra = (($channel_id) ? " and outq_channel = " . intval($channel_id) . " " : ''); - q("DELETE FROM outq WHERE outq_hash = '%s' $sql_extra", + // figure out what endpoint it is going to. + $record = q("select outq_posturl from outq where outq_hash = '%s' $sql_extra", dbesc($id) ); - } + if ($record) { + q("DELETE FROM outq WHERE outq_hash = '%s' $sql_extra", + dbesc($id) + ); + + // If there's anything remaining in the queue for this site, move one of them to the next active + // queue run by setting outq_scheduled back to the present. We may be attempting to deliver it + // as a 'piled_up' delivery, but this ensures the site has an active queue entry as long as queued + // entries still exist for it. This fixes an issue where one immediate delivery left everything + // else for that site undeliverable since all the other entries had been pushed far into the future. + + q("update outq set outq_scheduled = '%s' where outq_posturl = '%s' limit 1", + dbesc(datetime_convert()), + dbesc($record[0]['outq_posturl']) + ); + } + } static function remove_by_posturl($posturl) { logger('queue: remove queue posturl ' . $posturl,LOGGER_DEBUG); @@ -84,8 +100,6 @@ class Queue { ); } - - static function set_delivered($id,$channel = 0) { logger('queue: set delivered ' . $id,LOGGER_DEBUG); $sql_extra = (($channel['channel_id']) ? " and outq_channel = " . intval($channel['channel_id']) . " " : ''); -- cgit v1.2.3