
@talaikis
Created July 25, 2021 19:34

// stream-queue.js:

const child = require('child_process')
const { cpus } = require('os')
const { join } = require('path')
const transform = require('stream-transform')
// `parse` is used below but was never imported; assuming the csv-parse package
// (default export in v4, `{ parse }` named export in v5+)
const parse = require('csv-parse')

// Pool of forked worker processes; `done` is called with each worker's result message.
class Pool {
  constructor (file, maxPool, done) {
    this.pool = []     // idle workers
    this.active = []   // workers currently processing a job
    this.waiting = []  // jobs queued while every worker is busy
    this.maxPool = maxPool

    // Return a worker to the idle pool and hand it the next waiting job, if any.
    const releaseWorker = (function (worker) {
      this.active = this.active.filter((w) => worker !== w)
      this.pool.push(worker)
      if (this.waiting.length > 0) {
        this.assignWork(this.waiting.shift())
      }
    }).bind(this)

    for (let i = 0; i < maxPool; i++) {
      const worker = child.fork(file)
      worker.on('message', (...params) => {
        done(...params)
        releaseWorker(worker)
      })
      this.pool.push(worker)
    }
  }

  assignWork (data) {
    // All workers busy: queue the job until one is released.
    if (this.active.length >= this.maxPool) {
      this.waiting.push(data)
      return
    }

    if (this.pool.length > 0) {
      const worker = this.pool.pop()
      worker.send(data)
      this.active.push(worker)
    }
  }
}

// Maps an event key (the first CSV column) to the callbacks waiting on that key's result.
const JobQueue = {}

const Pooler = new Pool(join(__dirname, 'workers', 'work.js'), cpus().length, (msg) => {
  const queue = [...JobQueue[msg.event]]
  JobQueue[msg.event] = null
  // stream-transform callbacks take (err, output), so pass the result as the second argument
  queue.map((cb) => cb(null, msg.value))
})

// Batch identical jobs: if a job for this key is already in flight, just wait for its result.
const jobBatch = (record, done) => {
  if (JobQueue[record[0]]) {
    JobQueue[record[0]].push(done)
  } else {
    JobQueue[record[0]] = [done]
    Pooler.assignWork({ record, event: record[0] })
  }
}

// pass a read stream (or any Readable emitting CSV text) into this IIFE
(async (readStream) => {
  const parser = parse({
    delimiter: ','
  })

  const transformer = transform((record, callback) => {
    jobBatch(record, callback)
  }, {
    parallel: cpus().length
  })

  readStream.pipe(parser).pipe(transformer).pipe(process.stdout)
})()
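
// Hedged usage sketch, not part of the original gist: one way to actually supply the
// read stream the IIFE above expects. The file name 'input.csv' and the `runPipeline`
// wrapper are assumptions for illustration only.
const { createReadStream } = require('fs')

const runPipeline = (readStream) => {
  const parser = parse({ delimiter: ',' })
  const transformer = transform((record, callback) => jobBatch(record, callback), {
    parallel: cpus().length
  })
  readStream.pipe(parser).pipe(transformer).pipe(process.stdout)
}

runPipeline(createReadStream('input.csv'))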

// workers/work.js:

// Placeholder left empty in the gist: process a single parsed CSV record and return the result.
const doWork = async (record) => {
}

// Each forked worker receives { record, event } from the Pool and replies with { value, event }.
process.on('message', async ({ record, event }) => {
  process.send({ value: await doWork(record), event })
})
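
// Hedged sketch, not part of the original gist: an illustrative variant of workers/work.js
// with a trivial doWork body, just to show the message round trip. Uppercasing each field
// is purely hypothetical; nothing here is the author's intended workload.

const doWorkExample = async (record) => {
  // returning a newline-terminated string keeps the parent's pipe to process.stdout printable
  return record.map((field) => String(field).toUpperCase()).join(',') + '\n'
}

process.on('message', async ({ record, event }) => {
  // echo the event key back so the parent can resolve the matching batch of callbacks
  process.send({ value: await doWorkExample(record), event })
})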