aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorFriendika <info@friendika.com>2011-10-21 03:33:34 -0700
committerFriendika <info@friendika.com>2011-10-21 03:33:34 -0700
commit6e76c86ad20c5c9ae3f8f2e2c226c8e22b9a8032 (patch)
treebef69999ba8f63cabbeceeec052807350b24d5f7
parentd1833cabf67b35a10a676d69a1e45fe7aadc31bc (diff)
downloadvolse-hubzilla-6e76c86ad20c5c9ae3f8f2e2c226c8e22b9a8032.tar.gz
volse-hubzilla-6e76c86ad20c5c9ae3f8f2e2c226c8e22b9a8032.tar.bz2
volse-hubzilla-6e76c86ad20c5c9ae3f8f2e2c226c8e22b9a8032.zip
queue api + queue limits
-rw-r--r--include/delivery.php18
-rw-r--r--include/diaspora.php22
-rw-r--r--include/notifier.php26
-rw-r--r--include/profile_update.php14
-rw-r--r--include/queue_fn.php35
5 files changed, 53 insertions, 62 deletions
diff --git a/include/delivery.php b/include/delivery.php
index 8318be4dd..a9e629fcf 100644
--- a/include/delivery.php
+++ b/include/delivery.php
@@ -1,5 +1,6 @@
<?php
require_once("boot.php");
+require_once('include/queue_fn.php');
function delivery_run($argv, $argc){
global $a, $db;
@@ -323,14 +324,7 @@ function delivery_run($argv, $argc){
if($deliver_status == (-1)) {
logger('notifier: delivery failed: queuing message');
- // queue message for redelivery
- q("INSERT INTO `queue` ( `cid`, `created`, `last`, `content`)
- VALUES ( %d, '%s', '%s', '%s') ",
- intval($contact['id']),
- dbesc(datetime_convert()),
- dbesc(datetime_convert()),
- dbesc($atom)
- );
+ add_to_queue($contact['id'],NETWORK_DFRN,$atom);
}
break;
@@ -370,13 +364,7 @@ function delivery_run($argv, $argc){
$deliver_status = slapper($owner,$contact['notify'],$slappy);
if($deliver_status == (-1)) {
// queue message for redelivery
- q("INSERT INTO `queue` ( `cid`, `created`, `last`, `content`)
- VALUES ( %d, '%s', '%s', '%s') ",
- intval($contact['id']),
- dbesc(datetime_convert()),
- dbesc(datetime_convert()),
- dbesc($slappy)
- );
+ add_to_queue($contact['id'],NETWORK_OSTATUS,$slappy);
}
}
}
diff --git a/include/diaspora.php b/include/diaspora.php
index 89707967f..fa5973e7b 100644
--- a/include/diaspora.php
+++ b/include/diaspora.php
@@ -4,6 +4,7 @@ require_once('include/crypto.php');
require_once('include/items.php');
require_once('include/bb2diaspora.php');
require_once('include/contact_selectors.php');
+require_once('include/queue_fn.php');
function diaspora_dispatch_public($msg) {
@@ -957,7 +958,7 @@ function diaspora_comment($importer,$xml,$msg) {
);
}
- if(! $parent_author_signature) {
+ if(($parent_item['origin']) && (! $parent_author_signature)) {
q("insert into sign (`iid`,`signed_text`,`signature`,`signer`) values (%d,'%s','%s','%s') ",
intval($message_id),
dbesc($author_signed_data),
@@ -1162,9 +1163,9 @@ EOT;
$arr['parent'] = $parent_item['id'];
$arr['parent-uri'] = $parent_item['uri'];
- $arr['owner-name'] = $contact['name'];
- $arr['owner-link'] = $contact['url'];
- $arr['owner-avatar'] = $contact['thumb'];
+ $arr['owner-name'] = $parent_item['name'];
+ $arr['owner-link'] = $parent_item['url'];
+ $arr['owner-avatar'] = $parent_item['thumb'];
$arr['author-name'] = $person['name'];
$arr['author-link'] = $person['url'];
@@ -1206,9 +1207,9 @@ EOT;
// if the message isn't already being relayed, notify others
// the existence of parent_author_signature means the parent_author or owner
- // is already relaying.
+ // is already relaying. The parent_item['origin'] indicates the message was created on our system
- if(! $parent_author_signature)
+ if(($parent_item['origin']) && (! $parent_author_signature))
proc_run('php','include/notifier.php','comment',$message_id);
return;
@@ -1647,14 +1648,7 @@ function diaspora_transmit($owner,$contact,$slap,$public_batch) {
if((! $return_code) || (($curl_stat == 503) && (stristr($a->get_curl_headers(),'retry-after')))) {
logger('diaspora_transmit: queue message');
// queue message for redelivery
- q("INSERT INTO `queue` ( `cid`, `created`, `last`, `content`,`batch`)
- VALUES ( %d, '%s', '%s', '%s', %d) ",
- intval($contact['id']),
- dbesc(datetime_convert()),
- dbesc(datetime_convert()),
- dbesc($slap),
- intval($public_batch)
- );
+ add_to_queue($contact['id'],NETWORK_DIASPORA,$slap,$public_batch);
}
diff --git a/include/notifier.php b/include/notifier.php
index 18ad07012..d12119b05 100644
--- a/include/notifier.php
+++ b/include/notifier.php
@@ -1,6 +1,7 @@
<?php
require_once("boot.php");
+require_once('include/queue_fn.php');
/*
* This file was at one time responsible for doing all deliveries, but this caused
@@ -519,13 +520,7 @@ function notifier_run($argv, $argc){
if($deliver_status == (-1)) {
logger('notifier: delivery failed: queuing message');
// queue message for redelivery
- q("INSERT INTO `queue` ( `cid`, `created`, `last`, `content`)
- VALUES ( %d, '%s', '%s', '%s') ",
- intval($contact['id']),
- dbesc(datetime_convert()),
- dbesc(datetime_convert()),
- dbesc($atom)
- );
+ add_to_queue($contact['id'],NETWORK_DFRN,$atom);
}
break;
case NETWORK_OSTATUS:
@@ -542,14 +537,7 @@ function notifier_run($argv, $argc){
if($deliver_status == (-1)) {
// queue message for redelivery
- q("INSERT INTO `queue` ( `cid`, `created`, `last`, `content`)
- VALUES ( %d, '%s', '%s', '%s') ",
- intval($contact['id']),
- dbesc(datetime_convert()),
- dbesc(datetime_convert()),
- dbesc($slap)
- );
-
+ add_to_queue($contact['id'],NETWORK_OSTATUS,$slap);
}
}
else {
@@ -564,13 +552,7 @@ function notifier_run($argv, $argc){
$deliver_status = slapper($owner,$contact['notify'],$slappy);
if($deliver_status == (-1)) {
// queue message for redelivery
- q("INSERT INTO `queue` ( `cid`, `created`, `last`, `content`)
- VALUES ( %d, '%s', '%s', '%s') ",
- intval($contact['id']),
- dbesc(datetime_convert()),
- dbesc(datetime_convert()),
- dbesc($slappy)
- );
+ add_to_queue($contact['id'],NETWORK_OSTATUS,$slappy);
}
}
}
diff --git a/include/profile_update.php b/include/profile_update.php
index 1a2d9d3b5..3828e90ed 100644
--- a/include/profile_update.php
+++ b/include/profile_update.php
@@ -2,6 +2,7 @@
require_once('include/datetime.php');
require_once('include/diaspora.php');
+require_once('include/queue_fn.php');
function profile_change() {
@@ -81,7 +82,6 @@ function profile_change() {
$tpl = get_markup_template('diaspora_profile.tpl');
-
$msg = replace_macros($tpl,array(
'$handle' => $handle,
'$first' => $first,
@@ -100,14 +100,6 @@ function profile_change() {
$msgtosend = diaspora_msg_build($msg,$a->user,null,$a->user['prvkey'],null,true);
foreach($recips as $recip) {
- q("insert into queue (`cid`,`network`,`created`,`last`,`content`,`batch`)
- values(%d,'%s','%s','%s','%s',%d)",
- intval($recip['id']),
- dbesc(NETWORK_DIASPORA),
- dbesc(datetime_convert()),
- dbesc(datetime_convert()),
- dbesc($msgtosend),
- intval(1)
- );
+ add_to_queue($recip['id'],NETWORK_DIASPORA,$msgtosend,true);
}
-} \ No newline at end of file
+}
diff --git a/include/queue_fn.php b/include/queue_fn.php
index bc47ceffd..2168d42d7 100644
--- a/include/queue_fn.php
+++ b/include/queue_fn.php
@@ -14,3 +14,38 @@ function remove_queue_item($id) {
intval($id)
);
}
+
+
+function add_to_queue($cid,$network,$msg,$batch = false) {
+
+ $max_queue = get_config('system','max_contact_queue');
+ if($max_queue < 1)
+ $max_queue = 500;
+
+ $batch_queue = get_config('system','max_batch_queue');
+ if($batch_queue < 1)
+ $batch_queue = 1000;
+
+ $r = q("SELECT COUNT(*) AS `total` FROM `queue` left join `contact` WHERE ``queue`.`cid` = %d AND `contact`.`self` = 0 ",
+ intval($cid)
+ );
+ if($r && count($r)) {
+ if($batch && ($r[0]['total'] > $batch_queue)) {
+ logger('add_to_queue: too many queued items for batch server ' . $cid . ' - discarding message');
+ return;
+ }
+ elseif((! $batch) && ($r[0]['total'] > $max_queue)) {
+ logger('add_to_queue: too many queued items for contact ' . $cid . ' - discarding message');
+ return;
+ }
+ }
+
+ q("INSERT INTO `queue` ( `cid`, `network`, `created`, `last`, `content`, `batch`)
+ VALUES ( %d, '%s', '%s', '%s', '%s', %d) ",
+ intval($cid),
+ dbesc(datetime_convert()),
+ dbesc(datetime_convert()),
+ dbesc($msg),
+ intval(($batch) ? 1: 0)
+ );
+}