Skip to content

Instantly share code, notes, and snippets.

@Gooseus
Last active March 13, 2020 06:07
Show Gist options
  • Select an option

  • Save Gooseus/ba5310f52a16f16650c2669f553e85a9 to your computer and use it in GitHub Desktop.

Select an option

Save Gooseus/ba5310f52a16f16650c2669f553e85a9 to your computer and use it in GitHub Desktop.

Revisions

  1. Gooseus revised this gist Nov 7, 2016. 1 changed file with 208 additions and 36 deletions.
    244 changes: 208 additions & 36 deletions rx-server.js
    Original file line number Diff line number Diff line change
    @@ -1,7 +1,26 @@
    // my first Rx nodejs server
    // Rx HTTP Server
    // first attempt at creating a minimal reactive nodejs HTTP server
    //
    // has:
    // * url/querystring parsing
    // * body parsing
    // * some kind of routing
    // * some kind of error handling
    // * minimalist request logging
    //
    // has not:
    // * robust routing
    // * cookie parsing
    // * solid error handling
    // * file uploads

    const Rx = require("rxjs/Rx"),
    http = require("http");
    fs = require("fs"),
    http = require("http"),
    url = require("url"),
    mime = require("mime");

    // creates Observable from a nodejs ReadableStream
    const fromReadableStream = stream => {
    stream.pause();
    return Rx.Observable.create(observer => {
    @@ -23,55 +42,208 @@ const fromReadableStream = stream => {
    }).share();
    };

    // Observable HTTP Server
    Rx.Observable.onErrorResumeNext(
    Rx.Observable
    .create((observer) => {
    let server = http.createServer((req,res) => observer.next({ req, res })).listen(8000);

    return () => server.close();
    })
    .do(({req}) => console.log("got a request", req.url))
    // this is basically our bodyParser() middleware
    .flatMap((conn) => {
    let { req, res } = conn;
    if(req.method!="POST" && req.method != "PUT") return Rx.Observable.of(conn);

    // PRO - will allow subsequent handlers to have ready access to the body data
    // CON - waits on body parsing before continuing operation, so small lag on requests that have no need for the body data
    return fromReadableStream(req)
    // creates an HTTP server observable
    // I wonder if this is a case where I should be considering doing some OOP stuff with a RxServer class or some nonesense... nah
    const createHttpRxStream = (http,port) => {
    return Rx.Observable
    .create(observer => {
    // create a http server that emits a connection event of the request and response objects
    const server = http.createServer((req,res) => observer.next({req,res})).listen(port);

    // close the server as our unsubscriber fn
    return server.close.bind(server);
    });
    };

    // url parsing middleware
    // add querystring object and other URL parsed data to the request object
    const urlParser = ({req}) => {
    const urlObj = url.parse(req.url,true);

    req.query = urlObj.query;
    req.hash = urlObj.hash;
    req.pathname = urlObj.pathname;
    req.search = urlObj.search;
    };

    // request logging middleware
    // log the incoming request data
    const logger = ({req}) => console.log(`${req.headers.host} - - ${req.method} ${req.headers['content-type'] || '-'} ${req.url} - ${req.headers['user-agent'] || '-'}`);

    // body parsing middleware
    // adds the `rawBody` buffer and the parsed `body` object/string to the request object
    // this returns an observable so it needs to be added to the stream using `flatMap` rather than `do` or regular `map`
    const bodyParser = (conn) => {
    let { req, res } = conn;
    if(req.method!="POST" && req.method != "PUT" && req.method != "PATCH") return Rx.Observable.of(conn);

    // PRO - will allow subsequent handlers to have ready access to the body data
    // CON - waits on body parsing before continuing operation, so small lag on requests that have no need for the body data
    let body_ = fromReadableStream(req)
    .toArray()
    .map(chunks => Buffer.concat(chunks))
    .do(rawBody => req.rawBody=rawBody)
    .map(rawBody => {
    switch(req.headers["content-type"]) {
    case "application/json":
    case "application/javascript":
    try {
    return JSON.parse(rawBody);
    } catch(e) {
    // I guess catching an error will stop the propagation of this event?
    res.writeHead(400, { 'Content-Type': 'text/plain' });
    res.end("Bad JSON");
    }
    return JSON.parse(rawBody);
    default:
    return rawBody.toString();
    }
    })
    .do(body => req.body=body)
    .do(body => console.log("request body: ", req.body, typeof req.body))
    .map(body => conn);
    })
    )
    .subscribe(
    ({res}) => {
    .map(body => conn)
    .catch(err => {
    console.log("Error caught: ", err);

    if(err instanceof SyntaxError) {
    res.writeHead(400, { 'Content-Type': 'text/plain' });
    res.end("Bad JSON");
    } else {
    res.writeHead(500, { 'Content-Type': 'text/plain' });
    res.end("Internal Server Error");
    }

    return body_;
    });

    return body_;
    };

    // this is our trunkline subscription endpoint (sink)
    const _404 = {
    "next": ({res}) => {
    res.writeHead(404, { 'Content-Type': 'text/plain' });
    res.end("Not Found");
    },
    "error": err => console.log("default server connection stream error: ", err),
    "complete": () => console.log("default server connection stream completed")
    };

    // this is our index handler, this is where we'd deliver the root of a client web app
    const _index = {
    "next": ({res}) => {
    res.writeHead(200, { 'Content-Type': 'text/plain' });
    res.end("Hello World\n");
    },
    // How the hell do I handle responding for my connection?
    // would be great to get the event that caused an error in the error handler, or is that wrong?
    (err) => console.error(err),
    () => console.log("server complete.")
    )
    "error": err => console.log("index server connection stream error: ", err),
    "complete": () => console.log("index server connection stream completed")
    };

    // this our interaction tracking handler, all tracking requests (POST /interaction) should come here
    const _tracker = {
    "next": ({res}) => {
    res.writeHead(200, { 'Content-Type': 'application/json' });
    res.end('{ "ok": true }');
    },
    "error": err => console.log("`tracking` resource connection stream error: ", err),
    "complete": () => console.log("`tracking` resource connection stream completed")
    }

    // take a folder path and make a subscriber which will server matching files from that path
    // ideally also has a mechanism to re-emit connections into another stream
    const createStaticSubscriber = (dir) => {
    return {
    "next": (conn) => {
    let {req,res} = conn,
    pathname = __dirname + dir + (req.pathname=="/" ? "/index.html" : req.pathname);

    console.log("get static file at ", pathname);
    fs.readFile(pathname, (err,file) => {
    if(err) {
    if(err.code=="ENOENT") {
    // TODO - fix up this stuff here
    console.log("static file 404");
    return _404.next(conn);
    }

    console.log("problem getting the file", err);
    res.writeHead(400, { 'Content-Type': 'text/plain' });
    res.end(err.message);
    return;
    }

    res.writeHead(200, { 'Content-Type': mime.lookup(pathname) });
    res.end(file.toString());
    });
    },
    "error": err => console.log(`${dir} static resource connection stream error: `, err),
    "complete": () => console.log(`${dir} static resource connection stream completed`)
    }
    };

    // this actually creates our server stream and sets it up to share the events
    const server_ = Rx.Observable
    .onErrorResumeNext(
    createHttpRxStream(http,8000)
    .do(urlParser)
    .do(logger)
    .flatMap(bodyParser))
    .share();

    // Rx Routing
    // take a trunk stream and a dictionary of branching predicate functions
    // return a matching dictionary of branch streams which produce events from the trunk stream if passing the predicate
    // adds a default branch to the returned dictionary which produces all the events that matched none of the predicates
    // to consider, adding some kind of "break" or stopPropagation functionality to stop the event if it matches
    const branchStream = (trunk$,cases) => {
    let branches = {},
    branchList = [];

    Object.keys(cases)
    .forEach(k=>{
    let predicate = cases[k];
    branch = new Rx.Subject();

    branches[k] = branch;
    branchList.push([predicate,branch]);
    });

    branches.default = new Rx.Subject();

    trunk$.subscribe({
    next: (e)=>{
    let gutter = true;
    branchList.forEach(([predicate,branch])=>{
    if(predicate(e)) {
    branch.next(e);
    gutter=false;
    }
    });

    if(gutter) {
    branches.default.next(e);
    }
    },
    error: err=>console.error(err),
    complete: ()=>{
    branchList.forEach(([predicate,branch])=>branch.complete());
    }
    });

    return branches;
    }

    const routes = {
    "getAll$": ({req})=>(req.method=="GET"),
    "postInteraction$": ({req})=>(req.pathname=="/interaction"&&req.method=="POST")
    },
    Router = branchStream(server_,routes);

    // console.log("our router!", Router);

    // GET static subscription
    // should handle all requests in this stream with a static file or 404
    // need to consider various test cases for propagating events that match multiple routes
    Router.getAll$.subscribe(createStaticSubscriber("/public"));

    // POST Interaction subscription
    // we should try to collect over a time interval and process all the interaction requests at once
    Router.postInteraction$.subscribe(_tracker);

    // Send everything that isn't routed somewhere to 404ville
    Router.default.subscribe(_404);

    ;
  2. Gooseus revised this gist Nov 4, 2016. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion rx-server.js
    Original file line number Diff line number Diff line change
    @@ -50,7 +50,7 @@ Rx.Observable.onErrorResumeNext(
    try {
    return JSON.parse(rawBody);
    } catch(e) {
    // I guess catching an error will stop the propagation of this event?
    // I guess catching an error will stop the propagation of this event?
    res.writeHead(400, { 'Content-Type': 'text/plain' });
    res.end("Bad JSON");
    }
  3. Gooseus revised this gist Nov 4, 2016. 1 changed file with 1 addition and 0 deletions.
    1 change: 1 addition & 0 deletions rx-server.js
    Original file line number Diff line number Diff line change
    @@ -50,6 +50,7 @@ Rx.Observable.onErrorResumeNext(
    try {
    return JSON.parse(rawBody);
    } catch(e) {
    // I guess catching an error will stop the propagation of this event?
    res.writeHead(400, { 'Content-Type': 'text/plain' });
    res.end("Bad JSON");
    }
  4. Gooseus revised this gist Nov 4, 2016. 1 changed file with 60 additions and 60 deletions.
    120 changes: 60 additions & 60 deletions rx-server.js
    Original file line number Diff line number Diff line change
    @@ -1,76 +1,76 @@
    // my first Rx nodejs server
    const Rx = require("rxjs/Rx"),
    http = require("http");
    http = require("http");

    const fromReadableStream = stream => {
    stream.pause();
    return Rx.Observable.create(observer => {
    let next = chunk => observer.next(chunk),
    complete = () => observer.complete(),
    error = err => observer.error(err);
    stream
    .on('data', next)
    .on('error', error)
    .on('end', complete)
    .resume();
    stream.pause();
    return Rx.Observable.create(observer => {
    let next = chunk => observer.next(chunk),
    complete = () => observer.complete(),
    error = err => observer.error(err);
    stream
    .on('data', next)
    .on('error', error)
    .on('end', complete)
    .resume();

    return () => {
    stream.removeListener('data',next);
    stream.removeListener('error',error);
    stream.removeListener('end',complete);
    };
    }).share();
    return () => {
    stream.removeListener('data',next);
    stream.removeListener('error',error);
    stream.removeListener('end',complete);
    };
    }).share();
    };

    // Observable HTTP Server
    Rx.Observable.onErrorResumeNext(
    Rx.Observable
    .create((observer) => {
    let server = http.createServer((req,res) => observer.next({ req, res })).listen(8000);
    Rx.Observable
    .create((observer) => {
    let server = http.createServer((req,res) => observer.next({ req, res })).listen(8000);

    return () => server.close();
    })
    .do(({req}) => console.log("got a request", req.url))
    // this is basically our bodyParser() middleware
    .flatMap((conn) => {
    let { req, res } = conn;
    if(req.method!="POST" && req.method != "PUT") return Rx.Observable.of(conn);
    return () => server.close();
    })
    .do(({req}) => console.log("got a request", req.url))
    // this is basically our bodyParser() middleware
    .flatMap((conn) => {
    let { req, res } = conn;
    if(req.method!="POST" && req.method != "PUT") return Rx.Observable.of(conn);

    // PRO - will allow subsequent handlers to have ready access to the body data
    // CON - waits on body parsing before continuing operation, so small lag on requests that have no need for the body data
    return fromReadableStream(req)
    .toArray()
    .map(chunks => Buffer.concat(chunks))
    .do(rawBody => req.rawBody=rawBody)
    .map(rawBody => {
    switch(req.headers["content-type"]) {
    case "application/json":
    case "application/javascript":
    try {
    return JSON.parse(rawBody);
    } catch(e) {
    res.writeHead(400, { 'Content-Type': 'text/plain' });
    res.end("Bad JSON");
    }
    default:
    return rawBody.toString();
    }
    })
    .do(body => req.body=body)
    .do(body => console.log("request body: ", req.body, typeof req.body))
    .map(body => conn);
    })
    // PRO - will allow subsequent handlers to have ready access to the body data
    // CON - waits on body parsing before continuing operation, so small lag on requests that have no need for the body data
    return fromReadableStream(req)
    .toArray()
    .map(chunks => Buffer.concat(chunks))
    .do(rawBody => req.rawBody=rawBody)
    .map(rawBody => {
    switch(req.headers["content-type"]) {
    case "application/json":
    case "application/javascript":
    try {
    return JSON.parse(rawBody);
    } catch(e) {
    res.writeHead(400, { 'Content-Type': 'text/plain' });
    res.end("Bad JSON");
    }
    default:
    return rawBody.toString();
    }
    })
    .do(body => req.body=body)
    .do(body => console.log("request body: ", req.body, typeof req.body))
    .map(body => conn);
    })
    )
    .subscribe(
    ({res}) => {
    res.writeHead(200, { 'Content-Type': 'text/plain' });
    res.end("Hello World\n");
    },
    // How the hell do I handle responding for my connection?
    // would be great to get the event that caused an error in the error handler, or is that wrong?
    (err) => console.error(err),
    () => console.log("server complete.")
    ({res}) => {
    res.writeHead(200, { 'Content-Type': 'text/plain' });
    res.end("Hello World\n");
    },
    // How the hell do I handle responding for my connection?
    // would be great to get the event that caused an error in the error handler, or is that wrong?
    (err) => console.error(err),
    () => console.log("server complete.")
    )

    ;
  5. Gooseus revised this gist Nov 4, 2016. 1 changed file with 60 additions and 60 deletions.
    120 changes: 60 additions & 60 deletions rx-server.js
    Original file line number Diff line number Diff line change
    @@ -1,76 +1,76 @@
    // my first Rx nodejs server
    const Rx = require("rxjs/Rx"),
    http = require("http");
    http = require("http");

    const fromReadableStream = stream => {
    stream.pause();
    return Rx.Observable.create(observer => {
    let next = chunk => observer.next(chunk),
    complete = () => observer.complete(),
    error = err => observer.error(err);
    stream
    .on('data', next)
    .on('error', error)
    .on('end', complete)
    .resume();
    stream.pause();
    return Rx.Observable.create(observer => {
    let next = chunk => observer.next(chunk),
    complete = () => observer.complete(),
    error = err => observer.error(err);
    stream
    .on('data', next)
    .on('error', error)
    .on('end', complete)
    .resume();

    return () => {
    stream.removeListener('data',next);
    stream.removeListener('error',error);
    stream.removeListener('end',complete);
    };
    }).share();
    return () => {
    stream.removeListener('data',next);
    stream.removeListener('error',error);
    stream.removeListener('end',complete);
    };
    }).share();
    };

    // Observable HTTP Server
    Rx.Observable.onErrorResumeNext(
    Rx.Observable
    .create((observer) => {
    let server = http.createServer((req,res) => observer.next({ req, res })).listen(8000);
    Rx.Observable
    .create((observer) => {
    let server = http.createServer((req,res) => observer.next({ req, res })).listen(8000);

    return () => server.close();
    })
    .do(({req}) => console.log("got a request", req.url))
    // this is basically our bodyParser() middleware
    .flatMap((evt) => {
    let { req, res } = evt;
    if(req.method!="POST" && req.method != "PUT") return Rx.Observable.of(evt);
    return () => server.close();
    })
    .do(({req}) => console.log("got a request", req.url))
    // this is basically our bodyParser() middleware
    .flatMap((conn) => {
    let { req, res } = conn;
    if(req.method!="POST" && req.method != "PUT") return Rx.Observable.of(conn);

    // PRO - will allow subsequent handlers to have ready access to the body data
    // CON - waits on body parsing before continuing operation, so small lag on requests that have no need for the body data
    return fromReadableStream(req)
    .toArray()
    .map(chunks => Buffer.concat(chunks))
    .do(rawBody => req.rawBody=rawBody)
    .map(rawBody => {
    switch(req.headers["content-type"]) {
    case "application/json":
    case "application/javascript":
    try {
    return JSON.parse(rawBody);
    } catch(e) {
    res.writeHead(400, { 'Content-Type': 'text/plain' });
    res.end("Bad JSON");
    }
    default:
    return rawBody.toString();
    }
    })
    .do(body => req.body=body)
    .do(body => console.log("request body: ", req.body, typeof req.body))
    .map(body => evt);
    })
    // PRO - will allow subsequent handlers to have ready access to the body data
    // CON - waits on body parsing before continuing operation, so small lag on requests that have no need for the body data
    return fromReadableStream(req)
    .toArray()
    .map(chunks => Buffer.concat(chunks))
    .do(rawBody => req.rawBody=rawBody)
    .map(rawBody => {
    switch(req.headers["content-type"]) {
    case "application/json":
    case "application/javascript":
    try {
    return JSON.parse(rawBody);
    } catch(e) {
    res.writeHead(400, { 'Content-Type': 'text/plain' });
    res.end("Bad JSON");
    }
    default:
    return rawBody.toString();
    }
    })
    .do(body => req.body=body)
    .do(body => console.log("request body: ", req.body, typeof req.body))
    .map(body => conn);
    })
    )
    .subscribe(
    ({res}) => {
    res.writeHead(200, { 'Content-Type': 'text/plain' });
    res.end("Hello World\n");
    },
    // How the hell do I handle responding for my connection?
    // would be great to get the event that caused an error in the error handler, or is that wrong?
    (err) => console.error(err),
    () => console.log("server complete.")
    ({res}) => {
    res.writeHead(200, { 'Content-Type': 'text/plain' });
    res.end("Hello World\n");
    },
    // How the hell do I handle responding for my connection?
    // would be great to get the event that caused an error in the error handler, or is that wrong?
    (err) => console.error(err),
    () => console.log("server complete.")
    )

    ;
  6. Gooseus created this gist Nov 4, 2016.
    76 changes: 76 additions & 0 deletions rx-server.js
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,76 @@
    // my first Rx nodejs server
    const Rx = require("rxjs/Rx"),
    http = require("http");

    const fromReadableStream = stream => {
    stream.pause();
    return Rx.Observable.create(observer => {
    let next = chunk => observer.next(chunk),
    complete = () => observer.complete(),
    error = err => observer.error(err);

    stream
    .on('data', next)
    .on('error', error)
    .on('end', complete)
    .resume();

    return () => {
    stream.removeListener('data',next);
    stream.removeListener('error',error);
    stream.removeListener('end',complete);
    };
    }).share();
    };

    // Observable HTTP Server
    Rx.Observable.onErrorResumeNext(
    Rx.Observable
    .create((observer) => {
    let server = http.createServer((req,res) => observer.next({ req, res })).listen(8000);

    return () => server.close();
    })
    .do(({req}) => console.log("got a request", req.url))
    // this is basically our bodyParser() middleware
    .flatMap((evt) => {
    let { req, res } = evt;
    if(req.method!="POST" && req.method != "PUT") return Rx.Observable.of(evt);

    // PRO - will allow subsequent handlers to have ready access to the body data
    // CON - waits on body parsing before continuing operation, so small lag on requests that have no need for the body data
    return fromReadableStream(req)
    .toArray()
    .map(chunks => Buffer.concat(chunks))
    .do(rawBody => req.rawBody=rawBody)
    .map(rawBody => {
    switch(req.headers["content-type"]) {
    case "application/json":
    case "application/javascript":
    try {
    return JSON.parse(rawBody);
    } catch(e) {
    res.writeHead(400, { 'Content-Type': 'text/plain' });
    res.end("Bad JSON");
    }
    default:
    return rawBody.toString();
    }
    })
    .do(body => req.body=body)
    .do(body => console.log("request body: ", req.body, typeof req.body))
    .map(body => evt);
    })
    )
    .subscribe(
    ({res}) => {
    res.writeHead(200, { 'Content-Type': 'text/plain' });
    res.end("Hello World\n");
    },
    // How the hell do I handle responding for my connection?
    // would be great to get the event that caused an error in the error handler, or is that wrong?
    (err) => console.error(err),
    () => console.log("server complete.")
    )

    ;