diff options
Diffstat (limited to 'Zotlabs/Daemon')
-rw-r--r-- | Zotlabs/Daemon/Channel_purge.php | 34 | ||||
-rw-r--r-- | Zotlabs/Daemon/Content_importer.php | 77 | ||||
-rw-r--r-- | Zotlabs/Daemon/Cron.php | 18 | ||||
-rw-r--r-- | Zotlabs/Daemon/Cron_daily.php | 2 | ||||
-rw-r--r-- | Zotlabs/Daemon/Deliver.php | 4 | ||||
-rw-r--r-- | Zotlabs/Daemon/Delxitems.php | 25 | ||||
-rw-r--r-- | Zotlabs/Daemon/Directory.php | 9 | ||||
-rw-r--r-- | Zotlabs/Daemon/Expire.php | 1 | ||||
-rw-r--r-- | Zotlabs/Daemon/Externals.php | 193 | ||||
-rw-r--r-- | Zotlabs/Daemon/File_importer.php | 71 | ||||
-rw-r--r-- | Zotlabs/Daemon/Gprobe.php | 28 | ||||
-rw-r--r-- | Zotlabs/Daemon/Notifier.php | 672 | ||||
-rw-r--r-- | Zotlabs/Daemon/Onedirsync.php | 14 | ||||
-rw-r--r-- | Zotlabs/Daemon/Onepoll.php | 62 | ||||
-rw-r--r-- | Zotlabs/Daemon/Poller.php | 11 | ||||
-rw-r--r-- | Zotlabs/Daemon/Queue.php | 17 | ||||
-rw-r--r-- | Zotlabs/Daemon/Ratenotif.php | 126 | ||||
-rw-r--r-- | Zotlabs/Daemon/Thumbnail.php | 1 |
18 files changed, 641 insertions, 724 deletions
diff --git a/Zotlabs/Daemon/Channel_purge.php b/Zotlabs/Daemon/Channel_purge.php new file mode 100644 index 000000000..9fceb0fb9 --- /dev/null +++ b/Zotlabs/Daemon/Channel_purge.php @@ -0,0 +1,34 @@ +<?php + +namespace Zotlabs\Daemon; + +class Channel_purge { + + static public function run($argc,$argv) { + + cli_startup(); + + $channel_id = intval($argv[1]); + + $channel = q("select * from channel where channel_id = %d and channel_removed = 1", + intval($channel_id) + ); + + if (! $channel) { + return; + } + + do { + $r = q("select id from item where uid = %d and item_deleted = 0 limit 1000", + intval($channel_id) + ); + if ($r) { + foreach ($r as $rv) { + drop_item($rv['id'], false); + } + } + } while ($r); + + return; + } +} diff --git a/Zotlabs/Daemon/Content_importer.php b/Zotlabs/Daemon/Content_importer.php new file mode 100644 index 000000000..67f1c8e80 --- /dev/null +++ b/Zotlabs/Daemon/Content_importer.php @@ -0,0 +1,77 @@ +<?php + +namespace Zotlabs\Daemon; + +use Zotlabs\Web\HTTPSig; +use Zotlabs\Lib\PConfig; + + +require_once('include/cli_startup.php'); +require_once('include/attach.php'); +require_once('include/import.php'); + +class Content_importer { + + static public function run($argc,$argv) { + cli_startup(); + + $page = $argv[1]; + $since = $argv[2]; + $until = $argv[3]; + $channel_address = $argv[4]; + $hz_server = urldecode($argv[5]); + + $m = parse_url($hz_server); + + $channel = channelx_by_nick($channel_address); + if(! $channel) { + logger('channel not found'); + return; + } + + $headers = [ + 'X-API-Token' => random_string(), + 'X-API-Request' => $hz_server . '/api/z/1.0/item/export_page?f=&since=' . urlencode($since) . '&until=' . urlencode($until) . '&page=' . $page , + 'Host' => $m['host'], + '(request-target)' => 'get /api/z/1.0/item/export_page?f=&since=' . urlencode($since) . '&until=' . urlencode($until) . '&page=' . $page , + ]; + + $headers = HTTPSig::create_sig($headers,$channel['channel_prvkey'], channel_url($channel),true,'sha512'); + + $x = z_fetch_url($hz_server . '/api/z/1.0/item/export_page?f=&since=' . urlencode($since) . '&until=' . urlencode($until) . '&page=' . $page,false,$redirects,[ 'headers' => $headers ]); + + // logger('item fetch: ' . print_r($x,true)); + + if(! $x['success']) { + logger('no API response',LOGGER_DEBUG); + killme(); + } + + $j = json_decode($x['body'],true); + + if(! is_array($j['item']) || ! count($j['item'])) { + PConfig::Set($channel['channel_id'], 'import', 'content_completed', 1); + return; + } + + $saved_notification_flags = notifications_off($channel['channel_id']); + + import_items($channel,$j['item'],false,((array_key_exists('relocate',$j)) ? $j['relocate'] : null)); + + notifications_on($channel['channel_id'], $saved_notification_flags); + + PConfig::Set($channel['channel_id'], 'import', 'content_progress', [ + 'items_total' => $j['items_total'], + 'items_page' => $j['items_page'], + 'items_current_page' => count($j['item']), + 'last_page' => $page, + 'next_cmd' => ['Content_importer', sprintf('%d',$page + 1), $since, $until, $channel['channel_address'], urlencode($hz_server)] + ]); + + $page++; + + Master::Summon([ 'Content_importer', sprintf('%d',$page), $since, $until, $channel['channel_address'], urlencode($hz_server) ]); + + return; + } +} diff --git a/Zotlabs/Daemon/Cron.php b/Zotlabs/Daemon/Cron.php index 5c0330806..6629491de 100644 --- a/Zotlabs/Daemon/Cron.php +++ b/Zotlabs/Daemon/Cron.php @@ -36,7 +36,6 @@ class Cron { // run queue delivery process in the background Master::Summon(array('Queue')); - Master::Summon(array('Poller')); /** @@ -48,13 +47,6 @@ class Cron { db_quoteinterval('3 MINUTE') ); - // expire any expired mail - - q("delete from mail where expires > '%s' and expires < %s ", - dbesc(NULL_DATE), - db_utcnow() - ); - require_once('include/account.php'); remove_expired_registrations(); @@ -165,11 +157,6 @@ class Cron { } } - - // check if any connections transitioned to zot6 and upgrade the connections to zot6 at this hub if so. - require_once('include/connections.php'); - z6trans_connections(); - require_once('include/attach.php'); attach_upgrade(); @@ -218,10 +205,9 @@ class Cron { // pull in some public posts -/* $disable_discover_tab = get_config('system', 'disable_discover_tab') || get_config('system', 'disable_discover_tab') === false; + $disable_discover_tab = get_config('system', 'disable_discover_tab') || get_config('system', 'disable_discover_tab') === false; if (!$disable_discover_tab) - Master::Summon(array('Externals')); -*/ + Master::Summon(['Externals']); $restart = false; diff --git a/Zotlabs/Daemon/Cron_daily.php b/Zotlabs/Daemon/Cron_daily.php index bebccca9d..d1b74a032 100644 --- a/Zotlabs/Daemon/Cron_daily.php +++ b/Zotlabs/Daemon/Cron_daily.php @@ -95,8 +95,6 @@ class Cron_daily { remove_obsolete_hublocs(); remove_duplicate_singleton_hublocs(); - z6_discover(); - $date = datetime_convert(); call_hooks('cron_daily', $date); diff --git a/Zotlabs/Daemon/Deliver.php b/Zotlabs/Daemon/Deliver.php index f8149ee69..400ef697b 100644 --- a/Zotlabs/Daemon/Deliver.php +++ b/Zotlabs/Daemon/Deliver.php @@ -2,7 +2,7 @@ namespace Zotlabs\Daemon; -require_once('include/queue_fn.php'); +use Zotlabs\Lib\Queue; class Deliver { @@ -23,7 +23,7 @@ class Deliver { ); if ($r) { - queue_deliver($r[0], true); + Queue::deliver($r[0], true); } } diff --git a/Zotlabs/Daemon/Delxitems.php b/Zotlabs/Daemon/Delxitems.php new file mode 100644 index 000000000..264354ca0 --- /dev/null +++ b/Zotlabs/Daemon/Delxitems.php @@ -0,0 +1,25 @@ +<?php + +namespace Zotlabs\Daemon; + +require_once('include/connections.php'); + +/* + * Daemon to remove 'item' resources in the background from a removed connection + */ + +class Delxitems { + + static public function run($argc, $argv) { + + cli_startup(); + + if($argc != 3) { + return; + } + + remove_abook_items($argv[1], $argv[2]); + + return; + } +} diff --git a/Zotlabs/Daemon/Directory.php b/Zotlabs/Daemon/Directory.php index 1f307b273..3996b8079 100644 --- a/Zotlabs/Daemon/Directory.php +++ b/Zotlabs/Daemon/Directory.php @@ -49,8 +49,9 @@ class Directory { ); // Now update all the connections - if ($pushall) + if ($pushall) { Master::Summon(array('Notifier', 'refresh_all', $channel['channel_id'])); + } return; } @@ -74,7 +75,7 @@ class Directory { * the directory packet. That means we'll try again on the next poll run. */ - $hash = random_string(); + $hash = new_uuid(); Queue::insert(array( 'hash' => $hash, @@ -93,8 +94,8 @@ class Directory { } // Now update all the connections - if ($pushall) + if ($pushall) { Master::Summon(array('Notifier', 'refresh_all', $channel['channel_id'])); - + } } } diff --git a/Zotlabs/Daemon/Expire.php b/Zotlabs/Daemon/Expire.php index c4ff8aec6..99fe68b6f 100644 --- a/Zotlabs/Daemon/Expire.php +++ b/Zotlabs/Daemon/Expire.php @@ -2,6 +2,7 @@ namespace Zotlabs\Daemon; +require_once('include/items.php'); class Expire { diff --git a/Zotlabs/Daemon/Externals.php b/Zotlabs/Daemon/Externals.php index 064b3f71d..fb35a65c7 100644 --- a/Zotlabs/Daemon/Externals.php +++ b/Zotlabs/Daemon/Externals.php @@ -3,6 +3,7 @@ namespace Zotlabs\Daemon; use Zotlabs\Lib\Activity; +use Zotlabs\Lib\Libzot; use Zotlabs\Lib\ActivityStreams; use Zotlabs\Lib\ASCollection; @@ -31,25 +32,55 @@ class Externals { $url = $arr['url']; } else { + $networks = ['zot6']; + + if (plugin_is_installed('pubcrawl')) { + $networks[] = 'activitypub'; + } + + stringify_array_elms($networks); + $networks_str = implode(',', $networks); + $randfunc = db_getfunc('RAND'); // fixme this query does not deal with directory realms. - - $r = q("select site_url, site_pull from site where site_url != '%s' - and site_flags != %d and site_type = %d - and site_dead = 0 and site_project like '%s' and site_version > '5.3.1' order by $randfunc limit 1", + //$r = q("select site_url, site_pull from site where site_url != '%s' + //and site_flags != %d and site_type = %d + //and site_dead = 0 and site_project like '%s' and site_version > '5.3.1' order by $randfunc limit 1", + //dbesc(z_root()), + //intval(DIRECTORY_MODE_STANDALONE), + //intval(SITE_TYPE_ZOT), + //dbesc('hubzilla%') + //); + + $r = q("SELECT * FROM hubloc + LEFT JOIN abook ON abook_xchan = hubloc_hash + LEFT JOIN site ON site_url = hubloc_url WHERE + hubloc_network IN ( $networks_str ) AND + abook_xchan IS NULL AND + hubloc_url != '%s' AND + hubloc_updated > '%s' AND + hubloc_primary = 1 AND hubloc_deleted = 0 AND + site_dead = 0 + ORDER BY $randfunc LIMIT 1", dbesc(z_root()), - intval(DIRECTORY_MODE_STANDALONE), - intval(SITE_TYPE_ZOT), - dbesc('hubzilla%') + datetime_convert('UTC', 'UTC', 'now - 30 days') ); - if ($r) - $url = $r[0]['site_url']; + + $contact = $r[0]; + + if ($contact) { + $url = $contact['hubloc_id_url']; + } + } + + if (!$url) { + continue; } $blacklisted = false; - if (!check_siteallowed($url)) { + if (!check_siteallowed($contact['hubloc_url'])) { logger('blacklisted site: ' . $url); $blacklisted = true; } @@ -59,123 +90,67 @@ class Externals { // make sure we can eventually break out if somebody blacklists all known sites if ($blacklisted) { - if ($attempts > 20) + if ($attempts > 5) break; $attempts--; continue; } - if ($url) { - - $max = intval(get_config('system', 'max_imported_posts', 30)); - if (intval($max)) { - logger('externals: fetching outbox'); - - $feed_url = $url . '/zotfeed'; - $obj = new ASCollection($feed_url, $importer, 0, $max); - $messages = $obj->get(); + $cl = Activity::get_actor_collections($contact['hubloc_hash']); + if(empty($cl)) { + $cl = get_xconfig($contact['hubloc_hash'], 'activitypub', 'collections'); + } - if ($messages) { - foreach ($messages as $message) { - if (is_string($message)) { - $message = Activity::fetch($message, $importer); - } - $AS = new ActivityStreams($message); - if ($AS->is_valid() && is_array($AS->obj)) { - $item = Activity::decode_note($AS); - Activity::store($importer, $importer['xchan_hash'], $AS, $item, true); - $total++; - } - } - } - logger('externals: import_public_posts: ' . $total . ' messages imported', LOGGER_DEBUG); + if (is_array($cl) && array_key_exists('outbox', $cl)) { + $url = $cl['outbox']; + } + else { + $url = str_replace('/channel/', '/outbox/', $contact['hubloc_id_url']); + if ($url) { + $url .= '?top=1'; } } - } - return; - - /* $total = 0; - $attempts = 0; - - logger('externals: startup', LOGGER_DEBUG); - // pull in some public posts - - while ($total == 0 && $attempts < 3) { - $arr = ['url' => '']; - call_hooks('externals_url_select', $arr); - - if ($arr['url']) { - $url = $arr['url']; - } - else { - $randfunc = db_getfunc('RAND'); - - // fixme this query does not deal with directory realms. - - $r = q("select site_url, site_pull from site where site_url != '%s' and site_flags != %d and site_type = %d and site_dead = 0 order by $randfunc limit 1", - dbesc(z_root()), - intval(DIRECTORY_MODE_STANDALONE), - intval(SITE_TYPE_ZOT) - ); - if ($r) - $url = $r[0]['site_url']; - } + if ($url) { + logger('fetching outbox: ' . $url); - $blacklisted = false; + $obj = new ASCollection($url, $importer, 0, 10); + $messages = $obj->get(); - if (!check_siteallowed($url)) { - logger('blacklisted site: ' . $url); - $blacklisted = true; - } + if ($messages) { + foreach ($messages as $message) { + if (is_string($message)) { + $message = Activity::fetch($message, $importer); + } - $attempts++; + if ($message['type'] !== 'Create') { + continue; + } - // make sure we can eventually break out if somebody blacklists all known sites + if ($contact['hubloc_network'] === 'zot6') { + // make sure we only fetch top level items + if (isset($message['object']['inReplyTo'])) { + continue; + } - if ($blacklisted) { - if ($attempts > 20) - break; - $attempts--; - continue; - } + $obj_id = isset($message['object']['id']) ?? $message['object']; - if ($url) { - if ($r[0]['site_pull'] > NULL_DATE) - $mindate = urlencode(datetime_convert('', '', $r[0]['site_pull'] . ' - 1 day')); - else { - $days = get_config('externals', 'since_days'); - if ($days === false) - $days = 15; - $mindate = urlencode(datetime_convert('', '', 'now - ' . intval($days) . ' days')); + Libzot::fetch_conversation($importer, $obj_id); + $total++; + continue; } - $feedurl = $url . '/zotfeed?f=&mindate=' . $mindate; - - logger('externals: pulling public content from ' . $feedurl, LOGGER_DEBUG); - - $x = z_fetch_url($feedurl); - if (($x) && ($x['success'])) { - - q("update site set site_pull = '%s' where site_url = '%s'", - dbesc(datetime_convert()), - dbesc($url) - ); - - $j = json_decode($x['body'], true); - if ($j['success'] && $j['messages']) { - $sys = get_sys_channel(); - foreach ($j['messages'] as $message) { - // on these posts, clear any route info. - $message['route'] = ''; - process_delivery(['hash' => 'undefined'], get_item_elements($message), - [['hash' => $sys['xchan_hash']]], false, true); - $total++; - } - logger('externals: import_public_posts: ' . $total . ' messages imported', LOGGER_DEBUG); - } + $AS = new ActivityStreams($message); + if ($AS->is_valid() && is_array($AS->obj)) { + $item = Activity::decode_note($AS); + Activity::store($importer, $contact['abook_xchan'], $AS, $item); + $total++; } } - }*/ + } + logger('fetched messages count: ' . $total); + } + } + return; } } diff --git a/Zotlabs/Daemon/File_importer.php b/Zotlabs/Daemon/File_importer.php new file mode 100644 index 000000000..7067e152d --- /dev/null +++ b/Zotlabs/Daemon/File_importer.php @@ -0,0 +1,71 @@ +<?php + +namespace Zotlabs\Daemon; + +use Zotlabs\Web\HTTPSig; +use Zotlabs\Lib\PConfig; + + +require_once('include/cli_startup.php'); +require_once('include/attach.php'); +require_once('include/import.php'); + +class File_importer { + + static public function run($argc,$argv) { + + cli_startup(); + + $page = $argv[1]; + $channel_address = $argv[2]; + $hz_server = urldecode($argv[3]); + + $m = parse_url($hz_server); + + $channel = channelx_by_nick($channel_address); + if(! $channel) { + logger('channel not found'); + return; + } + + $headers = [ + 'X-API-Token' => random_string(), + 'X-API-Request' => $hz_server . '/api/z/1.0/file/export_page?f=records=1&page=' . $page, + 'Host' => $m['host'], + '(request-target)' => 'get /api/z/1.0/file/export_page?f=records=1&page=' . $page, + ]; + + $headers = HTTPSig::create_sig($headers,$channel['channel_prvkey'],channel_url($channel),true,'sha512'); + + // TODO: implement total count + $x = z_fetch_url($hz_server . '/api/z/1.0/file/export_page?f=records=1&page=' . $page, false, $redirects, [ 'headers' => $headers ]); + // logger('file fetch: ' . print_r($x,true)); + + if(! $x['success']) { + logger('no API response',LOGGER_DEBUG); + killme(); + } + + $j = json_decode($x['body'],true); + + if(! is_array($j['results'][0]['attach']) || ! count($j['results'][0]['attach'])) { + PConfig::Set($channel['channel_id'], 'import', 'files_completed', 1); + return; + } + + $r = sync_files($channel, $j['results']); + + PConfig::Set($channel['channel_id'], 'import', 'files_progress', [ + 'files_total' => $j['total'], + 'files_page' => 1, // export page atm returns just one file + 'last_page' => $page, + 'next_cmd' => ['File_importer',sprintf('%d',$page + 1), $channel['channel_address'], urlencode($hz_server)] + ]); + + $page++; + + Master::Summon([ 'File_importer',sprintf('%d',$page), $channel['channel_address'], urlencode($hz_server) ]); + + return; + } +} diff --git a/Zotlabs/Daemon/Gprobe.php b/Zotlabs/Daemon/Gprobe.php index 29efcf475..1124ead54 100644 --- a/Zotlabs/Daemon/Gprobe.php +++ b/Zotlabs/Daemon/Gprobe.php @@ -15,19 +15,31 @@ class Gprobe { return; $url = hex2bin($argv[1]); + $is_webbie = false; + $r = null; - if (!strpos($url, '@')) - return; + if (filter_var($url, FILTER_VALIDATE_EMAIL)) { + $is_webbie = true; - $r = q("select * from hubloc where hubloc_addr = '%s' and hubloc_network = 'zot6' limit 1", - dbesc($url) - ); + $r = q("select * from hubloc where hubloc_addr = '%s' and hubloc_network = 'zot6' limit 1", + dbesc($url) + ); + } + elseif (filter_var($url, FILTER_VALIDATE_URL)) { + $r = q("select * from hubloc where hubloc_id_url = '%s' and hubloc_network = 'zot6' limit 1", + dbesc($url) + ); + } if (!$r) { - $href = Webfinger::zot_url(punify($url)); - if ($href) { - $zf = Zotfinger::exec($href, null); + if ($is_webbie) { + $url = Webfinger::zot_url(punify($url)); } + + if ($url) { + $zf = Zotfinger::exec($url, null); + } + if (is_array($zf) && array_path_exists('signature/signer', $zf) && $zf['signature']['signer'] === $href && intval($zf['signature']['header_valid'])) { Libzot::import_xchan($zf['data']); } diff --git a/Zotlabs/Daemon/Notifier.php b/Zotlabs/Daemon/Notifier.php index 741078422..8aee08fe6 100644 --- a/Zotlabs/Daemon/Notifier.php +++ b/Zotlabs/Daemon/Notifier.php @@ -5,24 +5,16 @@ namespace Zotlabs\Daemon; use Zotlabs\Lib\Libzot; use Zotlabs\Lib\Activity; use Zotlabs\Lib\Queue; +use Zotlabs\Lib\LDSignatures; -require_once('include/queue_fn.php'); require_once('include/html2plain.php'); require_once('include/conversation.php'); -require_once('include/zot.php'); require_once('include/items.php'); require_once('include/bbcode.php'); /* - * This file was at one time responsible for doing all deliveries, but this caused - * big problems on shared hosting systems, where the process might get killed by the - * hosting provider and nothing would get delivered. - * It now only delivers one message under certain cases, and invokes a queued - * delivery mechanism (include/deliver.php) to deliver individual contacts at - * controlled intervals. - * This has a much better chance of surviving random processes getting killed - * by the hosting provider. + * Notifier - message dispatch and preparation for delivery * * The basic flow is: * Identify the type of message @@ -53,7 +45,6 @@ require_once('include/bbcode.php'); * event (in events.php) * expire (in items.php) * like (in like.php, poke.php) - * mail (in message.php) * tag (in photos.php, poke.php, tagger.php) * tgroup (in items.php) * wall-new (in photos.php, item.php) @@ -63,7 +54,6 @@ require_once('include/bbcode.php'); * ZOT * permission_create abook_id * permission_accept abook_id - * permission_reject abook_id * permission_update abook_id * refresh_all channel_id * purge channel_id xchan_hash @@ -71,7 +61,6 @@ require_once('include/bbcode.php'); * expire channel_id * relay item_id (item was relayed to owner, we will deliver it as owner) * single_activity item_id (deliver to a singleton network from the appropriate clone) - * single_mail mail_id (deliver to a singleton network from the appropriate clone) * location channel_id * request channel_id xchan_hash message_id * rating xlink_id @@ -82,164 +71,126 @@ require_once('include/bbcode.php'); class Notifier { + static public $deliveries = []; + static public $recipients = []; + static public $env_recips = []; + static public $packet_type = 'activity'; + static public $encoding = 'activitystreams'; + static public $encoded_item = null; + static public $channel = null; + static public $private = false; + // $fragment can contain additional info to omit de-duplication in the queueworker. + // E.g. if an item is updated many times in a row from different sources (multiple vote updates) the + // update source mid or a timestamp or random string can be added. + static public $fragment = null; + static public function run($argc, $argv) { - if ($argc < 3) + if ($argc < 3) { return; + } logger('notifier: invoked: ' . print_r($argv, true), LOGGER_DEBUG); $cmd = $argv[1]; - $item_id = $argv[2]; - if (!$item_id) + if (!$item_id) { return; - - $sys = get_sys_channel(); - - $deliveries = []; - - $request = false; - $mail = false; - $location = false; - $recipients = []; - $normal_mode = true; - $packet_type = 'undefined'; - - if ($cmd === 'mail' || $cmd === 'single_mail') { - $normal_mode = false; - $mail = true; - $private = true; - $message = q("SELECT * FROM mail WHERE id = %d LIMIT 1", - intval($item_id) - ); - if (!$message) { - return; - } - xchan_mail_query($message[0]); - $uid = $message[0]['channel_id']; - $recipients[] = $message[0]['from_xchan']; // include clones - $recipients[] = $message[0]['to_xchan']; - $item = $message[0]; - $encoded_item = encode_mail($item); - - $s = q("select * from channel where channel_id = %d limit 1", - intval($uid) - ); - if ($s) - $channel = $s[0]; - } - elseif ($cmd === 'request') { - $channel_id = $item_id; - $xchan = $argv[3]; - $request_message_id = $argv[4]; - - $s = q("select * from channel where channel_id = %d limit 1", - intval($channel_id) - ); - if ($s) - $channel = $s[0]; - $private = true; - $recipients[] = $xchan; - $packet_type = 'request'; - $normal_mode = false; - } - elseif ($cmd === 'keychange') { - $channel = channelx_by_n($item_id); - - $r = q("select abook_xchan from abook where abook_channel = %d", + self::$deliveries = []; + self::$recipients = []; + self::$env_recips = []; + self::$packet_type = 'activity'; + self::$encoding = 'activitystreams'; + self::$encoded_item = null; + self::$channel = null; + self::$private = false; + self::$fragment = null; + + $sys = get_sys_channel(); + $normal_mode = true; + + if ($cmd === 'keychange') { + self::$channel = channelx_by_n($item_id); + $r = q("select abook_xchan from abook where abook_channel = %d", intval($item_id) ); if ($r) { foreach ($r as $rr) { - $recipients[] = $rr['abook_xchan']; + self::$recipients[] = $rr['abook_xchan']; } } - $private = false; - $packet_type = 'keychange'; - $normal_mode = false; + self::$private = false; + self::$packet_type = 'keychange'; + self::$encoded_item = get_pconfig(self::$channel['channel_id'], 'system', 'keychange'); + self::$encoding = 'zot'; + $normal_mode = false; } - elseif (in_array($cmd, ['permission_update', 'permission_reject', 'permission_accept', 'permission_create'])) { + elseif (in_array($cmd, ['permission_update', 'permission_accept', 'permission_create'])) { // Get the (single) recipient $r = q("select * from abook left join xchan on abook_xchan = xchan_hash where abook_id = %d and abook_self = 0", intval($item_id) ); if ($r) { - $uid = $r[0]['abook_channel']; + $recip = $r[0]; + // Get the sender - $channel = channelx_by_n($uid); - if ($channel) { - $perm_update = ['sender' => $channel, 'recipient' => $r[0], 'success' => false, 'deliveries' => '']; - - if ($cmd === 'permission_create') - call_hooks('permissions_create', $perm_update); - elseif ($cmd === 'permission_accept') - call_hooks('permissions_accept', $perm_update); - elseif ($cmd === 'permission_reject') - call_hooks('permissions_reject', $perm_update); - else - call_hooks('permissions_update', $perm_update); + self::$channel = channelx_by_n($recip['abook_channel']); + if (self::$channel) { + $perm_update = ['sender' => self::$channel, 'recipient' => $recip, 'success' => false, 'deliveries' => '']; + + switch ($cmd) { + case 'permission_create': + call_hooks('permissions_create', $perm_update); + break; + case 'permission_accept': + call_hooks('permissions_accept', $perm_update); + break; + case 'permission_update': + call_hooks('permissions_update', $perm_update); + break; + default: + break; + } if ($perm_update['success']) { if ($perm_update['deliveries']) { - $deliveries[] = $perm_update['deliveries']; - do_delivery($deliveries); + self::$deliveries[] = $perm_update['deliveries']; + do_delivery(self::$deliveries); } return; } else { - $recipients[] = $r[0]['abook_xchan']; - $private = false; - $packet_type = 'refresh'; - $packet_recips = [['guid' => $r[0]['xchan_guid'], 'guid_sig' => $r[0]['xchan_guid_sig'], 'hash' => $r[0]['xchan_hash']]]; + self::$recipients[] = $recip['abook_xchan']; + self::$private = false; + self::$packet_type = 'refresh'; + self::$env_recips = [$recip['xchan_hash']]; } } } } elseif ($cmd === 'refresh_all') { logger('notifier: refresh_all: ' . $item_id); - $uid = $item_id; - $channel = channelx_by_n($item_id); - - $r = q("select abook_xchan from abook where abook_channel = %d", - intval($uid) - ); - if ($r) { - foreach ($r as $rr) { - $recipients[] = $rr['abook_xchan']; - } - } - $private = false; - $packet_type = 'refresh'; - } - elseif ($cmd === 'location') { - logger('notifier: location: ' . $item_id); - $s = q("select * from channel where channel_id = %d limit 1", - intval($item_id) - ); - if ($s) - $channel = $s[0]; - $uid = $item_id; - $recipients = []; + self::$channel = channelx_by_n($item_id, true); - $r = q("select abook_xchan from abook where abook_channel = %d", - intval($uid) + $r = q("select abook_xchan from abook where abook_channel = %d", + intval($item_id) ); if ($r) { foreach ($r as $rr) { - $recipients[] = $rr['abook_xchan']; + self::$recipients[] = $rr['abook_xchan']; } } - $encoded_item = ['locations' => Libzot::encode_locations($channel), 'type' => 'location', 'encoding' => 'zot']; - $target_item = ['aid' => $channel['channel_account_id'], 'uid' => $channel['channel_id']]; - $private = false; - $packet_type = 'location'; - $location = true; + // In case we deleted the channel, our abook entry has already vanished. + // In order to be able to update our clones we need to add ourself here. + self::$recipients[] = self::$channel['channel_hash']; + + self::$private = false; + self::$packet_type = 'refresh'; } elseif ($cmd === 'purge') { $xchan = $argv[3]; @@ -248,33 +199,27 @@ class Notifier { return; } - $channel = channelx_by_n($item_id); - $recipients[] = $xchan; - $private = true; - $packet_type = 'purge'; - $packet_recips[] = ['hash' => $xchan]; + self::$channel = channelx_by_n($item_id, true); + self::$recipients = [$xchan]; + self::$private = true; + self::$packet_type = 'purge'; } elseif ($cmd === 'purge_all') { - logger('notifier: purge_all: ' . $item_id); - $channel = channelx_by_n($item_id); + self::$channel = channelx_by_n($item_id, true); + self::$recipients = []; + self::$private = false; + self::$packet_type = 'purge'; - $recipients = []; - $r = q("select abook_xchan from abook where abook_channel = %d and abook_self = 0", + $r = q("select abook_xchan from abook where abook_channel = %d and abook_self = 0", intval($item_id) ); if (!$r) { return; } foreach ($r as $rr) { - $recipients[] = $rr['abook_xchan']; - $packet_recips[] = ['hash' => $rr['abook_xchan']]; + self::$recipients[] = $rr['abook_xchan']; } - - $private = false; - $packet_type = 'purge'; - - } else { @@ -282,20 +227,21 @@ class Notifier { // Fetch the target item - $r = q("SELECT * FROM item WHERE id = %d and parent != 0 LIMIT 1", + self::$fragment = $argv[3] ?? ''; + + $r = q("SELECT * FROM item WHERE id = %d AND parent != 0", intval($item_id) ); - - if (!$r) + if (!$r) { return; + } xchan_query($r); - $r = fetch_post_tags($r); $target_item = $r[0]; - if (in_array($target_item['author']['xchan_network'], ['rss', 'anon'])) { + if (in_array($target_item['author']['xchan_network'], ['rss', 'anon', 'token'])) { logger('notifier: target item author is not a fetchable actor', LOGGER_DEBUG); return; } @@ -345,11 +291,12 @@ class Notifier { $s = q("select * from channel left join xchan on channel_hash = xchan_hash where channel_id = %d limit 1", intval($target_item['uid']) ); - if ($s) - $channel = $s[0]; + if ($s) { + self::$channel = $s[0]; + } - if ($channel['channel_hash'] !== $target_item['author_xchan'] && $channel['channel_hash'] !== $target_item['owner_xchan']) { - logger("notifier: Sending channel {$channel['channel_hash']} is not owner {$target_item['owner_xchan']} or author {$target_item['author_xchan']}", LOGGER_NORMAL, LOG_WARNING); + if (self::$channel['channel_hash'] !== $target_item['author_xchan'] && self::$channel['channel_hash'] !== $target_item['owner_xchan']) { + logger("notifier: Sending channel " . self::$channel['channel_hash'] . " is not owner {$target_item['owner_xchan']} or author {$target_item['author_xchan']}", LOGGER_NORMAL, LOG_WARNING); return; } @@ -359,12 +306,13 @@ class Notifier { } else { // fetch the parent item - $r = q("SELECT * from item where id = %d order by id asc", + $r = q("SELECT * FROM item WHERE id = %d", intval($target_item['parent']) ); - if (!$r) + if (!$r) { return; + } if (strpos($r[0]['postopts'], 'nodeliver') !== false) { logger('notifier: target item is undeliverable', LOGGER_DEBUG, LOG_NOTICE); @@ -379,29 +327,28 @@ class Notifier { } // avoid looping of discover items 12/4/2014 - - if ($sys && $parent_item['uid'] == $sys['channel_id']) + if ($sys && $parent_item['uid'] == $sys['channel_id']) { return; + } - $encoded_item = encode_item($target_item); - + $m = get_iconfig($target_item, 'activitypub', 'signed_data'); // Re-use existing signature unless the activity type changed to a Tombstone, which won't verify. - $m = ((intval($target_item['item_deleted'])) ? '' : get_iconfig($target_item, 'activitypub', 'signed_data')); - - if ($m) { - $activity = json_decode($m, true); + if ($m && (!intval($target_item['item_deleted']))) { + self::$encoded_item = json_decode($m, true); } else { - $activity = array_merge(['@context' => [ + + self::$encoded_item = array_merge(['@context' => [ ACTIVITYSTREAMS_JSONLD_REV, 'https://w3id.org/security/v1', z_root() . ZOT_APSCHEMA_REV ]], Activity::encode_activity($target_item) ); + self::$encoded_item['signature'] = LDSignatures::sign(self::$encoded_item, self::$channel); } logger('target_item: ' . print_r($target_item, true), LOGGER_DEBUG); - logger('encoded: ' . print_r($activity, true), LOGGER_DEBUG); + logger('encoded: ' . print_r(self::$encoded_item, true), LOGGER_DEBUG); // Send comments to the owner to re-deliver to everybody in the conversation // We only do this if the item in question originated on this site. This prevents looping. @@ -412,9 +359,9 @@ class Notifier { // flag on comments for an extended period. So we'll also call comment_local_origin() which looks at // the hostname in the message_id and provides a second (fallback) opinion. - $relay_to_owner = (((!$top_level_post) && (intval($target_item['item_origin'])) && comment_local_origin($target_item)) ? true : false); - - $uplink = false; + $relay_to_owner = (!$top_level_post && intval($target_item['item_origin']) && comment_local_origin($target_item)); + $uplink = false; + $upstream = false; // $cmd === 'relay' indicates the owner is sending it to the original recipients // don't allow the item in the relay command to relay to owner under any circumstances, it will loop @@ -426,27 +373,34 @@ class Notifier { if (($cmd === 'uplink') && intval($parent_item['item_uplink']) && (!$top_level_post)) { logger('notifier: uplink'); - $uplink = true; + $uplink = true; + self::$packet_type = 'response'; } if (($relay_to_owner || $uplink) && ($cmd !== 'relay')) { logger('notifier: followup relay', LOGGER_DEBUG); - $recipients = [($uplink) ? $parent_item['source_xchan'] : $parent_item['owner_xchan']]; - $private = true; - if (!$encoded_item['flags']) - $encoded_item['flags'] = []; - $encoded_item['flags'][] = 'relay'; - $upstream = true; + $sendto = (($uplink) ? $parent_item['source_xchan'] : $parent_item['owner_xchan']); + self::$recipients = [$sendto]; + self::$private = true; + $upstream = true; + self::$packet_type = 'response'; } else { - logger('notifier: normal distribution', LOGGER_DEBUG); - if ($cmd === 'relay') - logger('notifier: owner relay'); - $upstream = false; + if ($cmd === 'relay') { + logger('owner relay (downstream delivery)'); + } + else { + logger('normal (downstream) distribution', LOGGER_DEBUG); + } + + if ($parent_item && $parent_item['item_private'] !== $target_item['item_private']) { + logger('conversation privacy mismatch - downstream delivery prevented'); + return; + } + // if our parent is a tag_delivery recipient, uplink to the original author causing // a delivery fork. - - if (($parent_item) && intval($parent_item['item_uplink']) && (!$top_level_post) && ($cmd !== 'uplink')) { + if ($parent_item && intval($parent_item['item_uplink']) && !$top_level_post && $cmd !== 'uplink') { // don't uplink a relayed post to the relay owner if ($parent_item['source_xchan'] !== $parent_item['owner_xchan']) { logger('notifier: uplinking this item'); @@ -454,98 +408,87 @@ class Notifier { } } - $private = false; - $recipients = collect_recipients($parent_item, $private); + self::$private = false; + self::$recipients = collect_recipients($parent_item, self::$private); + + // FIXME add any additional recipients such as mentions, etc. if ($top_level_post) { // remove clones who will receive the post via sync - $recipients = array_diff($recipients, [$target_item['owner_xchan']]); + self::$recipients = array_values(array_diff(self::$recipients, [$target_item['owner_xchan']])); } - // FIXME add any additional recipients such as mentions, etc. - + // don't send deletions onward for other people's stuff + if (intval($target_item['item_deleted']) && (!intval($target_item['item_wall']))) { + logger('notifier: ignoring delete notification for non-wall item', LOGGER_NORMAL, LOG_NOTICE); + return; + } } } - $walltowall = (($top_level_post && $channel['xchan_hash'] === $target_item['author_xchan']) ? true : false); - // Generic delivery section, we have an encoded item and recipients // Now start the delivery process - $x = $encoded_item; - $x['title'] = 'private'; - $x['body'] = 'private'; - logger('notifier: encoded item: ' . print_r($x, true), LOGGER_DATA, LOG_DEBUG); + logger('encoded item: ' . print_r(self::$encoded_item, true), LOGGER_DATA, LOG_DEBUG); - //logger('notifier: encoded activity: ' . print_r($activity,true), LOGGER_DATA, LOG_DEBUG); - - stringify_array_elms($recipients); - if (!$recipients) { + stringify_array_elms(self::$recipients); + if (!self::$recipients) { logger('no recipients'); return; } - // logger('notifier: recipients: ' . print_r($recipients,true), LOGGER_NORMAL, LOG_DEBUG); - - $env_recips = (($private) ? [] : null); + // logger('recipients: ' . print_r(self::$recipients,true), LOGGER_NORMAL, LOG_DEBUG); - $details = q("select xchan_hash, xchan_network, xchan_addr, xchan_guid, xchan_guid_sig from xchan where xchan_hash in (" . protect_sprintf(implode(',', $recipients)) . ")"); + if (!count(self::$env_recips)) { + self::$env_recips = ((self::$private) ? [] : null); + } $recip_list = []; + $details = dbq("select xchan_hash, xchan_network, xchan_addr, xchan_guid, xchan_guid_sig from xchan where xchan_hash in (" . protect_sprintf(implode(',', self::$recipients)) . ")"); + if ($details) { foreach ($details as $d) { $recip_list[] = $d['xchan_addr'] . ' (' . $d['xchan_hash'] . ')'; - if ($private) { - $env_recips[] = [ - 'guid' => $d['xchan_guid'], - 'guid_sig' => $d['xchan_guid_sig'], - 'hash' => $d['xchan_hash'] - ]; + if (self::$private) { + self::$env_recips[] = $d['xchan_hash']; } } } - $narr = [ - 'channel' => $channel, + 'channel' => self::$channel, 'upstream' => $upstream, - 'env_recips' => $env_recips, - 'packet_recips' => $packet_recips, - 'recipients' => $recipients, - 'item' => $item, + 'env_recips' => self::$env_recips, + 'recipients' => self::$recipients, 'target_item' => $target_item, 'parent_item' => $parent_item, 'top_level_post' => $top_level_post, - 'private' => $private, + 'private' => self::$private, 'relay_to_owner' => $relay_to_owner, 'uplink' => $uplink, 'cmd' => $cmd, - 'mail' => $mail, - 'single' => (($cmd === 'single_mail' || $cmd === 'single_activity') ? true : false), - 'location' => $location, - 'request' => $request, + 'single' => ($cmd === 'single_activity'), 'normal_mode' => $normal_mode, - 'packet_type' => $packet_type, - 'walltowall' => $walltowall, + 'packet_type' => self::$packet_type, 'queued' => [] ]; call_hooks('notifier_process', $narr); if ($narr['queued']) { foreach ($narr['queued'] as $pq) - $deliveries[] = $pq; + self::$deliveries[] = $pq; } // notifier_process can alter the recipient list - $recipients = $narr['recipients']; - $env_recips = $narr['env_recips']; - $packet_recips = $narr['packet_recips']; + self::$recipients = $narr['recipients']; + self::$env_recips = $narr['env_recips']; - if (($private) && (!$env_recips)) { + if (self::$private && !self::$env_recips) { // shouldn't happen logger('notifier: private message with no envelope recipients.' . print_r($argv, true), LOGGER_NORMAL, LOG_NOTICE); + return; } logger('notifier: recipients (may be delivered to more if public): ' . print_r($recip_list, true), LOGGER_DEBUG); @@ -554,15 +497,15 @@ class Notifier { // Now we have collected recipients (except for external mentions, FIXME) // Let's reduce this to a set of hubs; checking that the site is not dead. - $hubs = q("select hubloc.*, site.site_crypto, site.site_flags, site.site_version, site.site_project, site.site_dead from hubloc left join site on site_url = hubloc_url - where hubloc_hash in (" . protect_sprintf(implode(',', $recipients)) . ") + $hubs = dbq("select hubloc.*, site.site_crypto, site.site_flags, site.site_dead from hubloc left join site on site_url = hubloc_url + where hubloc_hash in (" . protect_sprintf(implode(',', self::$recipients)) . ") and hubloc_error = 0 and hubloc_deleted = 0" ); // public posts won't make it to the local public stream unless there's a recipient on this site. // This code block sees if it's a public post and localhost is missing, and if so adds an entry for the local sys channel to the $hubs list - if (!$private) { + if (!self::$private) { $found_localhost = false; if ($hubs) { foreach ($hubs as $h) { @@ -573,12 +516,12 @@ class Notifier { } } if (!$found_localhost) { - $localhub = q("select hubloc.*, site.site_crypto, site.site_flags, site.site_version, site.site_project, site.site_dead from hubloc - left join site on site_url = hubloc_url where hubloc_id_url = '%s' and hubloc_error = 0 and hubloc_deleted = 0", + $localhub = q("select hubloc.*, site.site_crypto, site.site_flags, site.site_dead from hubloc left join site on site_url = hubloc_url + where hubloc_id_url = '%s' and hubloc_error = 0 and hubloc_deleted = 0", dbesc(z_root() . '/channel/sys') ); if ($localhub) { - $hubs = array_merge($hubs, $localhub); + $hubs = array_merge($localhub, $hubs); } } } @@ -595,41 +538,36 @@ class Notifier { */ - $hublist = []; // this provides an easily printable list for the logs - $dhubs = []; // delivery hubs where we store our resulting unique array - $keys = []; // array of keys to check uniquness for zot hubs - $urls = []; // array of urls to check uniqueness of hubs from other networks - $hub_env = []; // per-hub envelope so we don't broadcast the entire envelope to all - $dead = []; // known dead hubs - report them as undeliverable + $hublist = []; // this provides an easily printable list for the logs + $dhubs = []; // delivery hubs where we store our resulting unique array + $keys = []; // array of keys to check uniquness for zot hubs + $urls = []; // array of urls to check uniqueness of hubs from other networks + $hub_env = []; // per-hub envelope so we don't broadcast the entire envelope to all + $dead_hosts = []; // known dead hubs - report them as undeliverable foreach ($hubs as $hub) { - - if (intval($hub['site_dead'])) { - $dead[] = $hub; + if (isset($hub['site_dead']) && intval($hub['site_dead'])) { + if(!in_array($hub['hubloc_host'], $dead_hosts)) { + $dead_hosts[] = $hub['hubloc_host']; + } continue; } - if ($env_recips) { - foreach ($env_recips as $er) { - if ($hub['hubloc_hash'] === $er['hash']) { - if (!array_key_exists($hub['hubloc_host'] . $hub['hubloc_sitekey'], $hub_env)) { - $hub_env[$hub['hubloc_host'] . $hub['hubloc_sitekey']] = []; + if (self::$env_recips) { + foreach (self::$env_recips as $er) { + if ($hub['hubloc_hash'] === $er) { + if (!array_key_exists($hub['hubloc_site_id'], $hub_env)) { + $hub_env[$hub['hubloc_site_id']] = []; + } + if (!in_array($er, $hub_env[$hub['hubloc_site_id']])) { + $hub_env[$hub['hubloc_site_id']][] = $er; } - $hub_env[$hub['hubloc_host'] . $hub['hubloc_sitekey']][] = $er; } } } - - if ($hub['hubloc_network'] == 'zot') { + if ($hub['hubloc_network'] === 'zot6') { if (!in_array($hub['hubloc_sitekey'], $keys)) { - $hublist[] = $hub['hubloc_host'] . ' ' . $hub['hubloc_network']; - $dhubs[] = $hub; - $keys[] = $hub['hubloc_sitekey']; - } - } - else { - if (!in_array($hub['hubloc_url'], $urls)) { if ($hub['hubloc_url'] === z_root()) { //deliver to local hub first array_unshift($hublist, $hub['hubloc_host'] . ' ' . $hub['hubloc_network']); @@ -639,7 +577,14 @@ class Notifier { $hublist[] = $hub['hubloc_host'] . ' ' . $hub['hubloc_network']; $dhubs[] = $hub; } - $urls[] = $hub['hubloc_url']; + $keys[] = $hub['hubloc_sitekey']; + } + } + else { + if (!in_array($hub['hubloc_url'], $urls)) { + $hublist[] = $hub['hubloc_host'] . ' ' . $hub['hubloc_network']; + $dhubs[] = $hub; + $urls[] = $hub['hubloc_url']; } } } @@ -650,37 +595,30 @@ class Notifier { logger('notifier_hub: ' . $hub['hubloc_url'], LOGGER_DEBUG); - if (!in_array($hub['hubloc_network'], ['zot', 'zot6'])) { + if ($hub['hubloc_network'] !== 'zot6') { $narr = [ - 'channel' => $channel, + 'channel' => self::$channel, 'upstream' => $upstream, - 'env_recips' => $env_recips, - 'packet_recips' => $packet_recips, - 'recipients' => $recipients, - 'item' => $item, + 'env_recips' => self::$env_recips, + 'recipients' => self::$recipients, 'target_item' => $target_item, 'parent_item' => $parent_item, 'hub' => $hub, 'top_level_post' => $top_level_post, - 'private' => $private, + 'private' => self::$private, 'relay_to_owner' => $relay_to_owner, 'uplink' => $uplink, 'cmd' => $cmd, - 'mail' => $mail, - 'single' => (($cmd === 'single_mail' || $cmd === 'single_activity') ? true : false), - 'location' => $location, - 'request' => $request, + 'single' => $cmd === 'single_activity', 'normal_mode' => $normal_mode, - 'packet_type' => $packet_type, - 'walltowall' => $walltowall, + 'packet_type' => self::$packet_type, 'queued' => [] ]; - call_hooks('notifier_hub', $narr); if ($narr['queued']) { foreach ($narr['queued'] as $pq) - $deliveries[] = $pq; + self::$deliveries[] = $pq; } continue; @@ -695,175 +633,95 @@ class Notifier { // will invoke a delivery to those connections which are connected to just that // hub instance. - if ($cmd === 'single_mail' || $cmd === 'single_activity') { - continue; - } - - if (!in_array($hub['hubloc_network'], ['zot', 'zot6'])) { + if ($cmd === 'single_activity') { continue; } - // Do not change this to a uuid as long as we have traditional zot servers - // in the loop. The signature verification step can't handle dashes in the - // hashes. + // default: zot protocol - $hash = random_string(48); + // Prevent zot6 delivery of group comment boosts, which are not required for conversational platforms. + // ActivityPub conversational platforms may wish to filter these if they don't want or require them. + // We will assume here that if $target_item exists and has a verb that it is an actual item structure + // so we won't need to check the existence of the other item fields prior to evaluation. - $packet = null; - $pmsg = ''; - - if ($packet_type === 'refresh' || $packet_type === 'purge') { - if ($hub['hubloc_network'] === 'zot6') { - $packet = Libzot::build_packet($channel, $packet_type, ids_to_array($packet_recips, 'hash')); - } - else { - $packet = zot_build_packet($channel, $packet_type, (($packet_recips) ? $packet_recips : null)); - } - } - if ($packet_type === 'keychange' && $hub['hubloc_network'] === 'zot') { - $pmsg = get_pconfig($channel['channel_id'], 'system', 'keychange'); - $packet = zot_build_packet($channel, $packet_type, (($packet_recips) ? $packet_recips : null)); - } - elseif ($packet_type === 'request' && $hub['hubloc_network'] === 'zot') { - $env = (($hub_env && $hub_env[$hub['hubloc_host'] . $hub['hubloc_sitekey']]) ? $hub_env[$hub['hubloc_host'] . $hub['hubloc_sitekey']] : ''); - $packet = zot_build_packet($channel, $packet_type, $env, $hub['hubloc_sitekey'], $hub['site_crypto'], - $hash, ['message_id' => $request_message_id] - ); - } + // This shouldn't produce false positives on comment boosts that were generated on other platforms + // because we won't be delivering them. - if ($packet) { - Queue::insert( - [ - 'hash' => $hash, - 'account_id' => $channel['channel_account_id'], - 'channel_id' => $channel['channel_id'], - 'posturl' => $hub['hubloc_callback'], - 'driver' => $hub['hubloc_network'], - 'notify' => $packet, - 'msg' => (($pmsg) ? json_encode($pmsg) : '') - ] - ); + if (isset($target_item) && isset($target_item['verb']) && $target_item['verb'] === 'Announce' && $target_item['author_xchan'] === $target_item['owner_xchan'] && ! intval($target_item['item_thread_top'])) { + continue; } - else { - $env = (($hub_env && $hub_env[$hub['hubloc_host'] . $hub['hubloc_sitekey']]) ? $hub_env[$hub['hubloc_host'] . $hub['hubloc_sitekey']] : ''); - - if ($hub['hubloc_network'] === 'zot6') { - $zenv = []; - if ($env) { - foreach ($env as $e) { - $zenv[] = $e['hash']; - } - } - - $packet_type = (($upstream || $uplink) ? 'response' : 'activity'); - - // block zot private reshares from zot6, as this could cause a number of privacy issues - // due to parenting differences between the reshare implementations. In zot a reshare is - // a standalone parent activity and in zot6 it is a followup/child of the original activity. - // For public reshares, some comments to the reshare on the zot fork will not make it to zot6 - // due to these different message models. This cannot be prevented at this time. - if ($packet_type === 'activity' && $activity['type'] === 'Announce' && intval($target_item['item_private'])) { - continue; - } - - $packet = Libzot::build_packet($channel, $packet_type, $zenv, $activity, 'activitystreams', (($private) ? $hub['hubloc_sitekey'] : null), $hub['site_crypto']); - } - else { - // currently zot6 delivery is only performed on normal items and not sync items or mail or anything else - // Eventually we will do this for all deliveries, but for now ensure this is precisely what we are dealing - // with before switching to zot6 as the primary zot6 handler checks for the existence of a message delivery report - // to trigger dequeue'ing - - $z6 = (($encoded_item && $encoded_item['type'] === 'activity' && (!array_key_exists('allow_cid', $encoded_item))) ? true : false); - if ($z6) { - $packet = zot6_build_packet($channel, 'notify', $env, json_encode($encoded_item), (($private) ? $hub['hubloc_sitekey'] : null), $hub['site_crypto'], $hash); - } - else { - $packet = zot_build_packet($channel, 'notify', $env, (($private) ? $hub['hubloc_sitekey'] : null), $hub['site_crypto'], $hash); + $hash = new_uuid(); - } - } + $env = (($hub_env && $hub_env[$hub['hubloc_site_id']]) ? $hub_env[$hub['hubloc_site_id']] : ''); + if ((self::$private) && (!$env)) { + continue; + } + $packet = Libzot::build_packet(self::$channel, self::$packet_type, $env, self::$encoded_item, self::$encoding, ((self::$private) ? $hub['hubloc_sitekey'] : null), $hub['site_crypto']); - // remove this after most hubs have updated to version 5.0 - if (stripos($hub['site_project'], 'hubzilla') !== false && version_compare($hub['site_version'], '4.7.3', '<=')) { - if ($encoded_item['type'] === 'mail') { - $encoded_item['from']['network'] = 'zot'; - $encoded_item['from']['guid_sig'] = str_replace('sha256.', '', $encoded_item['from']['guid_sig']); - } - else { - $encoded_item['owner']['network'] = 'zot'; - $encoded_item['owner']['guid_sig'] = str_replace('sha256.', '', $encoded_item['owner']['guid_sig']); - if (strpos($encoded_item['author']['url'], z_root()) === 0) { - $encoded_item['author']['network'] = 'zot'; - $encoded_item['author']['guid_sig'] = str_replace('sha256.', '', $encoded_item['author']['guid_sig']); - } - } - } + Queue::insert( + [ + 'hash' => $hash, + 'account_id' => self::$channel['channel_account_id'], + 'channel_id' => self::$channel['channel_id'], + 'posturl' => $hub['hubloc_callback'], + 'notify' => $packet, + 'msg' => EMPTY_STR + ] + ); - Queue::insert( - [ - 'hash' => $hash, - 'account_id' => $target_item['aid'], - 'channel_id' => $target_item['uid'], - 'posturl' => $hub['hubloc_callback'], - 'driver' => $hub['hubloc_network'], - 'notify' => $packet, - 'msg' => json_encode($encoded_item) - ] + // only create delivery reports for normal undeleted items + if (is_array($target_item) && (!$target_item['item_deleted']) && (!get_config('system', 'disable_dreport'))) { + q("insert into dreport ( dreport_mid, dreport_site, dreport_recip, dreport_name, dreport_result, dreport_time, dreport_xchan, dreport_queue ) + values ( '%s', '%s','%s','%s','%s','%s','%s','%s' ) ", + dbesc($target_item['mid']), + dbesc($hub['hubloc_host']), + dbesc($hub['hubloc_host']), + dbesc($hub['hubloc_host']), + dbesc('queued'), + dbesc(datetime_convert()), + dbesc(self::$channel['channel_hash']), + dbesc($hash) ); - - // only create delivery reports for normal undeleted items - if (is_array($target_item) && array_key_exists('postopts', $target_item) && (!$target_item['item_deleted']) && (!get_config('system', 'disable_dreport'))) { - q("insert into dreport ( dreport_mid, dreport_site, dreport_recip, dreport_result, dreport_time, dreport_xchan, dreport_queue ) values ( '%s','%s','%s','%s','%s','%s','%s' ) ", - dbesc($target_item['mid']), - dbesc($hub['hubloc_host']), - dbesc($hub['hubloc_host']), - dbesc('queued'), - dbesc(datetime_convert()), - dbesc($channel['channel_hash']), - dbesc($hash) - ); - } } - $deliveries[] = $hash; + self::$deliveries[] = $hash; + } if ($normal_mode) { + // This wastes a process if there are no delivery hooks configured, so check this before launching the new process $x = q("select * from hook where hook = 'notifier_normal'"); if ($x) { - Master::Summon(['Deliver_hooks', $target_item['id']]); + Master::Summon(['Deliver_hooks', $target_item['id'], self::$fragment]); } } - if ($deliveries) - do_delivery($deliveries); - - logger('notifier: basic loop complete.', LOGGER_DEBUG); - - if ($dead) { - foreach ($dead as $deceased) { - if (is_array($target_item) && (!$target_item['item_deleted']) && (!get_config('system', 'disable_dreport'))) { - q("insert into dreport ( dreport_mid, dreport_site, dreport_recip, dreport_name, dreport_result, dreport_time, dreport_xchan, dreport_queue ) - values ( '%s', '%s','%s','%s','%s','%s','%s','%s' ) ", - dbesc($target_item['mid']), - dbesc($deceased['hubloc_host']), - dbesc($deceased['hubloc_host']), - dbesc($deceased['hubloc_host']), - dbesc('undeliverable/unresponsive site'), - dbesc(datetime_convert()), - dbesc($channel['channel_hash']), - dbesc(random_string(48)) - ); - } + if (self::$deliveries) { + do_delivery(self::$deliveries); + } + + if ($dead_hosts && is_array($target_item) && (!$target_item['item_deleted']) && (!get_config('system', 'disable_dreport'))) { + foreach ($dead_hosts as $deceased_host) { + $r = q("insert into dreport ( dreport_mid, dreport_site, dreport_recip, dreport_name, dreport_result, dreport_time, dreport_xchan, dreport_queue ) + values ( '%s', '%s','%s','%s','%s','%s','%s','%s' ) ", + dbesc($target_item['mid']), + dbesc($deceased_host), + dbesc($deceased_host), + dbesc($deceased_host), + dbesc('undeliverable/unresponsive site'), + dbesc(datetime_convert()), + dbesc(self::$channel['channel_hash']), + dbesc(new_uuid()) + ); } } call_hooks('notifier_end', $target_item); logger('notifer: complete.'); + return; } diff --git a/Zotlabs/Daemon/Onedirsync.php b/Zotlabs/Daemon/Onedirsync.php index f29fbe5b8..ea995be9e 100644 --- a/Zotlabs/Daemon/Onedirsync.php +++ b/Zotlabs/Daemon/Onedirsync.php @@ -5,10 +5,6 @@ namespace Zotlabs\Daemon; use Zotlabs\Lib\Libzot; use Zotlabs\Lib\Libzotdir; -require_once('include/zot.php'); -require_once('include/dir_fns.php'); - - class Onedirsync { static public function run($argc, $argv) { @@ -29,6 +25,7 @@ class Onedirsync { if (!$r) return; + if (($r[0]['ud_flags'] & UPDATE_FLAGS_UPDATED) || (!$r[0]['ud_addr'])) return; @@ -54,25 +51,24 @@ class Onedirsync { // ignore doing an update if this ud_addr refers to a known dead hubloc $h = q("select * from hubloc where hubloc_addr = '%s'", - dbesc($r[0]['ud_addr']) + dbesc($r[0]['ud_addr']), ); $h = Libzot::zot_record_preferred($h); - if (($h) && ($h['hubloc_status'] & HUBLOC_OFFLINE)) { + if (($h) && (($h['hubloc_status'] & HUBLOC_OFFLINE) || $h['hubloc_deleted'] || $h['hubloc_error'])) { q("update updates set ud_flags = ( ud_flags | %d ) where ud_addr = '%s' and ( ud_flags & %d ) = 0 ", - intval(UPDATE_FLAGS_UPDATED), + intval(UPDATE_FLAGS_DELETED), dbesc($r[0]['ud_addr']), intval(UPDATE_FLAGS_UPDATED) ); - return; } // we might have to pull this out some day, but for now update_directory_entry() // runs zot_finger() and is kind of zot specific - if ($h && !in_array($h['hubloc_network'], ['zot6', 'zot'])) + if ($h && $h['hubloc_network'] !== 'zot6') return; Libzotdir::update_directory_entry($r[0]); diff --git a/Zotlabs/Daemon/Onepoll.php b/Zotlabs/Daemon/Onepoll.php index d747e65f3..79fd06df9 100644 --- a/Zotlabs/Daemon/Onepoll.php +++ b/Zotlabs/Daemon/Onepoll.php @@ -8,6 +8,7 @@ use Zotlabs\Lib\ASCollection; use Zotlabs\Lib\Libzot; require_once('include/socgraph.php'); +require_once('include/feedutils.php'); class Onepoll { @@ -44,18 +45,14 @@ class Onepoll { return; } - $contact = array_shift($contacts); + $contact = $contacts[0]; $importer_uid = $contact['abook_channel']; - $r = q("SELECT * from channel left join xchan on channel_hash = xchan_hash where channel_id = %d limit 1", - intval($importer_uid) - ); + $importer = channelx_by_n($importer_uid); - if (!$r) + if (!$importer) return; - $importer = $r[0]; - logger("onepoll: poll: ({$contact['id']}) IMPORTER: {$importer['xchan_name']}, CONTACT: {$contact['xchan_name']}"); $last_update = ((($contact['abook_updated'] === $contact['abook_created']) || ($contact['abook_updated'] <= NULL_DATE)) @@ -75,16 +72,12 @@ class Onepoll { return; } - if (!in_array($contact['xchan_network'], ['zot', 'zot6'])) + if ($contact['xchan_network'] !== 'zot6') return; // update permissions - if ($contact['xchan_network'] === 'zot6') - $x = Libzot::refresh($contact, $importer); - - if ($contact['xchan_network'] === 'zot') - $x = zot_refresh($contact, $importer); + $x = Libzot::refresh($contact, $importer); $responded = false; $updated = datetime_convert(); @@ -109,46 +102,63 @@ class Onepoll { return; $fetch_feed = true; - $x = null; // They haven't given us permission to see their stream - $can_view_stream = intval(get_abconfig($importer_uid, $contact['abook_xchan'], 'their_perms', 'view_stream')); - if (!$can_view_stream) + if (!$can_view_stream) { $fetch_feed = false; + } // we haven't given them permission to send us their stream - $can_send_stream = intval(get_abconfig($importer_uid, $contact['abook_xchan'], 'my_perms', 'send_stream')); - if (!$can_send_stream) + if (!$can_send_stream) { $fetch_feed = false; + } - if ($fetch_feed && $contact['xchan_network'] !== 'zot') { + if ($fetch_feed) { $max = intval(get_config('system', 'max_imported_posts', 30)); if (intval($max)) { - $cl = get_xconfig($contact['abook_xchan'], 'activitypub', 'collections'); + $cl = Activity::get_actor_collections($contact['abook_xchan']); + if(empty($cl)) { + $cl = get_xconfig($contact['abook_xchan'], 'activitypub', 'collections'); + } - if (is_array($cl) && $cl) { - $url = ((array_key_exists('outbox', $cl)) ? $cl['outbox'] : ''); + if (is_array($cl) && array_key_exists('outbox', $cl)) { + $url = $cl['outbox']; } else { - $url = str_replace('/poco/', '/zotfeed/', $contact['xchan_connurl']); + $url = str_replace('/poco/', '/outbox/', $contact['xchan_connurl']); } if ($url) { logger('fetching outbox'); - $url = $url . '?date_begin=' . urlencode($last_update); + $url = $url . '?date_begin=' . urlencode($last_update); + + if($contact['xchan_network'] === 'zot6') { + $url = $url . '&top=1'; + } + $obj = new ASCollection($url, $importer, 0, $max); $messages = $obj->get(); + if ($messages) { foreach ($messages as $message) { if (is_string($message)) { $message = Activity::fetch($message, $importer); } + + if ($contact['xchan_network'] === 'zot6') { + // make sure we only fetch top level items + if ($message['type'] === 'Create' && !isset($message['object']['inReplyTo'])) { + Libzot::fetch_conversation($importer, $message['object']['id']); + } + continue; + } + $AS = new ActivityStreams($message); if ($AS->is_valid() && is_array($AS->obj)) { $item = Activity::decode_note($AS); @@ -163,8 +173,10 @@ class Onepoll { // update the poco details for this connection $r = q("SELECT xlink_id from xlink where xlink_xchan = '%s' and xlink_updated > %s - INTERVAL %s and xlink_static = 0 limit 1", intval($contact['xchan_hash']), - db_utcnow(), db_quoteinterval('1 DAY') + db_utcnow(), + db_quoteinterval('1 DAY') ); + if (!$r) { poco_load($contact['xchan_hash'], $contact['xchan_connurl']); } diff --git a/Zotlabs/Daemon/Poller.php b/Zotlabs/Daemon/Poller.php index 762f1349c..88213a7c9 100644 --- a/Zotlabs/Daemon/Poller.php +++ b/Zotlabs/Daemon/Poller.php @@ -69,11 +69,11 @@ class Poller { abook.abook_channel, abook.abook_id, abook.abook_archived, abook.abook_pending, abook.abook_ignored, abook.abook_blocked, xchan.xchan_network, - account.account_lastlog, account.account_flags - FROM abook LEFT JOIN xchan on abook_xchan = xchan_hash + account.account_lastlog, account.account_flags + FROM abook LEFT JOIN xchan on abook_xchan = xchan_hash LEFT JOIN account on abook_account = account_id where abook_self = 0 - $sql_extra + $sql_extra AND (( account_flags = %d ) OR ( account_flags = %d )) $abandon_sql ORDER BY $randfunc", intval(ACCOUNT_OK), intval(ACCOUNT_UNVERIFIED) // FIXME @@ -102,8 +102,7 @@ class Poller { continue; } - - if (!in_array($contact['xchan_network'], ['zot', 'zot6'])) + if ($contact['xchan_network'] !== 'zot6') continue; if ($c == $t) { @@ -191,7 +190,7 @@ class Poller { set_config('system', 'lastpoll', datetime_convert()); - //All done - clear the lockfile + //All done - clear the lockfile @unlink($lockfile); diff --git a/Zotlabs/Daemon/Queue.php b/Zotlabs/Daemon/Queue.php index e1f4b73de..41aaf45ed 100644 --- a/Zotlabs/Daemon/Queue.php +++ b/Zotlabs/Daemon/Queue.php @@ -2,8 +2,7 @@ namespace Zotlabs\Daemon; -require_once('include/queue_fn.php'); -require_once('include/zot.php'); +use Zotlabs\Lib\Queue as LibQueue; class Queue { @@ -48,17 +47,17 @@ class Queue { else { // For the first 12 hours we'll try to deliver every 15 minutes - // After that, we'll only attempt delivery once per hour. - // This currently only handles the default queue drivers ('zot' or '') which we will group by posturl + // After that, we'll only attempt delivery once per hour. + // This currently only handles the default queue drivers ('zot' or '') which we will group by posturl // so that we don't start off a thousand deliveries for a couple of dead hubs. // The zot driver will deliver everything destined for a single hub once contact is made (*if* contact is made). // Other drivers will have to do something different here and may need their own query. - // Note: this requires some tweaking as new posts to long dead hubs once a day will keep them in the + // Note: this requires some tweaking as new posts to long dead hubs once a day will keep them in the // "every 15 minutes" category. We probably need to prioritise them when inserted into the queue // or just prior to this query based on recent and long-term delivery history. If we have good reason to believe - // the site is permanently down, there's no reason to attempt delivery at all, or at most not more than once - // or twice a day. + // the site is permanently down, there's no reason to attempt delivery at all, or at most not more than once + // or twice a day. $sqlrandfunc = db_getfunc('rand'); @@ -67,7 +66,7 @@ class Queue { ); while ($r) { foreach ($r as $rv) { - queue_deliver($rv); + LibQueue::deliver($rv); } $r = q("SELECT *,$sqlrandfunc as rn FROM outq WHERE outq_delivered = 0 and outq_scheduled < %s order by rn limit 1", db_utcnow() @@ -78,7 +77,7 @@ class Queue { return; foreach ($r as $rv) { - queue_deliver($rv); + LibQueue::deliver($rv); } } } diff --git a/Zotlabs/Daemon/Ratenotif.php b/Zotlabs/Daemon/Ratenotif.php deleted file mode 100644 index 8afde2c4c..000000000 --- a/Zotlabs/Daemon/Ratenotif.php +++ /dev/null @@ -1,126 +0,0 @@ -<?php - -namespace Zotlabs\Daemon; - -require_once('include/zot.php'); -require_once('include/queue_fn.php'); - - -class Ratenotif { - - static public function run($argc,$argv) { - - - // Deprecated - return; - - - require_once("datetime.php"); - require_once('include/items.php'); - - if($argc < 3) - return; - - - logger('ratenotif: invoked: ' . print_r($argv,true), LOGGER_DEBUG); - - $cmd = $argv[1]; - - $item_id = $argv[2]; - - - if($cmd === 'rating') { - $r = q("select * from xlink where xlink_id = %d and xlink_static = 1 limit 1", - intval($item_id) - ); - if(! $r) { - logger('rating not found'); - return; - } - - $encoded_item = array( - 'type' => 'rating', - 'encoding' => 'zot', - 'target' => $r[0]['xlink_link'], - 'rating' => intval($r[0]['xlink_rating']), - 'rating_text' => $r[0]['xlink_rating_text'], - 'signature' => $r[0]['xlink_sig'], - 'edited' => $r[0]['xlink_updated'] - ); - } - - $channel = channelx_by_hash($r[0]['xlink_xchan']); - if(! $channel) { - logger('no channel'); - return; - } - - - $primary = get_directory_primary(); - - if(! $primary) - return; - - - $interval = ((get_config('system','delivery_interval') !== false) - ? intval(get_config('system','delivery_interval')) : 2 ); - - $deliveries_per_process = intval(get_config('system','delivery_batch_count')); - - if($deliveries_per_process <= 0) - $deliveries_per_process = 1; - - $deliver = array(); - - $x = z_fetch_url($primary . '/regdir'); - if($x['success']) { - $j = json_decode($x['body'],true); - if($j && $j['success'] && is_array($j['directories'])) { - - foreach($j['directories'] as $h) { - if($h == z_root()) - continue; - - $hash = random_string(); - $n = zot_build_packet($channel,'notify',null,null,'',$hash); - - queue_insert(array( - 'hash' => $hash, - 'account_id' => $channel['channel_account_id'], - 'channel_id' => $channel['channel_id'], - 'posturl' => $h . '/post', - 'notify' => $n, - 'msg' => json_encode($encoded_item) - )); - - - $x = q("select count(outq_hash) as total from outq where outq_delivered = 0"); - if(intval($x[0]['total']) > intval(get_config('system','force_queue_threshold',300))) { - logger('immediate delivery deferred.', LOGGER_DEBUG, LOG_INFO); - update_queue_item($hash); - continue; - } - - $deliver[] = $hash; - - if(count($deliver) >= $deliveries_per_process) { - Master::Summon(array('Deliver',$deliver)); - $deliver = array(); - if($interval) - @time_sleep_until(microtime(true) + (float) $interval); - } - } - - // catch any stragglers - - if(count($deliver)) { - Master::Summon(array('Deliver',$deliver)); - } - } - } - - logger('ratenotif: complete.'); - return; - - } -} diff --git a/Zotlabs/Daemon/Thumbnail.php b/Zotlabs/Daemon/Thumbnail.php index 72034b870..3688e8ae5 100644 --- a/Zotlabs/Daemon/Thumbnail.php +++ b/Zotlabs/Daemon/Thumbnail.php @@ -45,7 +45,6 @@ class Thumbnail { return; } - $default_controller = null; $files = glob('Zotlabs/Thumbs/*.php'); |