diff options
Diffstat (limited to 'include/notifier.php')
-rw-r--r-- | include/notifier.php | 257 |
1 files changed, 221 insertions, 36 deletions
diff --git a/include/notifier.php b/include/notifier.php index feb1c693c..59f472539 100644 --- a/include/notifier.php +++ b/include/notifier.php @@ -1,4 +1,4 @@ -<?php +<?php /** @file */ require_once("boot.php"); require_once('include/queue_fn.php'); @@ -53,6 +53,9 @@ require_once('include/html2plain.php'); * * ZOT * permission_update abook_id + * refresh_all channel_id + * purge_all channel_id + * expire channel_id * relay item_id (item was relayed to owner, we will deliver it as owner) * */ @@ -87,6 +90,9 @@ function notifier_run($argv, $argc){ if(! $item_id) return; + require_once('include/identity.php'); + $sys = get_sys_channel(); + if($cmd == 'permission_update') { // Get the recipient $r = q("select abook.*, hubloc.* from abook @@ -136,6 +142,7 @@ function notifier_run($argv, $argc){ $recipients = array(); $url_recipients = array(); $normal_mode = true; + $packet_type = 'undefined'; if($cmd === 'mail') { $normal_mode = false; @@ -164,14 +171,24 @@ function notifier_run($argv, $argc){ elseif($cmd === 'expire') { $normal_mode = false; $expire = true; - $items = q("SELECT * FROM `item` WHERE `uid` = %d AND `wall` = 1 - AND `deleted` = 1 AND `changed` > UTC_TIMESTAMP() - INTERVAL 10 MINUTE", - intval($item_id) + $items = q("SELECT * FROM item WHERE uid = %d AND ( item_flags & %d ) + AND ( item_restrict & %d ) AND `changed` > UTC_TIMESTAMP() - INTERVAL 10 MINUTE", + intval($item_id), + intval(ITEM_WALL), + intval(ITEM_DELETED) ); $uid = $item_id; $item_id = 0; - if(! count($items)) + if(! $items) return; + +// FIXME +// This will require a special zot packet containing a list of item message_id's to be expired. +// This packet will be public, since we cannot selectively deliver here. +// We need the handling on this end to create the array, and the handling on the remote end +// to verify permissions (for each item) and process it. Until this is complete, the expire feature will be disabled. + + return; } elseif($cmd === 'suggest') { $normal_mode = false; @@ -186,6 +203,46 @@ function notifier_run($argv, $argc){ $recipients[] = $suggest[0]['cid']; $item = $suggest[0]; } + elseif($cmd === 'refresh_all') { + logger('notifier: refresh_all: ' . $item_id); + $s = q("select * from channel where channel_id = %d limit 1", + intval($item_id) + ); + if($s) + $channel = $s[0]; + $uid = $item_id; + $recipients = array(); + $r = q("select abook_xchan from abook where abook_channel = %d", + intval($item_id) + ); + if($r) { + foreach($r as $rr) { + $recipients[] = $rr['abook_xchan']; + } + } + $private = false; + $packet_type = 'refresh'; + } + elseif($cmd === 'purge_all') { + logger('notifier: purge_all: ' . $item_id); + $s = q("select * from channel where channel_id = %d limit 1", + intval($item_id) + ); + if($s) + $channel = $s[0]; + $uid = $item_id; + $recipients = array(); + $r = q("select abook_xchan from abook where abook_channel = %d", + intval($item_id) + ); + if($r) { + foreach($r as $rr) { + $recipients[] = $rr['abook_xchan']; + } + } + $private = false; + $packet_type = 'purge'; + } else { // Normal items @@ -204,13 +261,27 @@ function notifier_run($argv, $argc){ $r = fetch_post_tags($r); $target_item = $r[0]; - + + if($target_item['item_restrict'] & ITEM_DELETED) + logger('notifier: target item ITEM_DELETED', LOGGER_DEBUG); + + $unforwardable = ITEM_UNPUBLISHED|ITEM_DELAYED_PUBLISH|ITEM_WEBPAGE|ITEM_BUILDBLOCK|ITEM_PDL; + if($target_item['item_restrict'] & $unforwardable) { + logger('notifier: target item not forwardable: flags ' . $target_item['item_restrict'], LOGGER_DEBUG); + return; + } + $s = q("select * from channel where channel_id = %d limit 1", intval($target_item['uid']) ); if($s) $channel = $s[0]; + if($channel['channel_hash'] !== $target_item['author_xchan'] && $channel['channel_hash'] !== $target_item['owner_xchan']) { + logger("notifier: Sending channel {$channel['channel_hash']} is not owner {$target_item['owner_xchan']} or author {$target_item['author_xchan']}"); + return; + } + if($target_item['id'] == $target_item['parent']) { $parent_item = $target_item; @@ -231,22 +302,34 @@ function notifier_run($argv, $argc){ $top_level_post = false; } + // avoid looping of discover items 12/4/2014 + + if($sys && $parent_item['uid'] == $sys['channel_id']) + return; $encoded_item = encode_item($target_item); - $relay_to_owner = (((! $top_level_post) && ($target_item['item_flags'] & ITEM_ORIGIN)) ? true : false); + $uplink = false; + // $cmd === 'relay' indicates the owner is sending it to the original recipients // don't allow the item in the relay command to relay to owner under any circumstances, it will loop - logger('notifier: relay_to_owner: ' . (($relay_to_owner) ? 'true' : 'false')); - logger('notifier: top_level_post: ' . (($top_level_post) ? 'true' : 'false')); - logger('notifier: target_item_flags: ' . $target_item['item_flags'] . ' ' . (($target_item['item_flags'] & ITEM_ORIGIN ) ? 'true' : 'false')); + logger('notifier: relay_to_owner: ' . (($relay_to_owner) ? 'true' : 'false'), LOGGER_DATA); + logger('notifier: top_level_post: ' . (($top_level_post) ? 'true' : 'false'), LOGGER_DATA); + logger('notifier: target_item_flags: ' . $target_item['item_flags'] . ' ' . (($target_item['item_flags'] & ITEM_ORIGIN ) ? 'true' : 'false'), LOGGER_DATA); + + // tag_deliver'd post which needs to be sent back to the original author + + if(($cmd === 'uplink') && ($parent_item['item_flags'] & ITEM_UPLINK) && (! $top_level_post)) { + logger('notifier: uplink'); + $uplink = true; + } - if(($relay_to_owner) && ($cmd !== 'relay')) { + if(($relay_to_owner || $uplink) && ($cmd !== 'relay')) { logger('notifier: followup relay', LOGGER_DEBUG); - $recipients = array($parent_item['owner_xchan']); + $recipients = array(($uplink) ? $parent_item['source_xchan'] : $parent_item['owner_xchan']); $private = true; if(! $encoded_item['flags']) $encoded_item['flags'] = array(); @@ -256,6 +339,15 @@ function notifier_run($argv, $argc){ logger('notifier: normal distribution', LOGGER_DEBUG); if($cmd === 'relay') logger('notifier: owner relay'); + + // if our parent is a tag_delivery recipient, uplink to the original author causing + // a delivery fork. + + if(($parent_item['item_flags'] & ITEM_UPLINK) && (! $top_level_post) && ($cmd !== 'uplink')) { + logger('notifier: uplinking this item'); + proc_run('php','include/notifier.php','uplink',$item_id); + } + $private = false; $recipients = collect_recipients($parent_item,$private); @@ -275,40 +367,103 @@ function notifier_run($argv, $argc){ // Generic delivery section, we have an encoded item and recipients // Now start the delivery process - logger('notifier: encoded item: ' . print_r($encoded_item,true)); + $x = $encoded_item; + $x['title'] = 'private'; + $x['body'] = 'private'; + logger('notifier: encoded item: ' . print_r($x,true), LOGGER_DATA); stringify_array_elms($recipients); if(! $recipients) return; - logger('notifier: recipients: ' . print_r($recipients,true)); +// logger('notifier: recipients: ' . print_r($recipients,true)); - $env_recips = null; - if($private) { - $r = q("select xchan_guid, xchan_guid_sig from xchan where xchan_hash in (" . implode(',',$recipients) . ")"); - if($r) { - $env_recips = array(); - foreach($r as $rr) - $env_recips[] = array('guid' => $rr['xchan_guid'],'guid_sig' => $rr['xchan_guid_sig']); + $env_recips = (($private) ? array() : null); + + $details = q("select xchan_hash, xchan_instance_url, xchan_addr, xchan_guid, xchan_guid_sig from xchan where xchan_hash in (" . implode(',',$recipients) . ")"); + + $recip_list = array(); + + if($details) { + foreach($details as $d) { + + // If the recipient is federated from a traditional network they won't be able to + // handle nomadic identity. If we're publishing from a site that they aren't + // directly connected with, ignore them. + + // FIXME: make sure we run through a notifier loop on the hub they're connected + // with if this post comes in from a different hub - so that we will deliver to them. + + // On the down side, these channels will stop working if the hub they connected with + // goes down permanently, as they are (doh) not nomadic. + + if(($d['xchan_instance_url']) && ($d['xchan_instance_url'] != z_root())) + continue; + + + $recip_list[] = $d['xchan_addr'] . ' (' . $d['xchan_hash'] . ')'; + if($private) + $env_recips[] = array('guid' => $d['xchan_guid'],'guid_sig' => $d['xchan_guid_sig']); } } + if(($private) && (! $env_recips)) { + // shouldn't happen + logger('notifier: private message with no envelope recipients.' . print_r($argv,true)); + } + + logger('notifier: recipients (may be delivered to more if public): ' . print_r($recip_list,true), LOGGER_DEBUG); + + // Now we have collected recipients (except for external mentions, FIXME) // Let's reduce this to a set of hubs. // for public posts always include our own hub - $sql_extra = (($private) ? "" : " or hubloc_url = '" . z_root() . "' "); + $sql_extra = (($private) ? "" : " or hubloc_url = '" . dbesc(z_root()) . "' "); + + + if($relay_to_owner && (! $private) && ($cmd !== 'relay')) { + + // If sending a followup to the post owner, only send it to one channel clone - to avoid race conditions. + // In this case we'll pick the most recently contacted hub, as their primary might be down and the most + // recently contacted has the best chance of being alive. + + // For private posts or uplinks we have to do things differently as only the sending clone will have the recipient list. + // We have to send to all clone channels of the owner to find out who has the definitive list. Posts with + // item_private set (but no ACL list) will return empty recipients (except for the sender and owner) in + // collect_recipients() above. The end result is we should get only one delivery per delivery chain if we + // aren't the owner or author. + + + $r = q("select hubloc_sitekey, hubloc_flags, hubloc_callback, hubloc_host from hubloc + where hubloc_hash in (" . implode(',',$recipients) . ") group by hubloc_sitekey order by hubloc_connected desc limit 1"); + } + else { + $r = q("select hubloc_sitekey, hubloc_flags, hubloc_callback, hubloc_host from hubloc + where hubloc_hash in (" . implode(',',$recipients) . ") $sql_extra group by hubloc_sitekey"); + } - $r = q("select distinct(hubloc_callback),hubloc_host,hubloc_sitekey from hubloc - where hubloc_hash in (" . implode(',',$recipients) . ") $sql_extra group by hubloc_callback"); if(! $r) { logger('notifier: no hubs'); return; } $hubs = $r; + $hublist = array(); + $keys = array(); + + foreach($hubs as $hub) { + // don't try to deliver to deleted hublocs - and inexplicably SQL "distinct" and "group by" + // both return records with duplicate keys in rare circumstances + if((! ($hub['hubloc_flags'] & HUBLOC_FLAGS_DELETED)) && (! in_array($hub['hubloc_sitekey'],$keys))) { + $hublist[] = $hub['hubloc_host']; + $keys[] = $hub['hubloc_sitekey']; + } + } + + logger('notifier: will notify/deliver to these hubs: ' . print_r($hublist,true), LOGGER_DEBUG); $interval = ((get_config('system','delivery_interval') !== false) ? intval(get_config('system','delivery_interval')) : 2 ); @@ -321,19 +476,45 @@ function notifier_run($argv, $argc){ $deliver = array(); foreach($hubs as $hub) { + + if(defined('DIASPORA_RELIABILITY_EMULATION')) { + $cointoss = mt_rand(0,2); + if($cointoss == 2) { + continue; + } + } + $hash = random_string(); - $n = zot_build_packet($channel,'notify',$env_recips,(($private) ? $hub['hubloc_sitekey'] : null),$hash); - q("insert into outq ( outq_hash, outq_account, outq_channel, outq_posturl, outq_async, outq_created, outq_updated, outq_notify, outq_msg ) values ( '%s', %d, %d, '%s', %d, '%s', '%s', '%s', '%s' )", - dbesc($hash), - intval($target_item['aid']), - intval($target_item['uid']), - dbesc($hub['hubloc_callback']), - intval(1), - dbesc(datetime_convert()), - dbesc(datetime_convert()), - dbesc($n), - dbesc(json_encode($encoded_item)) - ); + if($packet_type === 'refresh' || $packet_type === 'purge') { + $n = zot_build_packet($channel,$packet_type); + q("insert into outq ( outq_hash, outq_account, outq_channel, outq_driver, outq_posturl, outq_async, outq_created, outq_updated, outq_notify, outq_msg ) values ( '%s', %d, %d, '%s', '%s', %d, '%s', '%s', '%s', '%s' )", + dbesc($hash), + intval($channel['channel_account_id']), + intval($channel['channel_id']), + dbesc('zot'), + dbesc($hub['hubloc_callback']), + intval(1), + dbesc(datetime_convert()), + dbesc(datetime_convert()), + dbesc($n), + dbesc('') + ); + } + else { + $n = zot_build_packet($channel,'notify',$env_recips,(($private) ? $hub['hubloc_sitekey'] : null),$hash); + q("insert into outq ( outq_hash, outq_account, outq_channel, outq_driver, outq_posturl, outq_async, outq_created, outq_updated, outq_notify, outq_msg ) values ( '%s', %d, %d, '%s', '%s', %d, '%s', '%s', '%s', '%s' )", + dbesc($hash), + intval($target_item['aid']), + intval($target_item['uid']), + dbesc('zot'), + dbesc($hub['hubloc_callback']), + intval(1), + dbesc(datetime_convert()), + dbesc(datetime_convert()), + dbesc($n), + dbesc(json_encode($encoded_item)) + ); + } $deliver[] = $hash; if(count($deliver) >= $deliveries_per_process) { @@ -349,12 +530,16 @@ function notifier_run($argv, $argc){ if(count($deliver)) { proc_run('php','include/deliver.php',$deliver); } + + logger('notifier: basic loop complete.', LOGGER_DEBUG); if($normal_mode) call_hooks('notifier_normal',$target_item); + call_hooks('notifier_end',$target_item); + logger('notifer: complete.'); return; } |