aboutsummaryrefslogblamecommitdiffstats
path: root/Zotlabs/Lib/QueueWorker.php
blob: 9f60e3315e0055aa48b69be75d71da4fc8eaa48b (plain) (tree)





































































                                                                                          







                                                                                
                                                                                   






































                                                                                                                            






                                                                                
                                                                                   






































































































                                                                                                                                                                                         






                                                                                  







                                                                                                             
                                 


                                                                 
                                                                                      





                                                                 
                                                   





























                                                                                                                                         







































































                                                                                                          
<?php

namespace Zotlabs\Lib;


use Ramsey\Uuid\Uuid;
use Ramsey\Uuid\Exception\UnableToBuildUuidException;


class QueueWorker {

	public static $queueworker = null;
	public static $maxworkers = 0;
	public static $workermaxage = 0;
	public static $workersleep = 100;
	public static $default_priorities = [
		'Notifier'          => 10,
		'Deliver'           => 10,
		'Cache_query'       => 10,
		'Content_importer'  => 1,
		'File_importer'     => 1,
		'Channel_purge'     => 1,
		'Directory'         => 1
	];

	private static function qbegin($tablename) {
		switch (ACTIVE_DBTYPE) {
			case DBTYPE_MYSQL:
				q('BEGIN');
				q('LOCK TABLE ' . $tablename . ' WRITE');
				break;

			case DBTYPE_POSTGRES:
				q('BEGIN');
				//q('LOCK TABLE '.$tablename.' IN ACCESS EXCLUSIVE MODE');
				break;
		}
		return;
	}

	private static function qcommit() {
		switch (ACTIVE_DBTYPE) {
			case DBTYPE_MYSQL:
				q("UNLOCK TABLES");
				q("COMMIT");
				break;

			case DBTYPE_POSTGRES:
				q("COMMIT");
				break;
		}
		return;
	}

	private static function qrollback() {
		switch (ACTIVE_DBTYPE) {
			case DBTYPE_MYSQL:
				q("ROLLBACK");
				q("UNLOCK TABLES");
				break;

			case DBTYPE_POSTGRES:
				q("ROLLBACK");
				break;
		}
		return;
	}

	public static function Summon(&$argv) {

		if ($argv[0] !== 'Queueworker') {

			$priority = 0; // @TODO allow reprioritization

			if(isset(self::$default_priorities[$argv[0]])) {
				$priority = self::$default_priorities[$argv[0]];
			}

			$workinfo      = ['argc' => count($argv), 'argv' => $argv];
			$workinfo_json = json_encode($workinfo);
			$uuid          = self::getUuid($workinfo_json);

			$r = q("SELECT * FROM workerq WHERE workerq_uuid = '%s'",
				dbesc($uuid)
			);
			if ($r) {
				logger("Summon: Ignoring duplicate workerq task", LOGGER_DEBUG);
				logger(print_r($workinfo,true));
				$argv = [];
				return;
			}

			self::qbegin('workerq');
			$r = q("INSERT INTO workerq (workerq_priority, workerq_data, workerq_uuid) VALUES (%d, '%s', '%s')",
				intval($priority),
				$workinfo_json,
				dbesc($uuid)
			);
			if (!$r) {
				self::qrollback();
				logger("INSERT FAILED", LOGGER_DEBUG);
				return;
			}
			self::qcommit();
			logger('INSERTED: ' . $workinfo_json, LOGGER_DEBUG);
		}
		$argv = [];

		$workers = self::GetWorkerCount();
		if ($workers < self::$maxworkers) {
			logger("Less than max active workers ($workers) max = " . self::$maxworkers . ".", LOGGER_DEBUG);
			$phpbin = get_config('system', 'phpbin', 'php');
			proc_run($phpbin, 'Zotlabs/Daemon/Master.php', ['Queueworker']);
		}
	}

	public static function Release(&$argv) {

		if ($argv[0] !== 'Queueworker') {

			$priority = 0; // @TODO allow reprioritization
			if(isset(self::$default_priorities[$argv[0]])) {
				$priority = self::$default_priorities[$argv[0]];
			}

			$workinfo      = ['argc' => count($argv), 'argv' => $argv];
			$workinfo_json = json_encode($workinfo);
			$uuid          = self::getUuid($workinfo_json);

			$r = q("SELECT * FROM workerq WHERE workerq_uuid = '%s'",
				dbesc($uuid)
			);
			if ($r) {
				logger("Release: Duplicate task - do not insert.", LOGGER_DEBUG);
				logger(print_r($workinfo,true));

				$argv = [];
				return;
			}

			self::qbegin('workerq');
			$r = q("INSERT INTO workerq (workerq_priority, workerq_data, workerq_uuid) VALUES (%d, '%s', '%s')",
				intval($priority),
				$workinfo_json,
				dbesc($uuid)
			);
			if (!$r) {
				self::qrollback();
				logger("Insert failed: " . $workinfo_json, LOGGER_DEBUG);
				return;
			}
			self::qcommit();
			logger('INSERTED: ' . $workinfo_json, LOGGER_DEBUG);
		}
		$argv = [];
		self::Process();
	}

	public static function GetWorkerCount() {
		if (self::$maxworkers == 0) {
			self::$maxworkers = get_config('queueworker', 'max_queueworkers', 4);
			self::$maxworkers = self::$maxworkers > 3 ? self::$maxworkers : 4;
		}
		if (self::$workermaxage == 0) {
			self::$workermaxage = get_config('queueworker', 'max_queueworker_age');
			self::$workermaxage = self::$workermaxage > 120 ? self::$workermaxage : 300;
		}

		q("update workerq set workerq_reservationid = null where workerq_reservationid is not null and workerq_processtimeout < %s",
			db_utcnow()
		);

		usleep(self::$workersleep);
		$workers = dbq("select count(distinct workerq_reservationid) as total from workerq where workerq_reservationid is not null");
		logger("WORKERCOUNT: " . $workers[0]['total'], LOGGER_DEBUG);
		return intval($workers[0]['total']);
	}

	public static function GetWorkerID() {
		if (self::$queueworker) {
			return self::$queueworker;
		}
		$wid = uniqid('', true);
		usleep(mt_rand(500000, 3000000)); //Sleep .5 - 3 seconds before creating a new worker.
		$workers = self::GetWorkerCount();
		if ($workers >= self::$maxworkers) {
			logger("Too many active workers ($workers) max = " . self::$maxworkers, LOGGER_DEBUG);
			return false;
		}
		self::$queueworker = $wid;
		return $wid;
	}

	private static function getWorkId() {
		self::GetWorkerCount();

		self::qbegin('workerq');

		if (ACTIVE_DBTYPE == DBTYPE_POSTGRES) {
			$work = dbq("SELECT workerq_id FROM workerq WHERE workerq_reservationid IS NULL ORDER BY workerq_priority DESC, workerq_id ASC LIMIT 1 FOR UPDATE SKIP LOCKED;");
		}
		else {
			$work = dbq("SELECT workerq_id FROM workerq WHERE workerq_reservationid IS NULL ORDER BY workerq_priority DESC, workerq_id ASC LIMIT 1;");
		}

		if (!$work) {
			self::qrollback();
			return false;
		}
		$id = $work[0]['workerq_id'];

		$work = q("UPDATE workerq SET workerq_reservationid = '%s', workerq_processtimeout = %s + INTERVAL %s WHERE workerq_id = %d",
			self::$queueworker,
			db_utcnow(),
			db_quoteinterval(self::$workermaxage . " SECOND"),
			intval($id)
		);

		if (!$work) {
			self::qrollback();
			logger("Could not update workerq.", LOGGER_DEBUG);
			return false;
		}
		logger("GOTWORK: " . json_encode($work), LOGGER_DEBUG);
		self::qcommit();
		return $id;
	}

	public static function Process() {
		if (!self::GetWorkerID()) {
			logger('Unable to get worker ID. Exiting.', LOGGER_DEBUG);
			killme();
		}

		$jobs   = 0;
		$workid = self::getWorkId();
		$load_average_sleep = false;
		self::$workersleep = get_config('queueworker', 'queue_worker_sleep');
		self::$workersleep = ((intval(self::$workersleep) > 100) ? intval(self::$workersleep) : 100);

		if (function_exists('sys_getloadavg') && get_config('queueworker', 'load_average_sleep')) {
			$load_average_sleep = true;
		}

		while ($workid) {

			if ($load_average_sleep) {
				$load_average = sys_getloadavg();
				self::$workersleep = intval($load_average[0]) * 10000;

				if (!self::$workersleep) {
					self::$workersleep = 100;
				}
			}

			usleep(self::$workersleep);

			self::qbegin('workerq');

			if (ACTIVE_DBTYPE == DBTYPE_POSTGRES) {
				$workitem = q("SELECT * FROM workerq WHERE workerq_id = %d FOR UPDATE SKIP LOCKED",
					$workid
				);
			}
			else {
				$workitem = q("SELECT * FROM workerq WHERE workerq_id = %d",
					$workid
				);
			}

			self::qcommit();

			if (isset($workitem[0])) {
				// At least SOME work to do.... in case there's more, let's ramp up workers.
				$workers = self::GetWorkerCount();
				if ($workers < self::$maxworkers) {
					logger("Less than max active workers ($workers) max = " . self::$maxworkers . ".", LOGGER_DEBUG);
					$phpbin = get_config('system', 'phpbin', 'php');
					proc_run($phpbin, 'Zotlabs/Daemon/Master.php', ['Queueworker']);
				}

				$jobs++;
				logger("Workinfo: " . $workitem[0]['workerq_data'], LOGGER_DEBUG);

				$workinfo = json_decode($workitem[0]['workerq_data'], true);
				$argv     = $workinfo['argv'];

				$cls  = '\\Zotlabs\\Daemon\\' . $argv[0];
				$argv = flatten_array_recursive($argv);
				$argc = count($argv);
				$cls::run($argc, $argv);

				// @FIXME: Right now we assume that if we get a return, everything is OK.
				// At some point we may want to test whether the run returns true/false
				// and requeue the work to be tried again if needed.  But we probably want
				// to implement some sort of "retry interval" first.

				self::qbegin('workerq');
				q("delete from workerq where workerq_id = %d", $workid);
				self::qcommit();
			}
			else {
				logger("NO WORKITEM!", LOGGER_DEBUG);
			}
			$workid = self::getWorkId();
		}
		logger('Master: Worker Thread: queue items processed:' . $jobs, LOGGER_DEBUG);
	}

	public static function ClearQueue() {
		$work = q("select * from workerq");
		while ($work) {
			foreach ($work as $workitem) {
				$workinfo = json_decode($workitem['v'], true);
				$argc     = $workinfo['argc'];
				$argv     = $workinfo['argv'];
				logger('Master: process: ' . print_r($argv, true), LOGGER_ALL, LOG_DEBUG);
				if (!isset($argv[0])) {
					q("delete from workerq where workerq_id = %d",
						$work[0]['workerq_id']
					);
					continue;
				}
				$cls = '\\Zotlabs\\Daemon\\' . $argv[0];
				$cls::run($argc, $argv);
				q("delete from workerq where workerq_id = %d",
					$work[0]['workerq_id']
				);
				usleep(300000);
				//Give the server .3 seconds to catch its breath between tasks.
				//This will hopefully keep it from crashing to it's knees entirely
				//if the last task ended up initiating other parallel processes
				//(eg. polling remotes)
			}
			//Make sure nothing new came in
			$work = q("select * from workerq");
		}
		return;
	}

	/**
	 * @brief Generate a name-based v5 UUID with custom namespace
	 *
	 * @param string $data
	 * @return string $uuid
	 */
	private static function getUuid($data) {
		$namespace = '3a112e42-f147-4ccf-a78b-f6841339ea2a';
		try {
			$uuid = Uuid::uuid5($namespace, $data)->toString();
		} catch (UnableToBuildUuidException $e) {
			logger('UUID generation failed');
			return '';
		}
		return $uuid;
	}

}