import Papa, { ParseConfig, Parser } from 'papaparse'; export type CsvAsyncIteratorOptions = Exclude< ParseConfig, 'step' | 'chunk' | 'complete' | 'error' >; /** * Helper to allow async iteration over the contents of * a CSV input. * * This allows the caller to walk through the results * using a `for await` loop and deal with rows asyncronously * one at a time. * * When process a large input this reduces the memory * use of the process because we don't have to store * the parsed representation of all the rows in memory * at once; just the last returned row is represented * as JavaScript objects. * * Also, if the input is coming from a file or URL this * will avoid loading the entire file into memory. * * The combination of these two factors allows processing * of extremely large files without using tons of memory. */ const csvAsyncIterator = ( input: Buffer | string | NodeJS.ReadableStream, config?: CsvAsyncIteratorOptions, ): AsyncIterable => { return { [Symbol.asyncIterator]: () => { let parser: Parser | null = null; let done: boolean = false; let error: any = null; let resolve: (r: IteratorResult) => void = () => undefined; let reject: (err: any) => void = () => undefined; return { return: () => { done = true; if (parser) parser.abort(); return Promise.resolve({ done, value: (undefined as any) as RowType[], }); }, throw: err => { done = true; error = err; if (parser) parser.abort(); return Promise.reject(err); }, next: () => error ? Promise.reject(error) : done ? Promise.resolve({ done: true, value: (undefined as any) as RowType[], }) : new Promise>((res, rej) => { try { resolve = res; reject = rej; if (parser === null) { Papa.parse( (Buffer.isBuffer(input) ? input.toString('utf8').replace('\r', '') : input) as any, { ...config, error: (ioError, f) => { done = true; error = ioError; return reject(ioError); }, step: (results, p) => { parser = p; parser.pause(); if (results.errors && results.errors.length) { done = true; return reject(results.errors[0]); } else { return resolve({ value: (results.data as any) as RowType[], done: false, }); } }, complete: () => { done = true; if (error) return reject(error); else return resolve({ done: true, value: (undefined as any) as RowType[], }); }, }, ); } else { parser.resume(); } } catch (err) { error = err; return reject(err); } }), }; }, }; }; export default csvAsyncIterator;