
feat(ext/http): Rework Deno.serve using hyper 1.0-rc3 (#18619)

This is a rewrite of the `Deno.serve` API on top of hyper 1.0-rc3. The code
should be more maintainable long-term, and it avoids some of the slower mpsc
patterns that made the older code less efficient than it could have been. A
minimal usage sketch of the reworked API follows the feature list below.

Missing features:

- `upgradeHttp` and `upgradeHttpRaw` (`upgradeWebSocket` is available,
however).
- Automatic compression is unavailable on responses.
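
As a rough illustration (not part of this commit), the reworked API can be driven as sketched below. Only `Deno.serve`, `Deno.upgradeWebSocket`, and the options exercised by the tests in this diff (`handler`, `port`, `signal`, `onListen`, `onError`) are taken from the change; the port number, echo logic, and log text are illustrative choices.

const ac = new AbortController();

Deno.serve({
  port: 8080, // illustrative port, not a value from the commit
  signal: ac.signal,
  onListen: ({ port }) => console.log(`Listening on http://localhost:${port}/`),
  onError: (err) => {
    console.error(err);
    return new Response("Internal Server Error", { status: 500 });
  },
  handler: (req) => {
    // upgradeWebSocket is still supported by the reworked server.
    if (req.headers.get("upgrade") === "websocket") {
      const { socket, response } = Deno.upgradeWebSocket(req);
      socket.onmessage = (m) => socket.send(m.data); // echo messages back
      return response;
    }
    return new Response("Hello World");
  },
});

// Later, ac.abort() shuts the server down.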
Matt Mastracci 2023-04-22 11:48:21 -06:00 committed by GitHub
parent d137501a63
commit bdffcb409f
22 changed files with 2912 additions and 336 deletions

Cargo.lock generated

@ -728,7 +728,7 @@ dependencies = [
"fwdansi",
"glibc_version",
"http",
"hyper",
"hyper 0.14.26",
"import_map 0.15.0",
"indexmap",
"jsonc-parser",
@ -1022,11 +1022,14 @@ dependencies = [
"bytes",
"cache_control",
"deno_core",
"deno_net",
"deno_websocket",
"flate2",
"fly-accept-encoding",
"http",
"httparse",
"hyper",
"hyper 0.14.26",
"hyper 1.0.0-rc.3",
"memmem",
"mime",
"once_cell",
@ -1035,6 +1038,8 @@ dependencies = [
"pin-project",
"ring",
"serde",
"slab",
"thiserror",
"tokio",
"tokio-util",
]
@ -1119,6 +1124,7 @@ dependencies = [
"deno_core",
"deno_tls",
"log",
"pin-project",
"serde",
"socket2",
"tokio",
@ -1242,7 +1248,7 @@ dependencies = [
"fs3",
"fwdansi",
"http",
"hyper",
"hyper 0.14.26",
"libc",
"log",
"netif",
@ -1345,11 +1351,13 @@ dependencies = [
name = "deno_websocket"
version = "0.104.0"
dependencies = [
"bytes",
"deno_core",
"deno_net",
"deno_tls",
"fastwebsockets",
"http",
"hyper",
"hyper 0.14.26",
"serde",
"tokio",
"tokio-rustls",
@ -1794,13 +1802,13 @@ dependencies = [
[[package]]
name = "fastwebsockets"
version = "0.2.5"
version = "0.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a9e973e2bd2dbd77cc9e929ede2ce65984a35ac5481976afbfbd509cb40dc965"
checksum = "2fbc4aeb6c0ab927a93b5e5fc70d4c7f834260fc414021ac40c58d046ea0e394"
dependencies = [
"base64 0.21.0",
"cc",
"hyper",
"hyper 0.14.26",
"pin-project",
"rand",
"sha1",
@ -2237,6 +2245,16 @@ dependencies = [
"pin-project-lite",
]
[[package]]
name = "http-body"
version = "1.0.0-rc.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "951dfc2e32ac02d67c90c0d65bd27009a635dc9b381a2cc7d284ab01e3a0150d"
dependencies = [
"bytes",
"http",
]
[[package]]
name = "httparse"
version = "1.8.0"
@ -2267,7 +2285,7 @@ dependencies = [
"futures-util",
"h2",
"http",
"http-body",
"http-body 0.4.5",
"httparse",
"httpdate",
"itoa",
@ -2279,6 +2297,28 @@ dependencies = [
"want",
]
[[package]]
name = "hyper"
version = "1.0.0-rc.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b75264b2003a3913f118d35c586e535293b3e22e41f074930762929d071e092"
dependencies = [
"bytes",
"futures-channel",
"futures-core",
"futures-util",
"h2",
"http",
"http-body 1.0.0-rc.2",
"httparse",
"httpdate",
"itoa",
"pin-project-lite",
"tokio",
"tracing",
"want",
]
[[package]]
name = "hyper-rustls"
version = "0.23.2"
@ -2286,7 +2326,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1788965e61b367cd03a62950836d5cd41560c3577d90e40e0819373194d1661c"
dependencies = [
"http",
"hyper",
"hyper 0.14.26",
"rustls",
"tokio",
"tokio-rustls",
@ -3614,8 +3654,8 @@ dependencies = [
"futures-util",
"h2",
"http",
"http-body",
"hyper",
"http-body 0.4.5",
"hyper 0.14.26",
"hyper-rustls",
"ipnet",
"js-sys",
@ -4870,7 +4910,7 @@ dependencies = [
"fastwebsockets",
"flate2",
"futures",
"hyper",
"hyper 0.14.26",
"lazy-regex",
"lsp-types",
"nix",


@ -91,7 +91,7 @@ data-url = "=0.2.0"
dlopen = "0.1.8"
encoding_rs = "=0.8.31"
ecb = "=0.1.1"
fastwebsockets = "=0.2.5"
fastwebsockets = "=0.2.6"
flate2 = "=1.0.24"
fs3 = "0.5.0"
futures = "0.3.21"
@ -126,6 +126,7 @@ serde_json = "1.0.85"
serde_repr = "=0.1.9"
sha2 = { version = "0.10.6", features = ["oid"] }
signature = "=1.6.4"
slab = "0.4"
smallvec = "1.8"
socket2 = "0.4.7"
tar = "=0.4.38"


@ -0,0 +1,18 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
const addr = Deno.args[0] ?? "127.0.0.1:4500";
const [hostname, port] = addr.split(":");
const { serve } = Deno;
function readFileSync(file) {
return Deno.readTextFileSync(new URL(file, import.meta.url).pathname);
}
const CERT = readFileSync("../../tests/testdata/tls/localhost.crt");
const KEY = readFileSync("../../tests/testdata/tls/localhost.key");
function handler() {
return new Response("Hello World");
}
serve(handler, { hostname, port, reusePort: true, cert: CERT, key: KEY });


@ -2,6 +2,7 @@
// deno-lint-ignore-file
import { assertMatch } from "https://deno.land/std@v0.42.0/testing/asserts.ts";
import { Buffer, BufReader, BufWriter } from "../../../test_util/std/io/mod.ts";
import { TextProtoReader } from "../testdata/run/textproto.ts";
import {
@ -31,6 +32,27 @@ function onListen<T>(
};
}
Deno.test(async function httpServerShutsDownPortBeforeResolving() {
const ac = new AbortController();
const listeningPromise = deferred();
const server = Deno.serve({
handler: (_req) => new Response("ok"),
port: 4501,
signal: ac.signal,
onListen: onListen(listeningPromise),
});
await listeningPromise;
assertThrows(() => Deno.listen({ port: 4501 }));
ac.abort();
await server;
const listener = Deno.listen({ port: 4501 });
listener!.close();
});
Deno.test(async function httpServerCanResolveHostnames() {
const ac = new AbortController();
const listeningPromise = deferred();
@ -120,6 +142,71 @@ Deno.test({ permissions: { net: true } }, async function httpServerBasic() {
await server;
});
Deno.test({ permissions: { net: true } }, async function httpServerOnError() {
const ac = new AbortController();
const promise = deferred();
const listeningPromise = deferred();
let requestStash: Request | null;
const server = Deno.serve({
handler: async (request: Request) => {
requestStash = request;
await new Promise((r) => setTimeout(r, 100));
throw "fail";
},
port: 4501,
signal: ac.signal,
onListen: onListen(listeningPromise),
onError: () => {
return new Response("failed: " + requestStash!.url, { status: 500 });
},
});
await listeningPromise;
const resp = await fetch("http://127.0.0.1:4501/", {
headers: { "connection": "close" },
});
const text = await resp.text();
ac.abort();
await server;
assertEquals(text, "failed: http://127.0.0.1:4501/");
});
Deno.test(
{ permissions: { net: true } },
async function httpServerOnErrorFails() {
const ac = new AbortController();
const promise = deferred();
const listeningPromise = deferred();
let requestStash: Request | null;
const server = Deno.serve({
handler: async (request: Request) => {
requestStash = request;
await new Promise((r) => setTimeout(r, 100));
throw "fail";
},
port: 4501,
signal: ac.signal,
onListen: onListen(listeningPromise),
onError: () => {
throw "again";
},
});
await listeningPromise;
const resp = await fetch("http://127.0.0.1:4501/", {
headers: { "connection": "close" },
});
const text = await resp.text();
ac.abort();
await server;
assertEquals(text, "Internal Server Error");
},
);
Deno.test({ permissions: { net: true } }, async function httpServerOverload1() {
const ac = new AbortController();
const promise = deferred();
@ -238,7 +325,7 @@ Deno.test(
console.log = (msg) => {
try {
const match = msg.match(/Listening on http:\/\/localhost:(\d+)\//);
assert(!!match);
assert(!!match, `Didn't match ${msg}`);
const port = +match[1];
assert(port > 0 && port < 65536);
} finally {
@ -301,6 +388,109 @@ Deno.test(
},
);
function createUrlTest(
name: string,
methodAndPath: string,
host: string | null,
expected: string,
) {
Deno.test(`httpServerUrl${name}`, async () => {
const listeningPromise: Deferred<number> = deferred();
const urlPromise = deferred();
const ac = new AbortController();
const server = Deno.serve({
handler: async (request: Request) => {
urlPromise.resolve(request.url);
return new Response("");
},
port: 0,
signal: ac.signal,
onListen: ({ port }: { port: number }) => {
listeningPromise.resolve(port);
},
onError: createOnErrorCb(ac),
});
const port = await listeningPromise;
const conn = await Deno.connect({ port });
const encoder = new TextEncoder();
const body = `${methodAndPath} HTTP/1.1\r\n${
host ? ("Host: " + host + "\r\n") : ""
}Content-Length: 5\r\n\r\n12345`;
const writeResult = await conn.write(encoder.encode(body));
assertEquals(body.length, writeResult);
try {
const expectedResult = expected.replace("HOST", "localhost").replace(
"PORT",
`${port}`,
);
assertEquals(await urlPromise, expectedResult);
} finally {
ac.abort();
await server;
conn.close();
}
});
}
createUrlTest("WithPath", "GET /path", null, "http://HOST:PORT/path");
createUrlTest(
"WithPathAndHost",
"GET /path",
"deno.land",
"http://deno.land/path",
);
createUrlTest(
"WithAbsolutePath",
"GET http://localhost/path",
null,
"http://localhost/path",
);
createUrlTest(
"WithAbsolutePathAndHost",
"GET http://localhost/path",
"deno.land",
"http://localhost/path",
);
createUrlTest(
"WithPortAbsolutePath",
"GET http://localhost:1234/path",
null,
"http://localhost:1234/path",
);
createUrlTest(
"WithPortAbsolutePathAndHost",
"GET http://localhost:1234/path",
"deno.land",
"http://localhost:1234/path",
);
createUrlTest(
"WithPortAbsolutePathAndHostWithPort",
"GET http://localhost:1234/path",
"deno.land:9999",
"http://localhost:1234/path",
);
createUrlTest("WithAsterisk", "OPTIONS *", null, "*");
createUrlTest(
"WithAuthorityForm",
"CONNECT deno.land:80",
null,
"deno.land:80",
);
// TODO(mmastrac): These should probably be 400 errors
createUrlTest("WithInvalidAsterisk", "GET *", null, "*");
createUrlTest("WithInvalidNakedPath", "GET path", null, "path");
createUrlTest(
"WithInvalidNakedAuthority",
"GET deno.land:1234",
null,
"deno.land:1234",
);
Deno.test(
{ permissions: { net: true } },
async function httpServerGetRequestBody() {
@ -536,7 +726,10 @@ Deno.test({ permissions: { net: true } }, async function httpServerWebSocket() {
response,
socket,
} = Deno.upgradeWebSocket(request);
socket.onerror = () => fail();
socket.onerror = (e) => {
console.error(e);
fail();
};
socket.onmessage = (m) => {
socket.send(m.data);
socket.close(1001);
@ -553,7 +746,10 @@ Deno.test({ permissions: { net: true } }, async function httpServerWebSocket() {
const def = deferred();
const ws = new WebSocket("ws://localhost:4501");
ws.onmessage = (m) => assertEquals(m.data, "foo");
ws.onerror = () => fail();
ws.onerror = (e) => {
console.error(e);
fail();
};
ws.onclose = () => def.resolve();
ws.onopen = () => ws.send("foo");
@ -562,6 +758,50 @@ Deno.test({ permissions: { net: true } }, async function httpServerWebSocket() {
await server;
});
Deno.test(
{ permissions: { net: true } },
async function httpServerWebSocketCanAccessRequest() {
const ac = new AbortController();
const listeningPromise = deferred();
const server = Deno.serve({
handler: async (request) => {
const {
response,
socket,
} = Deno.upgradeWebSocket(request);
socket.onerror = (e) => {
console.error(e);
fail();
};
socket.onmessage = (m) => {
socket.send(request.url.toString());
socket.close(1001);
};
return response;
},
port: 4501,
signal: ac.signal,
onListen: onListen(listeningPromise),
onError: createOnErrorCb(ac),
});
await listeningPromise;
const def = deferred();
const ws = new WebSocket("ws://localhost:4501");
ws.onmessage = (m) => assertEquals(m.data, "http://localhost:4501/");
ws.onerror = (e) => {
console.error(e);
fail();
};
ws.onclose = () => def.resolve();
ws.onopen = () => ws.send("foo");
await def;
ac.abort();
await server;
},
);
Deno.test(
{ permissions: { net: true } },
async function httpVeryLargeRequest() {
@ -682,47 +922,46 @@ Deno.test(
},
);
// FIXME: auto request body reading is interfering with passing it as response.
// Deno.test(
// { permissions: { net: true } },
// async function httpServerStreamDuplex() {
// const promise = deferred();
// const ac = new AbortController();
Deno.test(
{ permissions: { net: true } },
async function httpServerStreamDuplex() {
const promise = deferred();
const ac = new AbortController();
// const server = Deno.serve(request => {
// assert(request.body);
const server = Deno.serve((request) => {
assert(request.body);
// promise.resolve();
// return new Response(request.body);
// }, { port: 2333, signal: ac.signal });
promise.resolve();
return new Response(request.body);
}, { port: 2333, signal: ac.signal });
// const ts = new TransformStream();
// const writable = ts.writable.getWriter();
const ts = new TransformStream();
const writable = ts.writable.getWriter();
// const resp = await fetch("http://127.0.0.1:2333/", {
// method: "POST",
// body: ts.readable,
// });
const resp = await fetch("http://127.0.0.1:2333/", {
method: "POST",
body: ts.readable,
});
// await promise;
// assert(resp.body);
// const reader = resp.body.getReader();
// await writable.write(new Uint8Array([1]));
// const chunk1 = await reader.read();
// assert(!chunk1.done);
// assertEquals(chunk1.value, new Uint8Array([1]));
// await writable.write(new Uint8Array([2]));
// const chunk2 = await reader.read();
// assert(!chunk2.done);
// assertEquals(chunk2.value, new Uint8Array([2]));
// await writable.close();
// const chunk3 = await reader.read();
// assert(chunk3.done);
await promise;
assert(resp.body);
const reader = resp.body.getReader();
await writable.write(new Uint8Array([1]));
const chunk1 = await reader.read();
assert(!chunk1.done);
assertEquals(chunk1.value, new Uint8Array([1]));
await writable.write(new Uint8Array([2]));
const chunk2 = await reader.read();
assert(!chunk2.done);
assertEquals(chunk2.value, new Uint8Array([2]));
await writable.close();
const chunk3 = await reader.read();
assert(chunk3.done);
// ac.abort();
// await server;
// },
// );
ac.abort();
await server;
},
);
Deno.test(
{ permissions: { net: true } },
@ -867,10 +1106,10 @@ Deno.test(
let responseText = new TextDecoder("iso-8859-1").decode(buf);
clientConn.close();
assert(/\r\n[Xx]-[Hh]eader-[Tt]est: Æ\r\n/.test(responseText));
ac.abort();
await server;
assertMatch(responseText, /\r\n[Xx]-[Hh]eader-[Tt]est: Æ\r\n/);
},
);
@ -1355,12 +1594,11 @@ createServerLengthTest("autoResponseWithKnownLengthEmpty", {
expects_con_len: true,
});
// FIXME: https://github.com/denoland/deno/issues/15892
// createServerLengthTest("autoResponseWithUnknownLengthEmpty", {
// body: stream(""),
// expects_chunked: true,
// expects_con_len: false,
// });
createServerLengthTest("autoResponseWithUnknownLengthEmpty", {
body: stream(""),
expects_chunked: true,
expects_con_len: false,
});
Deno.test(
{ permissions: { net: true } },
@ -1841,6 +2079,7 @@ Deno.test(
method: "GET",
headers: { "connection": "close" },
});
assertEquals(resp.status, 204);
assertEquals(resp.headers.get("Content-Length"), null);
} finally {
ac.abort();
@ -2162,11 +2401,11 @@ Deno.test(
count++;
return new Response(`hello world ${count}`);
}, {
async onListen() {
const res1 = await fetch("http://localhost:9000/");
async onListen({ port }: { port: number }) {
const res1 = await fetch(`http://localhost:${port}/`);
assertEquals(await res1.text(), "hello world 1");
const res2 = await fetch("http://localhost:9000/");
const res2 = await fetch(`http://localhost:${port}/`);
assertEquals(await res2.text(), "hello world 2");
promise.resolve();
@ -2199,13 +2438,13 @@ Deno.test(
return new Response("ok");
},
signal: ac.signal,
onListen: onListen(listeningPromise),
onListen: ({ port }: { port: number }) => listeningPromise.resolve(port),
onError: createOnErrorCb(ac),
});
try {
await listeningPromise;
const resp = await fetch("http://localhost:9000/", {
const port = await listeningPromise;
const resp = await fetch(`http://localhost:${port}/`, {
headers: { connection: "close" },
method: "POST",
body: '{"sus":true}',
@ -2238,8 +2477,8 @@ Deno.test(
},
}),
), {
async onListen() {
const res1 = await fetch("http://localhost:9000/");
async onListen({ port }) {
const res1 = await fetch(`http://localhost:${port}/`);
assertEquals((await res1.text()).length, 40 * 50_000);
promise.resolve();


@ -43,6 +43,22 @@ Deno.test(async function websocketPingPong() {
ws.close();
});
// TODO(mmastrac): This requires us to ignore bad certs
// Deno.test(async function websocketSecureConnect() {
// const promise = deferred();
// const ws = new WebSocket("wss://localhost:4243/");
// assertEquals(ws.url, "wss://localhost:4243/");
// ws.onerror = (error) => {
// console.log(error);
// fail();
// };
// ws.onopen = () => ws.close();
// ws.onclose = () => {
// promise.resolve();
// };
// await promise;
// });
// https://github.com/denoland/deno/issues/18700
Deno.test(
{ sanitizeOps: false, sanitizeResources: false },


@ -3,6 +3,7 @@
use std::ops::Deref;
use std::ops::DerefMut;
use bytes::Buf;
use serde_v8::ZeroCopyBuf;
/// BufView is a wrapper around an underlying contiguous chunk of bytes. It can
@ -26,11 +27,11 @@ enum BufViewInner {
}
impl BufView {
fn from_inner(inner: BufViewInner) -> Self {
const fn from_inner(inner: BufViewInner) -> Self {
Self { inner, cursor: 0 }
}
pub fn empty() -> Self {
pub const fn empty() -> Self {
Self::from_inner(BufViewInner::Empty)
}
@ -65,6 +66,20 @@ impl BufView {
}
}
impl Buf for BufView {
fn remaining(&self) -> usize {
self.len()
}
fn chunk(&self) -> &[u8] {
self.deref()
}
fn advance(&mut self, cnt: usize) {
self.advance_cursor(cnt)
}
}
impl Deref for BufView {
type Target = [u8];
@ -210,6 +225,20 @@ impl BufMutView {
}
}
impl Buf for BufMutView {
fn remaining(&self) -> usize {
self.len()
}
fn chunk(&self) -> &[u8] {
self.deref()
}
fn advance(&mut self, cnt: usize) {
self.advance_cursor(cnt)
}
}
impl Deref for BufMutView {
type Target = [u8];

ext/http/00_serve.js Normal file

@ -0,0 +1,534 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
const core = globalThis.Deno.core;
const primordials = globalThis.__bootstrap.primordials;
const { BadResourcePrototype } = core;
import { InnerBody } from "ext:deno_fetch/22_body.js";
import { Event } from "ext:deno_web/02_event.js";
import {
fromInnerResponse,
newInnerResponse,
toInnerResponse,
} from "ext:deno_fetch/23_response.js";
import { fromInnerRequest } from "ext:deno_fetch/23_request.js";
import { AbortController } from "ext:deno_web/03_abort_signal.js";
import {
_eventLoop,
_idleTimeoutDuration,
_idleTimeoutTimeout,
_protocol,
_readyState,
_rid,
_role,
_server,
_serverHandleIdleTimeout,
SERVER,
WebSocket,
} from "ext:deno_websocket/01_websocket.js";
import {
Deferred,
getReadableStreamResourceBacking,
readableStreamForRid,
ReadableStreamPrototype,
} from "ext:deno_web/06_streams.js";
const {
ObjectPrototypeIsPrototypeOf,
SafeSet,
SafeSetIterator,
SetPrototypeAdd,
SetPrototypeDelete,
Symbol,
TypeError,
Uint8ArrayPrototype,
Uint8Array,
} = primordials;
const _upgraded = Symbol("_upgraded");
function internalServerError() {
// "Internal Server Error"
return new Response(
new Uint8Array([
73,
110,
116,
101,
114,
110,
97,
108,
32,
83,
101,
114,
118,
101,
114,
32,
69,
114,
114,
111,
114,
]),
{ status: 500 },
);
}
// Used to ensure that the user returns a valid response (but not a different response) from handlers that are upgraded.
const UPGRADE_RESPONSE_SENTINEL = fromInnerResponse(
newInnerResponse(101),
"immutable",
);
class InnerRequest {
#slabId;
#context;
#methodAndUri;
#streamRid;
#body;
#upgraded;
constructor(slabId, context) {
this.#slabId = slabId;
this.#context = context;
this.#upgraded = false;
}
close() {
if (this.#streamRid !== undefined) {
core.close(this.#streamRid);
this.#streamRid = undefined;
}
this.#slabId = undefined;
}
get [_upgraded]() {
return this.#upgraded;
}
_wantsUpgrade(upgradeType, ...originalArgs) {
// upgradeHttp is async
// TODO(mmastrac)
if (upgradeType == "upgradeHttp") {
throw "upgradeHttp is unavailable in Deno.serve at this time";
}
// upgradeHttpRaw is async
// TODO(mmastrac)
if (upgradeType == "upgradeHttpRaw") {
throw "upgradeHttp is unavailable in Deno.serve at this time";
}
// upgradeWebSocket is sync
if (upgradeType == "upgradeWebSocket") {
const response = originalArgs[0];
const ws = originalArgs[1];
this.url();
this.headerList;
this.close();
const goAhead = new Deferred();
this.#upgraded = () => {
goAhead.resolve();
};
// Start the upgrade in the background.
(async () => {
try {
// Returns the connection and extra bytes, which we can pass directly to op_ws_server_create
const upgrade = await core.opAsync2(
"op_upgrade",
this.#slabId,
response.headerList,
);
const wsRid = core.ops.op_ws_server_create(upgrade[0], upgrade[1]);
// We have to wait for the go-ahead signal
await goAhead;
ws[_rid] = wsRid;
ws[_readyState] = WebSocket.OPEN;
ws[_role] = SERVER;
const event = new Event("open");
ws.dispatchEvent(event);
ws[_eventLoop]();
if (ws[_idleTimeoutDuration]) {
ws.addEventListener(
"close",
() => clearTimeout(ws[_idleTimeoutTimeout]),
);
}
ws[_serverHandleIdleTimeout]();
} catch (error) {
const event = new ErrorEvent("error", { error });
ws.dispatchEvent(event);
}
})();
return { response: UPGRADE_RESPONSE_SENTINEL, socket: ws };
}
}
url() {
if (this.#methodAndUri === undefined) {
if (this.#slabId === undefined) {
throw new TypeError("request closed");
}
// TODO(mmastrac): This is quite slow as we're serializing a large number of values. We may want to consider
// splitting this up into multiple ops.
this.#methodAndUri = core.ops.op_get_request_method_and_url(this.#slabId);
}
const path = this.#methodAndUri[2];
// * is valid for OPTIONS
if (path === "*") {
return "*";
}
// If the path is empty, return the authority (valid for CONNECT)
if (path == "") {
return this.#methodAndUri[1];
}
// CONNECT requires an authority
if (this.#methodAndUri[0] == "CONNECT") {
return this.#methodAndUri[1];
}
const hostname = this.#methodAndUri[1];
if (hostname) {
// Construct a URL from the scheme, the hostname, and the path
return this.#context.scheme + hostname + path;
}
// Construct a URL from the scheme, the fallback hostname, and the path
return this.#context.scheme + this.#context.fallbackHost + path;
}
get remoteAddr() {
if (this.#methodAndUri === undefined) {
if (this.#slabId === undefined) {
throw new TypeError("request closed");
}
this.#methodAndUri = core.ops.op_get_request_method_and_url(this.#slabId);
}
return {
transport: "tcp",
hostname: this.#methodAndUri[3],
port: this.#methodAndUri[4],
};
}
get method() {
if (this.#methodAndUri === undefined) {
if (this.#slabId === undefined) {
throw new TypeError("request closed");
}
this.#methodAndUri = core.ops.op_get_request_method_and_url(this.#slabId);
}
return this.#methodAndUri[0];
}
get body() {
if (this.#slabId === undefined) {
throw new TypeError("request closed");
}
if (this.#body !== undefined) {
return this.#body;
}
// If the method is GET or HEAD, we do not want to include a body here, even if the Rust
// side of the code is willing to provide it to us.
if (this.method == "GET" || this.method == "HEAD") {
this.#body = null;
return null;
}
this.#streamRid = core.ops.op_read_request_body(this.#slabId);
this.#body = new InnerBody(readableStreamForRid(this.#streamRid, false));
return this.#body;
}
get headerList() {
if (this.#slabId === undefined) {
throw new TypeError("request closed");
}
return core.ops.op_get_request_headers(this.#slabId);
}
get slabId() {
return this.#slabId;
}
}
class CallbackContext {
scheme;
fallbackHost;
serverRid;
closed;
initialize(args) {
this.serverRid = args[0];
this.scheme = args[1];
this.fallbackHost = args[2];
this.closed = false;
}
close() {
try {
this.closed = true;
core.tryClose(this.serverRid);
} catch {
// Pass
}
}
}
function fastSyncResponseOrStream(req, respBody) {
if (respBody === null || respBody === undefined) {
// Don't set the body
return null;
}
const stream = respBody.streamOrStatic;
const body = stream.body;
if (ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, body)) {
core.ops.op_set_response_body_bytes(req, body);
return null;
}
if (typeof body === "string") {
core.ops.op_set_response_body_text(req, body);
return null;
}
// At this point in the response it needs to be a stream
if (!ObjectPrototypeIsPrototypeOf(ReadableStreamPrototype, stream)) {
throw TypeError("invalid response");
}
const resourceBacking = getReadableStreamResourceBacking(stream);
if (resourceBacking) {
core.ops.op_set_response_body_resource(
req,
resourceBacking.rid,
resourceBacking.autoClose,
);
return null;
}
return stream;
}
async function asyncResponse(responseBodies, req, status, stream) {
const responseRid = core.ops.op_set_response_body_stream(req);
SetPrototypeAdd(responseBodies, responseRid);
const reader = stream.getReader();
core.ops.op_set_promise_complete(req, status);
try {
while (true) {
const { value, done } = await reader.read();
if (done) {
break;
}
await core.writeAll(responseRid, value);
}
} catch (error) {
await reader.cancel(error);
} finally {
core.tryClose(responseRid);
SetPrototypeDelete(responseBodies, responseRid);
reader.releaseLock();
}
}
/**
* Maps the incoming request slab ID to a fully-fledged Request object, passes it to the user-provided
* callback, then extracts the response that was returned from that callback. The response is then pulled
* apart and handled on the Rust side.
*
* This function returns a promise that will only reject in the case of abnormal exit.
*/
function mapToCallback(responseBodies, context, signal, callback, onError) {
return async function (req) {
const innerRequest = new InnerRequest(req, context);
const request = fromInnerRequest(innerRequest, signal, "immutable");
// Get the response from the user-provided callback. If that fails, use onError. If that fails, return a fallback
// 500 error.
let response;
try {
response = await callback(request, {
remoteAddr: innerRequest.remoteAddr,
});
} catch (error) {
try {
response = await onError(error);
} catch (error) {
console.error("Exception in onError while handling exception", error);
response = internalServerError();
}
}
const inner = toInnerResponse(response);
if (innerRequest[_upgraded]) {
// We're done here as the connection has been upgraded during the callback and no longer requires servicing.
if (response !== UPGRADE_RESPONSE_SENTINEL) {
console.error("Upgrade response was not returned from callback");
context.close();
}
innerRequest[_upgraded]();
return;
}
// Did everything shut down while we were waiting?
if (context.closed) {
innerRequest.close();
return;
}
const status = inner.status;
const headers = inner.headerList;
if (headers && headers.length > 0) {
if (headers.length == 1) {
core.ops.op_set_response_header(req, headers[0][0], headers[0][1]);
} else {
core.ops.op_set_response_headers(req, headers);
}
}
// Attempt to respond quickly to this request, otherwise extract the stream
const stream = fastSyncResponseOrStream(req, inner.body);
if (stream !== null) {
// Handle the stream asynchronously
await asyncResponse(responseBodies, req, status, stream);
} else {
core.ops.op_set_promise_complete(req, status);
}
innerRequest.close();
};
}
async function serve(arg1, arg2) {
let options = undefined;
let handler = undefined;
if (typeof arg1 === "function") {
handler = arg1;
options = arg2;
} else if (typeof arg2 === "function") {
handler = arg2;
options = arg1;
} else {
options = arg1;
}
if (handler === undefined) {
if (options === undefined) {
throw new TypeError(
"No handler was provided, so an options bag is mandatory.",
);
}
handler = options.handler;
}
if (typeof handler !== "function") {
throw new TypeError("A handler function must be provided.");
}
if (options === undefined) {
options = {};
}
const wantsHttps = options.cert || options.key;
const signal = options.signal;
const onError = options.onError ?? function (error) {
console.error(error);
return internalServerError();
};
const listenOpts = {
hostname: options.hostname ?? "0.0.0.0",
port: options.port ?? (wantsHttps ? 9000 : 8000),
reusePort: options.reusePort ?? false,
};
const abortController = new AbortController();
const responseBodies = new SafeSet();
const context = new CallbackContext();
const callback = mapToCallback(
responseBodies,
context,
abortController.signal,
handler,
onError,
);
if (wantsHttps) {
if (!options.cert || !options.key) {
throw new TypeError(
"Both cert and key must be provided to enable HTTPS.",
);
}
listenOpts.cert = options.cert;
listenOpts.key = options.key;
listenOpts.alpnProtocols = ["h2", "http/1.1"];
const listener = Deno.listenTls(listenOpts);
listenOpts.port = listener.addr.port;
context.initialize(core.ops.op_serve_http(
listener.rid,
));
} else {
const listener = Deno.listen(listenOpts);
listenOpts.port = listener.addr.port;
context.initialize(core.ops.op_serve_http(
listener.rid,
));
}
signal?.addEventListener(
"abort",
() => context.close(),
{ once: true },
);
const onListen = options.onListen ?? function ({ port }) {
// If the hostname is "0.0.0.0", we display "localhost" in console
// because browsers in Windows don't resolve "0.0.0.0".
// See the discussion in https://github.com/denoland/deno_std/issues/1165
const hostname = listenOpts.hostname == "0.0.0.0"
? "localhost"
: listenOpts.hostname;
console.log(`Listening on ${context.scheme}${hostname}:${port}/`);
};
onListen({ port: listenOpts.port });
while (true) {
const rid = context.serverRid;
let req;
try {
req = await core.opAsync("op_http_wait", rid);
} catch (error) {
if (ObjectPrototypeIsPrototypeOf(BadResourcePrototype, error)) {
break;
}
throw new Deno.errors.Http(error);
}
if (req === 0xffffffff) {
break;
}
callback(req).catch((error) => {
// Abnormal exit
console.error(
"Terminating Deno.serve loop due to unexpected error",
error,
);
context.close();
});
}
for (const streamRid of new SafeSetIterator(responseBodies)) {
core.tryClose(streamRid);
}
}
export { serve };


@ -32,8 +32,8 @@ import {
SERVER,
WebSocket,
} from "ext:deno_websocket/01_websocket.js";
import { listen, TcpConn, UnixConn } from "ext:deno_net/01_net.js";
import { listenTls, TlsConn } from "ext:deno_net/02_tls.js";
import { TcpConn, UnixConn } from "ext:deno_net/01_net.js";
import { TlsConn } from "ext:deno_net/02_tls.js";
import {
Deferred,
getReadableStreamResourceBacking,
@ -41,18 +41,17 @@ import {
readableStreamForRid,
ReadableStreamPrototype,
} from "ext:deno_web/06_streams.js";
import { serve } from "ext:deno_http/00_serve.js";
const {
ArrayPrototypeIncludes,
ArrayPrototypeMap,
ArrayPrototypePush,
Error,
ObjectPrototypeIsPrototypeOf,
PromisePrototypeCatch,
SafeSet,
SafeSetIterator,
SetPrototypeAdd,
SetPrototypeDelete,
SetPrototypeClear,
StringPrototypeCharCodeAt,
StringPrototypeIncludes,
StringPrototypeToLowerCase,
@ -406,6 +405,7 @@ const websocketCvf = buildCaseInsensitiveCommaValueFinder("websocket");
const upgradeCvf = buildCaseInsensitiveCommaValueFinder("upgrade");
function upgradeWebSocket(request, options = {}) {
const inner = toInnerRequest(request);
const upgrade = request.headers.get("upgrade");
const upgradeHasWebSocketOption = upgrade !== null &&
websocketCvf(upgrade);
@ -455,25 +455,39 @@ function upgradeWebSocket(request, options = {}) {
}
}
const response = fromInnerResponse(r, "immutable");
const socket = webidl.createBranded(WebSocket);
setEventTargetData(socket);
socket[_server] = true;
response[_ws] = socket;
socket[_idleTimeoutDuration] = options.idleTimeout ?? 120;
socket[_idleTimeoutTimeout] = null;
if (inner._wantsUpgrade) {
return inner._wantsUpgrade("upgradeWebSocket", r, socket);
}
const response = fromInnerResponse(r, "immutable");
response[_ws] = socket;
return { response, socket };
}
function upgradeHttp(req) {
const inner = toInnerRequest(req);
if (inner._wantsUpgrade) {
return inner._wantsUpgrade("upgradeHttp", arguments);
}
req[_deferred] = new Deferred();
return req[_deferred].promise;
}
async function upgradeHttpRaw(req, tcpConn) {
const inner = toInnerRequest(req);
if (inner._wantsUpgrade) {
return inner._wantsUpgrade("upgradeHttpRaw", arguments);
}
const res = await core.opAsync("op_http_upgrade_early", inner[streamRid]);
return new TcpConn(res, tcpConn.remoteAddr, tcpConn.localAddr);
}
@ -552,233 +566,4 @@ function buildCaseInsensitiveCommaValueFinder(checkText) {
internals.buildCaseInsensitiveCommaValueFinder =
buildCaseInsensitiveCommaValueFinder;
function hostnameForDisplay(hostname) {
// If the hostname is "0.0.0.0", we display "localhost" in console
// because browsers in Windows don't resolve "0.0.0.0".
// See the discussion in https://github.com/denoland/deno_std/issues/1165
return hostname === "0.0.0.0" ? "localhost" : hostname;
}
async function respond(handler, requestEvent, connInfo, onError) {
let response;
try {
response = await handler(requestEvent.request, connInfo);
if (response.bodyUsed && response.body !== null) {
throw new TypeError("Response body already consumed.");
}
} catch (e) {
// Invoke `onError` handler if the request handler throws.
response = await onError(e);
}
try {
// Send the response.
await requestEvent.respondWith(response);
} catch {
// `respondWith()` can throw for various reasons, including downstream and
// upstream connection errors, as well as errors thrown during streaming
// of the response content. In order to avoid false negatives, we ignore
// the error here and let `serveHttp` close the connection on the
// following iteration if it is in fact a downstream connection error.
}
}
async function serveConnection(
server,
activeHttpConnections,
handler,
httpConn,
connInfo,
onError,
) {
while (!server.closed) {
let requestEvent = null;
try {
// Yield the new HTTP request on the connection.
requestEvent = await httpConn.nextRequest();
} catch {
// Connection has been closed.
break;
}
if (requestEvent === null) {
break;
}
respond(handler, requestEvent, connInfo, onError);
}
SetPrototypeDelete(activeHttpConnections, httpConn);
try {
httpConn.close();
} catch {
// Connection has already been closed.
}
}
async function serve(arg1, arg2) {
let options = undefined;
let handler = undefined;
if (typeof arg1 === "function") {
handler = arg1;
options = arg2;
} else if (typeof arg2 === "function") {
handler = arg2;
options = arg1;
} else {
options = arg1;
}
if (handler === undefined) {
if (options === undefined) {
throw new TypeError(
"No handler was provided, so an options bag is mandatory.",
);
}
handler = options.handler;
}
if (typeof handler !== "function") {
throw new TypeError("A handler function must be provided.");
}
if (options === undefined) {
options = {};
}
const signal = options.signal;
const onError = options.onError ?? function (error) {
console.error(error);
return new Response("Internal Server Error", { status: 500 });
};
const onListen = options.onListen ?? function ({ port }) {
console.log(
`Listening on http://${hostnameForDisplay(listenOpts.hostname)}:${port}/`,
);
};
const listenOpts = {
hostname: options.hostname ?? "127.0.0.1",
port: options.port ?? 9000,
reusePort: options.reusePort ?? false,
};
if (options.cert || options.key) {
if (!options.cert || !options.key) {
throw new TypeError(
"Both cert and key must be provided to enable HTTPS.",
);
}
listenOpts.cert = options.cert;
listenOpts.key = options.key;
}
let listener;
if (listenOpts.cert && listenOpts.key) {
listener = listenTls({
hostname: listenOpts.hostname,
port: listenOpts.port,
cert: listenOpts.cert,
key: listenOpts.key,
reusePort: listenOpts.reusePort,
});
} else {
listener = listen({
hostname: listenOpts.hostname,
port: listenOpts.port,
reusePort: listenOpts.reusePort,
});
}
const serverDeferred = new Deferred();
const activeHttpConnections = new SafeSet();
const server = {
transport: listenOpts.cert && listenOpts.key ? "https" : "http",
hostname: listenOpts.hostname,
port: listenOpts.port,
closed: false,
close() {
if (server.closed) {
return;
}
server.closed = true;
try {
listener.close();
} catch {
// Might have been already closed.
}
for (const httpConn of new SafeSetIterator(activeHttpConnections)) {
try {
httpConn.close();
} catch {
// Might have been already closed.
}
}
SetPrototypeClear(activeHttpConnections);
serverDeferred.resolve();
},
async serve() {
while (!server.closed) {
let conn;
try {
conn = await listener.accept();
} catch {
// Listener has been closed.
if (!server.closed) {
console.log("Listener has closed unexpectedly");
}
break;
}
let httpConn;
try {
const rid = ops.op_http_start(conn.rid);
httpConn = new HttpConn(rid, conn.remoteAddr, conn.localAddr);
} catch {
// Connection has been closed;
continue;
}
SetPrototypeAdd(activeHttpConnections, httpConn);
const connInfo = {
localAddr: conn.localAddr,
remoteAddr: conn.remoteAddr,
};
// Serve the HTTP connection
serveConnection(
server,
activeHttpConnections,
handler,
httpConn,
connInfo,
onError,
);
}
await serverDeferred.promise;
},
};
signal?.addEventListener(
"abort",
() => {
try {
server.close();
} catch {
// Pass
}
},
{ once: true },
);
onListen(listener.addr);
await PromisePrototypeCatch(server.serve(), console.error);
}
export { _ws, HttpConn, serve, upgradeHttp, upgradeHttpRaw, upgradeWebSocket };


@ -10,6 +10,9 @@ readme = "README.md"
repository.workspace = true
description = "HTTP server implementation for Deno"
[features]
"__zombie_http_tracking" = []
[lib]
path = "lib.rs"
@ -24,11 +27,14 @@ brotli = "3.3.4"
bytes.workspace = true
cache_control.workspace = true
deno_core.workspace = true
deno_net.workspace = true
deno_websocket.workspace = true
flate2.workspace = true
fly-accept-encoding = "0.2.0"
http.workspace = true
httparse.workspace = true
hyper = { workspace = true, features = ["server", "stream", "http1", "http2", "runtime"] }
hyper1 = { package = "hyper", features = ["full"], version = "1.0.0-rc.3" }
memmem.workspace = true
mime = "0.3.16"
once_cell.workspace = true
@ -37,6 +43,8 @@ phf = { version = "0.10", features = ["macros"] }
pin-project.workspace = true
ring.workspace = true
serde.workspace = true
slab.workspace = true
thiserror.workspace = true
tokio.workspace = true
tokio-util = { workspace = true, features = ["io"] }

ext/http/http_next.rs Normal file

@ -0,0 +1,765 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
use crate::extract_network_stream;
use crate::request_body::HttpRequestBody;
use crate::request_properties::DefaultHttpRequestProperties;
use crate::request_properties::HttpConnectionProperties;
use crate::request_properties::HttpListenProperties;
use crate::request_properties::HttpPropertyExtractor;
use crate::response_body::CompletionHandle;
use crate::response_body::ResponseBytes;
use crate::response_body::ResponseBytesInner;
use crate::response_body::V8StreamHttpResponseBody;
use crate::LocalExecutor;
use deno_core::error::AnyError;
use deno_core::futures::TryFutureExt;
use deno_core::op;
use deno_core::AsyncRefCell;
use deno_core::BufView;
use deno_core::ByteString;
use deno_core::CancelFuture;
use deno_core::CancelHandle;
use deno_core::CancelTryFuture;
use deno_core::OpState;
use deno_core::RcRef;
use deno_core::Resource;
use deno_core::ResourceId;
use deno_core::ZeroCopyBuf;
use deno_net::ops_tls::TlsStream;
use deno_net::raw::put_network_stream_resource;
use deno_net::raw::NetworkStream;
use deno_net::raw::NetworkStreamAddress;
use http::request::Parts;
use hyper1::body::Incoming;
use hyper1::header::COOKIE;
use hyper1::http::HeaderName;
use hyper1::http::HeaderValue;
use hyper1::server::conn::http1;
use hyper1::server::conn::http2;
use hyper1::service::service_fn;
use hyper1::upgrade::OnUpgrade;
use hyper1::StatusCode;
use pin_project::pin_project;
use pin_project::pinned_drop;
use slab::Slab;
use std::borrow::Cow;
use std::cell::RefCell;
use std::future::Future;
use std::io;
use std::net::Ipv4Addr;
use std::net::SocketAddr;
use std::net::SocketAddrV4;
use std::pin::Pin;
use std::rc::Rc;
use tokio::task::spawn_local;
use tokio::task::JoinHandle;
type Request = hyper1::Request<Incoming>;
type Response = hyper1::Response<ResponseBytes>;
pub struct HttpSlabRecord {
request_info: HttpConnectionProperties,
request_parts: Parts,
request_body: Option<Incoming>,
// The response may get taken before we tear this down
response: Option<Response>,
body: Option<Rc<HttpRequestBody>>,
promise: CompletionHandle,
#[cfg(__zombie_http_tracking)]
alive: bool,
}
thread_local! {
pub static SLAB: RefCell<Slab<HttpSlabRecord>> = RefCell::new(Slab::with_capacity(1024));
}
/// Generates getters and setters for the [`SLAB`]. For example,
/// `with!(with_req, with_req_mut, Parts, http, http.request_parts);` expands to:
///
/// ```ignore
/// #[inline(always)]
/// #[allow(dead_code)]
/// pub(crate) fn with_req_mut<T>(key: usize, f: impl FnOnce(&mut Parts) -> T) -> T {
/// SLAB.with(|slab| {
/// let mut borrow = slab.borrow_mut();
/// let mut http = borrow.get_mut(key).unwrap();
/// #[cfg(__zombie_http_tracking)]
/// if !http.alive {
/// panic!("Attempted to access a dead HTTP object")
/// }
/// f(&mut http.expr)
/// })
/// }
/// #[inline(always)]
/// #[allow(dead_code)]
/// pub(crate) fn with_req<T>(key: usize, f: impl FnOnce(&Parts) -> T) -> T {
/// SLAB.with(|slab| {
/// let mut borrow = slab.borrow();
/// let mut http = borrow.get(key).unwrap();
/// #[cfg(__zombie_http_tracking)]
/// if !http.alive {
/// panic!("Attempted to access a dead HTTP object")
/// }
/// f(&http.expr)
/// })
/// }
/// ```
macro_rules! with {
($ref:ident, $mut:ident, $type:ty, $http:ident, $expr:expr) => {
#[inline(always)]
#[allow(dead_code)]
pub(crate) fn $mut<T>(key: usize, f: impl FnOnce(&mut $type) -> T) -> T {
SLAB.with(|slab| {
let mut borrow = slab.borrow_mut();
#[allow(unused_mut)] // TODO(mmastrac): compiler issue?
let mut $http = borrow.get_mut(key).unwrap();
#[cfg(__zombie_http_tracking)]
if !$http.alive {
panic!("Attempted to access a dead HTTP object")
}
f(&mut $expr)
})
}
#[inline(always)]
#[allow(dead_code)]
pub(crate) fn $ref<T>(key: usize, f: impl FnOnce(&$type) -> T) -> T {
SLAB.with(|slab| {
let borrow = slab.borrow();
let $http = borrow.get(key).unwrap();
#[cfg(__zombie_http_tracking)]
if !$http.alive {
panic!("Attempted to access a dead HTTP object")
}
f(&$expr)
})
}
};
}
with!(with_req, with_req_mut, Parts, http, http.request_parts);
with!(
with_req_body,
with_req_body_mut,
Option<Incoming>,
http,
http.request_body
);
with!(
with_resp,
with_resp_mut,
Option<Response>,
http,
http.response
);
with!(
with_body,
with_body_mut,
Option<Rc<HttpRequestBody>>,
http,
http.body
);
with!(
with_promise,
with_promise_mut,
CompletionHandle,
http,
http.promise
);
with!(with_http, with_http_mut, HttpSlabRecord, http, http);
fn slab_insert(
request: Request,
request_info: HttpConnectionProperties,
) -> usize {
SLAB.with(|slab| {
let (request_parts, request_body) = request.into_parts();
slab.borrow_mut().insert(HttpSlabRecord {
request_info,
request_parts,
request_body: Some(request_body),
response: Some(Response::new(ResponseBytes::default())),
body: None,
promise: CompletionHandle::default(),
#[cfg(__zombie_http_tracking)]
alive: true,
})
})
}
#[op]
pub fn op_upgrade_raw(_index: usize) {}
#[op]
pub async fn op_upgrade(
state: Rc<RefCell<OpState>>,
index: usize,
headers: Vec<(ByteString, ByteString)>,
) -> Result<(ResourceId, ZeroCopyBuf), AnyError> {
// Stage 1: set the response to 101 Switching Protocols and send it
let upgrade = with_http_mut(index, |http| {
// Manually perform the upgrade. We're peeking into hyper's underlying machinery here a bit
let upgrade = http.request_parts.extensions.remove::<OnUpgrade>().unwrap();
let response = http.response.as_mut().unwrap();
*response.status_mut() = StatusCode::SWITCHING_PROTOCOLS;
for (name, value) in headers {
response.headers_mut().append(
HeaderName::from_bytes(&name).unwrap(),
HeaderValue::from_bytes(&value).unwrap(),
);
}
http.promise.complete(true);
upgrade
});
// Stage 2: wait for the request to finish upgrading
let upgraded = upgrade.await?;
// Stage 3: return the extracted raw network stream
let (stream, bytes) = extract_network_stream(upgraded);
// We're allocating for those extra bytes, but they are probably going to be empty most of the time
Ok((
put_network_stream_resource(
&mut state.borrow_mut().resource_table,
stream,
)?,
ZeroCopyBuf::from(bytes.to_vec()),
))
}
#[op]
pub fn op_set_promise_complete(index: usize, status: u16) {
with_resp_mut(index, |resp| {
// The Javascript code will never provide a status that is invalid here (see 23_response.js)
*resp.as_mut().unwrap().status_mut() =
StatusCode::from_u16(status).unwrap();
});
with_promise_mut(index, |promise| {
promise.complete(true);
});
}
#[op]
pub fn op_get_request_method_and_url(
index: usize,
) -> (String, Option<String>, String, String, Option<u16>) {
// TODO(mmastrac): Passing method can be optimized
with_http(index, |http| {
let request_properties = DefaultHttpRequestProperties::request_properties(
&http.request_info,
&http.request_parts.uri,
&http.request_parts.headers,
);
// Only extract the path part - we handle authority elsewhere
let path = match &http.request_parts.uri.path_and_query() {
Some(path_and_query) => path_and_query.to_string(),
None => "".to_owned(),
};
(
http.request_parts.method.as_str().to_owned(),
request_properties.authority,
path,
String::from(http.request_info.peer_address.as_ref()),
http.request_info.peer_port,
)
})
}
#[op]
pub fn op_get_request_header(index: usize, name: String) -> Option<ByteString> {
with_req(index, |req| {
let value = req.headers.get(name);
value.map(|value| value.as_bytes().into())
})
}
#[op]
pub fn op_get_request_headers(index: usize) -> Vec<(ByteString, ByteString)> {
with_req(index, |req| {
let headers = &req.headers;
let mut vec = Vec::with_capacity(headers.len());
let mut cookies: Option<Vec<&[u8]>> = None;
for (name, value) in headers {
if name == COOKIE {
if let Some(ref mut cookies) = cookies {
cookies.push(value.as_bytes());
} else {
cookies = Some(vec![value.as_bytes()]);
}
} else {
let name: &[u8] = name.as_ref();
vec.push((name.into(), value.as_bytes().into()))
}
}
// We treat cookies specially, because we don't want them to get
// mangled by the `Headers` object in JS. What we do is take all cookie
// headers and concat them into a single cookie header, separated by
// semicolons.
// TODO(mmastrac): This should probably happen on the JS side on-demand
if let Some(cookies) = cookies {
let cookie_sep = "; ".as_bytes();
vec.push((
ByteString::from(COOKIE.as_str()),
ByteString::from(cookies.join(cookie_sep)),
));
}
vec
})
}
#[op]
pub fn op_read_request_body(state: &mut OpState, index: usize) -> ResourceId {
let incoming = with_req_body_mut(index, |body| body.take().unwrap());
let body_resource = Rc::new(HttpRequestBody::new(incoming));
let res = state.resource_table.add_rc(body_resource.clone());
with_body_mut(index, |body| {
*body = Some(body_resource);
});
res
}
#[op]
pub fn op_set_response_header(
index: usize,
name: ByteString,
value: ByteString,
) {
with_resp_mut(index, |resp| {
let resp_headers = resp.as_mut().unwrap().headers_mut();
// These are valid latin-1 strings
let name = HeaderName::from_bytes(&name).unwrap();
let value = HeaderValue::from_bytes(&value).unwrap();
resp_headers.append(name, value);
});
}
#[op]
pub fn op_set_response_headers(
index: usize,
headers: Vec<(ByteString, ByteString)>,
) {
// TODO(mmastrac): Invalid headers should be handled?
with_resp_mut(index, |resp| {
let resp_headers = resp.as_mut().unwrap().headers_mut();
resp_headers.reserve(headers.len());
for (name, value) in headers {
// These are valid latin-1 strings
let name = HeaderName::from_bytes(&name).unwrap();
let value = HeaderValue::from_bytes(&value).unwrap();
resp_headers.append(name, value);
}
})
}
#[op]
pub fn op_set_response_body_resource(
state: &mut OpState,
index: usize,
stream_rid: ResourceId,
auto_close: bool,
) -> Result<(), AnyError> {
// If the stream is auto_close, we will hold the last ref to it until the response is complete.
let resource = if auto_close {
state.resource_table.take_any(stream_rid)?
} else {
state.resource_table.get_any(stream_rid)?
};
with_resp_mut(index, move |response| {
let future = resource.clone().read(64 * 1024);
response
.as_mut()
.unwrap()
.body_mut()
.initialize(ResponseBytesInner::Resource(auto_close, resource, future));
});
Ok(())
}
#[op]
pub fn op_set_response_body_stream(
state: &mut OpState,
index: usize,
) -> Result<ResourceId, AnyError> {
// TODO(mmastrac): what should this channel size be?
let (tx, rx) = tokio::sync::mpsc::channel(1);
let (tx, rx) = (
V8StreamHttpResponseBody::new(tx),
ResponseBytesInner::V8Stream(rx),
);
with_resp_mut(index, move |response| {
response.as_mut().unwrap().body_mut().initialize(rx);
});
Ok(state.resource_table.add(tx))
}
#[op]
pub fn op_set_response_body_text(index: usize, text: String) {
if !text.is_empty() {
with_resp_mut(index, move |response| {
response
.as_mut()
.unwrap()
.body_mut()
.initialize(ResponseBytesInner::Bytes(BufView::from(text.into_bytes())))
});
}
}
#[op]
pub fn op_set_response_body_bytes(index: usize, buffer: ZeroCopyBuf) {
if !buffer.is_empty() {
with_resp_mut(index, |response| {
response
.as_mut()
.unwrap()
.body_mut()
.initialize(ResponseBytesInner::Bytes(BufView::from(buffer)))
});
};
}
#[op]
pub async fn op_http_track(
state: Rc<RefCell<OpState>>,
index: usize,
server_rid: ResourceId,
) -> Result<(), AnyError> {
let handle = with_resp(index, |resp| {
resp.as_ref().unwrap().body().completion_handle()
});
let join_handle = state
.borrow_mut()
.resource_table
.get::<HttpJoinHandle>(server_rid)?;
match handle.or_cancel(join_handle.cancel_handle()).await {
Ok(true) => Ok(()),
Ok(false) => {
Err(AnyError::msg("connection closed before message completed"))
}
Err(_e) => Ok(()),
}
}
#[pin_project(PinnedDrop)]
pub struct SlabFuture<F: Future<Output = ()>>(usize, #[pin] F);
pub fn new_slab_future(
request: Request,
request_info: HttpConnectionProperties,
tx: tokio::sync::mpsc::Sender<usize>,
) -> SlabFuture<impl Future<Output = ()>> {
let index = slab_insert(request, request_info);
let rx = with_promise(index, |promise| promise.clone());
SlabFuture(index, async move {
if tx.send(index).await.is_ok() {
// We only need to wait for completion if we aren't closed
rx.await;
}
})
}
impl<F: Future<Output = ()>> SlabFuture<F> {}
#[pinned_drop]
impl<F: Future<Output = ()>> PinnedDrop for SlabFuture<F> {
fn drop(self: Pin<&mut Self>) {
SLAB.with(|slab| {
#[cfg(__zombie_http_tracking)]
{
slab.borrow_mut().get_mut(self.0).unwrap().alive = false;
}
#[cfg(not(__zombie_http_tracking))]
{
slab.borrow_mut().remove(self.0);
}
});
}
}
impl<F: Future<Output = ()>> Future for SlabFuture<F> {
type Output = Result<Response, hyper::Error>;
fn poll(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
let index = self.0;
self
.project()
.1
.poll(cx)
.map(|_| Ok(with_resp_mut(index, |resp| resp.take().unwrap())))
}
}
fn serve_https(
mut io: TlsStream,
request_info: HttpConnectionProperties,
cancel: RcRef<CancelHandle>,
tx: tokio::sync::mpsc::Sender<usize>,
) -> JoinHandle<Result<(), AnyError>> {
// TODO(mmastrac): This is faster if we can use tokio::spawn but then the send bounds get us
let svc = service_fn(move |req: Request| {
new_slab_future(req, request_info.clone(), tx.clone())
});
spawn_local(async {
io.handshake().await?;
let handshake = io.get_ref().1.alpn_protocol();
// h2
if handshake == Some(&[104, 50]) {
let conn = http2::Builder::new(LocalExecutor).serve_connection(io, svc);
conn.map_err(AnyError::from).try_or_cancel(cancel).await
} else {
let conn = http1::Builder::new()
.keep_alive(true)
.serve_connection(io, svc);
conn
.with_upgrades()
.map_err(AnyError::from)
.try_or_cancel(cancel)
.await
}
})
}
fn serve_http(
io: impl tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send + 'static,
request_info: HttpConnectionProperties,
cancel: RcRef<CancelHandle>,
tx: tokio::sync::mpsc::Sender<usize>,
) -> JoinHandle<Result<(), AnyError>> {
// TODO(mmastrac): This is faster if we can use tokio::spawn but then the send bounds get us
let svc = service_fn(move |req: Request| {
new_slab_future(req, request_info.clone(), tx.clone())
});
spawn_local(async {
let conn = http1::Builder::new()
.keep_alive(true)
.serve_connection(io, svc);
conn
.with_upgrades()
.map_err(AnyError::from)
.try_or_cancel(cancel)
.await
})
}
fn serve_http_on(
network_stream: NetworkStream,
listen_properties: &HttpListenProperties,
cancel: RcRef<CancelHandle>,
tx: tokio::sync::mpsc::Sender<usize>,
) -> JoinHandle<Result<(), AnyError>> {
// We always want some sort of peer address. If we can't get one, just make up one.
let peer_address = network_stream.peer_address().unwrap_or_else(|_| {
NetworkStreamAddress::Ip(SocketAddr::V4(SocketAddrV4::new(
Ipv4Addr::new(0, 0, 0, 0),
0,
)))
});
let connection_properties: HttpConnectionProperties =
DefaultHttpRequestProperties::connection_properties(
listen_properties,
&peer_address,
);
match network_stream {
NetworkStream::Tcp(conn) => {
serve_http(conn, connection_properties, cancel, tx)
}
NetworkStream::Tls(conn) => {
serve_https(conn, connection_properties, cancel, tx)
}
#[cfg(unix)]
NetworkStream::Unix(conn) => {
serve_http(conn, connection_properties, cancel, tx)
}
}
}
struct HttpJoinHandle(
AsyncRefCell<Option<JoinHandle<Result<(), AnyError>>>>,
CancelHandle,
AsyncRefCell<tokio::sync::mpsc::Receiver<usize>>,
);
impl HttpJoinHandle {
fn cancel_handle(self: &Rc<Self>) -> RcRef<CancelHandle> {
RcRef::map(self, |this| &this.1)
}
}
impl Resource for HttpJoinHandle {
fn name(&self) -> Cow<str> {
"http".into()
}
fn close(self: Rc<Self>) {
self.1.cancel()
}
}
#[op(v8)]
pub fn op_serve_http(
state: Rc<RefCell<OpState>>,
listener_rid: ResourceId,
) -> Result<(ResourceId, &'static str, String), AnyError> {
let listener =
DefaultHttpRequestProperties::get_network_stream_listener_for_rid(
&mut state.borrow_mut(),
listener_rid,
)?;
let local_address = listener.listen_address()?;
let listen_properties = DefaultHttpRequestProperties::listen_properties(
listener.stream(),
&local_address,
);
let (tx, rx) = tokio::sync::mpsc::channel(10);
let resource: Rc<HttpJoinHandle> = Rc::new(HttpJoinHandle(
AsyncRefCell::new(None),
CancelHandle::new(),
AsyncRefCell::new(rx),
));
let cancel_clone = resource.cancel_handle();
let listen_properties_clone = listen_properties.clone();
let handle = spawn_local(async move {
loop {
let conn = listener
.accept()
.try_or_cancel(cancel_clone.clone())
.await?;
serve_http_on(
conn,
&listen_properties_clone,
cancel_clone.clone(),
tx.clone(),
);
}
#[allow(unreachable_code)]
Ok::<_, AnyError>(())
});
// Set the handle after we start the future
*RcRef::map(&resource, |this| &this.0)
.try_borrow_mut()
.unwrap() = Some(handle);
Ok((
state.borrow_mut().resource_table.add_rc(resource),
listen_properties.scheme,
listen_properties.fallback_host,
))
}
#[op(v8)]
pub fn op_serve_http_on(
state: Rc<RefCell<OpState>>,
conn: ResourceId,
) -> Result<(ResourceId, &'static str, String), AnyError> {
let network_stream =
DefaultHttpRequestProperties::get_network_stream_for_rid(
&mut state.borrow_mut(),
conn,
)?;
let local_address = network_stream.local_address()?;
let listen_properties = DefaultHttpRequestProperties::listen_properties(
network_stream.stream(),
&local_address,
);
let (tx, rx) = tokio::sync::mpsc::channel(10);
let resource: Rc<HttpJoinHandle> = Rc::new(HttpJoinHandle(
AsyncRefCell::new(None),
CancelHandle::new(),
AsyncRefCell::new(rx),
));
let handle = serve_http_on(
network_stream,
&listen_properties,
resource.cancel_handle(),
tx,
);
// Set the handle after we start the future
*RcRef::map(&resource, |this| &this.0)
.try_borrow_mut()
.unwrap() = Some(handle);
Ok((
state.borrow_mut().resource_table.add_rc(resource),
listen_properties.scheme,
listen_properties.fallback_host,
))
}
#[op]
pub async fn op_http_wait(
state: Rc<RefCell<OpState>>,
rid: ResourceId,
) -> Result<u32, AnyError> {
// We will get the join handle initially, as we might be consuming requests still
let join_handle = state
.borrow_mut()
.resource_table
.get::<HttpJoinHandle>(rid)?;
let cancel = join_handle.clone().cancel_handle();
let next = async {
let mut recv = RcRef::map(&join_handle, |this| &this.2).borrow_mut().await;
recv.recv().await
}
.or_cancel(cancel)
.unwrap_or_else(|_| None)
.await;
// Do we have a request?
if let Some(req) = next {
return Ok(req as u32);
}
// No - we're shutting down
let res = RcRef::map(join_handle, |this| &this.0)
.borrow_mut()
.await
.take()
.unwrap()
.await?;
// Drop the cancel and join handles
state
.borrow_mut()
.resource_table
.take::<HttpJoinHandle>(rid)?;
// Filter out shutdown (ENOTCONN) errors
if let Err(err) = res {
if let Some(err) = err.source() {
if let Some(err) = err.downcast_ref::<io::Error>() {
if err.kind() == io::ErrorKind::NotConnected {
return Ok(u32::MAX);
}
}
}
return Err(err);
}
Ok(u32::MAX)
}


@ -34,6 +34,7 @@ use deno_core::ResourceId;
use deno_core::StringOrBuffer;
use deno_core::WriteOutcome;
use deno_core::ZeroCopyBuf;
use deno_net::raw::NetworkStream;
use deno_websocket::ws_create_server_stream;
use flate2::write::GzEncoder;
use flate2::Compression;
@ -76,7 +77,11 @@ use crate::reader_stream::ExternallyAbortableReaderStream;
use crate::reader_stream::ShutdownHandle;
pub mod compressible;
mod http_next;
mod reader_stream;
mod request_body;
mod request_properties;
mod response_body;
mod websocket_upgrade;
deno_core::extension!(
@ -92,8 +97,25 @@ deno_core::extension!(
op_http_websocket_accept_header,
op_http_upgrade_early,
op_http_upgrade_websocket,
http_next::op_serve_http,
http_next::op_serve_http_on,
http_next::op_http_wait,
http_next::op_http_track,
http_next::op_set_response_header,
http_next::op_set_response_headers,
http_next::op_set_response_body_text,
http_next::op_set_promise_complete,
http_next::op_set_response_body_bytes,
http_next::op_set_response_body_resource,
http_next::op_set_response_body_stream,
http_next::op_get_request_header,
http_next::op_get_request_headers,
http_next::op_get_request_method_and_url,
http_next::op_read_request_body,
http_next::op_upgrade,
http_next::op_upgrade_raw,
],
esm = ["01_http.js"],
esm = ["00_serve.js", "01_http.js"],
);
pub enum HttpSocketAddr {
@ -1147,8 +1169,10 @@ async fn op_http_upgrade_websocket(
}
};
let transport = hyper::upgrade::on(request).await?;
let ws_rid = ws_create_server_stream(&state, transport).await?;
let (transport, bytes) =
extract_network_stream(hyper::upgrade::on(request).await?);
let ws_rid =
ws_create_server_stream(&mut state.borrow_mut(), transport, bytes)?;
Ok(ws_rid)
}
@ -1166,6 +1190,16 @@ where
}
}
impl<Fut> hyper1::rt::Executor<Fut> for LocalExecutor
where
Fut: Future + 'static,
Fut::Output: 'static,
{
fn execute(&self, fut: Fut) {
spawn_local(fut);
}
}
fn http_error(message: &'static str) -> AnyError {
custom_error("Http", message)
}
@ -1192,3 +1226,47 @@ fn filter_enotconn(
fn never() -> Pending<Never> {
pending()
}
trait CanDowncastUpgrade: Sized {
fn downcast<T: AsyncRead + AsyncWrite + Unpin + 'static>(
self,
) -> Result<(T, Bytes), Self>;
}
impl CanDowncastUpgrade for hyper1::upgrade::Upgraded {
fn downcast<T: AsyncRead + AsyncWrite + Unpin + 'static>(
self,
) -> Result<(T, Bytes), Self> {
let hyper1::upgrade::Parts { io, read_buf, .. } = self.downcast()?;
Ok((io, read_buf))
}
}
impl CanDowncastUpgrade for hyper::upgrade::Upgraded {
fn downcast<T: AsyncRead + AsyncWrite + Unpin + 'static>(
self,
) -> Result<(T, Bytes), Self> {
let hyper::upgrade::Parts { io, read_buf, .. } = self.downcast()?;
Ok((io, read_buf))
}
}
fn extract_network_stream<U: CanDowncastUpgrade>(
upgraded: U,
) -> (NetworkStream, Bytes) {
let upgraded = match upgraded.downcast::<tokio::net::TcpStream>() {
Ok((stream, bytes)) => return (NetworkStream::Tcp(stream), bytes),
Err(x) => x,
};
let upgraded = match upgraded.downcast::<deno_net::ops_tls::TlsStream>() {
Ok((stream, bytes)) => return (NetworkStream::Tls(stream), bytes),
Err(x) => x,
};
#[cfg(unix)]
let upgraded = match upgraded.downcast::<tokio::net::UnixStream>() {
Ok((stream, bytes)) => return (NetworkStream::Unix(stream), bytes),
Err(x) => x,
};
drop(upgraded);
unreachable!("unexpected stream type");
}
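// Hypothetical sketch of how the helper above is meant to be used on the
// hyper 1.0 upgrade path: once hyper finishes the upgrade, recover the raw
// socket plus any bytes hyper already buffered so they can be replayed to the
// next protocol. `request` is an assumed `hyper1::Request<Incoming>` that
// negotiated an upgrade; this function is illustrative only.
async fn upgraded_stream_sketch(
  request: hyper1::Request<hyper1::body::Incoming>,
) -> Result<(NetworkStream, Bytes), AnyError> {
  let upgraded = hyper1::upgrade::on(request).await?;
  Ok(extract_network_stream(upgraded))
}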

84
ext/http/request_body.rs Normal file

@ -0,0 +1,84 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
use bytes::Bytes;
use deno_core::error::AnyError;
use deno_core::futures::stream::Peekable;
use deno_core::futures::Stream;
use deno_core::futures::StreamExt;
use deno_core::AsyncRefCell;
use deno_core::AsyncResult;
use deno_core::BufView;
use deno_core::RcRef;
use deno_core::Resource;
use hyper1::body::Body;
use hyper1::body::Incoming;
use hyper1::body::SizeHint;
use std::borrow::Cow;
use std::pin::Pin;
use std::rc::Rc;
/// Converts a hyper [`Incoming`] body into a stream of [`Bytes`] chunks that can be read from V8.
struct ReadFuture(Incoming);
impl Stream for ReadFuture {
type Item = Result<Bytes, AnyError>;
fn poll_next(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let res = Pin::new(&mut self.get_mut().0).poll_frame(cx);
match res {
std::task::Poll::Ready(Some(Ok(frame))) => {
if let Ok(data) = frame.into_data() {
// Ensure that we never yield an empty frame
if !data.is_empty() {
return std::task::Poll::Ready(Some(Ok(data)));
}
}
}
std::task::Poll::Ready(None) => return std::task::Poll::Ready(None),
_ => {}
}
std::task::Poll::Pending
}
}
pub struct HttpRequestBody(AsyncRefCell<Peekable<ReadFuture>>, SizeHint);
impl HttpRequestBody {
pub fn new(body: Incoming) -> Self {
let size_hint = body.size_hint();
Self(AsyncRefCell::new(ReadFuture(body).peekable()), size_hint)
}
async fn read(self: Rc<Self>, limit: usize) -> Result<BufView, AnyError> {
let peekable = RcRef::map(self, |this| &this.0);
let mut peekable = peekable.borrow_mut().await;
match Pin::new(&mut *peekable).peek_mut().await {
None => Ok(BufView::empty()),
Some(Err(_)) => Err(peekable.next().await.unwrap().err().unwrap()),
Some(Ok(bytes)) => {
if bytes.len() <= limit {
// We can safely take the next item since we peeked it
return Ok(BufView::from(peekable.next().await.unwrap()?));
}
let ret = bytes.split_to(limit);
Ok(BufView::from(ret))
}
}
}
}
impl Resource for HttpRequestBody {
fn name(&self) -> Cow<str> {
"requestBody".into()
}
fn read(self: Rc<Self>, limit: usize) -> AsyncResult<BufView> {
Box::pin(HttpRequestBody::read(self, limit))
}
fn size_hint(&self) -> (u64, Option<u64>) {
(self.1.lower(), self.1.upper())
}
}
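// Hypothetical read sketch: `incoming` would come from an accepted hyper1
// request. The wrapper reads at most 64 KiB per call; an empty BufView
// signals the end of the body. Illustrative only.
async fn read_one_chunk(incoming: Incoming) -> Result<BufView, AnyError> {
  let body = Rc::new(HttpRequestBody::new(incoming));
  body.read(64 * 1024).await
}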


@ -0,0 +1,249 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
use deno_core::error::AnyError;
use deno_core::OpState;
use deno_core::ResourceId;
use deno_net::raw::take_network_stream_listener_resource;
use deno_net::raw::take_network_stream_resource;
use deno_net::raw::NetworkStream;
use deno_net::raw::NetworkStreamAddress;
use deno_net::raw::NetworkStreamListener;
use deno_net::raw::NetworkStreamType;
use hyper::HeaderMap;
use hyper::Uri;
use hyper1::header::HOST;
use std::borrow::Cow;
use std::rc::Rc;
// TODO(mmastrac): I don't like that we have to clone this, but it's one-time setup
#[derive(Clone)]
pub struct HttpListenProperties {
pub stream_type: NetworkStreamType,
pub scheme: &'static str,
pub fallback_host: String,
pub local_port: Option<u16>,
}
#[derive(Clone)]
pub struct HttpConnectionProperties {
pub stream_type: NetworkStreamType,
pub peer_address: Rc<str>,
pub peer_port: Option<u16>,
pub local_port: Option<u16>,
}
pub struct HttpRequestProperties {
pub authority: Option<String>,
}
/// Pluggable trait to determine listen, connection and request properties
/// for embedders that wish to provide alternative routes for incoming HTTP.
pub trait HttpPropertyExtractor {
/// Given a listener [`ResourceId`], returns the [`NetworkStreamListener`].
fn get_network_stream_listener_for_rid(
state: &mut OpState,
listener_rid: ResourceId,
) -> Result<NetworkStreamListener, AnyError>;
/// Given a connection [`ResourceId`], returns the [`NetworkStream`].
fn get_network_stream_for_rid(
state: &mut OpState,
rid: ResourceId,
) -> Result<NetworkStream, AnyError>;
/// Determines the listener properties.
fn listen_properties(
stream_type: NetworkStreamType,
local_address: &NetworkStreamAddress,
) -> HttpListenProperties;
/// Determines the connection properties.
fn connection_properties(
listen_properties: &HttpListenProperties,
peer_address: &NetworkStreamAddress,
) -> HttpConnectionProperties;
/// Determines the request properties.
fn request_properties(
connection_properties: &HttpConnectionProperties,
uri: &Uri,
headers: &HeaderMap,
) -> HttpRequestProperties;
}
pub struct DefaultHttpRequestProperties {}
impl HttpPropertyExtractor for DefaultHttpRequestProperties {
fn get_network_stream_for_rid(
state: &mut OpState,
rid: ResourceId,
) -> Result<NetworkStream, AnyError> {
take_network_stream_resource(&mut state.resource_table, rid)
}
fn get_network_stream_listener_for_rid(
state: &mut OpState,
listener_rid: ResourceId,
) -> Result<NetworkStreamListener, AnyError> {
take_network_stream_listener_resource(
&mut state.resource_table,
listener_rid,
)
}
fn listen_properties(
stream_type: NetworkStreamType,
local_address: &NetworkStreamAddress,
) -> HttpListenProperties {
let scheme = req_scheme_from_stream_type(stream_type);
let fallback_host = req_host_from_addr(stream_type, local_address);
let local_port: Option<u16> = match local_address {
NetworkStreamAddress::Ip(ip) => Some(ip.port()),
#[cfg(unix)]
NetworkStreamAddress::Unix(_) => None,
};
HttpListenProperties {
scheme,
fallback_host,
local_port,
stream_type,
}
}
fn connection_properties(
listen_properties: &HttpListenProperties,
peer_address: &NetworkStreamAddress,
) -> HttpConnectionProperties {
let peer_port: Option<u16> = match peer_address {
NetworkStreamAddress::Ip(ip) => Some(ip.port()),
#[cfg(unix)]
NetworkStreamAddress::Unix(_) => None,
};
let peer_address = match peer_address {
NetworkStreamAddress::Ip(addr) => Rc::from(addr.ip().to_string()),
#[cfg(unix)]
NetworkStreamAddress::Unix(_) => Rc::from("unix"),
};
let local_port = listen_properties.local_port;
let stream_type = listen_properties.stream_type;
HttpConnectionProperties {
stream_type,
peer_address,
peer_port,
local_port,
}
}
fn request_properties(
connection_properties: &HttpConnectionProperties,
uri: &Uri,
headers: &HeaderMap,
) -> HttpRequestProperties {
let authority = req_host(
uri,
headers,
connection_properties.stream_type,
connection_properties.local_port.unwrap_or_default(),
)
.map(|s| s.into_owned());
HttpRequestProperties { authority }
}
}
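// Hypothetical embedder sketch for the trait above: delegate to the default
// extractor and override only the request properties. `MyExtractor` and the
// fixed fallback authority are illustrative, not part of this extension.
struct MyExtractor;
impl HttpPropertyExtractor for MyExtractor {
  fn get_network_stream_listener_for_rid(
    state: &mut OpState,
    listener_rid: ResourceId,
  ) -> Result<NetworkStreamListener, AnyError> {
    DefaultHttpRequestProperties::get_network_stream_listener_for_rid(
      state,
      listener_rid,
    )
  }
  fn get_network_stream_for_rid(
    state: &mut OpState,
    rid: ResourceId,
  ) -> Result<NetworkStream, AnyError> {
    DefaultHttpRequestProperties::get_network_stream_for_rid(state, rid)
  }
  fn listen_properties(
    stream_type: NetworkStreamType,
    local_address: &NetworkStreamAddress,
  ) -> HttpListenProperties {
    DefaultHttpRequestProperties::listen_properties(stream_type, local_address)
  }
  fn connection_properties(
    listen_properties: &HttpListenProperties,
    peer_address: &NetworkStreamAddress,
  ) -> HttpConnectionProperties {
    DefaultHttpRequestProperties::connection_properties(
      listen_properties,
      peer_address,
    )
  }
  fn request_properties(
    connection_properties: &HttpConnectionProperties,
    uri: &Uri,
    headers: &HeaderMap,
  ) -> HttpRequestProperties {
    let default = DefaultHttpRequestProperties::request_properties(
      connection_properties,
      uri,
      headers,
    );
    // Fall back to a fixed (hypothetical) authority when none could be derived.
    HttpRequestProperties {
      authority: default.authority.or_else(|| Some("example.internal".into())),
    }
  }
}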
/// Computes the fallback host from the [`NetworkStreamAddress`]. If the request has no authority/host in
/// its URI, and there is no `HOST` header, we fall back to this.
fn req_host_from_addr(
stream_type: NetworkStreamType,
addr: &NetworkStreamAddress,
) -> String {
match addr {
NetworkStreamAddress::Ip(addr) => {
if (stream_type == NetworkStreamType::Tls && addr.port() == 443)
|| (stream_type == NetworkStreamType::Tcp && addr.port() == 80)
{
if addr.ip().is_loopback() || addr.ip().is_unspecified() {
return "localhost".to_owned();
}
addr.ip().to_string()
} else {
if addr.ip().is_loopback() || addr.ip().is_unspecified() {
return format!("localhost:{}", addr.port());
}
addr.to_string()
}
}
// There is no standard way to represent unix domain socket URLs.
// nginx and the Node.js `request` module use http://unix:[socket_path]:/ but it is not a valid URL;
// httpie uses http+unix://[percent_encoding_of_path]/ which is what we follow.
#[cfg(unix)]
NetworkStreamAddress::Unix(unix) => percent_encoding::percent_encode(
unix
.as_pathname()
.and_then(|x| x.to_str())
.unwrap_or_default()
.as_bytes(),
percent_encoding::NON_ALPHANUMERIC,
)
.to_string(),
}
}
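// Test-style sketch of the fallback behavior above (addresses are
// illustrative; not part of the original file): the default port is elided
// and loopback/unspecified addresses map to "localhost".
#[cfg(test)]
mod fallback_host_tests {
  use super::*;
  #[test]
  fn loopback_fallback_host() {
    let addr = NetworkStreamAddress::Ip("127.0.0.1:80".parse().unwrap());
    assert_eq!(req_host_from_addr(NetworkStreamType::Tcp, &addr), "localhost");
    // Non-default ports are preserved.
    let addr = NetworkStreamAddress::Ip("127.0.0.1:8080".parse().unwrap());
    assert_eq!(
      req_host_from_addr(NetworkStreamType::Tcp, &addr),
      "localhost:8080"
    );
  }
}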
fn req_scheme_from_stream_type(stream_type: NetworkStreamType) -> &'static str {
match stream_type {
NetworkStreamType::Tcp => "http://",
NetworkStreamType::Tls => "https://",
#[cfg(unix)]
NetworkStreamType::Unix => "http+unix://",
}
}
fn req_host<'a>(
uri: &'a Uri,
headers: &'a HeaderMap,
addr_type: NetworkStreamType,
port: u16,
) -> Option<Cow<'a, str>> {
// Unix sockets always use the socket address
#[cfg(unix)]
if addr_type == NetworkStreamType::Unix {
return None;
}
// It is rare that an authority will be passed, but if it is, it takes priority
if let Some(auth) = uri.authority() {
match addr_type {
NetworkStreamType::Tcp => {
if port == 80 {
return Some(Cow::Borrowed(auth.host()));
}
}
NetworkStreamType::Tls => {
if port == 443 {
return Some(Cow::Borrowed(auth.host()));
}
}
#[cfg(unix)]
NetworkStreamType::Unix => {}
}
return Some(Cow::Borrowed(auth.as_str()));
}
// TODO(mmastrac): Most requests will use this path and we probably will want to optimize it in the future
if let Some(host) = headers.get(HOST) {
return Some(match host.to_str() {
Ok(host) => Cow::Borrowed(host),
Err(_) => Cow::Owned(
host
.as_bytes()
.iter()
.cloned()
.map(char::from)
.collect::<String>(),
),
});
}
None
}

253
ext/http/response_body.rs Normal file

@ -0,0 +1,253 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
use std::borrow::Cow;
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::task::Waker;
use deno_core::error::bad_resource;
use deno_core::error::AnyError;
use deno_core::futures::FutureExt;
use deno_core::AsyncRefCell;
use deno_core::AsyncResult;
use deno_core::BufView;
use deno_core::CancelHandle;
use deno_core::CancelTryFuture;
use deno_core::RcRef;
use deno_core::Resource;
use deno_core::WriteOutcome;
use hyper1::body::Body;
use hyper1::body::Frame;
use hyper1::body::SizeHint;
#[derive(Clone, Debug, Default)]
pub struct CompletionHandle {
inner: Rc<RefCell<CompletionHandleInner>>,
}
#[derive(Debug, Default)]
struct CompletionHandleInner {
complete: bool,
success: bool,
waker: Option<Waker>,
}
impl CompletionHandle {
pub fn complete(&self, success: bool) {
let mut mut_self = self.inner.borrow_mut();
mut_self.complete = true;
mut_self.success = success;
if let Some(waker) = mut_self.waker.take() {
drop(mut_self);
waker.wake();
}
}
}
impl Future for CompletionHandle {
type Output = bool;
fn poll(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
let mut mut_self = self.inner.borrow_mut();
if mut_self.complete {
return std::task::Poll::Ready(mut_self.success);
}
mut_self.waker = Some(cx.waker().clone());
std::task::Poll::Pending
}
}
#[derive(Default)]
pub enum ResponseBytesInner {
/// An empty stream.
#[default]
Empty,
/// A completed stream.
Done,
/// A static buffer of bytes, sent in one fell swoop.
Bytes(BufView),
/// A resource stream, piped in fast mode.
Resource(bool, Rc<dyn Resource>, AsyncResult<BufView>),
/// A JS-backed stream, written in JS and transported via pipe.
V8Stream(tokio::sync::mpsc::Receiver<BufView>),
}
impl std::fmt::Debug for ResponseBytesInner {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Done => f.write_str("Done"),
Self::Empty => f.write_str("Empty"),
Self::Bytes(..) => f.write_str("Bytes"),
Self::Resource(..) => f.write_str("Resource"),
Self::V8Stream(..) => f.write_str("V8Stream"),
}
}
}
/// This represents the union of possible response types in Deno with the stream-style [`Body`] interface
/// required by hyper. As the API requires information about request completion (including a success/fail
/// flag), we include a very lightweight [`CompletionHandle`] for interested parties to listen on.
#[derive(Debug, Default)]
pub struct ResponseBytes(ResponseBytesInner, CompletionHandle);
impl ResponseBytes {
pub fn initialize(&mut self, inner: ResponseBytesInner) {
debug_assert!(matches!(self.0, ResponseBytesInner::Empty));
self.0 = inner;
}
pub fn completion_handle(&self) -> CompletionHandle {
self.1.clone()
}
fn complete(&mut self, success: bool) -> ResponseBytesInner {
if matches!(self.0, ResponseBytesInner::Done) {
return ResponseBytesInner::Done;
}
let current = std::mem::replace(&mut self.0, ResponseBytesInner::Done);
self.1.complete(success);
current
}
}
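// Hypothetical construction of a static-payload body (illustrative only): the
// caller keeps the CompletionHandle to learn whether hyper fully wrote the
// response.
fn static_body_sketch() -> (ResponseBytes, CompletionHandle) {
  let mut body = ResponseBytes::default();
  let completion = body.completion_handle();
  body.initialize(ResponseBytesInner::Bytes(BufView::from(
    bytes::Bytes::from_static(b"hello world"),
  )));
  // `body` would be handed to hyper as the response body; awaiting
  // `completion` then resolves to `true` once the data frame has been sent.
  (body, completion)
}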
impl ResponseBytesInner {
pub fn size_hint(&self) -> SizeHint {
match self {
Self::Done => SizeHint::with_exact(0),
Self::Empty => SizeHint::with_exact(0),
Self::Bytes(bytes) => SizeHint::with_exact(bytes.len() as u64),
Self::Resource(_, res, _) => {
let hint = res.size_hint();
let mut size_hint = SizeHint::new();
size_hint.set_lower(hint.0);
if let Some(upper) = hint.1 {
size_hint.set_upper(upper)
}
size_hint
}
Self::V8Stream(..) => SizeHint::default(),
}
}
}
impl Body for ResponseBytes {
type Data = BufView;
type Error = AnyError;
fn poll_frame(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
match &mut self.0 {
ResponseBytesInner::Done | ResponseBytesInner::Empty => {
unreachable!()
}
ResponseBytesInner::Bytes(..) => {
if let ResponseBytesInner::Bytes(data) = self.complete(true) {
std::task::Poll::Ready(Some(Ok(Frame::data(data))))
} else {
unreachable!()
}
}
ResponseBytesInner::Resource(auto_close, stm, ref mut future) => {
match future.poll_unpin(cx) {
std::task::Poll::Pending => std::task::Poll::Pending,
std::task::Poll::Ready(Err(err)) => {
std::task::Poll::Ready(Some(Err(err)))
}
std::task::Poll::Ready(Ok(buf)) => {
if buf.is_empty() {
if *auto_close {
stm.clone().close();
}
self.complete(true);
return std::task::Poll::Ready(None);
}
// Re-arm the future
*future = stm.clone().read(64 * 1024);
std::task::Poll::Ready(Some(Ok(Frame::data(buf))))
}
}
}
ResponseBytesInner::V8Stream(stm) => match stm.poll_recv(cx) {
std::task::Poll::Pending => std::task::Poll::Pending,
std::task::Poll::Ready(Some(buf)) => {
std::task::Poll::Ready(Some(Ok(Frame::data(buf))))
}
std::task::Poll::Ready(None) => {
self.complete(true);
std::task::Poll::Ready(None)
}
},
}
}
fn is_end_stream(&self) -> bool {
matches!(self.0, ResponseBytesInner::Done | ResponseBytesInner::Empty)
}
fn size_hint(&self) -> SizeHint {
// The size hint is currently only used by hyper when the bounds are exact, but we'll pass it through
// anyway just in case hyper needs it.
self.0.size_hint()
}
}
impl Drop for ResponseBytes {
fn drop(&mut self) {
// We won't actually poll_frame for Empty responses so this is where we return success
self.complete(matches!(self.0, ResponseBytesInner::Empty));
}
}
/// A response body object that can be passed to V8. This body will feed byte buffers to a channel which
/// feeds hyper's HTTP response.
pub struct V8StreamHttpResponseBody(
AsyncRefCell<Option<tokio::sync::mpsc::Sender<BufView>>>,
CancelHandle,
);
impl V8StreamHttpResponseBody {
pub fn new(sender: tokio::sync::mpsc::Sender<BufView>) -> Self {
Self(AsyncRefCell::new(Some(sender)), CancelHandle::default())
}
}
impl Resource for V8StreamHttpResponseBody {
fn name(&self) -> Cow<str> {
"responseBody".into()
}
fn write(
self: Rc<Self>,
buf: BufView,
) -> AsyncResult<deno_core::WriteOutcome> {
let cancel_handle = RcRef::map(&self, |this| &this.1);
Box::pin(
async move {
let nwritten = buf.len();
let res = RcRef::map(self, |this| &this.0).borrow().await;
if let Some(tx) = res.as_ref() {
tx.send(buf)
.await
.map_err(|_| bad_resource("failed to write"))?;
Ok(WriteOutcome::Full { nwritten })
} else {
Err(bad_resource("failed to write"))
}
}
.try_or_cancel(cancel_handle),
)
}
fn close(self: Rc<Self>) {
self.1.cancel();
}
}
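// Hypothetical wiring of the JS-backed path (channel capacity and names are
// illustrative): the sender half is exposed to JS as a writable resource, the
// receiver half becomes the hyper response body.
fn v8_stream_body_sketch() -> (V8StreamHttpResponseBody, ResponseBytes) {
  let (tx, rx) = tokio::sync::mpsc::channel::<BufView>(1);
  let body_resource = V8StreamHttpResponseBody::new(tx);
  let mut response_body = ResponseBytes::default();
  response_body.initialize(ResponseBytesInner::V8Stream(rx));
  // `body_resource` goes into the resource table for JS `write()` calls;
  // `response_body` is handed to hyper and yields each chunk as a data frame.
  (body_resource, response_body)
}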


@ -17,6 +17,7 @@ path = "lib.rs"
deno_core.workspace = true
deno_tls.workspace = true
log.workspace = true
pin-project.workspace = true
serde.workspace = true
socket2.workspace = true
tokio.workspace = true


@ -5,6 +5,7 @@ pub mod ops;
pub mod ops_tls;
#[cfg(unix)]
pub mod ops_unix;
pub mod raw;
pub mod resolve_addr;
use deno_core::error::AnyError;


@ -61,6 +61,7 @@ use std::fs::File;
use std::io;
use std::io::BufReader;
use std::io::ErrorKind;
use std::net::SocketAddr;
use std::path::Path;
use std::pin::Pin;
use std::rc::Rc;
@ -115,6 +116,13 @@ impl TlsStream {
Self::new(tcp, Connection::Client(tls))
}
pub fn new_client_side_from(
tcp: TcpStream,
connection: ClientConnection,
) -> Self {
Self::new(tcp, Connection::Client(connection))
}
pub fn new_server_side(
tcp: TcpStream,
tls_config: Arc<ServerConfig>,
@ -123,6 +131,13 @@ impl TlsStream {
Self::new(tcp, Connection::Server(tls))
}
pub fn new_server_side_from(
tcp: TcpStream,
connection: ServerConnection,
) -> Self {
Self::new(tcp, Connection::Server(connection))
}
pub fn into_split(self) -> (ReadHalf, WriteHalf) {
let shared = Shared::new(self);
let rd = ReadHalf {
@ -132,6 +147,16 @@ impl TlsStream {
(rd, wr)
}
/// Convenience method to match [`TcpStream`].
pub fn peer_addr(&self) -> Result<SocketAddr, io::Error> {
self.0.as_ref().unwrap().tcp.peer_addr()
}
/// Convenience method to match [`TcpStream`].
pub fn local_addr(&self) -> Result<SocketAddr, io::Error> {
self.0.as_ref().unwrap().tcp.local_addr()
}
/// Tokio-rustls compatibility: returns a reference to the underlying TCP
/// stream, and a reference to the Rustls `Connection` object.
pub fn get_ref(&self) -> (&TcpStream, &Connection) {
@ -954,8 +979,8 @@ fn load_private_keys_from_file(
}
pub struct TlsListenerResource {
tcp_listener: AsyncRefCell<TcpListener>,
tls_config: Arc<ServerConfig>,
pub(crate) tcp_listener: AsyncRefCell<TcpListener>,
pub(crate) tls_config: Arc<ServerConfig>,
cancel_handle: CancelHandle,
}


@ -32,8 +32,8 @@ pub fn into_string(s: std::ffi::OsString) -> Result<String, AnyError> {
})
}
struct UnixListenerResource {
listener: AsyncRefCell<UnixListener>,
pub(crate) struct UnixListenerResource {
pub listener: AsyncRefCell<UnixListener>,
cancel: CancelHandle,
}

304
ext/net/raw.rs Normal file

@ -0,0 +1,304 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
use crate::io::TcpStreamResource;
#[cfg(unix)]
use crate::io::UnixStreamResource;
use crate::ops::TcpListenerResource;
use crate::ops_tls::TlsListenerResource;
use crate::ops_tls::TlsStream;
use crate::ops_tls::TlsStreamResource;
#[cfg(unix)]
use crate::ops_unix::UnixListenerResource;
use deno_core::error::bad_resource;
use deno_core::error::bad_resource_id;
use deno_core::error::AnyError;
use deno_core::ResourceId;
use deno_core::ResourceTable;
use deno_tls::rustls::ServerConfig;
use pin_project::pin_project;
use std::rc::Rc;
use std::sync::Arc;
use tokio::net::TcpStream;
#[cfg(unix)]
use tokio::net::UnixStream;
/// A raw stream of one of the types handled by this extension.
#[pin_project(project = NetworkStreamProject)]
pub enum NetworkStream {
Tcp(#[pin] TcpStream),
Tls(#[pin] TlsStream),
#[cfg(unix)]
Unix(#[pin] UnixStream),
}
/// A raw stream of one of the types handled by this extension.
#[derive(Copy, Clone, PartialEq, Eq)]
pub enum NetworkStreamType {
Tcp,
Tls,
#[cfg(unix)]
Unix,
}
impl NetworkStream {
pub fn local_address(&self) -> Result<NetworkStreamAddress, std::io::Error> {
match self {
Self::Tcp(tcp) => Ok(NetworkStreamAddress::Ip(tcp.local_addr()?)),
Self::Tls(tls) => Ok(NetworkStreamAddress::Ip(tls.local_addr()?)),
#[cfg(unix)]
Self::Unix(unix) => Ok(NetworkStreamAddress::Unix(unix.local_addr()?)),
}
}
pub fn peer_address(&self) -> Result<NetworkStreamAddress, std::io::Error> {
match self {
Self::Tcp(tcp) => Ok(NetworkStreamAddress::Ip(tcp.peer_addr()?)),
Self::Tls(tls) => Ok(NetworkStreamAddress::Ip(tls.peer_addr()?)),
#[cfg(unix)]
Self::Unix(unix) => Ok(NetworkStreamAddress::Unix(unix.peer_addr()?)),
}
}
pub fn stream(&self) -> NetworkStreamType {
match self {
Self::Tcp(_) => NetworkStreamType::Tcp,
Self::Tls(_) => NetworkStreamType::Tls,
#[cfg(unix)]
Self::Unix(_) => NetworkStreamType::Unix,
}
}
}
impl tokio::io::AsyncRead for NetworkStream {
fn poll_read(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
) -> std::task::Poll<std::io::Result<()>> {
match self.project() {
NetworkStreamProject::Tcp(s) => s.poll_read(cx, buf),
NetworkStreamProject::Tls(s) => s.poll_read(cx, buf),
#[cfg(unix)]
NetworkStreamProject::Unix(s) => s.poll_read(cx, buf),
}
}
}
impl tokio::io::AsyncWrite for NetworkStream {
fn poll_write(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &[u8],
) -> std::task::Poll<Result<usize, std::io::Error>> {
match self.project() {
NetworkStreamProject::Tcp(s) => s.poll_write(cx, buf),
NetworkStreamProject::Tls(s) => s.poll_write(cx, buf),
#[cfg(unix)]
NetworkStreamProject::Unix(s) => s.poll_write(cx, buf),
}
}
fn poll_flush(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), std::io::Error>> {
match self.project() {
NetworkStreamProject::Tcp(s) => s.poll_flush(cx),
NetworkStreamProject::Tls(s) => s.poll_flush(cx),
#[cfg(unix)]
NetworkStreamProject::Unix(s) => s.poll_flush(cx),
}
}
fn poll_shutdown(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), std::io::Error>> {
match self.project() {
NetworkStreamProject::Tcp(s) => s.poll_shutdown(cx),
NetworkStreamProject::Tls(s) => s.poll_shutdown(cx),
#[cfg(unix)]
NetworkStreamProject::Unix(s) => s.poll_shutdown(cx),
}
}
fn is_write_vectored(&self) -> bool {
match self {
Self::Tcp(s) => s.is_write_vectored(),
Self::Tls(s) => s.is_write_vectored(),
#[cfg(unix)]
Self::Unix(s) => s.is_write_vectored(),
}
}
fn poll_write_vectored(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
bufs: &[std::io::IoSlice<'_>],
) -> std::task::Poll<Result<usize, std::io::Error>> {
match self.project() {
NetworkStreamProject::Tcp(s) => s.poll_write_vectored(cx, bufs),
NetworkStreamProject::Tls(s) => s.poll_write_vectored(cx, bufs),
#[cfg(unix)]
NetworkStreamProject::Unix(s) => s.poll_write_vectored(cx, bufs),
}
}
}
/// A raw stream listener of one of the types handled by this extension.
pub enum NetworkStreamListener {
Tcp(tokio::net::TcpListener),
Tls(tokio::net::TcpListener, Arc<ServerConfig>),
#[cfg(unix)]
Unix(tokio::net::UnixListener),
}
pub enum NetworkStreamAddress {
Ip(std::net::SocketAddr),
#[cfg(unix)]
Unix(tokio::net::unix::SocketAddr),
}
impl NetworkStreamListener {
/// Accepts a connection on this listener.
pub async fn accept(&self) -> Result<NetworkStream, AnyError> {
Ok(match self {
Self::Tcp(tcp) => {
let (stream, _addr) = tcp.accept().await?;
NetworkStream::Tcp(stream)
}
Self::Tls(tcp, config) => {
let (stream, _addr) = tcp.accept().await?;
NetworkStream::Tls(TlsStream::new_server_side(stream, config.clone()))
}
#[cfg(unix)]
Self::Unix(unix) => {
let (stream, _addr) = unix.accept().await?;
NetworkStream::Unix(stream)
}
})
}
pub fn listen_address(&self) -> Result<NetworkStreamAddress, std::io::Error> {
match self {
Self::Tcp(tcp) => Ok(NetworkStreamAddress::Ip(tcp.local_addr()?)),
Self::Tls(tcp, _) => Ok(NetworkStreamAddress::Ip(tcp.local_addr()?)),
#[cfg(unix)]
Self::Unix(unix) => Ok(NetworkStreamAddress::Unix(unix.local_addr()?)),
}
}
pub fn stream(&self) -> NetworkStreamType {
match self {
Self::Tcp(..) => NetworkStreamType::Tcp,
Self::Tls(..) => NetworkStreamType::Tls,
#[cfg(unix)]
Self::Unix(..) => NetworkStreamType::Unix,
}
}
}
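// Hypothetical accept loop over the listener abstraction above (illustrative
// only): the same loop serves TCP, TLS and, on unix, unix-socket connections,
// and `stream()` tells the caller which scheme applies.
async fn accept_loop_sketch(
  listener: NetworkStreamListener,
) -> Result<(), AnyError> {
  loop {
    let stream = listener.accept().await?;
    match stream.stream() {
      NetworkStreamType::Tcp | NetworkStreamType::Tls => { /* http(s):// */ }
      #[cfg(unix)]
      NetworkStreamType::Unix => { /* http+unix:// */ }
    }
  }
}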
/// In some cases it may be more efficient to extract the resource from the resource table and use it directly (for example, an HTTP server).
/// This method will extract a stream from the resource table and return it, unwrapped.
pub fn take_network_stream_resource(
resource_table: &mut ResourceTable,
stream_rid: ResourceId,
) -> Result<NetworkStream, AnyError> {
// The stream we're attempting to unwrap may be in use somewhere else. If that's the case, we cannot proceed
// with the process of unwrapping this connection, so we just return a bad resource error.
// See also: https://github.com/denoland/deno/pull/16242
if let Ok(resource_rc) = resource_table.take::<TcpStreamResource>(stream_rid)
{
// This TCP connection might be used somewhere else.
let resource = Rc::try_unwrap(resource_rc)
.map_err(|_| bad_resource("TCP stream is currently in use"))?;
let (read_half, write_half) = resource.into_inner();
let tcp_stream = read_half.reunite(write_half)?;
return Ok(NetworkStream::Tcp(tcp_stream));
}
if let Ok(resource_rc) = resource_table.take::<TlsStreamResource>(stream_rid)
{
// This TLS connection might be used somewhere else.
let resource = Rc::try_unwrap(resource_rc)
.map_err(|_| bad_resource("TLS stream is currently in use"))?;
let (read_half, write_half) = resource.into_inner();
let tls_stream = read_half.reunite(write_half);
return Ok(NetworkStream::Tls(tls_stream));
}
#[cfg(unix)]
if let Ok(resource_rc) = resource_table.take::<UnixStreamResource>(stream_rid)
{
// This UNIX socket might be used somewhere else.
let resource = Rc::try_unwrap(resource_rc)
.map_err(|_| bad_resource("UNIX stream is currently in use"))?;
let (read_half, write_half) = resource.into_inner();
let unix_stream = read_half.reunite(write_half)?;
return Ok(NetworkStream::Unix(unix_stream));
}
Err(bad_resource_id())
}
/// Inserts a raw stream (possibly one that was taken out earlier) back into the resource table and returns a
/// resource ID. This can then be used to create raw connection objects on the JS side.
pub fn put_network_stream_resource(
resource_table: &mut ResourceTable,
stream: NetworkStream,
) -> Result<ResourceId, AnyError> {
let res = match stream {
NetworkStream::Tcp(conn) => {
let (r, w) = conn.into_split();
resource_table.add(TcpStreamResource::new((r, w)))
}
NetworkStream::Tls(conn) => {
let (r, w) = conn.into_split();
resource_table.add(TlsStreamResource::new((r, w)))
}
#[cfg(unix)]
NetworkStream::Unix(conn) => {
let (r, w) = conn.into_split();
resource_table.add(UnixStreamResource::new((r, w)))
}
};
Ok(res)
}
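// Hypothetical round trip over the helpers above (illustrative names only):
// unwrap a raw stream, use it directly (e.g. hand it to an HTTP server), then
// put it back under a new resource ID.
fn reuse_stream_sketch(
  resource_table: &mut ResourceTable,
  rid: ResourceId,
) -> Result<ResourceId, AnyError> {
  let stream = take_network_stream_resource(resource_table, rid)?;
  // ... the unwrapped `stream` could be used directly here ...
  put_network_stream_resource(resource_table, stream)
}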
/// In some cases it may be more efficient to extract the resource from the resource table and use it directly (for example, an HTTP server).
/// This method will extract a stream listener from the resource table and return it, unwrapped.
pub fn take_network_stream_listener_resource(
resource_table: &mut ResourceTable,
listener_rid: ResourceId,
) -> Result<NetworkStreamListener, AnyError> {
if let Ok(resource_rc) =
resource_table.take::<TcpListenerResource>(listener_rid)
{
let resource = Rc::try_unwrap(resource_rc)
.map_err(|_| bad_resource("TCP socket listener is currently in use"))?;
return Ok(NetworkStreamListener::Tcp(resource.listener.into_inner()));
}
if let Ok(resource_rc) =
resource_table.take::<TlsListenerResource>(listener_rid)
{
let resource = Rc::try_unwrap(resource_rc)
.map_err(|_| bad_resource("TLS socket listener is currently in use"))?;
return Ok(NetworkStreamListener::Tls(
resource.tcp_listener.into_inner(),
resource.tls_config,
));
}
#[cfg(unix)]
if let Ok(resource_rc) =
resource_table.take::<UnixListenerResource>(listener_rid)
{
let resource = Rc::try_unwrap(resource_rc)
.map_err(|_| bad_resource("UNIX socket listener is currently in use"))?;
return Ok(NetworkStreamListener::Unix(resource.listener.into_inner()));
}
Err(bad_resource_id())
}


@ -14,11 +14,13 @@ description = "Implementation of WebSocket API for Deno"
path = "lib.rs"
[dependencies]
bytes.workspace = true
deno_core.workspace = true
deno_net.workspace = true
deno_tls.workspace = true
fastwebsockets = { workspace = true, features = ["upgrade"] }
http.workspace = true
hyper.workspace = true
hyper = { workspace = true, features = ["backports"] }
serde.workspace = true
tokio.workspace = true
tokio-rustls.workspace = true


@ -1,11 +1,10 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
use crate::stream::WebSocketStream;
use bytes::Bytes;
use deno_core::error::invalid_hostname;
use deno_core::error::type_error;
use deno_core::error::AnyError;
use deno_core::op;
use deno_core::StringOrBuffer;
use deno_core::url;
use deno_core::AsyncRefCell;
use deno_core::ByteString;
@ -15,7 +14,10 @@ use deno_core::OpState;
use deno_core::RcRef;
use deno_core::Resource;
use deno_core::ResourceId;
use deno_core::StringOrBuffer;
use deno_core::ZeroCopyBuf;
use deno_net::raw::take_network_stream_resource;
use deno_net::raw::NetworkStream;
use deno_tls::create_client_config;
use http::header::CONNECTION;
use http::header::UPGRADE;
@ -24,9 +26,7 @@ use http::HeaderValue;
use http::Method;
use http::Request;
use http::Uri;
use hyper::upgrade::Upgraded;
use hyper::Body;
use hyper::Response;
use serde::Deserialize;
use serde::Serialize;
use std::borrow::Cow;
@ -52,6 +52,7 @@ use fastwebsockets::Role;
use fastwebsockets::WebSocket;
pub use tokio_tungstenite; // Re-export tokio_tungstenite
mod stream;
#[derive(Clone)]
pub struct WsRootStore(pub Option<RootCertStore>);
@ -243,17 +244,21 @@ where
let client =
fastwebsockets::handshake::client(&LocalExecutor, request, socket);
let (stream, response): (WebSocket<Upgraded>, Response<Body>) =
if let Some(cancel_resource) = cancel_resource {
client.or_cancel(cancel_resource.0.to_owned()).await?
} else {
client.await
}
.map_err(|err| {
DomExceptionNetworkError::new(&format!(
"failed to connect to WebSocket: {err}"
))
})?;
let (upgraded, response) = if let Some(cancel_resource) = cancel_resource {
client.or_cancel(cancel_resource.0.to_owned()).await?
} else {
client.await
}
.map_err(|err| {
DomExceptionNetworkError::new(&format!(
"failed to connect to WebSocket: {err}"
))
})?;
let inner = MaybeTlsStream::Plain(upgraded.into_inner());
let stream =
WebSocketStream::new(stream::WsStreamKind::Tungstenite(inner), None);
let stream = WebSocket::after_handshake(stream, Role::Client);
if let Some(cancel_rid) = cancel_handle {
state.borrow_mut().resource_table.close(cancel_rid).ok();
@ -294,7 +299,7 @@ pub enum MessageKind {
}
pub struct ServerWebSocket {
ws: AsyncRefCell<FragmentCollector<Upgraded>>,
ws: AsyncRefCell<FragmentCollector<WebSocketStream>>,
closed: Rc<Cell<bool>>,
}
@ -320,11 +325,19 @@ impl Resource for ServerWebSocket {
"serverWebSocket".into()
}
}
pub async fn ws_create_server_stream(
state: &Rc<RefCell<OpState>>,
transport: Upgraded,
pub fn ws_create_server_stream(
state: &mut OpState,
transport: NetworkStream,
read_buf: Bytes,
) -> Result<ResourceId, AnyError> {
let mut ws = WebSocket::after_handshake(transport, Role::Server);
let mut ws = WebSocket::after_handshake(
WebSocketStream::new(
stream::WsStreamKind::Network(transport),
Some(read_buf),
),
Role::Server,
);
ws.set_writev(true);
ws.set_auto_close(true);
ws.set_auto_pong(true);
@ -334,11 +347,26 @@ pub async fn ws_create_server_stream(
closed: Rc::new(Cell::new(false)),
};
let resource_table = &mut state.borrow_mut().resource_table;
let rid = resource_table.add(ws_resource);
let rid = state.resource_table.add(ws_resource);
Ok(rid)
}
#[op]
pub fn op_ws_server_create(
state: &mut OpState,
conn: ResourceId,
extra_bytes: &[u8],
) -> Result<ResourceId, AnyError> {
let network_stream =
take_network_stream_resource(&mut state.resource_table, conn)?;
// Copy the extra bytes; it's unlikely this will amount to much data
ws_create_server_stream(
state,
network_stream,
Bytes::from(extra_bytes.to_vec()),
)
}
#[op]
pub async fn op_ws_send_binary(
state: Rc<RefCell<OpState>>,
@ -490,6 +518,7 @@ deno_core::extension!(deno_websocket,
op_ws_next_event,
op_ws_send_binary,
op_ws_send_text,
op_ws_server_create,
],
esm = [ "01_websocket.js", "02_websocketstream.js" ],
options = {

115
ext/websocket/stream.rs Normal file

@ -0,0 +1,115 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
use bytes::Buf;
use bytes::Bytes;
use deno_net::raw::NetworkStream;
use hyper::upgrade::Upgraded;
use std::pin::Pin;
use std::task::Poll;
use tokio::io::AsyncRead;
use tokio::io::AsyncWrite;
use tokio::io::ReadBuf;
use tokio_tungstenite::MaybeTlsStream;
// TODO(bartlomieju): remove this
pub(crate) enum WsStreamKind {
Tungstenite(MaybeTlsStream<Upgraded>),
Network(NetworkStream),
}
pub(crate) struct WebSocketStream {
stream: WsStreamKind,
pre: Option<Bytes>,
}
impl WebSocketStream {
pub fn new(stream: WsStreamKind, buffer: Option<Bytes>) -> Self {
Self {
stream,
pre: buffer,
}
}
}
impl AsyncRead for WebSocketStream {
// From hyper's Rewind (https://github.com/hyperium/hyper), MIT License, Copyright (c) Sean McArthur
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
if let Some(mut prefix) = self.pre.take() {
// If there are no remaining bytes, let the bytes get dropped.
if !prefix.is_empty() {
let copy_len = std::cmp::min(prefix.len(), buf.remaining());
// TODO: There should be a cleaner way to do the following two lines...
buf.put_slice(&prefix[..copy_len]);
prefix.advance(copy_len);
// Put back what's left
if !prefix.is_empty() {
self.pre = Some(prefix);
}
return Poll::Ready(Ok(()));
}
}
match &mut self.stream {
WsStreamKind::Network(stream) => Pin::new(stream).poll_read(cx, buf),
WsStreamKind::Tungstenite(stream) => Pin::new(stream).poll_read(cx, buf),
}
}
}
impl AsyncWrite for WebSocketStream {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &[u8],
) -> std::task::Poll<Result<usize, std::io::Error>> {
match &mut self.stream {
WsStreamKind::Network(stream) => Pin::new(stream).poll_write(cx, buf),
WsStreamKind::Tungstenite(stream) => Pin::new(stream).poll_write(cx, buf),
}
}
fn poll_flush(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), std::io::Error>> {
match &mut self.stream {
WsStreamKind::Network(stream) => Pin::new(stream).poll_flush(cx),
WsStreamKind::Tungstenite(stream) => Pin::new(stream).poll_flush(cx),
}
}
fn poll_shutdown(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), std::io::Error>> {
match &mut self.stream {
WsStreamKind::Network(stream) => Pin::new(stream).poll_shutdown(cx),
WsStreamKind::Tungstenite(stream) => Pin::new(stream).poll_shutdown(cx),
}
}
fn is_write_vectored(&self) -> bool {
match &self.stream {
WsStreamKind::Network(stream) => stream.is_write_vectored(),
WsStreamKind::Tungstenite(stream) => stream.is_write_vectored(),
}
}
fn poll_write_vectored(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
bufs: &[std::io::IoSlice<'_>],
) -> std::task::Poll<Result<usize, std::io::Error>> {
match &mut self.stream {
WsStreamKind::Network(stream) => {
Pin::new(stream).poll_write_vectored(cx, bufs)
}
WsStreamKind::Tungstenite(stream) => {
Pin::new(stream).poll_write_vectored(cx, bufs)
}
}
}
}