/**
 * nanoWS
 * A very basic websocket client. It does not fully cover rfc6455, so
 * it must not be used for any real world work. I wrote it to find out
 * how websockets actually work.
 *
 * @licence MIT
 * @author Jewel Mahanta
 */
const crypto = require("crypto");
const { URL } = require("url");
const https = require("https");
const http = require("http");
const { EventEmitter } = require("events");
// const zlib = require("zlib");
// const { StringDecoder } = require("string_decoder");

// ERRORS
/** Thrown by the WebSocket constructor when the URL scheme is unsupported. */
class ProtocolError extends Error {
  constructor() {
    super("only https, http, wss and ws are supported");
    this.name = "ProtocolError";
  }
}

///////////////////////
// OPCODE Reference  //
///////////////////////
// prettier-ignore
const OPCODE = Object.freeze({
  CONTINUE: 0x0,
  TEXT    : 0x1,
  BINARY  : 0x2,
  CLOSE   : 0x8,
  PING    : 0x9,
  PONG    : 0xa
});

// Numeric state reference. The WebSocket class below tracks its state
// with the key names ("CONNECTING", "OPEN", ...) rather than the numbers.
const SOCKET_STATE = Object.freeze({
  CONNECTING: 0,
  OPEN: 1,
  CLOSING: 2,
  CLOSED: 3
});

// GUID appended to the client key when computing Sec-WebSocket-Accept
// (https://tools.ietf.org/html/rfc6455#section-1.3).
const HANDSHAKE_GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

// 2^32, used to split / join the 64-bit extended payload length.
const TWO_POW_32 = 0x100000000;

/**
 * Decodes a websocket data frame header (byte 1 and byte 2) according
 * to [rfc6455](https://tools.ietf.org/html/rfc6455#section-5.2)
 * @param {Buffer} header - The first 2 bytes of frame
 * @returns {{FIN: boolean, OPCODE: number, MASK: boolean, LEN: number}}
 *   LEN is the raw 7-bit length field (126 / 127 announce an extended length).
 */
const getFrameInfo = header => {
  const [first, second] = header;
  // The three reserved bits (0x40, 0x20, 0x10) of byte 1 are not used.
  return {
    FIN: (first & 0b10000000) !== 0,
    OPCODE: first & 0b00001111,
    MASK: (second & 0b10000000) !== 0,
    LEN: second & 0b01111111
  };
};

/**
 * Payload masking according to
 * [rfc6455](https://tools.ietf.org/html/rfc6455#section-5.3)
 * The input buffer is left untouched; a masked copy is returned.
 * @param {Buffer} payload
 * @returns {{payload: Buffer, key: Buffer}} masked copy and the 4 byte key
 */
const maskPayload = payload => {
  const maskingKey = crypto.randomBytes(4);
  const maskedPayload = Buffer.alloc(payload.length);
  for (let i = 0; i < payload.length; i++) {
    maskedPayload[i] = payload[i] ^ maskingKey[i % 4];
  }
  return { payload: maskedPayload, key: maskingKey };
};

/**
 * Builds a single, final (FIN = 1), masked client frame.
 *
 * Layout (in bytes):
 *   2 for headers (fin, opcode, mask, len)
 *   then maybe one of:
 *     - 2 for extended length if 125 < len <= 65535
 *     - 8 for extended length if len > 65535
 *   4 for the masking key
 *   the rest is the masked payload
 *
 * @param {String | Buffer} data - strings are encoded as UTF-8
 * @param {Number} type - one of the OPCODE values
 * @returns {Buffer}
 */
const encodeFrame = (data, type) => {
  // Lengths must be measured in bytes, not in UTF-16 code units, so the
  // payload is converted before anything is counted.
  const payload = Buffer.isBuffer(data) ? data : Buffer.from(data);
  const length = payload.length;

  let header;
  if (length <= 0x7d /* 125 */) {
    header = Buffer.alloc(2);
    header[1] = 0b10000000 | length;
  } else if (length <= 0xffff /* 65535 */) {
    header = Buffer.alloc(2 + 2);
    header[1] = 0b10000000 | 126;
    header.writeUInt16BE(length, 2);
  } else {
    header = Buffer.alloc(2 + 8);
    header[1] = 0b10000000 | 127;
    // 64-bit big-endian length written as two 32-bit halves.
    header.writeUInt32BE(Math.floor(length / TWO_POW_32), 2);
    header.writeUInt32BE(length % TWO_POW_32, 6);
  }
  header[0] = 0b10000000 | type;

  const masked = maskPayload(payload);
  return Buffer.concat([header, masked.key, masked.payload]);
};

/**
 * Minimal websocket client.
 *
 * Events emitted:
 *  - "open": the HTTP upgrade succeeded
 *  - "message" (text: string): a complete text message arrived
 *  - "close": the underlying socket closed
 *  - "error" (err: Error): handshake, socket or protocol failure
 *
 * Limitations visible in this file: binary messages are parsed but not
 * emitted, and the MASK flag of incoming frames is decoded but not acted on.
 */
class WebSocket extends EventEmitter {
  /**
   * @param {String} url the websocket url
   * @param {Boolean} [debug=false] log raw socket events
   * @throws {ProtocolError} when the scheme is not http(s) / ws(s)
   */
  constructor(url, debug = false) {
    const _url = new URL(url);
    if (!["https:", "http:", "wss:", "ws:"].includes(_url.protocol))
      throw new ProtocolError();

    // The handshake is a plain HTTP(S) request, so map ws(s) onto http(s).
    if (_url.protocol === "wss:") _url.protocol = "https:";
    if (_url.protocol === "ws:") _url.protocol = "http:";

    super();

    this.url = _url;
    this.debug = debug;
    this._socket = undefined;
    this.httpModule = _url.protocol === "https:" ? https : http;
    this.SOCKET_STATE = "CONNECTING";

    // Parser state. It lives on the instance because a frame can span
    // several socket chunks.
    this._processing = false;
    this._buffers = []; // chunks not yet merged into _remaining
    this._remaining = Buffer.alloc(0); // merged but unconsumed bytes
    this._bufferedBytes = 0; // _remaining.length + sum of _buffers lengths
    this._frame = {}; // frame currently being decoded

    // Reassembly state for fragmented messages.
    this._fragments = [];
    this._fragmentOpcode = undefined;

    this.getSocket();
  }

  /**
   * Request options for the opening handshake. A fresh random
   * Sec-WebSocket-Key (16 random bytes, base64 encoded, as required by
   * https://tools.ietf.org/html/rfc6455#section-4.1) is generated per call.
   * @returns {{headers: Object}}
   */
  static getHTTPHeaders() {
    return {
      headers: {
        Upgrade: "websocket",
        Connection: "Upgrade",
        "Sec-WebSocket-Key": crypto.randomBytes(16).toString("base64"),
        "Sec-WebSocket-Version": 13
      }
    };
  }

  /**
   * A websocket starts off as a normal HTTP request. This is the
   * handshake part. Once our connection is upgraded, the actual
   * data transfer can start.
   *
   * Failures (request error, non-upgrade response, wrong accept hash)
   * are reported through the "error" event.
   *
   * NOTE: not chaining request coz the stack trace gets a bit messy.
   */
  getSocket() {
    const options = WebSocket.getHTTPHeaders();
    const key = options.headers["Sec-WebSocket-Key"];
    const request = this.httpModule.request(this.url, options);

    // A regular response means the server refused to upgrade.
    request.on("response", res => {
      if (this.debug) {
        console.log("::response::");
        res.pipe(process.stdout);
      } else {
        res.resume(); // drain the body so the connection can be released
      }
      this.SOCKET_STATE = "CLOSED";
      this.emit(
        "error",
        new Error(`Unexpected server response: ${res.statusCode}`)
      );
    });

    // This is what we are interested in. Remember
    // socket (net.Socket) is a Duplex stream.
    request.on("upgrade", (res, socket, head) => {
      if (this.debug) console.log("::upgrade::");
      if (this.debug) console.log("\x1b[35m%s\x1b[0m", "::head::", head);

      // The server proves it understood the handshake by hashing our key.
      const expectedAccept = crypto
        .createHash("sha1")
        .update(key + HANDSHAKE_GUID)
        .digest("base64");
      if (res.headers["sec-websocket-accept"] !== expectedAccept) {
        socket.destroy();
        this.SOCKET_STATE = "CLOSED";
        this.emit("error", new Error("Invalid Sec-WebSocket-Accept header"));
        return;
      }

      this._socket = socket;
      this.attachSocketEvents();
      this.SOCKET_STATE = "OPEN";
      this.emit("open");

      // Bytes that arrived together with the upgrade response already
      // belong to the websocket stream.
      if (head.length > 0) {
        this._buffers.push(head);
        this._bufferedBytes += head.length;
        this.processBuffers();
      }
    });

    request.on("error", err => {
      this.SOCKET_STATE = "CLOSED";
      this.emit("error", err);
    });

    request.end();
  }

  /**
   * We finally got our socket. Cool! Lets add a few listeners to make
   * the socket actually useful.
   * @see handleClose
   */
  attachSocketEvents() {
    this._socket.on("data", chunk => {
      if (this.debug) console.log("\x1b[35m%s\x1b[0m", "::raw::", chunk);
      this._buffers.push(chunk);
      this._bufferedBytes += chunk.length;
      this.processBuffers();
    });

    this._socket.on("error", err => {
      this.emit("error", err);
    });

    this._socket.on("close", () => {
      if (this.debug) console.log("\x1b[31m%s\x1b[0m", "::closed::");
      this.SOCKET_STATE = "CLOSED";
      this.emit("close");
    });
  }

  /**
   * Takes exactly `n` bytes off the front of the buffered stream.
   * @param {Number} n number of bytes wanted
   * @returns {Buffer | false} the bytes, or `false` when fewer than `n`
   *   bytes are buffered (nothing is consumed in that case)
   */
  getBytes(n) {
    if (this.debug) console.log("\x1b[31m%s\x1b[0m", `::GET:: ${n} bytes`);
    // An empty read always succeeds; a Buffer is truthy even when empty.
    if (n === 0) return Buffer.alloc(0);
    if (this._bufferedBytes < n) return false;

    if (this._remaining.length < n) {
      // Pull just enough queued chunks and merge them in a single concat.
      const parts = [this._remaining];
      let available = this._remaining.length;
      while (available < n) {
        const chunk = this._buffers.shift();
        parts.push(chunk);
        available += chunk.length;
      }
      this._remaining = Buffer.concat(parts, available);
    }

    this._bufferedBytes -= n;
    const bytes = this._remaining.subarray(0, n);
    this._remaining = this._remaining.subarray(n);
    return bytes;
  }

  /**
   * Decodes and dispatches every complete frame currently buffered.
   * Partial frames keep their progress in `this._frame` until more
   * data arrives.
   */
  processBuffers() {
    if (this._processing) return; // already draining further up the stack
    if (this.debug) console.log("\x1b[32m%s\x1b[0m", "::processing frames::");

    this._processing = true;
    try {
      while (this._readFrame()) {
        const frame = this._frame;
        this._frame = {}; // reset before dispatch so handlers see clean state
        this.handleFrame(frame);
      }
    } finally {
      this._processing = false;
    }
  }

  /**
   * Advances `this._frame` as far as the buffered bytes allow.
   * @returns {Boolean} true once header, length and payload are all read
   */
  _readFrame() {
    ///////////////////////
    // Get Frame Headers //
    ///////////////////////
    if (this._frame.FIN === undefined) {
      const header = this.getBytes(2);
      if (!header) return false;
      this._frame = getFrameInfo(header);
      if (this.debug) console.log("HEAD", this._frame);
    }

    //////////////////////////////
    // Calculate Payload Length //
    //////////////////////////////
    if (this._frame.PAYLOAD_LENGTH === undefined) {
      const { LEN } = this._frame;
      if (LEN <= 125) {
        this._frame.PAYLOAD_LENGTH = LEN;
      } else if (LEN === 126) {
        const extended = this.getBytes(2);
        if (!extended) return false;
        this._frame.PAYLOAD_LENGTH = extended.readUInt16BE(0);
      } else {
        // LEN === 127: the next 8 bytes are an unsigned 64-bit length.
        const extended = this.getBytes(8);
        if (!extended) return false;
        const length =
          extended.readUInt32BE(0) * TWO_POW_32 + extended.readUInt32BE(4);
        if (!Number.isSafeInteger(length)) {
          this._fail(new RangeError("Frame payload length is too large"));
          return false;
        }
        this._frame.PAYLOAD_LENGTH = length;
      }
      if (this.debug) console.log("LEN", this._frame);
    }

    /////////////////
    // Add Payload //
    /////////////////
    const payload = this.getBytes(this._frame.PAYLOAD_LENGTH);
    if (!payload) return false;
    this._frame.PAYLOAD = payload;
    return true;
  }

  /**
   * Drops all parser state, destroys the socket (if any) and reports `err`
   * through the "error" event.
   * @param {Error} err
   */
  _fail(err) {
    this._buffers = [];
    this._remaining = Buffer.alloc(0);
    this._bufferedBytes = 0;
    this._frame = {};
    this._fragments = [];
    this._fragmentOpcode = undefined;
    if (this._socket) this._socket.destroy();
    this.emit("error", err);
  }

  /**
   * Acts on one fully decoded frame.
   * @param {Object} frame - result of getFrameInfo plus PAYLOAD_LENGTH
   *   and PAYLOAD (a missing PAYLOAD is treated as empty)
   */
  handleFrame(frame) {
    if (this.debug) console.log("\x1b[34m%s\x1b[0m", "::FRAME::", frame);
    const payload = frame.PAYLOAD || Buffer.alloc(0);

    switch (frame.OPCODE) {
      case OPCODE.TEXT:
      case OPCODE.BINARY:
        if (frame.FIN) {
          this._dispatchMessage(frame.OPCODE, payload);
        } else {
          // First fragment: remember the message type and start collecting.
          this._fragmentOpcode = frame.OPCODE;
          this._fragments = [payload];
        }
        break;
      case OPCODE.CONTINUE: {
        // A continuation without a started message is ignored.
        if (this._fragmentOpcode === undefined) break;
        this._fragments.push(payload);
        if (frame.FIN) {
          const opcode = this._fragmentOpcode;
          const message = Buffer.concat(this._fragments);
          this._fragments = [];
          this._fragmentOpcode = undefined;
          this._dispatchMessage(opcode, message);
        }
        break;
      }
      case OPCODE.CLOSE:
        this.handleClose();
        break;
      case OPCODE.PING:
        if (this.debug) console.log("::PING::");
        // A ping is answered with a pong carrying the same payload
        // (https://tools.ietf.org/html/rfc6455#section-5.5.3).
        if (this.SOCKET_STATE === "OPEN")
          this._socket.write(encodeFrame(payload, OPCODE.PONG));
        break;
    }
  }

  /**
   * Hands a complete (possibly reassembled) message to its handler.
   * Only text messages are surfaced; binary ones are dropped.
   * @param {Number} opcode OPCODE.TEXT or OPCODE.BINARY
   * @param {Buffer} payload the full message body
   * @todo add support for binary data
   */
  _dispatchMessage(opcode, payload) {
    if (opcode === OPCODE.TEXT) this.handleTextData(payload.toString());
  }

  /**
   * Emits a received text message.
   * @param {String} text
   */
  handleTextData(text) {
    if (this.debug) console.log("\x1b[33m%s\x1b[0m", "::emit message::", text);
    this.emit("message", text);
  }

  /**
   * Runs the closing handshake. The first call sends a masked, empty
   * close frame and moves to "CLOSING". A call while already "CLOSING"
   * (the peer answered our close frame) ends the socket instead of
   * sending a second close frame.
   */
  handleClose() {
    if (this.debug) console.log("\x1b[31m%s\x1b[0m", "::closing:: 0x8");
    if (!this._socket || this.SOCKET_STATE === "CLOSED") return;

    if (this.SOCKET_STATE === "CLOSING") {
      this._socket.end();
      return;
    }

    this.SOCKET_STATE = "CLOSING";
    // Client-to-server frames must be masked, control frames included.
    this._socket.write(encodeFrame(Buffer.alloc(0), OPCODE.CLOSE));
  }

  /**
   * Sends a text message.
   * @param {String} data
   * @throws {Error} when the socket is not open
   * @throws {TypeError} when data is not a string
   * @todo add support for binary data
   */
  send(data) {
    if (this.SOCKET_STATE !== "OPEN")
      throw new Error("Websocket needs to be open.");
    if (typeof data !== "string") throw new TypeError("data must be string");

    this._socket.write(encodeFrame(data, OPCODE.TEXT));
  }

  /**
   * Starts the closing handshake. Calling it again while closing or
   * after the socket closed is a no-op.
   * @throws {Error} when the connection has not been opened yet
   */
  close() {
    if (this.SOCKET_STATE === "CLOSING" || this.SOCKET_STATE === "CLOSED")
      return;
    if (this.SOCKET_STATE !== "OPEN")
      throw new Error("Websocket needs to be open.");
    this.handleClose();
  }
}

module.exports = WebSocket;