aboutsummaryrefslogtreecommitdiffstats
path: root/Zotlabs/Daemon/Master.php
diff options
context:
space:
mode:
authorMario Vavti <mario@mariovavti.com>2018-10-19 11:18:28 +0200
committerMario Vavti <mario@mariovavti.com>2018-10-19 11:18:28 +0200
commitfa9e9510e5d993d183feb942fe74be5fdd07f5cf (patch)
tree41fec09f527a9346e043b8099b458a97d81b03ed /Zotlabs/Daemon/Master.php
parent32de123db0ac526795a237ff46885fe8a332cbc0 (diff)
parent06b3ad1071c755757555baf941e2c0f446f97b21 (diff)
downloadvolse-hubzilla-fa9e9510e5d993d183feb942fe74be5fdd07f5cf.tar.gz
volse-hubzilla-fa9e9510e5d993d183feb942fe74be5fdd07f5cf.tar.bz2
volse-hubzilla-fa9e9510e5d993d183feb942fe74be5fdd07f5cf.zip
Merge branch '3.8RC'3.8
Diffstat (limited to 'Zotlabs/Daemon/Master.php')
-rw-r--r--Zotlabs/Daemon/Master.php127
1 files changed, 123 insertions, 4 deletions
diff --git a/Zotlabs/Daemon/Master.php b/Zotlabs/Daemon/Master.php
index 580df97db..3a71ee578 100644
--- a/Zotlabs/Daemon/Master.php
+++ b/Zotlabs/Daemon/Master.php
@@ -3,7 +3,6 @@
namespace Zotlabs\Daemon;
if(array_search( __file__ , get_included_files()) === 0) {
-
require_once('include/cli_startup.php');
array_shift($argv);
$argc = count($argv);
@@ -17,14 +16,134 @@ if(array_search( __file__ , get_included_files()) === 0) {
class Master {
+ static public $queueworker = null;
+
static public function Summon($arr) {
proc_run('php','Zotlabs/Daemon/Master.php',$arr);
}
static public function Release($argc,$argv) {
cli_startup();
- logger('Master: release: ' . print_r($argv,true), LOGGER_ALL,LOG_DEBUG);
- $cls = '\\Zotlabs\\Daemon\\' . $argv[0];
- $cls::run($argc,$argv);
+
+ $maxworkers = get_config('system','max_queue_workers');
+
+ if (!$maxworkers || $maxworkers == 0) {
+ logger('Master: release: ' . print_r($argv,true), LOGGER_ALL,LOG_DEBUG);
+ $cls = '\\Zotlabs\\Daemon\\' . $argv[0];
+ $cls::run($argc,$argv);
+ self::ClearQueue();
+ } else {
+ logger('Master: enqueue: ' . print_r($argv,true), LOGGER_ALL,LOG_DEBUG);
+ $workinfo = ['argc'=>$argc,'argv'=>$argv];
+ q("insert into config (cat,k,v) values ('queuework','%s','%s')",
+ dbesc(uniqid('workitem:',true)),
+ dbesc(serialize($workinfo)));
+ self::Process();
+ }
+ }
+
+ static public function GetWorkerID() {
+ $maxworkers = get_config('system','max_queue_workers');
+ $maxworkers = ($maxworkers) ? $maxworkers : 3;
+
+ $workermaxage = get_config('system','max_queue_worker_age');
+ $workermaxage = ($workermaxage) ? $workermaxage : 300;
+
+ $workers = q("select * from config where cat='queueworkers' and k like '%s'", 'workerstarted_%');
+
+ if (count($workers) > $maxworkers) {
+ foreach ($workers as $idx => $worker) {
+ $curtime = time();
+ $age = (intval($curtime) - intval($worker['v']));
+ if ( $age > $workermaxage) {
+ logger("Prune worker: ".$worker['k'], LOGGER_ALL, LOGGER_DEBUG);
+ $k = explode('_',$worker['k']);
+ q("delete from config where cat='queueworkers' and k='%s'",
+ 'workerstarted_'.$k[1]);
+ q("update config set k='workitem' where cat='queuework' and k='%s'",
+ 'workitem_'.$k[1]);
+ unset($workers[$idx]);
+ }
+ }
+ if (count($workers) > $maxworkers) {
+ return false;
+ }
+ }
+ return uniqid();
+
+ }
+
+ static public function Process() {
+
+ self::$queueworker = self::GetWorkerID();
+
+ if (!self::$queueworker) {
+ logger('Master: unable to obtain worker ID.');
+ killme();
+ }
+
+ set_config('queueworkers','workerstarted_'.self::$queueworker,time());
+
+ $workersleep = get_config('system','queue_worker_sleep');
+ $workersleep = ($workersleep) ? $workersleep : 5;
+ cli_startup();
+
+ $work = q("update config set k='%s' where cat='queuework' and k like '%s' limit 1",
+ 'workitem_'.self::$queueworker,
+ dbesc('workitem:%'));
+ $jobs = 0;
+ while ($work) {
+ $workitem = q("select * from config where cat='queuework' and k='%s'",
+ 'workitem_'.self::$queueworker);
+
+ if (isset($workitem[0])) {
+ $jobs++;
+ $workinfo = unserialize($workitem[0]['v']);
+ $argc = $workinfo['argc'];
+ $argv = $workinfo['argv'];
+ logger('Master: process: ' . print_r($argv,true), LOGGER_ALL,LOG_DEBUG);
+
+ //Delete unclaimed duplicate workitems.
+ q("delete from config where cat='queuework' and k='workitem' and v='%s'",
+ serialize($argv));
+
+ $cls = '\\Zotlabs\\Daemon\\' . $argv[0];
+ $cls::run($argc,$argv);
+
+ //Right now we assume that if we get a return, everything is OK.
+ //At some point we may want to test whether the run returns true/false
+ // and requeue the work to be tried again. But we probably want
+ // to implement some sort of "retry interval" first.
+
+ q("delete from config where cat='queuework' and k='%s'",
+ 'workitem_'.self::$queueworker);
+ } else {
+ break;
+ }
+ sleep ($workersleep);
+ $work = q("update config set k='%s' where cat='queuework' and k like '%s' limit 1",
+ 'workitem_'.self::$queueworker,
+ dbesc('workitem:%'));
+
+ }
+ logger('Master: Worker Thread: queue items processed:' . $jobs);
+ q("delete from config where cat='queueworkers' and k='%s'",
+ 'workerstarted_'.self::$queueworker);
}
+
+ static public function ClearQueue() {
+ $work = q("select * from config where cat='queuework' and k like '%s'",
+ dbesc('workitem%'));
+ foreach ($work as $workitem) {
+ $workinfo = unserialize($workitem['v']);
+ $argc = $workinfo['argc'];
+ $argv = $workinfo['argv'];
+ logger('Master: process: ' . print_r($argv,true), LOGGER_ALL,LOG_DEBUG);
+ $cls = '\\Zotlabs\\Daemon\\' . $argv[0];
+ $cls::run($argc,$argv);
+ }
+ $work = q("delete from config where cat='queuework' and k like '%s'",
+ dbesc('workitem%'));
+ }
+
}