aboutsummaryrefslogtreecommitdiffstats
path: root/Zotlabs/Daemon
diff options
context:
space:
mode:
Diffstat (limited to 'Zotlabs/Daemon')
-rw-r--r--Zotlabs/Daemon/Channel_purge.php34
-rw-r--r--Zotlabs/Daemon/Content_importer.php77
-rw-r--r--Zotlabs/Daemon/Cron.php6
-rw-r--r--Zotlabs/Daemon/Directory.php7
-rw-r--r--Zotlabs/Daemon/Externals.php193
-rw-r--r--Zotlabs/Daemon/File_importer.php71
-rw-r--r--Zotlabs/Daemon/Notifier.php57
-rw-r--r--Zotlabs/Daemon/Onepoll.php27
8 files changed, 320 insertions, 152 deletions
diff --git a/Zotlabs/Daemon/Channel_purge.php b/Zotlabs/Daemon/Channel_purge.php
new file mode 100644
index 000000000..9fceb0fb9
--- /dev/null
+++ b/Zotlabs/Daemon/Channel_purge.php
@@ -0,0 +1,34 @@
+<?php
+
+namespace Zotlabs\Daemon;
+
+class Channel_purge {
+
+ static public function run($argc,$argv) {
+
+ cli_startup();
+
+ $channel_id = intval($argv[1]);
+
+ $channel = q("select * from channel where channel_id = %d and channel_removed = 1",
+ intval($channel_id)
+ );
+
+ if (! $channel) {
+ return;
+ }
+
+ do {
+ $r = q("select id from item where uid = %d and item_deleted = 0 limit 1000",
+ intval($channel_id)
+ );
+ if ($r) {
+ foreach ($r as $rv) {
+ drop_item($rv['id'], false);
+ }
+ }
+ } while ($r);
+
+ return;
+ }
+}
diff --git a/Zotlabs/Daemon/Content_importer.php b/Zotlabs/Daemon/Content_importer.php
new file mode 100644
index 000000000..67f1c8e80
--- /dev/null
+++ b/Zotlabs/Daemon/Content_importer.php
@@ -0,0 +1,77 @@
+<?php
+
+namespace Zotlabs\Daemon;
+
+use Zotlabs\Web\HTTPSig;
+use Zotlabs\Lib\PConfig;
+
+
+require_once('include/cli_startup.php');
+require_once('include/attach.php');
+require_once('include/import.php');
+
+class Content_importer {
+
+ static public function run($argc,$argv) {
+ cli_startup();
+
+ $page = $argv[1];
+ $since = $argv[2];
+ $until = $argv[3];
+ $channel_address = $argv[4];
+ $hz_server = urldecode($argv[5]);
+
+ $m = parse_url($hz_server);
+
+ $channel = channelx_by_nick($channel_address);
+ if(! $channel) {
+ logger('channel not found');
+ return;
+ }
+
+ $headers = [
+ 'X-API-Token' => random_string(),
+ 'X-API-Request' => $hz_server . '/api/z/1.0/item/export_page?f=&since=' . urlencode($since) . '&until=' . urlencode($until) . '&page=' . $page ,
+ 'Host' => $m['host'],
+ '(request-target)' => 'get /api/z/1.0/item/export_page?f=&since=' . urlencode($since) . '&until=' . urlencode($until) . '&page=' . $page ,
+ ];
+
+ $headers = HTTPSig::create_sig($headers,$channel['channel_prvkey'], channel_url($channel),true,'sha512');
+
+ $x = z_fetch_url($hz_server . '/api/z/1.0/item/export_page?f=&since=' . urlencode($since) . '&until=' . urlencode($until) . '&page=' . $page,false,$redirects,[ 'headers' => $headers ]);
+
+ // logger('item fetch: ' . print_r($x,true));
+
+ if(! $x['success']) {
+ logger('no API response',LOGGER_DEBUG);
+ killme();
+ }
+
+ $j = json_decode($x['body'],true);
+
+ if(! is_array($j['item']) || ! count($j['item'])) {
+ PConfig::Set($channel['channel_id'], 'import', 'content_completed', 1);
+ return;
+ }
+
+ $saved_notification_flags = notifications_off($channel['channel_id']);
+
+ import_items($channel,$j['item'],false,((array_key_exists('relocate',$j)) ? $j['relocate'] : null));
+
+ notifications_on($channel['channel_id'], $saved_notification_flags);
+
+ PConfig::Set($channel['channel_id'], 'import', 'content_progress', [
+ 'items_total' => $j['items_total'],
+ 'items_page' => $j['items_page'],
+ 'items_current_page' => count($j['item']),
+ 'last_page' => $page,
+ 'next_cmd' => ['Content_importer', sprintf('%d',$page + 1), $since, $until, $channel['channel_address'], urlencode($hz_server)]
+ ]);
+
+ $page++;
+
+ Master::Summon([ 'Content_importer', sprintf('%d',$page), $since, $until, $channel['channel_address'], urlencode($hz_server) ]);
+
+ return;
+ }
+}
diff --git a/Zotlabs/Daemon/Cron.php b/Zotlabs/Daemon/Cron.php
index c0a190c8e..6629491de 100644
--- a/Zotlabs/Daemon/Cron.php
+++ b/Zotlabs/Daemon/Cron.php
@@ -36,7 +36,6 @@ class Cron {
// run queue delivery process in the background
Master::Summon(array('Queue'));
-
Master::Summon(array('Poller'));
/**
@@ -206,10 +205,9 @@ class Cron {
// pull in some public posts
-/* $disable_discover_tab = get_config('system', 'disable_discover_tab') || get_config('system', 'disable_discover_tab') === false;
+ $disable_discover_tab = get_config('system', 'disable_discover_tab') || get_config('system', 'disable_discover_tab') === false;
if (!$disable_discover_tab)
- Master::Summon(array('Externals'));
-*/
+ Master::Summon(['Externals']);
$restart = false;
diff --git a/Zotlabs/Daemon/Directory.php b/Zotlabs/Daemon/Directory.php
index 35d184206..3996b8079 100644
--- a/Zotlabs/Daemon/Directory.php
+++ b/Zotlabs/Daemon/Directory.php
@@ -49,8 +49,9 @@ class Directory {
);
// Now update all the connections
- if ($pushall)
+ if ($pushall) {
Master::Summon(array('Notifier', 'refresh_all', $channel['channel_id']));
+ }
return;
}
@@ -93,8 +94,8 @@ class Directory {
}
// Now update all the connections
- if ($pushall)
+ if ($pushall) {
Master::Summon(array('Notifier', 'refresh_all', $channel['channel_id']));
-
+ }
}
}
diff --git a/Zotlabs/Daemon/Externals.php b/Zotlabs/Daemon/Externals.php
index 064b3f71d..81414d02d 100644
--- a/Zotlabs/Daemon/Externals.php
+++ b/Zotlabs/Daemon/Externals.php
@@ -3,6 +3,7 @@
namespace Zotlabs\Daemon;
use Zotlabs\Lib\Activity;
+use Zotlabs\Lib\Libzot;
use Zotlabs\Lib\ActivityStreams;
use Zotlabs\Lib\ASCollection;
@@ -31,25 +32,55 @@ class Externals {
$url = $arr['url'];
}
else {
+ $networks = ['zot6'];
+
+ if (plugin_is_installed('pubcrawl')) {
+ $networks[] = 'activitypub';
+ }
+
+ stringify_array_elms($networks);
+ $networks_str = implode(',', $networks);
+
$randfunc = db_getfunc('RAND');
// fixme this query does not deal with directory realms.
-
- $r = q("select site_url, site_pull from site where site_url != '%s'
- and site_flags != %d and site_type = %d
- and site_dead = 0 and site_project like '%s' and site_version > '5.3.1' order by $randfunc limit 1",
+ //$r = q("select site_url, site_pull from site where site_url != '%s'
+ //and site_flags != %d and site_type = %d
+ //and site_dead = 0 and site_project like '%s' and site_version > '5.3.1' order by $randfunc limit 1",
+ //dbesc(z_root()),
+ //intval(DIRECTORY_MODE_STANDALONE),
+ //intval(SITE_TYPE_ZOT),
+ //dbesc('hubzilla%')
+ //);
+
+ $r = q("SELECT * FROM hubloc
+ LEFT JOIN abook ON abook_xchan = hubloc_hash
+ LEFT JOIN site ON site_url = hubloc_url WHERE
+ hubloc_network IN ( $networks_str ) AND
+ abook_xchan IS NULL AND
+ hubloc_url != '%s' AND
+ hubloc_updated > '%s' AND
+ hubloc_primary = 1 AND hubloc_deleted = 0 AND
+ site_dead = 0
+ ORDER BY $randfunc LIMIT 1",
dbesc(z_root()),
- intval(DIRECTORY_MODE_STANDALONE),
- intval(SITE_TYPE_ZOT),
- dbesc('hubzilla%')
+ datetime_convert('UTC', 'UTC', 'now - 30 days')
);
- if ($r)
- $url = $r[0]['site_url'];
+
+ $contact = $r[0];
+
+ if ($contact) {
+ $url = $contact['hubloc_id_url'];
+ }
+ }
+
+ if (!$url) {
+ continue;
}
$blacklisted = false;
- if (!check_siteallowed($url)) {
+ if (!check_siteallowed($contact['hubloc_url'])) {
logger('blacklisted site: ' . $url);
$blacklisted = true;
}
@@ -59,123 +90,65 @@ class Externals {
// make sure we can eventually break out if somebody blacklists all known sites
if ($blacklisted) {
- if ($attempts > 20)
+ if ($attempts > 5)
break;
$attempts--;
continue;
}
- if ($url) {
-
- $max = intval(get_config('system', 'max_imported_posts', 30));
- if (intval($max)) {
- logger('externals: fetching outbox');
-
- $feed_url = $url . '/zotfeed';
- $obj = new ASCollection($feed_url, $importer, 0, $max);
- $messages = $obj->get();
+ $cl = Activity::get_actor_collections($contact['hubloc_hash']);
+ if(empty($cl)) {
+ $cl = get_xconfig($contact['hubloc_hash'], 'activitypub', 'collections');
+ }
- if ($messages) {
- foreach ($messages as $message) {
- if (is_string($message)) {
- $message = Activity::fetch($message, $importer);
- }
- $AS = new ActivityStreams($message);
- if ($AS->is_valid() && is_array($AS->obj)) {
- $item = Activity::decode_note($AS);
- Activity::store($importer, $importer['xchan_hash'], $AS, $item, true);
- $total++;
- }
- }
- }
- logger('externals: import_public_posts: ' . $total . ' messages imported', LOGGER_DEBUG);
+ if (is_array($cl) && array_key_exists('outbox', $cl)) {
+ $url = $cl['outbox'];
+ }
+ else {
+ $url = str_replace('/channel/', '/outbox/', $contact['hubloc_id_url']);
+ if ($url) {
+ $url .= '?top=1';
}
}
- }
- return;
-
- /* $total = 0;
- $attempts = 0;
-
- logger('externals: startup', LOGGER_DEBUG);
-
- // pull in some public posts
- while ($total == 0 && $attempts < 3) {
- $arr = ['url' => ''];
- call_hooks('externals_url_select', $arr);
-
- if ($arr['url']) {
- $url = $arr['url'];
- }
- else {
- $randfunc = db_getfunc('RAND');
-
- // fixme this query does not deal with directory realms.
-
- $r = q("select site_url, site_pull from site where site_url != '%s' and site_flags != %d and site_type = %d and site_dead = 0 order by $randfunc limit 1",
- dbesc(z_root()),
- intval(DIRECTORY_MODE_STANDALONE),
- intval(SITE_TYPE_ZOT)
- );
- if ($r)
- $url = $r[0]['site_url'];
- }
-
- $blacklisted = false;
+ if ($url) {
+ logger('fetching outbox: ' . $url);
- if (!check_siteallowed($url)) {
- logger('blacklisted site: ' . $url);
- $blacklisted = true;
- }
+ $obj = new ASCollection($url, $importer, 0, 10);
+ $messages = $obj->get();
- $attempts++;
+ if ($messages) {
+ foreach ($messages as $message) {
+ if (is_string($message)) {
+ $message = Activity::fetch($message, $importer);
+ }
- // make sure we can eventually break out if somebody blacklists all known sites
+ if ($message['type'] !== 'Create') {
+ continue;
+ }
- if ($blacklisted) {
- if ($attempts > 20)
- break;
- $attempts--;
- continue;
- }
+ if ($contact['hubloc_network'] === 'zot6') {
+ // make sure we only fetch top level items
+ if (isset($message['object']['inReplyTo'])) {
+ continue;
+ }
- if ($url) {
- if ($r[0]['site_pull'] > NULL_DATE)
- $mindate = urlencode(datetime_convert('', '', $r[0]['site_pull'] . ' - 1 day'));
- else {
- $days = get_config('externals', 'since_days');
- if ($days === false)
- $days = 15;
- $mindate = urlencode(datetime_convert('', '', 'now - ' . intval($days) . ' days'));
+ Libzot::fetch_conversation($importer, $message['object']['id']);
+ $total++;
+ continue;
}
- $feedurl = $url . '/zotfeed?f=&mindate=' . $mindate;
-
- logger('externals: pulling public content from ' . $feedurl, LOGGER_DEBUG);
-
- $x = z_fetch_url($feedurl);
- if (($x) && ($x['success'])) {
-
- q("update site set site_pull = '%s' where site_url = '%s'",
- dbesc(datetime_convert()),
- dbesc($url)
- );
-
- $j = json_decode($x['body'], true);
- if ($j['success'] && $j['messages']) {
- $sys = get_sys_channel();
- foreach ($j['messages'] as $message) {
- // on these posts, clear any route info.
- $message['route'] = '';
- process_delivery(['hash' => 'undefined'], get_item_elements($message),
- [['hash' => $sys['xchan_hash']]], false, true);
- $total++;
- }
- logger('externals: import_public_posts: ' . $total . ' messages imported', LOGGER_DEBUG);
- }
+ $AS = new ActivityStreams($message);
+ if ($AS->is_valid() && is_array($AS->obj)) {
+ $item = Activity::decode_note($AS);
+ Activity::store($importer, $contact['abook_xchan'], $AS, $item);
+ $total++;
}
}
- }*/
+ }
+ logger('fetched messages count: ' . $total);
+ }
+ }
+ return;
}
}
diff --git a/Zotlabs/Daemon/File_importer.php b/Zotlabs/Daemon/File_importer.php
new file mode 100644
index 000000000..7067e152d
--- /dev/null
+++ b/Zotlabs/Daemon/File_importer.php
@@ -0,0 +1,71 @@
+<?php
+
+namespace Zotlabs\Daemon;
+
+use Zotlabs\Web\HTTPSig;
+use Zotlabs\Lib\PConfig;
+
+
+require_once('include/cli_startup.php');
+require_once('include/attach.php');
+require_once('include/import.php');
+
+class File_importer {
+
+ static public function run($argc,$argv) {
+
+ cli_startup();
+
+ $page = $argv[1];
+ $channel_address = $argv[2];
+ $hz_server = urldecode($argv[3]);
+
+ $m = parse_url($hz_server);
+
+ $channel = channelx_by_nick($channel_address);
+ if(! $channel) {
+ logger('channel not found');
+ return;
+ }
+
+ $headers = [
+ 'X-API-Token' => random_string(),
+ 'X-API-Request' => $hz_server . '/api/z/1.0/file/export_page?f=records=1&page=' . $page,
+ 'Host' => $m['host'],
+ '(request-target)' => 'get /api/z/1.0/file/export_page?f=records=1&page=' . $page,
+ ];
+
+ $headers = HTTPSig::create_sig($headers,$channel['channel_prvkey'],channel_url($channel),true,'sha512');
+
+ // TODO: implement total count
+ $x = z_fetch_url($hz_server . '/api/z/1.0/file/export_page?f=records=1&page=' . $page, false, $redirects, [ 'headers' => $headers ]);
+ // logger('file fetch: ' . print_r($x,true));
+
+ if(! $x['success']) {
+ logger('no API response',LOGGER_DEBUG);
+ killme();
+ }
+
+ $j = json_decode($x['body'],true);
+
+ if(! is_array($j['results'][0]['attach']) || ! count($j['results'][0]['attach'])) {
+ PConfig::Set($channel['channel_id'], 'import', 'files_completed', 1);
+ return;
+ }
+
+ $r = sync_files($channel, $j['results']);
+
+ PConfig::Set($channel['channel_id'], 'import', 'files_progress', [
+ 'files_total' => $j['total'],
+ 'files_page' => 1, // export page atm returns just one file
+ 'last_page' => $page,
+ 'next_cmd' => ['File_importer',sprintf('%d',$page + 1), $channel['channel_address'], urlencode($hz_server)]
+ ]);
+
+ $page++;
+
+ Master::Summon([ 'File_importer',sprintf('%d',$page), $channel['channel_address'], urlencode($hz_server) ]);
+
+ return;
+ }
+}
diff --git a/Zotlabs/Daemon/Notifier.php b/Zotlabs/Daemon/Notifier.php
index 0ae887932..368a9229d 100644
--- a/Zotlabs/Daemon/Notifier.php
+++ b/Zotlabs/Daemon/Notifier.php
@@ -95,7 +95,6 @@ class Notifier {
return;
}
-
self::$deliveries = [];
self::$recipients = [];
self::$env_recips = [];
@@ -170,7 +169,7 @@ class Notifier {
elseif ($cmd === 'refresh_all') {
logger('notifier: refresh_all: ' . $item_id);
- self::$channel = channelx_by_n($item_id);
+ self::$channel = channelx_by_n($item_id, true);
$r = q("select abook_xchan from abook where abook_channel = %d",
intval($item_id)
@@ -180,6 +179,11 @@ class Notifier {
self::$recipients[] = $rr['abook_xchan'];
}
}
+
+ // In case we deleted the channel, our abook entry has already vanished.
+ // In order to be able to update our clones we need to add ourself here.
+ self::$recipients[] = self::$channel['channel_hash'];
+
self::$private = false;
self::$packet_type = 'refresh';
}
@@ -190,14 +194,14 @@ class Notifier {
return;
}
- self::$channel = channelx_by_n($item_id);
+ self::$channel = channelx_by_n($item_id, true);
self::$recipients = [$xchan];
self::$private = true;
self::$packet_type = 'purge';
}
elseif ($cmd === 'purge_all') {
logger('notifier: purge_all: ' . $item_id);
- self::$channel = channelx_by_n($item_id);
+ self::$channel = channelx_by_n($item_id, true);
self::$recipients = [];
self::$private = false;
self::$packet_type = 'purge';
@@ -443,7 +447,6 @@ class Notifier {
}
}
-
$narr = [
'channel' => self::$channel,
'upstream' => $upstream,
@@ -526,16 +529,18 @@ class Notifier {
*/
- $hublist = []; // this provides an easily printable list for the logs
- $dhubs = []; // delivery hubs where we store our resulting unique array
- $keys = []; // array of keys to check uniquness for zot hubs
- $urls = []; // array of urls to check uniqueness of hubs from other networks
- $hub_env = []; // per-hub envelope so we don't broadcast the entire envelope to all
- $dead = []; // known dead hubs - report them as undeliverable
+ $hublist = []; // this provides an easily printable list for the logs
+ $dhubs = []; // delivery hubs where we store our resulting unique array
+ $keys = []; // array of keys to check uniquness for zot hubs
+ $urls = []; // array of urls to check uniqueness of hubs from other networks
+ $hub_env = []; // per-hub envelope so we don't broadcast the entire envelope to all
+ $dead_hosts = []; // known dead hubs - report them as undeliverable
foreach ($hubs as $hub) {
if (isset($hub['site_dead']) && intval($hub['site_dead'])) {
- $dead[] = $hub;
+ if(!in_array($hub['hubloc_host'], $dead_hosts)) {
+ $dead_hosts[] = $hub['hubloc_host'];
+ }
continue;
}
@@ -674,21 +679,19 @@ class Notifier {
do_delivery(self::$deliveries);
}
- if ($dead) {
- foreach ($dead as $deceased) {
- if (is_array($target_item) && (!$target_item['item_deleted']) && (!get_config('system', 'disable_dreport'))) {
- q("insert into dreport ( dreport_mid, dreport_site, dreport_recip, dreport_name, dreport_result, dreport_time, dreport_xchan, dreport_queue )
- values ( '%s', '%s','%s','%s','%s','%s','%s','%s' ) ",
- dbesc($target_item['mid']),
- dbesc($deceased['hubloc_host']),
- dbesc($deceased['hubloc_host']),
- dbesc($deceased['hubloc_host']),
- dbesc('undeliverable/unresponsive site'),
- dbesc(datetime_convert()),
- dbesc(self::$channel['channel_hash']),
- dbesc(new_uuid())
- );
- }
+ if ($dead_hosts && is_array($target_item) && (!$target_item['item_deleted']) && (!get_config('system', 'disable_dreport'))) {
+ foreach ($dead_hosts as $deceased_host) {
+ $r = q("insert into dreport ( dreport_mid, dreport_site, dreport_recip, dreport_name, dreport_result, dreport_time, dreport_xchan, dreport_queue )
+ values ( '%s', '%s','%s','%s','%s','%s','%s','%s' ) ",
+ dbesc($target_item['mid']),
+ dbesc($deceased_host),
+ dbesc($deceased_host),
+ dbesc($deceased_host),
+ dbesc('undeliverable/unresponsive site'),
+ dbesc(datetime_convert()),
+ dbesc(self::$channel['channel_hash']),
+ dbesc(new_uuid())
+ );
}
}
diff --git a/Zotlabs/Daemon/Onepoll.php b/Zotlabs/Daemon/Onepoll.php
index 5374f49d5..79fd06df9 100644
--- a/Zotlabs/Daemon/Onepoll.php
+++ b/Zotlabs/Daemon/Onepoll.php
@@ -48,15 +48,11 @@ class Onepoll {
$contact = $contacts[0];
$importer_uid = $contact['abook_channel'];
- $r = q("SELECT * from channel left join xchan on channel_hash = xchan_hash where channel_id = %d limit 1",
- intval($importer_uid)
- );
+ $importer = channelx_by_n($importer_uid);
- if (!$r)
+ if (!$importer)
return;
- $importer = $r[0];
-
logger("onepoll: poll: ({$contact['id']}) IMPORTER: {$importer['xchan_name']}, CONTACT: {$contact['xchan_name']}");
$last_update = ((($contact['abook_updated'] === $contact['abook_created']) || ($contact['abook_updated'] <= NULL_DATE))
@@ -135,19 +131,34 @@ class Onepoll {
$url = $cl['outbox'];
}
else {
- $url = str_replace('/poco/', '/zotfeed/', $contact['xchan_connurl']);
+ $url = str_replace('/poco/', '/outbox/', $contact['xchan_connurl']);
}
if ($url) {
logger('fetching outbox');
- $url = $url . '?date_begin=' . urlencode($last_update);
+ $url = $url . '?date_begin=' . urlencode($last_update);
+
+ if($contact['xchan_network'] === 'zot6') {
+ $url = $url . '&top=1';
+ }
+
$obj = new ASCollection($url, $importer, 0, $max);
$messages = $obj->get();
+
if ($messages) {
foreach ($messages as $message) {
if (is_string($message)) {
$message = Activity::fetch($message, $importer);
}
+
+ if ($contact['xchan_network'] === 'zot6') {
+ // make sure we only fetch top level items
+ if ($message['type'] === 'Create' && !isset($message['object']['inReplyTo'])) {
+ Libzot::fetch_conversation($importer, $message['object']['id']);
+ }
+ continue;
+ }
+
$AS = new ActivityStreams($message);
if ($AS->is_valid() && is_array($AS->obj)) {
$item = Activity::decode_note($AS);