Created
February 3, 2020 11:18
-
-
Save dinamic/7994bc7d48d363c7603a4f6e2eced78c to your computer and use it in GitHub Desktop.
Revisions
-
dinamic created this gist
Feb 3, 2020 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,152 @@ <?php declare(strict_types=1); namespace Petkanski\Component\ProcessManager\Service; use LogicException; use Swoole\Process; use Swoole\Table; /** * Class ProcessManager * @package Petkanski\Component\ProcessManager\Service */ class ProcessManager { /** * @var Process[] */ private $workers; /** * @var callable[] */ private $callbacks; /** * @var Table */ private $results; /** * @var int */ private $concurrency; /** * @param iterable|callable[] $callbacks * @param int $concurrency * @return array */ public function execute(array $callbacks, int $concurrency = 3): array { $this->callbacks = $callbacks; $this->concurrency = $concurrency; $this->results = new Table(count($callbacks)); $this->results->column('result', Table::TYPE_STRING, 9999999); $this->results->create(); return $this ->spawnWorkers($this->concurrency) ->loadJobs() ->waitWorkers() ->getResult(); } private function getResult(): array { $results = []; foreach ($this->results as $result) { $results[] = unserialize($result['result']); } $this->results->destroy(); return $results; } private function waitWorkers(): self { foreach ($this->workers as $key => $worker) { $data = unserialize($worker->pop()); if ($data['command'] === 'exit') { unset($this->workers[$key]); continue; } throw new LogicException('Unknown command'); } return $this; } private function loadJobs(): self { $keys = array_keys($this->callbacks); foreach ($keys as $key) { $workerId = $key % $this->concurrency; $this->workers[$workerId]->push(serialize([ 'command' => 'process', 'id' => $key, ])); } usleep(500); foreach ($this->workers as $worker) { $worker->push(serialize([ 'command' => 'exit', ])); } return $this; } private function spawnWorkers(int $instances): self { if ($instances <= 0) { throw new LogicException('Invalid instance count'); } $this->workers = array_map(function (): Process { $process = new Process(function (Process $worker): void { while (true) { $data = unserialize($worker->pop()); if ($data['command'] === 'exit') { // printf("{$worker->pid} exiting\n"); $worker->push(serialize([ 'command' => 'exit', ])); exit(0); } if ($data['command'] !== 'process') { throw new LogicException('Unknown command'); } $callbackId = $data['id']; $callback = $this->callbacks[$callbackId]; $this->results->set((string) $callbackId, [ 'result' => serialize($callback($worker)), ]); } }, false, 0); $process->useQueue(); $process->start(); return $process; }, range(0, $instances-1)); return $this; } }