$channel, 'recipient' => $r[0], 'success' => false, 'deliveries' => ''); if($cmd === 'permission_create') call_hooks('permissions_create',$perm_update); elseif($cmd === 'permission_accept') call_hooks('permissions_accept',$perm_update); elseif($cmd === 'permission_reject') call_hooks('permissions_reject',$perm_update); else call_hooks('permissions_update',$perm_update); if($perm_update['success']) { if($perm_update['deliveries']) { $deliveries[] = $perm_update['deliveries']; do_delivery($deliveries); } return; } else { $recipients[] = $r[0]['abook_xchan']; $private = false; $packet_type = 'refresh'; $packet_recips = array(array('guid' => $r[0]['xchan_guid'],'guid_sig' => $r[0]['xchan_guid_sig'],'hash' => $r[0]['xchan_hash'])); } } } } elseif($cmd === 'refresh_all') { logger('notifier: refresh_all: ' . $item_id); $uid = $item_id; $channel = channelx_by_n($item_id); $r = q("select abook_xchan from abook where abook_channel = %d", intval($item_id) ); if($r) { foreach($r as $rr) { $recipients[] = $rr['abook_xchan']; } } $private = false; $packet_type = 'refresh'; } elseif($cmd === 'location') { logger('notifier: location: ' . $item_id); $s = q("select * from channel where channel_id = %d limit 1", intval($item_id) ); if($s) $channel = $s[0]; $uid = $item_id; $recipients = array(); $r = q("select abook_xchan from abook where abook_channel = %d", intval($item_id) ); if($r) { foreach($r as $rr) { $recipients[] = $rr['abook_xchan']; } } $encoded_item = array('locations' => Libzot::encode_locations($channel),'type' => 'location', 'encoding' => 'zot'); $target_item = array('aid' => $channel['channel_account_id'],'uid' => $channel['channel_id']); $private = false; $packet_type = 'location'; $location = true; } elseif($cmd === 'purge') { $xchan = $argv[3]; logger('notifier: purge: ' . $item_id . ' => ' . $xchan); if (! $xchan) { return; } $channel = channelx_by_n($item_id); $recipients[] = $xchan; $private = true; $packet_type = 'purge'; $packet_recips[] = ['hash' => $xchan]; } elseif($cmd === 'purge_all') { logger('notifier: purge_all: ' . $item_id); $channel = channelx_by_n($item_id); $recipients = []; $r = q("select abook_xchan from abook where abook_channel = %d and abook_self = 0", intval($item_id) ); if (! $r) { return; } foreach ($r as $rr) { $recipients[] = $rr['abook_xchan']; $packet_recips[] = ['hash' => $rr['abook_xchan']]; } $private = false; $packet_type = 'purge'; } else { // Normal items // Fetch the target item $r = q("SELECT * FROM item WHERE id = %d and parent != 0 LIMIT 1", intval($item_id) ); if(! $r) return; xchan_query($r); $r = fetch_post_tags($r); $target_item = $r[0]; if(in_array($target_item['author']['xchan_network'], ['rss', 'anon'])) { logger('notifier: target item author is not a fetchable actor', LOGGER_DEBUG); return; } $deleted_item = false; if(intval($target_item['item_deleted'])) { logger('notifier: target item ITEM_DELETED', LOGGER_DEBUG); $deleted_item = true; } if(! in_array(intval($target_item['item_type']), [ ITEM_TYPE_POST ] )) { $hookinfo=[ 'targetitem'=>$target_item, 'deliver'=>false ]; if (intval($target_item['item_type'] == ITEM_TYPE_CUSTOM)) { call_hooks('customitem_deliver',$hookinfo); } if (!$hookinfo['deliver']) { logger('notifier: target item not forwardable: type ' . $target_item['item_type'], LOGGER_DEBUG); return; } $target_item = $hookinfo['targetitem']; } // Check for non published items, but allow an exclusion for transmitting hidden file activities if(intval($target_item['item_unpublished']) || intval($target_item['item_delayed']) || intval($target_item['item_blocked']) || ( intval($target_item['item_hidden']) && ($target_item['obj_type'] !== ACTIVITY_OBJ_FILE))) { logger('notifier: target item not published, so not forwardable', LOGGER_DEBUG); return; } if(strpos($target_item['postopts'],'nodeliver') !== false) { logger('notifier: target item is undeliverable', LOGGER_DEBUG); return; } $s = q("select * from channel left join xchan on channel_hash = xchan_hash where channel_id = %d limit 1", intval($target_item['uid']) ); if($s) $channel = $s[0]; if($channel['channel_hash'] !== $target_item['author_xchan'] && $channel['channel_hash'] !== $target_item['owner_xchan']) { logger("notifier: Sending channel {$channel['channel_hash']} is not owner {$target_item['owner_xchan']} or author {$target_item['author_xchan']}", LOGGER_NORMAL, LOG_WARNING); return; } if($target_item['mid'] === $target_item['parent_mid']) { $parent_item = $target_item; $top_level_post = true; } else { // fetch the parent item $r = q("SELECT * from item where id = %d order by id asc", intval($target_item['parent']) ); if(! $r) return; if(strpos($r[0]['postopts'],'nodeliver') !== false) { logger('notifier: target item is undeliverable', LOGGER_DEBUG, LOG_NOTICE); return; } xchan_query($r); $r = fetch_post_tags($r); $parent_item = $r[0]; $top_level_post = false; } // avoid looping of discover items 12/4/2014 if($sys && $parent_item['uid'] == $sys['channel_id']) return; $encoded_item = encode_item($target_item); // Re-use existing signature unless the activity type changed to a Tombstone, which won't verify. $m = ((intval($target_item['item_deleted'])) ? '' : get_iconfig($target_item,'activitystreams','signed_data')); if($m) { $activity = json_decode($m,true); } else { $activity = array_merge(['@context' => [ ACTIVITYSTREAMS_JSONLD_REV, 'https://w3id.org/security/v1', z_root() . ZOT_APSCHEMA_REV ]], Activity::encode_activity($target_item) ); } logger('target_item: ' . print_r($target_item,true), LOGGER_DEBUG); logger('encoded: ' . print_r($activity,true), LOGGER_DEBUG); // Send comments to the owner to re-deliver to everybody in the conversation // We only do this if the item in question originated on this site. This prevents looping. // To clarify, a site accepting a new comment is responsible for sending it to the owner for relay. // Relaying should never be initiated on a post that arrived from elsewhere. // We should normally be able to rely on ITEM_ORIGIN, but start_delivery_chain() incorrectly set this // flag on comments for an extended period. So we'll also call comment_local_origin() which looks at // the hostname in the message_id and provides a second (fallback) opinion. $relay_to_owner = (((! $top_level_post) && (intval($target_item['item_origin'])) && comment_local_origin($target_item)) ? true : false); $uplink = false; // $cmd === 'relay' indicates the owner is sending it to the original recipients // don't allow the item in the relay command to relay to owner under any circumstances, it will loop logger('notifier: relay_to_owner: ' . (($relay_to_owner) ? 'true' : 'false'), LOGGER_DATA, LOG_DEBUG); logger('notifier: top_level_post: ' . (($top_level_post) ? 'true' : 'false'), LOGGER_DATA, LOG_DEBUG); // tag_deliver'd post which needs to be sent back to the original author if(($cmd === 'uplink') && intval($parent_item['item_uplink']) && (! $top_level_post)) { logger('notifier: uplink'); $uplink = true; } if(($relay_to_owner || $uplink) && ($cmd !== 'relay')) { logger('notifier: followup relay', LOGGER_DEBUG); $recipients = array(($uplink) ? $parent_item['source_xchan'] : $parent_item['owner_xchan']); $private = true; if(! $encoded_item['flags']) $encoded_item['flags'] = array(); $encoded_item['flags'][] = 'relay'; $upstream = true; } else { logger('notifier: normal distribution', LOGGER_DEBUG); if($cmd === 'relay') logger('notifier: owner relay'); $upstream = false; // if our parent is a tag_delivery recipient, uplink to the original author causing // a delivery fork. if(($parent_item) && intval($parent_item['item_uplink']) && (! $top_level_post) && ($cmd !== 'uplink')) { // don't uplink a relayed post to the relay owner if($parent_item['source_xchan'] !== $parent_item['owner_xchan']) { logger('notifier: uplinking this item'); Master::Summon(array('Notifier','uplink',$item_id)); } } $private = false; $recipients = collect_recipients($parent_item,$private); if ($top_level_post) { // remove clones who will receive the post via sync $recipients = array_diff($recipients, [ $target_item['owner_xchan'] ]); } // FIXME add any additional recipients such as mentions, etc. // don't send deletions onward for other people's stuff // TODO verify this is needed - copied logic from same place in old code if(intval($target_item['item_deleted']) && (! intval($target_item['item_wall']))) { logger('notifier: ignoring delete notification for non-wall item', LOGGER_NORMAL, LOG_NOTICE); return; } } } $walltowall = (($top_level_post && $channel['xchan_hash'] === $target_item['author_xchan']) ? true : false); // Generic delivery section, we have an encoded item and recipients // Now start the delivery process $x = $encoded_item; $x['title'] = 'private'; $x['body'] = 'private'; logger('notifier: encoded item: ' . print_r($x,true), LOGGER_DATA, LOG_DEBUG); //logger('notifier: encoded activity: ' . print_r($activity,true), LOGGER_DATA, LOG_DEBUG); stringify_array_elms($recipients); if(! $recipients) { logger('no recipients'); return; } // logger('notifier: recipients: ' . print_r($recipients,true), LOGGER_NORMAL, LOG_DEBUG); $env_recips = (($private) ? array() : null); $details = q("select xchan_hash, xchan_network, xchan_addr, xchan_guid, xchan_guid_sig from xchan where xchan_hash in (" . protect_sprintf(implode(',',$recipients)) . ")"); $recip_list = array(); if($details) { foreach($details as $d) { $recip_list[] = $d['xchan_addr'] . ' (' . $d['xchan_hash'] . ')'; if($private) { $env_recips[] = [ 'guid' => $d['xchan_guid'], 'guid_sig' => $d['xchan_guid_sig'], 'hash' => $d['xchan_hash'] ]; } } } $narr = [ 'channel' => $channel, 'upstream' => $upstream, 'env_recips' => $env_recips, 'packet_recips' => $packet_recips, 'recipients' => $recipients, 'item' => $item, 'target_item' => $target_item, 'parent_item' => $parent_item, 'top_level_post' => $top_level_post, 'private' => $private, 'relay_to_owner' => $relay_to_owner, 'uplink' => $uplink, 'cmd' => $cmd, 'mail' => $mail, 'single' => (($cmd === 'single_mail' || $cmd === 'single_activity') ? true : false), 'location' => $location, 'request' => $request, 'normal_mode' => $normal_mode, 'packet_type' => $packet_type, 'walltowall' => $walltowall, 'queued' => [] ]; call_hooks('notifier_process', $narr); if($narr['queued']) { foreach($narr['queued'] as $pq) $deliveries[] = $pq; } // notifier_process can alter the recipient list $recipients = $narr['recipients']; $env_recips = $narr['env_recips']; $packet_recips = $narr['packet_recips']; if(($private) && (! $env_recips)) { // shouldn't happen logger('notifier: private message with no envelope recipients.' . print_r($argv,true), LOGGER_NORMAL, LOG_NOTICE); } logger('notifier: recipients (may be delivered to more if public): ' . print_r($recip_list,true), LOGGER_DEBUG); // Now we have collected recipients (except for external mentions, FIXME) // Let's reduce this to a set of hubs; checking that the site is not dead. $hubs = q("select hubloc.*, site.site_crypto, site.site_flags, site.site_version, site.site_project, site.site_dead from hubloc left join site on site_url = hubloc_url where hubloc_hash in (" . protect_sprintf(implode(',',$recipients)) . ") and hubloc_error = 0 and hubloc_deleted = 0" ); // public posts won't make it to the local public stream unless there's a recipient on this site. // This code block sees if it's a public post and localhost is missing, and if so adds an entry for the local sys channel to the $hubs list if (! $private) { $found_localhost = false; if ($hubs) { foreach ($hubs as $h) { if ($h['hubloc_url'] === z_root()) { $found_localhost = true; break; } } } if (! $found_localhost) { $localhub = q("select hubloc.*, site.site_crypto, site.site_flags, site.site_version, site.site_project, site.site_dead from hubloc left join site on site_url = hubloc_url where hubloc_id_url = '%s' and hubloc_error = 0 and hubloc_deleted = 0", dbesc(z_root() . '/channel/sys') ); if ($localhub) { $hubs = array_merge($hubs, $localhub); } } } if(! $hubs) { logger('notifier: no hubs', LOGGER_NORMAL, LOG_NOTICE); return; } /** * Reduce the hubs to those that are unique. For zot hubs, we need to verify uniqueness by the sitekey, * since it may have been a re-install which has not yet been detected and pruned. * For other networks which don't have or require sitekeys, we'll have to use the URL */ $hublist = []; // this provides an easily printable list for the logs $dhubs = []; // delivery hubs where we store our resulting unique array $keys = []; // array of keys to check uniquness for zot hubs $urls = []; // array of urls to check uniqueness of hubs from other networks $hub_env = []; // per-hub envelope so we don't broadcast the entire envelope to all $dead = []; // known dead hubs - report them as undeliverable foreach($hubs as $hub) { if (intval($hub['site_dead'])) { $dead[] = $hub; continue; } if($env_recips) { foreach($env_recips as $er) { if($hub['hubloc_hash'] === $er['hash']) { if(! array_key_exists($hub['hubloc_host'] . $hub['hubloc_sitekey'], $hub_env)) { $hub_env[$hub['hubloc_host'] . $hub['hubloc_sitekey']] = []; } $hub_env[$hub['hubloc_host'] . $hub['hubloc_sitekey']][] = $er; } } } if($hub['hubloc_network'] == 'zot') { if(! in_array($hub['hubloc_sitekey'],$keys)) { $hublist[] = $hub['hubloc_host'] . ' ' . $hub['hubloc_network']; $dhubs[] = $hub; $keys[] = $hub['hubloc_sitekey']; } } else { if(! in_array($hub['hubloc_url'],$urls)) { if($hub['hubloc_url'] === z_root()) { //deliver to local hub first array_unshift($hublist, $hub['hubloc_host'] . ' ' . $hub['hubloc_network']); array_unshift($dhubs, $hub); } else { $hublist[] = $hub['hubloc_host'] . ' ' . $hub['hubloc_network']; $dhubs[] = $hub; } $urls[] = $hub['hubloc_url']; } } } logger('notifier: will notify/deliver to these hubs: ' . print_r($hublist,true), LOGGER_DEBUG, LOG_DEBUG); foreach($dhubs as $hub) { logger('notifier_hub: ' . $hub['hubloc_url'],LOGGER_DEBUG); if(! in_array($hub['hubloc_network'], [ 'zot','zot6' ])) { $narr = [ 'channel' => $channel, 'upstream' => $upstream, 'env_recips' => $env_recips, 'packet_recips' => $packet_recips, 'recipients' => $recipients, 'item' => $item, 'target_item' => $target_item, 'parent_item' => $parent_item, 'hub' => $hub, 'top_level_post' => $top_level_post, 'private' => $private, 'relay_to_owner' => $relay_to_owner, 'uplink' => $uplink, 'cmd' => $cmd, 'mail' => $mail, 'single' => (($cmd === 'single_mail' || $cmd === 'single_activity') ? true : false), 'location' => $location, 'request' => $request, 'normal_mode' => $normal_mode, 'packet_type' => $packet_type, 'walltowall' => $walltowall, 'queued' => [] ]; call_hooks('notifier_hub',$narr); if($narr['queued']) { foreach($narr['queued'] as $pq) $deliveries[] = $pq; } continue; } // singleton deliveries by definition 'not got zot'. // Single deliveries are other federated networks (plugins) and we're essentially // delivering only to those that have this site url in their abook_instance // and only from within a sync operation. This means if you post from a clone, // and a connection is connected to one of your other clones; assuming that hub // is running it will receive a sync packet. On receipt of this sync packet it // will invoke a delivery to those connections which are connected to just that // hub instance. if($cmd === 'single_mail' || $cmd === 'single_activity') { continue; } if(! in_array($hub['hubloc_network'], [ 'zot','zot6' ])) { continue; } // Do not change this to a uuid as long as we have traditional zot servers // in the loop. The signature verification step can't handle dashes in the // hashes. $hash = random_string(48); $packet = null; $pmsg = ''; if($packet_type === 'refresh' || $packet_type === 'purge') { if($hub['hubloc_network'] === 'zot6') { $packet = Libzot::build_packet($channel, $packet_type, ids_to_array($packet_recips,'hash')); } else { $packet = zot_build_packet($channel,$packet_type,(($packet_recips) ? $packet_recips : null)); } } if($packet_type === 'keychange' && $hub['hubloc_network'] === 'zot') { $pmsg = get_pconfig($channel['channel_id'],'system','keychange'); $packet = zot_build_packet($channel,$packet_type,(($packet_recips) ? $packet_recips : null)); } elseif($packet_type === 'request' && $hub['hubloc_network'] === 'zot') { $env = (($hub_env && $hub_env[$hub['hubloc_host'] . $hub['hubloc_sitekey']]) ? $hub_env[$hub['hubloc_host'] . $hub['hubloc_sitekey']] : ''); $packet = zot_build_packet($channel,$packet_type,$env,$hub['hubloc_sitekey'],$hub['site_crypto'], $hash, array('message_id' => $request_message_id) ); } if($packet) { Queue::insert( [ 'hash' => $hash, 'account_id' => $channel['channel_account_id'], 'channel_id' => $channel['channel_id'], 'posturl' => $hub['hubloc_callback'], 'driver' => $hub['hubloc_network'], 'notify' => $packet, 'msg' => (($pmsg) ? json_encode($pmsg) : '') ] ); } else { $env = (($hub_env && $hub_env[$hub['hubloc_host'] . $hub['hubloc_sitekey']]) ? $hub_env[$hub['hubloc_host'] . $hub['hubloc_sitekey']] : ''); if($hub['hubloc_network'] === 'zot6') { $zenv = []; if($env) { foreach($env as $e) { $zenv[] = $e['hash']; } } $packet_type = (($upstream || $uplink) ? 'response' : 'activity'); // block zot private reshares from zot6, as this could cause a number of privacy issues // due to parenting differences between the reshare implementations. In zot a reshare is // a standalone parent activity and in zot6 it is a followup/child of the original activity. // For public reshares, some comments to the reshare on the zot fork will not make it to zot6 // due to these different message models. This cannot be prevented at this time. if($packet_type === 'activity' && $activity['type'] === 'Announce' && intval($target_item['item_private'])) { continue; } $packet = Libzot::build_packet($channel,$packet_type,$zenv,$activity,'activitystreams',(($private) ? $hub['hubloc_sitekey'] : null),$hub['site_crypto']); } else { // currently zot6 delivery is only performed on normal items and not sync items or mail or anything else // Eventually we will do this for all deliveries, but for now ensure this is precisely what we are dealing // with before switching to zot6 as the primary zot6 handler checks for the existence of a message delivery report // to trigger dequeue'ing $z6 = (($encoded_item && $encoded_item['type'] === 'activity' && (! array_key_exists('allow_cid',$encoded_item))) ? true : false); if($z6) { $packet = zot6_build_packet($channel,'notify',$env, json_encode($encoded_item), (($private) ? $hub['hubloc_sitekey'] : null), $hub['site_crypto'],$hash); } else { $packet = zot_build_packet($channel,'notify',$env, (($private) ? $hub['hubloc_sitekey'] : null), $hub['site_crypto'],$hash); } } // remove this after most hubs have updated to version 5.0 if(stripos($hub['site_project'], 'hubzilla') !== false && version_compare($hub['site_version'], '4.7.3', '<=')) { if($encoded_item['type'] === 'mail') { $encoded_item['from']['network'] = 'zot'; $encoded_item['from']['guid_sig'] = str_replace('sha256.', '', $encoded_item['from']['guid_sig']); } else { $encoded_item['owner']['network'] = 'zot'; $encoded_item['owner']['guid_sig'] = str_replace('sha256.', '', $encoded_item['owner']['guid_sig']); if(strpos($encoded_item['author']['url'], z_root()) === 0) { $encoded_item['author']['network'] = 'zot'; $encoded_item['author']['guid_sig'] = str_replace('sha256.', '', $encoded_item['author']['guid_sig']); } } } Queue::insert( [ 'hash' => $hash, 'account_id' => $target_item['aid'], 'channel_id' => $target_item['uid'], 'posturl' => $hub['hubloc_callback'], 'driver' => $hub['hubloc_network'], 'notify' => $packet, 'msg' => json_encode($encoded_item) ] ); // only create delivery reports for normal undeleted items if(is_array($target_item) && array_key_exists('postopts',$target_item) && (! $target_item['item_deleted']) && (! get_config('system','disable_dreport'))) { q("insert into dreport ( dreport_mid, dreport_site, dreport_recip, dreport_result, dreport_time, dreport_xchan, dreport_queue ) values ( '%s','%s','%s','%s','%s','%s','%s' ) ", dbesc($target_item['mid']), dbesc($hub['hubloc_host']), dbesc($hub['hubloc_host']), dbesc('queued'), dbesc(datetime_convert()), dbesc($channel['channel_hash']), dbesc($hash) ); } } $deliveries[] = $hash; } if($normal_mode) { $x = q("select * from hook where hook = 'notifier_normal'"); if($x) { Master::Summon( [ 'Deliver_hooks', $target_item['id'] ] ); } } if($deliveries) do_delivery($deliveries); logger('notifier: basic loop complete.', LOGGER_DEBUG); if ($dead) { foreach ($dead as $deceased) { if (is_array($target_item) && (! $target_item['item_deleted']) && (! get_config('system','disable_dreport'))) { q("insert into dreport ( dreport_mid, dreport_site, dreport_recip, dreport_name, dreport_result, dreport_time, dreport_xchan, dreport_queue ) values ( '%s', '%s','%s','%s','%s','%s','%s','%s' ) ", dbesc($target_item['mid']), dbesc($deceased['hubloc_host']), dbesc($deceased['hubloc_host']), dbesc($deceased['hubloc_host']), dbesc('undeliverable/unresponsive site'), dbesc(datetime_convert()), dbesc($channel['channel_hash']), dbesc(random_string(48)) ); } } } call_hooks('notifier_end',$target_item); logger('notifer: complete.'); return; } }