const { Readable, Transform, pipeline } = require('stream') const createCounterReader = () => { let count = 0; return new Readable({ objectMode: true, read() { count += 1; console.log('read', count) this.push({count}); }, }); }; const sleep = (delay) => { return new Promise(resolve => { setTimeout(() => { resolve() }, delay) }) } async function main () { const readable = createCounterReader(); let counter = 0; const transform = new Transform({ objectMode: true, highWaterMark: 10, transform: async (chunk, encoding, callback) => { const { count } = chunk console.log('transform:', count); await sleep(1000) console.log('transformEnd:', count); ++counter callback(null, `${counter}. ${JSON.stringify(chunk)} \n`) } }) // pipeline stream executes sequential (downstream ↓) readable.pipe(transform) } main().catch((error) => console.error(error.toString()))