I recently began experimenting with async iterators, using the `--harmony` flag in Node 9.5. After reading through the MDN docs, I figured that it might be interesting to make a function that takes the values yielded by an async generator function and serves them as an SSE event stream. The following is a simple, contrived example:

```javascript
const http = require("http");

// Resolves after the given number of milliseconds
const timer = time =>
  new Promise(resolve => setTimeout(resolve, time));

async function* counter() {
  let count = 0;
  while (true) {
    await timer(5000);
    yield count++;
  }
}

async function iterStream(res, iter) {
  res.writeHead(200, {"Content-Type": "text/event-stream"});
  for await (let item of iter)
    res.write(`event: counter\ndata: ${item}\n\n`);
}

http.createServer((req, res) =>
  iterStream(res, counter())).listen(8000);
```

The iterator yields a new value from the counter every five seconds. The `iterStream` function takes each new value and writes it to the connected client. With an `EventSource` on the client side, I receive each incrementing value as it arrives:

```javascript
let events = new EventSource("/");
events.addEventListener("counter", ({data}) => console.log(data));
```

Next, a slightly less contrived example. Instead of using a timer to introduce an artificial delay between items, I'm going to use a RethinkDB query with a changefeed, which broadcasts live updates from the database:

```javascript
const http = require("http");
const r = require("rethinkdbdash")();

async function* query() {
  let cursor = await r.db("rethinkdb").table("stats").changes();
  while (true)
    yield cursor.next();
}

async function iterStream(res, iter) {
  res.writeHead(200, {"Content-Type": "text/event-stream"});
  for await (let item of iter)
    res.write(`event: change\ndata: ${JSON.stringify(item)}\n\n`);
}

http.createServer((req, res) =>
  iterStream(res, query())).listen(8000);
```

The `query` function waits for and yields new values from the database in a loop. For reference, the client library's `cursor` does not conform to the iteration protocol; it just (coincidentally) happens to use `next` as the name of the method that retrieves a new item. The `cursor.next` method returns a promise that resolves when a new item is available from the database.
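
As an aside, a cursor like this could also be adapted to the async iteration protocol directly, without a generator. Here's a rough sketch of what such an adapter might look like; `cursorToAsyncIterable` is a hypothetical helper, not part of the client library:

```javascript
// Hypothetical adapter: wraps a cursor whose next() returns a promise
// in an object that conforms to the async iteration protocol.
function cursorToAsyncIterable(cursor) {
  return {
    [Symbol.asyncIterator]() {
      return {
        // The protocol expects next() to resolve to a {value, done} pair
        next: async () => ({value: await cursor.next(), done: false}),
        // Called on early termination, e.g. a break in a for-await loop
        return: async () => {
          await cursor.close();
          return {value: undefined, done: true};
        }
      };
    }
  };
}
```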

There's a minor wrinkle, however: the database connection continues to stream cursor results even after the user disconnects. I figured that I could address the problem by invoking `iter.return` in my `iterStream` function when the connection closes. When the iterator terminates, I can use a `finally` block in my `query` function to close the database cursor:

```javascript
async function* query() {
  let cursor = await r.db("test").table("test").changes();

  try {
    while (true)
      yield cursor.next();
  }
  finally {
    console.log("Closing the cursor");
    cursor.close();
  }
}

async function iterStream(res, iter) {
  res.writeHead(200, {"Content-Type": "text/event-stream"});
  res.connection.on("close", () => iter.return());
  for await (let item of iter)
    res.write(`event: change\ndata: ${JSON.stringify(item)}\n\n`);
}
```

This approach seems to work, but not exactly as I expected. When the user closes their connection and `iter.return` forces the generator to finish, the pending `cursor.next` call sits there and waits until one more item comes in before the `finally` block executes; it doesn't interrupt the pending promise.

I tried using `iter.throw` instead of `iter.return` to see if it would give me the desired behavior, but it does basically the same thing: with `iter.throw`, a `catch` block in the `query` function doesn't execute until the next yield.
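
For illustration, that variant of `query` would look roughly like this; the `catch` block below still doesn't run until the pending `cursor.next` promise settles and the generator resumes at the `yield`:

```javascript
// Sketch of the iter.throw variant, with iterStream calling
// res.connection.on("close", () => iter.throw(new Error("disconnected")))
async function* query() {
  let cursor = await r.db("test").table("test").changes();

  try {
    while (true)
      yield cursor.next();
  }
  catch (err) {
    console.log("Closing the cursor:", err.message);
    cursor.close();
  }
}
```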

This same behavior is reproducible with the timed counter example:

```javascript
async function* test() {
  let n = 0;

  try {
    while (true) {
      console.log("Awaiting the timer");
      await timer(5000);
      console.log("Yielding a new item");
      yield n++;
    }
  }
  finally {
    console.log("Iterator finished");
  }
}

async function iterStream(res, iter) {
  res.writeHead(200, {"Content-Type": "text/event-stream"});
  res.connection.on("close", () => {
    console.log("Connection closed");
    iter.return();
  });

  for await (let item of iter) {
    console.log("Sending item:", item);
    res.write(`event: item\ndata: ${item}\n\n`);
  }
}
```

When I run the example above and disconnect after receiving item `2`, the resulting output looks like this:

```
Awaiting the timer
Yielding a new item
Sending item: 0
Awaiting the timer
Yielding a new item
Sending item: 1
Awaiting the timer
Yielding a new item
Sending item: 2
Awaiting the timer
Connection closed
Yielding a new item
Iterator finished
Sending item: 3
```

After the connection closes, the `finally` block doesn't execute until the pending timer completes.

Am I missing something? Is there another approach that I could use to address connection termination in this example while still using async iterators?
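
One possible direction is to race each awaited promise against a promise that rejects when the client disconnects, so the pending `await` settles immediately instead of waiting for the next item. Below is a minimal, untested sketch of that idea applied to the RethinkDB example; the `abortable` helper and the disconnect promise are hypothetical additions, not an established API:

```javascript
// Untested sketch: interrupt the pending await by racing each promise
// against a promise that rejects when the client disconnects.
const abortable = signal => promise => Promise.race([promise, signal]);

async function* query(race) {
  let cursor = await r.db("test").table("test").changes();
  try {
    while (true)
      yield race(cursor.next());
  }
  finally {
    console.log("Closing the cursor");
    cursor.close();
  }
}

async function iterStream(res, iter) {
  res.writeHead(200, {"Content-Type": "text/event-stream"});
  for await (let item of iter)
    res.write(`event: change\ndata: ${JSON.stringify(item)}\n\n`);
}

http.createServer((req, res) => {
  // Rejects when this client disconnects
  let disconnected = new Promise((resolve, reject) =>
    res.connection.on("close", () => reject(new Error("disconnected"))));
  disconnected.catch(() => {}); // avoid an unhandled rejection warning
  iterStream(res, query(abortable(disconnected)))
    .catch(err => console.log("Stream ended:", err.message));
}).listen(8000);
```

With this arrangement, the rejection propagates out of the `yield`, the `finally` block closes the cursor right away, and the `for await` loop in `iterStream` terminates with the error.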