aboutsummaryrefslogtreecommitdiffstats
path: root/Zotlabs/Daemon
diff options
context:
space:
mode:
Diffstat (limited to 'Zotlabs/Daemon')
-rw-r--r--Zotlabs/Daemon/Cron.php2
-rw-r--r--Zotlabs/Daemon/Master.php127
-rw-r--r--Zotlabs/Daemon/Notifier.php2
3 files changed, 126 insertions, 5 deletions
diff --git a/Zotlabs/Daemon/Cron.php b/Zotlabs/Daemon/Cron.php
index d1c516f96..25e49b817 100644
--- a/Zotlabs/Daemon/Cron.php
+++ b/Zotlabs/Daemon/Cron.php
@@ -60,7 +60,7 @@ class Cron {
drop_item($rr['id'],false,(($rr['item_wall']) ? DROPITEM_PHASE1 : DROPITEM_NORMAL));
if($rr['item_wall']) {
// The notifier isn't normally invoked unless item_drop is interactive.
- Zotlabs\Daemon\Master::Summon( [ 'Notifier', 'drop', $rr['id'] ] );
+ Master::Summon( [ 'Notifier', 'drop', $rr['id'] ] );
}
}
}
diff --git a/Zotlabs/Daemon/Master.php b/Zotlabs/Daemon/Master.php
index 580df97db..3a71ee578 100644
--- a/Zotlabs/Daemon/Master.php
+++ b/Zotlabs/Daemon/Master.php
@@ -3,7 +3,6 @@
namespace Zotlabs\Daemon;
if(array_search( __file__ , get_included_files()) === 0) {
-
require_once('include/cli_startup.php');
array_shift($argv);
$argc = count($argv);
@@ -17,14 +16,134 @@ if(array_search( __file__ , get_included_files()) === 0) {
class Master {
+ static public $queueworker = null;
+
static public function Summon($arr) {
proc_run('php','Zotlabs/Daemon/Master.php',$arr);
}
static public function Release($argc,$argv) {
cli_startup();
- logger('Master: release: ' . print_r($argv,true), LOGGER_ALL,LOG_DEBUG);
- $cls = '\\Zotlabs\\Daemon\\' . $argv[0];
- $cls::run($argc,$argv);
+
+ $maxworkers = get_config('system','max_queue_workers');
+
+ if (!$maxworkers || $maxworkers == 0) {
+ logger('Master: release: ' . print_r($argv,true), LOGGER_ALL,LOG_DEBUG);
+ $cls = '\\Zotlabs\\Daemon\\' . $argv[0];
+ $cls::run($argc,$argv);
+ self::ClearQueue();
+ } else {
+ logger('Master: enqueue: ' . print_r($argv,true), LOGGER_ALL,LOG_DEBUG);
+ $workinfo = ['argc'=>$argc,'argv'=>$argv];
+ q("insert into config (cat,k,v) values ('queuework','%s','%s')",
+ dbesc(uniqid('workitem:',true)),
+ dbesc(serialize($workinfo)));
+ self::Process();
+ }
+ }
+
+ static public function GetWorkerID() {
+ $maxworkers = get_config('system','max_queue_workers');
+ $maxworkers = ($maxworkers) ? $maxworkers : 3;
+
+ $workermaxage = get_config('system','max_queue_worker_age');
+ $workermaxage = ($workermaxage) ? $workermaxage : 300;
+
+ $workers = q("select * from config where cat='queueworkers' and k like '%s'", 'workerstarted_%');
+
+ if (count($workers) > $maxworkers) {
+ foreach ($workers as $idx => $worker) {
+ $curtime = time();
+ $age = (intval($curtime) - intval($worker['v']));
+ if ( $age > $workermaxage) {
+ logger("Prune worker: ".$worker['k'], LOGGER_ALL, LOGGER_DEBUG);
+ $k = explode('_',$worker['k']);
+ q("delete from config where cat='queueworkers' and k='%s'",
+ 'workerstarted_'.$k[1]);
+ q("update config set k='workitem' where cat='queuework' and k='%s'",
+ 'workitem_'.$k[1]);
+ unset($workers[$idx]);
+ }
+ }
+ if (count($workers) > $maxworkers) {
+ return false;
+ }
+ }
+ return uniqid();
+
+ }
+
+ static public function Process() {
+
+ self::$queueworker = self::GetWorkerID();
+
+ if (!self::$queueworker) {
+ logger('Master: unable to obtain worker ID.');
+ killme();
+ }
+
+ set_config('queueworkers','workerstarted_'.self::$queueworker,time());
+
+ $workersleep = get_config('system','queue_worker_sleep');
+ $workersleep = ($workersleep) ? $workersleep : 5;
+ cli_startup();
+
+ $work = q("update config set k='%s' where cat='queuework' and k like '%s' limit 1",
+ 'workitem_'.self::$queueworker,
+ dbesc('workitem:%'));
+ $jobs = 0;
+ while ($work) {
+ $workitem = q("select * from config where cat='queuework' and k='%s'",
+ 'workitem_'.self::$queueworker);
+
+ if (isset($workitem[0])) {
+ $jobs++;
+ $workinfo = unserialize($workitem[0]['v']);
+ $argc = $workinfo['argc'];
+ $argv = $workinfo['argv'];
+ logger('Master: process: ' . print_r($argv,true), LOGGER_ALL,LOG_DEBUG);
+
+ //Delete unclaimed duplicate workitems.
+ q("delete from config where cat='queuework' and k='workitem' and v='%s'",
+ serialize($argv));
+
+ $cls = '\\Zotlabs\\Daemon\\' . $argv[0];
+ $cls::run($argc,$argv);
+
+ //Right now we assume that if we get a return, everything is OK.
+ //At some point we may want to test whether the run returns true/false
+ // and requeue the work to be tried again. But we probably want
+ // to implement some sort of "retry interval" first.
+
+ q("delete from config where cat='queuework' and k='%s'",
+ 'workitem_'.self::$queueworker);
+ } else {
+ break;
+ }
+ sleep ($workersleep);
+ $work = q("update config set k='%s' where cat='queuework' and k like '%s' limit 1",
+ 'workitem_'.self::$queueworker,
+ dbesc('workitem:%'));
+
+ }
+ logger('Master: Worker Thread: queue items processed:' . $jobs);
+ q("delete from config where cat='queueworkers' and k='%s'",
+ 'workerstarted_'.self::$queueworker);
}
+
+ static public function ClearQueue() {
+ $work = q("select * from config where cat='queuework' and k like '%s'",
+ dbesc('workitem%'));
+ foreach ($work as $workitem) {
+ $workinfo = unserialize($workitem['v']);
+ $argc = $workinfo['argc'];
+ $argv = $workinfo['argv'];
+ logger('Master: process: ' . print_r($argv,true), LOGGER_ALL,LOG_DEBUG);
+ $cls = '\\Zotlabs\\Daemon\\' . $argv[0];
+ $cls::run($argc,$argv);
+ }
+ $work = q("delete from config where cat='queuework' and k like '%s'",
+ dbesc('workitem%'));
+ }
+
}
diff --git a/Zotlabs/Daemon/Notifier.php b/Zotlabs/Daemon/Notifier.php
index fa2368a92..f74c8f11c 100644
--- a/Zotlabs/Daemon/Notifier.php
+++ b/Zotlabs/Daemon/Notifier.php
@@ -559,6 +559,8 @@ class Notifier {
foreach($dhubs as $hub) {
+ logger('notifier_hub: ' . $hub['hubloc_url'],LOGGER_DEBUG);
+
if($hub['hubloc_network'] !== 'zot') {
$narr = [
'channel' => $channel,