diff options
-rw-r--r-- | Zotlabs/Daemon/Notifier.php | 15 | ||||
-rw-r--r-- | Zotlabs/Lib/Activity.php | 5 | ||||
-rw-r--r-- | Zotlabs/Lib/Libzot.php | 121 | ||||
-rw-r--r-- | Zotlabs/Lib/QueueWorker.php | 2 | ||||
-rw-r--r-- | Zotlabs/Module/Item.php | 5 | ||||
-rw-r--r-- | include/items.php | 30 | ||||
-rw-r--r-- | include/network.php | 2 |
7 files changed, 149 insertions, 31 deletions
diff --git a/Zotlabs/Daemon/Notifier.php b/Zotlabs/Daemon/Notifier.php index 76e0628bf..77ec6920d 100644 --- a/Zotlabs/Daemon/Notifier.php +++ b/Zotlabs/Daemon/Notifier.php @@ -299,7 +299,7 @@ class Notifier { return; } - if ($target_item['verb'] === ACTIVITY_SHARE) { + if (in_array($target_item['verb'], [ACTIVITY_SHARE])) { // Provide correct representation across the wire. Internally this is treated as a comment. $target_item['parent_mid'] = $target_item['thr_parent'] = $target_item['mid']; } @@ -345,7 +345,7 @@ class Notifier { } logger('target_item: ' . print_r($target_item, true), LOGGER_DEBUG); - hz_syslog('encoded: ' . print_r(self::$encoded_item, true), LOGGER_DEBUG); + logger('encoded: ' . print_r(self::$encoded_item, true), LOGGER_DEBUG); // Send comments to the owner to re-deliver to everybody in the conversation // We only do this if the item in question originated on this site. This prevents looping. @@ -431,6 +431,9 @@ class Notifier { return; } + hz_syslog(print_r(self::$encoded_item['type'], true)); + hz_syslog(print_r(self::$recipients, true)); + // logger('recipients: ' . print_r(self::$recipients,true), LOGGER_NORMAL, LOG_DEBUG); if (!count(self::$env_recips)) { @@ -468,7 +471,7 @@ class Notifier { 'queued' => [] ]; - call_hooks('notifier_process', $narr); + //call_hooks('notifier_process', $narr); if ($narr['queued']) { foreach ($narr['queued'] as $pq) self::$deliveries[] = $pq; @@ -589,7 +592,9 @@ class Notifier { foreach ($dhubs as $hub) { - logger('notifier_hub: ' . $hub['hubloc_url'], LOGGER_DEBUG); + hz_syslog('notifier_hub: ' . $hub['hubloc_url'], LOGGER_DEBUG); + hz_syslog(print_r($target_item['id'], true)); + hz_syslog(print_r($target_item['verb'], true)); if ($hub['hubloc_network'] !== 'zot6') { $narr = [ @@ -611,7 +616,7 @@ class Notifier { 'queued' => [] ]; - call_hooks('notifier_hub', $narr); + // call_hooks('notifier_hub', $narr); if ($narr['queued']) { foreach ($narr['queued'] as $pq) self::$deliveries[] = $pq; diff --git a/Zotlabs/Lib/Activity.php b/Zotlabs/Lib/Activity.php index 2b9beb3f5..dfbdacf18 100644 --- a/Zotlabs/Lib/Activity.php +++ b/Zotlabs/Lib/Activity.php @@ -974,7 +974,7 @@ class Activity { // inReplyTo needs to be set in the activity for followup actions (Like, Dislike, Announce, etc.), // but *not* for comments and RSVPs, where it should only be present in the object - if (!in_array($ret['type'], ['Create', 'Update', 'Accept', 'Reject', 'TentativeAccept', 'TentativeReject'])) { + if (!in_array($ret['type'], ['Create', 'Update', 'Add', 'Remove', 'Accept', 'Reject', 'TentativeAccept', 'TentativeReject'])) { $ret['inReplyTo'] = ((strpos($i['thr_parent'], 'http') === 0) ? $i['thr_parent'] : z_root() . '/item/' . urlencode($i['thr_parent'])); } @@ -1067,11 +1067,8 @@ class Activity { call_hooks('encode_activity', $hookinfo); -//hz_syslog(print_r($hookinfo['encoded'], true)); - return $hookinfo['encoded']; - } // Returns an array of URLS for any mention tags found in the item array $i. diff --git a/Zotlabs/Lib/Libzot.php b/Zotlabs/Lib/Libzot.php index 243588d2b..5c0b620eb 100644 --- a/Zotlabs/Lib/Libzot.php +++ b/Zotlabs/Lib/Libzot.php @@ -1134,6 +1134,7 @@ class Libzot { } $message_request = false; + $is_collection_operation = false; $has_data = array_key_exists('data', $env) && $env['data']; @@ -1141,16 +1142,37 @@ class Libzot { $AS = null; + if ($env['encoding'] === 'activitystreams') { $AS = new ActivityStreams($data); - if (!$AS->is_valid()) { - logger('Activity rejected: ' . print_r($data, true)); - return; + + // process add/remove from collection separately, as it requires a target. + // use the raw object, as it will not include actor expansion + if (in_array($AS->type, ['Add', 'Remove']) + && is_array($AS->obj) + && array_key_exists('object', $AS->obj) + && array_key_exists('actor', $AS->obj) + && !empty($AS->tgt)) { + + hz_syslog('relayed collection operation', LOGGER_DEBUG); + $is_collection_operation = true; + + $original_id = $AS->id; + $original_type = $AS->type; + + $raw_activity = $AS->data; + + $AS = new ActivityStreams($raw_activity['object'], portable_id: $env['sender']); + + // Store the original activity id and type for later usage + $AS->meta['original_id'] = $original_id; + $AS->meta['original_type'] = $original_type; } if (is_array($AS->obj)) { $item = Activity::decode_note($AS); + if (!$item) { logger('Could not decode activity: ' . print_r($AS, true)); return; @@ -1159,6 +1181,12 @@ class Libzot { else { $item = []; } + + if (!$AS->is_valid()) { + logger('Activity rejected: ' . print_r($data, true)); + return; + } + logger($AS->debug(), LOGGER_DATA); } @@ -1302,7 +1330,12 @@ class Libzot { $relay = (($env['type'] === 'response') ? true : false); - $result = self::process_delivery($env['sender'], $AS, $item, $deliveries, $relay, false, $message_request); + if($is_collection_operation) + hz_syslog('col'); + else + hz_syslog('not col'); + + $result = self::process_delivery($env['sender'], $AS, $item, $deliveries, $relay, false, $message_request, false, $is_collection_operation); Activity::init_background_fetch($env['sender']); } @@ -1518,7 +1551,7 @@ class Libzot { * @return array */ - static function process_delivery($sender, $act, $arr, $deliveries, $relay, $public = false, $request = false, $force = false) { + static function process_delivery($sender, $act, $arr, $deliveries, $relay, $public = false, $request = false, $force = false, $is_collection_operation = false) { $result = []; // We've validated the sender. Now make sure that the sender is the owner or author @@ -1546,6 +1579,14 @@ class Libzot { $DR->set_name($channel['channel_name'] . ' <' . channel_reddress($channel) . '>'); + $conversation_operation = $is_collection_operation && isset($arr['target']['attributedTo']); + + if (str_contains($arr['tgt_type'], 'Collection') && !$relay && !$conversation_operation) { + $DR->update('not a collection activity'); + $result[] = $DR->get(); + continue; + } + if (($act) && ($act->obj) && (!is_array($act->obj))) { // The initial object fetch failed using the sys channel credentials. // Try again using the delivery channel credentials. @@ -1579,6 +1620,8 @@ class Libzot { * */ + + if ($sender === $channel['channel_hash'] && $arr['author_xchan'] === $channel['channel_hash'] && !str_starts_with($arr['mid'], z_root())) { $DR->update('self delivery ignored'); $result[] = $DR->get(); @@ -1819,6 +1862,12 @@ class Libzot { } } + if($is_collection_operation) + hz_syslog('col1'); + else + hz_syslog('not col1'); + + // This is used to fetch allow/deny rules if either the sender // or owner is a connection. post_is_importable() evaluates all of them $abook = q("select * from abook where abook_channel = %d and ( abook_xchan = '%s' OR abook_xchan = '%s' )", @@ -1827,11 +1876,15 @@ class Libzot { dbesc($arr['author_xchan']) ); - // reactions such as like and dislike could have an mid with /activity/ in it. + // Reactions such as like and dislike could have an mid with /activity/ in it. // Check for both forms in order to prevent duplicates. - $r = q("select * from item where mid in ('%s','%s') and uid = %d limit 1", - dbesc($arr['mid']), - dbesc(str_replace(z_root() . '/activity/', z_root() . '/item/', $arr['mid'])), + + // If we process an add/remove activity, look for the original activity id instead of the object id + $sql_mid = (($is_collection_operation && $relay && $channel['channel_hash'] === $arr['owner_xchan']) ? $act->meta['original_id'] : $arr['mid']); + + $r = q("select * from item where mid in ('%s', '%s') and uid = %d limit 1", + dbesc($sql_mid), + dbesc(reverse_activity_mid($sql_mid)), intval($channel['channel_id']) ); @@ -1860,21 +1913,29 @@ class Libzot { $DR->update('update ignored'); $result[] = $DR->get(); } + + if ($relay && $channel['channel_hash'] === $item_result['item']['owner_xchan'] && $item_result['item']['verb'] !== 'Add' && !$is_collection_operation) { + $approval = Activity::addToCollection($channel, $act->data, $item_result['item']['parent_mid'], $item_result['item'], deliver: false); + } + } else { $DR->update('update ignored'); $result[] = $DR->get(); - // We need this line to ensure wall-to-wall comments are relayed (by falling through to the relay bit), + // We need this line to ensure wall-to-wall comments and add/remove activities are relayed (by falling through to the relay bit), // and at the same time not relay any other relayable posts more than once, because to do so is very wasteful. if (!intval($r[0]['item_origin'])) continue; } + + } else { $arr['aid'] = $channel['channel_account_id']; $arr['uid'] = $channel['channel_id']; + // if it's a sourced post, call the post_local hooks as if it were // posted locally so that crosspost connectors will be triggered. $item_source = check_item_source($arr['uid'], $arr); @@ -1903,10 +1964,17 @@ class Libzot { } if (post_is_importable($arr['uid'], $arr, $abook)) { - $item_result = item_store($arr); + $item_result = item_store($arr, addAndSync: false); + + + if ($item_result['success']) { $item_id = $item_result['item_id']; + if ($relay && $channel['channel_hash'] === $item_result['item']['owner_xchan'] && $item_result['item']['verb'] !== 'Add' && !$is_collection_operation) { + $approval = Activity::addToCollection($channel, $act->data, $item_result['item']['parent_mid'], $item_result['item'], deliver: false); + } + if ($item_source && in_array($item_result['item']['obj_type'], ['Event', ACTIVITY_OBJ_EVENT])) { event_addtocal($item_id, $channel['channel_id']); } @@ -1955,6 +2023,10 @@ class Libzot { if ($relay && $item_id && $stored['item_blocked'] !== ITEM_MODERATED) { logger('Invoking relay'); Master::Summon(['Notifier', 'relay', intval($item_id)]); + if (!empty($approval) && $approval['item_id']) { + Master::Summon(['Notifier', 'relay', intval($approval['item_id'])]); + } + $DR->addto_update('relayed'); $result[] = $DR->get(); } @@ -2007,6 +2079,7 @@ class Libzot { foreach ($items as $activity) { $AS = new ActivityStreams($activity); + if ($AS->is_valid() && $AS->type === 'Announce' && is_array($AS->obj) && array_key_exists('object', $AS->obj) && array_key_exists('actor', $AS->obj)) { // This is a relayed/forwarded Activity (as opposed to a shared/boosted object) @@ -2015,6 +2088,30 @@ class Libzot { $AS = new ActivityStreams($AS->obj); } + // process add/remove from collection separately, as it requires a target. + // use the raw object, as it will not include actor expansion + if (in_array($AS->type, ['Add', 'Remove']) + && is_array($AS->obj) + && array_key_exists('object', $AS->obj) + && array_key_exists('actor', $AS->obj) + && !empty($AS->tgt)) { + + logger('relayed collection operation', LOGGER_DEBUG); + + $is_collection_operation = true; + + $original_id = $AS->id; + $original_type = $AS->type; + + $raw_activity = $AS->data; + + $AS = new ActivityStreams($raw_activity['object'], portable_id: $env['sender']); + + // Store the original activity id and type for later usage + $AS->meta['original_id'] = $original_id; + $AS->meta['original_type'] = $original_type; + } + if (!$AS->is_valid()) { logger('Fetched activity rejected: ' . print_r($activity, true)); continue; @@ -2208,7 +2305,7 @@ class Libzot { } - $x = item_store_update($item); + $x = item_store_update($item, addAndSync: false); // If we're updating an event that we've saved locally, we store the item info first // because event_addtocal will parse the body to get the 'new' event details diff --git a/Zotlabs/Lib/QueueWorker.php b/Zotlabs/Lib/QueueWorker.php index 9e28999c9..4242ec8e7 100644 --- a/Zotlabs/Lib/QueueWorker.php +++ b/Zotlabs/Lib/QueueWorker.php @@ -284,7 +284,7 @@ class QueueWorker { $jobs++; - logger("Workinfo: " . $workitem[0]['workerq_data'], LOGGER_DEBUG); + hz_syslog("Workinfo: " . $workitem[0]['workerq_data'], LOGGER_DEBUG); $workinfo = json_decode($workitem[0]['workerq_data'], true); $argv = $workinfo['argv']; diff --git a/Zotlabs/Module/Item.php b/Zotlabs/Module/Item.php index 47358aebd..0b50b9b55 100644 --- a/Zotlabs/Module/Item.php +++ b/Zotlabs/Module/Item.php @@ -1302,6 +1302,11 @@ class Item extends Controller { $post = item_store($datarray, $execflag); + hz_syslog('Item: ' . print_r($post['item_id'], true)); + hz_syslog('Item mid: ' . print_r($post['item']['mid'], true)); + hz_syslog('Item appr: ' . print_r($post['approval_id'], true)); + hz_syslog('Item appr mid: ' . print_r($post['approval']['mid'], true)); + if ($post['success']) { $this->add_listeners($datarray); } diff --git a/include/items.php b/include/items.php index fa699148a..60f5cbe9c 100644 --- a/include/items.php +++ b/include/items.php @@ -525,7 +525,7 @@ function post_activity_item($arr, $allow_code = false, $deliver = true, $channel $post = item_store($arr, $allow_code, $deliver, $addAndSync); -hz_syslog('xxx: ' . print_r($post, true)); + hz_syslog('post_activity_item: ' . print_r($post['item_id'], true)); if (!$post['success']) { return $ret; @@ -3395,6 +3395,9 @@ function start_delivery_chain($channel, $item, $item_id, $parent, $group = false if ($post_id) { Master::Summon([ 'Notifier','tgroup',$post_id ]); + if (!empty($post['approval_id'])) { + Master::Summon(['Notifier', 'tgroup', $post['approval_id']]); + } } q("update channel set channel_lastpost = '%s' where channel_id = %d", @@ -5181,10 +5184,12 @@ function addToCollectionAndSync($ret) { $channel = channelx_by_n($ret['item']['uid']); if ($channel && $channel['channel_hash'] === $ret['item']['owner_xchan']) { $items = [$ret['item']]; + if ((int)$items[0]['item_blocked'] === ITEM_MODERATED || (int)$items[0]['item_unpublished'] || (int)$items[0]['item_delayed']) { return $ret; } + xchan_query($items); $items = fetch_post_tags($items); $sync_items = []; @@ -5192,16 +5197,16 @@ function addToCollectionAndSync($ret) { if (!in_array($ret['item']['verb'], ['Add', 'Remove'])) { - $newObj = Activity::build_packet(Activity::encode_activity($items[0]), $channel, false); - $approval = Activity::addToCollection($channel, $newObj, $ret['item']['parent_mid'], $ret['item'], deliver: false); -//hz_syslog(print_r($approval, true)); + $new_obj = Activity::build_packet(Activity::encode_activity($items[0]), $channel, false); + $approval = Activity::addToCollection($channel, $new_obj, $ret['item']['parent_mid'], $ret['item'], deliver: false); + if ($approval['success']) { $ret['approval_id'] = $approval['item_id']; $ret['approval'] = $approval['activity']; - $addItems = [$approval['activity']]; - xchan_query($addItems); - $addItems = fetch_post_tags($addItems); - $sync_items[] = encode_item($addItems[0], true); + $add_items = [$approval['activity']]; + xchan_query($add_items); + $add_items = fetch_post_tags($add_items); + $sync_items[] = encode_item($add_items[0], true); } } @@ -5227,3 +5232,12 @@ function addToCollectionAndSync($ret) { return $ret; } + +function reverse_activity_mid($string) { + return str_replace(z_root() . '/activity/', z_root() . '/item/', $string); +} + +function set_activity_mid($string) { + return str_replace(z_root() . '/item/', z_root() . '/activity/', $string); +} + diff --git a/include/network.php b/include/network.php index b3a3d715c..0a78c144b 100644 --- a/include/network.php +++ b/include/network.php @@ -1488,7 +1488,7 @@ function do_delivery($deliveries, $force = false) { $interval = Config::Get('queueworker', 'queue_interval', 500000); - $deliveries_per_process = intval(Config::Get('system','delivery_batch_count')); +// $deliveries_per_process = intval(Config::Get('system','delivery_batch_count')); if($deliveries_per_process <= 0) $deliveries_per_process = 1; |