diff options
Diffstat (limited to 'Zotlabs/Lib')
-rw-r--r-- | Zotlabs/Lib/Activity.php | 43 | ||||
-rw-r--r-- | Zotlabs/Lib/Libsync.php | 28 | ||||
-rw-r--r-- | Zotlabs/Lib/Libzot.php | 6 | ||||
-rw-r--r-- | Zotlabs/Lib/QueueWorker.php | 357 | ||||
-rw-r--r-- | Zotlabs/Lib/Webfinger.php | 18 |
5 files changed, 431 insertions, 21 deletions
diff --git a/Zotlabs/Lib/Activity.php b/Zotlabs/Lib/Activity.php index 9dbb15c28..1a1031909 100644 --- a/Zotlabs/Lib/Activity.php +++ b/Zotlabs/Lib/Activity.php @@ -672,6 +672,7 @@ class Activity { } } } + return $ret; } @@ -2080,16 +2081,29 @@ class Activity { } - static function update_poll($item, $post) { + static function update_poll($item_id, $post) { $multi = false; $mid = $post['mid']; $content = $post['title']; + if (!$item_id) { + return false; + } + + dbq("START TRANSACTION"); + + $item = q("SELECT * FROM item WHERE id = %d FOR UPDATE", + intval($item_id) + ); + if (!$item) { + dbq("COMMIT"); return false; } + $item = $item[0]; + $o = json_decode($item['obj'], true); if ($o && array_key_exists('anyOf', $o)) { $multi = true; @@ -2161,16 +2175,30 @@ class Activity { } logger('updated_poll: ' . print_r($o, true), LOGGER_DATA); if ($answer_found && !$found) { - q("update item set obj = '%s', edited = '%s' where id = %d", + $u = q("update item set obj = '%s', edited = '%s' where id = %d", dbesc(json_encode($o)), dbesc(datetime_convert()), intval($item['id']) ); - Master::Summon(['Notifier', 'wall-new', $item['id'], $post['mid'] /* trick queueworker de-duplication */ ]); - return true; + if ($u) { + dbq("COMMIT"); + + if ($multi) { + // wait some seconds for possible multiple answers to be processed + // before calling the notifier + sleep(3); + } + + Master::Summon(['Notifier', 'wall-new', $item['id']]); + return true; + } + + dbq("ROLLBACK"); + } + dbq("COMMIT"); return false; } @@ -2456,7 +2484,7 @@ class Activity { $s['attach'] = $a; } - $a = self::decode_iconfig($act->obj); + $a = self::decode_iconfig($act->data); if ($a) { $s['iconfig'] = $a; } @@ -2786,8 +2814,9 @@ class Activity { set_iconfig($s, 'diaspora', 'fields', $diaspora_rawmsg, 1); } - set_iconfig($s, 'activitypub', 'recips', $act->raw_recips); - + if ($act->raw_recips) { + set_iconfig($s, 'activitypub', 'recips', $act->raw_recips); + } $hookinfo = [ 'act' => $act, diff --git a/Zotlabs/Lib/Libsync.php b/Zotlabs/Lib/Libsync.php index 09b81dc83..19361c4ae 100644 --- a/Zotlabs/Lib/Libsync.php +++ b/Zotlabs/Lib/Libsync.php @@ -135,8 +135,7 @@ class Libsync { $info['collection_members'] = $r; } - $interval = ((get_config('system', 'delivery_interval') !== false) - ? intval(get_config('system', 'delivery_interval')) : 2); + $interval = get_config('queueworker', 'queue_interval', 500000); logger('Packet: ' . print_r($info, true), LOGGER_DATA, LOG_DEBUG); @@ -155,19 +154,26 @@ class Libsync { ]); + /* $x = q("select count(outq_hash) as total from outq where outq_delivered = 0"); + if (intval($x[0]['total']) > intval(get_config('system', 'force_queue_threshold', 3000))) { logger('immediate delivery deferred.', LOGGER_DEBUG, LOG_INFO); Queue::update($hash); continue; } - + */ Master::Summon(['Deliver', $hash]); + + /* $total = $total - 1; + */ + + if ($interval) { + usleep($interval); + } - if ($interval && $total) - @time_sleep_until(microtime(true) + (float)$interval); } } @@ -184,8 +190,6 @@ class Libsync { require_once('include/import.php'); -hz_syslog(print_r($arr, true)); - $result = []; $keychange = ((array_key_exists('keychange', $arr)) ? true : false); @@ -858,7 +862,9 @@ hz_syslog(print_r($arr, true)); ); } - // update connection timestamp if this is the site we're talking to + // Update connection timestamp if this is the site we're talking to. + // Also mark all entries from the current site with different sitekeys + // deleted (the site has been re-installed) // This only happens when called from import_xchan $current_site = false; @@ -872,6 +878,12 @@ hz_syslog(print_r($arr, true)); intval($r[0]['hubloc_id']), dbesc($t) ); + + q("update hubloc set hubloc_error = 1, hubloc_deleted = 1 where hubloc_url = '%s' and hubloc_sitekey != '%s'", + dbesc($r[0]['hubloc_url']), + dbesc($r[0]['hubloc_sitekey']) + ); + $current_site = true; } diff --git a/Zotlabs/Lib/Libzot.php b/Zotlabs/Lib/Libzot.php index b0d33e055..c635fdb17 100644 --- a/Zotlabs/Lib/Libzot.php +++ b/Zotlabs/Lib/Libzot.php @@ -291,6 +291,12 @@ class Libzot { } $m = parse_url($url); + + if (!$m) { + logger('zot_refresh: could not parse url'); + return false; + } + $site_url = unparse_url([ 'scheme' => $m['scheme'], 'host' => $m['host'] ]); $s = q("select site_dead from site where site_url = '%s' limit 1", diff --git a/Zotlabs/Lib/QueueWorker.php b/Zotlabs/Lib/QueueWorker.php new file mode 100644 index 000000000..fd2ebd7e1 --- /dev/null +++ b/Zotlabs/Lib/QueueWorker.php @@ -0,0 +1,357 @@ +<?php + +namespace Zotlabs\Lib; + +use Ramsey\Uuid\Uuid; +use Ramsey\Uuid\Exception\UnableToBuildUuidException; + +class QueueWorker { + + public static $queueworker = null; + public static $maxworkers = 0; + public static $workermaxage = 0; + public static $workersleep = 100; + public static $default_priorities = [ + 'Notifier' => 10, + 'Deliver' => 10, + 'Cache_query' => 10, + 'Content_importer' => 1, + 'File_importer' => 1, + 'Channel_purge' => 1, + 'Directory' => 1 + ]; + + // Exceptions for processtimeout ($workermaxage) value. + // Currently the value is overriden with 3600 seconds (1h). + public static $long_running_cmd = [ + 'Queue' + ]; + + private static function qstart() { + q('START TRANSACTION'); + } + + private static function qcommit() { + q("COMMIT"); + } + + private static function qrollback() { + q("ROLLBACK"); + } + + public static function Summon($argv) { + + if ($argv[0] !== 'Queueworker') { + + $priority = 0; // @TODO allow reprioritization + + if (isset(self::$default_priorities[$argv[0]])) { + $priority = self::$default_priorities[$argv[0]]; + } + + $workinfo = ['argc' => count($argv), 'argv' => $argv]; + $workinfo_json = json_encode($workinfo); + $uuid = self::getUuid($workinfo_json); + + $r = q("SELECT * FROM workerq WHERE workerq_uuid = '%s'", + dbesc($uuid) + ); + if ($r) { + logger("Summon: Ignoring duplicate workerq task", LOGGER_DEBUG); + logger(print_r($workinfo, true)); + return; + } + + self::qstart(); + $r = q("INSERT INTO workerq (workerq_priority, workerq_data, workerq_uuid, workerq_cmd) VALUES (%d, '%s', '%s', '%s')", + intval($priority), + $workinfo_json, + dbesc($uuid), + dbesc($argv[0]) + ); + if (!$r) { + self::qrollback(); + logger("INSERT FAILED", LOGGER_DEBUG); + return; + } + self::qcommit(); + logger('INSERTED: ' . $workinfo_json, LOGGER_DEBUG); + } + + $workers = self::GetWorkerCount(); + if ($workers < self::$maxworkers) { + logger($workers . '/' . self::$maxworkers . ' workers active', LOGGER_DEBUG); + $phpbin = get_config('system', 'phpbin', 'php'); + proc_run($phpbin, 'Zotlabs/Daemon/Master.php', ['Queueworker']); + } + } + + public static function Release($argv) { + + if ($argv[0] !== 'Queueworker') { + + $priority = 0; // @TODO allow reprioritization + if (isset(self::$default_priorities[$argv[0]])) { + $priority = self::$default_priorities[$argv[0]]; + } + + $workinfo = ['argc' => count($argv), 'argv' => $argv]; + $workinfo_json = json_encode($workinfo); + $uuid = self::getUuid($workinfo_json); + + $r = q("SELECT * FROM workerq WHERE workerq_uuid = '%s'", + dbesc($uuid) + ); + if ($r) { + logger("Release: Duplicate task - do not insert.", LOGGER_DEBUG); + logger(print_r($workinfo, true)); + return; + } + + self::qstart(); + $r = q("INSERT INTO workerq (workerq_priority, workerq_data, workerq_uuid, workerq_cmd) VALUES (%d, '%s', '%s', '%s')", + intval($priority), + $workinfo_json, + dbesc($uuid), + dbesc($argv[0]) + ); + if (!$r) { + self::qrollback(); + logger("Insert failed: " . $workinfo_json, LOGGER_DEBUG); + return; + } + self::qcommit(); + logger('INSERTED: ' . $workinfo_json, LOGGER_DEBUG); + } + + self::Process(); + } + + public static function GetWorkerCount() { + if (self::$maxworkers == 0) { + self::$maxworkers = get_config('queueworker', 'max_queueworkers', 4); + self::$maxworkers = self::$maxworkers > 3 ? self::$maxworkers : 4; + } + if (self::$workermaxage == 0) { + self::$workermaxage = get_config('queueworker', 'max_queueworker_age'); + self::$workermaxage = self::$workermaxage > 120 ? self::$workermaxage : 300; + } + + q("update workerq set workerq_reservationid = null where workerq_reservationid is not null and workerq_processtimeout < %s", + db_utcnow() + ); + + //usleep(self::$workersleep); + + $workers = dbq("select count(distinct workerq_reservationid) as total from workerq where workerq_reservationid is not null"); + logger("WORKERCOUNT: " . $workers[0]['total'], LOGGER_DEBUG); + + return intval($workers[0]['total']); + } + + public static function GetWorkerID() { + if (self::$queueworker) { + return self::$queueworker; + } + + $wid = uniqid('', true); + + usleep(mt_rand(300000, 1000000)); //Sleep .3 - 1 seconds before creating a new worker. + + $workers = self::GetWorkerCount(); + + if ($workers >= self::$maxworkers) { + logger("Too many active workers ($workers) max = " . self::$maxworkers, LOGGER_DEBUG); + return false; + } + + self::$queueworker = $wid; + + return $wid; + } + + private static function getWorkId() { + self::GetWorkerCount(); + + self::qstart(); + + // This is probably the better solution but is not supported by mariadb < 10.6 which is still used a lot. + // $work = dbq("SELECT workerq_id FROM workerq WHERE workerq_reservationid IS NULL ORDER BY workerq_priority DESC, workerq_id ASC LIMIT 1 FOR UPDATE SKIP LOCKED;"); + + $work = dbq("SELECT workerq_id, workerq_cmd FROM workerq WHERE workerq_reservationid IS NULL ORDER BY workerq_priority DESC, workerq_id ASC LIMIT 1 FOR UPDATE;"); + + if (!$work) { + self::qcommit(); + return false; + } + + $id = $work[0]['workerq_id']; + $cmd = $work[0]['workerq_cmd']; + $age = self::$workermaxage; + + if (in_array($cmd, self::$long_running_cmd)) { + $age = 3600; // 1h TODO: make this configurable + } + + $work = q("UPDATE workerq SET workerq_reservationid = '%s', workerq_processtimeout = %s + INTERVAL %s WHERE workerq_id = %d", + self::$queueworker, + db_utcnow(), + db_quoteinterval($age . " SECOND"), + intval($id) + ); + + if (!$work) { + self::qrollback(); + logger("Could not update workerq.", LOGGER_DEBUG); + return false; + } + + logger("GOTWORK: " . json_encode($work), LOGGER_DEBUG); + self::qcommit(); + + return $id; + } + + public static function Process() { + $sleep = intval(get_config('queueworker', 'queue_worker_sleep', 100)); + $auto_queue_worker_sleep = get_config('queueworker', 'auto_queue_worker_sleep', 0); + + if (!self::GetWorkerID()) { + if ($auto_queue_worker_sleep) { + set_config('queueworker', 'queue_worker_sleep', $sleep + 100); + } + + logger('Unable to get worker ID. Exiting.', LOGGER_DEBUG); + killme(); + } + + if ($auto_queue_worker_sleep && $sleep > 100) { + $next_sleep = $sleep - 100; + set_config('queueworker', 'queue_worker_sleep', (($next_sleep < 100) ? 100 : $next_sleep)); + } + + $jobs = 0; + $workid = self::getWorkId(); + $load_average_sleep = false; + self::$workersleep = $sleep; + self::$workersleep = ((intval(self::$workersleep) > 100) ? intval(self::$workersleep) : 100); + + if (function_exists('sys_getloadavg') && get_config('queueworker', 'load_average_sleep')) { + // very experimental! + $load_average_sleep = true; + } + + while ($workid) { + + if ($load_average_sleep) { + $load_average = sys_getloadavg(); + self::$workersleep = intval($load_average[0]) * 10000; + + if (!self::$workersleep) { + self::$workersleep = 100; + } + } + + logger('queue_worker_sleep: ' . self::$workersleep, LOGGER_DEBUG); + + usleep(self::$workersleep); + + $workitem = dbq("SELECT * FROM workerq WHERE workerq_id = $workid"); + + if ($workitem) { + // At least SOME work to do.... in case there's more, let's ramp up workers. + $workers = self::GetWorkerCount(); + + if ($workers < self::$maxworkers) { + logger($workers . '/' . self::$maxworkers . ' workers active', LOGGER_DEBUG); + $phpbin = get_config('system', 'phpbin', 'php'); + proc_run($phpbin, 'Zotlabs/Daemon/Master.php', ['Queueworker']); + } + + $jobs++; + + logger("Workinfo: " . $workitem[0]['workerq_data'], LOGGER_DEBUG); + + $workinfo = json_decode($workitem[0]['workerq_data'], true); + $argv = $workinfo['argv']; + + $cls = '\\Zotlabs\\Daemon\\' . $argv[0]; + $argv = flatten_array_recursive($argv); + $argc = count($argv); + $rnd = random_string(); + + logger('PROCESSING: ' . $rnd . ' ' . print_r($argv[0], true)); + + $cls::run($argc, $argv); + + logger('COMPLETED: ' . $rnd); + + // @FIXME: Right now we assume that if we get a return, everything is OK. + // At some point we may want to test whether the run returns true/false + // and requeue the work to be tried again if needed. But we probably want + // to implement some sort of "retry interval" first. + + dbq("delete from workerq where workerq_id = $workid"); + } + else { + logger("NO WORKITEM!", LOGGER_DEBUG); + } + $workid = self::getWorkId(); + } + logger('Master: Worker Thread: queue items processed:' . $jobs, LOGGER_DEBUG); + } + + public static function ClearQueue() { + $work = q("select * from workerq"); + while ($work) { + foreach ($work as $workitem) { + $workinfo = json_decode($workitem['v'], true); + $argc = $workinfo['argc']; + $argv = $workinfo['argv']; + + logger('Master: process: ' . print_r($argv, true), LOGGER_ALL, LOG_DEBUG); + + if (!isset($argv[0])) { + q("delete from workerq where workerq_id = %d", + $work[0]['workerq_id'] + ); + continue; + } + + $cls = '\\Zotlabs\\Daemon\\' . $argv[0]; + $cls::run($argc, $argv); + + q("delete from workerq where workerq_id = %d", + $work[0]['workerq_id'] + ); + + //Give the server .3 seconds to catch its breath between tasks. + //This will hopefully keep it from crashing to it's knees entirely + //if the last task ended up initiating other parallel processes + //(eg. polling remotes) + usleep(300000); + } + + //Make sure nothing new came in + $work = q("select * from workerq"); + } + } + + /** + * @brief Generate a name-based v5 UUID with custom namespace + * + * @param string $data + * @return string $uuid + */ + private static function getUuid(string $data) { + $namespace = '3a112e42-f147-4ccf-a78b-f6841339ea2a'; + try { + $uuid = Uuid::uuid5($namespace, $data)->toString(); + } catch (UnableToBuildUuidException $e) { + logger('UUID generation failed'); + return ''; + } + return $uuid; + } + +} diff --git a/Zotlabs/Lib/Webfinger.php b/Zotlabs/Lib/Webfinger.php index 16d54010c..a0a4aef47 100644 --- a/Zotlabs/Lib/Webfinger.php +++ b/Zotlabs/Lib/Webfinger.php @@ -52,15 +52,21 @@ class Webfinger { if(strpos($resource,'http') === 0) { $m = parse_url($resource); - if($m) { - if(isset($m['scheme']) && $m['scheme'] !== 'https') { - return false; - } - self::$server = $m['host'] . ((isset($m['port'])) ? ':' . $m['port'] : ''); + + if (!$m) { + return false; } - else { + + if(isset($m['scheme']) && $m['scheme'] !== 'https') { return false; } + + if(!isset($m['host'])) { + return false; + } + + self::$server = $m['host'] . ((isset($m['port'])) ? ':' . $m['port'] : ''); + } elseif(strpos($resource,'tag:') === 0) { $arr = explode(':',$resource); // split the tag |