/**
 * Pushes the items of `iterable` into `sink`, a generator object.
 * Each item is delivered via the generator method `next()`; once the
 * iterable is exhausted, `sink.return()` signals the end of the stream.
 *
 * @param iterable - Source of the items to push.
 * @param sink - Generator object that is already paused at a `yield`
 *   (e.g. one created through `coroutine()`), so that the first `next()`
 *   call delivers a value instead of merely starting the generator.
 * @param config - Optional settings. If `config.log` is true, each item
 *   is logged as `'===>', item` after it has been pushed into `sink`.
 */
function send(
  iterable: Iterable<unknown>,
  sink: any,
  config?: { log: boolean }
): void {
  for (const x of iterable) {
    sink.next(x);
    if (config && config.log) {
      console.log('===>', x);
    }
  }
  sink.return(); // signal end of stream
}

/**
 * This generator logs everything that it receives via `next()`.
 * When it is terminated via `return()`, it logs `'DONE'`.
 *
 * @param options - Optional settings. If `options.prefix` is given, every
 *   item is logged as a single string: the prefix followed by the item
 *   converted via `String()`. Without a prefix the item is handed to
 *   `console.log` unchanged.
 */
const logItems = coroutine(function* (options?: { prefix?: string }): any {
  const prefix = options ? options.prefix : undefined;
  try {
    while (true) {
      const item = yield; // receive item via `next()`
      if (prefix === undefined) {
        console.log(item);
      } else {
        console.log(prefix + String(item));
      }
    }
  } finally {
    // Reached when the generator is terminated via `return()`.
    console.log('DONE');
  }
});

/**
 * Receives a sequence of characters (via the generator object
 * method `next()`), groups them into words and pushes them
 * into the generator `sink`.
 *
 * A word is a maximal run of characters accepted by `isWordChar()`;
 * all other characters only act as separators and are dropped.
 */
const tokenize = coroutine(function* (sink: any): any {
  try {
    while (true) { // (A)
      let ch = yield; // (B)
      if (isWordChar(ch)) {
        // A word has started
        let word = '';
        try {
          do {
            word += ch;
            ch = yield; // (C)
          } while (isWordChar(ch));
        } finally {
          // The word is finished.
          // We get here if
          // - the loop terminates normally
          // - the loop is terminated via `return()` in line C
          sink.next(word); // (D)
        }
      }
      // Ignore all other characters
    }
  } finally {
    // We only get here if the infinite loop is terminated
    // via `return()` (in line B or C).
    // Forward `return()` to `sink` so that it is also
    // aware of the end of stream.
    sink.return();
  }
});

/** Matches a string that is exactly one ASCII letter or decimal digit. */
const WORD_CHAR_PATTERN = /^[A-Za-z0-9]$/;

/**
 * Returns true if `ch` is a single ASCII letter or decimal digit.
 */
function isWordChar(ch: string): boolean {
  return WORD_CHAR_PATTERN.test(ch);
}

/**
 * Returns a function that, when called,
 * returns a generator object that is immediately
 * ready for input via `next()`.
 *
 * The wrapper forwards its arguments to `generatorFunction` and then calls
 * `next()` once, which advances the new generator to its first `yield`.
 */
function coroutine(generatorFunction: any) {
  return function (...args: any[]) {
    const generatorObject = generatorFunction(...args);
    generatorObject.next();
    return generatorObject;
  };
}

/** Matches a non-empty string that consists only of decimal digits. */
const NUMBER_PATTERN = /^[0-9]+$/;

/**
 * Receives a sequence of strings (via the generator object
 * method `next()`) and pushes only those strings to the generator
 * `sink` that are “numbers” (consist only of decimal digits).
 * Each such string is converted via `Number()` before it is pushed.
 */
const extractNumbers = coroutine(function* (sink: any): any {
  try {
    while (true) {
      const word = yield;
      if (NUMBER_PATTERN.test(word)) {
        sink.next(Number(word));
      }
    }
  } finally {
    // Only reached via `return()`, forward.
    sink.return();
  }
});

/**
 * Receives a sequence of numbers (via the generator object
 * method `next()`). For each number, it pushes the total sum
 * so far to the generator `sink`.
 */
const addNumbers = coroutine(function* (sink: any): any {
  let sum = 0;
  try {
    while (true) {
      sum += yield;
      sink.next(sum);
    }
  } finally {
    // We received an end-of-stream
    sink.return(); // signal end of stream
  }
});

const INPUT = '2 apples and 5 oranges.';

const CHAIN = tokenize(extractNumbers(addNumbers(logItems())));
// send(INPUT, CHAIN);

const CHAIN2 = tokenize(
  extractNumbers(addNumbers(logItems({ prefix: '-> ' })))
);
send(INPUT, CHAIN2, { log: true });