/**
 * Small master/worker task pool built on swoole_process pipes.
 *
 * Usage sketch.
 * NOTE(review): the head of the original example was not visible in the
 * reviewed source; the onMaster callback below is a minimal reconstruction
 * based on how start() invokes it (it receives the server instance).
 *
 *     $taskServer = new TaskServer(
 *         3,
 *         function ($server) {
 *             // onMaster: runs in the master process once the workers are started
 *             $server->task("hello");
 *         },
 *         function ($data) {
 *             // onTask: a worker processes the data and returns the reply
 *             return $data . ' world';
 *         },
 *         function ($data, $pid) {
 *             // onFinish: the master receives the data a worker has processed
 *             $this->log($pid . ': ' . $data);
 *         }
 *     );
 *     $taskServer->start();
 */
class TaskServer extends \Swoole\SwooleAbstract
{
    /** @var int Number of worker processes the master tries to keep alive. */
    protected $_workerNum;

    /** @var callable|null Invoked once by start() in the master process with this server as argument. */
    protected $_onMaster;

    /** @var callable|null Invoked in a worker with the data read from its pipe; its return value is the reply. */
    protected $_onTask;

    /** @var callable|null Invoked in the master with (reply data, worker pid). */
    protected $_onFinish;

    /** @var int|null Pid of the master process, recorded by start(). */
    protected $_pid;

    /** @var array Ids of the timers registered by the master; cleared on SIGTERM. */
    protected $_timerIds = [];

    /** @var array Live worker processes, keyed by pid. */
    protected $_workers = [];

    /**
     * @param int           $workNum  Number of worker processes to maintain.
     * @param callable|null $onMaster Master-side entry callback, called as $onMaster($server).
     * @param callable|null $onTask   Worker-side callback, called as $onTask($data).
     * @param callable|null $onFinish Master-side callback, called as $onFinish($data, $pid).
     */
    public function __construct($workNum = 3, $onMaster = null, $onTask = null, $onFinish = null)
    {
        $this->_workerNum = $workNum;
        $this->_onMaster = $onMaster;
        $this->_onTask = $onTask;
        $this->_onFinish = $onFinish;
    }

    /**
     * Starts the master: spawns the workers, installs the supervision timer and
     * the SIGTERM shutdown handler, then runs the onMaster callback.
     *
     * Any \Exception raised while doing so terminates the script via die().
     */
    public function start()
    {
        try {
            // Daemon mode is left disabled.
            //swoole_process::daemon(false, true);

            // Master process bookkeeping.
            $pid = getmypid();
            $this->_pid = $pid;
            $this->log('master pid:' . $pid . ' start.');
            swoole_set_process_name("taskServer: master {$pid}");

            // Spawn the initial workers.
            for ($i = 0; $i < $this->_workerNum; $i++) {
                $this->createProcess();
            }

            // Supervision timer: once per second top the pool up and reap dead workers.
            $this->_timerIds[] = swoole_timer_tick(1000, function () {
                // At most one replacement worker is started per tick.
                if (count($this->_workers) < $this->_workerNum) {
                    $this->createProcess();
                }

                // Reap every worker that has exited since the last tick, not just one,
                // so zombies do not pile up when several workers die together.
                while ($ret = swoole_process::wait(false)) {
                    $pid = $ret['pid'];
                    $this->destryProcess($pid);
                    $this->log("worker down wait worker process exist:{$ret['signal']} pid:" . $pid);
                }
            });

            // Master shutdown on SIGTERM.
            swoole_process::signal(SIGTERM, function ($signo) {
                $pid = getmypid();
                $this->log("master process exist rev:{$signo} pid:" . $pid);

                // Stop all timers so no new workers are spawned during shutdown.
                foreach ($this->_timerIds as $timerId) {
                    swoole_timer_clear($timerId);
                }

                // Ask every worker to terminate.
                foreach ($this->_workers as $pid => $process) {
                    $process->kill($pid, SIGTERM);
                }

                // Wait for the workers to exit. A blocking wait is used so the
                // master does not spin (and flood the log) while workers shut down.
                while (count($this->_workers) > 0) {
                    $this->log("wait worker num: " . count($this->_workers));
                    $ret = swoole_process::wait(true);
                    if (!$ret) {
                        // Nothing left to wait for (or the wait failed): stop
                        // instead of looping forever on stale bookkeeping.
                        break;
                    }
                    $pid = $ret['pid'];
                    $this->destryProcess($pid);
                    $this->log("master down wait worker process exist:{$ret['signal']} pid:" . $pid);
                }
                exit();
            });

            // Hand control to the user's master-side callback.
            if (is_callable($this->_onMaster)) {
                $fun = $this->_onMaster;
                $fun($this);
            }
        } catch (\Exception $e) {
            die('server error: ' . $e->getMessage());
        }
    }

    /**
     * Sends $data to one randomly chosen worker through its pipe.
     *
     * A failed write, or an empty worker pool, is logged rather than raised.
     *
     * @param string $data Payload written to the worker's pipe.
     */
    public function task($data)
    {
        // array_rand() rejects an empty array, so bail out when no worker is alive.
        if (empty($this->_workers)) {
            $this->log("write fail: no worker process available");
            return;
        }

        $processKey = array_rand($this->_workers);
        $process = $this->_workers[$processKey];
        $result = $process->write($data);
        if (!$result) {
            $this->log("write fail pid: " . $process->pid);
        }
    }

    /**
     * Stops tracking the worker with the given pid: removes its pipe from the
     * event loop, closes the pipe and drops it from the worker table.
     * Unknown pids are ignored.
     *
     * @param int $pid Pid of the worker that has exited.
     */
    protected function destryProcess($pid)
    {
        // wait() may report a child that is not in the table; nothing to clean up then.
        if (!isset($this->_workers[$pid])) {
            return;
        }

        $process = $this->_workers[$pid];
        swoole_event_del($process->pipe); // stop listening on the pipe
        $process->close();
        unset($this->_workers[$pid]); // remove from process management
    }

    /**
     * Forks one worker process and registers it with the master.
     *
     * Inside the worker: listens on the pipe for tasks, exits when the master
     * is gone, and exits cleanly on SIGTERM.
     * Inside the master: records the worker and listens on its pipe for replies.
     *
     * @return int Pid of the new worker, or 0 when the process could not be started.
     */
    protected function createProcess()
    {
        $process = new swoole_process(function (swoole_process $process) {
            $this->log('worker pid:' . $process->pid . ' start.');
            $process->name("taskServer: worker {$process->pid}");

            // Worker side: a message from the master arrived on the pipe.
            swoole_event_add($process->pipe, function ($pipe) use ($process) {
                $this->onTask($pipe, $process);
            });

            // Follow the master: signal 0 only probes whether the pid still exists.
            swoole_timer_tick(1000, function () {
                if (!swoole_process::kill($this->_pid, 0)) {
                    // The master has exited, so this worker exits too.
                    $pid = getmypid();
                    $this->log("master process exist " . $this->_pid . " worker process exist " . $pid);
                    exit();
                }
            });

            // SIGSEGV in the worker is only logged.
            swoole_process::signal(SIGSEGV, function ($signo) use ($process) {
                $pid = getmypid();
                $this->log("worker process exist rev:{$signo} pid:" . $pid);
            });

            // SIGTERM in the worker: detach from the pipe, close it and exit.
            swoole_process::signal(SIGTERM, function ($signo) use ($process) {
                $pid = getmypid();
                swoole_event_del($process->pipe);
                $process->close();
                $this->log("worker process exist rev:{$signo} pid:" . $pid);
                $process->exit();
                exit();
            });
        });

        $pid = $process->start();
        if ($pid < 1) {
            $this->log("worker process create fail");
            return 0;
        }

        // Track the new worker.
        $this->_workers[$pid] = $process;

        // Master side: a reply from this worker arrived on the pipe.
        swoole_event_add($process->pipe, function ($pipe) use ($process) {
            $this->onFinish($pipe, $process);
        });

        return $pid;
    }

    /**
     * Worker-side pipe handler: reads the task data, runs the onTask callback
     * and writes a non-empty scalar result back to the master.
     *
     * @param mixed           $pipe    Pipe descriptor passed by the event loop (unused).
     * @param \swoole_process $process The worker's own process object.
     */
    protected function onTask($pipe, $process)
    {
        if (!$process instanceof \swoole_process) {
            $this->log("fail 2");
            return; // nothing can be read from a non-process value
        }

        $data = $process->read();
        if ($data === false) {
            $this->log("read fail pid: " . $process->pid);
            return;
        }

        $result = '';
        if (is_callable($this->_onTask)) {
            $fun = $this->_onTask;
            $result = $fun($data);
        }

        // Only scalar results can be sent over the pipe; null/array/object replies are dropped.
        $reply = is_scalar($result) ? (string) $result : '';
        if (strlen($reply) > 0) {
            $process->write($reply);
        }
    }

    /**
     * Master-side pipe handler: reads the reply of the worker that owns the
     * pipe and passes it, together with the worker's pid, to the onFinish callback.
     *
     * @param mixed           $pipe    Pipe descriptor passed by the event loop (unused).
     * @param \swoole_process $process The worker whose pipe became readable.
     */
    protected function onFinish($pipe, $process)
    {
        if (!$process instanceof \swoole_process) {
            $this->log("fail 1");
            return; // nothing can be read from a non-process value
        }

        $data = $process->read();
        if ($data === false) {
            $this->log("read fail pid: " . $process->pid);
            return;
        }

        if (is_callable($this->_onFinish)) {
            $fun = $this->_onFinish;
            $fun($data, $process->pid);
        }
    }
}