* @license PHP License * @package WB * @subpackage base */ WBClass::load('WBEvent'); /** * Event_Queue * * * * @version 0.2.1 * @package WB * @subpackage base */ class WBEvent_Queue extends WBStdClass { /** * singleton instance * @var WBEvent_Queue */ protected static $instance = null; /** * table onject to access event queue * @var WBDatasource_Table_EventQueue */ protected $table; /** * list of known matches and actions * @var array */ protected $actions; /** * config loader * @var WEConfig */ protected $config; /** * logger * @var WBLog */ protected $log; /** * current event processing position * @var int */ protected $procPos = 0; /** * numer of events to process * @var int */ protected $procCount = 0; /** * get singleton instance * * @return WBEvent_Queue */ public static function create() { if (self::$instance) { return self::$instance; } self::$instance = new WBEvent_Queue(); return self::$instance; } /** * constructor * * init table access and verify event.xml */ private function __construct() { WBClass::load('WBLog' , 'WBClock'); $this->log = WBLog::start(__CLASS__); $this->table = WBClass::create('WBDatasource_Table_EventQueue'); $this->config = WBClass::create('WBConfig'); $this->config->load('event'); $this->actions = $this->config->get('actions', array()); if (!is_array($this->actions)) { $this->actions = array(); return; } // mormalize list of actions foreach ($this->actions as &$a) { if (!isset($a['match']) || !isset($a['chains'])) { $e = array( 'msg' => 'Error in event.xml - check "match" and "chains"', 'code' => 1, 'class' => __CLASS__ ); throw WBClass::create('WBException_Config', $e); } } } /** * deny cloning object * */ private function __clone() { } /** * add event to queue * * store event information in queue * * @param string $name * @param string $msg * @param array $data * @param string $timestamp in ISO format */ public function add($name, $msg, $data, $timestamp) { $actions = $this->getActions($name); if (empty($actions)) { return null; } // log add event $log = array( 'action' => 'add', 'namespace' => '', 'name' => $name, 'chain' => '' ); foreach ($actions as $a) { // normalize "now", "async" and "namespace" if (isset($a['now'])) { $a['now'] = true; } else { $a['now'] = false; } if (isset($a['async'])) { $a['async'] = true; } else { $a['async'] = false; } if (!isset($a['namespace']) || empty($a['namespace'])) { $a['namespace'] = ''; } $save = array( 'name' => $name, 'action' => $a['action'], 'namespace' => $a['namespace'], 'message' => $msg, 'data' => serialize($data), 'created' => $timestamp ); $id = $this->table->save('eventqueue', '__new', $save); $log['namespace'] = $a['namespace']; $log['chain'] = $a['action']; $this->log->debug($log); // process immediatly if ($a['now']) { $this->process($id); continue; } // process immediatly but assynchroniously if ($a['async']) { $this->processAsync($id); continue; } } } /** * Create Event Object * * @param array action data * @return WBEvent */ public function action2Event($action) { if (empty($action['name']) || empty($action['data']) || empty($action['message']) || empty($action['created'])) { $e = array( 'msg' => 'Event action data insufficient', 'code' => 2, 'class' => __CLASS__ ); throw WBClass::create('WBException_Argument', $e); } // event data $data = unserialize($action['data']); // create event onject WBClass::load('WBEvent'); return WBEvent::create($action['name'], $action['message'], $data, $action['created']); } /** * process single event action chain * * load chain from queue. Create event object and call all * event handler. * * The chain will be marked as failed when exception was caught. * * Handlers will processed in same order they appear in chain config. * Process chain as long as handlers return "true", break on "false" * * @param string $id */ public function process($id) { if (empty($id)) { return; } $now = WBClock::now(); // load action $action = $this->table->get('eventqueue', $id); $action = $action[0]; $log = array( 'action' => 'processid', 'namespace' => $action['namespace'], 'id' => $id, 'chain' => $action['action'], 'actions' => 0, 'elapsed' => '0ms', 'msg' => 'OK' ); // load chain if (!$this->config->load('event/' . $action['action'], true)) { $log['msg'] = sprintf('Could not load action chain from "%s"', $action['action']); $log['elapsed'] = WBClock::stop($now, 1000, 1) . 'ms'; $this->log->err($log); $save = array('failed' => 1); $this->table->save('eventqueue', $id, $save); return; } $chain = $this->config->get('chain', array()); if (!is_array($chain)) { $chain = array(); } $log['actions'] = count($chain); $this->log->info($log); $event = $this->action2Event($action); // process chain foreach ($chain as $c) { if (!isset($c['handler'])) { continue; } if (!isset($c['params']) || !is_array($c['params'])) { $c['params'] = array(); } // start handler try{ $h = WBClass::create('WBEvent_Handler_' . $c['handler']); $h->setConfig($c['params']); } catch(Exception $e) { $log['msg'] = sprintf('Failed to instantiate event handler "%s"', $c['handler']); $log['elapsed'] = WBClock::stop($now, 1000, 1) . 'ms'; $this->log->err($log); $save = array('failed' => 1); $this->table->save('eventqueue', $id, $save); return; } // process event and see whether to break try{ $eLog = array( 'action' => 'process', 'handler' => $c['handler'], 'result' => intval($h->process($event)) ); $this->log->info($eLog); if (1 > $eLog['result']) { break; } } catch(Exception $e) { // get processing time $log['elapsed'] = WBClock::stop($now, 1000, 1) . 'ms'; // if error is recoverable retry next time if ($h->isRecoverable()) { $log['msg'] = sprintf('Caught recoverable exception while processing "%s"', $c['handler']); $this->log->err($log); return; } $log['msg'] = sprintf('Caught exception while processing "%s"', $c['handler']); $this->log->err($log); $eLog = array( 'action' => 'exception', 'handler' => $c['handler'], 'code' => $e->getCode(), 'msg' => $e->getMessage() ); $this->log->notice($eLog); $save = array('failed' => 1); $this->table->save('eventqueue', $id, $save); return; } } // delete action on success $log['elapsed'] = WBClock::stop($now, 1000, 1) . 'ms'; $this->log->warn($log); $this->table->delete('eventqueue', $id); } /** * Process Event Asynchroniously * * Execute event queue processor to * * @param string $id */ public function processAsync($id) { $base = WBParam::get('wb/dir/base'); $cmd = '%s/bin/eqp.php -q run "%s" >>/dev/null 2>/dev/null actions as $a) { if (preg_match($a['match'], $name)) { if (!isset($a['chains']) || empty($a['chains'])) { return array(); } if (!isset($a['chains'][0])) { $a['chains'] = array($a['chains']); } return $a['chains']; break; } } return array(); } }