/** One-argument callback; used for both the `resolve` and `reject` halves of a pending dequeue. */
type Callback<A> = (a: A) => void

/**
 * Returns a promise that resolves (with `undefined`) on a later turn of the
 * run-loop.
 *
 * Awaiting it between units of work gives other scheduled callbacks a chance
 * to run and keeps long chains of synchronous continuations from growing the
 * stack. Uses `setImmediate` when it exists and falls back to
 * `setTimeout(cb, 0)` otherwise.
 */
export function yieldRunLoop(): Promise<void> {
  const schedule: (cb: () => void) => void =
    typeof setImmediate !== 'undefined'
      ? cb => { setImmediate(cb) }
      : cb => { setTimeout(cb, 0) }

  // Call `resolve` with no arguments. Handing `resolve` (and `reject`) straight
  // to `setImmediate` would make it forward `reject` as the resolved value.
  return new Promise<void>(resolve => schedule(() => resolve()))
}

/**
 * Unbounded asynchronous FIFO queue.
 *
 * Values enqueued while nobody is waiting are buffered; a `dequeue` on an
 * empty queue returns a promise that settles when a value is enqueued or when
 * `rejectAllActive` is called.
 */
export class Queue<A> {
  /** Values that were enqueued while no consumer was waiting. */
  private readonly elements: A[] = []
  /** `[resolve, reject]` pairs of consumers waiting for a value, oldest first. */
  private readonly callbacks: [Callback<A>, Callback<Error>][] = []

  /**
   * Adds a value to the queue. If a consumer is already waiting, the value is
   * delivered to the oldest waiter after one run-loop yield; otherwise it is
   * buffered.
   */
  enqueue = async (a: A): Promise<void> => {
    const waiter = this.callbacks.shift()
    if (waiter) {
      // fairness + guards against stack overflows
      await yieldRunLoop()
      const [resolve] = waiter
      resolve(a)
    } else {
      this.elements.push(a)
    }
  }

  /**
   * Removes and returns the oldest buffered value. When the buffer is empty,
   * registers the caller as a waiter and resolves once a value arrives, or
   * rejects with the error given to `rejectAllActive`.
   */
  dequeue = async (): Promise<A> => {
    if (this.elements.length > 0) {
      return this.elements.shift() as A
    }
    // The executor runs synchronously, so the waiter is registered before
    // this method returns.
    return new Promise<A>((resolve, reject) => {
      this.callbacks.push([resolve, reject])
    })
  }

  /**
   * Rejects every currently waiting `dequeue` call with `e` (most recent
   * waiter first) and clears the list of waiters. Buffered values are kept.
   */
  rejectAllActive = (e: Error): void => {
    let waiter = this.callbacks.pop()
    while (waiter) {
      const [, reject] = waiter
      reject(e)
      waiter = this.callbacks.pop()
    }
  }
}

/**
 * Consumer implementation: builds a function that starts a pool of workers
 * which pull values from `queue` and hand them to a processing callback.
 *
 * Errors thrown (or rejected) by the processing callback are logged with
 * `console.error` and do not stop the worker.
 *
 * @param queue the queue to consume from
 * @param workers specifies the number of workers to start in parallel
 * @param blockProcessFromExiting if `true` then blocks the Nodejs process from exiting
 *        by keeping a recurring 1-second timer alive until `cancel` is called
 *
 * @returns a function that takes the processing callback, starts the workers and
 *          returns a `[promise, cancel]` tuple, where `cancel` is a function that can
 *          be used to stop all processing and `promise` can be awaited for the
 *          completion of all workers; workers complete on cancellation
 */
export function consumeQueue<A>(
  queue: Queue<A>,
  workers: number,
  blockProcessFromExiting: boolean = false
): (process: (a: A) => Promise<unknown>) => [Promise<void>, () => void] {
  // Sentinel used to tell "we were cancelled" apart from any other rejection.
  const Cancel = new Error("queue-cancel-all")

  // `isActive` is a one-element array so that all workers and `cancel` share
  // the same mutable flag.
  const startWorker = async (isActive: boolean[], process: (a: A) => Promise<unknown>): Promise<void> => {
    await yieldRunLoop()
    try {
      while (isActive[0]) {
        const a = await queue.dequeue()
        try {
          await process(a)
        } catch (e) {
          console.error("Error while processing queue message:", a, e)
        }
        // Fairness + protection against stack-overflow
        await yieldRunLoop()
      }
    } catch (e) {
      // A pending dequeue rejected with the sentinel means a normal shutdown.
      if (e !== Cancel) throw e
    }
  }

  // For keeping the process alive, for as long as there is a run-loop active
  function startTick(): () => void {
    let tickID: ReturnType<typeof setTimeout> | undefined
    const tick = (): void => {
      tickID = setTimeout(tick, 1000)
    }
    tick()
    return () => {
      if (tickID !== undefined) clearTimeout(tickID)
    }
  }

  return (process: (a: A) => Promise<unknown>): [Promise<void>, () => void] => {
    const isActive = [true]
    const cancelTick = blockProcessFromExiting ? startTick() : () => {}

    const cancel = (): void => {
      isActive[0] = false
      // Wake up workers that are parked inside `dequeue`.
      queue.rejectAllActive(Cancel)
      cancelTick()
    }

    const tasks: Promise<void>[] = []
    for (let i = 0; i < workers; i++) {
      tasks.push(startWorker(isActive, process))
    }

    const all = Promise.all(tasks).then(() => undefined)
    return [all, cancel]
  }
}