Skip to content

Instantly share code, notes, and snippets.

@dodolboks
Forked from ingoogni/sse.nim
Created October 16, 2023 09:56
Show Gist options
  • Select an option

  • Save dodolboks/32f07ab5554eb6b5a8ea19fa755b0280 to your computer and use it in GitHub Desktop.

Select an option

Save dodolboks/32f07ab5554eb6b5a8ea19fa755b0280 to your computer and use it in GitHub Desktop.

Revisions

  1. @ingoogni ingoogni revised this gist May 13, 2022. 2 changed files with 4 additions and 69 deletions.
    8 changes: 4 additions & 4 deletions sse.nim
    Original file line number Diff line number Diff line change
    @@ -1,6 +1,6 @@
    import asyncdispatch, asynchttpserver, asyncnet, nativesockets, net
    import logging, oids, strformat, strutils, tables, times
    import sse_db
    #import sse_db


    type SSEClient = object
    @@ -109,7 +109,7 @@ proc sseFanOut() {.async, gcsafe.}= #unsafe!
    if not startsWith(payload, ":"):
    let id = $genOid()
    data = "{payload}\nid: {id}\n\n".fmt
    put(id, data)
    #put(id, data)
    else:
    data = payload
    if len(sseClients) > 0:
    @@ -152,8 +152,8 @@ proc sse(req: Request) {.async, gcsafe.} =
    #SSE opens a single unidirectional channel, so reads fail.
    #If reads don't fail, the client is/has disconnected.
    addRead(req.client.getFd.AsyncFD, pruneClients)
    if id != "":
    await req.client.send(getdownfrom(id))
    #if id != "":
    # await req.client.send(getdownfrom(id))
    while not req.client.isClosed:
    (hasContent, data) = await sseClients[fd].fs.read
    if hasContent:
    65 changes: 0 additions & 65 deletions sse_db.nim
    Original file line number Diff line number Diff line change
    @@ -1,65 +0,0 @@
    import db_sqlite, os
    import sqlite3

    var
    init: bool
    k, v: string
    preps: seq[SqlPrepared]

    let dBFile = "event.db3"

    if not fileExists(dBFile): init = true

    let dbEvent = open(dBFile, "", "", "")
    dbEvent.exec(sql"PRAGMA journal_mode=wal;")
    dbEvent.exec(sql"PRAGMA wal_autocheckpoint;")
    dbEvent.exec(sql"PRAGMA synchronous = normal;")
    dbEvent.exec(sql"PRAGMA mmap_size = 30000000000;")
    #dbEvent.exec("PRAGMA query_only = boolean;")
    #dbEvent.exec("PRAGMA foreign_keys = on;")
    #dbEvent.exec(sql"PRAGMA temp_store = memory;")

    if init:
    dbEvent.exec(sql"""
    CREATE TABLE IF NOT EXISTS kv(
    k TEXT PRIMARY KEY,
    v TEXT NOT NULL
    )
    WITHOUT ROWID;
    """
    )

    #should use prepare_v3 & flag, but not available
    var psPut = dbEvent.prepare("INSERT INTO kv(k, v) VALUES (?, ?);")
    psPut.bindParams(k, v)
    preps.add(psPut)
    proc put*(key:string, value:string)=
    psPut.exec(key, value)

    var psDelUpFrom = dbEvent.prepare("DELETE FROM kv WHERE k < ?;")
    psDelUpFrom.bindParam(1, k)
    preps.add(psDelUpFrom)
    proc delUpFrom*(key:string)=
    psDelUpFrom.exec(key)

    var psGetDownFrom* = dbEvent.prepare("SELECT v FROM kv WHERE k > ? ORDER BY k;")
    preps.add(psGetDownFrom)
    iterator getdownfrom*(k: string):seq =
    psGetDownFrom.bindParam(1, k)
    for row in dbEvent.fastRows(psGetDownFrom):
    yield row
    if reset(psGetDownFrom.PStmt) != SQLITE_OK: dbError(dbEvent)
    if clear_bindings(psGetDownFrom.PStmt) != SQLITE_OK: dbError(dbEvent)

    proc getdownfrom*(k: string):string = #seems spiffier on send
    psGetDownFrom.bindParam(1, k)
    for row in dbEvent.fastRows(psGetDownFrom):
    result &= row[0]
    if reset(psGetDownFrom.PStmt) != SQLITE_OK: dbError(dbEvent)
    if clear_bindings(psGetDownFrom.PStmt) != SQLITE_OK: dbError(dbEvent)


    proc closeDB*()=
    for prep in preps:
    finalize(prep)
    db_sqlite.close(dbEvent)
  2. @ingoogni ingoogni created this gist May 12, 2022.
    195 changes: 195 additions & 0 deletions sse.nim
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,195 @@
    import asyncdispatch, asynchttpserver, asyncnet, nativesockets, net
    import logging, oids, strformat, strutils, tables, times
    import sse_db


    type SSEClient = object
    fs: FutureStream[string]
    clientClosed: int
    req: Request

    var
    sseClients {.global.}: Table[int, SSEClient]
    fsInput {.global.} = newFutureStream[string](fromProc = "main")
    consoleLog = newConsoleLogger()

    addHandler(consoleLog)

    proc logConnect(req: Request, resp:int, lenResp:int) {.async.} =
    var msg = $int(req.client.getFd) & " "
    let (peerIp, port) = req.client.getPeerAddr()
    msg &= peerIp & ":" & $port & " - "
    let username = if req.url.username == "": "-" else:req.url.username
    msg &= username
    msg &= " [" & now().utc.format("d/MMM/yyyy:HH:mm:ss") & "] "
    msg &= "\"" & $req.reqMethod & " "
    msg &= req.url.path & " "
    msg &= req.protocol.orig & "\" "
    msg &= $resp & " "
    msg &= $lenResp
    info msg


    proc msgSSEDisconnect(req: Request):string =
    var msg = $int(req.client.getFd) & " "
    let (peerIp, port) = req.client.getPeerAddr()
    msg &= peerIp & ":" & $port & " - "
    let username = if req.url.username == "": "-" else:req.url.username
    msg &= username
    msg &= " [" & now().utc.format("d/MMM/yyyy:HH:mm:ss") & "] "
    msg &= "\"SSE CLOSE "
    msg &= req.url.path & " "
    msg &= req.protocol.orig & "\" "
    result = msg


    proc sseHeaders(): HttpHeaders {.inline.} =
    let now = now().utc.format("ddd, d MMM yyyy HH:mm:ss")
    result = {
    "Date": &"{now} GMT",
    "Access-Control-Allow-Origin": "*",
    "Connection": "keep-alive",
    "Content-type": "text/event-stream; charset=utf-8",
    "Cache-Control": "no-cache",
    "X-Accel-Buffering": "no",
    "Content-Length": ""
    }.newHttpHeaders()


    proc error404(req: Request) {.async, gcsafe.}=
    ## eek
    let path = req.url.path
    let msg = "path {path} not found".fmt
    await logConnect(req, 404, len(msg))
    await req.respond(Http404, msg)


    proc initSSEClient(req:Request){.async.}=
    let fd = int(req.client.getFd)
    sseClients[fd] = SSEClient(
    fs : newFutureStream[string](fromProc = "main"),
    clientClosed : 0,
    req: req
    )
    await sseClients[fd].fs.write("retry: 5000\n\n")
    await sseClients[fd].fs.write("event: connect\ndata: connected {fd}\n\n".fmt)


    proc sseTime(interval: float = 10){.async.} =
    ## a date-time dataprovider, for test purposes
    ## fs: FutureStream to write the date-time to.
    ## interval: in seconds, default = 10s.
    while true:
    let now = now().utc.format("ddd, d MMM yyyy HH:mm:ss")
    await fsInput.write("event: time\ndata: {now}".fmt)
    await sleepAsync(interval*1000)



    proc sseHeartbeat(interval: float = 15){.async.} =
    ## A server sent event heartbeat is sent so that the connection does not stay
    ## idle for too long. The heartbeat is just a commentline without content.
    ## fs: FutureStream to write the heartbeat to.
    ## interval: in seconds, default = 15s.
    while true:
    await sleepAsync(interval*1000)
    await fsInput.write(":\n\n")


    proc sseFanOut() {.async, gcsafe.}= #unsafe!
    ## Distributes incoming data from the stream over the clients. It is the
    ## callback for fsInput asyncstream.
    ## As every message passes here, this is where an message-id is added.
    ## fs: FutureStream to write the heartbeat to.
    var
    data: string
    while true:
    let (hasContent, payload) = await fsInput.read
    if hasContent:
    if not startsWith(payload, ":"):
    let id = $genOid()
    data = "{payload}\nid: {id}\n\n".fmt
    put(id, data)
    else:
    data = payload
    if len(sseClients) > 0:
    for k in sseClients.keys:
    await sseClients[k].fs.write(data)
    else:
    poll()


    proc input(req: Request) {.async, gcsafe.} =
    # HTTP POST here in SSE format(without id). Data will be distributed to listening clients
    # One message per POST.
    # Post as the event ID is added later on.
    await fsInput.write(req.body)
    await req.respond(Http200, "")


    proc pruneClients(fd: AsyncFD):bool {.gcsafe.}=
    let k = int(fd)
    if sseClients.hasKey(k):
    let msg = msgSSEDisconnect(sseClients[k].req)
    sseClients.del(k)
    fd.closeSocket()
    info(msg)
    return true


    proc sse(req: Request) {.async, gcsafe.} =
    ## sse emitter.
    let
    fd = int(req.client.getFd)
    id = req.headers.getOrDefault("last-event-id", default = @[""].HttpHeaderValues)
    var
    fds: seq[SocketHandle]
    hasContent: bool
    data : string
    await req.respond(Http200, "", sseHeaders())
    await initSSEClient(req)
    await logConnect(req, 200, 0)
    #SSE opens a single unidirectional channel, so reads fail.
    #If reads don't fail, the client is/has disconnected.
    addRead(req.client.getFd.AsyncFD, pruneClients)
    if id != "":
    await req.client.send(getdownfrom(id))
    while not req.client.isClosed:
    (hasContent, data) = await sseClients[fd].fs.read
    if hasContent:
    fds = @[req.client.getFd]
    await req.client.send("{data}".fmt)
    #echo fd


    proc dispatch(req: Request){.async, gcsafe.} =
    let path = req.url.path
    case path:
    #of "/index/":
    # await index(req)
    of "/sse/":
    await sse(req)
    of "/input/":
    await input(req)
    else:
    await error404(req)


    proc main {.async.} =
    #two "internal" dataproviders
    asyncCheck sseHeartbeat(5)
    asyncCheck sseTime(1)

    asyncCheck sseFanOut()

    var server = newAsyncHttpServer(reuseAddr = false, reusePort = false)
    server.listen(Port(8088), "192.168.1.4")
    echo "asynchttpserver listening at 192.168.1.4:8088"
    while true:
    if server.shouldAcceptRequest():
    await server.acceptRequest(dispatch)
    else:
    poll()

    asyncCheck main()
    runForever()
    65 changes: 65 additions & 0 deletions sse_db.nim
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,65 @@
    import db_sqlite, os
    import sqlite3

    var
    init: bool
    k, v: string
    preps: seq[SqlPrepared]

    let dBFile = "event.db3"

    if not fileExists(dBFile): init = true

    let dbEvent = open(dBFile, "", "", "")
    dbEvent.exec(sql"PRAGMA journal_mode=wal;")
    dbEvent.exec(sql"PRAGMA wal_autocheckpoint;")
    dbEvent.exec(sql"PRAGMA synchronous = normal;")
    dbEvent.exec(sql"PRAGMA mmap_size = 30000000000;")
    #dbEvent.exec("PRAGMA query_only = boolean;")
    #dbEvent.exec("PRAGMA foreign_keys = on;")
    #dbEvent.exec(sql"PRAGMA temp_store = memory;")

    if init:
    dbEvent.exec(sql"""
    CREATE TABLE IF NOT EXISTS kv(
    k TEXT PRIMARY KEY,
    v TEXT NOT NULL
    )
    WITHOUT ROWID;
    """
    )

    #should use prepare_v3 & flag, but not available
    var psPut = dbEvent.prepare("INSERT INTO kv(k, v) VALUES (?, ?);")
    psPut.bindParams(k, v)
    preps.add(psPut)
    proc put*(key:string, value:string)=
    psPut.exec(key, value)

    var psDelUpFrom = dbEvent.prepare("DELETE FROM kv WHERE k < ?;")
    psDelUpFrom.bindParam(1, k)
    preps.add(psDelUpFrom)
    proc delUpFrom*(key:string)=
    psDelUpFrom.exec(key)

    var psGetDownFrom* = dbEvent.prepare("SELECT v FROM kv WHERE k > ? ORDER BY k;")
    preps.add(psGetDownFrom)
    iterator getdownfrom*(k: string):seq =
    psGetDownFrom.bindParam(1, k)
    for row in dbEvent.fastRows(psGetDownFrom):
    yield row
    if reset(psGetDownFrom.PStmt) != SQLITE_OK: dbError(dbEvent)
    if clear_bindings(psGetDownFrom.PStmt) != SQLITE_OK: dbError(dbEvent)

    proc getdownfrom*(k: string):string = #seems spiffier on send
    psGetDownFrom.bindParam(1, k)
    for row in dbEvent.fastRows(psGetDownFrom):
    result &= row[0]
    if reset(psGetDownFrom.PStmt) != SQLITE_OK: dbError(dbEvent)
    if clear_bindings(psGetDownFrom.PStmt) != SQLITE_OK: dbError(dbEvent)


    proc closeDB*()=
    for prep in preps:
    finalize(prep)
    db_sqlite.close(dbEvent)