* @license PHP License * @package WB * @subpackage base */ WBClass::load('WBEvent' , 'WBObserver_Observable'); /** * Event_Queue Processor * * * * @version 0.1.0 * @package WB * @subpackage base */ class WBEvent_Processor extends WBStdClass implements WBObserver_Observable { /** * list of observer * @var array */ protected $observers = array(); /** * table onject to access event queue * @var WBDatasource_Table */ protected $table; /** * config loader * @var WEConfig */ protected $config; /** * Event Queue * @var WBEvent_Queue */ protected $queue; /** * current event processing position * @var int */ protected $procPos = 0; /** * numer of events to process * @var int */ protected $procCount = 0; /** * constructor * * init table access and verify event.xml */ public function __construct() { WBClass::load('WBLog' , 'WBClock'); $this->table = WBClass::create('WBDatasource_Table'); $this->queue = WBEvent_Queue::create(); } /** * execute all queued events * * @return int number of executed events */ public function executeAll() { $clause = array(); $clause[] = array( 'field' => 'failed', 'value' => 0 ); return $this->executeByClause($clause); } /** * process complete namespace * * @param string $ns * @return int number of executed events */ public function executeNamespace($ns = null) { $clause = array(); $clause[] = array( 'field' => 'failed', 'value' => 0 ); $clause[] = array( 'field' => 'namespace', 'value' => $ns ); return $this->executeByClause($clause); } /** * process single event * * Load queued event regardless of namespace or failure * * @param string $id event queue's id * @return int number of executed events */ public function executeId($id) { $clause = array(); $clause[] = array( 'field' => $this->table->getIdentifier('eventqueue'), 'value' => $id ); return $this->executeByClause($clause); } /** * process events by clause * * Load all queued events from table and process them. Use namespace parameter to * tell which namespace to process. Default is empty namespace. This method also * allows $ns = null to process all namespaces. * * @param array $clause * @return int $count numner of processed events */ protected function executeByClause($clause) { // track processing position $count = $this->table->count('eventqueue', null, null, $clause); $this->pos = 0; $this->count = $count; // fetch only 1000 per round to save memory $options = array('limit' => 1000); while ($this->pos < $this->count) { $list = $this->table->getIds('eventqueue', null, $clause, $options); foreach ($list as $id) { ++$this->pos; $this->queue->process($id); $this->notify($this); } } $this->count = 0; $this->pos = 0; return $count; } /** * Get List of Queued Events * * @param string $ns * @param int failed * @return array */ public function getList($ns = null, $failed = null) { $clause = array(); if (null !== $failed) { $clause[] = array( 'field' => 'failed', 'value' => intval($failed) ); } if (null !== $ns) { $clause[] = array( 'field' => 'namespace', 'value' => $ns ); } return $this->table->get('eventqueue', null, null, $clause); } /** * remove event * * @param string $id event queue's id * @return int number of executed events */ public function remove($id) { $this->table->delete('eventqueue', $id); } /** * fetch current processing position * * @return int */ public function getPosition() { return $this->pos; } /** * get number of events to process * * @return int */ public function getCount() { return $this->count; } /** * attach observer * * @see WBObserver_Observerable * @param WBObserver $observer */ public function attach(WBObserver $observer) { $this->observers[$observer->getObjectId()] = $observer; } /** * detach observer * * @see WBObserver_Observerable * @param WBObserver $observer */ public function detach(WBObserver $observer) { unset($this->observers[$observer->getObjectId()]); } /** * notify observer * */ public function notify() { foreach ($this->observers as $o) { $o->update($this); } } }