const Stream = require('stream');
const { EventEmitter } = require('events');

// Shared controller: its signal is handed to every stream created by Custom,
// so calling controller.abort() cancels the active iteration.
const controller = new AbortController();

/**
 * Event emitter whose 'custom-event' payloads can be consumed with `for await`
 * through an object-mode Readable stream.
 */
class Custom extends EventEmitter {
  /**
   * Creates a Readable that receives every 'custom-event' payload emitted on
   * this instance from now on.
   *
   * The listener is detached when the stream is destroyed (consumer `break`,
   * error, or abort of `controller.signal`), so repeated iterations do not
   * accumulate listeners on the emitter.
   *
   * @returns {Stream.Readable} Object-mode stream of event payloads.
   */
  [Symbol.asyncIterator]() {
    const onCustomEvent = (args) => {
      stream.push(args);
    };

    // Subscribe before constructing the stream so that `destroy` can always
    // find and remove the listener, even if destruction happens immediately.
    this.on('custom-event', onCustomEvent);

    const stream = new Stream.Readable({
      // Arrow function: `this` is the Custom instance, not the stream.
      destroy: (error, callback) => {
        this.off('custom-event', onCustomEvent);
        console.log('error called');
        // Forward the destroy reason instead of dropping it, so the consumer
        // sees the actual cause of the teardown.
        callback(error);
      },
      // Data is pushed from the event listener; there is nothing to pull.
      read() {},
      autoDestroy: true,
      objectMode: true,
      signal: controller.signal,
    });

    return stream;
  }
}

const instance = new Custom();

// Handle of the producer interval; kept so it can be stopped once the
// consumer is done, otherwise it would keep the process alive forever.
let emitTimer;

process.nextTick(() => {
  let count = 0;
  emitTimer = setInterval(() => {
    count += 1;
    instance.emit('custom-event', `current iterator ${count}`);
  }, 500);
});

// Cancel the iteration after two seconds.
setTimeout(() => {
  controller.abort();
  console.log('hello');
}, 2000);

/**
 * Consumes payloads from `instance` and logs each one, stopping early once
 * the payload 'current iterator 5' is seen.
 *
 * @returns {Promise<void>} Settles when iteration ends; rejects if the
 *   underlying stream fails or is aborted.
 */
async function run() {
  for await (const payload of instance[Symbol.asyncIterator]()) {
    console.log('payload', payload);
    // Payloads are plain strings, so compare the payload itself.
    if (payload === 'current iterator 5') {
      break;
    }
  }
}

run()
  .then(() => console.log('bitti'))
  .catch((error) => console.log('error', error))
  // Stop the producer whether iteration finished or failed.
  .finally(() => clearInterval(emitTimer));