diff options
Diffstat (limited to 'Zotlabs/Lib/QueueWorker.php')
-rw-r--r-- | Zotlabs/Lib/QueueWorker.php | 61 |
1 files changed, 26 insertions, 35 deletions
diff --git a/Zotlabs/Lib/QueueWorker.php b/Zotlabs/Lib/QueueWorker.php index 68e747b0f..24114438e 100644 --- a/Zotlabs/Lib/QueueWorker.php +++ b/Zotlabs/Lib/QueueWorker.php @@ -4,6 +4,9 @@ namespace Zotlabs\Lib; use Ramsey\Uuid\Uuid; use Ramsey\Uuid\Exception\UnableToBuildUuidException; +use Zotlabs\Lib\Config; + +require_once 'include/dba/dba_transaction.php'; class QueueWorker { @@ -28,18 +31,6 @@ class QueueWorker { 'Expire' ]; - private static function qstart() { - q('START TRANSACTION'); - } - - private static function qcommit() { - q("COMMIT"); - } - - private static function qrollback() { - q("ROLLBACK"); - } - public static function Summon($argv) { if ($argv[0] !== 'Queueworker') { @@ -65,7 +56,7 @@ class QueueWorker { logger('queueworker_stats_summon: cmd:' . $argv[0] . ' ' . 'timestamp:' . time()); - self::qstart(); + $transaction = new \DbaTransaction(\DBA::$dba); $r = q("INSERT INTO workerq (workerq_priority, workerq_data, workerq_uuid, workerq_cmd) VALUES (%d, '%s', '%s', '%s')", intval($priority), $workinfo_json, @@ -73,18 +64,18 @@ class QueueWorker { dbesc($argv[0]) ); if (!$r) { - self::qrollback(); + // Transaction is autmatically rolled back on return logger("INSERT FAILED", LOGGER_DEBUG); return; } - self::qcommit(); + $transaction->commit(); logger('INSERTED: ' . $workinfo_json, LOGGER_DEBUG); } $workers = self::GetWorkerCount(); if ($workers < self::$maxworkers) { logger($workers . '/' . self::$maxworkers . ' workers active', LOGGER_DEBUG); - $phpbin = get_config('system', 'phpbin', 'php'); + $phpbin = Config::Get('system', 'phpbin', 'php'); proc_run($phpbin, 'Zotlabs/Daemon/Master.php', ['Queueworker']); } } @@ -111,7 +102,7 @@ class QueueWorker { return; } - self::qstart(); + $transaction = new \DbaTransaction(\DBA::$dba); $r = q("INSERT INTO workerq (workerq_priority, workerq_data, workerq_uuid, workerq_cmd) VALUES (%d, '%s', '%s', '%s')", intval($priority), $workinfo_json, @@ -119,11 +110,11 @@ class QueueWorker { dbesc($argv[0]) ); if (!$r) { - self::qrollback(); + // Transaction is automatically rolled back on return logger("Insert failed: " . $workinfo_json, LOGGER_DEBUG); return; } - self::qcommit(); + $transaction->commit(); logger('INSERTED: ' . $workinfo_json, LOGGER_DEBUG); } @@ -132,18 +123,18 @@ class QueueWorker { public static function GetWorkerCount() { if (self::$maxworkers == 0) { - self::$maxworkers = get_config('queueworker', 'max_queueworkers', 4); + self::$maxworkers = Config::Get('queueworker', 'max_queueworkers', 4); self::$maxworkers = self::$maxworkers > 3 ? self::$maxworkers : 4; } if (self::$workermaxage == 0) { - self::$workermaxage = get_config('queueworker', 'max_queueworker_age'); + self::$workermaxage = Config::Get('queueworker', 'max_queueworker_age'); self::$workermaxage = self::$workermaxage > 120 ? self::$workermaxage : 300; } - self::qstart(); + $transaction = new \DbaTransaction(\DBA::$dba); // skip locked is preferred but is not supported by mariadb < 10.6 which is still used a lot - hence make it optional - $sql_quirks = ((get_config('system', 'db_skip_locked_supported')) ? 'SKIP LOCKED' : 'NOWAIT'); + $sql_quirks = ((Config::Get('system', 'db_skip_locked_supported')) ? 'SKIP LOCKED' : 'NOWAIT'); $r = q("SELECT workerq_id FROM workerq WHERE workerq_reservationid IS NOT NULL AND workerq_processtimeout < %s FOR UPDATE $sql_quirks", db_utcnow() @@ -158,7 +149,7 @@ class QueueWorker { $u = dbq("update workerq set workerq_reservationid = null where workerq_id in ($ids)"); } - self::qcommit(); + $transaction->commit(); //q("update workerq set workerq_reservationid = null where workerq_reservationid is not null and workerq_processtimeout < %s", //db_utcnow() @@ -196,15 +187,15 @@ class QueueWorker { private static function getWorkId() { self::GetWorkerCount(); - self::qstart(); + $transaction = new \DbaTransaction(\DBA::$dba); // skip locked is preferred but is not supported by mariadb < 10.6 which is still used a lot - hence make it optional - $sql_quirks = ((get_config('system', 'db_skip_locked_supported')) ? 'SKIP LOCKED' : 'NOWAIT'); + $sql_quirks = ((Config::Get('system', 'db_skip_locked_supported')) ? 'SKIP LOCKED' : 'NOWAIT'); $work = dbq("SELECT workerq_id, workerq_cmd FROM workerq WHERE workerq_reservationid IS NULL ORDER BY workerq_priority DESC, workerq_id ASC LIMIT 1 FOR UPDATE $sql_quirks"); if (!$work) { - self::qrollback(); + // Transaction automatically rolled back on return return false; } @@ -224,24 +215,24 @@ class QueueWorker { ); if (!$work) { - self::qrollback(); + // Transaction automatically rolled back on return logger("Could not update workerq.", LOGGER_DEBUG); return false; } logger("GOTWORK: " . json_encode($work), LOGGER_DEBUG); - self::qcommit(); + $transaction->commit(); return $id; } public static function Process() { - $sleep = intval(get_config('queueworker', 'queue_worker_sleep', 100)); - $auto_queue_worker_sleep = get_config('queueworker', 'auto_queue_worker_sleep', 0); + $sleep = intval(Config::Get('queueworker', 'queue_worker_sleep', 100)); + $auto_queue_worker_sleep = Config::Get('queueworker', 'auto_queue_worker_sleep', 0); if (!self::GetWorkerID()) { if ($auto_queue_worker_sleep) { - set_config('queueworker', 'queue_worker_sleep', $sleep + 100); + Config::Set('queueworker', 'queue_worker_sleep', $sleep + 100); } logger('Unable to get worker ID. Exiting.', LOGGER_DEBUG); @@ -250,7 +241,7 @@ class QueueWorker { if ($auto_queue_worker_sleep && $sleep > 100) { $next_sleep = $sleep - 100; - set_config('queueworker', 'queue_worker_sleep', (($next_sleep < 100) ? 100 : $next_sleep)); + Config::Set('queueworker', 'queue_worker_sleep', (($next_sleep < 100) ? 100 : $next_sleep)); } $jobs = 0; @@ -259,7 +250,7 @@ class QueueWorker { self::$workersleep = $sleep; self::$workersleep = ((intval(self::$workersleep) > 100) ? intval(self::$workersleep) : 100); - if (function_exists('sys_getloadavg') && get_config('queueworker', 'load_average_sleep')) { + if (function_exists('sys_getloadavg') && Config::Get('queueworker', 'load_average_sleep')) { // very experimental! $load_average_sleep = true; } @@ -287,7 +278,7 @@ class QueueWorker { if ($workers < self::$maxworkers) { logger($workers . '/' . self::$maxworkers . ' workers active', LOGGER_DEBUG); - $phpbin = get_config('system', 'phpbin', 'php'); + $phpbin = Config::Get('system', 'phpbin', 'php'); proc_run($phpbin, 'Zotlabs/Daemon/Master.php', ['Queueworker']); } |