* @license PHP License * @package WB * @subpackage base */ WBClass::load('WBDaemon' , 'WBClock'); /** * Generic forking socket server daemon * * @version 0.3.0 * @package WB * @subpackage base */ class WBDaemon_SocketServer extends WBDaemon { /** * server config file * @var string */ protected $configFile = 'socketserver'; /** * corrent configuration * @var WBConfig */ protected $config; /** * listening server socket * @var resource */ protected $server; /** * actual connected client * @var resource */ protected $client; /** * list of child processes * @var array */ protected $children = array(); /** * CSV string read and writer helper variables * @var string */ protected $csvDelimiter = ';'; /** * CSV string read and writer helper variables * @var string */ protected $csvEnclosure = '"'; /** * 2nd constructor * */ protected function init() { $this->config = WBClass::create('WBConfig'); $this->config->load($this->configFile); $this->installSocket(); return true; } /** * stop daemon */ protected function halt() { $this->uninstallSocket(); $log = array( 'action' => 'shutdown', 'pid' => 0, 'msg' => 'Kill child process' ); foreach ($this->children as $pid => $info) { $log['pid'] = $pid; $this->log->info($log); posix_kill($pid, SIGTERM); } return true; } /** * hangup * * Reload config and restart listening socket */ protected function hangUp() { $this->uninstallSocket(); $this->config->reload($this->configFile); $this->installSocket(); return true; } /** * create listening socket * * Create streaming server socket ready to accept connections * * @see uninstallSocket() * @throws WBException_Config */ protected function installSocket() { if ($this->server) { return; } // server socket $proto = 'tcp'; $context = stream_context_create(); if ($this->config->get('server/ssl', 0)) { $proto = 'ssl'; // server certificate $cert = sprintf('/resource/ssl/%s', $this->config->get('server/sslcert', 'server.pem')); $file = WBClass::create('WBFile'); if (!$file->exists($cert)) { WBClass::load('WBException_Config'); throw new WBException_Config('SSL certificate and key ('.$cert.') not found', 1, __CLASS__); } // add server certificate and add passphrase - optionally //stream_context_set_option($context, 'ssl', 'passphrase', 'secret'); stream_context_set_option($context, 'ssl', 'local_cert', $file->realpath()); stream_context_set_option($context, 'ssl', 'allow_self_signed', true); stream_context_set_option($context, 'ssl', 'verify_peer', false); } $ip = $this->config->get('server/ip', '127.0.0.1'); $port = $this->config->get('server/port', '8000'); $address = sprintf('%s://%s:%s', $proto, $ip, $port); $this->server = stream_socket_server($address , $errno , $errstr , STREAM_SERVER_BIND|STREAM_SERVER_LISTEN , $context ); if (!$this->server) { WBClass::load('WBException_Config'); throw new WBException_Config('Could not bind server to socket: ' . $errstr, 2, __CLASS__); } $log = array( 'action' => 'bind', 'pid' => getmypid(), 'msg' => 'Start listening to server socket', 'protocol' => $proto, 'ip' => $ip, 'port' => $port ); $this->log->notice($log); stream_set_timeout($this->server, $this->config->get('server/timeout', 60)); stream_set_blocking($this->server, 1); } /** * remove listening socket * * @see installSocket(); */ protected function uninstallSocket() { if (!$this->server) { return; } $log = array( 'action' => 'release', 'pid' => getmypid(), 'msg' => 'Stop listening to server socket', ); $this->log->notice($log); fclose($this->server); $this->server = null; } /** * main loop to run daemon process * * Accept incomming connections and fork worker to process each client. * * * @see processClient(); * @see max */ protected function process() { while (true) { $log = array( 'action' => 'process', 'pid' => $this->pid, 'msg' => 'timeout' ); // cleanup child list $child = pcntl_wait($status, WNOHANG); if ($child > 0) { $status = pcntl_wexitstatus($status); $this->handleChildExit($child, $status); } // accept connection via socket $this->client = @stream_socket_accept($this->server, $this->config->get('server/timeout', 60)); if (!$this->client) { $this->log->debug($log); continue; } // log client's address and connection port $log['peer'] = stream_socket_get_name($this->client, true); if (!empty($log['peer']) && strstr($log['peer'], ':')) { list($log['peer'], $log['port']) = explode(':', $log['peer']); } // count worker processes if (count($this->children) >= $this->config->get('server/maxclient', 10)) { $log['pid'] = $this->pid; $log['msg'] = 'Maximum number of clients reached'; $log['max'] = $this->config->get('server/maxclient', 10); $this->log->warn($log); $this->maxClientDeny(); fclose($this->client); continue; } // fork worker process $pid = pcntl_fork(); if ($pid < 0) { $log['msg'] = 'Failed to fork child process'; $this->log->warn($log); continue; } if ($pid > 0) { $log['pid'] = $pid; $log['msg'] = 'Forked child process'; $this->log->notice($log); $this->children[$pid] = array( 'pid' => $pid, 'start' => WBClock::now() ); // close stream in mother process fclose($this->client); continue; } // only worker child processes will reach this point break; } // process client end exist child pcntl_signal(SIGHUP, SIG_DFL); pcntl_signal(SIGINT, SIG_DFL); pcntl_signal(SIGTERM, SIG_DFL); pcntl_signal(SIGCLD, SIG_DFL); stream_set_timeout($this->client, $this->config->get('server/timeout', 60)); stream_set_blocking($this->client, 1); $this->processClient(); $this->running = false; fclose($this->client); } /** * deny additional client when maximum has been reached * * Inform client that maximum number of clients was reached * This is just an example - override this funktion in actual server * implementation */ protected function maxClientDeny() { $this->write('Limit of client reached - come back later'); } /** * handler for exiting children * * Log event and decrease number of cliens * * @param int $pid * @param int $status */ protected function handleChildExit($pid, $status) { $start = 0; if (isset($this->children[$pid])) { $start = $this->children[$pid]['start']; unset($this->children[$pid]); } $log = array( 'action' => 'exit', 'pid' => $pid, 'msg' => 'Child exit', 'status' => $status, 'elapsed' => WBClock::stop($start, 1, 0) ); $this->log->info($log); } /** * process client's request * * This function gets called whenever a new connection was established. * This function is meant to implement whatever your server does. * * This is sort of the main function. If this function returns, the connection * to the client will be closed and the process exits. */ protected function processClient() { $welcome = array('Welcome'); $welcome[] = 'pid: ' . getmypid(); $welcome[] = 'date: ' . gmdate('Y-m-d H:i:s'); $this->write($welcome); while (true) { $req = fgets($this->client, 8192); if ($req === false) { // nothing received after timeout @fputs($this->client, "Timeout\n"); break; } $req = explode(' ', trim($req)); $res = array(); if (empty($req[0])) { continue; } switch (strtolower($req[0])) { case 'timestamp': $res[] = gmdate('Y-m-d H:i:s'); break; case 'date': $res[] = gmdate('Y-m-d'); break; case 'time': $res[] = gmdate('H:i:s'); break; case 'ping': $res[] = 'pong'; break; case 'version': $res[] = '0.1.0'; break; case 'echo': array_shift($req); $res[] = implode( ' ', $req); array_unshift($req, 'echo'); break; case 'help': $res[] = 'Available commands:'; $res[] = ' date'; $res[] = ' time'; $res[] = ' timestamp'; $res[] = ' ping'; $res[] = ' echo [WORD]...'; $res[] = ' version'; $res[] = ' quit'; break; case 'quit': $res[] = 'Bye Bye!'; break; default: $res[] = 'unknown command!'; break; } $this->write($res); if (strtolower($req[0]) == 'quit') { break; } } } /** * read client data * * Helper function to ease fgets() * * @return array */ protected function read() { $lines = array(); while (true) { $line = @fgets($this->client, 8192); if ($line === false || trim($line) == '') { return $lines; } $lines[] = trim($line); } return $lines; } /** * send one or more lines to client * * Helper function to ease fputs() * * @param string|array $lines */ protected function write($lines) { if (!is_array($lines)) { $lines = array($lines); } @fputs($this->client, implode("\n", $lines) . "\n\n"); } /** * read client data and and treat as CSV * * Helper function to ease fgetcsv() * * @return array */ protected function readCSV() { $lines = array(); while (true) { $line = @fgetcsv($this->client, 8192, $this->csvDelimiter, $this->csvEnclosure); if ($line === false || empty($line) || empty($line[0])) { return $lines; } $lines[] = $line; } return $lines; } /** * send one or more lines to client as CSV data * * Helper function to ease fputcsv() * * @param array $lines */ protected function writeCSV($lines) { foreach ($lines as $line) { @fputcsv($this->client, $line, $this->csvDelimiter, $this->csvEnclosure); } @fputs($this->client, "\n"); } } ?>