Last active
March 13, 2020 06:07
-
-
Save Gooseus/ba5310f52a16f16650c2669f553e85a9 to your computer and use it in GitHub Desktop.
Revisions
-
Gooseus revised this gist
Nov 7, 2016 . 1 changed file with 208 additions and 36 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -1,7 +1,26 @@ // Rx HTTP Server // first attempt at creating a minimal reactive nodejs HTTP server // // has: // * url/querystring parsing // * body parsing // * some kind of routing // * some kind of error handling // * minimalist request logging // // has not: // * robust routing // * cookie parsing // * solid error handling // * file uploads const Rx = require("rxjs/Rx"), fs = require("fs"), http = require("http"), url = require("url"), mime = require("mime"); // creates Observable from a nodejs ReadableStream const fromReadableStream = stream => { stream.pause(); return Rx.Observable.create(observer => { @@ -23,55 +42,208 @@ const fromReadableStream = stream => { }).share(); }; // creates an HTTP server observable // I wonder if this is a case where I should be considering doing some OOP stuff with a RxServer class or some nonesense... nah const createHttpRxStream = (http,port) => { return Rx.Observable .create(observer => { // create a http server that emits a connection event of the request and response objects const server = http.createServer((req,res) => observer.next({req,res})).listen(port); // close the server as our unsubscriber fn return server.close.bind(server); }); }; // url parsing middleware // add querystring object and other URL parsed data to the request object const urlParser = ({req}) => { const urlObj = url.parse(req.url,true); req.query = urlObj.query; req.hash = urlObj.hash; req.pathname = urlObj.pathname; req.search = urlObj.search; }; // request logging middleware // log the incoming request data const logger = ({req}) => console.log(`${req.headers.host} - - ${req.method} ${req.headers['content-type'] || '-'} ${req.url} - ${req.headers['user-agent'] || '-'}`); // body parsing middleware // adds the `rawBody` buffer and the parsed `body` object/string to the request object // this returns an observable so it needs to be added to the stream using `flatMap` rather than `do` or regular `map` const bodyParser = (conn) => { let { req, res } = conn; if(req.method!="POST" && req.method != "PUT" && req.method != "PATCH") return Rx.Observable.of(conn); // PRO - will allow subsequent handlers to have ready access to the body data // CON - waits on body parsing before continuing operation, so small lag on requests that have no need for the body data let body_ = fromReadableStream(req) .toArray() .map(chunks => Buffer.concat(chunks)) .do(rawBody => req.rawBody=rawBody) .map(rawBody => { switch(req.headers["content-type"]) { case "application/json": case "application/javascript": return JSON.parse(rawBody); default: return rawBody.toString(); } }) .do(body => req.body=body) .do(body => console.log("request body: ", req.body, typeof req.body)) .map(body => conn) .catch(err => { console.log("Error caught: ", err); if(err instanceof SyntaxError) { res.writeHead(400, { 'Content-Type': 'text/plain' }); res.end("Bad JSON"); } else { res.writeHead(500, { 'Content-Type': 'text/plain' }); res.end("Internal Server Error"); } return body_; }); return body_; }; // this is our trunkline subscription endpoint (sink) const _404 = { "next": ({res}) => { res.writeHead(404, { 'Content-Type': 'text/plain' }); res.end("Not Found"); }, "error": err => console.log("default server connection stream error: ", err), "complete": () => console.log("default server connection stream completed") }; // this is our index handler, this is where we'd deliver the root of a client web app const _index = { "next": ({res}) => { res.writeHead(200, { 'Content-Type': 'text/plain' }); res.end("Hello World\n"); }, "error": err => console.log("index server connection stream error: ", err), "complete": () => console.log("index server connection stream completed") }; // this our interaction tracking handler, all tracking requests (POST /interaction) should come here const _tracker = { "next": ({res}) => { res.writeHead(200, { 'Content-Type': 'application/json' }); res.end('{ "ok": true }'); }, "error": err => console.log("`tracking` resource connection stream error: ", err), "complete": () => console.log("`tracking` resource connection stream completed") } // take a folder path and make a subscriber which will server matching files from that path // ideally also has a mechanism to re-emit connections into another stream const createStaticSubscriber = (dir) => { return { "next": (conn) => { let {req,res} = conn, pathname = __dirname + dir + (req.pathname=="/" ? "/index.html" : req.pathname); console.log("get static file at ", pathname); fs.readFile(pathname, (err,file) => { if(err) { if(err.code=="ENOENT") { // TODO - fix up this stuff here console.log("static file 404"); return _404.next(conn); } console.log("problem getting the file", err); res.writeHead(400, { 'Content-Type': 'text/plain' }); res.end(err.message); return; } res.writeHead(200, { 'Content-Type': mime.lookup(pathname) }); res.end(file.toString()); }); }, "error": err => console.log(`${dir} static resource connection stream error: `, err), "complete": () => console.log(`${dir} static resource connection stream completed`) } }; // this actually creates our server stream and sets it up to share the events const server_ = Rx.Observable .onErrorResumeNext( createHttpRxStream(http,8000) .do(urlParser) .do(logger) .flatMap(bodyParser)) .share(); // Rx Routing // take a trunk stream and a dictionary of branching predicate functions // return a matching dictionary of branch streams which produce events from the trunk stream if passing the predicate // adds a default branch to the returned dictionary which produces all the events that matched none of the predicates // to consider, adding some kind of "break" or stopPropagation functionality to stop the event if it matches const branchStream = (trunk$,cases) => { let branches = {}, branchList = []; Object.keys(cases) .forEach(k=>{ let predicate = cases[k]; branch = new Rx.Subject(); branches[k] = branch; branchList.push([predicate,branch]); }); branches.default = new Rx.Subject(); trunk$.subscribe({ next: (e)=>{ let gutter = true; branchList.forEach(([predicate,branch])=>{ if(predicate(e)) { branch.next(e); gutter=false; } }); if(gutter) { branches.default.next(e); } }, error: err=>console.error(err), complete: ()=>{ branchList.forEach(([predicate,branch])=>branch.complete()); } }); return branches; } const routes = { "getAll$": ({req})=>(req.method=="GET"), "postInteraction$": ({req})=>(req.pathname=="/interaction"&&req.method=="POST") }, Router = branchStream(server_,routes); // console.log("our router!", Router); // GET static subscription // should handle all requests in this stream with a static file or 404 // need to consider various test cases for propagating events that match multiple routes Router.getAll$.subscribe(createStaticSubscriber("/public")); // POST Interaction subscription // we should try to collect over a time interval and process all the interaction requests at once Router.postInteraction$.subscribe(_tracker); // Send everything that isn't routed somewhere to 404ville Router.default.subscribe(_404); ; -
Gooseus revised this gist
Nov 4, 2016 . 1 changed file with 1 addition and 1 deletion.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -50,7 +50,7 @@ Rx.Observable.onErrorResumeNext( try { return JSON.parse(rawBody); } catch(e) { // I guess catching an error will stop the propagation of this event? res.writeHead(400, { 'Content-Type': 'text/plain' }); res.end("Bad JSON"); } -
Gooseus revised this gist
Nov 4, 2016 . 1 changed file with 1 addition and 0 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -50,6 +50,7 @@ Rx.Observable.onErrorResumeNext( try { return JSON.parse(rawBody); } catch(e) { // I guess catching an error will stop the propagation of this event? res.writeHead(400, { 'Content-Type': 'text/plain' }); res.end("Bad JSON"); } -
Gooseus revised this gist
Nov 4, 2016 . 1 changed file with 60 additions and 60 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -1,76 +1,76 @@ // my first Rx nodejs server const Rx = require("rxjs/Rx"), http = require("http"); const fromReadableStream = stream => { stream.pause(); return Rx.Observable.create(observer => { let next = chunk => observer.next(chunk), complete = () => observer.complete(), error = err => observer.error(err); stream .on('data', next) .on('error', error) .on('end', complete) .resume(); return () => { stream.removeListener('data',next); stream.removeListener('error',error); stream.removeListener('end',complete); }; }).share(); }; // Observable HTTP Server Rx.Observable.onErrorResumeNext( Rx.Observable .create((observer) => { let server = http.createServer((req,res) => observer.next({ req, res })).listen(8000); return () => server.close(); }) .do(({req}) => console.log("got a request", req.url)) // this is basically our bodyParser() middleware .flatMap((conn) => { let { req, res } = conn; if(req.method!="POST" && req.method != "PUT") return Rx.Observable.of(conn); // PRO - will allow subsequent handlers to have ready access to the body data // CON - waits on body parsing before continuing operation, so small lag on requests that have no need for the body data return fromReadableStream(req) .toArray() .map(chunks => Buffer.concat(chunks)) .do(rawBody => req.rawBody=rawBody) .map(rawBody => { switch(req.headers["content-type"]) { case "application/json": case "application/javascript": try { return JSON.parse(rawBody); } catch(e) { res.writeHead(400, { 'Content-Type': 'text/plain' }); res.end("Bad JSON"); } default: return rawBody.toString(); } }) .do(body => req.body=body) .do(body => console.log("request body: ", req.body, typeof req.body)) .map(body => conn); }) ) .subscribe( ({res}) => { res.writeHead(200, { 'Content-Type': 'text/plain' }); res.end("Hello World\n"); }, // How the hell do I handle responding for my connection? // would be great to get the event that caused an error in the error handler, or is that wrong? (err) => console.error(err), () => console.log("server complete.") ) ; -
Gooseus revised this gist
Nov 4, 2016 . 1 changed file with 60 additions and 60 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -1,76 +1,76 @@ // my first Rx nodejs server const Rx = require("rxjs/Rx"), http = require("http"); const fromReadableStream = stream => { stream.pause(); return Rx.Observable.create(observer => { let next = chunk => observer.next(chunk), complete = () => observer.complete(), error = err => observer.error(err); stream .on('data', next) .on('error', error) .on('end', complete) .resume(); return () => { stream.removeListener('data',next); stream.removeListener('error',error); stream.removeListener('end',complete); }; }).share(); }; // Observable HTTP Server Rx.Observable.onErrorResumeNext( Rx.Observable .create((observer) => { let server = http.createServer((req,res) => observer.next({ req, res })).listen(8000); return () => server.close(); }) .do(({req}) => console.log("got a request", req.url)) // this is basically our bodyParser() middleware .flatMap((conn) => { let { req, res } = conn; if(req.method!="POST" && req.method != "PUT") return Rx.Observable.of(conn); // PRO - will allow subsequent handlers to have ready access to the body data // CON - waits on body parsing before continuing operation, so small lag on requests that have no need for the body data return fromReadableStream(req) .toArray() .map(chunks => Buffer.concat(chunks)) .do(rawBody => req.rawBody=rawBody) .map(rawBody => { switch(req.headers["content-type"]) { case "application/json": case "application/javascript": try { return JSON.parse(rawBody); } catch(e) { res.writeHead(400, { 'Content-Type': 'text/plain' }); res.end("Bad JSON"); } default: return rawBody.toString(); } }) .do(body => req.body=body) .do(body => console.log("request body: ", req.body, typeof req.body)) .map(body => conn); }) ) .subscribe( ({res}) => { res.writeHead(200, { 'Content-Type': 'text/plain' }); res.end("Hello World\n"); }, // How the hell do I handle responding for my connection? // would be great to get the event that caused an error in the error handler, or is that wrong? (err) => console.error(err), () => console.log("server complete.") ) ; -
Gooseus created this gist
Nov 4, 2016 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,76 @@ // my first Rx nodejs server const Rx = require("rxjs/Rx"), http = require("http"); const fromReadableStream = stream => { stream.pause(); return Rx.Observable.create(observer => { let next = chunk => observer.next(chunk), complete = () => observer.complete(), error = err => observer.error(err); stream .on('data', next) .on('error', error) .on('end', complete) .resume(); return () => { stream.removeListener('data',next); stream.removeListener('error',error); stream.removeListener('end',complete); }; }).share(); }; // Observable HTTP Server Rx.Observable.onErrorResumeNext( Rx.Observable .create((observer) => { let server = http.createServer((req,res) => observer.next({ req, res })).listen(8000); return () => server.close(); }) .do(({req}) => console.log("got a request", req.url)) // this is basically our bodyParser() middleware .flatMap((evt) => { let { req, res } = evt; if(req.method!="POST" && req.method != "PUT") return Rx.Observable.of(evt); // PRO - will allow subsequent handlers to have ready access to the body data // CON - waits on body parsing before continuing operation, so small lag on requests that have no need for the body data return fromReadableStream(req) .toArray() .map(chunks => Buffer.concat(chunks)) .do(rawBody => req.rawBody=rawBody) .map(rawBody => { switch(req.headers["content-type"]) { case "application/json": case "application/javascript": try { return JSON.parse(rawBody); } catch(e) { res.writeHead(400, { 'Content-Type': 'text/plain' }); res.end("Bad JSON"); } default: return rawBody.toString(); } }) .do(body => req.body=body) .do(body => console.log("request body: ", req.body, typeof req.body)) .map(body => evt); }) ) .subscribe( ({res}) => { res.writeHead(200, { 'Content-Type': 'text/plain' }); res.end("Hello World\n"); }, // How the hell do I handle responding for my connection? // would be great to get the event that caused an error in the error handler, or is that wrong? (err) => console.error(err), () => console.log("server complete.") ) ;