diff options
Diffstat (limited to 'Zotlabs/Daemon')
-rw-r--r-- | Zotlabs/Daemon/Cron.php | 2 | ||||
-rw-r--r-- | Zotlabs/Daemon/Master.php | 127 | ||||
-rw-r--r-- | Zotlabs/Daemon/Notifier.php | 2 |
3 files changed, 126 insertions, 5 deletions
diff --git a/Zotlabs/Daemon/Cron.php b/Zotlabs/Daemon/Cron.php index d1c516f96..25e49b817 100644 --- a/Zotlabs/Daemon/Cron.php +++ b/Zotlabs/Daemon/Cron.php @@ -60,7 +60,7 @@ class Cron { drop_item($rr['id'],false,(($rr['item_wall']) ? DROPITEM_PHASE1 : DROPITEM_NORMAL)); if($rr['item_wall']) { // The notifier isn't normally invoked unless item_drop is interactive. - Zotlabs\Daemon\Master::Summon( [ 'Notifier', 'drop', $rr['id'] ] ); + Master::Summon( [ 'Notifier', 'drop', $rr['id'] ] ); } } } diff --git a/Zotlabs/Daemon/Master.php b/Zotlabs/Daemon/Master.php index 580df97db..3a71ee578 100644 --- a/Zotlabs/Daemon/Master.php +++ b/Zotlabs/Daemon/Master.php @@ -3,7 +3,6 @@ namespace Zotlabs\Daemon; if(array_search( __file__ , get_included_files()) === 0) { - require_once('include/cli_startup.php'); array_shift($argv); $argc = count($argv); @@ -17,14 +16,134 @@ if(array_search( __file__ , get_included_files()) === 0) { class Master { + static public $queueworker = null; + static public function Summon($arr) { proc_run('php','Zotlabs/Daemon/Master.php',$arr); } static public function Release($argc,$argv) { cli_startup(); - logger('Master: release: ' . print_r($argv,true), LOGGER_ALL,LOG_DEBUG); - $cls = '\\Zotlabs\\Daemon\\' . $argv[0]; - $cls::run($argc,$argv); + + $maxworkers = get_config('system','max_queue_workers'); + + if (!$maxworkers || $maxworkers == 0) { + logger('Master: release: ' . print_r($argv,true), LOGGER_ALL,LOG_DEBUG); + $cls = '\\Zotlabs\\Daemon\\' . $argv[0]; + $cls::run($argc,$argv); + self::ClearQueue(); + } else { + logger('Master: enqueue: ' . print_r($argv,true), LOGGER_ALL,LOG_DEBUG); + $workinfo = ['argc'=>$argc,'argv'=>$argv]; + q("insert into config (cat,k,v) values ('queuework','%s','%s')", + dbesc(uniqid('workitem:',true)), + dbesc(serialize($workinfo))); + self::Process(); + } + } + + static public function GetWorkerID() { + $maxworkers = get_config('system','max_queue_workers'); + $maxworkers = ($maxworkers) ? $maxworkers : 3; + + $workermaxage = get_config('system','max_queue_worker_age'); + $workermaxage = ($workermaxage) ? $workermaxage : 300; + + $workers = q("select * from config where cat='queueworkers' and k like '%s'", 'workerstarted_%'); + + if (count($workers) > $maxworkers) { + foreach ($workers as $idx => $worker) { + $curtime = time(); + $age = (intval($curtime) - intval($worker['v'])); + if ( $age > $workermaxage) { + logger("Prune worker: ".$worker['k'], LOGGER_ALL, LOGGER_DEBUG); + $k = explode('_',$worker['k']); + q("delete from config where cat='queueworkers' and k='%s'", + 'workerstarted_'.$k[1]); + q("update config set k='workitem' where cat='queuework' and k='%s'", + 'workitem_'.$k[1]); + unset($workers[$idx]); + } + } + if (count($workers) > $maxworkers) { + return false; + } + } + return uniqid(); + + } + + static public function Process() { + + self::$queueworker = self::GetWorkerID(); + + if (!self::$queueworker) { + logger('Master: unable to obtain worker ID.'); + killme(); + } + + set_config('queueworkers','workerstarted_'.self::$queueworker,time()); + + $workersleep = get_config('system','queue_worker_sleep'); + $workersleep = ($workersleep) ? $workersleep : 5; + cli_startup(); + + $work = q("update config set k='%s' where cat='queuework' and k like '%s' limit 1", + 'workitem_'.self::$queueworker, + dbesc('workitem:%')); + $jobs = 0; + while ($work) { + $workitem = q("select * from config where cat='queuework' and k='%s'", + 'workitem_'.self::$queueworker); + + if (isset($workitem[0])) { + $jobs++; + $workinfo = unserialize($workitem[0]['v']); + $argc = $workinfo['argc']; + $argv = $workinfo['argv']; + logger('Master: process: ' . print_r($argv,true), LOGGER_ALL,LOG_DEBUG); + + //Delete unclaimed duplicate workitems. + q("delete from config where cat='queuework' and k='workitem' and v='%s'", + serialize($argv)); + + $cls = '\\Zotlabs\\Daemon\\' . $argv[0]; + $cls::run($argc,$argv); + + //Right now we assume that if we get a return, everything is OK. + //At some point we may want to test whether the run returns true/false + // and requeue the work to be tried again. But we probably want + // to implement some sort of "retry interval" first. + + q("delete from config where cat='queuework' and k='%s'", + 'workitem_'.self::$queueworker); + } else { + break; + } + sleep ($workersleep); + $work = q("update config set k='%s' where cat='queuework' and k like '%s' limit 1", + 'workitem_'.self::$queueworker, + dbesc('workitem:%')); + + } + logger('Master: Worker Thread: queue items processed:' . $jobs); + q("delete from config where cat='queueworkers' and k='%s'", + 'workerstarted_'.self::$queueworker); } + + static public function ClearQueue() { + $work = q("select * from config where cat='queuework' and k like '%s'", + dbesc('workitem%')); + foreach ($work as $workitem) { + $workinfo = unserialize($workitem['v']); + $argc = $workinfo['argc']; + $argv = $workinfo['argv']; + logger('Master: process: ' . print_r($argv,true), LOGGER_ALL,LOG_DEBUG); + $cls = '\\Zotlabs\\Daemon\\' . $argv[0]; + $cls::run($argc,$argv); + } + $work = q("delete from config where cat='queuework' and k like '%s'", + dbesc('workitem%')); + } + } diff --git a/Zotlabs/Daemon/Notifier.php b/Zotlabs/Daemon/Notifier.php index fa2368a92..f74c8f11c 100644 --- a/Zotlabs/Daemon/Notifier.php +++ b/Zotlabs/Daemon/Notifier.php @@ -559,6 +559,8 @@ class Notifier { foreach($dhubs as $hub) { + logger('notifier_hub: ' . $hub['hubloc_url'],LOGGER_DEBUG); + if($hub['hubloc_network'] !== 'zot') { $narr = [ 'channel' => $channel, |