aboutsummaryrefslogblamecommitdiffstats
path: root/Zotlabs/Daemon/Master.php
blob: 3a71ee578a33bb929163abb091eb4ff14e6e6305 (plain) (tree)
1
2
3
4
5




                                                          












                                                

                                          





                                                                 




























                                                                                                                 



                                                                                                        












































                                                                                                            




                                                                                                         





















                                                                                                           
                


                                                                                               









                                                                                                        


                                                    
 
<?php

namespace Zotlabs\Daemon;

// Command-line entry point: act only when this file sits at index 0 of
// get_included_files(), i.e. it is the script PHP was started with and was not
// merely pulled in (e.g. by a class loader) from some other script.
if (array_search(__FILE__, get_included_files()) === 0) {
	require_once('include/cli_startup.php');

	// Discard the script path so that $argv[0] becomes the daemon name.
	array_shift($argv);
	$argc = count($argv);

	// Nothing to dispatch when no daemon name was given.
	if ($argc > 0) {
		Master::Release($argc, $argv);
	}

	killme();
}



/**
 * Dispatcher for the classes in the \Zotlabs\Daemon namespace.
 *
 * A daemon invocation is described by an argv array whose first element is the
 * short class name; the class is expected to expose a static run($argc, $argv).
 *
 * When the system config value 'max_queue_workers' is set, invocations are
 * queued in the config table instead of being run directly:
 *
 *  - cat='queuework',    k='workitem:<uniqid>'       an unclaimed work item,
 *                                                     v = serialize(['argc' => .., 'argv' => ..])
 *  - cat='queuework',    k='workitem_<workerid>'     the item currently claimed by a worker
 *  - cat='queueworkers', k='workerstarted_<workerid>' v = time() the worker started
 */
class Master {

	/** @var string|false|null ID of the queue worker running in this process; set by Process(). */
	static public $queueworker = null;

	/**
	 * Hand a daemon invocation to proc_run() together with 'php' and the path of
	 * this script.
	 *
	 * NOTE(review): presumably proc_run() launches this file as a separate PHP
	 * process, which then reaches Release() through the entry guard at the top
	 * of the file - confirm against proc_run().
	 *
	 * @param array $arr daemon name followed by its arguments
	 */
	static public function Summon($arr) {
		proc_run('php','Zotlabs/Daemon/Master.php',$arr);
	}

	/**
	 * Run a daemon invocation, either immediately or through the work queue.
	 *
	 * Without a positive 'max_queue_workers' setting the daemon is run right
	 * away and any work items left in the queue are drained afterwards.
	 * Otherwise the invocation is stored as an unclaimed work item and this
	 * process tries to become a queue worker.
	 *
	 * @param int   $argc number of entries in $argv
	 * @param array $argv daemon name followed by its arguments
	 */
	static public function Release($argc,$argv) {
		cli_startup();

		$maxworkers = intval(get_config('system','max_queue_workers'));

		if ($maxworkers <= 0) {
			logger('Master: release: ' . print_r($argv,true), LOGGER_ALL,LOG_DEBUG);
			self::Dispatch($argc,$argv);
			self::ClearQueue();
		} else {
			logger('Master: enqueue: ' . print_r($argv,true), LOGGER_ALL,LOG_DEBUG);
			$workinfo = ['argc'=>$argc,'argv'=>$argv];
			q("insert into config (cat,k,v) values ('queuework','%s','%s')",
				dbesc(uniqid('workitem:',true)),
				dbesc(serialize($workinfo)));
			self::Process();
		}
	}

	/**
	 * Obtain an ID for a new queue worker, pruning expired workers if the
	 * worker limit has been reached.
	 *
	 * The limit comes from system.max_queue_workers (default 3) and the expiry
	 * age in seconds from system.max_queue_worker_age (default 300). A worker
	 * whose start marker is older than the expiry age has its marker removed
	 * and the work item it had claimed is put back into the queue.
	 *
	 * @return string|false a fresh uniqid(), or false when the limit is still
	 *                      reached after pruning
	 */
	static public function GetWorkerID() {
		$maxworkers = intval(get_config('system','max_queue_workers'));
		if ($maxworkers <= 0) {
			$maxworkers = 3;
		}

		$workermaxage = intval(get_config('system','max_queue_worker_age'));
		if ($workermaxage <= 0) {
			$workermaxage = 300;
		}

		$workers = q("select * from config where cat='queueworkers' and k like '%s'",
			dbesc('workerstarted_%'));

		// Guard count()/foreach below against a non-array query result.
		if (!is_array($workers)) {
			$workers = [];
		}

		// '>=': handing out an ID while the limit is already reached would
		// allow one worker more than configured.
		if (count($workers) >= $maxworkers) {
			$curtime = time();
			$prefixlen = strlen('workerstarted_');

			foreach ($workers as $idx => $worker) {
				$age = $curtime - intval($worker['v']);
				if ($age > $workermaxage) {
					logger("Prune worker: ".$worker['k'], LOGGER_ALL, LOG_DEBUG);
					$staleid = substr($worker['k'], $prefixlen);

					q("delete from config where cat='queueworkers' and k='%s'",
						dbesc('workerstarted_'.$staleid));

					// Put the item the stale worker had claimed back under a
					// fresh unclaimed key, so that Process() (which claims
					// keys matching 'workitem:%') can pick it up again.
					q("update config set k='%s' where cat='queuework' and k='%s'",
						dbesc(uniqid('workitem:',true)),
						dbesc('workitem_'.$staleid));

					unset($workers[$idx]);
				}
			}

			if (count($workers) >= $maxworkers) {
				return false;
			}
		}

		return uniqid();
	}

	/**
	 * Become a queue worker and process work items until none are left.
	 *
	 * Registers a 'workerstarted_<id>' marker, then repeatedly claims one
	 * unclaimed work item by renaming its key to 'workitem_<id>', runs it,
	 * deletes it and sleeps system.queue_worker_sleep seconds (default 5).
	 * The marker is removed when the loop ends.
	 */
	static public function Process() {

		self::$queueworker = self::GetWorkerID();

		if (!self::$queueworker) {
			logger('Master: unable to obtain worker ID.');
			killme();
			// Only reached if killme() returns; never continue without an ID.
			return;
		}

		set_config('queueworkers','workerstarted_'.self::$queueworker,time());

		$workersleep = intval(get_config('system','queue_worker_sleep'));
		if ($workersleep <= 0) {
			$workersleep = 5;
		}
		cli_startup();

		$claimkey = 'workitem_'.self::$queueworker;
		$jobs = 0;

		while (true) {
			// NOTE(review): 'update ... limit 1' is MySQL syntax - verify on
			// any other database backend this is deployed on.
			$work = q("update config set k='%s' where cat='queuework' and k like '%s' limit 1",
				dbesc($claimkey),
				dbesc('workitem:%'));
			if (!$work) {
				break;
			}

			$workitem = q("select * from config where cat='queuework' and k='%s'",
				dbesc($claimkey));

			// The update can succeed without renaming a row; no claimed row
			// means the queue is empty.
			if (!isset($workitem[0])) {
				break;
			}

			$jobs++;
			$workinfo = unserialize($workitem[0]['v']);

			if (is_array($workinfo) && isset($workinfo['argv']) && is_array($workinfo['argv'])) {
				$argv = $workinfo['argv'];
				$argc = isset($workinfo['argc']) ? $workinfo['argc'] : count($argv);
				logger('Master: process: ' . print_r($argv,true), LOGGER_ALL,LOG_DEBUG);

				//Delete unclaimed duplicate workitems: rows still waiting
				//under a 'workitem:' key whose stored payload is identical.
				q("delete from config where cat='queuework' and k like '%s' and v='%s'",
					dbesc('workitem:%'),
					dbesc($workitem[0]['v']));

				self::Dispatch($argc,$argv);
			} else {
				logger('Master: discarding malformed work item.');
			}

			//Right now we assume that if we get a return, everything is OK.
			//At some point we may want to test whether the run returns true/false
			//    and requeue the work to be tried again.  But we probably want
			//    to implement some sort of "retry interval" first.

			q("delete from config where cat='queuework' and k='%s'",
				dbesc($claimkey));

			sleep($workersleep);
		}

		logger('Master: Worker Thread: queue items processed:' . $jobs);
		q("delete from config where cat='queueworkers' and k='%s'",
			dbesc('workerstarted_'.self::$queueworker));
	}

	/**
	 * Run every work item stored in the queue, claimed or not, in this process
	 * and then delete them all. Called by Release() when queueing is disabled.
	 */
	static public function ClearQueue() {
		$work = q("select * from config where cat='queuework' and k like '%s'",
			dbesc('workitem%'));

		if (is_array($work)) {
			foreach ($work as $workitem) {
				$workinfo = unserialize($workitem['v']);
				if (!is_array($workinfo) || !isset($workinfo['argv']) || !is_array($workinfo['argv'])) {
					logger('Master: discarding malformed work item.');
					continue;
				}
				$argv = $workinfo['argv'];
				$argc = isset($workinfo['argc']) ? $workinfo['argc'] : count($argv);
				logger('Master: process: ' . print_r($argv,true), LOGGER_ALL,LOG_DEBUG);
				self::Dispatch($argc,$argv);
			}
		}

		q("delete from config where cat='queuework' and k like '%s'",
			dbesc('workitem%'));
	}

	/**
	 * Resolve $argv[0] to a class in the \Zotlabs\Daemon namespace and call its
	 * static run($argc, $argv).
	 *
	 * @param int   $argc number of entries in $argv
	 * @param array $argv daemon name followed by its arguments
	 * @return bool false when $argv[0] is missing or does not name a class
	 *              with a run() method; true after run() has returned
	 */
	static private function Dispatch($argc,$argv) {
		if (!is_array($argv) || !isset($argv[0]) || !is_string($argv[0]) || $argv[0] === '') {
			logger('Master: no daemon name supplied.');
			return false;
		}

		$cls = '\\Zotlabs\\Daemon\\' . $argv[0];

		// A bad name would otherwise be a fatal error that leaves the queue
		// row and the worker marker behind.
		if (!class_exists($cls) || !method_exists($cls,'run')) {
			logger('Master: unknown daemon: ' . $argv[0]);
			return false;
		}

		$cls::run($argc,$argv);
		return true;
	}

}