From 2bb58843ab303ba3e1a4a2c0c5a64138852ebbe0 Mon Sep 17 00:00:00 2001 From: Mario Date: Fri, 2 Dec 2022 19:22:19 +0000 Subject: move queueworker to core and bump version --- Zotlabs/Lib/QueueWorker.php | 351 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 351 insertions(+) create mode 100644 Zotlabs/Lib/QueueWorker.php (limited to 'Zotlabs/Lib/QueueWorker.php') diff --git a/Zotlabs/Lib/QueueWorker.php b/Zotlabs/Lib/QueueWorker.php new file mode 100644 index 000000000..21983a4e2 --- /dev/null +++ b/Zotlabs/Lib/QueueWorker.php @@ -0,0 +1,351 @@ + 10, + 'Deliver' => 10, + 'Cache_query' => 10, + 'Content_importer' => 1, + 'File_importer' => 1, + 'Channel_purge' => 1, + 'Directory' => 1 + ]; + + private static function qbegin($tablename) { + switch (ACTIVE_DBTYPE) { + case DBTYPE_MYSQL: + q('BEGIN'); + q('LOCK TABLE ' . $tablename . ' WRITE'); + break; + + case DBTYPE_POSTGRES: + q('BEGIN'); + //q('LOCK TABLE '.$tablename.' IN ACCESS EXCLUSIVE MODE'); + break; + } + return; + } + + private static function qcommit() { + switch (ACTIVE_DBTYPE) { + case DBTYPE_MYSQL: + q("UNLOCK TABLES"); + q("COMMIT"); + break; + + case DBTYPE_POSTGRES: + q("COMMIT"); + break; + } + return; + } + + private static function qrollback() { + switch (ACTIVE_DBTYPE) { + case DBTYPE_MYSQL: + q("ROLLBACK"); + q("UNLOCK TABLES"); + break; + + case DBTYPE_POSTGRES: + q("ROLLBACK"); + break; + } + return; + } + + public static function Summon(&$argv) { + + $argc = count($argv); + + if ($argv[0] !== 'Queueworker') { + + $priority = 0; // @TODO allow reprioritization + + if(isset(self::$default_priorities[$argv[0]])) { + $priority = self::$default_priorities[$argv[0]]; + } + + $workinfo = ['argc' => $argc, 'argv' => $argv]; + $workinfo_json = json_encode($workinfo); + $uuid = self::getUuid($workinfo_json); + + $r = q("SELECT * FROM workerq WHERE workerq_uuid = '%s'", + dbesc($uuid) + ); + if ($r) { + logger("Summon: Ignoring duplicate workerq task", LOGGER_DEBUG); + logger(print_r($workinfo,true)); + $argv = []; + return; + } + + self::qbegin('workerq'); + $r = q("INSERT INTO workerq (workerq_priority, workerq_data, workerq_uuid) VALUES (%d, '%s', '%s')", + intval($priority), + $workinfo_json, + dbesc($uuid) + ); + if (!$r) { + self::qrollback(); + logger("INSERT FAILED", LOGGER_DEBUG); + return; + } + self::qcommit(); + logger('INSERTED: ' . $workinfo_json, LOGGER_DEBUG); + } + $argv = []; + + $workers = self::GetWorkerCount(); + if ($workers < self::$maxworkers) { + logger("Less than max active workers ($workers) max = " . self::$maxworkers . ".", LOGGER_DEBUG); + $phpbin = get_config('system', 'phpbin', 'php'); + proc_run($phpbin, 'Zotlabs/Daemon/Master.php', ['Queueworker']); + } + } + + public static function Release(&$argv) { + + $argc = count($argv); + + if ($argv[0] !== 'Queueworker') { + + $priority = 0; // @TODO allow reprioritization + if(isset(self::$default_priorities[$argv[0]])) { + $priority = self::$default_priorities[$argv[0]]; + } + + $workinfo = ['argc' => $argc, 'argv' => $argv]; + $workinfo_json = json_encode($workinfo); + $uuid = self::getUuid($workinfo_json); + + $r = q("SELECT * FROM workerq WHERE workerq_uuid = '%s'", + dbesc($uuid) + ); + if ($r) { + logger("Release: Duplicate task - do not insert.", LOGGER_DEBUG); + logger(print_r($workinfo,true)); + + $argv = []; + return; + } + + self::qbegin('workerq'); + $r = q("INSERT INTO workerq (workerq_priority, workerq_data, workerq_uuid) VALUES (%d, '%s', '%s')", + intval($priority), + $workinfo_json, + dbesc($uuid) + ); + if (!$r) { + self::qrollback(); + logger("Insert failed: " . $workinfo_json, LOGGER_DEBUG); + return; + } + self::qcommit(); + logger('INSERTED: ' . $workinfo_json, LOGGER_DEBUG); + } + $argv = []; + self::Process(); + } + + public static function GetWorkerCount() { + if (self::$maxworkers == 0) { + self::$maxworkers = get_config('queueworker', 'max_queueworkers', 4); + self::$maxworkers = self::$maxworkers > 3 ? self::$maxworkers : 4; + } + if (self::$workermaxage == 0) { + self::$workermaxage = get_config('queueworker', 'max_queueworker_age'); + self::$workermaxage = self::$workermaxage > 120 ? self::$workermaxage : 300; + } + + q("update workerq set workerq_reservationid = null where workerq_reservationid is not null and workerq_processtimeout < %s", + db_utcnow() + ); + + usleep(self::$workersleep); + $workers = dbq("select count(distinct workerq_reservationid) as total from workerq where workerq_reservationid is not null"); + logger("WORKERCOUNT: " . $workers[0]['total'], LOGGER_DEBUG); + return intval($workers[0]['total']); + } + + public static function GetWorkerID() { + if (self::$queueworker) { + return self::$queueworker; + } + $wid = uniqid('', true); + usleep(mt_rand(500000, 3000000)); //Sleep .5 - 3 seconds before creating a new worker. + $workers = self::GetWorkerCount(); + if ($workers >= self::$maxworkers) { + logger("Too many active workers ($workers) max = " . self::$maxworkers, LOGGER_DEBUG); + return false; + } + self::$queueworker = $wid; + return $wid; + } + + private static function getWorkId() { + self::GetWorkerCount(); + + self::qbegin('workerq'); + + if (ACTIVE_DBTYPE == DBTYPE_POSTGRES) { + $work = dbq("SELECT workerq_id FROM workerq WHERE workerq_reservationid IS NULL ORDER BY workerq_priority DESC, workerq_id ASC LIMIT 1 FOR UPDATE SKIP LOCKED;"); + } + else { + $work = dbq("SELECT workerq_id FROM workerq WHERE workerq_reservationid IS NULL ORDER BY workerq_priority DESC, workerq_id ASC LIMIT 1;"); + } + + if (!$work) { + self::qrollback(); + return false; + } + $id = $work[0]['workerq_id']; + + $work = q("UPDATE workerq SET workerq_reservationid = '%s', workerq_processtimeout = %s + INTERVAL %s WHERE workerq_id = %d", + self::$queueworker, + db_utcnow(), + db_quoteinterval(self::$workermaxage . " SECOND"), + intval($id) + ); + + if (!$work) { + self::qrollback(); + logger("Could not update workerq.", LOGGER_DEBUG); + return false; + } + logger("GOTWORK: " . json_encode($work), LOGGER_DEBUG); + self::qcommit(); + return $id; + } + + public static function Process() { + self::$workersleep = get_config('queueworker', 'queue_worker_sleep'); + self::$workersleep = intval(self::$workersleep) > 100 ? intval(self::$workersleep) : 100; + + if (!self::GetWorkerID()) { + logger('Unable to get worker ID. Exiting.', LOGGER_DEBUG); + killme(); + } + + $jobs = 0; + $workid = self::getWorkId(); + while ($workid) { + usleep(self::$workersleep); + // @FIXME: Currently $workersleep is a fixed value. It may be a good idea + // to implement a "backoff" instead - based on load average or some + // other metric. + + self::qbegin('workerq'); + + if (ACTIVE_DBTYPE == DBTYPE_POSTGRES) { + $workitem = q("SELECT * FROM workerq WHERE workerq_id = %d FOR UPDATE SKIP LOCKED", + $workid + ); + } + else { + $workitem = q("SELECT * FROM workerq WHERE workerq_id = %d", + $workid + ); + } + + self::qcommit(); + + if (isset($workitem[0])) { + // At least SOME work to do.... in case there's more, let's ramp up workers. + $workers = self::GetWorkerCount(); + if ($workers < self::$maxworkers) { + logger("Less than max active workers ($workers) max = " . self::$maxworkers . ".", LOGGER_DEBUG); + $phpbin = get_config('system', 'phpbin', 'php'); + proc_run($phpbin, 'Zotlabs/Daemon/Master.php', ['Queueworker']); + } + + $jobs++; + logger("Workinfo: " . $workitem[0]['workerq_data'], LOGGER_DEBUG); + + $workinfo = json_decode($workitem[0]['workerq_data'], true); + $argv = $workinfo['argv']; + logger('Master: process: ' . json_encode($argv), LOGGER_DEBUG); + + $cls = '\\Zotlabs\\Daemon\\' . $argv[0]; + $argv = flatten_array_recursive($argv); + $argc = count($argv); + $cls::run($argc, $argv); + + // @FIXME: Right now we assume that if we get a return, everything is OK. + // At some point we may want to test whether the run returns true/false + // and requeue the work to be tried again if needed. But we probably want + // to implement some sort of "retry interval" first. + + self::qbegin('workerq'); + q("delete from workerq where workerq_id = %d", $workid); + self::qcommit(); + } + else { + logger("NO WORKITEM!", LOGGER_DEBUG); + } + $workid = self::getWorkId(); + } + logger('Master: Worker Thread: queue items processed:' . $jobs, LOGGER_DEBUG); + } + + public static function ClearQueue() { + $work = q("select * from workerq"); + while ($work) { + foreach ($work as $workitem) { + $workinfo = json_decode($workitem['v'], true); + $argc = $workinfo['argc']; + $argv = $workinfo['argv']; + logger('Master: process: ' . print_r($argv, true), LOGGER_ALL, LOG_DEBUG); + if (!isset($argv[0])) { + q("delete from workerq where workerq_id = %d", + $work[0]['workerq_id'] + ); + continue; + } + $cls = '\\Zotlabs\\Daemon\\' . $argv[0]; + $cls::run($argc, $argv); + q("delete from workerq where workerq_id = %d", + $work[0]['workerq_id'] + ); + usleep(300000); + //Give the server .3 seconds to catch its breath between tasks. + //This will hopefully keep it from crashing to it's knees entirely + //if the last task ended up initiating other parallel processes + //(eg. polling remotes) + } + //Make sure nothing new came in + $work = q("select * from workerq"); + } + return; + } + + /** + * @brief Generate a name-based v5 UUID with custom namespace + * + * @param string $data + * @return string $uuid + */ + private static function getUuid($data) { + $namespace = '3a112e42-f147-4ccf-a78b-f6841339ea2a'; + try { + $uuid = Uuid::uuid5($namespace, $data)->toString(); + } catch (UnableToBuildUuidException $e) { + logger('UUID generation failed'); + return ''; + } + return $uuid; + } + +} -- cgit v1.2.3