aboutsummaryrefslogtreecommitdiffstats
path: root/include/notifier.php
diff options
context:
space:
mode:
Diffstat (limited to 'include/notifier.php')
-rw-r--r--include/notifier.php227
1 files changed, 196 insertions, 31 deletions
diff --git a/include/notifier.php b/include/notifier.php
index b22f77d91..59f472539 100644
--- a/include/notifier.php
+++ b/include/notifier.php
@@ -1,4 +1,4 @@
-<?php
+<?php /** @file */
require_once("boot.php");
require_once('include/queue_fn.php');
@@ -53,6 +53,9 @@ require_once('include/html2plain.php');
*
* ZOT
* permission_update abook_id
+ * refresh_all channel_id
+ * purge_all channel_id
+ * expire channel_id
* relay item_id (item was relayed to owner, we will deliver it as owner)
*
*/
@@ -87,6 +90,9 @@ function notifier_run($argv, $argc){
if(! $item_id)
return;
+ require_once('include/identity.php');
+ $sys = get_sys_channel();
+
if($cmd == 'permission_update') {
// Get the recipient
$r = q("select abook.*, hubloc.* from abook
@@ -136,6 +142,7 @@ function notifier_run($argv, $argc){
$recipients = array();
$url_recipients = array();
$normal_mode = true;
+ $packet_type = 'undefined';
if($cmd === 'mail') {
$normal_mode = false;
@@ -164,14 +171,24 @@ function notifier_run($argv, $argc){
elseif($cmd === 'expire') {
$normal_mode = false;
$expire = true;
- $items = q("SELECT * FROM `item` WHERE `uid` = %d AND `wall` = 1
- AND `deleted` = 1 AND `changed` > UTC_TIMESTAMP() - INTERVAL 10 MINUTE",
- intval($item_id)
+ $items = q("SELECT * FROM item WHERE uid = %d AND ( item_flags & %d )
+ AND ( item_restrict & %d ) AND `changed` > UTC_TIMESTAMP() - INTERVAL 10 MINUTE",
+ intval($item_id),
+ intval(ITEM_WALL),
+ intval(ITEM_DELETED)
);
$uid = $item_id;
$item_id = 0;
- if(! count($items))
+ if(! $items)
return;
+
+// FIXME
+// This will require a special zot packet containing a list of item message_id's to be expired.
+// This packet will be public, since we cannot selectively deliver here.
+// We need the handling on this end to create the array, and the handling on the remote end
+// to verify permissions (for each item) and process it. Until this is complete, the expire feature will be disabled.
+
+ return;
}
elseif($cmd === 'suggest') {
$normal_mode = false;
@@ -186,6 +203,46 @@ function notifier_run($argv, $argc){
$recipients[] = $suggest[0]['cid'];
$item = $suggest[0];
}
+ elseif($cmd === 'refresh_all') {
+ logger('notifier: refresh_all: ' . $item_id);
+ $s = q("select * from channel where channel_id = %d limit 1",
+ intval($item_id)
+ );
+ if($s)
+ $channel = $s[0];
+ $uid = $item_id;
+ $recipients = array();
+ $r = q("select abook_xchan from abook where abook_channel = %d",
+ intval($item_id)
+ );
+ if($r) {
+ foreach($r as $rr) {
+ $recipients[] = $rr['abook_xchan'];
+ }
+ }
+ $private = false;
+ $packet_type = 'refresh';
+ }
+ elseif($cmd === 'purge_all') {
+ logger('notifier: purge_all: ' . $item_id);
+ $s = q("select * from channel where channel_id = %d limit 1",
+ intval($item_id)
+ );
+ if($s)
+ $channel = $s[0];
+ $uid = $item_id;
+ $recipients = array();
+ $r = q("select abook_xchan from abook where abook_channel = %d",
+ intval($item_id)
+ );
+ if($r) {
+ foreach($r as $rr) {
+ $recipients[] = $rr['abook_xchan'];
+ }
+ }
+ $private = false;
+ $packet_type = 'purge';
+ }
else {
// Normal items
@@ -208,6 +265,11 @@ function notifier_run($argv, $argc){
if($target_item['item_restrict'] & ITEM_DELETED)
logger('notifier: target item ITEM_DELETED', LOGGER_DEBUG);
+ $unforwardable = ITEM_UNPUBLISHED|ITEM_DELAYED_PUBLISH|ITEM_WEBPAGE|ITEM_BUILDBLOCK|ITEM_PDL;
+ if($target_item['item_restrict'] & $unforwardable) {
+ logger('notifier: target item not forwardable: flags ' . $target_item['item_restrict'], LOGGER_DEBUG);
+ return;
+ }
$s = q("select * from channel where channel_id = %d limit 1",
intval($target_item['uid'])
@@ -215,6 +277,11 @@ function notifier_run($argv, $argc){
if($s)
$channel = $s[0];
+ if($channel['channel_hash'] !== $target_item['author_xchan'] && $channel['channel_hash'] !== $target_item['owner_xchan']) {
+ logger("notifier: Sending channel {$channel['channel_hash']} is not owner {$target_item['owner_xchan']} or author {$target_item['author_xchan']}");
+ return;
+ }
+
if($target_item['id'] == $target_item['parent']) {
$parent_item = $target_item;
@@ -235,6 +302,10 @@ function notifier_run($argv, $argc){
$top_level_post = false;
}
+ // avoid looping of discover items 12/4/2014
+
+ if($sys && $parent_item['uid'] == $sys['channel_id'])
+ return;
$encoded_item = encode_item($target_item);
@@ -252,12 +323,13 @@ function notifier_run($argv, $argc){
// tag_deliver'd post which needs to be sent back to the original author
if(($cmd === 'uplink') && ($parent_item['item_flags'] & ITEM_UPLINK) && (! $top_level_post)) {
- $uplink = true;
+ logger('notifier: uplink');
+ $uplink = true;
}
if(($relay_to_owner || $uplink) && ($cmd !== 'relay')) {
logger('notifier: followup relay', LOGGER_DEBUG);
- $recipients = array(($uplink) ? $parent_item['author_xchan'] : $parent_item['owner_xchan']);
+ $recipients = array(($uplink) ? $parent_item['source_xchan'] : $parent_item['owner_xchan']);
$private = true;
if(! $encoded_item['flags'])
$encoded_item['flags'] = array();
@@ -295,40 +367,103 @@ function notifier_run($argv, $argc){
// Generic delivery section, we have an encoded item and recipients
// Now start the delivery process
- logger('notifier: encoded item: ' . print_r($encoded_item,true));
+ $x = $encoded_item;
+ $x['title'] = 'private';
+ $x['body'] = 'private';
+ logger('notifier: encoded item: ' . print_r($x,true), LOGGER_DATA);
stringify_array_elms($recipients);
if(! $recipients)
return;
- logger('notifier: recipients: ' . print_r($recipients,true));
+// logger('notifier: recipients: ' . print_r($recipients,true));
- $env_recips = null;
- if($private) {
- $r = q("select xchan_guid, xchan_guid_sig from xchan where xchan_hash in (" . implode(',',$recipients) . ")");
- if($r) {
- $env_recips = array();
- foreach($r as $rr)
- $env_recips[] = array('guid' => $rr['xchan_guid'],'guid_sig' => $rr['xchan_guid_sig']);
+ $env_recips = (($private) ? array() : null);
+
+ $details = q("select xchan_hash, xchan_instance_url, xchan_addr, xchan_guid, xchan_guid_sig from xchan where xchan_hash in (" . implode(',',$recipients) . ")");
+
+ $recip_list = array();
+
+ if($details) {
+ foreach($details as $d) {
+
+ // If the recipient is federated from a traditional network they won't be able to
+ // handle nomadic identity. If we're publishing from a site that they aren't
+ // directly connected with, ignore them.
+
+ // FIXME: make sure we run through a notifier loop on the hub they're connected
+ // with if this post comes in from a different hub - so that we will deliver to them.
+
+ // On the down side, these channels will stop working if the hub they connected with
+ // goes down permanently, as they are (doh) not nomadic.
+
+ if(($d['xchan_instance_url']) && ($d['xchan_instance_url'] != z_root()))
+ continue;
+
+
+ $recip_list[] = $d['xchan_addr'] . ' (' . $d['xchan_hash'] . ')';
+ if($private)
+ $env_recips[] = array('guid' => $d['xchan_guid'],'guid_sig' => $d['xchan_guid_sig']);
}
}
+ if(($private) && (! $env_recips)) {
+ // shouldn't happen
+ logger('notifier: private message with no envelope recipients.' . print_r($argv,true));
+ }
+
+ logger('notifier: recipients (may be delivered to more if public): ' . print_r($recip_list,true), LOGGER_DEBUG);
+
+
// Now we have collected recipients (except for external mentions, FIXME)
// Let's reduce this to a set of hubs.
// for public posts always include our own hub
- $sql_extra = (($private) ? "" : " or hubloc_url = '" . z_root() . "' ");
+ $sql_extra = (($private) ? "" : " or hubloc_url = '" . dbesc(z_root()) . "' ");
+
+
+ if($relay_to_owner && (! $private) && ($cmd !== 'relay')) {
+
+ // If sending a followup to the post owner, only send it to one channel clone - to avoid race conditions.
+ // In this case we'll pick the most recently contacted hub, as their primary might be down and the most
+ // recently contacted has the best chance of being alive.
+
+ // For private posts or uplinks we have to do things differently as only the sending clone will have the recipient list.
+ // We have to send to all clone channels of the owner to find out who has the definitive list. Posts with
+ // item_private set (but no ACL list) will return empty recipients (except for the sender and owner) in
+ // collect_recipients() above. The end result is we should get only one delivery per delivery chain if we
+ // aren't the owner or author.
+
+
+ $r = q("select hubloc_sitekey, hubloc_flags, hubloc_callback, hubloc_host from hubloc
+ where hubloc_hash in (" . implode(',',$recipients) . ") group by hubloc_sitekey order by hubloc_connected desc limit 1");
+ }
+ else {
+ $r = q("select hubloc_sitekey, hubloc_flags, hubloc_callback, hubloc_host from hubloc
+ where hubloc_hash in (" . implode(',',$recipients) . ") $sql_extra group by hubloc_sitekey");
+ }
- $r = q("select distinct(hubloc_callback),hubloc_host,hubloc_sitekey from hubloc
- where hubloc_hash in (" . implode(',',$recipients) . ") $sql_extra group by hubloc_callback");
if(! $r) {
logger('notifier: no hubs');
return;
}
$hubs = $r;
+ $hublist = array();
+ $keys = array();
+
+ foreach($hubs as $hub) {
+ // don't try to deliver to deleted hublocs - and inexplicably SQL "distinct" and "group by"
+ // both return records with duplicate keys in rare circumstances
+ if((! ($hub['hubloc_flags'] & HUBLOC_FLAGS_DELETED)) && (! in_array($hub['hubloc_sitekey'],$keys))) {
+ $hublist[] = $hub['hubloc_host'];
+ $keys[] = $hub['hubloc_sitekey'];
+ }
+ }
+
+ logger('notifier: will notify/deliver to these hubs: ' . print_r($hublist,true), LOGGER_DEBUG);
$interval = ((get_config('system','delivery_interval') !== false)
? intval(get_config('system','delivery_interval')) : 2 );
@@ -341,19 +476,45 @@ function notifier_run($argv, $argc){
$deliver = array();
foreach($hubs as $hub) {
+
+ if(defined('DIASPORA_RELIABILITY_EMULATION')) {
+ $cointoss = mt_rand(0,2);
+ if($cointoss == 2) {
+ continue;
+ }
+ }
+
$hash = random_string();
- $n = zot_build_packet($channel,'notify',$env_recips,(($private) ? $hub['hubloc_sitekey'] : null),$hash);
- q("insert into outq ( outq_hash, outq_account, outq_channel, outq_posturl, outq_async, outq_created, outq_updated, outq_notify, outq_msg ) values ( '%s', %d, %d, '%s', %d, '%s', '%s', '%s', '%s' )",
- dbesc($hash),
- intval($target_item['aid']),
- intval($target_item['uid']),
- dbesc($hub['hubloc_callback']),
- intval(1),
- dbesc(datetime_convert()),
- dbesc(datetime_convert()),
- dbesc($n),
- dbesc(json_encode($encoded_item))
- );
+ if($packet_type === 'refresh' || $packet_type === 'purge') {
+ $n = zot_build_packet($channel,$packet_type);
+ q("insert into outq ( outq_hash, outq_account, outq_channel, outq_driver, outq_posturl, outq_async, outq_created, outq_updated, outq_notify, outq_msg ) values ( '%s', %d, %d, '%s', '%s', %d, '%s', '%s', '%s', '%s' )",
+ dbesc($hash),
+ intval($channel['channel_account_id']),
+ intval($channel['channel_id']),
+ dbesc('zot'),
+ dbesc($hub['hubloc_callback']),
+ intval(1),
+ dbesc(datetime_convert()),
+ dbesc(datetime_convert()),
+ dbesc($n),
+ dbesc('')
+ );
+ }
+ else {
+ $n = zot_build_packet($channel,'notify',$env_recips,(($private) ? $hub['hubloc_sitekey'] : null),$hash);
+ q("insert into outq ( outq_hash, outq_account, outq_channel, outq_driver, outq_posturl, outq_async, outq_created, outq_updated, outq_notify, outq_msg ) values ( '%s', %d, %d, '%s', '%s', %d, '%s', '%s', '%s', '%s' )",
+ dbesc($hash),
+ intval($target_item['aid']),
+ intval($target_item['uid']),
+ dbesc('zot'),
+ dbesc($hub['hubloc_callback']),
+ intval(1),
+ dbesc(datetime_convert()),
+ dbesc(datetime_convert()),
+ dbesc($n),
+ dbesc(json_encode($encoded_item))
+ );
+ }
$deliver[] = $hash;
if(count($deliver) >= $deliveries_per_process) {
@@ -369,12 +530,16 @@ function notifier_run($argv, $argc){
if(count($deliver)) {
proc_run('php','include/deliver.php',$deliver);
}
+
+ logger('notifier: basic loop complete.', LOGGER_DEBUG);
if($normal_mode)
call_hooks('notifier_normal',$target_item);
+
call_hooks('notifier_end',$target_item);
+ logger('notifer: complete.');
return;
}