$argc,'argv'=>$argv]; q("insert into config (cat,k,v) values ('queuework','%s','%s')", dbesc(uniqid('workitem:',true)), dbesc(serialize($workinfo))); self::Process(); } } static public function GetWorkerID() { $maxworkers = get_config('system','max_queue_workers'); $maxworkers = ($maxworkers) ? $maxworkers : 3; $workermaxage = get_config('system','max_queue_worker_age'); $workermaxage = ($workermaxage) ? $workermaxage : 300; $workers = q("select * from config where cat='queueworkers' and k like '%s'", 'workerstarted_%'); if (count($workers) > $maxworkers) { foreach ($workers as $idx => $worker) { $curtime = time(); $age = (intval($curtime) - intval($worker['v'])); if ( $age > $workermaxage) { logger("Prune worker: ".$worker['k'], LOGGER_ALL, LOGGER_DEBUG); $k = explode('_',$worker['k']); q("delete from config where cat='queueworkers' and k='%s'", 'workerstarted_'.$k[1]); q("update config set k='%s' where cat='queuework' and k='%s'", dbesc(uniqid('workitem:',true)), 'workitem_'.$k[1]); unset($workers[$idx]); } } if (count($workers) > $maxworkers) { return false; } } return uniqid('',true); } static public function Process() { self::$queueworker = self::GetWorkerID(); if (!self::$queueworker) { logger('Master: unable to obtain worker ID.'); killme(); } set_config('queueworkers','workerstarted_'.self::$queueworker,time()); $workersleep = get_config('system','queue_worker_sleep'); $workersleep = ($workersleep) ? $workersleep : 5; cli_startup(); $work = q("update config set k='%s' where id in (select id from config where cat='queuework' and k like '%s' limit 1)", 'workitem_'.self::$queueworker, dbesc('workitem:%')); $jobs = 0; while ($work) { $workitem = q("select * from config where cat='queuework' and k='%s'", 'workitem_'.self::$queueworker); if (isset($workitem[0])) { $jobs++; $workinfo = unserialize($workitem[0]['v']); $argc = $workinfo['argc']; $argv = $workinfo['argv']; logger('Master: process: ' . print_r($argv,true), LOGGER_ALL,LOG_DEBUG); //Delete unclaimed duplicate workitems. q("delete from config where cat='queuework' and k='workitem' and v='%s'", serialize($argv)); $cls = '\\Zotlabs\\Daemon\\' . $argv[0]; $cls::run($argc,$argv); //Right now we assume that if we get a return, everything is OK. //At some point we may want to test whether the run returns true/false // and requeue the work to be tried again. But we probably want // to implement some sort of "retry interval" first. q("delete from config where cat='queuework' and k='%s'", 'workitem_'.self::$queueworker); } else { break; } sleep ($workersleep); $work = q("update config set k='%s' where id in (select id from config where cat='queuework' and k like '%s' limit 1)", 'workitem_'.self::$queueworker, dbesc('workitem:%')); } logger('Master: Worker Thread: queue items processed:' . $jobs); q("delete from config where cat='queueworkers' and k='%s'", 'workerstarted_'.self::$queueworker); } static public function ClearQueue() { $work = q("select * from config where cat='queuework' and k like '%s'", dbesc('workitem%')); foreach ($work as $workitem) { $workinfo = unserialize($workitem['v']); $argc = $workinfo['argc']; $argv = $workinfo['argv']; logger('Master: process: ' . print_r($argv,true), LOGGER_ALL,LOG_DEBUG); $cls = '\\Zotlabs\\Daemon\\' . $argv[0]; $cls::run($argc,$argv); } $work = q("delete from config where cat='queuework' and k like '%s'", dbesc('workitem%')); } }