diff options
-rw-r--r-- | Zotlabs/Lib/QueueWorker.php | 54 | ||||
-rw-r--r-- | boot.php | 2 |
2 files changed, 39 insertions, 17 deletions
diff --git a/Zotlabs/Lib/QueueWorker.php b/Zotlabs/Lib/QueueWorker.php index da52fb859..eecf79907 100644 --- a/Zotlabs/Lib/QueueWorker.php +++ b/Zotlabs/Lib/QueueWorker.php @@ -21,8 +21,14 @@ class QueueWorker { 'Directory' => 1 ]; - private static function qbegin() { - q('BEGIN'); + // Exceptions for processtimeout value. + // Currently the value is overriden with 3600 seconds (1h). + public static $long_running_cmd = [ + 'Queue' + ]; + + private static function qstart() { + q('START TRANSACTION'); } private static function qcommit() { @@ -56,11 +62,12 @@ class QueueWorker { return; } - self::qbegin(); - $r = q("INSERT INTO workerq (workerq_priority, workerq_data, workerq_uuid) VALUES (%d, '%s', '%s')", + self::qstart(); + $r = q("INSERT INTO workerq (workerq_priority, workerq_data, workerq_uuid, workerq_cmd) VALUES (%d, '%s', '%s', '%s')", intval($priority), $workinfo_json, - dbesc($uuid) + dbesc($uuid), + dbesc($argv[0]) ); if (!$r) { self::qrollback(); @@ -101,11 +108,12 @@ class QueueWorker { return; } - self::qbegin(); - $r = q("INSERT INTO workerq (workerq_priority, workerq_data, workerq_uuid) VALUES (%d, '%s', '%s')", + self::qstart(); + $r = q("INSERT INTO workerq (workerq_priority, workerq_data, workerq_uuid, workerq_cmd) VALUES (%d, '%s', '%s', '%s')", intval($priority), $workinfo_json, - dbesc($uuid) + dbesc($uuid), + dbesc($argv[0]) ); if (!$r) { self::qrollback(); @@ -133,7 +141,8 @@ class QueueWorker { db_utcnow() ); - usleep(self::$workersleep); + //usleep(self::$workersleep); + $workers = dbq("select count(distinct workerq_reservationid) as total from workerq where workerq_reservationid is not null"); logger("WORKERCOUNT: " . $workers[0]['total'], LOGGER_DEBUG); return intval($workers[0]['total']); @@ -148,7 +157,9 @@ class QueueWorker { usleep(mt_rand(300000, 1000000)); //Sleep .3 - 1 seconds before creating a new worker. - if (self::GetWorkerCount() >= self::$maxworkers) { + $workers = self::GetWorkerCount(); + + if ($workers >= self::$maxworkers) { logger("Too many active workers ($workers) max = " . self::$maxworkers, LOGGER_DEBUG); return false; } @@ -161,21 +172,30 @@ class QueueWorker { private static function getWorkId() { self::GetWorkerCount(); - self::qbegin(); + self::qstart(); - $work = dbq("SELECT workerq_id FROM workerq WHERE workerq_reservationid IS NULL ORDER BY workerq_priority DESC, workerq_id ASC LIMIT 1 FOR UPDATE SKIP LOCKED;"); + // This is probably the better solution but is not supported by mariadb < 10.6 + // $work = dbq("SELECT workerq_id FROM workerq WHERE workerq_reservationid IS NULL ORDER BY workerq_priority DESC, workerq_id ASC LIMIT 1 FOR UPDATE SKIP LOCKED;"); + + $work = dbq("SELECT workerq_id, workerq_cmd FROM workerq WHERE workerq_reservationid IS NULL ORDER BY workerq_priority DESC, workerq_id ASC LIMIT 1 FOR UPDATE;"); if (!$work) { - self::qrollback(); + self::qcommit(); return false; } $id = $work[0]['workerq_id']; + $cmd = $work[0]['workerq_cmd']; + $age = self::$workermaxage; + + if (in_array($cmd, self::$long_running_cmd)) { + $age = 3600; // 1h TODO: make this configurable + } $work = q("UPDATE workerq SET workerq_reservationid = '%s', workerq_processtimeout = %s + INTERVAL %s WHERE workerq_id = %d", self::$queueworker, db_utcnow(), - db_quoteinterval(self::$workermaxage . " SECOND"), + db_quoteinterval($age . " SECOND"), intval($id) ); @@ -184,8 +204,10 @@ class QueueWorker { logger("Could not update workerq.", LOGGER_DEBUG); return false; } + logger("GOTWORK: " . json_encode($work), LOGGER_DEBUG); self::qcommit(); + return $id; } @@ -233,7 +255,7 @@ class QueueWorker { usleep(self::$workersleep); - self::qbegin(); + self::qstart(); $workitem = dbq("SELECT * FROM workerq WHERE workerq_id = $workid"); self::qcommit(); @@ -269,7 +291,7 @@ class QueueWorker { // and requeue the work to be tried again if needed. But we probably want // to implement some sort of "retry interval" first. - self::qbegin(); + self::qstart(); dbq("delete from workerq where workerq_id = $workid"); self::qcommit(); } @@ -63,7 +63,7 @@ define('PLATFORM_NAME', 'hubzilla'); define('STD_VERSION', '7.9.14'); define('ZOT_REVISION', '6.0'); -define('DB_UPDATE_VERSION', 1254); +define('DB_UPDATE_VERSION', 1255); define('PROJECT_BASE', __DIR__); |