From 4cacfe59bd2f2d3b41ec9173b9310039971a76c8 Mon Sep 17 00:00:00 2001 From: redmatrix Date: Wed, 16 Dec 2015 22:42:33 -0800 Subject: queue/notification/delivery refactor continued --- include/deliver.php | 91 +++------------------------------- include/notifier.php | 135 ++++++++++++--------------------------------------- include/queue.php | 51 +------------------ include/queue_fn.php | 81 +++++++++++++++++++++++++++++++ 4 files changed, 118 insertions(+), 240 deletions(-) (limited to 'include') diff --git a/include/deliver.php b/include/deliver.php index cef8f7912..60d935d02 100644 --- a/include/deliver.php +++ b/include/deliver.php @@ -16,7 +16,6 @@ function deliver_run($argv, $argc) { logger('deliver: invoked: ' . print_r($argv,true), LOGGER_DATA); - for($x = 1; $x < $argc; $x ++) { $dresult = null; @@ -25,76 +24,6 @@ function deliver_run($argv, $argc) { ); if($r) { - /** - * Check to see if we have any recent communications with this hub (in the last month). - * If not, reduce the outq_priority. - */ - - $base = ''; - - $h = parse_url($r[0]['outq_posturl']); - if($h) { - $base = $h['scheme'] . '://' . $h['host'] . (($h['port']) ? ':' . $h['port'] : ''); - if($base !== z_root()) { - $y = q("select site_update, site_dead from site where site_url = '%s' ", - dbesc($base) - ); - if($y) { - if(intval($y[0]['site_dead'])) { - remove_queue_by_posturl($r[0]['outq_posturl']); - logger('dead site ignored ' . $base); - continue; - } - if($y[0]['site_update'] < datetime_convert('UTC','UTC','now - 1 month')) { - update_queue_item($r[0]['outq_hash'],10); - logger('immediate delivery deferred for site ' . $base); - continue; - } - } - else { - - // zot sites should all have a site record, unless they've been dead for as long as - // your site has existed. Since we don't know for sure what these sites are, - // call them unknown - - q("insert into site (site_url, site_update, site_dead, site_type) values ('%s','%s',0,%d) ", - dbesc($base), - dbesc(datetime_convert()), - intval(($r[0]['outq_driver'] === 'post') ? SITE_TYPE_NOTZOT : SITE_TYPE_UNKNOWN) - ); - } - } - } - - // "post" queue driver - used for diaspora and friendica-over-diaspora communications. - - if($r[0]['outq_driver'] === 'post') { - - - $result = z_post_url($r[0]['outq_posturl'],$r[0]['outq_msg']); - if($result['success'] && $result['return_code'] < 300) { - logger('deliver: queue post success to ' . $r[0]['outq_posturl'], LOGGER_DEBUG); - if($base) { - q("update site set site_update = '%s', site_dead = 0 where site_url = '%s' ", - dbesc(datetime_convert()), - dbesc($base) - ); - } - q("update dreport set dreport_result = '%s', dreport_time = '%s' where dreport_queue = '%s' limit 1", - dbesc('accepted for delivery'), - dbesc(datetime_convert()), - dbesc($argv[$x]) - ); - - remove_queue_item($argv[$x]); - } - else { - logger('deliver: queue post returned ' . $result['return_code'] . ' from ' . $r[0]['outq_posturl'],LOGGER_DEBUG); - update_queue_item($argv[$x]); - } - continue; - } - $notify = json_decode($r[0]['outq_notify'],true); // Messages without an outq_msg will need to go via the web, even if it's a @@ -118,7 +47,7 @@ function deliver_run($argv, $argc) { $dresult = zot_import($msg,z_root()); } - remove_queue_item($argv[$x]); + remove_queue_item($r[0]['outq_hash']); if($dresult && is_array($dresult)) { foreach($dresult as $xx) { @@ -142,19 +71,11 @@ function deliver_run($argv, $argc) { ); } } - else { - logger('deliver: dest: ' . $r[0]['outq_posturl'], LOGGER_DEBUG); - $result = zot_zot($r[0]['outq_posturl'],$r[0]['outq_notify']); - if($result['success']) { - logger('deliver: remote zot delivery succeeded to ' . $r[0]['outq_posturl']); - zot_process_response($r[0]['outq_posturl'],$result, $r[0]); - } - else { - logger('deliver: remote zot delivery failed to ' . $r[0]['outq_posturl']); - logger('deliver: remote zot delivery fail data: ' . print_r($result,true), LOGGER_DATA); - update_queue_item($argv[$x],10); - } - } + + // otherwise it's a remote delivery - call queue_deliver(); + + queue_deliver($r[0],true); + } } } diff --git a/include/notifier.php b/include/notifier.php index 659a103a4..67e2472ef 100644 --- a/include/notifier.php +++ b/include/notifier.php @@ -44,7 +44,6 @@ require_once('include/html2plain.php'); * expire (in items.php) * like (in like.php, poke.php) * mail (in message.php) - * suggest (in fsuggest.php) * tag (in photos.php, poke.php, tagger.php) * tgroup (in items.php) * wall-new (in photos.php, item.php) @@ -52,6 +51,7 @@ require_once('include/html2plain.php'); * and ITEM_ID is the id of the item in the database that needs to be sent to others. * * ZOT + * permission_create abook_id * permission_update abook_id * refresh_all channel_id * purge_all channel_id @@ -110,72 +110,8 @@ function notifier_run($argv, $argc){ } - if($cmd == 'permission_update' || $cmd == 'permission_create') { - // Get the recipient - $r = q("select * from abook left join xchan on abook_xchan = xchan_hash where abook_id = %d and abook_self = 0", - intval($item_id) - ); - - if($r) { - // Get the sender - $s = channelx_by_n($r[0]['abook_channel']); - if($s) { - $perm_update = array('sender' => $s, 'recipient' => $r[0], 'success' => false, 'deliveries' => ''); - - if($cmd == 'permission_create') - call_hooks('permissions_create',$perm_update); - else - call_hooks('permissions_update',$perm_update); - - if($perm_update['success'] && $perm_update['deliveries']) - $deliveries[] = $perm_update['deliveries']; - - if(! $perm_update['success']) { - // send a refresh message to each hub they have registered here - $h = q("select * from hubloc where hubloc_hash = '%s' - and hubloc_error = 0 and hubloc_deleted = 0", - dbesc($r[0]['hubloc_hash']) - ); - - if($h) { - foreach($h as $hh) { - if(in_array($hh['hubloc_url'],$dead_hubs)) { - logger('skipping dead hub: ' . $hh['hubloc_url'], LOGGER_DEBUG); - continue; - } - - $data = zot_build_packet($s,'refresh',array(array( - 'guid' => $hh['hubloc_guid'], - 'guid_sig' => $hh['hubloc_guid_sig'], - 'url' => $hh['hubloc_url']) - )); - if($data) { - $hash = random_string(); - queue_insert(array( - 'hash' => $hash, - 'account_id' => $s['channel_account_id'], - 'channel_id' => $s['channel_id'], - 'posturl' => $hh['hubloc_callback'], - 'notify' => $data, - )); - $deliveries[] = $hash; - } - } - } - } - - if($deliveries) - do_delivery($deliveries); - } - } - return; - } - - - $expire = false; $request = false; $mail = false; - $fsuggest = false; $top_level = false; $location = false; $recipients = array(); @@ -224,51 +160,42 @@ function notifier_run($argv, $argc){ $packet_type = 'request'; $normal_mode = false; } - elseif($cmd === 'expire') { - - // FIXME - // This will require a special zot packet containing a list of item message_id's to be expired. - // This packet will be public, since we cannot selectively deliver here. - // We need the handling on this end to create the array, and the handling on the remote end - // to verify permissions (for each item) and process it. Until this is complete, the expire feature will be disabled. - - return; - - $normal_mode = false; - $expire = true; - $items = q("SELECT * FROM item WHERE uid = %d AND item_wall = 1 - AND item_deleted = 1 AND `changed` > %s - INTERVAL %s", - intval($item_id), - db_utcnow(), db_quoteinterval('10 MINUTE') + elseif($cmd == 'permission_update' || $cmd == 'permission_create') { + // Get the (single) recipient + $r = q("select * from abook left join xchan on abook_xchan = xchan_hash where abook_id = %d and abook_self = 0", + intval($item_id) ); - $uid = $item_id; - $item_id = 0; - if(! $items) - return; + if($r) { + $uid = $r[0]['abook_channel']; + // Get the sender + $channel = channelx_by_n($uid); + if($channel) { + $perm_update = array('sender' => $channel, 'recipient' => $r[0], 'success' => false, 'deliveries' => ''); - } - elseif($cmd === 'suggest') { - $normal_mode = false; - $fsuggest = true; + if($cmd == 'permission_create') + call_hooks('permissions_create',$perm_update); + else + call_hooks('permissions_update',$perm_update); - $suggest = q("SELECT * FROM `fsuggest` WHERE `id` = %d LIMIT 1", - intval($item_id) - ); - if(! count($suggest)) - return; - $uid = $suggest[0]['uid']; - $recipients[] = $suggest[0]['cid']; - $item = $suggest[0]; + if($perm_update['success']) { + if($perm_update['deliveries']) { + $deliveries[] = $perm_update['deliveries']; + do_delivery($deliveries); + } + return; + } + else { + $recipients[] = $r[0]['abook_xchan']; + $private = false; + $packet_type = 'refresh'; + } + } + } } elseif($cmd === 'refresh_all') { logger('notifier: refresh_all: ' . $item_id); - $s = q("select * from channel where channel_id = %d limit 1", - intval($item_id) - ); - if($s) - $channel = $s[0]; $uid = $item_id; - $recipients = array(); + $channel = channelx_by_n($item_id); $r = q("select abook_xchan from abook where abook_channel = %d", intval($item_id) ); @@ -592,10 +519,8 @@ function notifier_run($argv, $argc){ 'relay_to_owner' => $relay_to_owner, 'uplink' => $uplink, 'cmd' => $cmd, - 'expire' => $expire, 'mail' => $mail, 'location' => $location, - 'fsuggest' => $fsuggest, 'request' => $request, 'normal_mode' => $normal_mode, 'packet_type' => $packet_type, diff --git a/include/queue.php b/include/queue.php index 5c3376661..8a3b2aa58 100644 --- a/include/queue.php +++ b/include/queue.php @@ -18,11 +18,8 @@ function queue_run($argv, $argc){ else $queue_id = 0; - $deadguys = array(); - logger('queue: start'); - // delete all queue items more than 3 days old // but first mark these sites dead if we haven't heard from them in a month @@ -88,53 +85,7 @@ function queue_run($argv, $argc){ return; foreach($r as $rr) { - - $dresult = null; - - if(in_array($rr['outq_posturl'],$deadguys)) - continue; - - $base = ''; - $h = parse_url($rr['outq_posturl']); - if($h) - $base = $h['scheme'] . '://' . $h['host'] . (($h['port']) ? ':' . $h['port'] : ''); - - if($rr['outq_driver'] === 'post') { - $result = z_post_url($rr['outq_posturl'],$rr['outq_msg']); - if($result['success'] && $result['return_code'] < 300) { - logger('queue: queue post success to ' . $rr['outq_posturl'], LOGGER_DEBUG); - if($base) { - q("update site set site_update = '%s', site_dead = 0 where site_url = '%s' ", - dbesc(datetime_convert()), - dbesc($base) - ); - } - q("update dreport set dreport_result = '%s', dreport_time = '%s' where dreport_queue = '%s' limit 1", - dbesc('accepted for delivery'), - dbesc(datetime_convert()), - dbesc($rr['outq_hash']) - ); - remove_queue_item($rr['outq_hash']); - } - else { - logger('queue: queue post returned ' . $result['return_code'] . ' from ' . $rr['outq_posturl'],LOGGER_DEBUG); - update_queue_item($rr['outq_hash'],10); - $deadguys[] = $rr['outq_posturl']; - } - continue; - } - - - $result = zot_zot($rr['outq_posturl'],$rr['outq_notify']); - if($result['success']) { - logger('queue: deliver zot success to ' . $rr['outq_posturl'], LOGGER_DEBUG); - zot_process_response($rr['outq_posturl'],$result, $rr); - } - else { - $deadguys[] = $rr['outq_posturl']; - logger('queue: deliver zot returned ' . $result['return_code'] . ' from ' . $rr['outq_posturl'],LOGGER_DEBUG); - update_queue_item($rr['outq_hash'],10); - } + queue_deliver($rr); } } diff --git a/include/queue_fn.php b/include/queue_fn.php index 8449c1d72..1e53d7488 100644 --- a/include/queue_fn.php +++ b/include/queue_fn.php @@ -62,3 +62,84 @@ function queue_insert($arr) { } + + +function queue_deliver($outq, $immediate = false) { + + $base = null; + $h = parse_url($outq['outq_posturl']); + if($h) + $base = $h['scheme'] . '://' . $h['host'] . (($h['port']) ? ':' . $h['port'] : ''); + + if(($base) && ($base !== z_root()) && ($immediate)) { + $y = q("select site_update, site_dead from site where site_url = '%s' ", + dbesc($base) + ); + if($y) { + if(intval($y[0]['site_dead'])) { + remove_queue_by_posturl($outq['outq_posturl']); + logger('dead site ignored ' . $base); + return; + } + if($y[0]['site_update'] < datetime_convert('UTC','UTC','now - 1 month')) { + update_queue_item($outq['outq_hash'],10); + logger('immediate delivery deferred for site ' . $base); + return; + } + } + else { + // zot sites should all have a site record, unless they've been dead for as long as + // your site has existed. Since we don't know for sure what these sites are, + // call them unknown + + q("insert into site (site_url, site_update, site_dead, site_type) values ('%s','%s',0,%d) ", + dbesc($base), + dbesc(datetime_convert()), + intval(($outq['outq_driver'] === 'post') ? SITE_TYPE_NOTZOT : SITE_TYPE_UNKNOWN) + ); + } + } + + // "post" queue driver - used for diaspora and friendica-over-diaspora communications. + + if($outq['outq_driver'] === 'post') { + $result = z_post_url($outq['outq_posturl'],$outq['outq_msg']); + if($result['success'] && $result['return_code'] < 300) { + logger('deliver: queue post success to ' . $outq['outq_posturl'], LOGGER_DEBUG); + if($base) { + q("update site set site_update = '%s', site_dead = 0 where site_url = '%s' ", + dbesc(datetime_convert()), + dbesc($base) + ); + } + q("update dreport set dreport_result = '%s', dreport_time = '%s' where dreport_queue = '%s' limit 1", + dbesc('accepted for delivery'), + dbesc(datetime_convert()), + dbesc($outq['outq_hash']) + ); + remove_queue_item($outq['outq_hash']); + } + else { + logger('deliver: queue post returned ' . $result['return_code'] + . ' from ' . $outq['outq_posturl'],LOGGER_DEBUG); + update_queue_item($argv[$x]); + } + return; + } + + // normal zot delivery + + logger('deliver: dest: ' . $outq['outq_posturl'], LOGGER_DEBUG); + $result = zot_zot($outq['outq_posturl'],$outq['outq_notify']); + if($result['success']) { + logger('deliver: remote zot delivery succeeded to ' . $outq['outq_posturl']); + zot_process_response($outq['outq_posturl'],$result, $outq); + } + else { + logger('deliver: remote zot delivery failed to ' . $outq['outq_posturl']); + logger('deliver: remote zot delivery fail data: ' . print_r($result,true), LOGGER_DATA); + update_queue_item($outq['outq_hash'],10); + } + return; +} + -- cgit v1.2.3