Skip to content

Instantly share code, notes, and snippets.

@dmitryshelomanov
Created March 28, 2020 15:04
Show Gist options
  • Select an option

  • Save dmitryshelomanov/e0425765c9c265f27b1f3f209a4deada to your computer and use it in GitHub Desktop.

Select an option

Save dmitryshelomanov/e0425765c9c265f27b1f3f209a4deada to your computer and use it in GitHub Desktop.

Revisions

  1. dmitryshelomanov created this gist Mar 28, 2020.
    79 changes: 79 additions & 0 deletions rx-effect.js
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,79 @@
    // @flow

    import { of, Subject, defer, Observable } from "rxjs"
    import { map, catchError, flatMap, switchMap } from "rxjs/operators"
    import {
    type Event,
    type Store,
    createEvent,
    createStore,
    merge,
    forward,
    } from "effector"
    import { mapTo } from "./operators"

    export function createRxEffect<Params, Done, E = Error>(
    api: (Params) => Promise<Done>,
    mapper?: (Observable<Done>) => Observable<Done> = (v) => v,
    takeEvery?: boolean = true,
    ) {
    const run: Event<Params> = createEvent()
    const doneData: Event<Done> = createEvent()
    const failData: Event<E> = createEvent()
    const finallyEvent: Event<
    { status: "done", result: Done } | { status: "fail", error: E },
    > = createEvent()

    const $pending: Store<boolean> = createStore(false)
    const $error: Store<?E> = createStore(null)

    const observer = new Subject<Params>()

    const wrapper = takeEvery ? flatMap : switchMap

    const source$ = observer.pipe(
    wrapper((params) =>
    mapper(defer(() => api(params))).pipe(
    map((rs) => ({ isSuccess: true, data: rs })),
    catchError((error) => of({ isSuccess: false, error })),
    ),
    ),
    )

    $pending
    .on(run, () => true)
    .on(merge([mapTo(doneData), mapTo(failData)]), () => false)

    $error.reset(run, doneData).on(failData, (_, error) => error)

    run.watch((params) => observer.next(params))

    source$.subscribe((rs) => {
    if (rs.isSuccess) {
    doneData(rs.data)
    }

    if (!rs.isSuccess) {
    failData(rs.error)
    }
    })

    forward({
    from: doneData,
    to: finallyEvent.prepend((result) => ({ status: "done", result })),
    })

    forward({
    from: failData,
    to: finallyEvent.prepend((error) => ({ status: "fail", error })),
    })

    return {
    doneData,
    failData,
    finally: finallyEvent,
    run,
    $pending,
    $error,
    }
    }