Skip to content

Instantly share code, notes, and snippets.

@gggeek
Created July 9, 2013 09:59
Show Gist options
  • Save gggeek/5956177 to your computer and use it in GitHub Desktop.
Save gggeek/5956177 to your computer and use it in GitHub Desktop.

Revisions

  1. gggeek created this gist Jul 9, 2013.
    200 changes: 200 additions & 0 deletions ProcessManager.php
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,200 @@
    <?php
    /**
    * A simple process manager, forking jobs to run in parallel. Works on linux and windows.
    *
    * @copyright G. Giunta
    * @license GPL v2
    *
    * @todo add more methods? f.e. one to kill any the executing processes
    */

    namespace Forlagshuset\RecommendationBundle\Entities;


    class ProcessManager
    {
    protected $toRun = array();
    protected $pipes = array();
    protected $childProcs = array();
    protected $childResults = array();
    protected $startTimes = array();
    //protected $pause = 1000;
    protected $maxParallel = 0;

    /**
    * @todo add some options, f.e. throw an exception if any command can not start, or buffers for pipes
    */
    public function __construct()
    {

    }

    /**
    * Runs commands in parallel, waiting until all of them terminate.
    * @param array $commands array of strings. Do not forget to escape them while building them!
    * @param int $maxParallel set to 0 for no-limit
    * @param int $poll in microseconds, how long to sleep between polling for process termination
    * @return array
    * @throws \Exception
    */
    public function runParallel( array $commands, $maxParallel = 0, $poll = 1000000 )
    {
    if ( !count( $commands ) )
    {
    throw new \Exception( "Can not run in parallel 0 commands" );
    }

    $this->startChildren( $commands, $maxParallel );
    do
    {
    // it's a good idea to sleep a while before we check pipes for the 1st time
    usleep( $poll );
    }
    while( $this->waitFor() > 0 );

    return $this->getResults();
    }

    /**
    * Starts commands in parallel - with a maximum parallel level (other commands are queued).
    * @param array $commands array of strings. Do not forget to escape them while building them!
    * @param int $maxParallel set to 0 for no-limit
    * @return int the number of processes started
    *
    * @see runParallel for an example loop using this function
    */
    public function startChildren( array $commands, $maxParallel = 0 )
    {
    $this->toRun = $commands;
    $commandCount = count( $this->toRun );
    $this->startTimes = array();
    $this->pipes = array();
    $this->childProcs = array();
    $this->childResults = array_fill( 0, $commandCount, null );
    if ( $maxParallel <= 0 || $maxParallel > $commandCount )
    {
    $maxParallel = $commandCount;
    }
    $this->maxParallel = $maxParallel;

    $started = 0;
    for ( $i = 0; $i < $maxParallel; $i++ )
    {
    if ( $this->startChild( $i ) )
    {
    $started++;
    }
    }

    return $started;
    }

    /**
    * Checks if there are any child commands finished. If there are any queued, starts them
    * @return int number of running processes
    *
    * @see runParallel for an example loop using this function
    */
    public function waitFor()
    {
    $running = 0;

    $time = microtime( true );

    for ( $i = 0; $i < count( $this->childProcs ); $i++ )
    {
    if ( $this->childProcs[$i] !== false )
    {
    /// @todo see note from Lachlan Mulcahy on http://www.php.net/manual/en/function.proc-get-status.php:
    /// to make sure buffers are not blocking children, we should read rom their pipes every now and then
    /// (but not on windows, since pipes are blocking and can not be timedout, see https://bugs.php.net/bug.php?id=54717)
    $status = proc_get_status( $this->childProcs[$i] );
    if ( $status['running'] == false )
    {
    $this->childResults[$i] = array_merge( $status, array(
    'output' => stream_get_contents( $this->pipes[$i][1] ),
    'error' => stream_get_contents( $this->pipes[$i][2] ),
    'return' => proc_close( $this->childProcs[$i] ),
    'starttime' => $this->startTimes[$i],
    'stoptime' => $time
    ) );
    $this->childProcs[$i] = false;
    }
    else
    {
    $this->childResults[$i] = $status;
    $running++;
    }
    }
    }

    $started = count( $this->childProcs );
    if ( $started < count( $this->toRun ) && $running < $this->maxParallel )
    {
    for( $i = $running, $j = $started; $i < $this->maxParallel; $i++, $j++ )
    {
    if ( $this->startChild( $j ) )
    {
    $running++;
    }
    }
    }

    return $running;
    }

    /**
    * Checks if child process i is running
    * @param int $i
    * @return bool
    */
    public function isRunning( $i )
    {
    if ( $i >= count( $this->childProcs ) || $this->childProcs[$i] == false )
    return false;
    $status = proc_get_status( $this->childProcs[$i] );
    return $status['running'];
    }

    /**
    * Returns true if child process i was started (at least tried to)
    * @param int $i
    * @return bool
    */
    public function wasStarted( $i )
    {
    return ( $i < count( $this->childProcs ) && $i >= 0 );
    }

    /**
    * Get results for either one process or all of them.
    * It can be used to retrieve f.e. the pid of a particular command, after waitFor has been called at least once
    * @param integer $i
    * @return array
    */
    public function getResults( $i = null )
    {
    return $i !== null ? $this->childResults[$i] : $this->childResults;
    }

    protected function startChild( $i )
    {
    $this->startTimes[$i] = microtime( true );

    $this->pipes[$i] = null;
    $this->childProcs[$i] = proc_open(
    $this->toRun[$i],
    /// @todo test if error pipe should use 'a' or 'w' on linux
    array( array( 'pipe','r' ), array( 'pipe','w' ), array( 'pipe', 'w' ) ),
    $this->pipes[$i]
    );
    fclose( $this->pipes[$i][0] );

    if ( !$this->childProcs[$i] )
    {
    return false;
    }

    return true;
    }
    }