aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMario Vavti <mario@mariovavti.com>2024-10-09 18:17:37 +0200
committerMario Vavti <mario@mariovavti.com>2024-10-09 18:17:37 +0200
commit45c0091d3d5ce1aae1f4915dfd4c15865a011fe1 (patch)
tree8a9c4dd92ca86a3ee0fec353de9b80f49aef3a2a
parent75184355d02007593d978a13b7e57514e6b08f63 (diff)
downloadvolse-hubzilla-45c0091d3d5ce1aae1f4915dfd4c15865a011fe1.tar.gz
volse-hubzilla-45c0091d3d5ce1aae1f4915dfd4c15865a011fe1.tar.bz2
volse-hubzilla-45c0091d3d5ce1aae1f4915dfd4c15865a011fe1.zip
more work on porting containers from streams
-rw-r--r--Zotlabs/Daemon/Notifier.php15
-rw-r--r--Zotlabs/Lib/Activity.php5
-rw-r--r--Zotlabs/Lib/Libzot.php121
-rw-r--r--Zotlabs/Lib/QueueWorker.php2
-rw-r--r--Zotlabs/Module/Item.php5
-rw-r--r--include/items.php30
-rw-r--r--include/network.php2
7 files changed, 149 insertions, 31 deletions
diff --git a/Zotlabs/Daemon/Notifier.php b/Zotlabs/Daemon/Notifier.php
index 76e0628bf..77ec6920d 100644
--- a/Zotlabs/Daemon/Notifier.php
+++ b/Zotlabs/Daemon/Notifier.php
@@ -299,7 +299,7 @@ class Notifier {
return;
}
- if ($target_item['verb'] === ACTIVITY_SHARE) {
+ if (in_array($target_item['verb'], [ACTIVITY_SHARE])) {
// Provide correct representation across the wire. Internally this is treated as a comment.
$target_item['parent_mid'] = $target_item['thr_parent'] = $target_item['mid'];
}
@@ -345,7 +345,7 @@ class Notifier {
}
logger('target_item: ' . print_r($target_item, true), LOGGER_DEBUG);
- hz_syslog('encoded: ' . print_r(self::$encoded_item, true), LOGGER_DEBUG);
+ logger('encoded: ' . print_r(self::$encoded_item, true), LOGGER_DEBUG);
// Send comments to the owner to re-deliver to everybody in the conversation
// We only do this if the item in question originated on this site. This prevents looping.
@@ -431,6 +431,9 @@ class Notifier {
return;
}
+ hz_syslog(print_r(self::$encoded_item['type'], true));
+ hz_syslog(print_r(self::$recipients, true));
+
// logger('recipients: ' . print_r(self::$recipients,true), LOGGER_NORMAL, LOG_DEBUG);
if (!count(self::$env_recips)) {
@@ -468,7 +471,7 @@ class Notifier {
'queued' => []
];
- call_hooks('notifier_process', $narr);
+ //call_hooks('notifier_process', $narr);
if ($narr['queued']) {
foreach ($narr['queued'] as $pq)
self::$deliveries[] = $pq;
@@ -589,7 +592,9 @@ class Notifier {
foreach ($dhubs as $hub) {
- logger('notifier_hub: ' . $hub['hubloc_url'], LOGGER_DEBUG);
+ hz_syslog('notifier_hub: ' . $hub['hubloc_url'], LOGGER_DEBUG);
+ hz_syslog(print_r($target_item['id'], true));
+ hz_syslog(print_r($target_item['verb'], true));
if ($hub['hubloc_network'] !== 'zot6') {
$narr = [
@@ -611,7 +616,7 @@ class Notifier {
'queued' => []
];
- call_hooks('notifier_hub', $narr);
+ // call_hooks('notifier_hub', $narr);
if ($narr['queued']) {
foreach ($narr['queued'] as $pq)
self::$deliveries[] = $pq;
diff --git a/Zotlabs/Lib/Activity.php b/Zotlabs/Lib/Activity.php
index 2b9beb3f5..dfbdacf18 100644
--- a/Zotlabs/Lib/Activity.php
+++ b/Zotlabs/Lib/Activity.php
@@ -974,7 +974,7 @@ class Activity {
// inReplyTo needs to be set in the activity for followup actions (Like, Dislike, Announce, etc.),
// but *not* for comments and RSVPs, where it should only be present in the object
- if (!in_array($ret['type'], ['Create', 'Update', 'Accept', 'Reject', 'TentativeAccept', 'TentativeReject'])) {
+ if (!in_array($ret['type'], ['Create', 'Update', 'Add', 'Remove', 'Accept', 'Reject', 'TentativeAccept', 'TentativeReject'])) {
$ret['inReplyTo'] = ((strpos($i['thr_parent'], 'http') === 0) ? $i['thr_parent'] : z_root() . '/item/' . urlencode($i['thr_parent']));
}
@@ -1067,11 +1067,8 @@ class Activity {
call_hooks('encode_activity', $hookinfo);
-//hz_syslog(print_r($hookinfo['encoded'], true));
-
return $hookinfo['encoded'];
-
}
// Returns an array of URLS for any mention tags found in the item array $i.
diff --git a/Zotlabs/Lib/Libzot.php b/Zotlabs/Lib/Libzot.php
index 243588d2b..5c0b620eb 100644
--- a/Zotlabs/Lib/Libzot.php
+++ b/Zotlabs/Lib/Libzot.php
@@ -1134,6 +1134,7 @@ class Libzot {
}
$message_request = false;
+ $is_collection_operation = false;
$has_data = array_key_exists('data', $env) && $env['data'];
@@ -1141,16 +1142,37 @@ class Libzot {
$AS = null;
+
if ($env['encoding'] === 'activitystreams') {
$AS = new ActivityStreams($data);
- if (!$AS->is_valid()) {
- logger('Activity rejected: ' . print_r($data, true));
- return;
+
+ // process add/remove from collection separately, as it requires a target.
+ // use the raw object, as it will not include actor expansion
+ if (in_array($AS->type, ['Add', 'Remove'])
+ && is_array($AS->obj)
+ && array_key_exists('object', $AS->obj)
+ && array_key_exists('actor', $AS->obj)
+ && !empty($AS->tgt)) {
+
+ hz_syslog('relayed collection operation', LOGGER_DEBUG);
+ $is_collection_operation = true;
+
+ $original_id = $AS->id;
+ $original_type = $AS->type;
+
+ $raw_activity = $AS->data;
+
+ $AS = new ActivityStreams($raw_activity['object'], portable_id: $env['sender']);
+
+ // Store the original activity id and type for later usage
+ $AS->meta['original_id'] = $original_id;
+ $AS->meta['original_type'] = $original_type;
}
if (is_array($AS->obj)) {
$item = Activity::decode_note($AS);
+
if (!$item) {
logger('Could not decode activity: ' . print_r($AS, true));
return;
@@ -1159,6 +1181,12 @@ class Libzot {
else {
$item = [];
}
+
+ if (!$AS->is_valid()) {
+ logger('Activity rejected: ' . print_r($data, true));
+ return;
+ }
+
logger($AS->debug(), LOGGER_DATA);
}
@@ -1302,7 +1330,12 @@ class Libzot {
$relay = (($env['type'] === 'response') ? true : false);
- $result = self::process_delivery($env['sender'], $AS, $item, $deliveries, $relay, false, $message_request);
+ if($is_collection_operation)
+ hz_syslog('col');
+ else
+ hz_syslog('not col');
+
+ $result = self::process_delivery($env['sender'], $AS, $item, $deliveries, $relay, false, $message_request, false, $is_collection_operation);
Activity::init_background_fetch($env['sender']);
}
@@ -1518,7 +1551,7 @@ class Libzot {
* @return array
*/
- static function process_delivery($sender, $act, $arr, $deliveries, $relay, $public = false, $request = false, $force = false) {
+ static function process_delivery($sender, $act, $arr, $deliveries, $relay, $public = false, $request = false, $force = false, $is_collection_operation = false) {
$result = [];
// We've validated the sender. Now make sure that the sender is the owner or author
@@ -1546,6 +1579,14 @@ class Libzot {
$DR->set_name($channel['channel_name'] . ' <' . channel_reddress($channel) . '>');
+ $conversation_operation = $is_collection_operation && isset($arr['target']['attributedTo']);
+
+ if (str_contains($arr['tgt_type'], 'Collection') && !$relay && !$conversation_operation) {
+ $DR->update('not a collection activity');
+ $result[] = $DR->get();
+ continue;
+ }
+
if (($act) && ($act->obj) && (!is_array($act->obj))) {
// The initial object fetch failed using the sys channel credentials.
// Try again using the delivery channel credentials.
@@ -1579,6 +1620,8 @@ class Libzot {
*
*/
+
+
if ($sender === $channel['channel_hash'] && $arr['author_xchan'] === $channel['channel_hash'] && !str_starts_with($arr['mid'], z_root())) {
$DR->update('self delivery ignored');
$result[] = $DR->get();
@@ -1819,6 +1862,12 @@ class Libzot {
}
}
+ if($is_collection_operation)
+ hz_syslog('col1');
+ else
+ hz_syslog('not col1');
+
+
// This is used to fetch allow/deny rules if either the sender
// or owner is a connection. post_is_importable() evaluates all of them
$abook = q("select * from abook where abook_channel = %d and ( abook_xchan = '%s' OR abook_xchan = '%s' )",
@@ -1827,11 +1876,15 @@ class Libzot {
dbesc($arr['author_xchan'])
);
- // reactions such as like and dislike could have an mid with /activity/ in it.
+ // Reactions such as like and dislike could have an mid with /activity/ in it.
// Check for both forms in order to prevent duplicates.
- $r = q("select * from item where mid in ('%s','%s') and uid = %d limit 1",
- dbesc($arr['mid']),
- dbesc(str_replace(z_root() . '/activity/', z_root() . '/item/', $arr['mid'])),
+
+ // If we process an add/remove activity, look for the original activity id instead of the object id
+ $sql_mid = (($is_collection_operation && $relay && $channel['channel_hash'] === $arr['owner_xchan']) ? $act->meta['original_id'] : $arr['mid']);
+
+ $r = q("select * from item where mid in ('%s', '%s') and uid = %d limit 1",
+ dbesc($sql_mid),
+ dbesc(reverse_activity_mid($sql_mid)),
intval($channel['channel_id'])
);
@@ -1860,21 +1913,29 @@ class Libzot {
$DR->update('update ignored');
$result[] = $DR->get();
}
+
+ if ($relay && $channel['channel_hash'] === $item_result['item']['owner_xchan'] && $item_result['item']['verb'] !== 'Add' && !$is_collection_operation) {
+ $approval = Activity::addToCollection($channel, $act->data, $item_result['item']['parent_mid'], $item_result['item'], deliver: false);
+ }
+
}
else {
$DR->update('update ignored');
$result[] = $DR->get();
- // We need this line to ensure wall-to-wall comments are relayed (by falling through to the relay bit),
+ // We need this line to ensure wall-to-wall comments and add/remove activities are relayed (by falling through to the relay bit),
// and at the same time not relay any other relayable posts more than once, because to do so is very wasteful.
if (!intval($r[0]['item_origin']))
continue;
}
+
+
}
else {
$arr['aid'] = $channel['channel_account_id'];
$arr['uid'] = $channel['channel_id'];
+
// if it's a sourced post, call the post_local hooks as if it were
// posted locally so that crosspost connectors will be triggered.
$item_source = check_item_source($arr['uid'], $arr);
@@ -1903,10 +1964,17 @@ class Libzot {
}
if (post_is_importable($arr['uid'], $arr, $abook)) {
- $item_result = item_store($arr);
+ $item_result = item_store($arr, addAndSync: false);
+
+
+
if ($item_result['success']) {
$item_id = $item_result['item_id'];
+ if ($relay && $channel['channel_hash'] === $item_result['item']['owner_xchan'] && $item_result['item']['verb'] !== 'Add' && !$is_collection_operation) {
+ $approval = Activity::addToCollection($channel, $act->data, $item_result['item']['parent_mid'], $item_result['item'], deliver: false);
+ }
+
if ($item_source && in_array($item_result['item']['obj_type'], ['Event', ACTIVITY_OBJ_EVENT])) {
event_addtocal($item_id, $channel['channel_id']);
}
@@ -1955,6 +2023,10 @@ class Libzot {
if ($relay && $item_id && $stored['item_blocked'] !== ITEM_MODERATED) {
logger('Invoking relay');
Master::Summon(['Notifier', 'relay', intval($item_id)]);
+ if (!empty($approval) && $approval['item_id']) {
+ Master::Summon(['Notifier', 'relay', intval($approval['item_id'])]);
+ }
+
$DR->addto_update('relayed');
$result[] = $DR->get();
}
@@ -2007,6 +2079,7 @@ class Libzot {
foreach ($items as $activity) {
$AS = new ActivityStreams($activity);
+
if ($AS->is_valid() && $AS->type === 'Announce' && is_array($AS->obj)
&& array_key_exists('object', $AS->obj) && array_key_exists('actor', $AS->obj)) {
// This is a relayed/forwarded Activity (as opposed to a shared/boosted object)
@@ -2015,6 +2088,30 @@ class Libzot {
$AS = new ActivityStreams($AS->obj);
}
+ // process add/remove from collection separately, as it requires a target.
+ // use the raw object, as it will not include actor expansion
+ if (in_array($AS->type, ['Add', 'Remove'])
+ && is_array($AS->obj)
+ && array_key_exists('object', $AS->obj)
+ && array_key_exists('actor', $AS->obj)
+ && !empty($AS->tgt)) {
+
+ logger('relayed collection operation', LOGGER_DEBUG);
+
+ $is_collection_operation = true;
+
+ $original_id = $AS->id;
+ $original_type = $AS->type;
+
+ $raw_activity = $AS->data;
+
+ $AS = new ActivityStreams($raw_activity['object'], portable_id: $env['sender']);
+
+ // Store the original activity id and type for later usage
+ $AS->meta['original_id'] = $original_id;
+ $AS->meta['original_type'] = $original_type;
+ }
+
if (!$AS->is_valid()) {
logger('Fetched activity rejected: ' . print_r($activity, true));
continue;
@@ -2208,7 +2305,7 @@ class Libzot {
}
- $x = item_store_update($item);
+ $x = item_store_update($item, addAndSync: false);
// If we're updating an event that we've saved locally, we store the item info first
// because event_addtocal will parse the body to get the 'new' event details
diff --git a/Zotlabs/Lib/QueueWorker.php b/Zotlabs/Lib/QueueWorker.php
index 9e28999c9..4242ec8e7 100644
--- a/Zotlabs/Lib/QueueWorker.php
+++ b/Zotlabs/Lib/QueueWorker.php
@@ -284,7 +284,7 @@ class QueueWorker {
$jobs++;
- logger("Workinfo: " . $workitem[0]['workerq_data'], LOGGER_DEBUG);
+ hz_syslog("Workinfo: " . $workitem[0]['workerq_data'], LOGGER_DEBUG);
$workinfo = json_decode($workitem[0]['workerq_data'], true);
$argv = $workinfo['argv'];
diff --git a/Zotlabs/Module/Item.php b/Zotlabs/Module/Item.php
index 47358aebd..0b50b9b55 100644
--- a/Zotlabs/Module/Item.php
+++ b/Zotlabs/Module/Item.php
@@ -1302,6 +1302,11 @@ class Item extends Controller {
$post = item_store($datarray, $execflag);
+ hz_syslog('Item: ' . print_r($post['item_id'], true));
+ hz_syslog('Item mid: ' . print_r($post['item']['mid'], true));
+ hz_syslog('Item appr: ' . print_r($post['approval_id'], true));
+ hz_syslog('Item appr mid: ' . print_r($post['approval']['mid'], true));
+
if ($post['success']) {
$this->add_listeners($datarray);
}
diff --git a/include/items.php b/include/items.php
index fa699148a..60f5cbe9c 100644
--- a/include/items.php
+++ b/include/items.php
@@ -525,7 +525,7 @@ function post_activity_item($arr, $allow_code = false, $deliver = true, $channel
$post = item_store($arr, $allow_code, $deliver, $addAndSync);
-hz_syslog('xxx: ' . print_r($post, true));
+ hz_syslog('post_activity_item: ' . print_r($post['item_id'], true));
if (!$post['success']) {
return $ret;
@@ -3395,6 +3395,9 @@ function start_delivery_chain($channel, $item, $item_id, $parent, $group = false
if ($post_id) {
Master::Summon([ 'Notifier','tgroup',$post_id ]);
+ if (!empty($post['approval_id'])) {
+ Master::Summon(['Notifier', 'tgroup', $post['approval_id']]);
+ }
}
q("update channel set channel_lastpost = '%s' where channel_id = %d",
@@ -5181,10 +5184,12 @@ function addToCollectionAndSync($ret) {
$channel = channelx_by_n($ret['item']['uid']);
if ($channel && $channel['channel_hash'] === $ret['item']['owner_xchan']) {
$items = [$ret['item']];
+
if ((int)$items[0]['item_blocked'] === ITEM_MODERATED
|| (int)$items[0]['item_unpublished'] || (int)$items[0]['item_delayed']) {
return $ret;
}
+
xchan_query($items);
$items = fetch_post_tags($items);
$sync_items = [];
@@ -5192,16 +5197,16 @@ function addToCollectionAndSync($ret) {
if (!in_array($ret['item']['verb'], ['Add', 'Remove'])) {
- $newObj = Activity::build_packet(Activity::encode_activity($items[0]), $channel, false);
- $approval = Activity::addToCollection($channel, $newObj, $ret['item']['parent_mid'], $ret['item'], deliver: false);
-//hz_syslog(print_r($approval, true));
+ $new_obj = Activity::build_packet(Activity::encode_activity($items[0]), $channel, false);
+ $approval = Activity::addToCollection($channel, $new_obj, $ret['item']['parent_mid'], $ret['item'], deliver: false);
+
if ($approval['success']) {
$ret['approval_id'] = $approval['item_id'];
$ret['approval'] = $approval['activity'];
- $addItems = [$approval['activity']];
- xchan_query($addItems);
- $addItems = fetch_post_tags($addItems);
- $sync_items[] = encode_item($addItems[0], true);
+ $add_items = [$approval['activity']];
+ xchan_query($add_items);
+ $add_items = fetch_post_tags($add_items);
+ $sync_items[] = encode_item($add_items[0], true);
}
}
@@ -5227,3 +5232,12 @@ function addToCollectionAndSync($ret) {
return $ret;
}
+
+function reverse_activity_mid($string) {
+ return str_replace(z_root() . '/activity/', z_root() . '/item/', $string);
+}
+
+function set_activity_mid($string) {
+ return str_replace(z_root() . '/item/', z_root() . '/activity/', $string);
+}
+
diff --git a/include/network.php b/include/network.php
index b3a3d715c..0a78c144b 100644
--- a/include/network.php
+++ b/include/network.php
@@ -1488,7 +1488,7 @@ function do_delivery($deliveries, $force = false) {
$interval = Config::Get('queueworker', 'queue_interval', 500000);
- $deliveries_per_process = intval(Config::Get('system','delivery_batch_count'));
+// $deliveries_per_process = intval(Config::Get('system','delivery_batch_count'));
if($deliveries_per_process <= 0)
$deliveries_per_process = 1;