aboutsummaryrefslogtreecommitdiffstats
path: root/Zotlabs
diff options
context:
space:
mode:
Diffstat (limited to 'Zotlabs')
-rw-r--r--Zotlabs/Daemon/Cron.php8
-rw-r--r--Zotlabs/Daemon/Master.php11
-rw-r--r--Zotlabs/Daemon/Poller.php16
-rw-r--r--Zotlabs/Lib/Libsync.php9
-rw-r--r--Zotlabs/Lib/QueueWorker.php351
-rw-r--r--Zotlabs/Module/Import.php2
-rw-r--r--Zotlabs/Module/Queueworker.php122
-rw-r--r--Zotlabs/Update/_1254.php55
8 files changed, 569 insertions, 5 deletions
diff --git a/Zotlabs/Daemon/Cron.php b/Zotlabs/Daemon/Cron.php
index 6629491de..c3158a4eb 100644
--- a/Zotlabs/Daemon/Cron.php
+++ b/Zotlabs/Daemon/Cron.php
@@ -50,7 +50,7 @@ class Cron {
require_once('include/account.php');
remove_expired_registrations();
- $interval = get_config('system', 'delivery_interval', 3);
+ //$interval = get_config('system', 'delivery_interval', 3);
// expire any expired items
@@ -65,8 +65,10 @@ class Cron {
if ($rr['item_wall']) {
// The notifier isn't normally invoked unless item_drop is interactive.
Master::Summon(['Notifier', 'drop', $rr['id']]);
+ /*
if ($interval)
@time_sleep_until(microtime(true) + (float)$interval);
+ */
}
}
}
@@ -96,8 +98,10 @@ class Cron {
if ($r) {
foreach ($r as $rr) {
Master::Summon(array('Directory', $rr['channel_id'], 'force'));
+ /*
if ($interval)
@time_sleep_until(microtime(true) + (float)$interval);
+ */
}
}
@@ -151,8 +155,10 @@ class Cron {
);
}
Master::Summon(array('Notifier', 'wall-new', $rr['id']));
+ /*
if ($interval)
@time_sleep_until(microtime(true) + (float)$interval);
+ */
}
}
}
diff --git a/Zotlabs/Daemon/Master.php b/Zotlabs/Daemon/Master.php
index 6fa656be5..495718bf4 100644
--- a/Zotlabs/Daemon/Master.php
+++ b/Zotlabs/Daemon/Master.php
@@ -2,6 +2,8 @@
namespace Zotlabs\Daemon;
+use Zotlabs\Lib\QueueWorker;
+
if (array_search(__file__, get_included_files()) === 0) {
require_once('include/cli_startup.php');
array_shift($argv);
@@ -16,6 +18,10 @@ if (array_search(__file__, get_included_files()) === 0) {
class Master {
static public function Summon($arr) {
+
+ QueueWorker::Summon($arr);
+ return;
+/*
$hookinfo = [
'argv' => $arr
];
@@ -32,11 +38,15 @@ class Master {
$phpbin = get_config('system', 'phpbin', 'php');
proc_run($phpbin, 'Zotlabs/Daemon/Master.php', $arr);
+*/
}
static public function Release($argc, $argv) {
cli_startup();
+ QueueWorker::Release($argv);
+ return;
+/*
$hookinfo = [
'argv' => $argv
];
@@ -54,5 +64,6 @@ class Master {
logger('Master: release: ' . json_encode($argv), LOGGER_ALL, LOG_DEBUG);
$cls = '\\Zotlabs\\Daemon\\' . $argv[0];
$cls::run($argc, $argv);
+*/
}
}
diff --git a/Zotlabs/Daemon/Poller.php b/Zotlabs/Daemon/Poller.php
index 63c498f17..5bf8d3a02 100644
--- a/Zotlabs/Daemon/Poller.php
+++ b/Zotlabs/Daemon/Poller.php
@@ -17,10 +17,12 @@ class Poller {
}
}
+/*
$interval = intval(get_config('system', 'poll_interval'));
if (!$interval)
$interval = ((get_config('system', 'delivery_interval') === false) ? 3 : intval(get_config('system', 'delivery_interval')));
+
// Check for a lockfile. If it exists, but is over an hour old, it's stale. Ignore it.
$lockfile = 'store/[data]/poller';
if ((file_exists($lockfile)) && (filemtime($lockfile) > (time() - 3600))
@@ -32,6 +34,7 @@ class Poller {
// Create a lockfile. Needs two vars, but $x doesn't need to contain anything.
$x = '';
file_put_contents($lockfile, $x);
+*/
logger('poller: start');
@@ -103,8 +106,10 @@ class Poller {
if ($t < $x) {
Master::Summon(['Onepoll', $contact['abook_id']]);
+ /*
if ($interval)
@time_sleep_until(microtime(true) + (float)$interval);
+ */
}
continue;
@@ -167,8 +172,11 @@ class Poller {
continue;
Master::Summon(['Onepoll', $contact['abook_id']]);
+
+ /*
if ($interval)
@time_sleep_until(microtime(true) + (float)$interval);
+ */
}
}
@@ -190,9 +198,13 @@ class Poller {
if ($rr['ud_last'] > NULL_DATE)
if ($rr['ud_last'] > datetime_convert('UTC', 'UTC', 'now - 1 day'))
continue;
+
Master::Summon(['Onedirsync', $rr['ud_id']]);
+
+ /*
if ($interval)
@time_sleep_until(microtime(true) + (float)$interval);
+ */
}
}
}
@@ -200,9 +212,9 @@ class Poller {
set_config('system', 'lastpoll', datetime_convert());
//All done - clear the lockfile
-
+/*
@unlink($lockfile);
-
+*/
return;
}
}
diff --git a/Zotlabs/Lib/Libsync.php b/Zotlabs/Lib/Libsync.php
index fd9886f71..b02ae4c69 100644
--- a/Zotlabs/Lib/Libsync.php
+++ b/Zotlabs/Lib/Libsync.php
@@ -135,8 +135,10 @@ class Libsync {
$info['collection_members'] = $r;
}
+ /*
$interval = ((get_config('system', 'delivery_interval') !== false)
? intval(get_config('system', 'delivery_interval')) : 2);
+ */
logger('Packet: ' . print_r($info, true), LOGGER_DATA, LOG_DEBUG);
@@ -155,19 +157,24 @@ class Libsync {
]);
+ /*
$x = q("select count(outq_hash) as total from outq where outq_delivered = 0");
+
if (intval($x[0]['total']) > intval(get_config('system', 'force_queue_threshold', 3000))) {
logger('immediate delivery deferred.', LOGGER_DEBUG, LOG_INFO);
Queue::update($hash);
continue;
}
-
+ */
Master::Summon(['Deliver', $hash]);
+
+ /*
$total = $total - 1;
if ($interval && $total)
@time_sleep_until(microtime(true) + (float)$interval);
+ */
}
}
diff --git a/Zotlabs/Lib/QueueWorker.php b/Zotlabs/Lib/QueueWorker.php
new file mode 100644
index 000000000..21983a4e2
--- /dev/null
+++ b/Zotlabs/Lib/QueueWorker.php
@@ -0,0 +1,351 @@
+<?php
+
+namespace Zotlabs\Lib;
+
+
+use Ramsey\Uuid\Uuid;
+use Ramsey\Uuid\Exception\UnableToBuildUuidException;
+
+
+class QueueWorker {
+
+ public static $queueworker = null;
+ public static $maxworkers = 0;
+ public static $workermaxage = 0;
+ public static $workersleep = 100;
+ public static $default_priorities = [
+ 'Notifier' => 10,
+ 'Deliver' => 10,
+ 'Cache_query' => 10,
+ 'Content_importer' => 1,
+ 'File_importer' => 1,
+ 'Channel_purge' => 1,
+ 'Directory' => 1
+ ];
+
+ private static function qbegin($tablename) {
+ switch (ACTIVE_DBTYPE) {
+ case DBTYPE_MYSQL:
+ q('BEGIN');
+ q('LOCK TABLE ' . $tablename . ' WRITE');
+ break;
+
+ case DBTYPE_POSTGRES:
+ q('BEGIN');
+ //q('LOCK TABLE '.$tablename.' IN ACCESS EXCLUSIVE MODE');
+ break;
+ }
+ return;
+ }
+
+ private static function qcommit() {
+ switch (ACTIVE_DBTYPE) {
+ case DBTYPE_MYSQL:
+ q("UNLOCK TABLES");
+ q("COMMIT");
+ break;
+
+ case DBTYPE_POSTGRES:
+ q("COMMIT");
+ break;
+ }
+ return;
+ }
+
+ private static function qrollback() {
+ switch (ACTIVE_DBTYPE) {
+ case DBTYPE_MYSQL:
+ q("ROLLBACK");
+ q("UNLOCK TABLES");
+ break;
+
+ case DBTYPE_POSTGRES:
+ q("ROLLBACK");
+ break;
+ }
+ return;
+ }
+
+ public static function Summon(&$argv) {
+
+ $argc = count($argv);
+
+ if ($argv[0] !== 'Queueworker') {
+
+ $priority = 0; // @TODO allow reprioritization
+
+ if(isset(self::$default_priorities[$argv[0]])) {
+ $priority = self::$default_priorities[$argv[0]];
+ }
+
+ $workinfo = ['argc' => $argc, 'argv' => $argv];
+ $workinfo_json = json_encode($workinfo);
+ $uuid = self::getUuid($workinfo_json);
+
+ $r = q("SELECT * FROM workerq WHERE workerq_uuid = '%s'",
+ dbesc($uuid)
+ );
+ if ($r) {
+ logger("Summon: Ignoring duplicate workerq task", LOGGER_DEBUG);
+ logger(print_r($workinfo,true));
+ $argv = [];
+ return;
+ }
+
+ self::qbegin('workerq');
+ $r = q("INSERT INTO workerq (workerq_priority, workerq_data, workerq_uuid) VALUES (%d, '%s', '%s')",
+ intval($priority),
+ $workinfo_json,
+ dbesc($uuid)
+ );
+ if (!$r) {
+ self::qrollback();
+ logger("INSERT FAILED", LOGGER_DEBUG);
+ return;
+ }
+ self::qcommit();
+ logger('INSERTED: ' . $workinfo_json, LOGGER_DEBUG);
+ }
+ $argv = [];
+
+ $workers = self::GetWorkerCount();
+ if ($workers < self::$maxworkers) {
+ logger("Less than max active workers ($workers) max = " . self::$maxworkers . ".", LOGGER_DEBUG);
+ $phpbin = get_config('system', 'phpbin', 'php');
+ proc_run($phpbin, 'Zotlabs/Daemon/Master.php', ['Queueworker']);
+ }
+ }
+
+ public static function Release(&$argv) {
+
+ $argc = count($argv);
+
+ if ($argv[0] !== 'Queueworker') {
+
+ $priority = 0; // @TODO allow reprioritization
+ if(isset(self::$default_priorities[$argv[0]])) {
+ $priority = self::$default_priorities[$argv[0]];
+ }
+
+ $workinfo = ['argc' => $argc, 'argv' => $argv];
+ $workinfo_json = json_encode($workinfo);
+ $uuid = self::getUuid($workinfo_json);
+
+ $r = q("SELECT * FROM workerq WHERE workerq_uuid = '%s'",
+ dbesc($uuid)
+ );
+ if ($r) {
+ logger("Release: Duplicate task - do not insert.", LOGGER_DEBUG);
+ logger(print_r($workinfo,true));
+
+ $argv = [];
+ return;
+ }
+
+ self::qbegin('workerq');
+ $r = q("INSERT INTO workerq (workerq_priority, workerq_data, workerq_uuid) VALUES (%d, '%s', '%s')",
+ intval($priority),
+ $workinfo_json,
+ dbesc($uuid)
+ );
+ if (!$r) {
+ self::qrollback();
+ logger("Insert failed: " . $workinfo_json, LOGGER_DEBUG);
+ return;
+ }
+ self::qcommit();
+ logger('INSERTED: ' . $workinfo_json, LOGGER_DEBUG);
+ }
+ $argv = [];
+ self::Process();
+ }
+
+ public static function GetWorkerCount() {
+ if (self::$maxworkers == 0) {
+ self::$maxworkers = get_config('queueworker', 'max_queueworkers', 4);
+ self::$maxworkers = self::$maxworkers > 3 ? self::$maxworkers : 4;
+ }
+ if (self::$workermaxage == 0) {
+ self::$workermaxage = get_config('queueworker', 'max_queueworker_age');
+ self::$workermaxage = self::$workermaxage > 120 ? self::$workermaxage : 300;
+ }
+
+ q("update workerq set workerq_reservationid = null where workerq_reservationid is not null and workerq_processtimeout < %s",
+ db_utcnow()
+ );
+
+ usleep(self::$workersleep);
+ $workers = dbq("select count(distinct workerq_reservationid) as total from workerq where workerq_reservationid is not null");
+ logger("WORKERCOUNT: " . $workers[0]['total'], LOGGER_DEBUG);
+ return intval($workers[0]['total']);
+ }
+
+ public static function GetWorkerID() {
+ if (self::$queueworker) {
+ return self::$queueworker;
+ }
+ $wid = uniqid('', true);
+ usleep(mt_rand(500000, 3000000)); //Sleep .5 - 3 seconds before creating a new worker.
+ $workers = self::GetWorkerCount();
+ if ($workers >= self::$maxworkers) {
+ logger("Too many active workers ($workers) max = " . self::$maxworkers, LOGGER_DEBUG);
+ return false;
+ }
+ self::$queueworker = $wid;
+ return $wid;
+ }
+
+ private static function getWorkId() {
+ self::GetWorkerCount();
+
+ self::qbegin('workerq');
+
+ if (ACTIVE_DBTYPE == DBTYPE_POSTGRES) {
+ $work = dbq("SELECT workerq_id FROM workerq WHERE workerq_reservationid IS NULL ORDER BY workerq_priority DESC, workerq_id ASC LIMIT 1 FOR UPDATE SKIP LOCKED;");
+ }
+ else {
+ $work = dbq("SELECT workerq_id FROM workerq WHERE workerq_reservationid IS NULL ORDER BY workerq_priority DESC, workerq_id ASC LIMIT 1;");
+ }
+
+ if (!$work) {
+ self::qrollback();
+ return false;
+ }
+ $id = $work[0]['workerq_id'];
+
+ $work = q("UPDATE workerq SET workerq_reservationid = '%s', workerq_processtimeout = %s + INTERVAL %s WHERE workerq_id = %d",
+ self::$queueworker,
+ db_utcnow(),
+ db_quoteinterval(self::$workermaxage . " SECOND"),
+ intval($id)
+ );
+
+ if (!$work) {
+ self::qrollback();
+ logger("Could not update workerq.", LOGGER_DEBUG);
+ return false;
+ }
+ logger("GOTWORK: " . json_encode($work), LOGGER_DEBUG);
+ self::qcommit();
+ return $id;
+ }
+
+ public static function Process() {
+ self::$workersleep = get_config('queueworker', 'queue_worker_sleep');
+ self::$workersleep = intval(self::$workersleep) > 100 ? intval(self::$workersleep) : 100;
+
+ if (!self::GetWorkerID()) {
+ logger('Unable to get worker ID. Exiting.', LOGGER_DEBUG);
+ killme();
+ }
+
+ $jobs = 0;
+ $workid = self::getWorkId();
+ while ($workid) {
+ usleep(self::$workersleep);
+ // @FIXME: Currently $workersleep is a fixed value. It may be a good idea
+ // to implement a "backoff" instead - based on load average or some
+ // other metric.
+
+ self::qbegin('workerq');
+
+ if (ACTIVE_DBTYPE == DBTYPE_POSTGRES) {
+ $workitem = q("SELECT * FROM workerq WHERE workerq_id = %d FOR UPDATE SKIP LOCKED",
+ $workid
+ );
+ }
+ else {
+ $workitem = q("SELECT * FROM workerq WHERE workerq_id = %d",
+ $workid
+ );
+ }
+
+ self::qcommit();
+
+ if (isset($workitem[0])) {
+ // At least SOME work to do.... in case there's more, let's ramp up workers.
+ $workers = self::GetWorkerCount();
+ if ($workers < self::$maxworkers) {
+ logger("Less than max active workers ($workers) max = " . self::$maxworkers . ".", LOGGER_DEBUG);
+ $phpbin = get_config('system', 'phpbin', 'php');
+ proc_run($phpbin, 'Zotlabs/Daemon/Master.php', ['Queueworker']);
+ }
+
+ $jobs++;
+ logger("Workinfo: " . $workitem[0]['workerq_data'], LOGGER_DEBUG);
+
+ $workinfo = json_decode($workitem[0]['workerq_data'], true);
+ $argv = $workinfo['argv'];
+ logger('Master: process: ' . json_encode($argv), LOGGER_DEBUG);
+
+ $cls = '\\Zotlabs\\Daemon\\' . $argv[0];
+ $argv = flatten_array_recursive($argv);
+ $argc = count($argv);
+ $cls::run($argc, $argv);
+
+ // @FIXME: Right now we assume that if we get a return, everything is OK.
+ // At some point we may want to test whether the run returns true/false
+ // and requeue the work to be tried again if needed. But we probably want
+ // to implement some sort of "retry interval" first.
+
+ self::qbegin('workerq');
+ q("delete from workerq where workerq_id = %d", $workid);
+ self::qcommit();
+ }
+ else {
+ logger("NO WORKITEM!", LOGGER_DEBUG);
+ }
+ $workid = self::getWorkId();
+ }
+ logger('Master: Worker Thread: queue items processed:' . $jobs, LOGGER_DEBUG);
+ }
+
+ public static function ClearQueue() {
+ $work = q("select * from workerq");
+ while ($work) {
+ foreach ($work as $workitem) {
+ $workinfo = json_decode($workitem['v'], true);
+ $argc = $workinfo['argc'];
+ $argv = $workinfo['argv'];
+ logger('Master: process: ' . print_r($argv, true), LOGGER_ALL, LOG_DEBUG);
+ if (!isset($argv[0])) {
+ q("delete from workerq where workerq_id = %d",
+ $work[0]['workerq_id']
+ );
+ continue;
+ }
+ $cls = '\\Zotlabs\\Daemon\\' . $argv[0];
+ $cls::run($argc, $argv);
+ q("delete from workerq where workerq_id = %d",
+ $work[0]['workerq_id']
+ );
+ usleep(300000);
+ //Give the server .3 seconds to catch its breath between tasks.
+ //This will hopefully keep it from crashing to it's knees entirely
+ //if the last task ended up initiating other parallel processes
+ //(eg. polling remotes)
+ }
+ //Make sure nothing new came in
+ $work = q("select * from workerq");
+ }
+ return;
+ }
+
+ /**
+ * @brief Generate a name-based v5 UUID with custom namespace
+ *
+ * @param string $data
+ * @return string $uuid
+ */
+ private static function getUuid($data) {
+ $namespace = '3a112e42-f147-4ccf-a78b-f6841339ea2a';
+ try {
+ $uuid = Uuid::uuid5($namespace, $data)->toString();
+ } catch (UnableToBuildUuidException $e) {
+ logger('UUID generation failed');
+ return '';
+ }
+ return $uuid;
+ }
+
+}
diff --git a/Zotlabs/Module/Import.php b/Zotlabs/Module/Import.php
index ec47e370b..c8a9ac5ed 100644
--- a/Zotlabs/Module/Import.php
+++ b/Zotlabs/Module/Import.php
@@ -536,7 +536,7 @@ class Import extends Controller {
$since = datetime_convert(date_default_timezone_get(), date_default_timezone_get(), '0001-01-01 00:00');
$until = datetime_convert(date_default_timezone_get(), date_default_timezone_get(), 'now + 1 day');
- $poll_interval = get_config('system', 'poll_interval', 3);
+ //$poll_interval = get_config('system', 'poll_interval', 3);
$page = 0;
Master::Summon(['Content_importer', sprintf('%d', $page), $since, $until, $channel['channel_address'], urlencode($hz_server)]);
diff --git a/Zotlabs/Module/Queueworker.php b/Zotlabs/Module/Queueworker.php
new file mode 100644
index 000000000..808a9ed61
--- /dev/null
+++ b/Zotlabs/Module/Queueworker.php
@@ -0,0 +1,122 @@
+<?php
+
+namespace Zotlabs\Module;
+
+use App;
+use Zotlabs\Web\Controller;
+
+class Queueworker extends Controller {
+
+ function init() {
+
+ }
+
+ function post() {
+
+ if ((!local_channel()) || (!is_site_admin())) {
+ goaway(z_root() . '/queueworker');
+ }
+
+ check_form_security_token('form_security_token', 'queueworker');
+
+ $maxqueueworkers = intval($_POST['queueworker_maxworkers']);
+ $maxqueueworkers = ($maxqueueworkers > 3) ? $maxqueueworkers : 4;
+ set_config('queueworker', 'max_queueworkers', $maxqueueworkers);
+
+ $maxworkerage = intval($_POST['queueworker_max_age']);
+ $maxworkerage = ($maxworkerage >= 120) ? $maxworkerage : 300;
+ set_config('queueworker', 'queueworker_max_age', $maxworkerage);
+
+ $queueworkersleep = intval($_POST['queue_worker_sleep']);
+ $queueworkersleep = ($queueworkersleep > 100) ? $queueworkersleep : 100;
+ set_config('queueworker', 'queue_worker_sleep', $queueworkersleep);
+
+ goaway(z_root() . '/queueworker');
+ }
+
+ function get() {
+
+ $content = "<H1>ERROR: Page not found</H1>";
+ App::$error = 404;
+
+ if (!local_channel()) {
+ return $content;
+ }
+
+ if (!(is_site_admin())) {
+ return $content;
+ }
+
+ load_config("queueworker");
+
+ $content = "<H1>Queue Status</H1>\n";
+
+ $r = q('select count(*) as qentries from workerq');
+
+ if (!$r) {
+ $content = "<H4>There was an error querying the database.</H4>";
+ return $content;
+ }
+
+ $content .= "<H4>There are " . $r[0]['qentries'] . " queue items to be processed.</H4>";
+
+ $r = dbq("select count(distinct workerq_reservationid) as qworkers from workerq where workerq_reservationid is not null");
+
+ $content .= "<H4>Active workers: " . $r[0]['qworkers'] . "</H4>";
+
+ $maxqueueworkers = get_config('queueworker', 'max_queueworkers', 4);
+ $maxqueueworkers = ($maxqueueworkers > 3) ? $maxqueueworkers : 4;
+ set_config('queueworker', 'max_queueworkers', $maxqueueworkers);
+
+ $sc = '';
+
+ $sc .= replace_macros(get_markup_template('field_input.tpl'), [
+ '$field' => [
+ 'queueworker_maxworkers',
+ t('Max queueworker threads'),
+ $maxqueueworkers,
+ t('Minimum 4, default 4')
+ ]
+ ]);
+
+ $workermaxage = get_config('queueworker', 'queueworker_max_age');
+ $workermaxage = ($workermaxage >= 120) ? $workermaxage : 300;
+ set_config('queueworker', 'max_queueworker_age', $workermaxage);
+
+ $sc .= replace_macros(get_markup_template('field_input.tpl'), [
+ '$field' => [
+ 'queueworker_max_age',
+ t('Assume workers dead after'),
+ $workermaxage,
+ t('Minimum 120, default 300 seconds')
+ ]
+ ]);
+
+ $queueworkersleep = get_config('queueworker', 'queue_worker_sleep');
+ $queueworkersleep = ($queueworkersleep > 100) ? $queueworkersleep : 100;
+ set_config('queueworker', 'queue_worker_sleep', $queueworkersleep);
+
+ $sc .= replace_macros(get_markup_template('field_input.tpl'), [
+ '$field' => [
+ 'queue_worker_sleep',
+ t('Pause before starting next task'),
+ $queueworkersleep,
+ t('Minimum 100, default 100 microseconds')
+ ]
+ ]);
+
+ $tpl = get_markup_template('settings_addon.tpl');
+ $content .= replace_macros($tpl, [
+ '$action_url' => 'queueworker',
+ '$form_security_token' => get_form_security_token('queueworker'),
+ '$title' => t('Queueworker Settings'),
+ '$content' => $sc,
+ '$baseurl' => z_root(),
+ '$submit' => t('Save')
+ ]
+ );
+
+ return $content;
+
+ }
+}
diff --git a/Zotlabs/Update/_1254.php b/Zotlabs/Update/_1254.php
new file mode 100644
index 000000000..ec54754c0
--- /dev/null
+++ b/Zotlabs/Update/_1254.php
@@ -0,0 +1,55 @@
+<?php
+
+namespace Zotlabs\Update;
+
+class _1254 {
+
+ function run() {
+
+ dbq("START TRANSACTION");
+
+ if(ACTIVE_DBTYPE == DBTYPE_POSTGRES) {
+ $r1 = dbq("CREATE TABLE IF NOT EXISTS workerq (
+ workerq_id bigserial NOT NULL,
+ workerq_priority smallint,
+ workerq_reservationid varchar(25) DEFAULT NULL,
+ workerq_processtimeout timestamp NOT NULL DEFAULT '0001-01-01 00:00:00',
+ workerq_data text,
+ workerq_uuid UUID NOT NULL,
+ PRIMARY KEY (workerq_id))"
+ );
+
+ $r2 = dbq("CREATE INDEX idx_workerq_priority ON workerq (workerq_priority)");
+ $r3 = dbq("CREATE INDEX idx_workerq_reservationid ON workerq (workerq_reservationid)");
+ $r4 = dbq("CREATE INDEX idx_workerq_processtimeout ON workerq (workerq_processtimeout)");
+ $r5 = dbq("CREATE INDEX idx_workerq_uuid ON workerq (workerq_uuid)")
+
+ $r = ($r1 && $r2 && $r3 && $r4 && $r5);
+ }
+ else {
+ $r = dbq("CREATE TABLE IF NOT EXISTS workerq (
+ workerq_id BIGINT UNSIGNED NOT NULL PRIMARY KEY AUTO_INCREMENT,
+ workerq_priority smallint,
+ workerq_reservationid varchar(25) DEFAULT NULL,
+ workerq_processtimeout datetime NOT NULL DEFAULT '0001-01-01 00:00:00',
+ workerq_data text,
+ workerq_uuid char(36) NOT NULL DEFAULT ''
+ KEY workerq_priority (workerq_priority),
+ KEY workerq_reservationid (workerq_reservationid),
+ KEY workerq_processtimeout (workerq_uuid)
+ KEY workerq_uuid` (workerq_processtimeout)
+ ) ENGINE = InnoDB DEFAULT CHARSET=utf8mb4"
+ );
+ }
+
+ if($r) {
+ dbq("COMMIT");
+ return UPDATE_SUCCESS;
+ }
+
+ q("ROLLBACK");
+ return UPDATE_FAILED;
+
+ }
+
+}