Skip to content

Instantly share code, notes, and snippets.

@bryanerayner
Created October 16, 2015 20:14
Show Gist options
  • Save bryanerayner/cc2bb9b4da243d94f0f8 to your computer and use it in GitHub Desktop.
Save bryanerayner/cc2bb9b4da243d94f0f8 to your computer and use it in GitHub Desktop.

Revisions

  1. bryanerayner created this gist Oct 16, 2015.
    77 changes: 77 additions & 0 deletions StreamQueue.js
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,77 @@

    var StreamQueue = (function () {

    function StreamQueue() {
    // The factory
    this.nextTaskFactory = null;
    // The current task
    this.currentTask = null;

    // Tasks which must have end() called on them.
    this.tasksToEnd = [];
    }

    /**
    *
    * @param taskFactory A function which should return the next task.
    */
    StreamQueue.prototype.currentTaskEnded = function () {

    gUtil.log('Finished');

    this.currentTask = null;
    if (this.nextTaskFactory) {
    var nextTask = this.nextTaskFactory();

    this.nextTaskFactory = null;
    this.currentTask = nextTask;

    // When we were queuing tasks, we were returning
    // streams. We must signify 'end' on all those streams, when
    // the now current task has finished.
    var tasksToEnd = [].concat(this.tasksToEnd);
    this.tasksToEnd = [];

    var _this = this;
    this.currentTask.on('end', function () {
    // Signify to everything that was listening to the 'next stream' that it's finished now.
    _.each(tasksToEnd, function (task) {
    task.end();
    });
    tasksToEnd = [];

    _this.currentTaskEnded();
    });
    }
    };

    /**
    *
    * @param taskFactory A function which should return the next task.
    */
    StreamQueue.prototype.queueTask = function (taskFactory) {
    var _this = this;
    if (!this.currentTask) {
    gUtil.log('Starting');
    this.currentTask = taskFactory();
    var currentTaskEnded = function () {
    _this.currentTaskEnded();
    };
    this.currentTask
    .on('end', currentTaskEnded);
    return this.currentTask;
    } else {
    gUtil.log('Task already in progress - queued next task.');
    if (!this.nextTaskFactory) {
    this.nextTaskFactory = taskFactory;
    }

    var ws = Writable();
    this.tasksToEnd.push(ws);

    return ws;
    }
    };

    return StreamQueue;
    })();