diff options
Diffstat (limited to 'Zotlabs/Lib/QueueWorker.php')
-rw-r--r-- | Zotlabs/Lib/QueueWorker.php | 126 |
1 files changed, 54 insertions, 72 deletions
diff --git a/Zotlabs/Lib/QueueWorker.php b/Zotlabs/Lib/QueueWorker.php index 2bcbdfc5c..da52fb859 100644 --- a/Zotlabs/Lib/QueueWorker.php +++ b/Zotlabs/Lib/QueueWorker.php @@ -2,11 +2,9 @@ namespace Zotlabs\Lib; - use Ramsey\Uuid\Uuid; use Ramsey\Uuid\Exception\UnableToBuildUuidException; - class QueueWorker { public static $queueworker = null; @@ -14,54 +12,25 @@ class QueueWorker { public static $workermaxage = 0; public static $workersleep = 100; public static $default_priorities = [ - 'Notifier' => 10, - 'Deliver' => 10, - 'Cache_query' => 10, - 'Content_importer' => 1, - 'File_importer' => 1, - 'Channel_purge' => 1, - 'Directory' => 1 + 'Notifier' => 10, + 'Deliver' => 10, + 'Cache_query' => 10, + 'Content_importer' => 1, + 'File_importer' => 1, + 'Channel_purge' => 1, + 'Directory' => 1 ]; private static function qbegin() { - switch (ACTIVE_DBTYPE) { - case DBTYPE_MYSQL: - q('BEGIN'); - break; - - case DBTYPE_POSTGRES: - q('BEGIN'); - break; - } - return; + q('BEGIN'); } private static function qcommit() { - switch (ACTIVE_DBTYPE) { - case DBTYPE_MYSQL: - //q("UNLOCK TABLES"); - q("COMMIT"); - break; - - case DBTYPE_POSTGRES: - q("COMMIT"); - break; - } - return; + q("COMMIT"); } private static function qrollback() { - switch (ACTIVE_DBTYPE) { - case DBTYPE_MYSQL: - q("ROLLBACK"); - //q("UNLOCK TABLES"); - break; - - case DBTYPE_POSTGRES: - q("ROLLBACK"); - break; - } - return; + q("ROLLBACK"); } public static function Summon($argv) { @@ -70,7 +39,7 @@ class QueueWorker { $priority = 0; // @TODO allow reprioritization - if(isset(self::$default_priorities[$argv[0]])) { + if (isset(self::$default_priorities[$argv[0]])) { $priority = self::$default_priorities[$argv[0]]; } @@ -83,8 +52,7 @@ class QueueWorker { ); if ($r) { logger("Summon: Ignoring duplicate workerq task", LOGGER_DEBUG); - logger(print_r($workinfo,true)); - $argv = []; + logger(print_r($workinfo, true)); return; } @@ -100,12 +68,12 @@ class QueueWorker { return; } self::qcommit(); - hz_syslog('INSERTED: ' . $workinfo_json, LOGGER_DEBUG); + logger('INSERTED: ' . $workinfo_json, LOGGER_DEBUG); } $workers = self::GetWorkerCount(); - if ($workers <= self::$maxworkers) { - hz_syslog("Less <= max active workers ($workers) max = " . self::$maxworkers . ".", LOGGER_DEBUG); + if ($workers < self::$maxworkers) { + logger($workers . '/' . self::$maxworkers . ' workers active', LOGGER_DEBUG); $phpbin = get_config('system', 'phpbin', 'php'); proc_run($phpbin, 'Zotlabs/Daemon/Master.php', ['Queueworker']); } @@ -116,7 +84,7 @@ class QueueWorker { if ($argv[0] !== 'Queueworker') { $priority = 0; // @TODO allow reprioritization - if(isset(self::$default_priorities[$argv[0]])) { + if (isset(self::$default_priorities[$argv[0]])) { $priority = self::$default_priorities[$argv[0]]; } @@ -129,9 +97,7 @@ class QueueWorker { ); if ($r) { logger("Release: Duplicate task - do not insert.", LOGGER_DEBUG); - logger(print_r($workinfo,true)); - - $argv = []; + logger(print_r($workinfo, true)); return; } @@ -177,14 +143,18 @@ class QueueWorker { if (self::$queueworker) { return self::$queueworker; } + $wid = uniqid('', true); - usleep(mt_rand(500000, 3000000)); //Sleep .5 - 3 seconds before creating a new worker. - $workers = self::GetWorkerCount(); - if ($workers >= self::$maxworkers) { + + usleep(mt_rand(300000, 1000000)); //Sleep .3 - 1 seconds before creating a new worker. + + if (self::GetWorkerCount() >= self::$maxworkers) { logger("Too many active workers ($workers) max = " . self::$maxworkers, LOGGER_DEBUG); return false; } + self::$queueworker = $wid; + return $wid; } @@ -193,17 +163,13 @@ class QueueWorker { self::qbegin(); - if (ACTIVE_DBTYPE == DBTYPE_POSTGRES) { - $work = dbq("SELECT workerq_id FROM workerq WHERE workerq_reservationid IS NULL ORDER BY workerq_priority DESC, workerq_id ASC LIMIT 1 FOR UPDATE SKIP LOCKED;"); - } - else { - $work = dbq("SELECT workerq_id FROM workerq WHERE workerq_reservationid IS NULL ORDER BY workerq_priority DESC, workerq_id ASC LIMIT 1 FOR UPDATE SKIP LOCKED;"); - } + $work = dbq("SELECT workerq_id FROM workerq WHERE workerq_reservationid IS NULL ORDER BY workerq_priority DESC, workerq_id ASC LIMIT 1 FOR UPDATE SKIP LOCKED;"); if (!$work) { self::qrollback(); return false; } + $id = $work[0]['workerq_id']; $work = q("UPDATE workerq SET workerq_reservationid = '%s', workerq_processtimeout = %s + INTERVAL %s WHERE workerq_id = %d", @@ -224,25 +190,38 @@ class QueueWorker { } public static function Process() { + $sleep = intval(get_config('queueworker', 'queue_worker_sleep', 100)); + $auto_queue_worker_sleep = get_config('queueworker', 'auto_queue_worker_sleep', 0); + if (!self::GetWorkerID()) { - hz_syslog('Unable to get worker ID. Exiting.', LOGGER_DEBUG); + if ($auto_queue_worker_sleep) { + set_config('queueworker', 'queue_worker_sleep', $sleep + 100); + } + + logger('Unable to get worker ID. Exiting.', LOGGER_DEBUG); killme(); } - $jobs = 0; - $workid = self::getWorkId(); + if ($auto_queue_worker_sleep && $sleep > 100) { + $next_sleep = $sleep - 100; + set_config('queueworker', 'queue_worker_sleep', (($next_sleep < 100) ? 100 : $next_sleep)); + } + + $jobs = 0; + $workid = self::getWorkId(); $load_average_sleep = false; - self::$workersleep = get_config('queueworker', 'queue_worker_sleep'); - self::$workersleep = ((intval(self::$workersleep) > 100) ? intval(self::$workersleep) : 100); + self::$workersleep = $sleep; + self::$workersleep = ((intval(self::$workersleep) > 100) ? intval(self::$workersleep) : 100); if (function_exists('sys_getloadavg') && get_config('queueworker', 'load_average_sleep')) { + // experimental! $load_average_sleep = true; } while ($workid) { if ($load_average_sleep) { - $load_average = sys_getloadavg(); + $load_average = sys_getloadavg(); self::$workersleep = intval($load_average[0]) * 10000; if (!self::$workersleep) { @@ -250,6 +229,8 @@ class QueueWorker { } } + logger('queue_worker_sleep: ' . self::$workersleep, LOGGER_DEBUG); + usleep(self::$workersleep); self::qbegin(); @@ -259,8 +240,8 @@ class QueueWorker { if (isset($workitem[0])) { // At least SOME work to do.... in case there's more, let's ramp up workers. $workers = self::GetWorkerCount(); - if ($workers <= self::$maxworkers) { - logger("Less <= max active workers ($workers) max = " . self::$maxworkers . ".", LOGGER_DEBUG); + if ($workers < self::$maxworkers) { + logger($workers . '/' . self::$maxworkers . ' workers active', LOGGER_DEBUG); $phpbin = get_config('system', 'phpbin', 'php'); proc_run($phpbin, 'Zotlabs/Daemon/Master.php', ['Queueworker']); } @@ -277,9 +258,11 @@ class QueueWorker { $rnd = random_string(); - hz_syslog('PROCESSING: ' . $rnd . ' ' . print_r($argv,true)); + logger('PROCESSING: ' . $rnd . ' ' . print_r($argv, true)); + $cls::run($argc, $argv); - hz_syslog('COMPLETED: ' . $rnd); + + logger('COMPLETED: ' . $rnd); // @FIXME: Right now we assume that if we get a return, everything is OK. // At some point we may want to test whether the run returns true/false @@ -326,7 +309,6 @@ class QueueWorker { //Make sure nothing new came in $work = q("select * from workerq"); } - return; } /** @@ -335,7 +317,7 @@ class QueueWorker { * @param string $data * @return string $uuid */ - private static function getUuid($data) { + private static function getUuid(string $data) { $namespace = '3a112e42-f147-4ccf-a78b-f6841339ea2a'; try { $uuid = Uuid::uuid5($namespace, $data)->toString(); |