diff options
Diffstat (limited to 'Zotlabs')
-rw-r--r-- | Zotlabs/Daemon/Cron.php | 8 | ||||
-rw-r--r-- | Zotlabs/Daemon/Master.php | 11 | ||||
-rw-r--r-- | Zotlabs/Daemon/Poller.php | 16 | ||||
-rw-r--r-- | Zotlabs/Lib/Libsync.php | 9 | ||||
-rw-r--r-- | Zotlabs/Lib/QueueWorker.php | 351 | ||||
-rw-r--r-- | Zotlabs/Module/Import.php | 2 | ||||
-rw-r--r-- | Zotlabs/Module/Queueworker.php | 122 | ||||
-rw-r--r-- | Zotlabs/Update/_1254.php | 55 |
8 files changed, 569 insertions, 5 deletions
diff --git a/Zotlabs/Daemon/Cron.php b/Zotlabs/Daemon/Cron.php index 6629491de..c3158a4eb 100644 --- a/Zotlabs/Daemon/Cron.php +++ b/Zotlabs/Daemon/Cron.php @@ -50,7 +50,7 @@ class Cron { require_once('include/account.php'); remove_expired_registrations(); - $interval = get_config('system', 'delivery_interval', 3); + //$interval = get_config('system', 'delivery_interval', 3); // expire any expired items @@ -65,8 +65,10 @@ class Cron { if ($rr['item_wall']) { // The notifier isn't normally invoked unless item_drop is interactive. Master::Summon(['Notifier', 'drop', $rr['id']]); + /* if ($interval) @time_sleep_until(microtime(true) + (float)$interval); + */ } } } @@ -96,8 +98,10 @@ class Cron { if ($r) { foreach ($r as $rr) { Master::Summon(array('Directory', $rr['channel_id'], 'force')); + /* if ($interval) @time_sleep_until(microtime(true) + (float)$interval); + */ } } @@ -151,8 +155,10 @@ class Cron { ); } Master::Summon(array('Notifier', 'wall-new', $rr['id'])); + /* if ($interval) @time_sleep_until(microtime(true) + (float)$interval); + */ } } } diff --git a/Zotlabs/Daemon/Master.php b/Zotlabs/Daemon/Master.php index 6fa656be5..495718bf4 100644 --- a/Zotlabs/Daemon/Master.php +++ b/Zotlabs/Daemon/Master.php @@ -2,6 +2,8 @@ namespace Zotlabs\Daemon; +use Zotlabs\Lib\QueueWorker; + if (array_search(__file__, get_included_files()) === 0) { require_once('include/cli_startup.php'); array_shift($argv); @@ -16,6 +18,10 @@ if (array_search(__file__, get_included_files()) === 0) { class Master { static public function Summon($arr) { + + QueueWorker::Summon($arr); + return; +/* $hookinfo = [ 'argv' => $arr ]; @@ -32,11 +38,15 @@ class Master { $phpbin = get_config('system', 'phpbin', 'php'); proc_run($phpbin, 'Zotlabs/Daemon/Master.php', $arr); +*/ } static public function Release($argc, $argv) { cli_startup(); + QueueWorker::Release($argv); + return; +/* $hookinfo = [ 'argv' => $argv ]; @@ -54,5 +64,6 @@ class Master { logger('Master: release: ' . json_encode($argv), LOGGER_ALL, LOG_DEBUG); $cls = '\\Zotlabs\\Daemon\\' . $argv[0]; $cls::run($argc, $argv); +*/ } } diff --git a/Zotlabs/Daemon/Poller.php b/Zotlabs/Daemon/Poller.php index 63c498f17..5bf8d3a02 100644 --- a/Zotlabs/Daemon/Poller.php +++ b/Zotlabs/Daemon/Poller.php @@ -17,10 +17,12 @@ class Poller { } } +/* $interval = intval(get_config('system', 'poll_interval')); if (!$interval) $interval = ((get_config('system', 'delivery_interval') === false) ? 3 : intval(get_config('system', 'delivery_interval'))); + // Check for a lockfile. If it exists, but is over an hour old, it's stale. Ignore it. $lockfile = 'store/[data]/poller'; if ((file_exists($lockfile)) && (filemtime($lockfile) > (time() - 3600)) @@ -32,6 +34,7 @@ class Poller { // Create a lockfile. Needs two vars, but $x doesn't need to contain anything. $x = ''; file_put_contents($lockfile, $x); +*/ logger('poller: start'); @@ -103,8 +106,10 @@ class Poller { if ($t < $x) { Master::Summon(['Onepoll', $contact['abook_id']]); + /* if ($interval) @time_sleep_until(microtime(true) + (float)$interval); + */ } continue; @@ -167,8 +172,11 @@ class Poller { continue; Master::Summon(['Onepoll', $contact['abook_id']]); + + /* if ($interval) @time_sleep_until(microtime(true) + (float)$interval); + */ } } @@ -190,9 +198,13 @@ class Poller { if ($rr['ud_last'] > NULL_DATE) if ($rr['ud_last'] > datetime_convert('UTC', 'UTC', 'now - 1 day')) continue; + Master::Summon(['Onedirsync', $rr['ud_id']]); + + /* if ($interval) @time_sleep_until(microtime(true) + (float)$interval); + */ } } } @@ -200,9 +212,9 @@ class Poller { set_config('system', 'lastpoll', datetime_convert()); //All done - clear the lockfile - +/* @unlink($lockfile); - +*/ return; } } diff --git a/Zotlabs/Lib/Libsync.php b/Zotlabs/Lib/Libsync.php index fd9886f71..b02ae4c69 100644 --- a/Zotlabs/Lib/Libsync.php +++ b/Zotlabs/Lib/Libsync.php @@ -135,8 +135,10 @@ class Libsync { $info['collection_members'] = $r; } + /* $interval = ((get_config('system', 'delivery_interval') !== false) ? intval(get_config('system', 'delivery_interval')) : 2); + */ logger('Packet: ' . print_r($info, true), LOGGER_DATA, LOG_DEBUG); @@ -155,19 +157,24 @@ class Libsync { ]); + /* $x = q("select count(outq_hash) as total from outq where outq_delivered = 0"); + if (intval($x[0]['total']) > intval(get_config('system', 'force_queue_threshold', 3000))) { logger('immediate delivery deferred.', LOGGER_DEBUG, LOG_INFO); Queue::update($hash); continue; } - + */ Master::Summon(['Deliver', $hash]); + + /* $total = $total - 1; if ($interval && $total) @time_sleep_until(microtime(true) + (float)$interval); + */ } } diff --git a/Zotlabs/Lib/QueueWorker.php b/Zotlabs/Lib/QueueWorker.php new file mode 100644 index 000000000..21983a4e2 --- /dev/null +++ b/Zotlabs/Lib/QueueWorker.php @@ -0,0 +1,351 @@ +<?php + +namespace Zotlabs\Lib; + + +use Ramsey\Uuid\Uuid; +use Ramsey\Uuid\Exception\UnableToBuildUuidException; + + +class QueueWorker { + + public static $queueworker = null; + public static $maxworkers = 0; + public static $workermaxage = 0; + public static $workersleep = 100; + public static $default_priorities = [ + 'Notifier' => 10, + 'Deliver' => 10, + 'Cache_query' => 10, + 'Content_importer' => 1, + 'File_importer' => 1, + 'Channel_purge' => 1, + 'Directory' => 1 + ]; + + private static function qbegin($tablename) { + switch (ACTIVE_DBTYPE) { + case DBTYPE_MYSQL: + q('BEGIN'); + q('LOCK TABLE ' . $tablename . ' WRITE'); + break; + + case DBTYPE_POSTGRES: + q('BEGIN'); + //q('LOCK TABLE '.$tablename.' IN ACCESS EXCLUSIVE MODE'); + break; + } + return; + } + + private static function qcommit() { + switch (ACTIVE_DBTYPE) { + case DBTYPE_MYSQL: + q("UNLOCK TABLES"); + q("COMMIT"); + break; + + case DBTYPE_POSTGRES: + q("COMMIT"); + break; + } + return; + } + + private static function qrollback() { + switch (ACTIVE_DBTYPE) { + case DBTYPE_MYSQL: + q("ROLLBACK"); + q("UNLOCK TABLES"); + break; + + case DBTYPE_POSTGRES: + q("ROLLBACK"); + break; + } + return; + } + + public static function Summon(&$argv) { + + $argc = count($argv); + + if ($argv[0] !== 'Queueworker') { + + $priority = 0; // @TODO allow reprioritization + + if(isset(self::$default_priorities[$argv[0]])) { + $priority = self::$default_priorities[$argv[0]]; + } + + $workinfo = ['argc' => $argc, 'argv' => $argv]; + $workinfo_json = json_encode($workinfo); + $uuid = self::getUuid($workinfo_json); + + $r = q("SELECT * FROM workerq WHERE workerq_uuid = '%s'", + dbesc($uuid) + ); + if ($r) { + logger("Summon: Ignoring duplicate workerq task", LOGGER_DEBUG); + logger(print_r($workinfo,true)); + $argv = []; + return; + } + + self::qbegin('workerq'); + $r = q("INSERT INTO workerq (workerq_priority, workerq_data, workerq_uuid) VALUES (%d, '%s', '%s')", + intval($priority), + $workinfo_json, + dbesc($uuid) + ); + if (!$r) { + self::qrollback(); + logger("INSERT FAILED", LOGGER_DEBUG); + return; + } + self::qcommit(); + logger('INSERTED: ' . $workinfo_json, LOGGER_DEBUG); + } + $argv = []; + + $workers = self::GetWorkerCount(); + if ($workers < self::$maxworkers) { + logger("Less than max active workers ($workers) max = " . self::$maxworkers . ".", LOGGER_DEBUG); + $phpbin = get_config('system', 'phpbin', 'php'); + proc_run($phpbin, 'Zotlabs/Daemon/Master.php', ['Queueworker']); + } + } + + public static function Release(&$argv) { + + $argc = count($argv); + + if ($argv[0] !== 'Queueworker') { + + $priority = 0; // @TODO allow reprioritization + if(isset(self::$default_priorities[$argv[0]])) { + $priority = self::$default_priorities[$argv[0]]; + } + + $workinfo = ['argc' => $argc, 'argv' => $argv]; + $workinfo_json = json_encode($workinfo); + $uuid = self::getUuid($workinfo_json); + + $r = q("SELECT * FROM workerq WHERE workerq_uuid = '%s'", + dbesc($uuid) + ); + if ($r) { + logger("Release: Duplicate task - do not insert.", LOGGER_DEBUG); + logger(print_r($workinfo,true)); + + $argv = []; + return; + } + + self::qbegin('workerq'); + $r = q("INSERT INTO workerq (workerq_priority, workerq_data, workerq_uuid) VALUES (%d, '%s', '%s')", + intval($priority), + $workinfo_json, + dbesc($uuid) + ); + if (!$r) { + self::qrollback(); + logger("Insert failed: " . $workinfo_json, LOGGER_DEBUG); + return; + } + self::qcommit(); + logger('INSERTED: ' . $workinfo_json, LOGGER_DEBUG); + } + $argv = []; + self::Process(); + } + + public static function GetWorkerCount() { + if (self::$maxworkers == 0) { + self::$maxworkers = get_config('queueworker', 'max_queueworkers', 4); + self::$maxworkers = self::$maxworkers > 3 ? self::$maxworkers : 4; + } + if (self::$workermaxage == 0) { + self::$workermaxage = get_config('queueworker', 'max_queueworker_age'); + self::$workermaxage = self::$workermaxage > 120 ? self::$workermaxage : 300; + } + + q("update workerq set workerq_reservationid = null where workerq_reservationid is not null and workerq_processtimeout < %s", + db_utcnow() + ); + + usleep(self::$workersleep); + $workers = dbq("select count(distinct workerq_reservationid) as total from workerq where workerq_reservationid is not null"); + logger("WORKERCOUNT: " . $workers[0]['total'], LOGGER_DEBUG); + return intval($workers[0]['total']); + } + + public static function GetWorkerID() { + if (self::$queueworker) { + return self::$queueworker; + } + $wid = uniqid('', true); + usleep(mt_rand(500000, 3000000)); //Sleep .5 - 3 seconds before creating a new worker. + $workers = self::GetWorkerCount(); + if ($workers >= self::$maxworkers) { + logger("Too many active workers ($workers) max = " . self::$maxworkers, LOGGER_DEBUG); + return false; + } + self::$queueworker = $wid; + return $wid; + } + + private static function getWorkId() { + self::GetWorkerCount(); + + self::qbegin('workerq'); + + if (ACTIVE_DBTYPE == DBTYPE_POSTGRES) { + $work = dbq("SELECT workerq_id FROM workerq WHERE workerq_reservationid IS NULL ORDER BY workerq_priority DESC, workerq_id ASC LIMIT 1 FOR UPDATE SKIP LOCKED;"); + } + else { + $work = dbq("SELECT workerq_id FROM workerq WHERE workerq_reservationid IS NULL ORDER BY workerq_priority DESC, workerq_id ASC LIMIT 1;"); + } + + if (!$work) { + self::qrollback(); + return false; + } + $id = $work[0]['workerq_id']; + + $work = q("UPDATE workerq SET workerq_reservationid = '%s', workerq_processtimeout = %s + INTERVAL %s WHERE workerq_id = %d", + self::$queueworker, + db_utcnow(), + db_quoteinterval(self::$workermaxage . " SECOND"), + intval($id) + ); + + if (!$work) { + self::qrollback(); + logger("Could not update workerq.", LOGGER_DEBUG); + return false; + } + logger("GOTWORK: " . json_encode($work), LOGGER_DEBUG); + self::qcommit(); + return $id; + } + + public static function Process() { + self::$workersleep = get_config('queueworker', 'queue_worker_sleep'); + self::$workersleep = intval(self::$workersleep) > 100 ? intval(self::$workersleep) : 100; + + if (!self::GetWorkerID()) { + logger('Unable to get worker ID. Exiting.', LOGGER_DEBUG); + killme(); + } + + $jobs = 0; + $workid = self::getWorkId(); + while ($workid) { + usleep(self::$workersleep); + // @FIXME: Currently $workersleep is a fixed value. It may be a good idea + // to implement a "backoff" instead - based on load average or some + // other metric. + + self::qbegin('workerq'); + + if (ACTIVE_DBTYPE == DBTYPE_POSTGRES) { + $workitem = q("SELECT * FROM workerq WHERE workerq_id = %d FOR UPDATE SKIP LOCKED", + $workid + ); + } + else { + $workitem = q("SELECT * FROM workerq WHERE workerq_id = %d", + $workid + ); + } + + self::qcommit(); + + if (isset($workitem[0])) { + // At least SOME work to do.... in case there's more, let's ramp up workers. + $workers = self::GetWorkerCount(); + if ($workers < self::$maxworkers) { + logger("Less than max active workers ($workers) max = " . self::$maxworkers . ".", LOGGER_DEBUG); + $phpbin = get_config('system', 'phpbin', 'php'); + proc_run($phpbin, 'Zotlabs/Daemon/Master.php', ['Queueworker']); + } + + $jobs++; + logger("Workinfo: " . $workitem[0]['workerq_data'], LOGGER_DEBUG); + + $workinfo = json_decode($workitem[0]['workerq_data'], true); + $argv = $workinfo['argv']; + logger('Master: process: ' . json_encode($argv), LOGGER_DEBUG); + + $cls = '\\Zotlabs\\Daemon\\' . $argv[0]; + $argv = flatten_array_recursive($argv); + $argc = count($argv); + $cls::run($argc, $argv); + + // @FIXME: Right now we assume that if we get a return, everything is OK. + // At some point we may want to test whether the run returns true/false + // and requeue the work to be tried again if needed. But we probably want + // to implement some sort of "retry interval" first. + + self::qbegin('workerq'); + q("delete from workerq where workerq_id = %d", $workid); + self::qcommit(); + } + else { + logger("NO WORKITEM!", LOGGER_DEBUG); + } + $workid = self::getWorkId(); + } + logger('Master: Worker Thread: queue items processed:' . $jobs, LOGGER_DEBUG); + } + + public static function ClearQueue() { + $work = q("select * from workerq"); + while ($work) { + foreach ($work as $workitem) { + $workinfo = json_decode($workitem['v'], true); + $argc = $workinfo['argc']; + $argv = $workinfo['argv']; + logger('Master: process: ' . print_r($argv, true), LOGGER_ALL, LOG_DEBUG); + if (!isset($argv[0])) { + q("delete from workerq where workerq_id = %d", + $work[0]['workerq_id'] + ); + continue; + } + $cls = '\\Zotlabs\\Daemon\\' . $argv[0]; + $cls::run($argc, $argv); + q("delete from workerq where workerq_id = %d", + $work[0]['workerq_id'] + ); + usleep(300000); + //Give the server .3 seconds to catch its breath between tasks. + //This will hopefully keep it from crashing to it's knees entirely + //if the last task ended up initiating other parallel processes + //(eg. polling remotes) + } + //Make sure nothing new came in + $work = q("select * from workerq"); + } + return; + } + + /** + * @brief Generate a name-based v5 UUID with custom namespace + * + * @param string $data + * @return string $uuid + */ + private static function getUuid($data) { + $namespace = '3a112e42-f147-4ccf-a78b-f6841339ea2a'; + try { + $uuid = Uuid::uuid5($namespace, $data)->toString(); + } catch (UnableToBuildUuidException $e) { + logger('UUID generation failed'); + return ''; + } + return $uuid; + } + +} diff --git a/Zotlabs/Module/Import.php b/Zotlabs/Module/Import.php index ec47e370b..c8a9ac5ed 100644 --- a/Zotlabs/Module/Import.php +++ b/Zotlabs/Module/Import.php @@ -536,7 +536,7 @@ class Import extends Controller { $since = datetime_convert(date_default_timezone_get(), date_default_timezone_get(), '0001-01-01 00:00'); $until = datetime_convert(date_default_timezone_get(), date_default_timezone_get(), 'now + 1 day'); - $poll_interval = get_config('system', 'poll_interval', 3); + //$poll_interval = get_config('system', 'poll_interval', 3); $page = 0; Master::Summon(['Content_importer', sprintf('%d', $page), $since, $until, $channel['channel_address'], urlencode($hz_server)]); diff --git a/Zotlabs/Module/Queueworker.php b/Zotlabs/Module/Queueworker.php new file mode 100644 index 000000000..808a9ed61 --- /dev/null +++ b/Zotlabs/Module/Queueworker.php @@ -0,0 +1,122 @@ +<?php + +namespace Zotlabs\Module; + +use App; +use Zotlabs\Web\Controller; + +class Queueworker extends Controller { + + function init() { + + } + + function post() { + + if ((!local_channel()) || (!is_site_admin())) { + goaway(z_root() . '/queueworker'); + } + + check_form_security_token('form_security_token', 'queueworker'); + + $maxqueueworkers = intval($_POST['queueworker_maxworkers']); + $maxqueueworkers = ($maxqueueworkers > 3) ? $maxqueueworkers : 4; + set_config('queueworker', 'max_queueworkers', $maxqueueworkers); + + $maxworkerage = intval($_POST['queueworker_max_age']); + $maxworkerage = ($maxworkerage >= 120) ? $maxworkerage : 300; + set_config('queueworker', 'queueworker_max_age', $maxworkerage); + + $queueworkersleep = intval($_POST['queue_worker_sleep']); + $queueworkersleep = ($queueworkersleep > 100) ? $queueworkersleep : 100; + set_config('queueworker', 'queue_worker_sleep', $queueworkersleep); + + goaway(z_root() . '/queueworker'); + } + + function get() { + + $content = "<H1>ERROR: Page not found</H1>"; + App::$error = 404; + + if (!local_channel()) { + return $content; + } + + if (!(is_site_admin())) { + return $content; + } + + load_config("queueworker"); + + $content = "<H1>Queue Status</H1>\n"; + + $r = q('select count(*) as qentries from workerq'); + + if (!$r) { + $content = "<H4>There was an error querying the database.</H4>"; + return $content; + } + + $content .= "<H4>There are " . $r[0]['qentries'] . " queue items to be processed.</H4>"; + + $r = dbq("select count(distinct workerq_reservationid) as qworkers from workerq where workerq_reservationid is not null"); + + $content .= "<H4>Active workers: " . $r[0]['qworkers'] . "</H4>"; + + $maxqueueworkers = get_config('queueworker', 'max_queueworkers', 4); + $maxqueueworkers = ($maxqueueworkers > 3) ? $maxqueueworkers : 4; + set_config('queueworker', 'max_queueworkers', $maxqueueworkers); + + $sc = ''; + + $sc .= replace_macros(get_markup_template('field_input.tpl'), [ + '$field' => [ + 'queueworker_maxworkers', + t('Max queueworker threads'), + $maxqueueworkers, + t('Minimum 4, default 4') + ] + ]); + + $workermaxage = get_config('queueworker', 'queueworker_max_age'); + $workermaxage = ($workermaxage >= 120) ? $workermaxage : 300; + set_config('queueworker', 'max_queueworker_age', $workermaxage); + + $sc .= replace_macros(get_markup_template('field_input.tpl'), [ + '$field' => [ + 'queueworker_max_age', + t('Assume workers dead after'), + $workermaxage, + t('Minimum 120, default 300 seconds') + ] + ]); + + $queueworkersleep = get_config('queueworker', 'queue_worker_sleep'); + $queueworkersleep = ($queueworkersleep > 100) ? $queueworkersleep : 100; + set_config('queueworker', 'queue_worker_sleep', $queueworkersleep); + + $sc .= replace_macros(get_markup_template('field_input.tpl'), [ + '$field' => [ + 'queue_worker_sleep', + t('Pause before starting next task'), + $queueworkersleep, + t('Minimum 100, default 100 microseconds') + ] + ]); + + $tpl = get_markup_template('settings_addon.tpl'); + $content .= replace_macros($tpl, [ + '$action_url' => 'queueworker', + '$form_security_token' => get_form_security_token('queueworker'), + '$title' => t('Queueworker Settings'), + '$content' => $sc, + '$baseurl' => z_root(), + '$submit' => t('Save') + ] + ); + + return $content; + + } +} diff --git a/Zotlabs/Update/_1254.php b/Zotlabs/Update/_1254.php new file mode 100644 index 000000000..ec54754c0 --- /dev/null +++ b/Zotlabs/Update/_1254.php @@ -0,0 +1,55 @@ +<?php + +namespace Zotlabs\Update; + +class _1254 { + + function run() { + + dbq("START TRANSACTION"); + + if(ACTIVE_DBTYPE == DBTYPE_POSTGRES) { + $r1 = dbq("CREATE TABLE IF NOT EXISTS workerq ( + workerq_id bigserial NOT NULL, + workerq_priority smallint, + workerq_reservationid varchar(25) DEFAULT NULL, + workerq_processtimeout timestamp NOT NULL DEFAULT '0001-01-01 00:00:00', + workerq_data text, + workerq_uuid UUID NOT NULL, + PRIMARY KEY (workerq_id))" + ); + + $r2 = dbq("CREATE INDEX idx_workerq_priority ON workerq (workerq_priority)"); + $r3 = dbq("CREATE INDEX idx_workerq_reservationid ON workerq (workerq_reservationid)"); + $r4 = dbq("CREATE INDEX idx_workerq_processtimeout ON workerq (workerq_processtimeout)"); + $r5 = dbq("CREATE INDEX idx_workerq_uuid ON workerq (workerq_uuid)") + + $r = ($r1 && $r2 && $r3 && $r4 && $r5); + } + else { + $r = dbq("CREATE TABLE IF NOT EXISTS workerq ( + workerq_id BIGINT UNSIGNED NOT NULL PRIMARY KEY AUTO_INCREMENT, + workerq_priority smallint, + workerq_reservationid varchar(25) DEFAULT NULL, + workerq_processtimeout datetime NOT NULL DEFAULT '0001-01-01 00:00:00', + workerq_data text, + workerq_uuid char(36) NOT NULL DEFAULT '' + KEY workerq_priority (workerq_priority), + KEY workerq_reservationid (workerq_reservationid), + KEY workerq_processtimeout (workerq_uuid) + KEY workerq_uuid` (workerq_processtimeout) + ) ENGINE = InnoDB DEFAULT CHARSET=utf8mb4" + ); + } + + if($r) { + dbq("COMMIT"); + return UPDATE_SUCCESS; + } + + q("ROLLBACK"); + return UPDATE_FAILED; + + } + +} |