/* Makes a channel that buffers up to n items */ function chan(n) { const data = []; // data not yet read const readersBacklog = []; // readers waiting for data const writersBacklog = []; // writers waiting for data let disposed = false; // TODO(Benjamin) - disposing return { async read() { if (data.length === n) { // data is full const nextWaitingWrite = writersBacklog.shift(); nextWaitingWrite(); } if (data.length > 0) { return data.shift(); } return new Promise(resolve => readersBacklog.push(resolve)); }, async write(datum) { if (data.length === 0) { const resolve = readersBacklog.shift(); resolve(datum); return; } if (data.length < n) { data.push(datum); return; } return new Promise(resolve => { writersBacklog.push(() => { data.push(datum); resolve(); }); }); }, async *[Symbol.asyncIterator]() { while(!disposed) yield await this.read(); }, }; } /* this impementation is very slow (for the arrays and queue and closure) but it just demonstrates how */ { // Example with 1 concurrency, uncomment to see // const c = chan(1); // only one at once // let i = 0; // (async () => { // setInterval(() => c.write('hello world!', i++), 200); // for await(const data of c) { // console.log('got data', data); // } // })(); } { // example with two readers const c = chan(2); let i = 0; setInterval(() => c.write('hello world!', i++), 200); (async () => { for await(const data of c) { console.log('got data first channel', data); } })(); (async () => { for await(const data of c) { console.log('got data second channel', data); } })(); }