@@ -0,0 +1,195 @@
import asyncdispatch, asynchttpserver, asyncnet, nativesockets, net
import logging, oids, strformat, strutils, tables, times
import sse_db
type SSEClient = object
fs: FutureStream [string ]
clientClosed: int
req: Request
var
sseClients {.global .}: Table [int , SSEClient ]
fsInput {.global .} = newFutureStream [string ](fromProc = " main" )
consoleLog = newConsoleLogger ()
addHandler (consoleLog)
proc logConnect (req: Request , resp:int , lenResp:int ) {.async .} =
var msg = $ int (req.client.getFd) & " "
let (peerIp, port) = req.client.getPeerAddr ()
msg &= peerIp & " :" & $ port & " - "
let username = if req.url.username == " " : " -" else :req.url.username
msg &= username
msg &= " [" & now ().utc.format (" d/MMM/yyyy:HH:mm:ss" ) & " ] "
msg &= " \" " & $ req.reqMethod & " "
msg &= req.url.path & " "
msg &= req.protocol.orig & " \" "
msg &= $ resp & " "
msg &= $ lenResp
info msg
proc msgSSEDisconnect (req: Request ):string =
var msg = $ int (req.client.getFd) & " "
let (peerIp, port) = req.client.getPeerAddr ()
msg &= peerIp & " :" & $ port & " - "
let username = if req.url.username == " " : " -" else :req.url.username
msg &= username
msg &= " [" & now ().utc.format (" d/MMM/yyyy:HH:mm:ss" ) & " ] "
msg &= " \" SSE CLOSE "
msg &= req.url.path & " "
msg &= req.protocol.orig & " \" "
result = msg
proc sseHeaders (): HttpHeaders {.inline .} =
let now = now ().utc.format (" ddd, d MMM yyyy HH:mm:ss" )
result = {
" Date" : & " { now} GMT" ,
" Access-Control-Allow-Origin" : " *" ,
" Connection" : " keep-alive" ,
" Content-type" : " text/event-stream; charset=utf-8" ,
" Cache-Control" : " no-cache" ,
" X-Accel-Buffering" : " no" ,
" Content-Length" : " "
}.newHttpHeaders ()
proc error404 (req: Request ) {.async , gcsafe .}=
# # eek
let path = req.url.path
let msg = " path {path} not found" .fmt
await logConnect (req, 404 , len (msg))
await req.respond (Http404 , msg)
proc initSSEClient (req:Request ){.async .}=
let fd = int (req.client.getFd)
sseClients[fd] = SSEClient (
fs : newFutureStream [string ](fromProc = " main" ),
clientClosed : 0 ,
req: req
)
await sseClients[fd].fs.write (" retry: 5000\n\n " )
await sseClients[fd].fs.write (" event: connect\n data: connected {fd}\n\n " .fmt)
proc sseTime (interval: float = 10 ){.async .} =
# # a date-time dataprovider, for test purposes
# # fs: FutureStream to write the date-time to.
# # interval: in seconds, default = 10s.
while true :
let now = now ().utc.format (" ddd, d MMM yyyy HH:mm:ss" )
await fsInput.write (" event: time\n data: {now}" .fmt)
await sleepAsync (interval* 1000 )
proc sseHeartbeat (interval: float = 15 ){.async .} =
# # A server sent event heartbeat is sent so that the connection does not stay
# # idle for too long. The heartbeat is just a commentline without content.
# # fs: FutureStream to write the heartbeat to.
# # interval: in seconds, default = 15s.
while true :
await sleepAsync (interval* 1000 )
await fsInput.write (" :\n\n " )
proc sseFanOut () {.async , gcsafe .}= # unsafe!
# # Distributes incoming data from the stream over the clients. It is the
# # callback for fsInput asyncstream.
# # As every message passes here, this is where an message-id is added.
# # fs: FutureStream to write the heartbeat to.
var
data: string
while true :
let (hasContent, payload) = await fsInput.read
if hasContent:
if not startsWith (payload, " :" ):
let id = $ genOid ()
data = " {payload}\n id: {id}\n\n " .fmt
put (id, data)
else :
data = payload
if len (sseClients) > 0 :
for k in sseClients.keys:
await sseClients[k].fs.write (data)
else :
poll ()
proc input (req: Request ) {.async , gcsafe .} =
# HTTP POST here in SSE format(without id). Data will be distributed to listening clients
# One message per POST.
# Post as the event ID is added later on.
await fsInput.write (req.body)
await req.respond (Http200 , " " )
proc pruneClients (fd: AsyncFD ):bool {.gcsafe .}=
let k = int (fd)
if sseClients.hasKey (k):
let msg = msgSSEDisconnect (sseClients[k].req)
sseClients.del (k)
fd.closeSocket ()
info (msg)
return true
proc sse (req: Request ) {.async , gcsafe .} =
# # sse emitter.
let
fd = int (req.client.getFd)
id = req.headers.getOrDefault (" last-event-id" , default = @ [" " ].HttpHeaderValues )
var
fds: seq [SocketHandle ]
hasContent: bool
data : string
await req.respond (Http200 , " " , sseHeaders ())
await initSSEClient (req)
await logConnect (req, 200 , 0 )
# SSE opens a single unidirectional channel, so reads fail.
# If reads don't fail, the client is/has disconnected.
addRead (req.client.getFd.AsyncFD , pruneClients)
if id != " " :
await req.client.send (getdownfrom (id))
while not req.client.isClosed:
(hasContent, data) = await sseClients[fd].fs.read
if hasContent:
fds = @ [req.client.getFd]
await req.client.send (" {data}" .fmt)
# echo fd
proc dispatch (req: Request ){.async , gcsafe .} =
let path = req.url.path
case path:
# of "/index/":
# await index(req)
of " /sse/" :
await sse (req)
of " /input/" :
await input (req)
else :
await error404 (req)
proc main {.async .} =
# two "internal" dataproviders
asyncCheck sseHeartbeat (5 )
asyncCheck sseTime (1 )
asyncCheck sseFanOut ()
var server = newAsyncHttpServer (reuseAddr = false , reusePort = false )
server.listen (Port (8088 ), " 192.168.1.4" )
echo " asynchttpserver listening at 192.168.1.4:8088"
while true :
if server.shouldAcceptRequest ():
await server.acceptRequest (dispatch)
else :
poll ()
asyncCheck main ()
runForever ()