aboutsummaryrefslogtreecommitdiffstats
path: root/include/deliver.php
blob: 82a1ac6df4f3691ecf6e644462319c583995ce57 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
<?php /** @file */

require_once('include/cli_startup.php');
require_once('include/zot.php');


function deliver_run($argv, $argc) {

	cli_startup();

	$a = get_app();

	if($argc < 2)
		return;

	logger('deliver: invoked: ' . print_r($argv,true), LOGGER_DATA);


	for($x = 1; $x < $argc; $x ++) {

		$dresult = null;
		$r = q("select * from outq where outq_hash = '%s' limit 1",
			dbesc($argv[$x])
		);
		if($r) {

			/**
			 * Check to see if we have any recent communications with this hub (in the last month).
			 * If not, reduce the outq_priority.
			 */

			$h = parse_url($r[0]['outq_posturl']);
			if($h) {
				$base = $h['scheme'] . '://' . $h['host'] . (($h['port']) ? ':' . $h['port'] : '');
				if($base !== z_root()) {
					$y = q("select site_update, site_dead from site where site_url = '%s' ",
						dbesc($base)
					);
					if($y) {
						if(intval($y[0]['site_dead'])) {
							q("delete from outq where outq_posturl = '%s'",
								dbesc($r[0]['outq_posturl'])
							);
							logger('dead site ignored ' . $base);
							continue;							
						}
						if($y[0]['site_update'] < datetime_convert('UTC','UTC','now - 1 month')) {
							q("update outq set outq_priority = %d where outq_hash = '%s'",
								intval($r[0]['outq_priority'] + 10),
								dbesc($r[0]['outq_hash'])
							);
							logger('immediate delivery deferred for site ' . $base);
							continue;
						}
					}
				}
			} 

			// "post" queue driver - used for diaspora and friendica-over-diaspora communications.

			if($r[0]['outq_driver'] === 'post') {
				$result = z_post_url($r[0]['outq_posturl'],$r[0]['outq_msg']); 
				if($result['success'] && $result['return_code'] < 300) {
					logger('deliver: queue post success to ' . $r[0]['outq_posturl'], LOGGER_DEBUG);

					q("update dreport set status = '%s', dreport_time = '%s' where dreport_queue = '%s' limit 1",
						dbesc('accepted for delivery'),
						dbesc(datetime_convert()),
						dbesc($argv[$x])
					);

					$y = q("delete from outq where outq_hash = '%s'",
						dbesc($argv[$x])
					);

				}
				else {
					logger('deliver: queue post returned ' . $result['return_code'] . ' from ' . $r[0]['outq_posturl'],LOGGER_DEBUG);
					$y = q("update outq set outq_updated = '%s' where outq_hash = '%s'",
						dbesc(datetime_convert()),
						dbesc($argv[$x])
					);
				}
				continue;
			}

			$notify = json_decode($r[0]['outq_notify'],true);

			// Check if this is a conversation request packet. It won't have outq_msg
			// but will be an encrypted packet - so will need to be handed off to
			// web delivery rather than processed inline. 

			$sendtoweb = false;
			if(array_key_exists('iv',$notify) && (! $r[0]['outq_msg']))
				$sendtoweb = true;

			if(($r[0]['outq_posturl'] === z_root() . '/post') && (! $sendtoweb)) {
				logger('deliver: local delivery', LOGGER_DEBUG);
				// local delivery
				// we should probably batch these and save a few delivery processes

				if($r[0]['outq_msg']) {
					$m = json_decode($r[0]['outq_msg'],true);
					if(array_key_exists('message_list',$m)) {
						foreach($m['message_list'] as $mm) {
							$msg = array('body' => json_encode(array('success' => true, 'pickup' => array(array('notify' => $notify,'message' => $mm)))));
							zot_import($msg,z_root());
						}
					}	
					else {	
						$msg = array('body' => json_encode(array('success' => true, 'pickup' => array(array('notify' => $notify,'message' => $m)))));
						$dresult = zot_import($msg,z_root());
					}
					$r = q("delete from outq where outq_hash = '%s'",
						dbesc($argv[$x])
					);
					if($dresult && is_array($dresult)) {
						foreach($dresult as $xx) {
							if(is_array($xx) && array_key_exists('message_id',$xx)) {
								q("insert into dreport ( dreport_mid, dreport_site, dreport_recip, dreport_result, dreport_time, dreport_xchan ) values ( '%s', '%s','%s','%s','%s','%s' ) ",
									dbesc($xx['message_id']),
									dbesc($xx['location']),
									dbesc($xx['recipient']),
									dbesc($xx['status']),
									dbesc(datetime_convert($xx['date'])),
									dbesc($xx['sender'])
								);
							}
						}
					}

					q("delete from dreport where dreport_queue = '%s' limit 1",
						dbesc($argv[$x])
					);
				}
			}
			else {
				logger('deliver: dest: ' . $r[0]['outq_posturl'], LOGGER_DEBUG);
				$result = zot_zot($r[0]['outq_posturl'],$r[0]['outq_notify']); 
				if($result['success']) {
					logger('deliver: remote zot delivery succeeded to ' . $r[0]['outq_posturl']);
					zot_process_response($r[0]['outq_posturl'],$result, $r[0]);				
				}
				else {
					logger('deliver: remote zot delivery failed to ' . $r[0]['outq_posturl']);
					$y = q("update outq set outq_updated = '%s' where outq_hash = '%s'",
						dbesc(datetime_convert()),
						dbesc($argv[$x])
					);
				}
			}
		}
	}
}

if (array_search(__file__,get_included_files())===0){
  deliver_run($argv,$argc);
  killme();
}