Skip to content

Instantly share code, notes, and snippets.

@erkurn
Forked from jsor/Connection.php
Created April 27, 2018 07:12
Show Gist options
  • Select an option

  • Save erkurn/d8f81db0d3929b8ff6d110ec8781d4a6 to your computer and use it in GitHub Desktop.

Select an option

Save erkurn/d8f81db0d3929b8ff6d110ec8781d4a6 to your computer and use it in GitHub Desktop.

Revisions

  1. @jsor jsor revised this gist Dec 13, 2013. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion Connection.php
    Original file line number Diff line number Diff line change
    @@ -84,7 +84,7 @@ public function enqueue($job)
    {
    $deferred = new Deferred();

    $job->resolver = $deferred->resolver();
    $job->resolver = $deferred;

    $this->jobs->enqueue($job);

  2. @jsor jsor revised this gist Jan 9, 2013. 1 changed file with 2 additions and 2 deletions.
    4 changes: 2 additions & 2 deletions test.php
    Original file line number Diff line number Diff line change
    @@ -7,8 +7,8 @@
    $connection = new Jsor\MysqlAsync\Connection($loop, array(
    'host' => 'localhost',
    'username' => 'root',
    'password' => 'local',
    'dbname' => 'bos_fahrzeuge'
    'password' => '',
    'dbname' => 'test'
    ));

    $errorCallback = function ($error) {
  3. @jsor jsor revised this gist Nov 23, 2012. 1 changed file with 50 additions and 0 deletions.
    50 changes: 50 additions & 0 deletions test.php
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,50 @@
    <?php

    include 'vendor/autoload.php';

    $loop = React\EventLoop\Factory::create();

    $connection = new Jsor\MysqlAsync\Connection($loop, array(
    'host' => 'localhost',
    'username' => 'root',
    'password' => 'local',
    'dbname' => 'bos_fahrzeuge'
    ));

    $errorCallback = function ($error) {
    echo "ERROR\n";
    print_r($error);
    echo "\n";
    };

    $connection
    ->connect()
    ->then(function($result) {
    echo "CONNECT\n";
    }, $errorCallback);

    $connection
    ->query("select 'query1', sleep(5)")
    ->then(function($result) {
    echo "RESULT1\n";
    print_r($result->fetch_row());
    }, $errorCallback);

    echo "STEP1\n";

    $connection
    ->query("SELECT 'query2'")
    ->then(function($result) {
    echo "RESULT2\n";
    print_r($result->fetch_row());
    }, $errorCallback);

    echo "STEP2\n";

    $connection
    ->end()
    ->then(function() {
    echo "END\n";
    }, $errorCallback);

    $loop->run();
  4. @jsor jsor created this gist Nov 23, 2012.
    154 changes: 154 additions & 0 deletions Connection.php
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,154 @@
    <?php

    namespace Jsor\MysqlAsync;

    use React\EventLoop\LoopInterface;
    use React\Promise\Deferred;

    class Connection
    {
    private $loop;
    private $parameters;
    private $mysqli;
    private $jobs;
    private $connectDeferred;

    public function __construct(LoopInterface $loop, array $parameters = array(), array $options = array())
    {
    $this->loop = $loop;
    $this->parameters = $parameters;

    $this->jobs = new \SplQueue();

    $this->mysqli = mysqli_init();

    foreach ($options as $name => $option) {
    $this->mysqli->options($name, $option);
    }
    }

    public function connect()
    {
    if ($this->connectDeferred) {
    return $this->connectDeferred->promise();
    }

    $this->connectDeferred = new Deferred();

    $host = isset($this->parameters['host']) ? $this->parameters['host'] : null;
    $username = isset($this->parameters['username']) ? $this->parameters['username'] : null;
    $password = isset($this->parameters['password']) ? $this->parameters['password'] : null;
    $dbname = isset($this->parameters['dbname']) ? $this->parameters['dbname'] : null;
    $port = isset($this->parameters['port']) ? $this->parameters['port'] : null;
    $socket = isset($this->parameters['socket']) ? $this->parameters['socket'] : null;

    @$this->mysqli->real_connect($host, $username, $password, $dbname, $port, $socket);

    if ($this->mysqli->connect_error) {
    $this->connectDeferred->reject($this->mysqli->connect_error);
    } else {
    $this->connectDeferred->resolve($this);
    }

    return $this->connectDeferred->promise();
    }

    public function query($sql)
    {
    $job = (object) array(
    'type' => 'query',
    'sql' => $sql
    );

    return $this
    ->connect()
    ->then(function ($connection) use ($job) {
    return $connection->enqueue($job);
    });
    }

    public function end()
    {
    $job = (object) array(
    'type' => 'close'
    );

    return $this
    ->connect()
    ->then(function ($connection) use ($job) {
    return $connection->enqueue($job);
    });
    }

    public function enqueue($job)
    {
    $deferred = new Deferred();

    $job->resolver = $deferred->resolver();

    $this->jobs->enqueue($job);

    if (1 === count($this->jobs)) {
    $this->start();
    }

    return $deferred->promise();
    }

    protected function start()
    {
    if ($this->jobs->isEmpty()) {
    return;
    }

    $job = $this->jobs->bottom();

    switch ($job->type) {
    case 'query':
    $this->mysqli->query($job->sql, \MYSQLI_ASYNC);
    $this->poll();
    break;
    case 'close':
    if ($this->mysqli->close()) {
    $job->resolver->resolve();
    } else {
    $job->resolver->reject(mysqli_error($this->mysqli));
    }

    $this->dequeue();
    break;
    }
    }

    public function poll()
    {
    $links = $errors = $reject = array($this->mysqli);

    if (0 === mysqli_poll($links, $errors, $reject, 1)) {
    $this->loop->addTimer(0.001, array($this, 'poll'));
    return;
    }

    $job = $this->jobs->bottom();
    $result = $links[0]->reap_async_query();

    if ($result) {
    $job->resolver->resolve($result);
    } else {
    $job->resolver->reject(mysqli_error($links[0]));
    }

    $this->dequeue();
    }

    protected function dequeue()
    {
    $this->jobs->dequeue();

    if ($this->jobs->isEmpty()) {
    //$this->emit('drain');
    } else {
    $this->start();
    }
    }
    }