From 782e3f690ffb9ee0dd89a5a64a3f2b753899719b Mon Sep 17 00:00:00 2001 From: Yusuke Sakurai Date: Wed, 15 May 2019 04:19:12 +0900 Subject: [PATCH] feat: ws client (#394) --- examples/ws.ts | 39 ------- io/bufio.ts | 10 ++ ws/example_client.ts | 55 ++++++++++ ws/example_server.ts | 66 ++++++++++++ ws/mod.ts | 249 ++++++++++++++++++++++++++++++------------- ws/test.ts | 52 ++++++--- 6 files changed, 347 insertions(+), 124 deletions(-) delete mode 100644 examples/ws.ts create mode 100644 ws/example_client.ts create mode 100644 ws/example_server.ts diff --git a/examples/ws.ts b/examples/ws.ts deleted file mode 100644 index f5965b7eb4..0000000000 --- a/examples/ws.ts +++ /dev/null @@ -1,39 +0,0 @@ -// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. -import { serve } from "https://deno.land/std/http/mod.ts"; -import { - acceptWebSocket, - isWebSocketCloseEvent, - isWebSocketPingEvent -} from "https://deno.land/std/ws/mod.ts"; - -async function main(): Promise { - console.log("websocket server is running on 0.0.0.0:8080"); - for await (const req of serve("0.0.0.0:8080")) { - if (req.url === "/ws") { - (async (): Promise => { - const sock = await acceptWebSocket(req); - console.log("socket connected!"); - for await (const ev of sock.receive()) { - if (typeof ev === "string") { - // text message - console.log("ws:Text", ev); - await sock.send(ev); - } else if (ev instanceof Uint8Array) { - // binary message - console.log("ws:Binary", ev); - } else if (isWebSocketPingEvent(ev)) { - const [, body] = ev; - // ping - console.log("ws:Ping", body); - } else if (isWebSocketCloseEvent(ev)) { - // close - const { code, reason } = ev; - console.log("ws:Close", code, reason); - } - } - })(); - } - } -} - -main(); diff --git a/io/bufio.ts b/io/bufio.ts index f057952fe6..749a7e8faf 100644 --- a/io/bufio.ts +++ b/io/bufio.ts @@ -33,6 +33,11 @@ export class BufReader implements Reader { private lastCharSize: number; private err: BufState; + /** return new BufReader unless r is BufReader */ + static create(r: Reader, size = DEFAULT_BUF_SIZE): BufReader { + return r instanceof BufReader ? r : new BufReader(r, size); + } + constructor(rd: Reader, size = DEFAULT_BUF_SIZE) { if (size < MIN_BUF_SIZE) { size = MIN_BUF_SIZE; @@ -368,6 +373,11 @@ export class BufWriter implements Writer { n: number = 0; err: null | BufState = null; + /** return new BufWriter unless w is BufWriter */ + static create(w: Writer, size = DEFAULT_BUF_SIZE): BufWriter { + return w instanceof BufWriter ? w : new BufWriter(w, size); + } + constructor(private wr: Writer, size = DEFAULT_BUF_SIZE) { if (size <= 0) { size = DEFAULT_BUF_SIZE; diff --git a/ws/example_client.ts b/ws/example_client.ts new file mode 100644 index 0000000000..16f37d0219 --- /dev/null +++ b/ws/example_client.ts @@ -0,0 +1,55 @@ +import { + connectWebSocket, + isWebSocketCloseEvent, + isWebSocketPingEvent, + isWebSocketPongEvent +} from "../ws/mod.ts"; +import { encode } from "../strings/strings.ts"; +import { BufReader } from "../io/bufio.ts"; +import { TextProtoReader } from "../textproto/mod.ts"; +import { blue, green, red, yellow } from "../colors/mod.ts"; + +const endpoint = Deno.args[1] || "ws://127.0.0.1:8080"; +/** simple websocket cli */ +async function main(): Promise { + const sock = await connectWebSocket(endpoint); + console.log(green("ws connected! (type 'close' to quit)")); + (async function(): Promise { + for await (const msg of sock.receive()) { + if (typeof msg === "string") { + console.log(yellow("< " + msg)); + } else if (isWebSocketPingEvent(msg)) { + console.log(blue("< ping")); + } else if (isWebSocketPongEvent(msg)) { + console.log(blue("< pong")); + } else if (isWebSocketCloseEvent(msg)) { + console.log(red(`closed: code=${msg.code}, reason=${msg.reason}`)); + } + } + })(); + const tpr = new TextProtoReader(new BufReader(Deno.stdin)); + while (true) { + await Deno.stdout.write(encode("> ")); + const [line, err] = await tpr.readLine(); + if (err) { + console.error(red(`failed to read line from stdin: ${err}`)); + break; + } + if (line === "close") { + break; + } else if (line === "ping") { + await sock.ping(); + } else { + await sock.send(line); + } + // FIXME: Without this, sock.receive() won't resolved though it is readable... + await new Promise((resolve): void => setTimeout(resolve, 0)); + } + await sock.close(1000); + // FIXME: conn.close() won't shutdown process... + Deno.exit(0); +} + +if (import.meta.main) { + main(); +} diff --git a/ws/example_server.ts b/ws/example_server.ts new file mode 100644 index 0000000000..cd51ff94c7 --- /dev/null +++ b/ws/example_server.ts @@ -0,0 +1,66 @@ +// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. +import { serve } from "../http/server.ts"; +import { + acceptWebSocket, + isWebSocketCloseEvent, + isWebSocketPingEvent, + WebSocket +} from "./mod.ts"; + +/** websocket echo server */ +const port = Deno.args[1] || "8080"; +async function main(): Promise { + console.log(`websocket server is running on :${port}`); + for await (const req of serve(`:${port}`)) { + const { headers, conn } = req; + acceptWebSocket({ + conn, + headers, + bufReader: req.r, + bufWriter: req.w + }) + .then( + async (sock: WebSocket): Promise => { + console.log("socket connected!"); + const it = sock.receive(); + while (true) { + try { + const { done, value } = await it.next(); + if (done) { + break; + } + const ev = value; + if (typeof ev === "string") { + // text message + console.log("ws:Text", ev); + await sock.send(ev); + } else if (ev instanceof Uint8Array) { + // binary message + console.log("ws:Binary", ev); + } else if (isWebSocketPingEvent(ev)) { + const [, body] = ev; + // ping + console.log("ws:Ping", body); + } else if (isWebSocketCloseEvent(ev)) { + // close + const { code, reason } = ev; + console.log("ws:Close", code, reason); + } + } catch (e) { + console.error(`failed to receive frame: ${e}`); + await sock.close(1000).catch(console.error); + } + } + } + ) + .catch( + (err: Error): void => { + console.error(`failed to accept websocket: ${err}`); + } + ); + } +} + +if (import.meta.main) { + main(); +} diff --git a/ws/mod.ts b/ws/mod.ts index 65dc142ca9..3e20e2d3b4 100644 --- a/ws/mod.ts +++ b/ws/mod.ts @@ -1,11 +1,14 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. -const { Buffer } = Deno; + +import { decode, encode } from "../strings/strings.ts"; + type Conn = Deno.Conn; type Writer = Deno.Writer; import { BufReader, BufWriter } from "../io/bufio.ts"; import { readLong, readShort, sliceLongToBytes } from "../io/ioutil.ts"; import { Sha1 } from "./sha1.ts"; import { writeResponse } from "../http/server.ts"; +import { TextProtoReader } from "../textproto/mod.ts"; export enum OpCode { Continue = 0x0, @@ -70,13 +73,19 @@ export interface WebSocketFrame { } export interface WebSocket { + readonly conn: Conn; readonly isClosed: boolean; + receive(): AsyncIterableIterator; + send(data: WebSocketMessage): Promise; + ping(data?: WebSocketMessage): Promise; + close(code: number, reason?: string): Promise; } +/** Unmask masked websocket payload */ export function unmask(payload: Uint8Array, mask?: Uint8Array): void { if (mask) { for (let i = 0, len = payload.length; i < len; i++) { @@ -85,6 +94,7 @@ export function unmask(payload: Uint8Array, mask?: Uint8Array): void { } } +/** Write websocket frame to given writer */ export async function writeFrame( frame: WebSocketFrame, writer: Writer @@ -92,6 +102,11 @@ export async function writeFrame( const payloadLength = frame.payload.byteLength; let header: Uint8Array; const hasMask = frame.mask ? 0x80 : 0; + if (frame.mask && frame.mask.byteLength !== 4) { + throw new Error( + "invalid mask. mask must be 4 bytes: length=" + frame.mask.byteLength + ); + } if (payloadLength < 126) { header = new Uint8Array([0x80 | frame.opcode, hasMask | payloadLength]); } else if (payloadLength < 0xffff) { @@ -108,13 +123,18 @@ export async function writeFrame( ...sliceLongToBytes(payloadLength) ]); } + if (frame.mask) { + header = append(header, frame.mask); + } unmask(frame.payload, frame.mask); - const bytes = append(header, frame.payload); - const w = new BufWriter(writer); - await w.write(bytes); - await w.flush(); + header = append(header, frame.payload); + const w = BufWriter.create(writer); + await w.write(header); + const err = await w.flush(); + if (err) throw err; } +/** Read websocket frame from given BufReader */ export async function readFrame(buf: BufReader): Promise { let b = await buf.readByte(); let isLastFrame = false; @@ -155,62 +175,38 @@ export async function readFrame(buf: BufReader): Promise { }; } -export async function* receiveFrame( - conn: Conn -): AsyncIterableIterator { - let receiving = true; - const isLastFrame = true; - const reader = new BufReader(conn); - while (receiving) { - const frame = await readFrame(reader); - const { opcode, payload } = frame; - switch (opcode) { - case OpCode.TextFrame: - case OpCode.BinaryFrame: - case OpCode.Continue: - yield frame; - break; - case OpCode.Close: - await writeFrame( - { - opcode, - payload, - isLastFrame - }, - conn - ); - conn.close(); - yield frame; - receiving = false; - break; - case OpCode.Ping: - await writeFrame( - { - payload, - isLastFrame, - opcode: OpCode.Pong - }, - conn - ); - yield frame; - break; - case OpCode.Pong: - yield frame; - break; - default: - } - } +// Create client-to-server mask, random 32bit number +function createMask(): Uint8Array { + // TODO: use secure and immutable random function. Crypto.getRandomValues() + const arr = Array.from({ length: 4 }).map( + (): number => Math.round(Math.random() * 0xff) + ); + return new Uint8Array(arr); } class WebSocketImpl implements WebSocket { - encoder = new TextEncoder(); + private readonly mask?: Uint8Array; + private readonly bufReader: BufReader; + private readonly bufWriter: BufWriter; - constructor(private conn: Conn, private mask?: Uint8Array) {} + constructor( + readonly conn: Conn, + opts: { + bufReader?: BufReader; + bufWriter?: BufWriter; + mask?: Uint8Array; + } = {} + ) { + this.mask = opts.mask || createMask(); + this.bufReader = opts.bufReader || new BufReader(conn); + this.bufWriter = opts.bufWriter || new BufWriter(conn); + } async *receive(): AsyncIterableIterator { let frames: WebSocketFrame[] = []; let payloadsLength = 0; - for await (const frame of receiveFrame(this.conn)) { + while (true) { + const frame = await readFrame(this.bufReader); unmask(frame.payload, frame.mask); switch (frame.opcode) { case OpCode.TextFrame: @@ -227,7 +223,7 @@ class WebSocketImpl implements WebSocket { } if (frames[0].opcode === OpCode.TextFrame) { // text - yield new Buffer(concat).toString(); + yield decode(concat); } else { // binary yield concat; @@ -237,14 +233,23 @@ class WebSocketImpl implements WebSocket { } break; case OpCode.Close: - const code = (frame.payload[0] << 16) | frame.payload[1]; - const reason = new Buffer( + // [0x12, 0x34] -> 0x1234 + const code = (frame.payload[0] << 8) | frame.payload[1]; + const reason = decode( frame.payload.subarray(2, frame.payload.length) - ).toString(); - this._isClosed = true; + ); + await this.close(code, reason); yield { code, reason }; return; case OpCode.Ping: + await writeFrame( + { + opcode: OpCode.Pong, + payload: frame.payload, + isLastFrame: true + }, + this.bufWriter + ); yield ["ping", frame.payload] as WebSocketPingEvent; break; case OpCode.Pong: @@ -261,7 +266,7 @@ class WebSocketImpl implements WebSocket { } const opcode = typeof data === "string" ? OpCode.TextFrame : OpCode.BinaryFrame; - const payload = typeof data === "string" ? this.encoder.encode(data) : data; + const payload = typeof data === "string" ? encode(data) : data; const isLastFrame = true; await writeFrame( { @@ -270,20 +275,20 @@ class WebSocketImpl implements WebSocket { payload, mask: this.mask }, - this.conn + this.bufWriter ); } - async ping(data: WebSocketMessage): Promise { - const payload = typeof data === "string" ? this.encoder.encode(data) : data; + async ping(data: WebSocketMessage = ""): Promise { + const payload = typeof data === "string" ? encode(data) : data; await writeFrame( { isLastFrame: true, - opcode: OpCode.Close, + opcode: OpCode.Ping, mask: this.mask, payload }, - this.conn + this.bufWriter ); } @@ -297,7 +302,7 @@ class WebSocketImpl implements WebSocket { const header = [code >>> 8, code & 0x00ff]; let payload: Uint8Array; if (reason) { - const reasonBytes = this.encoder.encode(reason); + const reasonBytes = encode(reason); payload = new Uint8Array(2 + reasonBytes.byteLength); payload.set(header); payload.set(reasonBytes, 2); @@ -311,7 +316,7 @@ class WebSocketImpl implements WebSocket { mask: this.mask, payload }, - this.conn + this.bufWriter ); } catch (e) { throw e; @@ -320,11 +325,10 @@ class WebSocketImpl implements WebSocket { } } - private ensureSocketClosed(): Error { + private ensureSocketClosed(): void { if (this.isClosed) { return; } - try { this.conn.close(); } catch (e) { @@ -335,16 +339,20 @@ class WebSocketImpl implements WebSocket { } } +/** Return whether given headers is acceptable for websocket */ export function acceptable(req: { headers: Headers }): boolean { + const secKey = req.headers.get("sec-websocket-key"); return ( req.headers.get("upgrade") === "websocket" && req.headers.has("sec-websocket-key") && - req.headers.get("sec-websocket-key").length > 0 + typeof secKey === "string" && + secKey.length > 0 ); } const kGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; +/** Create sec-websocket-accept header value with given nonce */ export function createSecAccept(nonce: string): string { const sha1 = new Sha1(); sha1.update(nonce + kGUID); @@ -352,16 +360,22 @@ export function createSecAccept(nonce: string): string { return btoa(String.fromCharCode.apply(String, bytes)); } +/** Upgrade given TCP connection into websocket connection */ export async function acceptWebSocket(req: { conn: Conn; + bufWriter: BufWriter; + bufReader: BufReader; headers: Headers; }): Promise { - const { conn, headers } = req; + const { conn, headers, bufReader, bufWriter } = req; if (acceptable(req)) { - const sock = new WebSocketImpl(conn); + const sock = new WebSocketImpl(conn, { bufReader, bufWriter }); const secKey = headers.get("sec-websocket-key"); + if (typeof secKey !== "string") { + throw new Error("sec-websocket-key is not provided"); + } const secAccept = createSecAccept(secKey); - await writeResponse(conn, { + await writeResponse(bufWriter, { status: 101, headers: new Headers({ Upgrade: "websocket", @@ -373,3 +387,94 @@ export async function acceptWebSocket(req: { } throw new Error("request is not acceptable"); } + +const kSecChars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ-.~_"; + +/** Create WebSocket-Sec-Key. Base64 encoded 16 bytes string */ +export function createSecKey(): string { + let key = ""; + for (let i = 0; i < 16; i++) { + const j = Math.round(Math.random() * kSecChars.length); + key += kSecChars[j]; + } + return btoa(key); +} + +/** Connect to given websocket endpoint url. Endpoint must be acceptable for URL */ +export async function connectWebSocket( + endpoint: string, + headers: Headers = new Headers() +): Promise { + const url = new URL(endpoint); + const { hostname, pathname, searchParams } = url; + let port = url.port; + if (!url.port) { + if (url.protocol === "http" || url.protocol === "ws") { + port = "80"; + } else if (url.protocol === "https" || url.protocol === "wss") { + throw new Error("currently https/wss is not supported"); + } + } + const conn = await Deno.dial("tcp", `${hostname}:${port}`); + const abortHandshake = (err: Error): void => { + conn.close(); + throw err; + }; + const bufWriter = new BufWriter(conn); + const bufReader = new BufReader(conn); + await bufWriter.write( + encode(`GET ${pathname}?${searchParams || ""} HTTP/1.1\r\n`) + ); + const key = createSecKey(); + if (!headers.has("host")) { + headers.set("host", hostname); + } + headers.set("upgrade", "websocket"); + headers.set("connection", "upgrade"); + headers.set("sec-websocket-key", key); + let headerStr = ""; + for (const [key, value] of headers) { + headerStr += `${key}: ${value}\r\n`; + } + headerStr += "\r\n"; + await bufWriter.write(encode(headerStr)); + let err, statusLine, responseHeaders; + err = await bufWriter.flush(); + if (err) { + throw new Error("ws: failed to send handshake: " + err); + } + const tpReader = new TextProtoReader(bufReader); + [statusLine, err] = await tpReader.readLine(); + if (err) { + abortHandshake(new Error("ws: failed to read status line: " + err)); + } + const m = statusLine.match(/^(.+?) (.+?) (.+?)$/); + if (!m) { + abortHandshake(new Error("ws: invalid status line: " + statusLine)); + } + const [_, version, statusCode] = m; + if (version !== "HTTP/1.1" || statusCode !== "101") { + abortHandshake( + new Error( + `ws: server didn't accept handshake: version=${version}, statusCode=${statusCode}` + ) + ); + } + [responseHeaders, err] = await tpReader.readMIMEHeader(); + if (err) { + abortHandshake(new Error("ws: failed to parse response headers: " + err)); + } + const expectedSecAccept = createSecAccept(key); + const secAccept = responseHeaders.get("sec-websocket-accept"); + if (secAccept !== expectedSecAccept) { + abortHandshake( + new Error( + `ws: unexpected sec-websocket-accept header: expected=${expectedSecAccept}, actual=${secAccept}` + ) + ); + } + return new WebSocketImpl(conn, { + bufWriter, + bufReader + }); +} diff --git a/ws/test.ts b/ws/test.ts index 5d0cc90934..7b0bd69499 100644 --- a/ws/test.ts +++ b/ws/test.ts @@ -1,19 +1,21 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. import "./sha1_test.ts"; - -const { Buffer } = Deno; import { BufReader } from "../io/bufio.ts"; import { assert, assertEquals } from "../testing/asserts.ts"; -import { test } from "../testing/mod.ts"; +import { runIfMain, test } from "../testing/mod.ts"; import { acceptable, createSecAccept, OpCode, readFrame, - unmask + unmask, + writeFrame } from "./mod.ts"; +import { encode } from "../strings/strings.ts"; -test(async function testReadUnmaskedTextFrame(): Promise { +const { Buffer } = Deno; + +test(async function wsReadUnmaskedTextFrame(): Promise { // unmasked single text frame with payload "Hello" const buf = new BufReader( new Buffer(new Uint8Array([0x81, 0x05, 0x48, 0x65, 0x6c, 0x6c, 0x6f])) @@ -25,7 +27,7 @@ test(async function testReadUnmaskedTextFrame(): Promise { assertEquals(frame.isLastFrame, true); }); -test(async function testReadMakedTextFrame(): Promise { +test(async function wsReadMaskedTextFrame(): Promise { //a masked single text frame with payload "Hello" const buf = new BufReader( new Buffer( @@ -52,7 +54,7 @@ test(async function testReadMakedTextFrame(): Promise { assertEquals(frame.isLastFrame, true); }); -test(async function testReadUnmaskedSplittedTextFrames(): Promise { +test(async function wsReadUnmaskedSplitTextFrames(): Promise { const buf1 = new BufReader( new Buffer(new Uint8Array([0x01, 0x03, 0x48, 0x65, 0x6c])) ); @@ -71,7 +73,7 @@ test(async function testReadUnmaskedSplittedTextFrames(): Promise { assertEquals(new Buffer(f2.payload).toString(), "lo"); }); -test(async function testReadUnmaksedPingPongFrame(): Promise { +test(async function wsReadUnmaskedPingPongFrame(): Promise { // unmasked ping with payload "Hello" const buf = new BufReader( new Buffer(new Uint8Array([0x89, 0x05, 0x48, 0x65, 0x6c, 0x6c, 0x6f])) @@ -104,7 +106,7 @@ test(async function testReadUnmaksedPingPongFrame(): Promise { assertEquals(new Buffer(pong.payload).toString(), "Hello"); }); -test(async function testReadUnmaksedBigBinaryFrame(): Promise { +test(async function wsReadUnmaskedBigBinaryFrame(): Promise { const a = [0x82, 0x7e, 0x01, 0x00]; for (let i = 0; i < 256; i++) { a.push(i); @@ -117,7 +119,7 @@ test(async function testReadUnmaksedBigBinaryFrame(): Promise { assertEquals(bin.payload.length, 256); }); -test(async function testReadUnmaskedBigBigBinaryFrame(): Promise { +test(async function wsReadUnmaskedBigBigBinaryFrame(): Promise { const a = [0x82, 0x7f, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00]; for (let i = 0; i < 0xffff; i++) { a.push(i); @@ -130,13 +132,13 @@ test(async function testReadUnmaskedBigBigBinaryFrame(): Promise { assertEquals(bin.payload.length, 0xffff + 1); }); -test(async function testCreateSecAccept(): Promise { +test(async function wsCreateSecAccept(): Promise { const nonce = "dGhlIHNhbXBsZSBub25jZQ=="; const d = createSecAccept(nonce); assertEquals(d, "s3pPLMBiTxaQ9kYGzzhZRbK+xOo="); }); -test(function testAcceptable(): void { +test(function wsAcceptable(): void { const ret = acceptable({ headers: new Headers({ upgrade: "websocket", @@ -153,7 +155,7 @@ const invalidHeaders = [ { upgrade: "websocket", "sec-websocket-ky": "" } ]; -test(function testAcceptableInvalid(): void { +test(function wsAcceptableInvalid(): void { for (const pat of invalidHeaders) { const ret = acceptable({ headers: new Headers(pat) @@ -161,3 +163,27 @@ test(function testAcceptableInvalid(): void { assertEquals(ret, false); } }); + +test(async function wsWriteReadMaskedFrame(): Promise { + const mask = new Uint8Array([0, 1, 2, 3]); + const msg = "hello"; + const buf = new Buffer(); + const r = new BufReader(buf); + await writeFrame( + { + isLastFrame: true, + mask, + opcode: OpCode.TextFrame, + payload: encode(msg) + }, + buf + ); + const frame = await readFrame(r); + assertEquals(frame.opcode, OpCode.TextFrame); + assertEquals(frame.isLastFrame, true); + assertEquals(frame.mask, mask); + unmask(frame.payload, frame.mask); + assertEquals(frame.payload, encode(msg)); +}); + +runIfMain(import.meta);