aboutsummaryrefslogtreecommitdiffstats
path: root/Zotlabs/Lib
diff options
context:
space:
mode:
Diffstat (limited to 'Zotlabs/Lib')
-rw-r--r--Zotlabs/Lib/Activity.php43
-rw-r--r--Zotlabs/Lib/Libsync.php28
-rw-r--r--Zotlabs/Lib/Libzot.php6
-rw-r--r--Zotlabs/Lib/QueueWorker.php357
-rw-r--r--Zotlabs/Lib/Webfinger.php18
5 files changed, 431 insertions, 21 deletions
diff --git a/Zotlabs/Lib/Activity.php b/Zotlabs/Lib/Activity.php
index 9dbb15c28..1a1031909 100644
--- a/Zotlabs/Lib/Activity.php
+++ b/Zotlabs/Lib/Activity.php
@@ -672,6 +672,7 @@ class Activity {
}
}
}
+
return $ret;
}
@@ -2080,16 +2081,29 @@ class Activity {
}
- static function update_poll($item, $post) {
+ static function update_poll($item_id, $post) {
$multi = false;
$mid = $post['mid'];
$content = $post['title'];
+ if (!$item_id) {
+ return false;
+ }
+
+ dbq("START TRANSACTION");
+
+ $item = q("SELECT * FROM item WHERE id = %d FOR UPDATE",
+ intval($item_id)
+ );
+
if (!$item) {
+ dbq("COMMIT");
return false;
}
+ $item = $item[0];
+
$o = json_decode($item['obj'], true);
if ($o && array_key_exists('anyOf', $o)) {
$multi = true;
@@ -2161,16 +2175,30 @@ class Activity {
}
logger('updated_poll: ' . print_r($o, true), LOGGER_DATA);
if ($answer_found && !$found) {
- q("update item set obj = '%s', edited = '%s' where id = %d",
+ $u = q("update item set obj = '%s', edited = '%s' where id = %d",
dbesc(json_encode($o)),
dbesc(datetime_convert()),
intval($item['id'])
);
- Master::Summon(['Notifier', 'wall-new', $item['id'], $post['mid'] /* trick queueworker de-duplication */ ]);
- return true;
+ if ($u) {
+ dbq("COMMIT");
+
+ if ($multi) {
+ // wait some seconds for possible multiple answers to be processed
+ // before calling the notifier
+ sleep(3);
+ }
+
+ Master::Summon(['Notifier', 'wall-new', $item['id']]);
+ return true;
+ }
+
+ dbq("ROLLBACK");
+
}
+ dbq("COMMIT");
return false;
}
@@ -2456,7 +2484,7 @@ class Activity {
$s['attach'] = $a;
}
- $a = self::decode_iconfig($act->obj);
+ $a = self::decode_iconfig($act->data);
if ($a) {
$s['iconfig'] = $a;
}
@@ -2786,8 +2814,9 @@ class Activity {
set_iconfig($s, 'diaspora', 'fields', $diaspora_rawmsg, 1);
}
- set_iconfig($s, 'activitypub', 'recips', $act->raw_recips);
-
+ if ($act->raw_recips) {
+ set_iconfig($s, 'activitypub', 'recips', $act->raw_recips);
+ }
$hookinfo = [
'act' => $act,
diff --git a/Zotlabs/Lib/Libsync.php b/Zotlabs/Lib/Libsync.php
index 09b81dc83..19361c4ae 100644
--- a/Zotlabs/Lib/Libsync.php
+++ b/Zotlabs/Lib/Libsync.php
@@ -135,8 +135,7 @@ class Libsync {
$info['collection_members'] = $r;
}
- $interval = ((get_config('system', 'delivery_interval') !== false)
- ? intval(get_config('system', 'delivery_interval')) : 2);
+ $interval = get_config('queueworker', 'queue_interval', 500000);
logger('Packet: ' . print_r($info, true), LOGGER_DATA, LOG_DEBUG);
@@ -155,19 +154,26 @@ class Libsync {
]);
+ /*
$x = q("select count(outq_hash) as total from outq where outq_delivered = 0");
+
if (intval($x[0]['total']) > intval(get_config('system', 'force_queue_threshold', 3000))) {
logger('immediate delivery deferred.', LOGGER_DEBUG, LOG_INFO);
Queue::update($hash);
continue;
}
-
+ */
Master::Summon(['Deliver', $hash]);
+
+ /*
$total = $total - 1;
+ */
+
+ if ($interval) {
+ usleep($interval);
+ }
- if ($interval && $total)
- @time_sleep_until(microtime(true) + (float)$interval);
}
}
@@ -184,8 +190,6 @@ class Libsync {
require_once('include/import.php');
-hz_syslog(print_r($arr, true));
-
$result = [];
$keychange = ((array_key_exists('keychange', $arr)) ? true : false);
@@ -858,7 +862,9 @@ hz_syslog(print_r($arr, true));
);
}
- // update connection timestamp if this is the site we're talking to
+ // Update connection timestamp if this is the site we're talking to.
+ // Also mark all entries from the current site with different sitekeys
+ // deleted (the site has been re-installed)
// This only happens when called from import_xchan
$current_site = false;
@@ -872,6 +878,12 @@ hz_syslog(print_r($arr, true));
intval($r[0]['hubloc_id']),
dbesc($t)
);
+
+ q("update hubloc set hubloc_error = 1, hubloc_deleted = 1 where hubloc_url = '%s' and hubloc_sitekey != '%s'",
+ dbesc($r[0]['hubloc_url']),
+ dbesc($r[0]['hubloc_sitekey'])
+ );
+
$current_site = true;
}
diff --git a/Zotlabs/Lib/Libzot.php b/Zotlabs/Lib/Libzot.php
index b0d33e055..c635fdb17 100644
--- a/Zotlabs/Lib/Libzot.php
+++ b/Zotlabs/Lib/Libzot.php
@@ -291,6 +291,12 @@ class Libzot {
}
$m = parse_url($url);
+
+ if (!$m) {
+ logger('zot_refresh: could not parse url');
+ return false;
+ }
+
$site_url = unparse_url([ 'scheme' => $m['scheme'], 'host' => $m['host'] ]);
$s = q("select site_dead from site where site_url = '%s' limit 1",
diff --git a/Zotlabs/Lib/QueueWorker.php b/Zotlabs/Lib/QueueWorker.php
new file mode 100644
index 000000000..fd2ebd7e1
--- /dev/null
+++ b/Zotlabs/Lib/QueueWorker.php
@@ -0,0 +1,357 @@
+<?php
+
+namespace Zotlabs\Lib;
+
+use Ramsey\Uuid\Uuid;
+use Ramsey\Uuid\Exception\UnableToBuildUuidException;
+
+class QueueWorker {
+
+ public static $queueworker = null;
+ public static $maxworkers = 0;
+ public static $workermaxage = 0;
+ public static $workersleep = 100;
+ public static $default_priorities = [
+ 'Notifier' => 10,
+ 'Deliver' => 10,
+ 'Cache_query' => 10,
+ 'Content_importer' => 1,
+ 'File_importer' => 1,
+ 'Channel_purge' => 1,
+ 'Directory' => 1
+ ];
+
+ // Exceptions for processtimeout ($workermaxage) value.
+ // Currently the value is overriden with 3600 seconds (1h).
+ public static $long_running_cmd = [
+ 'Queue'
+ ];
+
+ private static function qstart() {
+ q('START TRANSACTION');
+ }
+
+ private static function qcommit() {
+ q("COMMIT");
+ }
+
+ private static function qrollback() {
+ q("ROLLBACK");
+ }
+
+ public static function Summon($argv) {
+
+ if ($argv[0] !== 'Queueworker') {
+
+ $priority = 0; // @TODO allow reprioritization
+
+ if (isset(self::$default_priorities[$argv[0]])) {
+ $priority = self::$default_priorities[$argv[0]];
+ }
+
+ $workinfo = ['argc' => count($argv), 'argv' => $argv];
+ $workinfo_json = json_encode($workinfo);
+ $uuid = self::getUuid($workinfo_json);
+
+ $r = q("SELECT * FROM workerq WHERE workerq_uuid = '%s'",
+ dbesc($uuid)
+ );
+ if ($r) {
+ logger("Summon: Ignoring duplicate workerq task", LOGGER_DEBUG);
+ logger(print_r($workinfo, true));
+ return;
+ }
+
+ self::qstart();
+ $r = q("INSERT INTO workerq (workerq_priority, workerq_data, workerq_uuid, workerq_cmd) VALUES (%d, '%s', '%s', '%s')",
+ intval($priority),
+ $workinfo_json,
+ dbesc($uuid),
+ dbesc($argv[0])
+ );
+ if (!$r) {
+ self::qrollback();
+ logger("INSERT FAILED", LOGGER_DEBUG);
+ return;
+ }
+ self::qcommit();
+ logger('INSERTED: ' . $workinfo_json, LOGGER_DEBUG);
+ }
+
+ $workers = self::GetWorkerCount();
+ if ($workers < self::$maxworkers) {
+ logger($workers . '/' . self::$maxworkers . ' workers active', LOGGER_DEBUG);
+ $phpbin = get_config('system', 'phpbin', 'php');
+ proc_run($phpbin, 'Zotlabs/Daemon/Master.php', ['Queueworker']);
+ }
+ }
+
+ public static function Release($argv) {
+
+ if ($argv[0] !== 'Queueworker') {
+
+ $priority = 0; // @TODO allow reprioritization
+ if (isset(self::$default_priorities[$argv[0]])) {
+ $priority = self::$default_priorities[$argv[0]];
+ }
+
+ $workinfo = ['argc' => count($argv), 'argv' => $argv];
+ $workinfo_json = json_encode($workinfo);
+ $uuid = self::getUuid($workinfo_json);
+
+ $r = q("SELECT * FROM workerq WHERE workerq_uuid = '%s'",
+ dbesc($uuid)
+ );
+ if ($r) {
+ logger("Release: Duplicate task - do not insert.", LOGGER_DEBUG);
+ logger(print_r($workinfo, true));
+ return;
+ }
+
+ self::qstart();
+ $r = q("INSERT INTO workerq (workerq_priority, workerq_data, workerq_uuid, workerq_cmd) VALUES (%d, '%s', '%s', '%s')",
+ intval($priority),
+ $workinfo_json,
+ dbesc($uuid),
+ dbesc($argv[0])
+ );
+ if (!$r) {
+ self::qrollback();
+ logger("Insert failed: " . $workinfo_json, LOGGER_DEBUG);
+ return;
+ }
+ self::qcommit();
+ logger('INSERTED: ' . $workinfo_json, LOGGER_DEBUG);
+ }
+
+ self::Process();
+ }
+
+ public static function GetWorkerCount() {
+ if (self::$maxworkers == 0) {
+ self::$maxworkers = get_config('queueworker', 'max_queueworkers', 4);
+ self::$maxworkers = self::$maxworkers > 3 ? self::$maxworkers : 4;
+ }
+ if (self::$workermaxage == 0) {
+ self::$workermaxage = get_config('queueworker', 'max_queueworker_age');
+ self::$workermaxage = self::$workermaxage > 120 ? self::$workermaxage : 300;
+ }
+
+ q("update workerq set workerq_reservationid = null where workerq_reservationid is not null and workerq_processtimeout < %s",
+ db_utcnow()
+ );
+
+ //usleep(self::$workersleep);
+
+ $workers = dbq("select count(distinct workerq_reservationid) as total from workerq where workerq_reservationid is not null");
+ logger("WORKERCOUNT: " . $workers[0]['total'], LOGGER_DEBUG);
+
+ return intval($workers[0]['total']);
+ }
+
+ public static function GetWorkerID() {
+ if (self::$queueworker) {
+ return self::$queueworker;
+ }
+
+ $wid = uniqid('', true);
+
+ usleep(mt_rand(300000, 1000000)); //Sleep .3 - 1 seconds before creating a new worker.
+
+ $workers = self::GetWorkerCount();
+
+ if ($workers >= self::$maxworkers) {
+ logger("Too many active workers ($workers) max = " . self::$maxworkers, LOGGER_DEBUG);
+ return false;
+ }
+
+ self::$queueworker = $wid;
+
+ return $wid;
+ }
+
+ private static function getWorkId() {
+ self::GetWorkerCount();
+
+ self::qstart();
+
+ // This is probably the better solution but is not supported by mariadb < 10.6 which is still used a lot.
+ // $work = dbq("SELECT workerq_id FROM workerq WHERE workerq_reservationid IS NULL ORDER BY workerq_priority DESC, workerq_id ASC LIMIT 1 FOR UPDATE SKIP LOCKED;");
+
+ $work = dbq("SELECT workerq_id, workerq_cmd FROM workerq WHERE workerq_reservationid IS NULL ORDER BY workerq_priority DESC, workerq_id ASC LIMIT 1 FOR UPDATE;");
+
+ if (!$work) {
+ self::qcommit();
+ return false;
+ }
+
+ $id = $work[0]['workerq_id'];
+ $cmd = $work[0]['workerq_cmd'];
+ $age = self::$workermaxage;
+
+ if (in_array($cmd, self::$long_running_cmd)) {
+ $age = 3600; // 1h TODO: make this configurable
+ }
+
+ $work = q("UPDATE workerq SET workerq_reservationid = '%s', workerq_processtimeout = %s + INTERVAL %s WHERE workerq_id = %d",
+ self::$queueworker,
+ db_utcnow(),
+ db_quoteinterval($age . " SECOND"),
+ intval($id)
+ );
+
+ if (!$work) {
+ self::qrollback();
+ logger("Could not update workerq.", LOGGER_DEBUG);
+ return false;
+ }
+
+ logger("GOTWORK: " . json_encode($work), LOGGER_DEBUG);
+ self::qcommit();
+
+ return $id;
+ }
+
+ public static function Process() {
+ $sleep = intval(get_config('queueworker', 'queue_worker_sleep', 100));
+ $auto_queue_worker_sleep = get_config('queueworker', 'auto_queue_worker_sleep', 0);
+
+ if (!self::GetWorkerID()) {
+ if ($auto_queue_worker_sleep) {
+ set_config('queueworker', 'queue_worker_sleep', $sleep + 100);
+ }
+
+ logger('Unable to get worker ID. Exiting.', LOGGER_DEBUG);
+ killme();
+ }
+
+ if ($auto_queue_worker_sleep && $sleep > 100) {
+ $next_sleep = $sleep - 100;
+ set_config('queueworker', 'queue_worker_sleep', (($next_sleep < 100) ? 100 : $next_sleep));
+ }
+
+ $jobs = 0;
+ $workid = self::getWorkId();
+ $load_average_sleep = false;
+ self::$workersleep = $sleep;
+ self::$workersleep = ((intval(self::$workersleep) > 100) ? intval(self::$workersleep) : 100);
+
+ if (function_exists('sys_getloadavg') && get_config('queueworker', 'load_average_sleep')) {
+ // very experimental!
+ $load_average_sleep = true;
+ }
+
+ while ($workid) {
+
+ if ($load_average_sleep) {
+ $load_average = sys_getloadavg();
+ self::$workersleep = intval($load_average[0]) * 10000;
+
+ if (!self::$workersleep) {
+ self::$workersleep = 100;
+ }
+ }
+
+ logger('queue_worker_sleep: ' . self::$workersleep, LOGGER_DEBUG);
+
+ usleep(self::$workersleep);
+
+ $workitem = dbq("SELECT * FROM workerq WHERE workerq_id = $workid");
+
+ if ($workitem) {
+ // At least SOME work to do.... in case there's more, let's ramp up workers.
+ $workers = self::GetWorkerCount();
+
+ if ($workers < self::$maxworkers) {
+ logger($workers . '/' . self::$maxworkers . ' workers active', LOGGER_DEBUG);
+ $phpbin = get_config('system', 'phpbin', 'php');
+ proc_run($phpbin, 'Zotlabs/Daemon/Master.php', ['Queueworker']);
+ }
+
+ $jobs++;
+
+ logger("Workinfo: " . $workitem[0]['workerq_data'], LOGGER_DEBUG);
+
+ $workinfo = json_decode($workitem[0]['workerq_data'], true);
+ $argv = $workinfo['argv'];
+
+ $cls = '\\Zotlabs\\Daemon\\' . $argv[0];
+ $argv = flatten_array_recursive($argv);
+ $argc = count($argv);
+ $rnd = random_string();
+
+ logger('PROCESSING: ' . $rnd . ' ' . print_r($argv[0], true));
+
+ $cls::run($argc, $argv);
+
+ logger('COMPLETED: ' . $rnd);
+
+ // @FIXME: Right now we assume that if we get a return, everything is OK.
+ // At some point we may want to test whether the run returns true/false
+ // and requeue the work to be tried again if needed. But we probably want
+ // to implement some sort of "retry interval" first.
+
+ dbq("delete from workerq where workerq_id = $workid");
+ }
+ else {
+ logger("NO WORKITEM!", LOGGER_DEBUG);
+ }
+ $workid = self::getWorkId();
+ }
+ logger('Master: Worker Thread: queue items processed:' . $jobs, LOGGER_DEBUG);
+ }
+
+ public static function ClearQueue() {
+ $work = q("select * from workerq");
+ while ($work) {
+ foreach ($work as $workitem) {
+ $workinfo = json_decode($workitem['v'], true);
+ $argc = $workinfo['argc'];
+ $argv = $workinfo['argv'];
+
+ logger('Master: process: ' . print_r($argv, true), LOGGER_ALL, LOG_DEBUG);
+
+ if (!isset($argv[0])) {
+ q("delete from workerq where workerq_id = %d",
+ $work[0]['workerq_id']
+ );
+ continue;
+ }
+
+ $cls = '\\Zotlabs\\Daemon\\' . $argv[0];
+ $cls::run($argc, $argv);
+
+ q("delete from workerq where workerq_id = %d",
+ $work[0]['workerq_id']
+ );
+
+ //Give the server .3 seconds to catch its breath between tasks.
+ //This will hopefully keep it from crashing to it's knees entirely
+ //if the last task ended up initiating other parallel processes
+ //(eg. polling remotes)
+ usleep(300000);
+ }
+
+ //Make sure nothing new came in
+ $work = q("select * from workerq");
+ }
+ }
+
+ /**
+ * @brief Generate a name-based v5 UUID with custom namespace
+ *
+ * @param string $data
+ * @return string $uuid
+ */
+ private static function getUuid(string $data) {
+ $namespace = '3a112e42-f147-4ccf-a78b-f6841339ea2a';
+ try {
+ $uuid = Uuid::uuid5($namespace, $data)->toString();
+ } catch (UnableToBuildUuidException $e) {
+ logger('UUID generation failed');
+ return '';
+ }
+ return $uuid;
+ }
+
+}
diff --git a/Zotlabs/Lib/Webfinger.php b/Zotlabs/Lib/Webfinger.php
index 16d54010c..a0a4aef47 100644
--- a/Zotlabs/Lib/Webfinger.php
+++ b/Zotlabs/Lib/Webfinger.php
@@ -52,15 +52,21 @@ class Webfinger {
if(strpos($resource,'http') === 0) {
$m = parse_url($resource);
- if($m) {
- if(isset($m['scheme']) && $m['scheme'] !== 'https') {
- return false;
- }
- self::$server = $m['host'] . ((isset($m['port'])) ? ':' . $m['port'] : '');
+
+ if (!$m) {
+ return false;
}
- else {
+
+ if(isset($m['scheme']) && $m['scheme'] !== 'https') {
return false;
}
+
+ if(!isset($m['host'])) {
+ return false;
+ }
+
+ self::$server = $m['host'] . ((isset($m['port'])) ? ':' . $m['port'] : '');
+
}
elseif(strpos($resource,'tag:') === 0) {
$arr = explode(':',$resource); // split the tag