mirror of
https://github.com/denoland/deno.git
synced 2025-01-11 00:21:05 -05:00
This commit is contained in:
parent
38ecabf205
commit
796fc9bc3e
7 changed files with 230 additions and 185 deletions
|
@ -18,7 +18,7 @@ async function wsHandler(ws: WebSocket): Promise<void> {
|
|||
const id = ++clientId;
|
||||
clients.set(id, ws);
|
||||
dispatch(`Connected: [${id}]`);
|
||||
for await (const msg of ws.receive()) {
|
||||
for await (const msg of ws) {
|
||||
console.log(`msg:${id}`, msg);
|
||||
if (typeof msg === "string") {
|
||||
dispatch(`[${id}]: ${msg}`);
|
||||
|
|
|
@ -59,7 +59,8 @@ test({
|
|||
let ws: WebSocket | undefined;
|
||||
try {
|
||||
ws = await connectWebSocket("http://127.0.0.1:8080/ws");
|
||||
const it = ws.receive();
|
||||
const it = ws[Symbol.asyncIterator]();
|
||||
|
||||
assertEquals((await it.next()).value, "Connected: [1]");
|
||||
ws.send("Hello");
|
||||
assertEquals((await it.next()).value, "[1]: Hello");
|
||||
|
|
181
std/ws/README.md
181
std/ws/README.md
|
@ -12,57 +12,54 @@ import {
|
|||
acceptWebSocket,
|
||||
isWebSocketCloseEvent,
|
||||
isWebSocketPingEvent,
|
||||
WebSocket,
|
||||
} from "https://deno.land/std/ws/mod.ts";
|
||||
|
||||
/** websocket echo server */
|
||||
const port = Deno.args[0] || "8080";
|
||||
console.log(`websocket server is running on :${port}`);
|
||||
for await (const req of serve(`:${port}`)) {
|
||||
const { headers, conn } = req;
|
||||
acceptWebSocket({
|
||||
conn,
|
||||
headers,
|
||||
bufReader: req.r,
|
||||
bufWriter: req.w,
|
||||
})
|
||||
.then(
|
||||
async (sock: WebSocket): Promise<void> => {
|
||||
console.log("socket connected!");
|
||||
const it = sock.receive();
|
||||
while (true) {
|
||||
try {
|
||||
const { done, value } = await it.next();
|
||||
if (done) {
|
||||
break;
|
||||
}
|
||||
const ev = value;
|
||||
if (typeof ev === "string") {
|
||||
// text message
|
||||
console.log("ws:Text", ev);
|
||||
await sock.send(ev);
|
||||
} else if (ev instanceof Uint8Array) {
|
||||
// binary message
|
||||
console.log("ws:Binary", ev);
|
||||
} else if (isWebSocketPingEvent(ev)) {
|
||||
const [, body] = ev;
|
||||
// ping
|
||||
console.log("ws:Ping", body);
|
||||
} else if (isWebSocketCloseEvent(ev)) {
|
||||
// close
|
||||
const { code, reason } = ev;
|
||||
console.log("ws:Close", code, reason);
|
||||
}
|
||||
} catch (e) {
|
||||
console.error(`failed to receive frame: ${e}`);
|
||||
await sock.close(1000).catch(console.error);
|
||||
}
|
||||
const { conn, r: bufReader, w: bufWriter, headers } = req;
|
||||
|
||||
try {
|
||||
const sock = await acceptWebSocket({
|
||||
conn,
|
||||
bufReader,
|
||||
bufWriter,
|
||||
headers,
|
||||
});
|
||||
|
||||
console.log("socket connected!");
|
||||
|
||||
try {
|
||||
for await (const ev of sock) {
|
||||
if (typeof ev === "string") {
|
||||
// text message
|
||||
console.log("ws:Text", ev);
|
||||
await sock.send(ev);
|
||||
} else if (ev instanceof Uint8Array) {
|
||||
// binary message
|
||||
console.log("ws:Binary", ev);
|
||||
} else if (isWebSocketPingEvent(ev)) {
|
||||
const [, body] = ev;
|
||||
// ping
|
||||
console.log("ws:Ping", body);
|
||||
} else if (isWebSocketCloseEvent(ev)) {
|
||||
// close
|
||||
const { code, reason } = ev;
|
||||
console.log("ws:Close", code, reason);
|
||||
}
|
||||
}
|
||||
)
|
||||
.catch((err: Error): void => {
|
||||
console.error(`failed to accept websocket: ${err}`);
|
||||
});
|
||||
} catch (err) {
|
||||
console.error(`failed to receive frame: ${err}`);
|
||||
|
||||
if (!sock.isClosed) {
|
||||
await sock.close(1000).catch(console.error);
|
||||
}
|
||||
}
|
||||
} catch (err) {
|
||||
console.error(`failed to accept websocket: ${err}`);
|
||||
await req.respond({ status: 400 });
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
|
@ -75,51 +72,58 @@ import {
|
|||
isWebSocketPingEvent,
|
||||
isWebSocketPongEvent,
|
||||
} from "https://deno.land/std/ws/mod.ts";
|
||||
import { encode } from "https://deno.land/std/strings/mod.ts";
|
||||
import { encode } from "https://deno.land/std/encoding/utf8.ts";
|
||||
import { BufReader } from "https://deno.land/std/io/bufio.ts";
|
||||
import { TextProtoReader } from "https://deno.land/std/textproto/mod.ts";
|
||||
import { blue, green, red, yellow } from "https://deno.land/std/fmt/colors.ts";
|
||||
|
||||
const endpoint = Deno.args[0] || "ws://127.0.0.1:8080";
|
||||
/** simple websocket cli */
|
||||
const sock = await connectWebSocket(endpoint);
|
||||
console.log(green("ws connected! (type 'close' to quit)"));
|
||||
(async function (): Promise<void> {
|
||||
for await (const msg of sock.receive()) {
|
||||
if (typeof msg === "string") {
|
||||
console.log(yellow("< " + msg));
|
||||
} else if (isWebSocketPingEvent(msg)) {
|
||||
console.log(blue("< ping"));
|
||||
} else if (isWebSocketPongEvent(msg)) {
|
||||
console.log(blue("< pong"));
|
||||
} else if (isWebSocketCloseEvent(msg)) {
|
||||
console.log(red(`closed: code=${msg.code}, reason=${msg.reason}`));
|
||||
}
|
||||
}
|
||||
})();
|
||||
try {
|
||||
const sock = await connectWebSocket(endpoint);
|
||||
console.log(green("ws connected! (type 'close' to quit)"));
|
||||
|
||||
const tpr = new TextProtoReader(new BufReader(Deno.stdin));
|
||||
while (true) {
|
||||
await Deno.stdout.write(encode("> "));
|
||||
const line = await tpr.readLine();
|
||||
if (line === null) {
|
||||
break;
|
||||
const messages = async (): Promise<void> => {
|
||||
for await (const msg of sock) {
|
||||
if (typeof msg === "string") {
|
||||
console.log(yellow(`< ${msg}`));
|
||||
} else if (isWebSocketPingEvent(msg)) {
|
||||
console.log(blue("< ping"));
|
||||
} else if (isWebSocketPongEvent(msg)) {
|
||||
console.log(blue("< pong"));
|
||||
} else if (isWebSocketCloseEvent(msg)) {
|
||||
console.log(red(`closed: code=${msg.code}, reason=${msg.reason}`));
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
const cli = async (): Promise<void> => {
|
||||
const tpr = new TextProtoReader(new BufReader(Deno.stdin));
|
||||
while (true) {
|
||||
await Deno.stdout.write(encode("> "));
|
||||
const line = await tpr.readLine();
|
||||
if (line === null) {
|
||||
break;
|
||||
}
|
||||
if (line === "close") {
|
||||
break;
|
||||
} else if (line === "ping") {
|
||||
await sock.ping();
|
||||
} else {
|
||||
await sock.send(line);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
await Promise.race([messages(), cli()]).catch(console.error);
|
||||
|
||||
if (!sock.isClosed) {
|
||||
await sock.close(1000).catch(console.error);
|
||||
}
|
||||
if (line === "close") {
|
||||
break;
|
||||
} else if (line === "ping") {
|
||||
await sock.ping();
|
||||
} else {
|
||||
await sock.send(line);
|
||||
}
|
||||
// FIXME: Without this,
|
||||
// sock.receive() won't resolved though it is readable...
|
||||
await new Promise((resolve): void => {
|
||||
setTimeout(resolve, 0);
|
||||
});
|
||||
} catch (err) {
|
||||
console.error(red(`Could not connect to WebSocket: '${err}'`));
|
||||
}
|
||||
await sock.close(1000);
|
||||
// FIXME: conn.close() won't shutdown process...
|
||||
|
||||
Deno.exit(0);
|
||||
```
|
||||
|
||||
|
@ -137,25 +141,6 @@ Returns true if input value is a WebSocketPingEvent, false otherwise.
|
|||
|
||||
Returns true if input value is a WebSocketPongEvent, false otherwise.
|
||||
|
||||
### append
|
||||
|
||||
This module is used to merge two Uint8Arrays.
|
||||
|
||||
- note: This module might move to common/util.
|
||||
|
||||
```ts
|
||||
import { append } from "https://deno.land/std/ws/mod.ts";
|
||||
|
||||
// a = [1], b = [2]
|
||||
append(a, b); // output: [1, 2]
|
||||
|
||||
// a = [1], b = null
|
||||
append(a, b); // output: [1]
|
||||
|
||||
// a = [], b = [2]
|
||||
append(a, b); // output: [2]
|
||||
```
|
||||
|
||||
### unmask
|
||||
|
||||
Unmask masked WebSocket payload.
|
||||
|
|
|
@ -11,42 +11,49 @@ import { blue, green, red, yellow } from "../fmt/colors.ts";
|
|||
|
||||
const endpoint = Deno.args[0] || "ws://127.0.0.1:8080";
|
||||
/** simple websocket cli */
|
||||
const sock = await connectWebSocket(endpoint);
|
||||
console.log(green("ws connected! (type 'close' to quit)"));
|
||||
(async function (): Promise<void> {
|
||||
for await (const msg of sock.receive()) {
|
||||
if (typeof msg === "string") {
|
||||
console.log(yellow("< " + msg));
|
||||
} else if (isWebSocketPingEvent(msg)) {
|
||||
console.log(blue("< ping"));
|
||||
} else if (isWebSocketPongEvent(msg)) {
|
||||
console.log(blue("< pong"));
|
||||
} else if (isWebSocketCloseEvent(msg)) {
|
||||
console.log(red(`closed: code=${msg.code}, reason=${msg.reason}`));
|
||||
}
|
||||
}
|
||||
})();
|
||||
try {
|
||||
const sock = await connectWebSocket(endpoint);
|
||||
console.log(green("ws connected! (type 'close' to quit)"));
|
||||
|
||||
const tpr = new TextProtoReader(new BufReader(Deno.stdin));
|
||||
while (true) {
|
||||
await Deno.stdout.write(encode("> "));
|
||||
const line = await tpr.readLine();
|
||||
if (line === null) {
|
||||
break;
|
||||
const messages = async (): Promise<void> => {
|
||||
for await (const msg of sock) {
|
||||
if (typeof msg === "string") {
|
||||
console.log(yellow(`< ${msg}`));
|
||||
} else if (isWebSocketPingEvent(msg)) {
|
||||
console.log(blue("< ping"));
|
||||
} else if (isWebSocketPongEvent(msg)) {
|
||||
console.log(blue("< pong"));
|
||||
} else if (isWebSocketCloseEvent(msg)) {
|
||||
console.log(red(`closed: code=${msg.code}, reason=${msg.reason}`));
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
const cli = async (): Promise<void> => {
|
||||
const tpr = new TextProtoReader(new BufReader(Deno.stdin));
|
||||
while (true) {
|
||||
await Deno.stdout.write(encode("> "));
|
||||
const line = await tpr.readLine();
|
||||
if (line === null) {
|
||||
break;
|
||||
}
|
||||
if (line === "close") {
|
||||
break;
|
||||
} else if (line === "ping") {
|
||||
await sock.ping();
|
||||
} else {
|
||||
await sock.send(line);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
await Promise.race([messages(), cli()]).catch(console.error);
|
||||
|
||||
if (!sock.isClosed) {
|
||||
await sock.close(1000).catch(console.error);
|
||||
}
|
||||
if (line === "close") {
|
||||
break;
|
||||
} else if (line === "ping") {
|
||||
await sock.ping();
|
||||
} else {
|
||||
await sock.send(line);
|
||||
}
|
||||
// FIXME: Without this,
|
||||
// sock.receive() won't resolved though it is readable...
|
||||
await new Promise((resolve): void => {
|
||||
setTimeout(resolve, 0);
|
||||
});
|
||||
} catch (err) {
|
||||
console.error(red(`Could not connect to WebSocket: '${err}'`));
|
||||
}
|
||||
await sock.close(1000);
|
||||
// FIXME: conn.close() won't shutdown process...
|
||||
|
||||
Deno.exit(0);
|
||||
|
|
|
@ -4,55 +4,52 @@ import {
|
|||
acceptWebSocket,
|
||||
isWebSocketCloseEvent,
|
||||
isWebSocketPingEvent,
|
||||
WebSocket,
|
||||
} from "./mod.ts";
|
||||
|
||||
/** websocket echo server */
|
||||
const port = Deno.args[0] || "8080";
|
||||
console.log(`websocket server is running on :${port}`);
|
||||
for await (const req of serve(`:${port}`)) {
|
||||
const { headers, conn } = req;
|
||||
acceptWebSocket({
|
||||
conn,
|
||||
headers,
|
||||
bufReader: req.r,
|
||||
bufWriter: req.w,
|
||||
})
|
||||
.then(
|
||||
async (sock: WebSocket): Promise<void> => {
|
||||
console.log("socket connected!");
|
||||
const it = sock.receive();
|
||||
while (true) {
|
||||
try {
|
||||
const { done, value } = await it.next();
|
||||
if (done) {
|
||||
break;
|
||||
}
|
||||
const ev = value;
|
||||
if (typeof ev === "string") {
|
||||
// text message
|
||||
console.log("ws:Text", ev);
|
||||
await sock.send(ev);
|
||||
} else if (ev instanceof Uint8Array) {
|
||||
// binary message
|
||||
console.log("ws:Binary", ev);
|
||||
} else if (isWebSocketPingEvent(ev)) {
|
||||
const [, body] = ev;
|
||||
// ping
|
||||
console.log("ws:Ping", body);
|
||||
} else if (isWebSocketCloseEvent(ev)) {
|
||||
// close
|
||||
const { code, reason } = ev;
|
||||
console.log("ws:Close", code, reason);
|
||||
}
|
||||
} catch (e) {
|
||||
console.error(`failed to receive frame: ${e}`);
|
||||
await sock.close(1000).catch(console.error);
|
||||
}
|
||||
const { conn, r: bufReader, w: bufWriter, headers } = req;
|
||||
|
||||
try {
|
||||
const sock = await acceptWebSocket({
|
||||
conn,
|
||||
bufReader,
|
||||
bufWriter,
|
||||
headers,
|
||||
});
|
||||
|
||||
console.log("socket connected!");
|
||||
|
||||
try {
|
||||
for await (const ev of sock) {
|
||||
if (typeof ev === "string") {
|
||||
// text message
|
||||
console.log("ws:Text", ev);
|
||||
await sock.send(ev);
|
||||
} else if (ev instanceof Uint8Array) {
|
||||
// binary message
|
||||
console.log("ws:Binary", ev);
|
||||
} else if (isWebSocketPingEvent(ev)) {
|
||||
const [, body] = ev;
|
||||
// ping
|
||||
console.log("ws:Ping", body);
|
||||
} else if (isWebSocketCloseEvent(ev)) {
|
||||
// close
|
||||
const { code, reason } = ev;
|
||||
console.log("ws:Close", code, reason);
|
||||
}
|
||||
}
|
||||
)
|
||||
.catch((err: Error): void => {
|
||||
console.error(`failed to accept websocket: ${err}`);
|
||||
});
|
||||
} catch (err) {
|
||||
console.error(`failed to receive frame: ${err}`);
|
||||
|
||||
if (!sock.isClosed) {
|
||||
await sock.close(1000).catch(console.error);
|
||||
}
|
||||
}
|
||||
} catch (err) {
|
||||
console.error(`failed to accept websocket: ${err}`);
|
||||
await req.respond({ status: 400 });
|
||||
}
|
||||
}
|
||||
|
|
|
@ -67,11 +67,14 @@ export interface WebSocketFrame {
|
|||
payload: Uint8Array;
|
||||
}
|
||||
|
||||
export interface WebSocket extends Reader, Writer {
|
||||
export interface WebSocket
|
||||
extends Reader,
|
||||
Writer,
|
||||
AsyncIterable<WebSocketEvent> {
|
||||
readonly conn: Conn;
|
||||
readonly isClosed: boolean;
|
||||
|
||||
receive(): AsyncIterableIterator<WebSocketEvent>;
|
||||
[Symbol.asyncIterator](): AsyncIterableIterator<WebSocketEvent>;
|
||||
|
||||
/**
|
||||
* @throws `Deno.errors.ConnectionReset`
|
||||
|
@ -228,7 +231,7 @@ class WebSocketImpl implements WebSocket {
|
|||
this.bufWriter = bufWriter || new BufWriter(conn);
|
||||
}
|
||||
|
||||
async *receive(): AsyncIterableIterator<WebSocketEvent> {
|
||||
async *[Symbol.asyncIterator](): AsyncIterableIterator<WebSocketEvent> {
|
||||
let frames: WebSocketFrame[] = [];
|
||||
let payloadsLength = 0;
|
||||
while (!this._isClosed) {
|
||||
|
@ -336,7 +339,7 @@ class WebSocketImpl implements WebSocket {
|
|||
}
|
||||
|
||||
async read(p: Uint8Array): Promise<number | null> {
|
||||
for await (const ev of this.receive()) {
|
||||
for await (const ev of this) {
|
||||
if (ev instanceof Uint8Array) {
|
||||
return copyBytes(ev, p);
|
||||
}
|
||||
|
|
|
@ -355,7 +355,7 @@ test("[ws] WebSocket should throw `Deno.errors.ConnectionReset` when peer closed
|
|||
await assertThrowsAsync(() => sock.close(0), Deno.errors.ConnectionReset);
|
||||
});
|
||||
|
||||
test("[ws] WebSocket shouldn't throw `Deno.errors.UnexpectedEof` on recive()", async () => {
|
||||
test("[ws] WebSocket shouldn't throw `Deno.errors.UnexpectedEof`", async () => {
|
||||
const buf = new Buffer();
|
||||
const eofReader: Deno.Reader = {
|
||||
read(_: Uint8Array): Promise<number | null> {
|
||||
|
@ -364,7 +364,7 @@ test("[ws] WebSocket shouldn't throw `Deno.errors.UnexpectedEof` on recive()", a
|
|||
};
|
||||
const conn = dummyConn(eofReader, buf);
|
||||
const sock = createWebSocket({ conn });
|
||||
const it = sock.receive();
|
||||
const it = sock[Symbol.asyncIterator]();
|
||||
const { value, done } = await it.next();
|
||||
assertEquals(value, undefined);
|
||||
assertEquals(done, true);
|
||||
|
@ -456,7 +456,7 @@ test("[ws] WebSocket Reader should ignore non-message frames", async () => {
|
|||
]);
|
||||
const pingHello = new Uint8Array([0x89, 0x05, 0x48, 0x65, 0x6c, 0x6c, 0x6f]);
|
||||
const hello = new Uint8Array([0x81, 0x05, 0x48, 0x65, 0x6c, 0x6c, 0x6f]);
|
||||
const close = new Uint8Array([0x88]);
|
||||
const close = new Uint8Array([0x88, 0x02, 0x03, 0xe8]);
|
||||
|
||||
const dataPayloadLength = 0x100;
|
||||
const dataArr = [0x82, 0x7e, 0x01, 0x00];
|
||||
|
@ -523,3 +523,55 @@ test("[ws] WebSocket Reader should ignore non-message frames", async () => {
|
|||
assertEquals(decode(new Buffer(p.subarray(0, helloLength)).bytes()), "Hello");
|
||||
assertEquals(p.subarray(helloLength), data.subarray(4));
|
||||
});
|
||||
|
||||
test("[ws] WebSocket should act as asyncIterator", async () => {
|
||||
const pingHello = new Uint8Array([0x89, 0x05, 0x48, 0x65, 0x6c, 0x6c, 0x6f]);
|
||||
const hello = new Uint8Array([0x81, 0x05, 0x48, 0x65, 0x6c, 0x6c, 0x6f]);
|
||||
const close = new Uint8Array([0x88, 0x04, 0x03, 0xf3, 0x34, 0x32]);
|
||||
|
||||
enum Frames {
|
||||
ping,
|
||||
hello,
|
||||
close,
|
||||
end,
|
||||
}
|
||||
|
||||
let frame = Frames.ping;
|
||||
|
||||
const reader: Reader = {
|
||||
read(p: Uint8Array): Promise<number | null> {
|
||||
if (frame === Frames.ping) {
|
||||
frame = Frames.hello;
|
||||
p.set(pingHello);
|
||||
return Promise.resolve(pingHello.byteLength);
|
||||
}
|
||||
|
||||
if (frame === Frames.hello) {
|
||||
frame = Frames.close;
|
||||
p.set(hello);
|
||||
return Promise.resolve(hello.byteLength);
|
||||
}
|
||||
|
||||
if (frame === Frames.close) {
|
||||
frame = Frames.end;
|
||||
p.set(close);
|
||||
return Promise.resolve(close.byteLength);
|
||||
}
|
||||
|
||||
return Promise.resolve(null);
|
||||
},
|
||||
};
|
||||
|
||||
const conn = dummyConn(reader, new Buffer());
|
||||
const sock = createWebSocket({ conn });
|
||||
|
||||
const events = [];
|
||||
for await (const wsEvent of sock) {
|
||||
events.push(wsEvent);
|
||||
}
|
||||
|
||||
assertEquals(events.length, 3);
|
||||
assertEquals(events[0], ["ping", encode("Hello")]);
|
||||
assertEquals(events[1], "Hello");
|
||||
assertEquals(events[2], { code: 1011, reason: "42" });
|
||||
});
|
||||
|
|
Loading…
Reference in a new issue