diff options
Diffstat (limited to 'Zotlabs/Daemon/Notifier.php')
-rw-r--r-- | Zotlabs/Daemon/Notifier.php | 816 |
1 files changed, 320 insertions, 496 deletions
diff --git a/Zotlabs/Daemon/Notifier.php b/Zotlabs/Daemon/Notifier.php index 28c512d4a..368a9229d 100644 --- a/Zotlabs/Daemon/Notifier.php +++ b/Zotlabs/Daemon/Notifier.php @@ -6,37 +6,27 @@ use Zotlabs\Lib\Libzot; use Zotlabs\Lib\Activity; use Zotlabs\Lib\Queue; -require_once('include/queue_fn.php'); require_once('include/html2plain.php'); require_once('include/conversation.php'); -require_once('include/zot.php'); require_once('include/items.php'); require_once('include/bbcode.php'); - /* - * This file was at one time responsible for doing all deliveries, but this caused - * big problems on shared hosting systems, where the process might get killed by the - * hosting provider and nothing would get delivered. - * It now only delivers one message under certain cases, and invokes a queued - * delivery mechanism (include/deliver.php) to deliver individual contacts at - * controlled intervals. - * This has a much better chance of surviving random processes getting killed - * by the hosting provider. + * Notifier - message dispatch and preparation for delivery * * The basic flow is: * Identify the type of message * Collect any information that needs to be sent * Convert it into a suitable generic format for sending - * Figure out who the recipients are and if we need to relay + * Figure out who the recipients are and if we need to relay * through a conversation owner - * Once we know what recipients are involved, collect a list of + * Once we know what recipients are involved, collect a list of * destination sites * Build and store a queue item for each unique site and invoke * a delivery process for each site or a small number of sites (1-3) * and add a slight delay between each delivery invocation if desired (usually) - * + * */ /* @@ -54,17 +44,15 @@ require_once('include/bbcode.php'); * event (in events.php) * expire (in items.php) * like (in like.php, poke.php) - * mail (in message.php) * tag (in photos.php, poke.php, tagger.php) * tgroup (in items.php) * wall-new (in photos.php, item.php) * * and ITEM_ID is the id of the item in the database that needs to be sent to others. * - * ZOT + * ZOT * permission_create abook_id * permission_accept abook_id - * permission_reject abook_id * permission_update abook_id * refresh_all channel_id * purge channel_id xchan_hash @@ -72,7 +60,6 @@ require_once('include/bbcode.php'); * expire channel_id * relay item_id (item was relayed to owner, we will deliver it as owner) * single_activity item_id (deliver to a singleton network from the appropriate clone) - * single_mail mail_id (deliver to a singleton network from the appropriate clone) * location channel_id * request channel_id xchan_hash message_id * rating xlink_id @@ -81,201 +68,153 @@ require_once('include/bbcode.php'); */ - class Notifier { - static public function run($argc,$argv){ + static public $deliveries = []; + static public $recipients = []; + static public $env_recips = []; + static public $packet_type = 'activity'; + static public $encoding = 'activitystreams'; + static public $encoded_item = null; + static public $channel = null; + static public $private = false; - if($argc < 3) + static public function run($argc, $argv) { + + if ($argc < 3) { return; + } - logger('notifier: invoked: ' . print_r($argv,true), LOGGER_DEBUG); + logger('notifier: invoked: ' . print_r($argv, true), LOGGER_DEBUG); $cmd = $argv[1]; $item_id = $argv[2]; - if(! $item_id) + if (!$item_id) { return; + } - $sys = get_sys_channel(); - - $deliveries = array(); + self::$deliveries = []; + self::$recipients = []; + self::$env_recips = []; + self::$packet_type = 'activity'; + self::$encoding = 'activitystreams'; + self::$encoded_item = null; + self::$channel = null; + self::$private = false; - $request = false; - $mail = false; - $top_level = false; - $location = false; - $recipients = array(); - $url_recipients = array(); + $sys = get_sys_channel(); $normal_mode = true; - $packet_type = 'undefined'; - - if($cmd === 'mail' || $cmd === 'single_mail') { - $normal_mode = false; - $mail = true; - $private = true; - $message = q("SELECT * FROM mail WHERE id = %d LIMIT 1", - intval($item_id) - ); - if(! $message) { - return; - } - xchan_mail_query($message[0]); - $uid = $message[0]['channel_id']; - $recipients[] = $message[0]['from_xchan']; // include clones - $recipients[] = $message[0]['to_xchan']; - $item = $message[0]; - - $encoded_item = encode_mail($item); - - $s = q("select * from channel where channel_id = %d limit 1", - intval($item['channel_id']) - ); - if($s) - $channel = $s[0]; - - } - elseif($cmd === 'request') { - $channel_id = $item_id; - $xchan = $argv[3]; - $request_message_id = $argv[4]; - - $s = q("select * from channel where channel_id = %d limit 1", - intval($channel_id) - ); - if($s) - $channel = $s[0]; - $private = true; - $recipients[] = $xchan; - $packet_type = 'request'; - $normal_mode = false; - } - elseif($cmd === 'keychange') { - $channel = channelx_by_n($item_id); - $r = q("select abook_xchan from abook where abook_channel = %d", + if ($cmd === 'keychange') { + self::$channel = channelx_by_n($item_id); + $r = q("select abook_xchan from abook where abook_channel = %d", intval($item_id) ); - if($r) { - foreach($r as $rr) { - $recipients[] = $rr['abook_xchan']; + if ($r) { + foreach ($r as $rr) { + self::$recipients[] = $rr['abook_xchan']; } } - $private = false; - $packet_type = 'keychange'; - $normal_mode = false; + self::$private = false; + self::$packet_type = 'keychange'; + self::$encoded_item = get_pconfig(self::$channel['channel_id'], 'system', 'keychange'); + self::$encoding = 'zot'; + $normal_mode = false; } - elseif(in_array($cmd, [ 'permission_update', 'permission_reject', 'permission_accept', 'permission_create' ])) { - // Get the (single) recipient + elseif (in_array($cmd, ['permission_update', 'permission_accept', 'permission_create'])) { + // Get the (single) recipient $r = q("select * from abook left join xchan on abook_xchan = xchan_hash where abook_id = %d and abook_self = 0", intval($item_id) ); - if($r) { - $uid = $r[0]['abook_channel']; + if ($r) { + $recip = $r[0]; + // Get the sender - $channel = channelx_by_n($uid); - if($channel) { - $perm_update = array('sender' => $channel, 'recipient' => $r[0], 'success' => false, 'deliveries' => ''); - - if($cmd === 'permission_create') - call_hooks('permissions_create',$perm_update); - elseif($cmd === 'permission_accept') - call_hooks('permissions_accept',$perm_update); - elseif($cmd === 'permission_reject') - call_hooks('permissions_reject',$perm_update); - else - call_hooks('permissions_update',$perm_update); - - if($perm_update['success']) { - if($perm_update['deliveries']) { - $deliveries[] = $perm_update['deliveries']; - do_delivery($deliveries); + self::$channel = channelx_by_n($recip['abook_channel']); + if (self::$channel) { + $perm_update = ['sender' => self::$channel, 'recipient' => $recip, 'success' => false, 'deliveries' => '']; + + switch ($cmd) { + case 'permission_create': + call_hooks('permissions_create', $perm_update); + break; + case 'permission_accept': + call_hooks('permissions_accept', $perm_update); + break; + case 'permission_update': + call_hooks('permissions_update', $perm_update); + break; + default: + break; + } + + if ($perm_update['success']) { + if ($perm_update['deliveries']) { + self::$deliveries[] = $perm_update['deliveries']; + do_delivery(self::$deliveries); } - return; + return; } else { - $recipients[] = $r[0]['abook_xchan']; - $private = false; - $packet_type = 'refresh'; - $packet_recips = array(array('guid' => $r[0]['xchan_guid'],'guid_sig' => $r[0]['xchan_guid_sig'],'hash' => $r[0]['xchan_hash'])); + self::$recipients[] = $recip['abook_xchan']; + self::$private = false; + self::$packet_type = 'refresh'; + self::$env_recips = [$recip['xchan_hash']]; } } } } - elseif($cmd === 'refresh_all') { + elseif ($cmd === 'refresh_all') { logger('notifier: refresh_all: ' . $item_id); - $uid = $item_id; - $channel = channelx_by_n($item_id); - $r = q("select abook_xchan from abook where abook_channel = %d", - intval($item_id) - ); - if($r) { - foreach($r as $rr) { - $recipients[] = $rr['abook_xchan']; - } - } - $private = false; - $packet_type = 'refresh'; - } - elseif($cmd === 'location') { - logger('notifier: location: ' . $item_id); - $s = q("select * from channel where channel_id = %d limit 1", - intval($item_id) - ); - if($s) - $channel = $s[0]; - $uid = $item_id; - $recipients = array(); + + self::$channel = channelx_by_n($item_id, true); + $r = q("select abook_xchan from abook where abook_channel = %d", intval($item_id) ); - if($r) { - foreach($r as $rr) { - $recipients[] = $rr['abook_xchan']; + if ($r) { + foreach ($r as $rr) { + self::$recipients[] = $rr['abook_xchan']; } } - $encoded_item = array('locations' => zot_encode_locations($channel),'type' => 'location', 'encoding' => 'zot'); - $target_item = array('aid' => $channel['channel_account_id'],'uid' => $channel['channel_id']); - $private = false; - $packet_type = 'location'; - $location = true; + // In case we deleted the channel, our abook entry has already vanished. + // In order to be able to update our clones we need to add ourself here. + self::$recipients[] = self::$channel['channel_hash']; + + self::$private = false; + self::$packet_type = 'refresh'; } - elseif($cmd === 'purge') { + elseif ($cmd === 'purge') { $xchan = $argv[3]; logger('notifier: purge: ' . $item_id . ' => ' . $xchan); - if (! $xchan) { + if (!$xchan) { return; } - $channel = channelx_by_n($item_id); - $recipients[] = $xchan; - $private = true; - $packet_type = 'purge'; - $packet_recips[] = ['hash' => $xchan]; + self::$channel = channelx_by_n($item_id, true); + self::$recipients = [$xchan]; + self::$private = true; + self::$packet_type = 'purge'; } - elseif($cmd === 'purge_all') { - + elseif ($cmd === 'purge_all') { logger('notifier: purge_all: ' . $item_id); - $channel = channelx_by_n($item_id); + self::$channel = channelx_by_n($item_id, true); + self::$recipients = []; + self::$private = false; + self::$packet_type = 'purge'; - $recipients = []; $r = q("select abook_xchan from abook where abook_channel = %d and abook_self = 0", intval($item_id) ); - if (! $r) { + if (!$r) { return; } foreach ($r as $rr) { - $recipients[] = $rr['abook_xchan']; - $packet_recips[] = ['hash' => $rr['abook_xchan']]; + self::$recipients[] = $rr['abook_xchan']; } - - $private = false; - $packet_type = 'purge'; - - } else { @@ -283,38 +222,34 @@ class Notifier { // Fetch the target item - $r = q("SELECT * FROM item WHERE id = %d and parent != 0 LIMIT 1", + $r = q("SELECT * FROM item WHERE id = %d AND parent != 0", intval($item_id) ); - - if(! $r) + if (!$r) { return; + } xchan_query($r); - $r = fetch_post_tags($r); - + $target_item = $r[0]; - if(in_array($target_item['author']['xchan_network'], ['rss', 'anon'])) { + if (in_array($target_item['author']['xchan_network'], ['rss', 'anon'])) { logger('notifier: target item author is not a fetchable actor', LOGGER_DEBUG); return; } - $deleted_item = false; - - if(intval($target_item['item_deleted'])) { + if (intval($target_item['item_deleted'])) { logger('notifier: target item ITEM_DELETED', LOGGER_DEBUG); - $deleted_item = true; } - if(! in_array(intval($target_item['item_type']), [ ITEM_TYPE_POST ] )) { - $hookinfo=[ - 'targetitem'=>$target_item, - 'deliver'=>false + if (!in_array(intval($target_item['item_type']), [ITEM_TYPE_POST])) { + $hookinfo = [ + 'targetitem' => $target_item, + 'deliver' => false ]; if (intval($target_item['item_type'] == ITEM_TYPE_CUSTOM)) { - call_hooks('customitem_deliver',$hookinfo); + call_hooks('customitem_deliver', $hookinfo); } if (!$hookinfo['deliver']) { @@ -328,14 +263,20 @@ class Notifier { // Check for non published items, but allow an exclusion for transmitting hidden file activities - if(intval($target_item['item_unpublished']) || intval($target_item['item_delayed']) || - intval($target_item['item_blocked']) || - ( intval($target_item['item_hidden']) && ($target_item['obj_type'] !== ACTIVITY_OBJ_FILE))) { + if (intval($target_item['item_unpublished']) || intval($target_item['item_delayed']) || + intval($target_item['item_blocked']) || + (intval($target_item['item_hidden']) && ($target_item['obj_type'] !== ACTIVITY_OBJ_FILE))) { logger('notifier: target item not published, so not forwardable', LOGGER_DEBUG); return; } - if(strpos($target_item['postopts'],'nodeliver') !== false) { + // follow/unfollow is for internal use only + if (in_array($target_item['verb'], [ACTIVITY_FOLLOW, ACTIVITY_UNFOLLOW])) { + logger('not fowarding follow/unfollow note activity'); + return; + } + + if (strpos($target_item['postopts'], 'nodeliver') !== false) { logger('notifier: target item is undeliverable', LOGGER_DEBUG); return; } @@ -343,79 +284,75 @@ class Notifier { $s = q("select * from channel left join xchan on channel_hash = xchan_hash where channel_id = %d limit 1", intval($target_item['uid']) ); - if($s) - $channel = $s[0]; + if ($s) { + self::$channel = $s[0]; + } - if($channel['channel_hash'] !== $target_item['author_xchan'] && $channel['channel_hash'] !== $target_item['owner_xchan']) { - logger("notifier: Sending channel {$channel['channel_hash']} is not owner {$target_item['owner_xchan']} or author {$target_item['author_xchan']}", LOGGER_NORMAL, LOG_WARNING); + if (self::$channel['channel_hash'] !== $target_item['author_xchan'] && self::$channel['channel_hash'] !== $target_item['owner_xchan']) { + logger("notifier: Sending channel " . self::$channel['channel_hash'] . " is not owner {$target_item['owner_xchan']} or author {$target_item['author_xchan']}", LOGGER_NORMAL, LOG_WARNING); return; } - - if($target_item['mid'] === $target_item['parent_mid']) { - $parent_item = $target_item; + if ($target_item['mid'] === $target_item['parent_mid']) { + $parent_item = $target_item; $top_level_post = true; } else { // fetch the parent item - $r = q("SELECT * from item where id = %d order by id asc", + $r = q("SELECT * FROM item WHERE id = %d", intval($target_item['parent']) ); - if(! $r) + if (!$r) { return; + } - if(strpos($r[0]['postopts'],'nodeliver') !== false) { + if (strpos($r[0]['postopts'], 'nodeliver') !== false) { logger('notifier: target item is undeliverable', LOGGER_DEBUG, LOG_NOTICE); return; } xchan_query($r); $r = fetch_post_tags($r); - - $parent_item = $r[0]; + + $parent_item = $r[0]; $top_level_post = false; } // avoid looping of discover items 12/4/2014 - - if($sys && $parent_item['uid'] == $sys['channel_id']) + if ($sys && $parent_item['uid'] == $sys['channel_id']) { return; + } - $encoded_item = encode_item($target_item); - + $m = get_iconfig($target_item, 'activitypub', 'signed_data'); // Re-use existing signature unless the activity type changed to a Tombstone, which won't verify. - $m = ((intval($target_item['item_deleted'])) ? '' : get_iconfig($target_item,'activitystreams','signed_data')); - - if($m) { - $activity = json_decode($m,true); + if ($m && (!intval($target_item['item_deleted']))) { + self::$encoded_item = json_decode($m, true); } else { - $activity = array_merge(['@context' => [ + self::$encoded_item = array_merge(['@context' => [ ACTIVITYSTREAMS_JSONLD_REV, 'https://w3id.org/security/v1', z_root() . ZOT_APSCHEMA_REV - ]], Activity::encode_activity($target_item) + ]], Activity::encode_activity($target_item) ); - } + } - logger('target_item: ' . print_r($target_item,true), LOGGER_DEBUG); - logger('encoded: ' . print_r($activity,true), LOGGER_DEBUG); + logger('target_item: ' . print_r($target_item, true), LOGGER_DEBUG); + logger('encoded: ' . print_r(self::$encoded_item, true), LOGGER_DEBUG); // Send comments to the owner to re-deliver to everybody in the conversation // We only do this if the item in question originated on this site. This prevents looping. // To clarify, a site accepting a new comment is responsible for sending it to the owner for relay. - // Relaying should never be initiated on a post that arrived from elsewhere. + // Relaying should never be initiated on a post that arrived from elsewhere. // We should normally be able to rely on ITEM_ORIGIN, but start_delivery_chain() incorrectly set this // flag on comments for an extended period. So we'll also call comment_local_origin() which looks at - // the hostname in the message_id and provides a second (fallback) opinion. - - $relay_to_owner = (((! $top_level_post) && (intval($target_item['item_origin'])) && comment_local_origin($target_item)) ? true : false); - - + // the hostname in the message_id and provides a second (fallback) opinion. - $uplink = false; + $relay_to_owner = (!$top_level_post && intval($target_item['item_origin']) && comment_local_origin($target_item)); + $uplink = false; + $upstream = false; // $cmd === 'relay' indicates the owner is sending it to the original recipients // don't allow the item in the relay command to relay to owner under any circumstances, it will loop @@ -425,154 +362,141 @@ class Notifier { // tag_deliver'd post which needs to be sent back to the original author - if(($cmd === 'uplink') && intval($parent_item['item_uplink']) && (! $top_level_post)) { - logger('notifier: uplink'); - $uplink = true; - } + if (($cmd === 'uplink') && intval($parent_item['item_uplink']) && (!$top_level_post)) { + logger('notifier: uplink'); + $uplink = true; + self::$packet_type = 'response'; + } - if(($relay_to_owner || $uplink) && ($cmd !== 'relay')) { + if (($relay_to_owner || $uplink) && ($cmd !== 'relay')) { logger('notifier: followup relay', LOGGER_DEBUG); - $recipients = array(($uplink) ? $parent_item['source_xchan'] : $parent_item['owner_xchan']); - $private = true; - if(! $encoded_item['flags']) - $encoded_item['flags'] = array(); - $encoded_item['flags'][] = 'relay'; - $upstream = true; + $sendto = (($uplink) ? $parent_item['source_xchan'] : $parent_item['owner_xchan']); + self::$recipients = [$sendto]; + self::$private = true; + $upstream = true; + self::$packet_type = 'response'; } else { - logger('notifier: normal distribution', LOGGER_DEBUG); - if($cmd === 'relay') - logger('notifier: owner relay'); - $upstream = false; + if ($cmd === 'relay') { + logger('owner relay (downstream delivery)'); + } + else { + logger('normal (downstream) distribution', LOGGER_DEBUG); + } + + if ($parent_item && $parent_item['item_private'] !== $target_item['item_private']) { + logger('conversation privacy mismatch - downstream delivery prevented'); + return; + } + // if our parent is a tag_delivery recipient, uplink to the original author causing - // a delivery fork. - - if(($parent_item) && intval($parent_item['item_uplink']) && (! $top_level_post) && ($cmd !== 'uplink')) { + // a delivery fork. + if ($parent_item && intval($parent_item['item_uplink']) && !$top_level_post && $cmd !== 'uplink') { // don't uplink a relayed post to the relay owner - if($parent_item['source_xchan'] !== $parent_item['owner_xchan']) { + if ($parent_item['source_xchan'] !== $parent_item['owner_xchan']) { logger('notifier: uplinking this item'); - Master::Summon(array('Notifier','uplink',$item_id)); + Master::Summon(['Notifier', 'uplink', $item_id]); } } - $private = false; - $recipients = collect_recipients($parent_item,$private); + self::$private = false; + self::$recipients = collect_recipients($parent_item, self::$private); + // FIXME add any additional recipients such as mentions, etc. if ($top_level_post) { // remove clones who will receive the post via sync - $recipients = array_diff($recipients, [ $target_item['owner_xchan'] ]); - } - - // FIXME add any additional recipients such as mentions, etc. + self::$recipients = array_values(array_diff(self::$recipients, [$target_item['owner_xchan']])); + } // don't send deletions onward for other people's stuff - // TODO verify this is needed - copied logic from same place in old code - - if(intval($target_item['item_deleted']) && (! intval($target_item['item_wall']))) { + if (intval($target_item['item_deleted']) && (!intval($target_item['item_wall']))) { logger('notifier: ignoring delete notification for non-wall item', LOGGER_NORMAL, LOG_NOTICE); return; } } } - $walltowall = (($top_level_post && $channel['xchan_hash'] === $target_item['author_xchan']) ? true : false); - // Generic delivery section, we have an encoded item and recipients // Now start the delivery process - $x = $encoded_item; - $x['title'] = 'private'; - $x['body'] = 'private'; - logger('notifier: encoded item: ' . print_r($x,true), LOGGER_DATA, LOG_DEBUG); - - //logger('notifier: encoded activity: ' . print_r($activity,true), LOGGER_DATA, LOG_DEBUG); + logger('encoded item: ' . print_r(self::$encoded_item, true), LOGGER_DATA, LOG_DEBUG); - stringify_array_elms($recipients); - if(! $recipients) { + stringify_array_elms(self::$recipients); + if (!self::$recipients) { logger('no recipients'); return; } - // logger('notifier: recipients: ' . print_r($recipients,true), LOGGER_NORMAL, LOG_DEBUG); + // logger('recipients: ' . print_r(self::$recipients,true), LOGGER_NORMAL, LOG_DEBUG); - $env_recips = (($private) ? array() : null); - - $details = q("select xchan_hash, xchan_network, xchan_addr, xchan_guid, xchan_guid_sig from xchan where xchan_hash in (" . protect_sprintf(implode(',',$recipients)) . ")"); + if (!count(self::$env_recips)) { + self::$env_recips = ((self::$private) ? [] : null); + } - $recip_list = array(); + $recip_list = []; - if($details) { - foreach($details as $d) { + $details = dbq("select xchan_hash, xchan_network, xchan_addr, xchan_guid, xchan_guid_sig from xchan where xchan_hash in (" . protect_sprintf(implode(',', self::$recipients)) . ")"); - $recip_list[] = $d['xchan_addr'] . ' (' . $d['xchan_hash'] . ')'; - if($private) { - $env_recips[] = [ - 'guid' => $d['xchan_guid'], - 'guid_sig' => $d['xchan_guid_sig'], - 'hash' => $d['xchan_hash'] - ]; + if ($details) { + foreach ($details as $d) { + $recip_list[] = $d['xchan_addr'] . ' (' . $d['xchan_hash'] . ')'; + if (self::$private) { + self::$env_recips[] = $d['xchan_hash']; } } } - $narr = [ - 'channel' => $channel, + 'channel' => self::$channel, 'upstream' => $upstream, - 'env_recips' => $env_recips, - 'packet_recips' => $packet_recips, - 'recipients' => $recipients, - 'item' => $item, + 'env_recips' => self::$env_recips, + 'recipients' => self::$recipients, 'target_item' => $target_item, 'parent_item' => $parent_item, 'top_level_post' => $top_level_post, - 'private' => $private, + 'private' => self::$private, 'relay_to_owner' => $relay_to_owner, 'uplink' => $uplink, 'cmd' => $cmd, - 'mail' => $mail, - 'single' => (($cmd === 'single_mail' || $cmd === 'single_activity') ? true : false), - 'location' => $location, - 'request' => $request, + 'single' => ($cmd === 'single_activity'), 'normal_mode' => $normal_mode, - 'packet_type' => $packet_type, - 'walltowall' => $walltowall, + 'packet_type' => self::$packet_type, 'queued' => [] ]; call_hooks('notifier_process', $narr); - if($narr['queued']) { - foreach($narr['queued'] as $pq) - $deliveries[] = $pq; + if ($narr['queued']) { + foreach ($narr['queued'] as $pq) + self::$deliveries[] = $pq; } // notifier_process can alter the recipient list - $recipients = $narr['recipients']; - $env_recips = $narr['env_recips']; - $packet_recips = $narr['packet_recips']; + self::$recipients = $narr['recipients']; + self::$env_recips = $narr['env_recips']; - if(($private) && (! $env_recips)) { + if (self::$private && !self::$env_recips) { // shouldn't happen - logger('notifier: private message with no envelope recipients.' . print_r($argv,true), LOGGER_NORMAL, LOG_NOTICE); + logger('notifier: private message with no envelope recipients.' . print_r($argv, true), LOGGER_NORMAL, LOG_NOTICE); + return; } - - logger('notifier: recipients (may be delivered to more if public): ' . print_r($recip_list,true), LOGGER_DEBUG); - + + logger('notifier: recipients (may be delivered to more if public): ' . print_r($recip_list, true), LOGGER_DEBUG); + // Now we have collected recipients (except for external mentions, FIXME) // Let's reduce this to a set of hubs; checking that the site is not dead. - $hubs = q("select hubloc.*, site.site_crypto, site.site_flags, site.site_version, site.site_project, site.site_dead from hubloc left join site on site_url = hubloc_url - where hubloc_hash in (" . protect_sprintf(implode(',',$recipients)) . ") + $hubs = q("select hubloc.*, site.site_crypto, site.site_flags, site.site_dead from hubloc left join site on site_url = hubloc_url + where hubloc_hash in (" . protect_sprintf(implode(',', self::$recipients)) . ") and hubloc_error = 0 and hubloc_deleted = 0" ); - // public posts won't make it to the local public stream unless there's a recipient on this site. + // public posts won't make it to the local public stream unless there's a recipient on this site. // This code block sees if it's a public post and localhost is missing, and if so adds an entry for the local sys channel to the $hubs list - if (! $private) { + if (!self::$private) { $found_localhost = false; if ($hubs) { foreach ($hubs as $h) { @@ -582,115 +506,108 @@ class Notifier { } } } - if (! $found_localhost) { - $localhub = q("select hubloc.*, site.site_crypto, site.site_flags, site.site_version, site.site_project, site.site_dead from hubloc - left join site on site_url = hubloc_url where hubloc_id_url = '%s' and hubloc_error = 0 and hubloc_deleted = 0", + if (!$found_localhost) { + $localhub = q("select hubloc.*, site.site_crypto, site.site_flags, site.site_dead from hubloc left join site on site_url = hubloc_url + where hubloc_id_url = '%s' and hubloc_error = 0 and hubloc_deleted = 0", dbesc(z_root() . '/channel/sys') ); if ($localhub) { - $hubs = array_merge($hubs, $localhub); + $hubs = array_merge($localhub, $hubs); } } } - - if(! $hubs) { + + if (!$hubs) { logger('notifier: no hubs', LOGGER_NORMAL, LOG_NOTICE); return; } /** - * Reduce the hubs to those that are unique. For zot hubs, we need to verify uniqueness by the sitekey, + * Reduce the hubs to those that are unique. For zot hubs, we need to verify uniqueness by the sitekey, * since it may have been a re-install which has not yet been detected and pruned. * For other networks which don't have or require sitekeys, we'll have to use the URL */ - $hublist = []; // this provides an easily printable list for the logs - $dhubs = []; // delivery hubs where we store our resulting unique array - $keys = []; // array of keys to check uniquness for zot hubs - $urls = []; // array of urls to check uniqueness of hubs from other networks - $hub_env = []; // per-hub envelope so we don't broadcast the entire envelope to all - $dead = []; // known dead hubs - report them as undeliverable - - foreach($hubs as $hub) { + $hublist = []; // this provides an easily printable list for the logs + $dhubs = []; // delivery hubs where we store our resulting unique array + $keys = []; // array of keys to check uniquness for zot hubs + $urls = []; // array of urls to check uniqueness of hubs from other networks + $hub_env = []; // per-hub envelope so we don't broadcast the entire envelope to all + $dead_hosts = []; // known dead hubs - report them as undeliverable - if (intval($hub['site_dead'])) { - $dead[] = $hub; + foreach ($hubs as $hub) { + if (isset($hub['site_dead']) && intval($hub['site_dead'])) { + if(!in_array($hub['hubloc_host'], $dead_hosts)) { + $dead_hosts[] = $hub['hubloc_host']; + } continue; } - if($env_recips) { - foreach($env_recips as $er) { - if($hub['hubloc_hash'] === $er['hash']) { - if(! array_key_exists($hub['hubloc_host'] . $hub['hubloc_sitekey'], $hub_env)) { - $hub_env[$hub['hubloc_host'] . $hub['hubloc_sitekey']] = []; + if (self::$env_recips) { + foreach (self::$env_recips as $er) { + if ($hub['hubloc_hash'] === $er) { + if (!array_key_exists($hub['hubloc_site_id'], $hub_env)) { + $hub_env[$hub['hubloc_site_id']] = []; } - $hub_env[$hub['hubloc_host'] . $hub['hubloc_sitekey']][] = $er; + $hub_env[$hub['hubloc_site_id']][] = $er; } } } - - if($hub['hubloc_network'] == 'zot') { - if(! in_array($hub['hubloc_sitekey'],$keys)) { - $hublist[] = $hub['hubloc_host'] . ' ' . $hub['hubloc_network']; - $dhubs[] = $hub; - $keys[] = $hub['hubloc_sitekey']; - } - } - else { - if(! in_array($hub['hubloc_url'],$urls)) { - if($hub['hubloc_url'] === z_root()) { + if ($hub['hubloc_network'] === 'zot6') { + if (!in_array($hub['hubloc_sitekey'], $keys)) { + if ($hub['hubloc_url'] === z_root()) { //deliver to local hub first array_unshift($hublist, $hub['hubloc_host'] . ' ' . $hub['hubloc_network']); array_unshift($dhubs, $hub); } else { $hublist[] = $hub['hubloc_host'] . ' ' . $hub['hubloc_network']; - $dhubs[] = $hub; + $dhubs[] = $hub; } - $urls[] = $hub['hubloc_url']; + $keys[] = $hub['hubloc_sitekey']; + } + } + else { + if (!in_array($hub['hubloc_url'], $urls)) { + $hublist[] = $hub['hubloc_host'] . ' ' . $hub['hubloc_network']; + $dhubs[] = $hub; + $urls[] = $hub['hubloc_url']; } } } - logger('notifier: will notify/deliver to these hubs: ' . print_r($hublist,true), LOGGER_DEBUG, LOG_DEBUG); + logger('notifier: will notify/deliver to these hubs: ' . print_r($hublist, true), LOGGER_DEBUG, LOG_DEBUG); - foreach($dhubs as $hub) { + foreach ($dhubs as $hub) { - logger('notifier_hub: ' . $hub['hubloc_url'],LOGGER_DEBUG); + logger('notifier_hub: ' . $hub['hubloc_url'], LOGGER_DEBUG); - if(! in_array($hub['hubloc_network'], [ 'zot','zot6' ])) { + if ($hub['hubloc_network'] !== 'zot6') { $narr = [ - 'channel' => $channel, + 'channel' => self::$channel, 'upstream' => $upstream, - 'env_recips' => $env_recips, - 'packet_recips' => $packet_recips, - 'recipients' => $recipients, - 'item' => $item, + 'env_recips' => self::$env_recips, + 'recipients' => self::$recipients, 'target_item' => $target_item, 'parent_item' => $parent_item, 'hub' => $hub, 'top_level_post' => $top_level_post, - 'private' => $private, + 'private' => self::$private, 'relay_to_owner' => $relay_to_owner, 'uplink' => $uplink, 'cmd' => $cmd, - 'mail' => $mail, - 'single' => (($cmd === 'single_mail' || $cmd === 'single_activity') ? true : false), - 'location' => $location, - 'request' => $request, + 'single' => $cmd === 'single_activity', 'normal_mode' => $normal_mode, - 'packet_type' => $packet_type, - 'walltowall' => $walltowall, + 'packet_type' => self::$packet_type, 'queued' => [] ]; - - call_hooks('notifier_hub',$narr); - if($narr['queued']) { - foreach($narr['queued'] as $pq) - $deliveries[] = $pq; + call_hooks('notifier_hub', $narr); + if ($narr['queued']) { + foreach ($narr['queued'] as $pq) + self::$deliveries[] = $pq; } continue; @@ -705,176 +622,83 @@ class Notifier { // will invoke a delivery to those connections which are connected to just that // hub instance. - if($cmd === 'single_mail' || $cmd === 'single_activity') { + if ($cmd === 'single_activity') { continue; } - if(! in_array($hub['hubloc_network'], [ 'zot','zot6' ])) { - continue; - } - - // Do not change this to a uuid as long as we have traditional zot servers - // in the loop. The signature verification step can't handle dashes in the - // hashes. - - $hash = random_string(48); + // default: zot protocol - $packet = null; - $pmsg = ''; - - if($packet_type === 'refresh' || $packet_type === 'purge') { - if($hub['hubloc_network'] === 'zot6') { - $packet = Libzot::build_packet($channel, $packet_type, ids_to_array($packet_recips,'hash')); - } - else { - $packet = zot_build_packet($channel,$packet_type,(($packet_recips) ? $packet_recips : null)); - } - } - if($packet_type === 'keychange' && $hub['hubloc_network'] === 'zot') { - $pmsg = get_pconfig($channel['channel_id'],'system','keychange'); - $packet = zot_build_packet($channel,$packet_type,(($packet_recips) ? $packet_recips : null)); - } - elseif($packet_type === 'request' && $hub['hubloc_network'] === 'zot') { - $env = (($hub_env && $hub_env[$hub['hubloc_host'] . $hub['hubloc_sitekey']]) ? $hub_env[$hub['hubloc_host'] . $hub['hubloc_sitekey']] : ''); - $packet = zot_build_packet($channel,$packet_type,$env,$hub['hubloc_sitekey'],$hub['site_crypto'], - $hash, array('message_id' => $request_message_id) - ); - } + $hash = new_uuid(); - if($packet) { - Queue::insert( - [ - 'hash' => $hash, - 'account_id' => $channel['channel_account_id'], - 'channel_id' => $channel['channel_id'], - 'posturl' => $hub['hubloc_callback'], - 'driver' => $hub['hubloc_network'], - 'notify' => $packet, - 'msg' => (($pmsg) ? json_encode($pmsg) : '') - ] - ); + $env = (($hub_env && $hub_env[$hub['hubloc_site_id']]) ? $hub_env[$hub['hubloc_site_id']] : ''); + if ((self::$private) && (!$env)) { + continue; } - else { - $env = (($hub_env && $hub_env[$hub['hubloc_host'] . $hub['hubloc_sitekey']]) ? $hub_env[$hub['hubloc_host'] . $hub['hubloc_sitekey']] : ''); - - - if($hub['hubloc_network'] === 'zot6') { - $zenv = []; - if($env) { - foreach($env as $e) { - $zenv[] = $e['hash']; - } - } - - $packet_type = (($upstream || $uplink) ? 'response' : 'activity'); - - // block zot private reshares from zot6, as this could cause a number of privacy issues - // due to parenting differences between the reshare implementations. In zot a reshare is - // a standalone parent activity and in zot6 it is a followup/child of the original activity. - // For public reshares, some comments to the reshare on the zot fork will not make it to zot6 - // due to these different message models. This cannot be prevented at this time. - - if($packet_type === 'activity' && $activity['type'] === 'Announce' && intval($target_item['item_private'])) { - continue; - } - - $packet = Libzot::build_packet($channel,$packet_type,$zenv,$activity,'activitystreams',(($private) ? $hub['hubloc_sitekey'] : null),$hub['site_crypto']); - } - else { - // currently zot6 delivery is only performed on normal items and not sync items or mail or anything else - // Eventually we will do this for all deliveries, but for now ensure this is precisely what we are dealing - // with before switching to zot6 as the primary zot6 handler checks for the existence of a message delivery report - // to trigger dequeue'ing - - $z6 = (($encoded_item && $encoded_item['type'] === 'activity' && (! array_key_exists('allow_cid',$encoded_item))) ? true : false); - if($z6) { - $packet = zot6_build_packet($channel,'notify',$env, json_encode($encoded_item), (($private) ? $hub['hubloc_sitekey'] : null), $hub['site_crypto'],$hash); - } - else { - $packet = zot_build_packet($channel,'notify',$env, (($private) ? $hub['hubloc_sitekey'] : null), $hub['site_crypto'],$hash); - - } - } + $packet = Libzot::build_packet(self::$channel, self::$packet_type, $env, self::$encoded_item, self::$encoding, ((self::$private) ? $hub['hubloc_sitekey'] : null), $hub['site_crypto']); - // remove this after most hubs have updated to version 5.0 - if(stripos($hub['site_project'], 'hubzilla') !== false && version_compare($hub['site_version'], '4.7.3', '<=')) { - if($encoded_item['type'] === 'mail') { - $encoded_item['from']['network'] = 'zot'; - $encoded_item['from']['guid_sig'] = str_replace('sha256.', '', $encoded_item['from']['guid_sig']); - } - else { - $encoded_item['owner']['network'] = 'zot'; - $encoded_item['owner']['guid_sig'] = str_replace('sha256.', '', $encoded_item['owner']['guid_sig']); - if(strpos($encoded_item['author']['url'], z_root()) === 0) { - $encoded_item['author']['network'] = 'zot'; - $encoded_item['author']['guid_sig'] = str_replace('sha256.', '', $encoded_item['author']['guid_sig']); - } - } - } + Queue::insert( + [ + 'hash' => $hash, + 'account_id' => self::$channel['channel_account_id'], + 'channel_id' => self::$channel['channel_id'], + 'posturl' => $hub['hubloc_callback'], + 'notify' => $packet, + 'msg' => EMPTY_STR + ] + ); - Queue::insert( - [ - 'hash' => $hash, - 'account_id' => $target_item['aid'], - 'channel_id' => $target_item['uid'], - 'posturl' => $hub['hubloc_callback'], - 'driver' => $hub['hubloc_network'], - 'notify' => $packet, - 'msg' => json_encode($encoded_item) - ] + // only create delivery reports for normal undeleted items + if (is_array($target_item) && (!$target_item['item_deleted']) && (!get_config('system', 'disable_dreport'))) { + q("insert into dreport ( dreport_mid, dreport_site, dreport_recip, dreport_name, dreport_result, dreport_time, dreport_xchan, dreport_queue ) + values ( '%s', '%s','%s','%s','%s','%s','%s','%s' ) ", + dbesc($target_item['mid']), + dbesc($hub['hubloc_host']), + dbesc($hub['hubloc_host']), + dbesc($hub['hubloc_host']), + dbesc('queued'), + dbesc(datetime_convert()), + dbesc(self::$channel['channel_hash']), + dbesc($hash) ); - - // only create delivery reports for normal undeleted items - if(is_array($target_item) && array_key_exists('postopts',$target_item) && (! $target_item['item_deleted']) && (! get_config('system','disable_dreport'))) { - q("insert into dreport ( dreport_mid, dreport_site, dreport_recip, dreport_result, dreport_time, dreport_xchan, dreport_queue ) values ( '%s','%s','%s','%s','%s','%s','%s' ) ", - dbesc($target_item['mid']), - dbesc($hub['hubloc_host']), - dbesc($hub['hubloc_host']), - dbesc('queued'), - dbesc(datetime_convert()), - dbesc($channel['channel_hash']), - dbesc($hash) - ); - } } - $deliveries[] = $hash; + self::$deliveries[] = $hash; + } - - if($normal_mode) { + + if ($normal_mode) { + // This wastes a process if there are no delivery hooks configured, so check this before launching the new process $x = q("select * from hook where hook = 'notifier_normal'"); - if($x) { - Master::Summon( [ 'Deliver_hooks', $target_item['id'] ] ); + if ($x) { + Master::Summon(['Deliver_hooks', $target_item['id']]); } } - if($deliveries) - do_delivery($deliveries); - - logger('notifier: basic loop complete.', LOGGER_DEBUG); - - if ($dead) { - foreach ($dead as $deceased) { - if (is_array($target_item) && (! $target_item['item_deleted']) && (! get_config('system','disable_dreport'))) { - q("insert into dreport ( dreport_mid, dreport_site, dreport_recip, dreport_name, dreport_result, dreport_time, dreport_xchan, dreport_queue ) - values ( '%s', '%s','%s','%s','%s','%s','%s','%s' ) ", - dbesc($target_item['mid']), - dbesc($deceased['hubloc_host']), - dbesc($deceased['hubloc_host']), - dbesc($deceased['hubloc_host']), - dbesc('undeliverable/unresponsive site'), - dbesc(datetime_convert()), - dbesc($channel['channel_hash']), - dbesc(random_string(48)) - ); - } + if (self::$deliveries) { + do_delivery(self::$deliveries); + } + + if ($dead_hosts && is_array($target_item) && (!$target_item['item_deleted']) && (!get_config('system', 'disable_dreport'))) { + foreach ($dead_hosts as $deceased_host) { + $r = q("insert into dreport ( dreport_mid, dreport_site, dreport_recip, dreport_name, dreport_result, dreport_time, dreport_xchan, dreport_queue ) + values ( '%s', '%s','%s','%s','%s','%s','%s','%s' ) ", + dbesc($target_item['mid']), + dbesc($deceased_host), + dbesc($deceased_host), + dbesc($deceased_host), + dbesc('undeliverable/unresponsive site'), + dbesc(datetime_convert()), + dbesc(self::$channel['channel_hash']), + dbesc(new_uuid()) + ); } } - call_hooks('notifier_end',$target_item); + call_hooks('notifier_end', $target_item); logger('notifer: complete.'); + return; } |