callbacks = $callbacks;
        $this->concurrency = $concurrency;
        $this->results = new Table(count($callbacks));
        $this->results->column('result', Table::TYPE_STRING, 9999999);
        $this->results->create();

        return $this
            ->spawnWorkers($this->concurrency)
            ->loadJobs()
            ->waitWorkers()
            ->getResult();
    }

    /**
     * Collects the stored callback results and destroys the results table.
     *
     * Rows are looked up by callback key instead of iterating the table, so
     * the returned list follows the order of $this->callbacks rather than the
     * table's internal iteration order.
     *
     * @return list<mixed> Unserialized results, one per callback that stored a row.
     */
    private function getResult(): array
    {
        $results = [];

        foreach (array_keys($this->callbacks) as $key) {
            // Workers store each row under the string form of the callback key.
            $row = $this->results->get((string) $key);

            // No row means no worker stored a result for this callback; it is
            // skipped so the output contains exactly the rows that exist.
            if ($row === false) {
                continue;
            }

            $results[] = unserialize($row['result']);
        }

        $this->results->destroy();

        return $results;
    }

    /**
     * Pops one message per worker and expects it to be the 'exit' acknowledgement.
     *
     * Each acknowledged worker is removed from $this->workers.
     *
     * @throws LogicException When a popped message is not an 'exit' command.
     */
    private function waitWorkers(): self
    {
        foreach ($this->workers as $key => $worker) {
            $data = unserialize($worker->pop());

            // `?? null` keeps a failed pop()/unserialize() (non-array value)
            // from raising an offset warning before the exception below.
            if (($data['command'] ?? null) !== 'exit') {
                throw new LogicException('Unknown command');
            }

            unset($this->workers[$key]);
        }

        return $this;
    }

    /**
     * Pushes one 'process' message per callback, then one 'exit' message per worker.
     *
     * Jobs are spread round-robin by their position in the callback list, so
     * callback keys may be sparse, negative or non-integer without selecting
     * a worker index that does not exist.
     */
    private function loadJobs(): self
    {
        // array_keys() yields a 0-based list, so $position is always a valid
        // non-negative operand for the modulo below.
        foreach (array_keys($this->callbacks) as $position => $key) {
            $workerId = $position % $this->concurrency;

            $this->workers[$workerId]->push(serialize([
                'command' => 'process',
                'id' => $key,
            ]));
        }

        // NOTE(review): short pause before queueing the exit commands;
        // presumably gives workers time to pick up jobs first. It is
        // timing-based, not a guarantee - confirm the intent.
        usleep(500);

        foreach ($this->workers as $worker) {
            $worker->push(serialize([
                'command' => 'exit',
            ]));
        }

        return $this;
    }

    /**
     * Creates and starts the worker processes and stores them in $this->workers.
     *
     * Each worker loops over popped messages: a 'process' message runs the
     * callback with the given id and stores its serialized return value in
     * the results table; an 'exit' message is echoed back and ends the process.
     *
     * @param int $instances Number of worker processes to start; must be positive.
     *
     * @throws LogicException    When $instances is not positive.
     * @throws \RuntimeException When a worker process fails to start.
     */
    private function spawnWorkers(int $instances): self
    {
        if ($instances <= 0) {
            throw new LogicException('Invalid instance count');
        }

        $this->workers = array_map(function (): Process {
            $process = new Process(function (Process $worker): void {
                while (true) {
                    $data = unserialize($worker->pop());
                    $command = $data['command'] ?? null;

                    if ($command === 'exit') {
                        // Acknowledge the shutdown so waitWorkers() can pop it.
                        $worker->push(serialize([
                            'command' => 'exit',
                        ]));
                        exit(0);
                    }

                    if ($command !== 'process') {
                        throw new LogicException('Unknown command');
                    }

                    $callbackId = $data['id'];
                    $callback = $this->callbacks[$callbackId];
                    $this->results->set((string) $callbackId, [
                        'result' => serialize($callback($worker)),
                    ]);
                }
            }, false, 0);

            $process->useQueue();

            // NOTE(review): assumes start() returns false on failure, as
            // Swoole\Process does - confirm for the Process class in use.
            if ($process->start() === false) {
                throw new \RuntimeException('Unable to start worker process');
            }

            return $process;
        }, range(0, $instances - 1));

        return $this;
    }
}