Skip to content

Instantly share code, notes, and snippets.

@bsingr
Created November 20, 2019 11:37
Show Gist options
  • Save bsingr/32c7792df163206416bccbd49fb5254e to your computer and use it in GitHub Desktop.
Save bsingr/32c7792df163206416bccbd49fb5254e to your computer and use it in GitHub Desktop.

Revisions

  1. bsingr created this gist Nov 20, 2019.
    45 changes: 45 additions & 0 deletions nodejs-transform-stream.js
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,45 @@
    const { Readable, Transform, pipeline } = require('stream')

    const createCounterReader = () => {
    let count = 0;
    return new Readable({
    objectMode: true,
    read() {
    count += 1;
    console.log('read', count)
    this.push({count});
    },
    });
    };

    const sleep = (delay) => {
    return new Promise(resolve => {
    setTimeout(() => {
    resolve()
    }, delay)
    })
    }


    async function main () {
    const readable = createCounterReader();
    let counter = 0;

    const transform = new Transform({
    objectMode: true,
    highWaterMark: 10,
    transform: async (chunk, encoding, callback) => {
    const { count } = chunk
    console.log('transform:', count);
    await sleep(1000)
    console.log('transformEnd:', count);
    ++counter
    callback(null, `${counter}. ${JSON.stringify(chunk)} \n`)
    }
    })

    // pipeline stream executes sequential (downstream ↓)
    readable.pipe(transform)
    }

    main().catch((error) => console.error(error.toString()))