From 93f061f78a8b914a731838058a38354a62d002de Mon Sep 17 00:00:00 2001 From: redmatrix Date: Thu, 15 Oct 2015 18:52:04 -0700 Subject: mail sync/migrate continued; also abstract delivery loop to make it re-usable, change refresh_all to use delivery loop. --- include/identity.php | 2 +- include/import.php | 9 ---- include/items.php | 4 ++ include/message.php | 43 ++++++++----------- include/network.php | 27 ++++++++++++ include/notifier.php | 102 +++++++++++++++++++------------------------- install/schema_mysql.sql | 2 + install/schema_postgres.sql | 2 + install/update.php | 19 ++++++++- 9 files changed, 115 insertions(+), 95 deletions(-) diff --git a/include/identity.php b/include/identity.php index ad5bfbe6d..672ed6763 100644 --- a/include/identity.php +++ b/include/identity.php @@ -643,7 +643,7 @@ function identity_basic_export($channel_id, $items = false) { } - $r = q("select mail.*, conv.guid as conv_guid from mail left join conv on mail.convid = conv.id where mail.uid = %d", + $r = q("select * from mail where mail.uid = %d", intval($channel_id) ); if($r) { diff --git a/include/import.php b/include/import.php index 1734bd263..c90b05ec0 100644 --- a/include/import.php +++ b/include/import.php @@ -849,15 +849,6 @@ function import_mail($channel,$mails) { if(! $m) continue; - if($mail['conv_guid']) { - $x = q("select id from conv where guid = '%s' and uid = %d limit 1", - dbesc($mail['conv_guid']), - intval($channel['channel_id']) - ); - if($x) { - $m['convid'] = $x[0]['id']; - } - } $m['aid'] = $channel['channel_account_id']; $m['uid'] = $channel['channel_id']; mail_store($m); diff --git a/include/items.php b/include/items.php index ae5b0ca94..9159f6da3 100755 --- a/include/items.php +++ b/include/items.php @@ -1618,6 +1618,8 @@ function get_mail_elements($x) { $arr['body'] = (($x['body']) ? htmlspecialchars($x['body'], ENT_COMPAT,'UTF-8',false) : ''); $arr['title'] = (($x['title'])? htmlspecialchars($x['title'],ENT_COMPAT,'UTF-8',false) : ''); + $arr['conv_guid'] = (($x['conv_guid'])? htmlspecialchars($x['conv_guid'],ENT_COMPAT,'UTF-8',false) : ''); + $arr['created'] = datetime_convert('UTC','UTC',$x['created']); if((! array_key_exists('expires',$x)) || ($x['expires'] === NULL_DATE)) $arr['expires'] = NULL_DATE; @@ -1656,6 +1658,7 @@ function get_mail_elements($x) { if($arr['created'] > datetime_convert()) $arr['created'] = datetime_convert(); + $arr['mid'] = (($x['message_id']) ? htmlspecialchars($x['message_id'], ENT_COMPAT,'UTF-8',false) : ''); $arr['parent_mid'] = (($x['message_parent']) ? htmlspecialchars($x['message_parent'], ENT_COMPAT,'UTF-8',false) : ''); @@ -3536,6 +3539,7 @@ function mail_store($arr) { $arr['title'] = ((x($arr,'title')) ? trim($arr['title']) : ''); $arr['parent_mid'] = ((x($arr,'parent_mid')) ? notags(trim($arr['parent_mid'])) : ''); $arr['body'] = ((x($arr,'body')) ? trim($arr['body']) : ''); + $arr['conv_guid'] = ((x($arr,'conv_guid')) ? trim($arr['conv_guid']) : ''); $arr['mail_flags'] = ((x($arr,'mail_flags')) ? intval($arr['mail_flags']) : 0 ); diff --git a/include/message.php b/include/message.php index 87975ef6a..b5e805d4a 100644 --- a/include/message.php +++ b/include/message.php @@ -28,8 +28,6 @@ function send_message($uid = 0, $recipient='', $body='', $subject='', $replyto=' // $expires = datetime_convert(date_default_timezone_get(),'UTC',$expires); - - if($uid) { $r = q("select * from channel where channel_id = %d limit 1", intval($uid) @@ -52,17 +50,17 @@ function send_message($uid = 0, $recipient='', $body='', $subject='', $replyto=' $conv_guid = ''; if(strlen($replyto)) { - $r = q("select convid from mail where channel_id = %d and ( mid = '%s' or parent_mid = '%s' ) limit 1", + $r = q("select conv_guid from mail where channel_id = %d and ( mid = '%s' or parent_mid = '%s' ) limit 1", intval(local_channel()), dbesc($replyto), dbesc($replyto) ); if($r) { - $convid = $r[0]['convid']; + $conv_guid = $r[0]['conv_guid']; } } - if(! $convid) { + if(! $conv_guid) { // create a new conversation @@ -91,33 +89,30 @@ function send_message($uid = 0, $recipient='', $body='', $subject='', $replyto=' dbesc($handles) ); - $r = q("select * from conv where guid = '%s' and uid = %d limit 1", dbesc($conv_guid), intval(local_channel()) ); if($r) { - $convid = $r[0]['id']; $retconv = $r[0]; } } - if(! $convid) { - $ret['message'] = 'conversation not found'; - return $ret; - } - - if(! $conv_guid) { - $r = q("select * from conv where id = %d and uid = %d limit 1", - intval($convid), + if(! $retconv) { + $r = q("select * from conv where guid = '%s' and uid = %d limit 1", + dbesc($conv_guid), intval(local_channel()) ); if($r) { - $conv_guid = $r[0]['guid']; $retconv = $r[0]; } } + if(! $retconv) { + $ret['message'] = 'conversation not found'; + return $ret; + } + // generate a unique message_id do { @@ -189,10 +184,10 @@ function send_message($uid = 0, $recipient='', $body='', $subject='', $replyto=' - $r = q("INSERT INTO mail ( account_id, convid, mail_obscured, channel_id, from_xchan, to_xchan, title, body, attach, mid, parent_mid, created, expires ) - VALUES ( %d, %d, %d, %d, '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s' )", + $r = q("INSERT INTO mail ( account_id, conv_guid, mail_obscured, channel_id, from_xchan, to_xchan, title, body, attach, mid, parent_mid, created, expires ) + VALUES ( %d, '%s', %d, %d, '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s' )", intval($channel['channel_account_id']), - intval($convid), + dbesc($conv_guid), intval(1), intval($channel['channel_id']), dbesc($channel['channel_hash']), @@ -215,7 +210,6 @@ function send_message($uid = 0, $recipient='', $body='', $subject='', $replyto=' if($r) { $post_id = $r[0]['id']; $retmail = $r; - $retmail['conv_guid'] = $conv_guid; } else { $ret['message'] = t('Stored post could not be verified.'); @@ -260,10 +254,9 @@ function send_message($uid = 0, $recipient='', $body='', $subject='', $replyto=' $ret['success'] = true; $ret['message_item'] = intval($post_id); - if($retconv) - $ret['conv'] = $retconv; - if($retmail) - $ret['mail'] = $retmail; + $ret['conv'] = $retconv; + $ret['mail'] = $retmail; + return $ret; } @@ -391,7 +384,7 @@ function private_messages_drop($channel_id, $messageitem_id, $drop_conversation if($drop_conversation) { // find the parent_id - $p = q("SELECT parent_mid, convid FROM mail WHERE id = %d AND channel_id = %d LIMIT 1", + $p = q("SELECT parent_mid, conv_guid FROM mail WHERE id = %d AND channel_id = %d LIMIT 1", intval($messageitem_id), intval($channel_id) ); diff --git a/include/network.php b/include/network.php index e1793b405..c67c019ef 100644 --- a/include/network.php +++ b/include/network.php @@ -1677,13 +1677,40 @@ function format_and_send_email($sender,$xchan,$item) { 'additionalMailHeader' => '', )); +} + +function do_delivery($deliveries) { + if(! (is_array($deliveries) && count($deliveries))) + return; + $interval = ((get_config('system','delivery_interval') !== false) + ? intval(get_config('system','delivery_interval')) : 2 ); + $deliveries_per_process = intval(get_config('system','delivery_batch_count')); + if($deliveries_per_process <= 0) + $deliveries_per_process = 1; + $deliver = array(); + foreach($deliveries as $d) { + $deliver[] = $d; + + if(count($deliver) >= $deliveries_per_process) { + proc_run('php','include/deliver.php',$deliver); + $deliver = array(); + if($interval) + @time_sleep_until(microtime(true) + (float) $interval); + } + } + + // catch any stragglers + + if($deliver) + proc_run('php','include/deliver.php',$deliver); + } diff --git a/include/notifier.php b/include/notifier.php index 80f243452..2676f20d9 100644 --- a/include/notifier.php +++ b/include/notifier.php @@ -96,6 +96,18 @@ function notifier_run($argv, $argc){ require_once('include/identity.php'); $sys = get_sys_channel(); + $deliveries = array(); + + $dead_hubs = array(); + + $dh = q("select site_url from site where site_dead = 1"); + if(dh) { + foreach($dh as $dead) { + $dead_hubs[] = $dead['site_url']; + } + } + + if($cmd == 'permission_update') { // Get the recipient $r = q("select abook.*, hubloc.* from abook @@ -113,8 +125,11 @@ function notifier_run($argv, $argc){ intval($r[0]['abook_channel']) ); if($s) { - $perm_update = array('sender' => $s[0], 'recipient' => $r[0], 'success' => false); + $perm_update = array('sender' => $s[0], 'recipient' => $r[0], 'success' => false, 'deliveries' => ''); call_hooks('permissions_update',$perm_update); + if($perm_update['success'] && $perm_update['deliveries']) + $deliveries[] = $perm_update['deliveries']; + if(! $perm_update['success']) { // send a refresh message to each hub they have registered here $h = q("select * from hubloc where hubloc_hash = '%s' @@ -125,36 +140,40 @@ function notifier_run($argv, $argc){ ); if($h) { foreach($h as $hh) { + if(in_array($hh['hubloc_url'],$dead_hubs)) { + logger('skipping dead hub: ' . $hh['hubloc_url'], LOGGER_DEBUG); + continue; + } + $data = zot_build_packet($s[0],'refresh',array(array( 'guid' => $hh['hubloc_guid'], 'guid_sig' => $hh['hubloc_guid_sig'], 'url' => $hh['hubloc_url']) )); if($data) { - $result = zot_zot($hh['hubloc_callback'],$data); - - // if immediate delivery failed, stick it in the queue to try again later. - - if(! $result['success']) { - $hash = random_string(); - q("insert into outq ( outq_hash, outq_account, outq_channel, outq_driver, outq_posturl, outq_async, outq_created, outq_updated, outq_notify, outq_msg ) - values ( '%s', %d, %d, '%s', '%s', %d, '%s', '%s', '%s', '%s' )", - dbesc($hash), - intval($s[0]['channel_account_id']), - intval($s[0]['channel_id']), - dbesc('zot'), - dbesc($hh['hubloc_callback']), - intval(1), - dbesc(datetime_convert()), - dbesc(datetime_convert()), - dbesc($data), - dbesc('') - ); - } + $hash = random_string(); + q("insert into outq ( outq_hash, outq_account, outq_channel, outq_driver, outq_posturl, outq_async, outq_created, outq_updated, outq_notify, outq_msg ) + values ( '%s', %d, %d, '%s', '%s', %d, '%s', '%s', '%s', '%s' )", + dbesc($hash), + intval($s[0]['channel_account_id']), + intval($s[0]['channel_id']), + dbesc('zot'), + dbesc($hh['hubloc_callback']), + intval(1), + dbesc(datetime_convert()), + dbesc(datetime_convert()), + dbesc($data), + dbesc('') + ); + $deliveries[] = $hash; } - } + } + } } + + if($deliveries) + do_delivery($deliveries); } } return; @@ -524,14 +543,6 @@ function notifier_run($argv, $argc){ $hubs = $r; - $dead_hubs = array(); - - $dh = q("select site_url from site where site_dead = 1"); - if(dh) { - foreach($dh as $dead) { - $dead_hubs[] = $dead['site_url']; - } - } /** @@ -571,15 +582,6 @@ function notifier_run($argv, $argc){ logger('notifier: will notify/deliver to these hubs: ' . print_r($hublist,true), LOGGER_DEBUG); - $interval = ((get_config('system','delivery_interval') !== false) - ? intval(get_config('system','delivery_interval')) : 2 ); - - $deliveries_per_process = intval(get_config('system','delivery_batch_count')); - - if($deliveries_per_process <= 0) - $deliveries_per_process = 1; - - $deliveries = array(); foreach($dhubs as $hub) { @@ -690,26 +692,8 @@ function notifier_run($argv, $argc){ proc_run('php','include/deliver_hooks.php', $target_item['id']); } - if($deliveries) { - $deliver = array(); - - foreach($deliveries as $d) { - - $deliver[] = $d; - - if(count($deliver) >= $deliveries_per_process) { - proc_run('php','include/deliver.php',$deliver); - $deliver = array(); - if($interval) - @time_sleep_until(microtime(true) + (float) $interval); - } - } - } - - // catch any stragglers - - if($deliver) - proc_run('php','include/deliver.php',$deliver); + if($deliveries) + do_delivery($deliveries); logger('notifier: basic loop complete.', LOGGER_DEBUG); diff --git a/install/schema_mysql.sql b/install/schema_mysql.sql index 0e626752d..4aaa70825 100644 --- a/install/schema_mysql.sql +++ b/install/schema_mysql.sql @@ -731,6 +731,7 @@ CREATE TABLE IF NOT EXISTS `likes` ( CREATE TABLE IF NOT EXISTS `mail` ( `id` int(10) unsigned NOT NULL AUTO_INCREMENT, `convid` int(10) unsigned NOT NULL DEFAULT '0', + `conv_guid` char(255) NOT NULL DEFAULT '', `mail_flags` int(10) unsigned NOT NULL DEFAULT '0', `from_xchan` char(255) NOT NULL DEFAULT '', `to_xchan` char(255) NOT NULL DEFAULT '', @@ -761,6 +762,7 @@ CREATE TABLE IF NOT EXISTS `mail` ( KEY `parent_mid` (`parent_mid`), KEY `expires` (`expires`), KEY `convid` (`convid`), + KEY `conv_guid` (`conv_guid`), KEY `mail_deleted` (`mail_deleted`), KEY `mail_replied` (`mail_replied`), KEY `mail_isreply` (`mail_isreply`), diff --git a/install/schema_postgres.sql b/install/schema_postgres.sql index f378a3e3d..f42f6b297 100644 --- a/install/schema_postgres.sql +++ b/install/schema_postgres.sql @@ -728,6 +728,7 @@ create index "likes_target_id" on likes ("target_id"); CREATE TABLE "mail" ( "id" serial NOT NULL, "convid" bigint NOT NULL DEFAULT '0', + "conv_guid" text NOT NULL, "mail_flags" bigint NOT NULL DEFAULT '0', "from_xchan" text NOT NULL DEFAULT '', "to_xchan" text NOT NULL DEFAULT '', @@ -750,6 +751,7 @@ CREATE TABLE "mail" ( PRIMARY KEY ("id") ); create index "mail_convid" on mail ("convid"); +create index "mail_conv_guid" on mail ("conv_guid"); create index "mail_created" on mail ("created"); create index "mail_flags" on mail ("mail_flags"); create index "mail_account_id" on mail ("account_id"); diff --git a/install/update.php b/install/update.php index e120f91d2..dc9377892 100644 --- a/install/update.php +++ b/install/update.php @@ -1887,6 +1887,23 @@ function update_r1155() { function update_r1156() { - return UPDATE_SUCCESS; + $r1 = q("ALTER TABLE mail ADD conv_guid CHAR( 255 ) NOT NULL DEFAULT '' "); + $r2 = q("create index conv_guid on mail ( conv_guid ) "); + + $r3 = q("select mail.id, mail.convid, conv.guid from mail left join conv on mail.convid = conv.id where true"); + if($r3) { + foreach($r3 as $rr) { + if($rr['convid']) { + q("update mail set conv_guid = '%s' where id = %d", + dbesc($rr['guid']), + intval($rr['id']) + ); + } + } + } + + if($r1 && $r2) + return UPDATE_SUCCESS; + return UPDATE_FAILED; } -- cgit v1.2.3