aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--Zotlabs/Lib/QueueWorker.php54
-rw-r--r--boot.php2
2 files changed, 39 insertions, 17 deletions
diff --git a/Zotlabs/Lib/QueueWorker.php b/Zotlabs/Lib/QueueWorker.php
index da52fb859..eecf79907 100644
--- a/Zotlabs/Lib/QueueWorker.php
+++ b/Zotlabs/Lib/QueueWorker.php
@@ -21,8 +21,14 @@ class QueueWorker {
'Directory' => 1
];
- private static function qbegin() {
- q('BEGIN');
+ // Exceptions for processtimeout value.
+ // Currently the value is overriden with 3600 seconds (1h).
+ public static $long_running_cmd = [
+ 'Queue'
+ ];
+
+ private static function qstart() {
+ q('START TRANSACTION');
}
private static function qcommit() {
@@ -56,11 +62,12 @@ class QueueWorker {
return;
}
- self::qbegin();
- $r = q("INSERT INTO workerq (workerq_priority, workerq_data, workerq_uuid) VALUES (%d, '%s', '%s')",
+ self::qstart();
+ $r = q("INSERT INTO workerq (workerq_priority, workerq_data, workerq_uuid, workerq_cmd) VALUES (%d, '%s', '%s', '%s')",
intval($priority),
$workinfo_json,
- dbesc($uuid)
+ dbesc($uuid),
+ dbesc($argv[0])
);
if (!$r) {
self::qrollback();
@@ -101,11 +108,12 @@ class QueueWorker {
return;
}
- self::qbegin();
- $r = q("INSERT INTO workerq (workerq_priority, workerq_data, workerq_uuid) VALUES (%d, '%s', '%s')",
+ self::qstart();
+ $r = q("INSERT INTO workerq (workerq_priority, workerq_data, workerq_uuid, workerq_cmd) VALUES (%d, '%s', '%s', '%s')",
intval($priority),
$workinfo_json,
- dbesc($uuid)
+ dbesc($uuid),
+ dbesc($argv[0])
);
if (!$r) {
self::qrollback();
@@ -133,7 +141,8 @@ class QueueWorker {
db_utcnow()
);
- usleep(self::$workersleep);
+ //usleep(self::$workersleep);
+
$workers = dbq("select count(distinct workerq_reservationid) as total from workerq where workerq_reservationid is not null");
logger("WORKERCOUNT: " . $workers[0]['total'], LOGGER_DEBUG);
return intval($workers[0]['total']);
@@ -148,7 +157,9 @@ class QueueWorker {
usleep(mt_rand(300000, 1000000)); //Sleep .3 - 1 seconds before creating a new worker.
- if (self::GetWorkerCount() >= self::$maxworkers) {
+ $workers = self::GetWorkerCount();
+
+ if ($workers >= self::$maxworkers) {
logger("Too many active workers ($workers) max = " . self::$maxworkers, LOGGER_DEBUG);
return false;
}
@@ -161,21 +172,30 @@ class QueueWorker {
private static function getWorkId() {
self::GetWorkerCount();
- self::qbegin();
+ self::qstart();
- $work = dbq("SELECT workerq_id FROM workerq WHERE workerq_reservationid IS NULL ORDER BY workerq_priority DESC, workerq_id ASC LIMIT 1 FOR UPDATE SKIP LOCKED;");
+ // This is probably the better solution but is not supported by mariadb < 10.6
+ // $work = dbq("SELECT workerq_id FROM workerq WHERE workerq_reservationid IS NULL ORDER BY workerq_priority DESC, workerq_id ASC LIMIT 1 FOR UPDATE SKIP LOCKED;");
+
+ $work = dbq("SELECT workerq_id, workerq_cmd FROM workerq WHERE workerq_reservationid IS NULL ORDER BY workerq_priority DESC, workerq_id ASC LIMIT 1 FOR UPDATE;");
if (!$work) {
- self::qrollback();
+ self::qcommit();
return false;
}
$id = $work[0]['workerq_id'];
+ $cmd = $work[0]['workerq_cmd'];
+ $age = self::$workermaxage;
+
+ if (in_array($cmd, self::$long_running_cmd)) {
+ $age = 3600; // 1h TODO: make this configurable
+ }
$work = q("UPDATE workerq SET workerq_reservationid = '%s', workerq_processtimeout = %s + INTERVAL %s WHERE workerq_id = %d",
self::$queueworker,
db_utcnow(),
- db_quoteinterval(self::$workermaxage . " SECOND"),
+ db_quoteinterval($age . " SECOND"),
intval($id)
);
@@ -184,8 +204,10 @@ class QueueWorker {
logger("Could not update workerq.", LOGGER_DEBUG);
return false;
}
+
logger("GOTWORK: " . json_encode($work), LOGGER_DEBUG);
self::qcommit();
+
return $id;
}
@@ -233,7 +255,7 @@ class QueueWorker {
usleep(self::$workersleep);
- self::qbegin();
+ self::qstart();
$workitem = dbq("SELECT * FROM workerq WHERE workerq_id = $workid");
self::qcommit();
@@ -269,7 +291,7 @@ class QueueWorker {
// and requeue the work to be tried again if needed. But we probably want
// to implement some sort of "retry interval" first.
- self::qbegin();
+ self::qstart();
dbq("delete from workerq where workerq_id = $workid");
self::qcommit();
}
diff --git a/boot.php b/boot.php
index 8ee72843e..ea8a592dc 100644
--- a/boot.php
+++ b/boot.php
@@ -63,7 +63,7 @@ define('PLATFORM_NAME', 'hubzilla');
define('STD_VERSION', '7.9.14');
define('ZOT_REVISION', '6.0');
-define('DB_UPDATE_VERSION', 1254);
+define('DB_UPDATE_VERSION', 1255);
define('PROJECT_BASE', __DIR__);