Created
February 25, 2018 00:43
-
-
Save segphault/c0af3428dbafa6619512ae2afb8e0e7a to your computer and use it in GitHub Desktop.
Revisions
-
segphault created this gist
Feb 25, 2018 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,140 @@ I recently began experimenting with async iterators, using the `--harmony` flag in Node 9.5. After reading through the MDN docs, I figured that it might be interesting to make a function that takes the values yielded by an async generator function and serves them as an SSE event stream. The following is a simple, contrived example: ```javascript const http = require("http"); const timer = time => new Promise(resolve => setTimeout(resolve, time)); async function* counter() { let counter = 0; while (true) { await timer(5000); yield counter++; } } async function iterStream(res, iter) { res.writeHead(200, {"Content-Type": "text/event-stream"}); for await (let item of iter) res.write(`event: counter\ndata: ${item}\n\n`); } http.createServer((req, res) => iterStream(res, counter())).listen(8000); ``` The iterator yields a new value from the counter every five seconds. The `iterStream` function takes each new value and broadcasts it to the connected user. With an `EventSource` on the client side, I received the JSON object with the incrementing value: ```javascript let events = new EventSource("/"); events.addEventListener("counter", ({data}) => console.log(data)); ``` Next, a slightly less contrived example. Instead of using a timer to introduce an artificial delay between items, I'm going to use a RethinkDB query with a changefeed, which broadcasts live updates from the database: ```javascript const http = require("http"); const r = require("rethinkdbdash")(); async function* query() { let cursor = await r.db("rethinkdb").table("stats").changes(); while (true) yield cursor.next(); } async function iterStream(res, iter) { res.writeHead(200, {"Content-Type": "text/event-stream"}); for await (let item of iter) res.write(`event: change\ndata: ${JSON.stringify(item)}\n\n`); } http.createServer((req, res) => iterStream(res, query())).listen(8000); ``` The `query` function waits for and yields new values from the database in a loop. For reference, the client library's `cursor` does not conform with the iteration protocol, it just (coincidentally) happens to use `next` as the name of the method that retrieves a new item. The `cursor.next` method returns a promise that resolves when a new item is available from the database. There's a minor wrinkle, however: the database connection continues to stream cursor results even after the user disconnects. I figured that I could address the problem by invoking `iter.return` in my `iterStream` function when the connection closes. When the iterator terminates, I can use a `finally` block in my `query` function to close the database cursor: ```javascript async function* query() { let cursor = await r.db("test").table("test").changes(); try { while (true) yield cursor.next(); } finally { console.log("Closing the cursor"); cursor.close(); } } async function iterStream(res, iter) { res.writeHead(200, {"Content-Type": "text/event-stream"}); res.connection.on("close", () => iter.return()); for await (let item of iter) res.write(`event: change\ndata: ${JSON.stringify(item)}\n\n`); } ``` This approach seems to work, but not exactly as I expected. When the user closes their connection and `iter.return` forces the generator to finish, the pending `cursor.next` call sits there and waits until one more item comes in before the `finally` block executes—it doesn't interrupt the pending promise. I tried using `iter.throw` instead of `iter.return` to see if it would give me the desired behavior, but it does basically the same thing: with `iter.throw`, a `catch` block in the `query` function doesn't execute until the next yield. This same behavior is reproducible with the timed counter example: ```javascript async function* test() { let n = 0; try { while (true) { console.log("Awaiting the timer") await timer(5000); console.log("Yielding a new item") yield n++; } } finally { console.log("Iterator finished") } } async function iterStream(res, iter) { res.writeHead(200, {"Content-Type": "text/event-stream"}); res.connection.on("close", () => { console.log("Connection closed"); iter.return(); }); for await (let item of iter) { console.log("Sending item:", item); res.write(`event: item\ndata: ${item}\n\n`); } } ``` When I run the example above and disconnect after receiving the second item, the resulting output looks like this: ``` waiting the timer Yielding a new item Sending item: 0 Awaiting the timer Yielding a new item Sending item: 1 Awaiting the timer Yielding a new item Sending item: 2 Awaiting the timer Connection closed Yielding a new item Iterator finished Sending item: 3 ``` After the connection closes, the `finally` block doesn't execute until the pending timer completes. Am I missing something? Is there another approach that I could use to address connection termination in this example while still using async iterators?