1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2024-11-26 16:09:27 -05:00

feat: Make WebSocket Reader/Writer (#5001) (#5002)

This commit is contained in:
Andrey Trebler 2020-04-30 09:09:48 +02:00 committed by GitHub
parent 12c6055855
commit 8ec36681dd
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 143 additions and 1 deletions

View file

@ -10,8 +10,10 @@ import { TextProtoReader } from "../textproto/mod.ts";
import { Deferred, deferred } from "../util/async.ts"; import { Deferred, deferred } from "../util/async.ts";
import { assert } from "../testing/asserts.ts"; import { assert } from "../testing/asserts.ts";
import { concat } from "../bytes/mod.ts"; import { concat } from "../bytes/mod.ts";
import { copyBytes } from "../io/util.ts";
import Conn = Deno.Conn; import Conn = Deno.Conn;
import Writer = Deno.Writer; import Writer = Deno.Writer;
import Reader = Deno.Reader;
export enum OpCode { export enum OpCode {
Continue = 0x0, Continue = 0x0,
@ -65,7 +67,7 @@ export interface WebSocketFrame {
payload: Uint8Array; payload: Uint8Array;
} }
export interface WebSocket { export interface WebSocket extends Reader, Writer {
readonly conn: Conn; readonly conn: Conn;
readonly isClosed: boolean; readonly isClosed: boolean;
@ -327,6 +329,26 @@ class WebSocketImpl implements WebSocket {
return this.enqueue(frame); return this.enqueue(frame);
} }
async write(p: Uint8Array): Promise<number> {
await this.send(p);
return p.byteLength;
}
async read(p: Uint8Array): Promise<number | null> {
for await (const ev of this.receive()) {
if (ev instanceof Uint8Array) {
return copyBytes(p, ev);
}
if (typeof ev === "string") {
return copyBytes(p, encode(ev));
}
}
return null;
}
ping(data: WebSocketMessage = ""): Promise<void> { ping(data: WebSocketMessage = ""): Promise<void> {
const payload = typeof data === "string" ? encode(data) : data; const payload = typeof data === "string" ? encode(data) : data;
const frame = { const frame = {

View file

@ -403,3 +403,123 @@ test({
await delay(10); await delay(10);
}, },
}); });
test("[ws] WebSocket should implement Writer", async () => {
const buf = new Buffer();
const conn = dummyConn(buf, buf);
const sock = createWebSocket({ conn });
const [write0, write1, write2] = await Promise.all([
sock.write(new Uint8Array([1, 2, 3])),
sock.write(new Uint8Array([])),
sock.write(new Uint8Array([0])),
]);
assertEquals(write0, 3);
assertEquals(write1, 0);
assertEquals(write2, 1);
});
test("[ws] WebSocket should implement Reader", async () => {
const hello = new Uint8Array([0x81, 0x05, 0x48, 0x65, 0x6c, 0x6c, 0x6f]);
const bufHello = new Buffer(hello);
const conn = dummyConn(bufHello, new Buffer());
const sock = createWebSocket({ conn });
const p = new Uint8Array(100);
const read = await sock.read(p);
const readLast = await sock.read(p);
const helloLength = "Hello".length;
assertEquals(read, helloLength);
assertEquals(decode(new Buffer(p.subarray(0, helloLength)).bytes()), "Hello");
assertEquals(readLast, null);
});
test("[ws] WebSocket Reader should ignore non-message frames", async () => {
const pong = new Uint8Array([
0x8a,
0x85,
0x37,
0xfa,
0x21,
0x3d,
0x7f,
0x9f,
0x4d,
0x51,
0x58,
]);
const pingHello = new Uint8Array([0x89, 0x05, 0x48, 0x65, 0x6c, 0x6c, 0x6f]);
const hello = new Uint8Array([0x81, 0x05, 0x48, 0x65, 0x6c, 0x6c, 0x6f]);
const close = new Uint8Array([0x88]);
const dataPayloadLength = 0x100;
const dataArr = [0x82, 0x7e, 0x01, 0x00];
for (let i = 0; i < dataPayloadLength; i++) {
dataArr.push(i);
}
const data = new Uint8Array(dataArr);
enum Frames {
ping,
text,
pong,
data,
close,
end,
}
let frame = Frames.ping;
const reader: Reader = {
read(p: Uint8Array): Promise<number | null> {
if (frame === Frames.ping) {
frame = Frames.text;
p.set(pingHello);
return Promise.resolve(pingHello.byteLength);
}
if (frame === Frames.text) {
frame = Frames.pong;
p.set(hello);
return Promise.resolve(hello.byteLength);
}
if (frame === Frames.pong) {
frame = Frames.data;
p.set(pong);
return Promise.resolve(pong.byteLength);
}
if (frame === Frames.data) {
frame = Frames.close;
p.set(data);
return Promise.resolve(data.byteLength);
}
if (frame === Frames.close) {
frame = Frames.end;
p.set(close);
return Promise.resolve(close.byteLength);
}
return Promise.resolve(null);
},
};
const conn = dummyConn(reader, new Buffer());
const sock = createWebSocket({ conn });
const p = await Deno.readAll(sock);
const helloLength = "Hello".length;
assertEquals(p.byteLength, helloLength + dataPayloadLength);
assertEquals(decode(new Buffer(p.subarray(0, helloLength)).bytes()), "Hello");
assertEquals(p.subarray(helloLength), data.subarray(4));
});