aboutsummaryrefslogtreecommitdiffstats
path: root/Zotlabs/Lib/QueueWorker.php
diff options
context:
space:
mode:
Diffstat (limited to 'Zotlabs/Lib/QueueWorker.php')
-rw-r--r--Zotlabs/Lib/QueueWorker.php126
1 files changed, 54 insertions, 72 deletions
diff --git a/Zotlabs/Lib/QueueWorker.php b/Zotlabs/Lib/QueueWorker.php
index 2bcbdfc5c..da52fb859 100644
--- a/Zotlabs/Lib/QueueWorker.php
+++ b/Zotlabs/Lib/QueueWorker.php
@@ -2,11 +2,9 @@
namespace Zotlabs\Lib;
-
use Ramsey\Uuid\Uuid;
use Ramsey\Uuid\Exception\UnableToBuildUuidException;
-
class QueueWorker {
public static $queueworker = null;
@@ -14,54 +12,25 @@ class QueueWorker {
public static $workermaxage = 0;
public static $workersleep = 100;
public static $default_priorities = [
- 'Notifier' => 10,
- 'Deliver' => 10,
- 'Cache_query' => 10,
- 'Content_importer' => 1,
- 'File_importer' => 1,
- 'Channel_purge' => 1,
- 'Directory' => 1
+ 'Notifier' => 10,
+ 'Deliver' => 10,
+ 'Cache_query' => 10,
+ 'Content_importer' => 1,
+ 'File_importer' => 1,
+ 'Channel_purge' => 1,
+ 'Directory' => 1
];
private static function qbegin() {
- switch (ACTIVE_DBTYPE) {
- case DBTYPE_MYSQL:
- q('BEGIN');
- break;
-
- case DBTYPE_POSTGRES:
- q('BEGIN');
- break;
- }
- return;
+ q('BEGIN');
}
private static function qcommit() {
- switch (ACTIVE_DBTYPE) {
- case DBTYPE_MYSQL:
- //q("UNLOCK TABLES");
- q("COMMIT");
- break;
-
- case DBTYPE_POSTGRES:
- q("COMMIT");
- break;
- }
- return;
+ q("COMMIT");
}
private static function qrollback() {
- switch (ACTIVE_DBTYPE) {
- case DBTYPE_MYSQL:
- q("ROLLBACK");
- //q("UNLOCK TABLES");
- break;
-
- case DBTYPE_POSTGRES:
- q("ROLLBACK");
- break;
- }
- return;
+ q("ROLLBACK");
}
public static function Summon($argv) {
@@ -70,7 +39,7 @@ class QueueWorker {
$priority = 0; // @TODO allow reprioritization
- if(isset(self::$default_priorities[$argv[0]])) {
+ if (isset(self::$default_priorities[$argv[0]])) {
$priority = self::$default_priorities[$argv[0]];
}
@@ -83,8 +52,7 @@ class QueueWorker {
);
if ($r) {
logger("Summon: Ignoring duplicate workerq task", LOGGER_DEBUG);
- logger(print_r($workinfo,true));
- $argv = [];
+ logger(print_r($workinfo, true));
return;
}
@@ -100,12 +68,12 @@ class QueueWorker {
return;
}
self::qcommit();
- hz_syslog('INSERTED: ' . $workinfo_json, LOGGER_DEBUG);
+ logger('INSERTED: ' . $workinfo_json, LOGGER_DEBUG);
}
$workers = self::GetWorkerCount();
- if ($workers <= self::$maxworkers) {
- hz_syslog("Less <= max active workers ($workers) max = " . self::$maxworkers . ".", LOGGER_DEBUG);
+ if ($workers < self::$maxworkers) {
+ logger($workers . '/' . self::$maxworkers . ' workers active', LOGGER_DEBUG);
$phpbin = get_config('system', 'phpbin', 'php');
proc_run($phpbin, 'Zotlabs/Daemon/Master.php', ['Queueworker']);
}
@@ -116,7 +84,7 @@ class QueueWorker {
if ($argv[0] !== 'Queueworker') {
$priority = 0; // @TODO allow reprioritization
- if(isset(self::$default_priorities[$argv[0]])) {
+ if (isset(self::$default_priorities[$argv[0]])) {
$priority = self::$default_priorities[$argv[0]];
}
@@ -129,9 +97,7 @@ class QueueWorker {
);
if ($r) {
logger("Release: Duplicate task - do not insert.", LOGGER_DEBUG);
- logger(print_r($workinfo,true));
-
- $argv = [];
+ logger(print_r($workinfo, true));
return;
}
@@ -177,14 +143,18 @@ class QueueWorker {
if (self::$queueworker) {
return self::$queueworker;
}
+
$wid = uniqid('', true);
- usleep(mt_rand(500000, 3000000)); //Sleep .5 - 3 seconds before creating a new worker.
- $workers = self::GetWorkerCount();
- if ($workers >= self::$maxworkers) {
+
+ usleep(mt_rand(300000, 1000000)); //Sleep .3 - 1 seconds before creating a new worker.
+
+ if (self::GetWorkerCount() >= self::$maxworkers) {
logger("Too many active workers ($workers) max = " . self::$maxworkers, LOGGER_DEBUG);
return false;
}
+
self::$queueworker = $wid;
+
return $wid;
}
@@ -193,17 +163,13 @@ class QueueWorker {
self::qbegin();
- if (ACTIVE_DBTYPE == DBTYPE_POSTGRES) {
- $work = dbq("SELECT workerq_id FROM workerq WHERE workerq_reservationid IS NULL ORDER BY workerq_priority DESC, workerq_id ASC LIMIT 1 FOR UPDATE SKIP LOCKED;");
- }
- else {
- $work = dbq("SELECT workerq_id FROM workerq WHERE workerq_reservationid IS NULL ORDER BY workerq_priority DESC, workerq_id ASC LIMIT 1 FOR UPDATE SKIP LOCKED;");
- }
+ $work = dbq("SELECT workerq_id FROM workerq WHERE workerq_reservationid IS NULL ORDER BY workerq_priority DESC, workerq_id ASC LIMIT 1 FOR UPDATE SKIP LOCKED;");
if (!$work) {
self::qrollback();
return false;
}
+
$id = $work[0]['workerq_id'];
$work = q("UPDATE workerq SET workerq_reservationid = '%s', workerq_processtimeout = %s + INTERVAL %s WHERE workerq_id = %d",
@@ -224,25 +190,38 @@ class QueueWorker {
}
public static function Process() {
+ $sleep = intval(get_config('queueworker', 'queue_worker_sleep', 100));
+ $auto_queue_worker_sleep = get_config('queueworker', 'auto_queue_worker_sleep', 0);
+
if (!self::GetWorkerID()) {
- hz_syslog('Unable to get worker ID. Exiting.', LOGGER_DEBUG);
+ if ($auto_queue_worker_sleep) {
+ set_config('queueworker', 'queue_worker_sleep', $sleep + 100);
+ }
+
+ logger('Unable to get worker ID. Exiting.', LOGGER_DEBUG);
killme();
}
- $jobs = 0;
- $workid = self::getWorkId();
+ if ($auto_queue_worker_sleep && $sleep > 100) {
+ $next_sleep = $sleep - 100;
+ set_config('queueworker', 'queue_worker_sleep', (($next_sleep < 100) ? 100 : $next_sleep));
+ }
+
+ $jobs = 0;
+ $workid = self::getWorkId();
$load_average_sleep = false;
- self::$workersleep = get_config('queueworker', 'queue_worker_sleep');
- self::$workersleep = ((intval(self::$workersleep) > 100) ? intval(self::$workersleep) : 100);
+ self::$workersleep = $sleep;
+ self::$workersleep = ((intval(self::$workersleep) > 100) ? intval(self::$workersleep) : 100);
if (function_exists('sys_getloadavg') && get_config('queueworker', 'load_average_sleep')) {
+ // experimental!
$load_average_sleep = true;
}
while ($workid) {
if ($load_average_sleep) {
- $load_average = sys_getloadavg();
+ $load_average = sys_getloadavg();
self::$workersleep = intval($load_average[0]) * 10000;
if (!self::$workersleep) {
@@ -250,6 +229,8 @@ class QueueWorker {
}
}
+ logger('queue_worker_sleep: ' . self::$workersleep, LOGGER_DEBUG);
+
usleep(self::$workersleep);
self::qbegin();
@@ -259,8 +240,8 @@ class QueueWorker {
if (isset($workitem[0])) {
// At least SOME work to do.... in case there's more, let's ramp up workers.
$workers = self::GetWorkerCount();
- if ($workers <= self::$maxworkers) {
- logger("Less <= max active workers ($workers) max = " . self::$maxworkers . ".", LOGGER_DEBUG);
+ if ($workers < self::$maxworkers) {
+ logger($workers . '/' . self::$maxworkers . ' workers active', LOGGER_DEBUG);
$phpbin = get_config('system', 'phpbin', 'php');
proc_run($phpbin, 'Zotlabs/Daemon/Master.php', ['Queueworker']);
}
@@ -277,9 +258,11 @@ class QueueWorker {
$rnd = random_string();
- hz_syslog('PROCESSING: ' . $rnd . ' ' . print_r($argv,true));
+ logger('PROCESSING: ' . $rnd . ' ' . print_r($argv, true));
+
$cls::run($argc, $argv);
- hz_syslog('COMPLETED: ' . $rnd);
+
+ logger('COMPLETED: ' . $rnd);
// @FIXME: Right now we assume that if we get a return, everything is OK.
// At some point we may want to test whether the run returns true/false
@@ -326,7 +309,6 @@ class QueueWorker {
//Make sure nothing new came in
$work = q("select * from workerq");
}
- return;
}
/**
@@ -335,7 +317,7 @@ class QueueWorker {
* @param string $data
* @return string $uuid
*/
- private static function getUuid($data) {
+ private static function getUuid(string $data) {
$namespace = '3a112e42-f147-4ccf-a78b-f6841339ea2a';
try {
$uuid = Uuid::uuid5($namespace, $data)->toString();