Skip to content

Instantly share code, notes, and snippets.

@krakjoe
Last active October 11, 2023 19:07
Show Gist options
  • Select an option

  • Save krakjoe/3c4efb20e14db01bb0b9bc88dd14ff7c to your computer and use it in GitHub Desktop.

Select an option

Save krakjoe/3c4efb20e14db01bb0b9bc88dd14ff7c to your computer and use it in GitHub Desktop.

Revisions

  1. krakjoe created this gist May 4, 2019.
    76 changes: 76 additions & 0 deletions example.php
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,76 @@
    <?php
    use \parallel\{Runtime, Channel};

    class ExecutorService {

    public function __construct(int $workers, string $channel = __CLASS__, int $backlog = Channel::Infinite) {
    if ($backlog == 0) {
    /*
    * execute() will block until a worker is ready
    */
    $this->channel = Channel::make($channel);
    } else {
    /*
    * execute() will not block (until backlog is reached)
    */
    $this->channel = Channel::make($channel, $backlog);
    }

    for ($worker = 0; $worker < $workers; $worker++) {
    $this->workers[$worker] = new Runtime;
    $this->workers[$worker]->run(
    Closure::fromCallable(
    [self::class, "__thread"]),
    [(string)$this->channel]);
    }
    }

    private static function __thread(string $channel) {
    $channel = Channel::open($channel);

    while (($job = $channel->recv())) {
    /*
    * Missing try {} for simplicity
    */
    ($job["function"])(...$job["argv"]);
    }
    }

    public function execute(Closure $closure, array $argv) {
    $this->channel->send([
    "function" => $closure,
    "argv" => $argv
    ]);
    }

    public function __destruct() {
    /*
    * Notify workers to shutdown
    */
    foreach ($this->workers as $worker) {
    $this->channel->send(false);
    }

    /*
    * Close workers
    */
    foreach ($this->workers as $worker) {
    $worker->close();
    }

    /*
    * Close channel
    */
    $this->channel->close();
    }
    }

    $executor = new ExecutorService(4);

    while (++$i<1000) {
    $executor->execute(function($i){
    printf("{$i}: Hello from #%d\n",
    zend_thread_id());
    }, [$i]);
    }
    ?>