From 4d54755057047f6947285ee0db6e89fa12f294c8 Mon Sep 17 00:00:00 2001 From: Mario Date: Thu, 15 Dec 2022 16:53:17 +0000 Subject: queueworker: fix maxworkers check and cleanup --- Zotlabs/Lib/QueueWorker.php | 35 ++++++++++------------------------- 1 file changed, 10 insertions(+), 25 deletions(-) (limited to 'Zotlabs') diff --git a/Zotlabs/Lib/QueueWorker.php b/Zotlabs/Lib/QueueWorker.php index b4ee6c327..2bcbdfc5c 100644 --- a/Zotlabs/Lib/QueueWorker.php +++ b/Zotlabs/Lib/QueueWorker.php @@ -23,16 +23,14 @@ class QueueWorker { 'Directory' => 1 ]; - private static function qbegin($tablename) { + private static function qbegin() { switch (ACTIVE_DBTYPE) { case DBTYPE_MYSQL: q('BEGIN'); - //q('LOCK TABLE ' . $tablename . ' WRITE'); break; case DBTYPE_POSTGRES: q('BEGIN'); - //q('LOCK TABLE '.$tablename.' IN ACCESS EXCLUSIVE MODE'); break; } return; @@ -90,7 +88,7 @@ class QueueWorker { return; } - self::qbegin('workerq'); + self::qbegin(); $r = q("INSERT INTO workerq (workerq_priority, workerq_data, workerq_uuid) VALUES (%d, '%s', '%s')", intval($priority), $workinfo_json, @@ -106,8 +104,8 @@ class QueueWorker { } $workers = self::GetWorkerCount(); - if ($workers < self::$maxworkers) { - hz_syslog("Less than max active workers ($workers) max = " . self::$maxworkers . ".", LOGGER_DEBUG); + if ($workers <= self::$maxworkers) { + hz_syslog("Less <= max active workers ($workers) max = " . self::$maxworkers . ".", LOGGER_DEBUG); $phpbin = get_config('system', 'phpbin', 'php'); proc_run($phpbin, 'Zotlabs/Daemon/Master.php', ['Queueworker']); } @@ -137,7 +135,7 @@ class QueueWorker { return; } - self::qbegin('workerq'); + self::qbegin(); $r = q("INSERT INTO workerq (workerq_priority, workerq_data, workerq_uuid) VALUES (%d, '%s', '%s')", intval($priority), $workinfo_json, @@ -193,7 +191,7 @@ class QueueWorker { private static function getWorkId() { self::GetWorkerCount(); - self::qbegin('workerq'); + self::qbegin(); if (ACTIVE_DBTYPE == DBTYPE_POSTGRES) { $work = dbq("SELECT workerq_id FROM workerq WHERE workerq_reservationid IS NULL ORDER BY workerq_priority DESC, workerq_id ASC LIMIT 1 FOR UPDATE SKIP LOCKED;"); @@ -254,28 +252,15 @@ class QueueWorker { usleep(self::$workersleep); - self::qbegin('workerq'); - - //if (ACTIVE_DBTYPE == DBTYPE_POSTGRES) { - //$workitem = q("SELECT * FROM workerq WHERE workerq_id = %d FOR UPDATE SKIP LOCKED", - //$workid - //); - //} - //else { - //$workitem = q("SELECT * FROM workerq WHERE workerq_id = %d FOR UPDATE SKIP LOCKED", - //$workid - //); - //} - + self::qbegin(); $workitem = dbq("SELECT * FROM workerq WHERE workerq_id = $workid"); - self::qcommit(); if (isset($workitem[0])) { // At least SOME work to do.... in case there's more, let's ramp up workers. $workers = self::GetWorkerCount(); - if ($workers < self::$maxworkers) { - logger("Less than max active workers ($workers) max = " . self::$maxworkers . ".", LOGGER_DEBUG); + if ($workers <= self::$maxworkers) { + logger("Less <= max active workers ($workers) max = " . self::$maxworkers . ".", LOGGER_DEBUG); $phpbin = get_config('system', 'phpbin', 'php'); proc_run($phpbin, 'Zotlabs/Daemon/Master.php', ['Queueworker']); } @@ -301,7 +286,7 @@ class QueueWorker { // and requeue the work to be tried again if needed. But we probably want // to implement some sort of "retry interval" first. - self::qbegin('workerq'); + self::qbegin(); dbq("delete from workerq where workerq_id = $workid"); self::qcommit(); } -- cgit v1.2.3