1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2024-12-26 00:59:24 -05:00

feat: ws client (#394)

This commit is contained in:
Yusuke Sakurai 2019-05-15 04:19:12 +09:00 committed by Ryan Dahl
parent d097e319e8
commit 782e3f690f
6 changed files with 347 additions and 124 deletions

View file

@ -1,39 +0,0 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
import { serve } from "https://deno.land/std/http/mod.ts";
import {
acceptWebSocket,
isWebSocketCloseEvent,
isWebSocketPingEvent
} from "https://deno.land/std/ws/mod.ts";
async function main(): Promise<void> {
console.log("websocket server is running on 0.0.0.0:8080");
for await (const req of serve("0.0.0.0:8080")) {
if (req.url === "/ws") {
(async (): Promise<void> => {
const sock = await acceptWebSocket(req);
console.log("socket connected!");
for await (const ev of sock.receive()) {
if (typeof ev === "string") {
// text message
console.log("ws:Text", ev);
await sock.send(ev);
} else if (ev instanceof Uint8Array) {
// binary message
console.log("ws:Binary", ev);
} else if (isWebSocketPingEvent(ev)) {
const [, body] = ev;
// ping
console.log("ws:Ping", body);
} else if (isWebSocketCloseEvent(ev)) {
// close
const { code, reason } = ev;
console.log("ws:Close", code, reason);
}
}
})();
}
}
}
main();

View file

@ -33,6 +33,11 @@ export class BufReader implements Reader {
private lastCharSize: number;
private err: BufState;
/** return new BufReader unless r is BufReader */
static create(r: Reader, size = DEFAULT_BUF_SIZE): BufReader {
return r instanceof BufReader ? r : new BufReader(r, size);
}
constructor(rd: Reader, size = DEFAULT_BUF_SIZE) {
if (size < MIN_BUF_SIZE) {
size = MIN_BUF_SIZE;
@ -368,6 +373,11 @@ export class BufWriter implements Writer {
n: number = 0;
err: null | BufState = null;
/** return new BufWriter unless w is BufWriter */
static create(w: Writer, size = DEFAULT_BUF_SIZE): BufWriter {
return w instanceof BufWriter ? w : new BufWriter(w, size);
}
constructor(private wr: Writer, size = DEFAULT_BUF_SIZE) {
if (size <= 0) {
size = DEFAULT_BUF_SIZE;

55
ws/example_client.ts Normal file
View file

@ -0,0 +1,55 @@
import {
connectWebSocket,
isWebSocketCloseEvent,
isWebSocketPingEvent,
isWebSocketPongEvent
} from "../ws/mod.ts";
import { encode } from "../strings/strings.ts";
import { BufReader } from "../io/bufio.ts";
import { TextProtoReader } from "../textproto/mod.ts";
import { blue, green, red, yellow } from "../colors/mod.ts";
const endpoint = Deno.args[1] || "ws://127.0.0.1:8080";
/** simple websocket cli */
async function main(): Promise<void> {
const sock = await connectWebSocket(endpoint);
console.log(green("ws connected! (type 'close' to quit)"));
(async function(): Promise<void> {
for await (const msg of sock.receive()) {
if (typeof msg === "string") {
console.log(yellow("< " + msg));
} else if (isWebSocketPingEvent(msg)) {
console.log(blue("< ping"));
} else if (isWebSocketPongEvent(msg)) {
console.log(blue("< pong"));
} else if (isWebSocketCloseEvent(msg)) {
console.log(red(`closed: code=${msg.code}, reason=${msg.reason}`));
}
}
})();
const tpr = new TextProtoReader(new BufReader(Deno.stdin));
while (true) {
await Deno.stdout.write(encode("> "));
const [line, err] = await tpr.readLine();
if (err) {
console.error(red(`failed to read line from stdin: ${err}`));
break;
}
if (line === "close") {
break;
} else if (line === "ping") {
await sock.ping();
} else {
await sock.send(line);
}
// FIXME: Without this, sock.receive() won't resolved though it is readable...
await new Promise((resolve): void => setTimeout(resolve, 0));
}
await sock.close(1000);
// FIXME: conn.close() won't shutdown process...
Deno.exit(0);
}
if (import.meta.main) {
main();
}

66
ws/example_server.ts Normal file
View file

@ -0,0 +1,66 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
import { serve } from "../http/server.ts";
import {
acceptWebSocket,
isWebSocketCloseEvent,
isWebSocketPingEvent,
WebSocket
} from "./mod.ts";
/** websocket echo server */
const port = Deno.args[1] || "8080";
async function main(): Promise<void> {
console.log(`websocket server is running on :${port}`);
for await (const req of serve(`:${port}`)) {
const { headers, conn } = req;
acceptWebSocket({
conn,
headers,
bufReader: req.r,
bufWriter: req.w
})
.then(
async (sock: WebSocket): Promise<void> => {
console.log("socket connected!");
const it = sock.receive();
while (true) {
try {
const { done, value } = await it.next();
if (done) {
break;
}
const ev = value;
if (typeof ev === "string") {
// text message
console.log("ws:Text", ev);
await sock.send(ev);
} else if (ev instanceof Uint8Array) {
// binary message
console.log("ws:Binary", ev);
} else if (isWebSocketPingEvent(ev)) {
const [, body] = ev;
// ping
console.log("ws:Ping", body);
} else if (isWebSocketCloseEvent(ev)) {
// close
const { code, reason } = ev;
console.log("ws:Close", code, reason);
}
} catch (e) {
console.error(`failed to receive frame: ${e}`);
await sock.close(1000).catch(console.error);
}
}
}
)
.catch(
(err: Error): void => {
console.error(`failed to accept websocket: ${err}`);
}
);
}
}
if (import.meta.main) {
main();
}

249
ws/mod.ts
View file

@ -1,11 +1,14 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
const { Buffer } = Deno;
import { decode, encode } from "../strings/strings.ts";
type Conn = Deno.Conn;
type Writer = Deno.Writer;
import { BufReader, BufWriter } from "../io/bufio.ts";
import { readLong, readShort, sliceLongToBytes } from "../io/ioutil.ts";
import { Sha1 } from "./sha1.ts";
import { writeResponse } from "../http/server.ts";
import { TextProtoReader } from "../textproto/mod.ts";
export enum OpCode {
Continue = 0x0,
@ -70,13 +73,19 @@ export interface WebSocketFrame {
}
export interface WebSocket {
readonly conn: Conn;
readonly isClosed: boolean;
receive(): AsyncIterableIterator<WebSocketEvent>;
send(data: WebSocketMessage): Promise<void>;
ping(data?: WebSocketMessage): Promise<void>;
close(code: number, reason?: string): Promise<void>;
}
/** Unmask masked websocket payload */
export function unmask(payload: Uint8Array, mask?: Uint8Array): void {
if (mask) {
for (let i = 0, len = payload.length; i < len; i++) {
@ -85,6 +94,7 @@ export function unmask(payload: Uint8Array, mask?: Uint8Array): void {
}
}
/** Write websocket frame to given writer */
export async function writeFrame(
frame: WebSocketFrame,
writer: Writer
@ -92,6 +102,11 @@ export async function writeFrame(
const payloadLength = frame.payload.byteLength;
let header: Uint8Array;
const hasMask = frame.mask ? 0x80 : 0;
if (frame.mask && frame.mask.byteLength !== 4) {
throw new Error(
"invalid mask. mask must be 4 bytes: length=" + frame.mask.byteLength
);
}
if (payloadLength < 126) {
header = new Uint8Array([0x80 | frame.opcode, hasMask | payloadLength]);
} else if (payloadLength < 0xffff) {
@ -108,13 +123,18 @@ export async function writeFrame(
...sliceLongToBytes(payloadLength)
]);
}
if (frame.mask) {
header = append(header, frame.mask);
}
unmask(frame.payload, frame.mask);
const bytes = append(header, frame.payload);
const w = new BufWriter(writer);
await w.write(bytes);
await w.flush();
header = append(header, frame.payload);
const w = BufWriter.create(writer);
await w.write(header);
const err = await w.flush();
if (err) throw err;
}
/** Read websocket frame from given BufReader */
export async function readFrame(buf: BufReader): Promise<WebSocketFrame> {
let b = await buf.readByte();
let isLastFrame = false;
@ -155,62 +175,38 @@ export async function readFrame(buf: BufReader): Promise<WebSocketFrame> {
};
}
export async function* receiveFrame(
conn: Conn
): AsyncIterableIterator<WebSocketFrame> {
let receiving = true;
const isLastFrame = true;
const reader = new BufReader(conn);
while (receiving) {
const frame = await readFrame(reader);
const { opcode, payload } = frame;
switch (opcode) {
case OpCode.TextFrame:
case OpCode.BinaryFrame:
case OpCode.Continue:
yield frame;
break;
case OpCode.Close:
await writeFrame(
{
opcode,
payload,
isLastFrame
},
conn
);
conn.close();
yield frame;
receiving = false;
break;
case OpCode.Ping:
await writeFrame(
{
payload,
isLastFrame,
opcode: OpCode.Pong
},
conn
);
yield frame;
break;
case OpCode.Pong:
yield frame;
break;
default:
}
}
// Create client-to-server mask, random 32bit number
function createMask(): Uint8Array {
// TODO: use secure and immutable random function. Crypto.getRandomValues()
const arr = Array.from({ length: 4 }).map(
(): number => Math.round(Math.random() * 0xff)
);
return new Uint8Array(arr);
}
class WebSocketImpl implements WebSocket {
encoder = new TextEncoder();
private readonly mask?: Uint8Array;
private readonly bufReader: BufReader;
private readonly bufWriter: BufWriter;
constructor(private conn: Conn, private mask?: Uint8Array) {}
constructor(
readonly conn: Conn,
opts: {
bufReader?: BufReader;
bufWriter?: BufWriter;
mask?: Uint8Array;
} = {}
) {
this.mask = opts.mask || createMask();
this.bufReader = opts.bufReader || new BufReader(conn);
this.bufWriter = opts.bufWriter || new BufWriter(conn);
}
async *receive(): AsyncIterableIterator<WebSocketEvent> {
let frames: WebSocketFrame[] = [];
let payloadsLength = 0;
for await (const frame of receiveFrame(this.conn)) {
while (true) {
const frame = await readFrame(this.bufReader);
unmask(frame.payload, frame.mask);
switch (frame.opcode) {
case OpCode.TextFrame:
@ -227,7 +223,7 @@ class WebSocketImpl implements WebSocket {
}
if (frames[0].opcode === OpCode.TextFrame) {
// text
yield new Buffer(concat).toString();
yield decode(concat);
} else {
// binary
yield concat;
@ -237,14 +233,23 @@ class WebSocketImpl implements WebSocket {
}
break;
case OpCode.Close:
const code = (frame.payload[0] << 16) | frame.payload[1];
const reason = new Buffer(
// [0x12, 0x34] -> 0x1234
const code = (frame.payload[0] << 8) | frame.payload[1];
const reason = decode(
frame.payload.subarray(2, frame.payload.length)
).toString();
this._isClosed = true;
);
await this.close(code, reason);
yield { code, reason };
return;
case OpCode.Ping:
await writeFrame(
{
opcode: OpCode.Pong,
payload: frame.payload,
isLastFrame: true
},
this.bufWriter
);
yield ["ping", frame.payload] as WebSocketPingEvent;
break;
case OpCode.Pong:
@ -261,7 +266,7 @@ class WebSocketImpl implements WebSocket {
}
const opcode =
typeof data === "string" ? OpCode.TextFrame : OpCode.BinaryFrame;
const payload = typeof data === "string" ? this.encoder.encode(data) : data;
const payload = typeof data === "string" ? encode(data) : data;
const isLastFrame = true;
await writeFrame(
{
@ -270,20 +275,20 @@ class WebSocketImpl implements WebSocket {
payload,
mask: this.mask
},
this.conn
this.bufWriter
);
}
async ping(data: WebSocketMessage): Promise<void> {
const payload = typeof data === "string" ? this.encoder.encode(data) : data;
async ping(data: WebSocketMessage = ""): Promise<void> {
const payload = typeof data === "string" ? encode(data) : data;
await writeFrame(
{
isLastFrame: true,
opcode: OpCode.Close,
opcode: OpCode.Ping,
mask: this.mask,
payload
},
this.conn
this.bufWriter
);
}
@ -297,7 +302,7 @@ class WebSocketImpl implements WebSocket {
const header = [code >>> 8, code & 0x00ff];
let payload: Uint8Array;
if (reason) {
const reasonBytes = this.encoder.encode(reason);
const reasonBytes = encode(reason);
payload = new Uint8Array(2 + reasonBytes.byteLength);
payload.set(header);
payload.set(reasonBytes, 2);
@ -311,7 +316,7 @@ class WebSocketImpl implements WebSocket {
mask: this.mask,
payload
},
this.conn
this.bufWriter
);
} catch (e) {
throw e;
@ -320,11 +325,10 @@ class WebSocketImpl implements WebSocket {
}
}
private ensureSocketClosed(): Error {
private ensureSocketClosed(): void {
if (this.isClosed) {
return;
}
try {
this.conn.close();
} catch (e) {
@ -335,16 +339,20 @@ class WebSocketImpl implements WebSocket {
}
}
/** Return whether given headers is acceptable for websocket */
export function acceptable(req: { headers: Headers }): boolean {
const secKey = req.headers.get("sec-websocket-key");
return (
req.headers.get("upgrade") === "websocket" &&
req.headers.has("sec-websocket-key") &&
req.headers.get("sec-websocket-key").length > 0
typeof secKey === "string" &&
secKey.length > 0
);
}
const kGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
/** Create sec-websocket-accept header value with given nonce */
export function createSecAccept(nonce: string): string {
const sha1 = new Sha1();
sha1.update(nonce + kGUID);
@ -352,16 +360,22 @@ export function createSecAccept(nonce: string): string {
return btoa(String.fromCharCode.apply(String, bytes));
}
/** Upgrade given TCP connection into websocket connection */
export async function acceptWebSocket(req: {
conn: Conn;
bufWriter: BufWriter;
bufReader: BufReader;
headers: Headers;
}): Promise<WebSocket> {
const { conn, headers } = req;
const { conn, headers, bufReader, bufWriter } = req;
if (acceptable(req)) {
const sock = new WebSocketImpl(conn);
const sock = new WebSocketImpl(conn, { bufReader, bufWriter });
const secKey = headers.get("sec-websocket-key");
if (typeof secKey !== "string") {
throw new Error("sec-websocket-key is not provided");
}
const secAccept = createSecAccept(secKey);
await writeResponse(conn, {
await writeResponse(bufWriter, {
status: 101,
headers: new Headers({
Upgrade: "websocket",
@ -373,3 +387,94 @@ export async function acceptWebSocket(req: {
}
throw new Error("request is not acceptable");
}
const kSecChars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ-.~_";
/** Create WebSocket-Sec-Key. Base64 encoded 16 bytes string */
export function createSecKey(): string {
let key = "";
for (let i = 0; i < 16; i++) {
const j = Math.round(Math.random() * kSecChars.length);
key += kSecChars[j];
}
return btoa(key);
}
/** Connect to given websocket endpoint url. Endpoint must be acceptable for URL */
export async function connectWebSocket(
endpoint: string,
headers: Headers = new Headers()
): Promise<WebSocket> {
const url = new URL(endpoint);
const { hostname, pathname, searchParams } = url;
let port = url.port;
if (!url.port) {
if (url.protocol === "http" || url.protocol === "ws") {
port = "80";
} else if (url.protocol === "https" || url.protocol === "wss") {
throw new Error("currently https/wss is not supported");
}
}
const conn = await Deno.dial("tcp", `${hostname}:${port}`);
const abortHandshake = (err: Error): void => {
conn.close();
throw err;
};
const bufWriter = new BufWriter(conn);
const bufReader = new BufReader(conn);
await bufWriter.write(
encode(`GET ${pathname}?${searchParams || ""} HTTP/1.1\r\n`)
);
const key = createSecKey();
if (!headers.has("host")) {
headers.set("host", hostname);
}
headers.set("upgrade", "websocket");
headers.set("connection", "upgrade");
headers.set("sec-websocket-key", key);
let headerStr = "";
for (const [key, value] of headers) {
headerStr += `${key}: ${value}\r\n`;
}
headerStr += "\r\n";
await bufWriter.write(encode(headerStr));
let err, statusLine, responseHeaders;
err = await bufWriter.flush();
if (err) {
throw new Error("ws: failed to send handshake: " + err);
}
const tpReader = new TextProtoReader(bufReader);
[statusLine, err] = await tpReader.readLine();
if (err) {
abortHandshake(new Error("ws: failed to read status line: " + err));
}
const m = statusLine.match(/^(.+?) (.+?) (.+?)$/);
if (!m) {
abortHandshake(new Error("ws: invalid status line: " + statusLine));
}
const [_, version, statusCode] = m;
if (version !== "HTTP/1.1" || statusCode !== "101") {
abortHandshake(
new Error(
`ws: server didn't accept handshake: version=${version}, statusCode=${statusCode}`
)
);
}
[responseHeaders, err] = await tpReader.readMIMEHeader();
if (err) {
abortHandshake(new Error("ws: failed to parse response headers: " + err));
}
const expectedSecAccept = createSecAccept(key);
const secAccept = responseHeaders.get("sec-websocket-accept");
if (secAccept !== expectedSecAccept) {
abortHandshake(
new Error(
`ws: unexpected sec-websocket-accept header: expected=${expectedSecAccept}, actual=${secAccept}`
)
);
}
return new WebSocketImpl(conn, {
bufWriter,
bufReader
});
}

View file

@ -1,19 +1,21 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
import "./sha1_test.ts";
const { Buffer } = Deno;
import { BufReader } from "../io/bufio.ts";
import { assert, assertEquals } from "../testing/asserts.ts";
import { test } from "../testing/mod.ts";
import { runIfMain, test } from "../testing/mod.ts";
import {
acceptable,
createSecAccept,
OpCode,
readFrame,
unmask
unmask,
writeFrame
} from "./mod.ts";
import { encode } from "../strings/strings.ts";
test(async function testReadUnmaskedTextFrame(): Promise<void> {
const { Buffer } = Deno;
test(async function wsReadUnmaskedTextFrame(): Promise<void> {
// unmasked single text frame with payload "Hello"
const buf = new BufReader(
new Buffer(new Uint8Array([0x81, 0x05, 0x48, 0x65, 0x6c, 0x6c, 0x6f]))
@ -25,7 +27,7 @@ test(async function testReadUnmaskedTextFrame(): Promise<void> {
assertEquals(frame.isLastFrame, true);
});
test(async function testReadMakedTextFrame(): Promise<void> {
test(async function wsReadMaskedTextFrame(): Promise<void> {
//a masked single text frame with payload "Hello"
const buf = new BufReader(
new Buffer(
@ -52,7 +54,7 @@ test(async function testReadMakedTextFrame(): Promise<void> {
assertEquals(frame.isLastFrame, true);
});
test(async function testReadUnmaskedSplittedTextFrames(): Promise<void> {
test(async function wsReadUnmaskedSplitTextFrames(): Promise<void> {
const buf1 = new BufReader(
new Buffer(new Uint8Array([0x01, 0x03, 0x48, 0x65, 0x6c]))
);
@ -71,7 +73,7 @@ test(async function testReadUnmaskedSplittedTextFrames(): Promise<void> {
assertEquals(new Buffer(f2.payload).toString(), "lo");
});
test(async function testReadUnmaksedPingPongFrame(): Promise<void> {
test(async function wsReadUnmaskedPingPongFrame(): Promise<void> {
// unmasked ping with payload "Hello"
const buf = new BufReader(
new Buffer(new Uint8Array([0x89, 0x05, 0x48, 0x65, 0x6c, 0x6c, 0x6f]))
@ -104,7 +106,7 @@ test(async function testReadUnmaksedPingPongFrame(): Promise<void> {
assertEquals(new Buffer(pong.payload).toString(), "Hello");
});
test(async function testReadUnmaksedBigBinaryFrame(): Promise<void> {
test(async function wsReadUnmaskedBigBinaryFrame(): Promise<void> {
const a = [0x82, 0x7e, 0x01, 0x00];
for (let i = 0; i < 256; i++) {
a.push(i);
@ -117,7 +119,7 @@ test(async function testReadUnmaksedBigBinaryFrame(): Promise<void> {
assertEquals(bin.payload.length, 256);
});
test(async function testReadUnmaskedBigBigBinaryFrame(): Promise<void> {
test(async function wsReadUnmaskedBigBigBinaryFrame(): Promise<void> {
const a = [0x82, 0x7f, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00];
for (let i = 0; i < 0xffff; i++) {
a.push(i);
@ -130,13 +132,13 @@ test(async function testReadUnmaskedBigBigBinaryFrame(): Promise<void> {
assertEquals(bin.payload.length, 0xffff + 1);
});
test(async function testCreateSecAccept(): Promise<void> {
test(async function wsCreateSecAccept(): Promise<void> {
const nonce = "dGhlIHNhbXBsZSBub25jZQ==";
const d = createSecAccept(nonce);
assertEquals(d, "s3pPLMBiTxaQ9kYGzzhZRbK+xOo=");
});
test(function testAcceptable(): void {
test(function wsAcceptable(): void {
const ret = acceptable({
headers: new Headers({
upgrade: "websocket",
@ -153,7 +155,7 @@ const invalidHeaders = [
{ upgrade: "websocket", "sec-websocket-ky": "" }
];
test(function testAcceptableInvalid(): void {
test(function wsAcceptableInvalid(): void {
for (const pat of invalidHeaders) {
const ret = acceptable({
headers: new Headers(pat)
@ -161,3 +163,27 @@ test(function testAcceptableInvalid(): void {
assertEquals(ret, false);
}
});
test(async function wsWriteReadMaskedFrame(): Promise<void> {
const mask = new Uint8Array([0, 1, 2, 3]);
const msg = "hello";
const buf = new Buffer();
const r = new BufReader(buf);
await writeFrame(
{
isLastFrame: true,
mask,
opcode: OpCode.TextFrame,
payload: encode(msg)
},
buf
);
const frame = await readFrame(r);
assertEquals(frame.opcode, OpCode.TextFrame);
assertEquals(frame.isLastFrame, true);
assertEquals(frame.mask, mask);
unmask(frame.payload, frame.mask);
assertEquals(frame.payload, encode(msg));
});
runIfMain(import.meta);