diff options
author | M. Dent <dentm42@gmail.com> | 2018-10-07 17:43:03 +0000 |
---|---|---|
committer | Mario <mario@mariovavti.com> | 2018-10-08 11:12:09 +0200 |
commit | 58a9cde61d797ca64e38dfafa41b2b37d6bffc39 (patch) | |
tree | 1c5a9736c9e2f79aa4a58ecf934a35c51732c94c /Zotlabs | |
parent | 8ca321df7c742f4beed69bc92b3e28f978e56e04 (diff) | |
download | volse-hubzilla-58a9cde61d797ca64e38dfafa41b2b37d6bffc39.tar.gz volse-hubzilla-58a9cde61d797ca64e38dfafa41b2b37d6bffc39.tar.bz2 volse-hubzilla-58a9cde61d797ca64e38dfafa41b2b37d6bffc39.zip |
Fix (potential) runaway cron
(cherry picked from commit 108855aca212a858bf60d85d6418bae5450111d0)
Diffstat (limited to 'Zotlabs')
-rw-r--r-- | Zotlabs/Daemon/Master.php | 122 |
1 files changed, 118 insertions, 4 deletions
diff --git a/Zotlabs/Daemon/Master.php b/Zotlabs/Daemon/Master.php index 580df97db..6fb674d60 100644 --- a/Zotlabs/Daemon/Master.php +++ b/Zotlabs/Daemon/Master.php @@ -3,7 +3,6 @@ namespace Zotlabs\Daemon; if(array_search( __file__ , get_included_files()) === 0) { - require_once('include/cli_startup.php'); array_shift($argv); $argc = count($argv); @@ -17,14 +16,129 @@ if(array_search( __file__ , get_included_files()) === 0) { class Master { + static public $queueworker = null; + static public function Summon($arr) { proc_run('php','Zotlabs/Daemon/Master.php',$arr); } static public function Release($argc,$argv) { cli_startup(); - logger('Master: release: ' . print_r($argv,true), LOGGER_ALL,LOG_DEBUG); - $cls = '\\Zotlabs\\Daemon\\' . $argv[0]; - $cls::run($argc,$argv); + + $maxworkers = get_config('system','max_queue_workers'); + + if (!$maxworkers || $maxworkers == 0) { + logger('Master: release: ' . print_r($argv,true), LOGGER_ALL,LOG_DEBUG); + $cls = '\\Zotlabs\\Daemon\\' . $argv[0]; + $cls::run($argc,$argv); + self::ClearQueue(); + } else { + logger('Master: enqueue: ' . print_r($argv,true), LOGGER_ALL,LOG_DEBUG); + $workinfo = ['argc'=>$argc,'argv'=>$argv]; + q("insert into config (cat,k,v) values ('queuework','%s','%s')", + dbesc(uniqid('workitem:',true)), + dbesc(serialize($workinfo))); + self::Process(); + } + } + + static public function GetWorkerID() { + $maxworkers = get_config('system','max_queue_workers'); + $maxworkers = ($maxworkers) ? $maxworkers : 3; + + $workermaxage = get_config('system','max_queue_worker_age'); + $workermaxage = ($workermaxage) ? $workermaxage : 300; + + $workers = q("select * from config where cat='queueworkers' and k like '%s'", 'workerstarted_%'); + + if (count($workers) > $maxworkers) { + foreach ($workers as $idx => $worker) { + $curtime = time(); + if (($time - $worker['v']) > $workermaxage) { + $k = explode('_',$worker['k']); + q("delete from config where cat='queueworkers' and k='%s'", + 'workerstarted_'.$k[1]); + q("update config set k='workitem' where cat='queuework' and k='%s'", + 'workitem_'.$k[1]); + unset($workers[$idx]); + } + } + if (count($workers) > $maxworkers) { + return false; + } + } + return uniqid(); + + } + + static public function Process() { + + self::$queueworker = self::GetWorkerID(); + + if (!self::$queueworker) { + logger('Master: unable to obtain worker ID.'); + killme(); + } + + set_config('queueworkers','workerstarted_'.self::$queueworker,time()); + + $workersleep = get_config('system','queue_worker_sleep'); + $workersleep = ($workersleep) ? $workersleep : 5; + cli_startup(); + + $work = q("update config set k='%s' where cat='queuework' and k like '%s' limit 1", + 'workitem_'.self::$queueworker, + dbesc('workitem:%')); + $jobs = 0; + while ($work) { + $workitem = q("select * from config where cat='queuework' and k='%s'", + 'workitem_'.self::$queueworker); + + if (isset($workitem[0])) { + $jobs++; + $workinfo = unserialize($workitem[0]['v']); + $argc = $workinfo['argc']; + $argv = $workinfo['argv']; + logger('Master: process: ' . print_r($argv,true), LOGGER_ALL,LOG_DEBUG); + $cls = '\\Zotlabs\\Daemon\\' . $argv[0]; + $cls::run($argc,$argv); + + //Right now we assume that if we get a return, everything is OK. + //At some point we may want to test whether the run returns true/false + // and requeue the work to be tried again. But we probably want + // to implement some sort of "retry interval" first. + + q("delete from config where cat='queuework' and k='%s'", + 'workitem_'.self::$queueworker); + } else { + break; + } + sleep ($workersleep); + $work = q("update config set k='%s' where cat='queuework' and k like '%s' limit 1", + 'workitem_'.self::$queueworker, + dbesc('workitem:%')); + + } + logger('Master: Worker Thread: queue items processed:' . $jobs); + q("delete from config where cat='queueworkers' and k='%s'", + 'workerstarted_'.self::$queueworker); } + + static public function ClearQueue() { + $work = q("select * from config where cat='queuework' and k like '%s'", + 'workitem_%', + dbesc('workitem%')); + foreach ($work as $workitem) { + $workinfo = unserialize($workitem['v']); + $argc = $workinfo['argc']; + $argv = $workinfo['argv']; + logger('Master: process: ' . print_r($argv,true), LOGGER_ALL,LOG_DEBUG); + $cls = '\\Zotlabs\\Daemon\\' . $argv[0]; + $cls::run($argc,$argv); + } + $work = q("delete from config where cat='queuework' and k like '%s'", + 'workitem_%', + dbesc('workitem%')); + } + } |