Skip to content

Instantly share code, notes, and snippets.

@dobesv
Created February 20, 2019 19:43
Show Gist options
  • Save dobesv/e637893adb0588a768db70e2c2e7ba29 to your computer and use it in GitHub Desktop.
Save dobesv/e637893adb0588a768db70e2c2e7ba29 to your computer and use it in GitHub Desktop.

Revisions

  1. dobesv created this gist Feb 20, 2019.
    115 changes: 115 additions & 0 deletions csvAsyncIterator.ts
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,115 @@
    import Papa, { ParseConfig, Parser } from 'papaparse';

    /**
     * Papa Parse configuration accepted by `csvAsyncIterator`.
     *
     * The `step`, `chunk`, `complete` and `error` callbacks are removed
     * from the accepted options because the iterator installs its own
     * handlers to drive iteration.
     *
     * Note: `Exclude<ParseConfig, 'step' | ...>` does not remove properties
     * (`Exclude` filters members of a union type, and `ParseConfig` is an
     * object type), so the keys are filtered with `Exclude<keyof ...>` and
     * the remaining properties are picked.
     */
    export type CsvAsyncIteratorOptions = Pick<
      ParseConfig,
      Exclude<keyof ParseConfig, 'step' | 'chunk' | 'complete' | 'error'>
    >;

    /**
     * Helper to allow async iteration over the contents of
     * a CSV input.
     *
     * This allows the caller to walk through the results
     * using a `for await` loop and deal with rows asynchronously,
     * one at a time.
     *
     * When processing a large input this reduces the memory
     * use of the process because we don't have to store
     * the parsed representation of all the rows in memory
     * at once; just the last returned row is represented
     * as JavaScript objects.
     *
     * Also, if the input is coming from a file or URL this
     * will avoid loading the entire file into memory.
     *
     * The combination of these two factors allows processing
     * of extremely large files without using tons of memory.
     *
     * @param input CSV source. A `Buffer` is decoded as UTF-8 and has all
     *   carriage returns removed before parsing; strings and streams are
     *   handed to Papa Parse unchanged.
     * @param config Papa Parse options; the `error`, `step` and `complete`
     *   callbacks are always overridden by this helper.
     * @returns An async iterable that resolves one parsed row per `next()`
     *   call and rejects with the first row error or I/O error reported.
     */
    const csvAsyncIterator = <RowType = any>(
      input: Buffer | string | NodeJS.ReadableStream,
      config?: CsvAsyncIteratorOptions,
    ): AsyncIterable<RowType[]> => {
      return {
        [Symbol.asyncIterator]: () => {
          // Parser handle captured from the first `step` callback; stays
          // null until parsing has started and produced a row.
          let parser: Parser | null = null;
          let done = false;
          let error: any = null;
          // Settlers of the promise returned by the most recent `next()`.
          let resolve: (r: IteratorResult<RowType[]>) => void = () => undefined;
          let reject: (err: any) => void = () => undefined;

          return {
            return: () => {
              done = true;
              if (parser) parser.abort();
              return Promise.resolve({
                done,
                value: (undefined as any) as RowType[],
              });
            },
            throw: err => {
              done = true;
              error = err;
              if (parser) parser.abort();
              return Promise.reject(err);
            },
            next: () => {
              // A recorded failure takes precedence over normal completion.
              if (error) return Promise.reject(error);
              if (done) {
                return Promise.resolve({
                  done: true,
                  value: (undefined as any) as RowType[],
                });
              }
              return new Promise<IteratorResult<RowType[]>>((res, rej) => {
                try {
                  resolve = res;
                  reject = rej;
                  if (parser !== null) {
                    // Parsing already started and was paused after the
                    // previous row; continue to the next one.
                    parser.resume();
                    return;
                  }
                  Papa.parse(
                    (Buffer.isBuffer(input)
                      ? // Strip every carriage return; a string pattern
                        // would only remove the first occurrence.
                        input.toString('utf8').replace(/\r/g, '')
                      : input) as any,
                    {
                      ...config,
                      error: ioError => {
                        done = true;
                        error = ioError;
                        return reject(ioError);
                      },
                      step: (results, p) => {
                        parser = p;
                        // Hold the parser until the consumer asks for the
                        // next row.
                        parser.pause();
                        if (results.errors && results.errors.length) {
                          done = true;
                          return reject(results.errors[0]);
                        }
                        return resolve({
                          value: (results.data as any) as RowType[],
                          done: false,
                        });
                      },
                      complete: () => {
                        done = true;
                        if (error) return reject(error);
                        return resolve({
                          done: true,
                          value: (undefined as any) as RowType[],
                        });
                      },
                    },
                  );
                } catch (err) {
                  error = err;
                  return reject(err);
                }
              });
            },
          };
        },
      };
    };

    export default csvAsyncIterator;
    38 changes: 38 additions & 0 deletions mapAsyncIterator.ts
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,38 @@
    /**
     * Wraps an async iterator so that every yielded value is passed through
     * `transform` (which may be synchronous or return a promise).
     *
     * The returned object is both an `AsyncIterator` and an `AsyncIterable`
     * (its `[Symbol.asyncIterator]()` returns itself), so it can be used
     * directly in a `for await` loop.
     *
     * If `transform` throws or rejects, the source iterator's `return()` is
     * invoked (when it has one) before the error is propagated, so the
     * source gets a chance to release what it holds. A `for await` loop does
     * not call `return()` after a rejected `next()`, so without this the
     * source would never be closed.
     *
     * @param sourceIterator Iterator supplying the values to transform.
     * @param transform Mapping applied to each value that is not the final
     *   (`done`) result.
     * @returns An async iterator/iterable over the transformed values.
     */
    const mapAsyncIterator = <T, U>(
      sourceIterator: AsyncIterator<T>,
      transform: (payload: T) => U | Promise<U>,
    ): AsyncIterator<U> & AsyncIterable<U> => {
      // Best-effort close of the source; a failure while closing is ignored
      // so the transform error that triggered the close is what surfaces.
      const closeSource = async (): Promise<void> => {
        if (!sourceIterator.return) return;
        try {
          await sourceIterator.return();
        } catch {
          // Intentionally ignored.
        }
      };

      // Maps a source result to an output result: `done` results carry no
      // value, every other result has `transform` applied to its value.
      const applyTransformToResult = async (
        nextResult: IteratorResult<T>,
      ): Promise<IteratorResult<U>> => {
        if (nextResult.done) {
          return { done: nextResult.done, value: undefined as any };
        }
        try {
          return {
            done: nextResult.done,
            value: await transform(nextResult.value),
          };
        } catch (err) {
          await closeSource();
          throw err;
        }
      };

      return {
        next(): Promise<IteratorResult<U>> {
          return sourceIterator.next().then(applyTransformToResult);
        },

        return(arg?: T): Promise<IteratorResult<U>> {
          if (sourceIterator.return)
            return sourceIterator.return(arg).then(applyTransformToResult);
          // The source cannot be closed explicitly; report completion.
          return Promise.resolve({
            done: true,
            value: arg as any,
          });
        },

        throw(e?: any): Promise<IteratorResult<U>> {
          if (sourceIterator.throw)
            return sourceIterator.throw(e).then(applyTransformToResult);
          return Promise.reject(e);
        },

        [Symbol.asyncIterator]() {
          return this;
        },
      };
    };

    export default mapAsyncIterator;