From a50433dd958ec52eec88867517b3ae6467758618 Mon Sep 17 00:00:00 2001 From: "DM42.Net (Matt Dent)" Date: Sun, 30 Dec 2018 15:05:45 -0500 Subject: Use subselect --- Zotlabs/Daemon/Master.php | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) (limited to 'Zotlabs/Daemon') diff --git a/Zotlabs/Daemon/Master.php b/Zotlabs/Daemon/Master.php index 0656ca05b..70d80d75b 100644 --- a/Zotlabs/Daemon/Master.php +++ b/Zotlabs/Daemon/Master.php @@ -89,7 +89,7 @@ class Master { $workersleep = ($workersleep) ? $workersleep : 5; cli_startup(); - $work = q("update config set k='%s' where cat='queuework' and k like '%s' limit 1", + $work = q("update config set k='%s' where id in (select id from config where cat='queuework' and k like '%s' limit 1)", 'workitem_'.self::$queueworker, dbesc('workitem:%')); $jobs = 0; @@ -122,9 +122,9 @@ class Master { break; } sleep ($workersleep); - $work = q("update config set k='%s' where cat='queuework' and k like '%s' limit 1", - 'workitem_'.self::$queueworker, - dbesc('workitem:%')); + $work = q("update config set k='%s' where id in (select id from config where cat='queuework' and k like '%s' limit 1)", + 'workitem_'.self::$queueworker, + dbesc('workitem:%')); } logger('Master: Worker Thread: queue items processed:' . $jobs); -- cgit v1.2.3 From a19563e1396cb882063db3ce3f355b2db48175e9 Mon Sep 17 00:00:00 2001 From: Mario Date: Wed, 2 Jan 2019 19:56:11 +0100 Subject: Revert "Use subselect" This reverts commit a50433dd958ec52eec88867517b3ae6467758618 --- Zotlabs/Daemon/Master.php | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) (limited to 'Zotlabs/Daemon') diff --git a/Zotlabs/Daemon/Master.php b/Zotlabs/Daemon/Master.php index 70d80d75b..0656ca05b 100644 --- a/Zotlabs/Daemon/Master.php +++ b/Zotlabs/Daemon/Master.php @@ -89,7 +89,7 @@ class Master { $workersleep = ($workersleep) ? $workersleep : 5; cli_startup(); - $work = q("update config set k='%s' where id in (select id from config where cat='queuework' and k like '%s' limit 1)", + $work = q("update config set k='%s' where cat='queuework' and k like '%s' limit 1", 'workitem_'.self::$queueworker, dbesc('workitem:%')); $jobs = 0; @@ -122,9 +122,9 @@ class Master { break; } sleep ($workersleep); - $work = q("update config set k='%s' where id in (select id from config where cat='queuework' and k like '%s' limit 1)", - 'workitem_'.self::$queueworker, - dbesc('workitem:%')); + $work = q("update config set k='%s' where cat='queuework' and k like '%s' limit 1", + 'workitem_'.self::$queueworker, + dbesc('workitem:%')); } logger('Master: Worker Thread: queue items processed:' . $jobs); -- cgit v1.2.3 From 5d88326915790305b5ff6c4f8d6039d4171ac478 Mon Sep 17 00:00:00 2001 From: "M. Dent" Date: Wed, 9 Jan 2019 10:16:53 +0100 Subject: Remove Experimental Worker Queue from CORE - add hook 'daemon_master_release' --- Zotlabs/Daemon/Master.php | 129 ++++------------------------------------------ 1 file changed, 11 insertions(+), 118 deletions(-) (limited to 'Zotlabs/Daemon') diff --git a/Zotlabs/Daemon/Master.php b/Zotlabs/Daemon/Master.php index 0656ca05b..857d47243 100644 --- a/Zotlabs/Daemon/Master.php +++ b/Zotlabs/Daemon/Master.php @@ -16,8 +16,6 @@ if(array_search( __file__ , get_included_files()) === 0) { class Master { - static public $queueworker = null; - static public function Summon($arr) { proc_run('php','Zotlabs/Daemon/Master.php',$arr); } @@ -25,126 +23,21 @@ class Master { static public function Release($argc,$argv) { cli_startup(); - $maxworkers = get_config('system','max_queue_workers'); - - if (!$maxworkers || $maxworkers == 0) { - logger('Master: release: ' . print_r($argv,true), LOGGER_ALL,LOG_DEBUG); - $cls = '\\Zotlabs\\Daemon\\' . $argv[0]; - $cls::run($argc,$argv); - self::ClearQueue(); - } else { - logger('Master: enqueue: ' . print_r($argv,true), LOGGER_ALL,LOG_DEBUG); - $workinfo = ['argc'=>$argc,'argv'=>$argv]; - q("insert into config (cat,k,v) values ('queuework','%s','%s')", - dbesc(uniqid('workitem:',true)), - dbesc(serialize($workinfo))); - self::Process(); - } - } - - static public function GetWorkerID() { - $maxworkers = get_config('system','max_queue_workers'); - $maxworkers = ($maxworkers) ? $maxworkers : 3; - - $workermaxage = get_config('system','max_queue_worker_age'); - $workermaxage = ($workermaxage) ? $workermaxage : 300; - - $workers = q("select * from config where cat='queueworkers' and k like '%s'", 'workerstarted_%'); - - if (count($workers) > $maxworkers) { - foreach ($workers as $idx => $worker) { - $curtime = time(); - $age = (intval($curtime) - intval($worker['v'])); - if ( $age > $workermaxage) { - logger("Prune worker: ".$worker['k'], LOGGER_ALL, LOGGER_DEBUG); - $k = explode('_',$worker['k']); - q("delete from config where cat='queueworkers' and k='%s'", - 'workerstarted_'.$k[1]); - q("update config set k='%s' where cat='queuework' and k='%s'", - dbesc(uniqid('workitem:',true)), - 'workitem_'.$k[1]); - unset($workers[$idx]); - } - } - if (count($workers) > $maxworkers) { - return false; - } - } - return uniqid('',true); - - } - - static public function Process() { - - self::$queueworker = self::GetWorkerID(); - - if (!self::$queueworker) { - logger('Master: unable to obtain worker ID.'); - killme(); - } - - set_config('queueworkers','workerstarted_'.self::$queueworker,time()); - - $workersleep = get_config('system','queue_worker_sleep'); - $workersleep = ($workersleep) ? $workersleep : 5; - cli_startup(); - - $work = q("update config set k='%s' where cat='queuework' and k like '%s' limit 1", - 'workitem_'.self::$queueworker, - dbesc('workitem:%')); - $jobs = 0; - while ($work) { - $workitem = q("select * from config where cat='queuework' and k='%s'", - 'workitem_'.self::$queueworker); - - if (isset($workitem[0])) { - $jobs++; - $workinfo = unserialize($workitem[0]['v']); - $argc = $workinfo['argc']; - $argv = $workinfo['argv']; - logger('Master: process: ' . print_r($argv,true), LOGGER_ALL,LOG_DEBUG); - - //Delete unclaimed duplicate workitems. - q("delete from config where cat='queuework' and k='workitem' and v='%s'", - serialize($argv)); - - $cls = '\\Zotlabs\\Daemon\\' . $argv[0]; - $cls::run($argc,$argv); + $hookinfo = [ + 'argv'=>$argv + ]; - //Right now we assume that if we get a return, everything is OK. - //At some point we may want to test whether the run returns true/false - // and requeue the work to be tried again. But we probably want - // to implement some sort of "retry interval" first. + call_hooks ('daemon_master_release',$hookinfo); - q("delete from config where cat='queuework' and k='%s'", - 'workitem_'.self::$queueworker); - } else { - break; - } - sleep ($workersleep); - $work = q("update config set k='%s' where cat='queuework' and k like '%s' limit 1", - 'workitem_'.self::$queueworker, - dbesc('workitem:%')); + $argv = $hookinfo['argv']; + $argc = count($argv); + if ((!is_array($argv) || (count($argv) < 1))) { + return; } - logger('Master: Worker Thread: queue items processed:' . $jobs); - q("delete from config where cat='queueworkers' and k='%s'", - 'workerstarted_'.self::$queueworker); - } - static public function ClearQueue() { - $work = q("select * from config where cat='queuework' and k like '%s'", - dbesc('workitem%')); - foreach ($work as $workitem) { - $workinfo = unserialize($workitem['v']); - $argc = $workinfo['argc']; - $argv = $workinfo['argv']; - logger('Master: process: ' . print_r($argv,true), LOGGER_ALL,LOG_DEBUG); - $cls = '\\Zotlabs\\Daemon\\' . $argv[0]; - $cls::run($argc,$argv); - } - $work = q("delete from config where cat='queuework' and k like '%s'", - dbesc('workitem%')); + logger('Master: release: ' . json_encode($argv), LOGGER_ALL,LOG_DEBUG); + $cls = '\\Zotlabs\\Daemon\\' . $argv[0]; + $cls::run($argc,$argv); } - } -- cgit v1.2.3 From b54ddccd7beece51d78b21b4ba9e98c908fec4c5 Mon Sep 17 00:00:00 2001 From: ZotSocial Admin Date: Wed, 9 Jan 2019 21:23:12 -0500 Subject: FIX: memory exhaustion on exceptionally large message queues & multiple Queue.php invocations duplicate work --- Zotlabs/Daemon/Queue.php | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) (limited to 'Zotlabs/Daemon') diff --git a/Zotlabs/Daemon/Queue.php b/Zotlabs/Daemon/Queue.php index 8f529ff13..e041804f0 100644 --- a/Zotlabs/Daemon/Queue.php +++ b/Zotlabs/Daemon/Queue.php @@ -61,10 +61,18 @@ class Queue { // or just prior to this query based on recent and long-term delivery history. If we have good reason to believe // the site is permanently down, there's no reason to attempt delivery at all, or at most not more than once // or twice a day. - - $r = q("SELECT * FROM outq WHERE outq_delivered = 0 and outq_scheduled < %s ", + + $r = q("SELECT *,RAND() as rn FROM outq WHERE outq_delivered = 0 and outq_scheduled < %s order by rn limit 1", db_utcnow() ); + while ($r) { + foreach($r as $rv) { + queue_deliver($rv); + } + $r = q("SELECT *,RAND() as rn FROM outq WHERE outq_delivered = 0 and outq_scheduled < %s order by rn limit 1", + db_utcnow() + ); + } } if(! $r) return; -- cgit v1.2.3 From 6791b05a4032a076651f7c8e4790614f0f405a55 Mon Sep 17 00:00:00 2001 From: "DM42.Net (Matt Dent)" Date: Thu, 10 Jan 2019 15:29:24 -0500 Subject: Fix for PGSQL/MYSQL difference --- Zotlabs/Daemon/Queue.php | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) (limited to 'Zotlabs/Daemon') diff --git a/Zotlabs/Daemon/Queue.php b/Zotlabs/Daemon/Queue.php index e041804f0..6b525b8c3 100644 --- a/Zotlabs/Daemon/Queue.php +++ b/Zotlabs/Daemon/Queue.php @@ -12,6 +12,15 @@ class Queue { require_once('include/items.php'); require_once('include/bbcode.php'); + switch (DBTYPE_ACTIVE) { + case DBTYPE_MYSQL: + $sqlrandfunc = "RAND()"; + break; + + case DBTYPE_POSTGRESQL: + $sqlrandfunc = "RANDOM()"; + break; + } if($argc > 1) $queue_id = $argv[1]; @@ -62,14 +71,14 @@ class Queue { // the site is permanently down, there's no reason to attempt delivery at all, or at most not more than once // or twice a day. - $r = q("SELECT *,RAND() as rn FROM outq WHERE outq_delivered = 0 and outq_scheduled < %s order by rn limit 1", + $r = q("SELECT *,$sqlrandfunc as rn FROM outq WHERE outq_delivered = 0 and outq_scheduled < %s order by rn limit 1", db_utcnow() ); while ($r) { foreach($r as $rv) { queue_deliver($rv); } - $r = q("SELECT *,RAND() as rn FROM outq WHERE outq_delivered = 0 and outq_scheduled < %s order by rn limit 1", + $r = q("SELECT *,$sqlrandfunc as rn FROM outq WHERE outq_delivered = 0 and outq_scheduled < %s order by rn limit 1", db_utcnow() ); } -- cgit v1.2.3 From caf2c0a6c43f3162fc8f4d758eb06cb2f56b3865 Mon Sep 17 00:00:00 2001 From: "DM42.Net (Matt Dent)" Date: Fri, 11 Jan 2019 07:35:12 -0500 Subject: Use dba_driver.php::db_getfunc() --- Zotlabs/Daemon/Queue.php | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) (limited to 'Zotlabs/Daemon') diff --git a/Zotlabs/Daemon/Queue.php b/Zotlabs/Daemon/Queue.php index 6b525b8c3..814148404 100644 --- a/Zotlabs/Daemon/Queue.php +++ b/Zotlabs/Daemon/Queue.php @@ -12,16 +12,6 @@ class Queue { require_once('include/items.php'); require_once('include/bbcode.php'); - switch (DBTYPE_ACTIVE) { - case DBTYPE_MYSQL: - $sqlrandfunc = "RAND()"; - break; - - case DBTYPE_POSTGRESQL: - $sqlrandfunc = "RANDOM()"; - break; - } - if($argc > 1) $queue_id = $argv[1]; else @@ -71,6 +61,8 @@ class Queue { // the site is permanently down, there's no reason to attempt delivery at all, or at most not more than once // or twice a day. + $sqlrandfunc = db_getfunc('rand'); + $r = q("SELECT *,$sqlrandfunc as rn FROM outq WHERE outq_delivered = 0 and outq_scheduled < %s order by rn limit 1", db_utcnow() ); -- cgit v1.2.3