const child = require('child_process')
const { cpus } = require('os')
const { join } = require('path') // was missing: join() is used below
const parse = require('csv-parse') // was missing (csv-parse v4; in v5+ use: const { parse } = require('csv-parse'))
const transform = require('stream-transform')

// A fixed-size pool of forked worker processes. A worker is either idle
// (in `pool`) or busy (in `active`); jobs that arrive while every worker
// is busy wait in `waiting`.
class Pool {
  constructor (file, maxPool, done) {
    this.pool = []
    this.active = []
    this.waiting = []
    this.maxPool = maxPool

    // Return a finished worker to the idle pool and, if any jobs are
    // waiting, immediately hand it the next one.
    const releaseWorker = (worker) => {
      this.active = this.active.filter((w) => worker !== w)
      this.pool.push(worker)
      if (this.waiting.length > 0) {
        this.assignWork(this.waiting.shift())
      }
    }

    for (let i = 0; i < maxPool; i++) {
      const worker = child.fork(file)
      worker.on('message', (...params) => {
        done(...params)
        releaseWorker(worker)
      })
      this.pool.push(worker)
    }
  }

  assignWork (data) {
    if (this.active.length >= this.maxPool) {
      this.waiting.push(data)
      return // all workers are busy; releaseWorker() will pick this job up
    }
    if (this.pool.length > 0) {
      const worker = this.pool.pop()
      worker.send(data)
      this.active.push(worker)
    }
  }
}

// Pending callbacks keyed by event (here, the record's first field).
// Identical records share a single worker round-trip: the first caller
// dispatches the job, later callers only queue their callbacks.
const JobQueue = {}

const Pooler = new Pool(join(__dirname, 'workers', 'work.js'), cpus().length, (msg) => {
  const queue = [...JobQueue[msg.event]]
  JobQueue[msg.event] = null
  // stream-transform handler callbacks take (err, output), so the error
  // slot must be null; passing the value first would signal a failure.
  queue.forEach((cb) => cb(null, msg.value))
})

const jobBatch = (record, done) => {
  if (JobQueue[record[0]]) {
    JobQueue[record[0]].push(done)
  } else {
    JobQueue[record[0]] = [done]
    Pooler.assignWork({ record, event: record[0] })
  }
}

// Pass a read stream (or something similar) producing CSV data.
const run = (readStream) => {
  const parser = parse({ delimiter: ',' })
  const transformer = transform((record, callback) => {
    jobBatch(record, callback)
  }, { parallel: cpus().length })
  readStream.pipe(parser).pipe(transformer).pipe(process.stdout)
}

// workers/work.js:
const doWork = async (record) => {
  // do the actual (CPU-bound) work for one record here; resolve to a
  // string or Buffer if the results are piped to stdout as above
}

process.on('message', async ({ record, event }) => {
  process.send({ value: await doWork(record), event })
})
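
// Usage sketch for the main file above. Hedged assumptions, not from the
// original: the input CSV lives at 'data.csv', and csv-parse /
// stream-transform are installed alongside Node's built-ins.
//
//   const { createReadStream } = require('fs')
//   run(createReadStream('data.csv'))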