Skip to content

Instantly share code, notes, and snippets.

@dinamic
Created February 3, 2020 11:18
Show Gist options
  • Select an option

  • Save dinamic/7994bc7d48d363c7603a4f6e2eced78c to your computer and use it in GitHub Desktop.

Select an option

Save dinamic/7994bc7d48d363c7603a4f6e2eced78c to your computer and use it in GitHub Desktop.

Revisions

  1. dinamic created this gist Feb 3, 2020.
    152 changes: 152 additions & 0 deletions SwooleProcessManager.php
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,152 @@
    <?php
    declare(strict_types=1);

    namespace Petkanski\Component\ProcessManager\Service;

    use LogicException;
    use Swoole\Process;
    use Swoole\Table;

    /**
    * Class ProcessManager
    * @package Petkanski\Component\ProcessManager\Service
    */
    class ProcessManager
    {
    /**
    * @var Process[]
    */
    private $workers;

    /**
    * @var callable[]
    */
    private $callbacks;

    /**
    * @var Table
    */
    private $results;

    /**
    * @var int
    */
    private $concurrency;

    /**
    * @param iterable|callable[] $callbacks
    * @param int $concurrency
    * @return array
    */
    public function execute(array $callbacks, int $concurrency = 3): array
    {
    $this->callbacks = $callbacks;
    $this->concurrency = $concurrency;

    $this->results = new Table(count($callbacks));
    $this->results->column('result', Table::TYPE_STRING, 9999999);
    $this->results->create();

    return $this
    ->spawnWorkers($this->concurrency)
    ->loadJobs()
    ->waitWorkers()
    ->getResult();
    }

    private function getResult(): array
    {
    $results = [];

    foreach ($this->results as $result) {
    $results[] = unserialize($result['result']);
    }

    $this->results->destroy();

    return $results;
    }

    private function waitWorkers(): self
    {
    foreach ($this->workers as $key => $worker) {
    $data = unserialize($worker->pop());

    if ($data['command'] === 'exit') {
    unset($this->workers[$key]);
    continue;
    }

    throw new LogicException('Unknown command');
    }

    return $this;
    }

    private function loadJobs(): self
    {
    $keys = array_keys($this->callbacks);

    foreach ($keys as $key) {
    $workerId = $key % $this->concurrency;
    $this->workers[$workerId]->push(serialize([
    'command' => 'process',
    'id' => $key,
    ]));
    }

    usleep(500);

    foreach ($this->workers as $worker) {
    $worker->push(serialize([
    'command' => 'exit',
    ]));
    }

    return $this;
    }

    private function spawnWorkers(int $instances): self
    {
    if ($instances <= 0) {
    throw new LogicException('Invalid instance count');
    }

    $this->workers = array_map(function (): Process {
    $process = new Process(function (Process $worker): void {
    while (true) {
    $data = unserialize($worker->pop());

    if ($data['command'] === 'exit') {
    // printf("{$worker->pid} exiting\n");

    $worker->push(serialize([
    'command' => 'exit',
    ]));

    exit(0);
    }

    if ($data['command'] !== 'process') {
    throw new LogicException('Unknown command');
    }

    $callbackId = $data['id'];

    $callback = $this->callbacks[$callbackId];

    $this->results->set((string) $callbackId, [
    'result' => serialize($callback($worker)),
    ]);
    }
    }, false, 0);

    $process->useQueue();
    $process->start();

    return $process;
    }, range(0, $instances-1));

    return $this;
    }
    }