From 8ec36681dd4abac346551a54cdd6870ed8e864f5 Mon Sep 17 00:00:00 2001 From: Andrey Trebler Date: Thu, 30 Apr 2020 09:09:48 +0200 Subject: [PATCH] feat: Make WebSocket Reader/Writer (#5001) (#5002) --- std/ws/mod.ts | 24 +++++++++- std/ws/test.ts | 120 +++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 143 insertions(+), 1 deletion(-) diff --git a/std/ws/mod.ts b/std/ws/mod.ts index 7e444f274b..2a18cded7d 100644 --- a/std/ws/mod.ts +++ b/std/ws/mod.ts @@ -10,8 +10,10 @@ import { TextProtoReader } from "../textproto/mod.ts"; import { Deferred, deferred } from "../util/async.ts"; import { assert } from "../testing/asserts.ts"; import { concat } from "../bytes/mod.ts"; +import { copyBytes } from "../io/util.ts"; import Conn = Deno.Conn; import Writer = Deno.Writer; +import Reader = Deno.Reader; export enum OpCode { Continue = 0x0, @@ -65,7 +67,7 @@ export interface WebSocketFrame { payload: Uint8Array; } -export interface WebSocket { +export interface WebSocket extends Reader, Writer { readonly conn: Conn; readonly isClosed: boolean; @@ -327,6 +329,26 @@ class WebSocketImpl implements WebSocket { return this.enqueue(frame); } + async write(p: Uint8Array): Promise { + await this.send(p); + + return p.byteLength; + } + + async read(p: Uint8Array): Promise { + for await (const ev of this.receive()) { + if (ev instanceof Uint8Array) { + return copyBytes(p, ev); + } + + if (typeof ev === "string") { + return copyBytes(p, encode(ev)); + } + } + + return null; + } + ping(data: WebSocketMessage = ""): Promise { const payload = typeof data === "string" ? encode(data) : data; const frame = { diff --git a/std/ws/test.ts b/std/ws/test.ts index 796bf3bb81..c59202e89c 100644 --- a/std/ws/test.ts +++ b/std/ws/test.ts @@ -403,3 +403,123 @@ test({ await delay(10); }, }); + +test("[ws] WebSocket should implement Writer", async () => { + const buf = new Buffer(); + + const conn = dummyConn(buf, buf); + const sock = createWebSocket({ conn }); + + const [write0, write1, write2] = await Promise.all([ + sock.write(new Uint8Array([1, 2, 3])), + sock.write(new Uint8Array([])), + sock.write(new Uint8Array([0])), + ]); + + assertEquals(write0, 3); + assertEquals(write1, 0); + assertEquals(write2, 1); +}); + +test("[ws] WebSocket should implement Reader", async () => { + const hello = new Uint8Array([0x81, 0x05, 0x48, 0x65, 0x6c, 0x6c, 0x6f]); + + const bufHello = new Buffer(hello); + + const conn = dummyConn(bufHello, new Buffer()); + const sock = createWebSocket({ conn }); + + const p = new Uint8Array(100); + const read = await sock.read(p); + const readLast = await sock.read(p); + + const helloLength = "Hello".length; + + assertEquals(read, helloLength); + assertEquals(decode(new Buffer(p.subarray(0, helloLength)).bytes()), "Hello"); + assertEquals(readLast, null); +}); + +test("[ws] WebSocket Reader should ignore non-message frames", async () => { + const pong = new Uint8Array([ + 0x8a, + 0x85, + 0x37, + 0xfa, + 0x21, + 0x3d, + 0x7f, + 0x9f, + 0x4d, + 0x51, + 0x58, + ]); + const pingHello = new Uint8Array([0x89, 0x05, 0x48, 0x65, 0x6c, 0x6c, 0x6f]); + const hello = new Uint8Array([0x81, 0x05, 0x48, 0x65, 0x6c, 0x6c, 0x6f]); + const close = new Uint8Array([0x88]); + + const dataPayloadLength = 0x100; + const dataArr = [0x82, 0x7e, 0x01, 0x00]; + for (let i = 0; i < dataPayloadLength; i++) { + dataArr.push(i); + } + const data = new Uint8Array(dataArr); + + enum Frames { + ping, + text, + pong, + data, + close, + end, + } + + let frame = Frames.ping; + + const reader: Reader = { + read(p: Uint8Array): Promise { + if (frame === Frames.ping) { + frame = Frames.text; + p.set(pingHello); + return Promise.resolve(pingHello.byteLength); + } + + if (frame === Frames.text) { + frame = Frames.pong; + p.set(hello); + return Promise.resolve(hello.byteLength); + } + + if (frame === Frames.pong) { + frame = Frames.data; + p.set(pong); + return Promise.resolve(pong.byteLength); + } + + if (frame === Frames.data) { + frame = Frames.close; + p.set(data); + return Promise.resolve(data.byteLength); + } + + if (frame === Frames.close) { + frame = Frames.end; + p.set(close); + return Promise.resolve(close.byteLength); + } + + return Promise.resolve(null); + }, + }; + + const conn = dummyConn(reader, new Buffer()); + const sock = createWebSocket({ conn }); + + const p = await Deno.readAll(sock); + + const helloLength = "Hello".length; + + assertEquals(p.byteLength, helloLength + dataPayloadLength); + assertEquals(decode(new Buffer(p.subarray(0, helloLength)).bytes()), "Hello"); + assertEquals(p.subarray(helloLength), data.subarray(4)); +});