diff options
author | redmatrix <git@macgirvin.com> | 2016-05-19 19:42:45 -0700 |
---|---|---|
committer | redmatrix <git@macgirvin.com> | 2016-05-19 19:42:45 -0700 |
commit | 5b2474238eb0d257db14b0668ef25eab92e53fea (patch) | |
tree | 30518577199eeb91e902ed1f3b39401268eece6c /Zotlabs/Daemon/Notifier.php | |
parent | 6e7d7c50174ffe3db78c5318dde0d9b0b1f416b8 (diff) | |
download | volse-hubzilla-5b2474238eb0d257db14b0668ef25eab92e53fea.tar.gz volse-hubzilla-5b2474238eb0d257db14b0668ef25eab92e53fea.tar.bz2 volse-hubzilla-5b2474238eb0d257db14b0668ef25eab92e53fea.zip |
first phase of daemon refactoring
Diffstat (limited to 'Zotlabs/Daemon/Notifier.php')
-rw-r--r-- | Zotlabs/Daemon/Notifier.php | 659 |
1 files changed, 659 insertions, 0 deletions
diff --git a/Zotlabs/Daemon/Notifier.php b/Zotlabs/Daemon/Notifier.php new file mode 100644 index 000000000..919f98823 --- /dev/null +++ b/Zotlabs/Daemon/Notifier.php @@ -0,0 +1,659 @@ +<?php /** @file */ + +namespace Zotlabs\Daemon; + +require_once('include/queue_fn.php'); +require_once('include/html2plain.php'); + +/* + * This file was at one time responsible for doing all deliveries, but this caused + * big problems on shared hosting systems, where the process might get killed by the + * hosting provider and nothing would get delivered. + * It now only delivers one message under certain cases, and invokes a queued + * delivery mechanism (include/deliver.php) to deliver individual contacts at + * controlled intervals. + * This has a much better chance of surviving random processes getting killed + * by the hosting provider. + * + * The basic flow is: + * Identify the type of message + * Collect any information that needs to be sent + * Convert it into a suitable generic format for sending + * Figure out who the recipients are and if we need to relay + * through a conversation owner + * Once we know what recipients are involved, collect a list of + * destination sites + * Build and store a queue item for each unique site and invoke + * a delivery process for each site or a small number of sites (1-3) + * and add a slight delay between each delivery invocation if desired (usually) + * + */ + +/* + * The notifier is typically called with: + * + * proc_run('php', "include/notifier.php", COMMAND, ITEM_ID); + * + * where COMMAND is one of the following: + * + * activity (in diaspora.php, dfrn_confirm.php, profiles.php) + * comment-import (in diaspora.php, items.php) + * comment-new (in item.php) + * drop (in diaspora.php, items.php, photos.php) + * edit_post (in item.php) + * event (in events.php) + * expire (in items.php) + * like (in like.php, poke.php) + * mail (in message.php) + * tag (in photos.php, poke.php, tagger.php) + * tgroup (in items.php) + * wall-new (in photos.php, item.php) + * + * and ITEM_ID is the id of the item in the database that needs to be sent to others. + * + * ZOT + * permission_create abook_id + * permission_update abook_id + * refresh_all channel_id + * purge_all channel_id + * expire channel_id + * relay item_id (item was relayed to owner, we will deliver it as owner) + * single_activity item_id (deliver to a singleton network from the appropriate clone) + * single_mail mail_id (deliver to a singleton network from the appropriate clone) + * location channel_id + * request channel_id xchan_hash message_id + * rating xlink_id + * + */ + + +require_once('include/zot.php'); +require_once('include/queue_fn.php'); +require_once('include/datetime.php'); +require_once('include/items.php'); +require_once('include/bbcode.php'); +require_once('include/identity.php'); +require_once('include/Contact.php'); + + +class Notifier { + + static public function run($argc,$argv){ + + if($argc < 3) + return; + + logger('notifier: invoked: ' . print_r($argv,true), LOGGER_DEBUG); + + $cmd = $argv[1]; + + $item_id = $argv[2]; + + $extra = (($argc > 3) ? $argv[3] : null); + + if(! $item_id) + return; + + $sys = get_sys_channel(); + + $deliveries = array(); + + $dead_hubs = array(); + + $dh = q("select site_url from site where site_dead = 1"); + if($dh) { + foreach($dh as $dead) { + $dead_hubs[] = $dead['site_url']; + } + } + + + $request = false; + $mail = false; + $top_level = false; + $location = false; + $recipients = array(); + $url_recipients = array(); + $normal_mode = true; + $packet_type = 'undefined'; + + if($cmd === 'mail' || $cmd === 'single_mail') { + $normal_mode = false; + $mail = true; + $private = true; + $message = q("SELECT * FROM `mail` WHERE `id` = %d LIMIT 1", + intval($item_id) + ); + if(! $message) { + return; + } + xchan_mail_query($message[0]); + $uid = $message[0]['channel_id']; + $recipients[] = $message[0]['from_xchan']; // include clones + $recipients[] = $message[0]['to_xchan']; + $item = $message[0]; + + $encoded_item = encode_mail($item); + + $s = q("select * from channel where channel_id = %d limit 1", + intval($item['channel_id']) + ); + if($s) + $channel = $s[0]; + + } + elseif($cmd === 'request') { + $channel_id = $item_id; + $xchan = $argv[3]; + $request_message_id = $argv[4]; + + $s = q("select * from channel where channel_id = %d limit 1", + intval($channel_id) + ); + if($s) + $channel = $s[0]; + + $private = true; + $recipients[] = $xchan; + $packet_type = 'request'; + $normal_mode = false; + } + elseif($cmd == 'permission_update' || $cmd == 'permission_create') { + // Get the (single) recipient + $r = q("select * from abook left join xchan on abook_xchan = xchan_hash where abook_id = %d and abook_self = 0", + intval($item_id) + ); + if($r) { + $uid = $r[0]['abook_channel']; + // Get the sender + $channel = channelx_by_n($uid); + if($channel) { + $perm_update = array('sender' => $channel, 'recipient' => $r[0], 'success' => false, 'deliveries' => ''); + + if($cmd == 'permission_create') + call_hooks('permissions_create',$perm_update); + else + call_hooks('permissions_update',$perm_update); + + if($perm_update['success']) { + if($perm_update['deliveries']) { + $deliveries[] = $perm_update['deliveries']; + do_delivery($deliveries); + } + return; + } + else { + $recipients[] = $r[0]['abook_xchan']; + $private = false; + $packet_type = 'refresh'; + $packet_recips = array(array('guid' => $r[0]['xchan_guid'],'guid_sig' => $r[0]['xchan_guid_sig'],'hash' => $r[0]['xchan_hash'])); + } + } + } + } + elseif($cmd === 'refresh_all') { + logger('notifier: refresh_all: ' . $item_id); + $uid = $item_id; + $channel = channelx_by_n($item_id); + $r = q("select abook_xchan from abook where abook_channel = %d", + intval($item_id) + ); + if($r) { + foreach($r as $rr) { + $recipients[] = $rr['abook_xchan']; + } + } + $private = false; + $packet_type = 'refresh'; + } + elseif($cmd === 'location') { + logger('notifier: location: ' . $item_id); + $s = q("select * from channel where channel_id = %d limit 1", + intval($item_id) + ); + if($s) + $channel = $s[0]; + $uid = $item_id; + $recipients = array(); + $r = q("select abook_xchan from abook where abook_channel = %d", + intval($item_id) + ); + if($r) { + foreach($r as $rr) { + $recipients[] = $rr['abook_xchan']; + } + } + + $encoded_item = array('locations' => zot_encode_locations($channel),'type' => 'location', 'encoding' => 'zot'); + $target_item = array('aid' => $channel['channel_account_id'],'uid' => $channel['channel_id']); + $private = false; + $packet_type = 'location'; + $location = true; + } + elseif($cmd === 'purge_all') { + logger('notifier: purge_all: ' . $item_id); + $s = q("select * from channel where channel_id = %d limit 1", + intval($item_id) + ); + if($s) + $channel = $s[0]; + $uid = $item_id; + $recipients = array(); + $r = q("select abook_xchan from abook where abook_channel = %d", + intval($item_id) + ); + if($r) { + foreach($r as $rr) { + $recipients[] = $rr['abook_xchan']; + } + } + $private = false; + $packet_type = 'purge'; + } + else { + + // Normal items + + // Fetch the target item + + $r = q("SELECT * FROM item WHERE id = %d and parent != 0 LIMIT 1", + intval($item_id) + ); + + if(! $r) + return; + + xchan_query($r); + + $r = fetch_post_tags($r); + + $target_item = $r[0]; + $deleted_item = false; + + if(intval($target_item['item_deleted'])) { + logger('notifier: target item ITEM_DELETED', LOGGER_DEBUG); + $deleted_item = true; + } + + if(intval($target_item['item_type']) != ITEM_TYPE_POST) { + logger('notifier: target item not forwardable: type ' . $target_item['item_type'], LOGGER_DEBUG); + return; + } + if(intval($target_item['item_unpublished']) || intval($target_item['item_delayed']) || intval($target_item['item_hidden'])) { + logger('notifier: target item not published, so not forwardable', LOGGER_DEBUG); + return; + } + + if(strpos($target_item['postopts'],'nodeliver') !== false) { + logger('notifier: target item is undeliverable', LOGGER_DEBUG); + return; + } + + $s = q("select * from channel left join xchan on channel_hash = xchan_hash where channel_id = %d limit 1", + intval($target_item['uid']) + ); + if($s) + $channel = $s[0]; + + if($channel['channel_hash'] !== $target_item['author_xchan'] && $channel['channel_hash'] !== $target_item['owner_xchan']) { + logger("notifier: Sending channel {$channel['channel_hash']} is not owner {$target_item['owner_xchan']} or author {$target_item['author_xchan']}", LOGGER_NORMAL, LOG_WARNING); + return; + } + + + if($target_item['id'] == $target_item['parent']) { + $parent_item = $target_item; + $top_level_post = true; + } + else { + // fetch the parent item + $r = q("SELECT * from item where id = %d order by id asc", + intval($target_item['parent']) + ); + + if(! $r) + return; + + if(strpos($r[0]['postopts'],'nodeliver') !== false) { + logger('notifier: target item is undeliverable', LOGGER_DEBUG, LOG_NOTICE); + return; + } + + xchan_query($r); + $r = fetch_post_tags($r); + + $parent_item = $r[0]; + $top_level_post = false; + } + + // avoid looping of discover items 12/4/2014 + + if($sys && $parent_item['uid'] == $sys['channel_id']) + return; + + $encoded_item = encode_item($target_item); + + // Send comments to the owner to re-deliver to everybody in the conversation + // We only do this if the item in question originated on this site. This prevents looping. + // To clarify, a site accepting a new comment is responsible for sending it to the owner for relay. + // Relaying should never be initiated on a post that arrived from elsewhere. + + // We should normally be able to rely on ITEM_ORIGIN, but start_delivery_chain() incorrectly set this + // flag on comments for an extended period. So we'll also call comment_local_origin() which looks at + // the hostname in the message_id and provides a second (fallback) opinion. + + $relay_to_owner = (((! $top_level_post) && (intval($target_item['item_origin'])) && comment_local_origin($target_item)) ? true : false); + + + + $uplink = false; + + // $cmd === 'relay' indicates the owner is sending it to the original recipients + // don't allow the item in the relay command to relay to owner under any circumstances, it will loop + + logger('notifier: relay_to_owner: ' . (($relay_to_owner) ? 'true' : 'false'), LOGGER_DATA, LOG_DEBUG); + logger('notifier: top_level_post: ' . (($top_level_post) ? 'true' : 'false'), LOGGER_DATA, LOG_DEBUG); + + // tag_deliver'd post which needs to be sent back to the original author + + if(($cmd === 'uplink') && intval($parent_item['item_uplink']) && (! $top_level_post)) { + logger('notifier: uplink'); + $uplink = true; + } + + if(($relay_to_owner || $uplink) && ($cmd !== 'relay')) { + logger('notifier: followup relay', LOGGER_DEBUG); + $recipients = array(($uplink) ? $parent_item['source_xchan'] : $parent_item['owner_xchan']); + $private = true; + if(! $encoded_item['flags']) + $encoded_item['flags'] = array(); + $encoded_item['flags'][] = 'relay'; + } + else { + logger('notifier: normal distribution', LOGGER_DEBUG); + if($cmd === 'relay') + logger('notifier: owner relay'); + + // if our parent is a tag_delivery recipient, uplink to the original author causing + // a delivery fork. + + if(($parent_item) && intval($parent_item['item_uplink']) && (! $top_level_post) && ($cmd !== 'uplink')) { + // don't uplink a relayed post to the relay owner + if($parent_item['source_xchan'] !== $parent_item['owner_xchan']) { + logger('notifier: uplinking this item'); + proc_run('php','include/notifier.php','uplink',$item_id); + } + } + + $private = false; + $recipients = collect_recipients($parent_item,$private); + + // FIXME add any additional recipients such as mentions, etc. + + // don't send deletions onward for other people's stuff + // TODO verify this is needed - copied logic from same place in old code + + if(intval($target_item['item_deleted']) && (! intval($target_item['item_wall']))) { + logger('notifier: ignoring delete notification for non-wall item', LOGGER_NORMAL, LOG_NOTICE); + return; + } + } + + } + + $walltowall = (($top_level_post && $channel['xchan_hash'] === $target_item['author_xchan']) ? true : false); + + // Generic delivery section, we have an encoded item and recipients + // Now start the delivery process + + $x = $encoded_item; + $x['title'] = 'private'; + $x['body'] = 'private'; + logger('notifier: encoded item: ' . print_r($x,true), LOGGER_DATA, LOG_DEBUG); + + stringify_array_elms($recipients); + if(! $recipients) + return; + +// logger('notifier: recipients: ' . print_r($recipients,true), LOGGER_NORMAL, LOG_DEBUG); + + $env_recips = (($private) ? array() : null); + + $details = q("select xchan_hash, xchan_instance_url, xchan_network, xchan_addr, xchan_guid, xchan_guid_sig from xchan where xchan_hash in (" . implode(',',$recipients) . ")"); + + + $recip_list = array(); + + if($details) { + foreach($details as $d) { + + $recip_list[] = $d['xchan_addr'] . ' (' . $d['xchan_hash'] . ')'; + if($private) + $env_recips[] = array('guid' => $d['xchan_guid'],'guid_sig' => $d['xchan_guid_sig'],'hash' => $d['xchan_hash']); + + if($d['xchan_network'] === 'mail' && $normal_mode) { + $delivery_options = get_xconfig($d['xchan_hash'],'system','delivery_mode'); + if(! $delivery_options) + format_and_send_email($channel,$d,$target_item); + } + } + } + + + $narr = array( + 'channel' => $channel, + 'env_recips' => $env_recips, + 'packet_recips' => $packet_recips, + 'recipients' => $recipients, + 'item' => $item, + 'target_item' => $target_item, + 'top_level_post' => $top_level_post, + 'private' => $private, + 'relay_to_owner' => $relay_to_owner, + 'uplink' => $uplink, + 'cmd' => $cmd, + 'mail' => $mail, + 'single' => (($cmd === 'single_mail' || $cmd === 'single_activity') ? true : false), + 'location' => $location, + 'request' => $request, + 'normal_mode' => $normal_mode, + 'packet_type' => $packet_type, + 'walltowall' => $walltowall, + 'queued' => array() + ); + + call_hooks('notifier_process', $narr); + if($narr['queued']) { + foreach($narr['queued'] as $pq) + $deliveries[] = $pq; + } + + // notifier_process can alter the recipient list + + $recipients = $narr['recipients']; + $env_recips = $narr['env_recips']; + $packet_recips = $narr['packet_recips']; + + if(($private) && (! $env_recips)) { + // shouldn't happen + logger('notifier: private message with no envelope recipients.' . print_r($argv,true), LOGGER_NORMAL, LOG_NOTICE); + } + + logger('notifier: recipients (may be delivered to more if public): ' . print_r($recip_list,true), LOGGER_DEBUG); + + + // Now we have collected recipients (except for external mentions, FIXME) + // Let's reduce this to a set of hubs. + + $r = q("select * from hubloc where hubloc_hash in (" . implode(',',$recipients) . ") + and hubloc_error = 0 and hubloc_deleted = 0" + ); + + + if(! $r) { + logger('notifier: no hubs', LOGGER_NORMAL, LOG_NOTICE); + return; + } + + $hubs = $r; + + + + /** + * Reduce the hubs to those that are unique. For zot hubs, we need to verify uniqueness by the sitekey, since it may have been + * a re-install which has not yet been detected and pruned. + * For other networks which don't have or require sitekeys, we'll have to use the URL + */ + + + $hublist = array(); // this provides an easily printable list for the logs + $dhubs = array(); // delivery hubs where we store our resulting unique array + $keys = array(); // array of keys to check uniquness for zot hubs + $urls = array(); // array of urls to check uniqueness of hubs from other networks + + + foreach($hubs as $hub) { + if(in_array($hub['hubloc_url'],$dead_hubs)) { + logger('skipping dead hub: ' . $hub['hubloc_url'], LOGGER_DEBUG, LOG_INFO); + continue; + } + + if($hub['hubloc_network'] == 'zot') { + if(! in_array($hub['hubloc_sitekey'],$keys)) { + $hublist[] = $hub['hubloc_host']; + $dhubs[] = $hub; + $keys[] = $hub['hubloc_sitekey']; + } + } + else { + if(! in_array($hub['hubloc_url'],$urls)) { + $hublist[] = $hub['hubloc_host']; + $dhubs[] = $hub; + $urls[] = $hub['hubloc_url']; + } + } + } + + logger('notifier: will notify/deliver to these hubs: ' . print_r($hublist,true), LOGGER_DEBUG, LOG_DEBUG); + + + foreach($dhubs as $hub) { + + if($hub['hubloc_network'] !== 'zot') { + + $narr = array( + 'channel' => $channel, + 'env_recips' => $env_recips, + 'packet_recips' => $packet_recips, + 'recipients' => $recipients, + 'item' => $item, + 'target_item' => $target_item, + 'hub' => $hub, + 'top_level_post' => $top_level_post, + 'private' => $private, + 'relay_to_owner' => $relay_to_owner, + 'uplink' => $uplink, + 'cmd' => $cmd, + 'mail' => $mail, + 'single' => (($cmd === 'single_mail' || $cmd === 'single_activity') ? true : false), + 'location' => $location, + 'request' => $request, + 'normal_mode' => $normal_mode, + 'packet_type' => $packet_type, + 'walltowall' => $walltowall, + 'queued' => array() + ); + + + call_hooks('notifier_hub',$narr); + if($narr['queued']) { + foreach($narr['queued'] as $pq) + $deliveries[] = $pq; + } + continue; + + } + + // singleton deliveries by definition 'not got zot'. + // Single deliveries are other federated networks (plugins) and we're essentially + // delivering only to those that have this site url in their abook_instance + // and only from within a sync operation. This means if you post from a clone, + // and a connection is connected to one of your other clones; assuming that hub + // is running it will receive a sync packet. On receipt of this sync packet it + // will invoke a delivery to those connections which are connected to just that + // hub instance. + + if($cmd === 'single_mail' || $cmd === 'single_activity') { + continue; + } + + // default: zot protocol + + $hash = random_string(); + $packet = null; + + if($packet_type === 'refresh' || $packet_type === 'purge') { + $packet = zot_build_packet($channel,$packet_type,(($packet_recips) ? $packet_recips : null)); + } + elseif($packet_type === 'request') { + $packet = zot_build_packet($channel,$packet_type,$env_recips,$hub['hubloc_sitekey'],$hash, + array('message_id' => $request_message_id) + ); + } + + if($packet) { + queue_insert(array( + 'hash' => $hash, + 'account_id' => $channel['channel_account_id'], + 'channel_id' => $channel['channel_id'], + 'posturl' => $hub['hubloc_callback'], + 'notify' => $packet + )); + } + else { + $packet = zot_build_packet($channel,'notify',$env_recips,(($private) ? $hub['hubloc_sitekey'] : null),$hash); + queue_insert(array( + 'hash' => $hash, + 'account_id' => $target_item['aid'], + 'channel_id' => $target_item['uid'], + 'posturl' => $hub['hubloc_callback'], + 'notify' => $packet, + 'msg' => json_encode($encoded_item) + )); + + // only create delivery reports for normal undeleted items + if(is_array($target_item) && array_key_exists('postopts',$target_item) && (! $target_item['item_deleted']) && (! get_config('system','disable_dreport'))) { + q("insert into dreport ( dreport_mid, dreport_site, dreport_recip, dreport_result, dreport_time, dreport_xchan, dreport_queue ) values ( '%s','%s','%s','%s','%s','%s','%s' ) ", + dbesc($target_item['mid']), + dbesc($hub['hubloc_host']), + dbesc($hub['hubloc_host']), + dbesc('queued'), + dbesc(datetime_convert()), + dbesc($channel['channel_hash']), + dbesc($hash) + ); + } + } + + $deliveries[] = $hash; + } + + if($normal_mode) { + $x = q("select * from hook where hook = 'notifier_normal'"); + if($x) + proc_run('php','include/deliver_hooks.php', $target_item['id']); + } + + if($deliveries) + do_delivery($deliveries); + + logger('notifier: basic loop complete.', LOGGER_DEBUG); + + call_hooks('notifier_end',$target_item); + + logger('notifer: complete.'); + return; + + } +} + |