mirror of
https://github.com/denoland/deno.git
synced 2025-01-13 01:22:20 -05:00
feat: native HTTP bindings (#9935)
Co-authered-by: Luca Casonato <lucacasonato@yahoo.com> Co-authered-by: Ben Noordhuis <info@bnoordhuis.nl> Co-authered-by: Ryan Dahl <ry@tinyclouds.org>
This commit is contained in:
parent
b30ac9c5cf
commit
70af812876
15 changed files with 1020 additions and 5 deletions
2
Cargo.lock
generated
2
Cargo.lock
generated
|
@ -653,6 +653,7 @@ name = "deno_runtime"
|
||||||
version = "0.10.1"
|
version = "0.10.1"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"atty",
|
"atty",
|
||||||
|
"bytes",
|
||||||
"deno_console",
|
"deno_console",
|
||||||
"deno_core",
|
"deno_core",
|
||||||
"deno_crypto",
|
"deno_crypto",
|
||||||
|
@ -684,6 +685,7 @@ dependencies = [
|
||||||
"test_util",
|
"test_util",
|
||||||
"tokio",
|
"tokio",
|
||||||
"tokio-rustls",
|
"tokio-rustls",
|
||||||
|
"tokio-util",
|
||||||
"trust-dns-proto",
|
"trust-dns-proto",
|
||||||
"trust-dns-resolver",
|
"trust-dns-resolver",
|
||||||
"uuid",
|
"uuid",
|
||||||
|
|
17
cli/bench/deno_http_native.js
Normal file
17
cli/bench/deno_http_native.js
Normal file
|
@ -0,0 +1,17 @@
|
||||||
|
// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
|
||||||
|
|
||||||
|
const addr = Deno.args[0] || "127.0.0.1:4500";
|
||||||
|
const [hostname, port] = addr.split(":");
|
||||||
|
const listener = Deno.listen({ hostname, port: Number(port) });
|
||||||
|
console.log("Server listening on", addr);
|
||||||
|
|
||||||
|
const body = Deno.core.encode("Hello World");
|
||||||
|
|
||||||
|
for await (const conn of listener) {
|
||||||
|
(async () => {
|
||||||
|
const requests = Deno.startHttp(conn);
|
||||||
|
for await (const { respondWith } of requests) {
|
||||||
|
respondWith(new Response(body));
|
||||||
|
}
|
||||||
|
})();
|
||||||
|
}
|
|
@ -33,6 +33,7 @@ pub(crate) fn benchmark(
|
||||||
res.insert("deno_tcp".to_string(), deno_tcp(deno_exe)?);
|
res.insert("deno_tcp".to_string(), deno_tcp(deno_exe)?);
|
||||||
// res.insert("deno_udp".to_string(), deno_udp(deno_exe)?);
|
// res.insert("deno_udp".to_string(), deno_udp(deno_exe)?);
|
||||||
res.insert("deno_http".to_string(), deno_http(deno_exe)?);
|
res.insert("deno_http".to_string(), deno_http(deno_exe)?);
|
||||||
|
res.insert("deno_http_native".to_string(), deno_http_native(deno_exe)?);
|
||||||
// TODO(ry) deno_proxy disabled to make fetch() standards compliant.
|
// TODO(ry) deno_proxy disabled to make fetch() standards compliant.
|
||||||
// res.insert("deno_proxy".to_string(), deno_http_proxy(deno_exe) hyper_hello_exe))
|
// res.insert("deno_proxy".to_string(), deno_http_proxy(deno_exe) hyper_hello_exe))
|
||||||
res.insert(
|
res.insert(
|
||||||
|
@ -200,6 +201,25 @@ fn deno_http(deno_exe: &str) -> Result<HttpBenchmarkResult> {
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn deno_http_native(deno_exe: &str) -> Result<HttpBenchmarkResult> {
|
||||||
|
let port = get_port();
|
||||||
|
println!("http_benchmark testing DENO using native bindings.");
|
||||||
|
run(
|
||||||
|
&[
|
||||||
|
deno_exe,
|
||||||
|
"run",
|
||||||
|
"--allow-net",
|
||||||
|
"--reload",
|
||||||
|
"--unstable",
|
||||||
|
"cli/bench/deno_http_native.js",
|
||||||
|
&server_addr(port),
|
||||||
|
],
|
||||||
|
port,
|
||||||
|
None,
|
||||||
|
None,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
fn deno_http_proxy(
|
fn deno_http_proxy(
|
||||||
deno_exe: &str,
|
deno_exe: &str,
|
||||||
|
|
27
cli/dts/lib.deno.unstable.d.ts
vendored
27
cli/dts/lib.deno.unstable.d.ts
vendored
|
@ -1196,6 +1196,33 @@ declare namespace Deno {
|
||||||
bytesSentData: number;
|
bytesSentData: number;
|
||||||
bytesReceived: number;
|
bytesReceived: number;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export interface RequestEvent {
|
||||||
|
readonly request: Request;
|
||||||
|
respondWith(r: Response | Promise<Response>): void;
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface HttpConn extends AsyncIterable<RequestEvent> {
|
||||||
|
readonly rid: number;
|
||||||
|
|
||||||
|
nextRequest(): Promise<RequestEvent | null>;
|
||||||
|
close(): void;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** **UNSTABLE**: new API, yet to be vetted.
|
||||||
|
*
|
||||||
|
* Parse HTTP requests from the given connection
|
||||||
|
*
|
||||||
|
* ```ts
|
||||||
|
* const httpConn = await Deno.startHttp(conn);
|
||||||
|
* const { request, respondWith } = await httpConn.next();
|
||||||
|
* respondWith(new Response("Hello World"));
|
||||||
|
* ```
|
||||||
|
*
|
||||||
|
* If `httpConn.next()` encounters an error or returns `done == true` then
|
||||||
|
* the underlying HttpConn resource is closed automatically.
|
||||||
|
*/
|
||||||
|
export function startHttp(conn: Conn): HttpConn;
|
||||||
}
|
}
|
||||||
|
|
||||||
declare function fetch(
|
declare function fetch(
|
||||||
|
|
203
cli/tests/unit/http_test.ts
Normal file
203
cli/tests/unit/http_test.ts
Normal file
|
@ -0,0 +1,203 @@
|
||||||
|
// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
|
||||||
|
import {
|
||||||
|
assert,
|
||||||
|
assertEquals,
|
||||||
|
assertThrowsAsync,
|
||||||
|
unitTest,
|
||||||
|
} from "./test_util.ts";
|
||||||
|
import { BufReader, BufWriter } from "../../../test_util/std/io/bufio.ts";
|
||||||
|
import { TextProtoReader } from "../../../test_util/std/textproto/mod.ts";
|
||||||
|
|
||||||
|
unitTest({ perms: { net: true } }, async function httpServerBasic() {
|
||||||
|
const promise = (async () => {
|
||||||
|
const listener = Deno.listen({ port: 4501 });
|
||||||
|
for await (const conn of listener) {
|
||||||
|
const httpConn = Deno.startHttp(conn);
|
||||||
|
for await (const { request, respondWith } of httpConn) {
|
||||||
|
assertEquals(await request.text(), "");
|
||||||
|
respondWith(new Response("Hello World"));
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
})();
|
||||||
|
|
||||||
|
const resp = await fetch("http://127.0.0.1:4501/", {
|
||||||
|
headers: { "connection": "close" },
|
||||||
|
});
|
||||||
|
const text = await resp.text();
|
||||||
|
assertEquals(text, "Hello World");
|
||||||
|
await promise;
|
||||||
|
});
|
||||||
|
|
||||||
|
unitTest(
|
||||||
|
{ perms: { net: true } },
|
||||||
|
async function httpServerStreamResponse() {
|
||||||
|
const stream = new TransformStream();
|
||||||
|
const writer = stream.writable.getWriter();
|
||||||
|
writer.write(new TextEncoder().encode("hello "));
|
||||||
|
writer.write(new TextEncoder().encode("world"));
|
||||||
|
writer.close();
|
||||||
|
|
||||||
|
const promise = (async () => {
|
||||||
|
const listener = Deno.listen({ port: 4501 });
|
||||||
|
const conn = await listener.accept();
|
||||||
|
const httpConn = Deno.startHttp(conn);
|
||||||
|
const evt = await httpConn.nextRequest();
|
||||||
|
assert(evt);
|
||||||
|
const { request, respondWith } = evt;
|
||||||
|
assert(!request.body);
|
||||||
|
await respondWith(new Response(stream.readable));
|
||||||
|
httpConn.close();
|
||||||
|
listener.close();
|
||||||
|
})();
|
||||||
|
|
||||||
|
const resp = await fetch("http://127.0.0.1:4501/");
|
||||||
|
const respBody = await resp.text();
|
||||||
|
assertEquals("hello world", respBody);
|
||||||
|
await promise;
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
unitTest(
|
||||||
|
{ perms: { net: true } },
|
||||||
|
async function httpServerStreamRequest() {
|
||||||
|
const stream = new TransformStream();
|
||||||
|
const writer = stream.writable.getWriter();
|
||||||
|
writer.write(new TextEncoder().encode("hello "));
|
||||||
|
writer.write(new TextEncoder().encode("world"));
|
||||||
|
writer.close();
|
||||||
|
|
||||||
|
const promise = (async () => {
|
||||||
|
const listener = Deno.listen({ port: 4501 });
|
||||||
|
const conn = await listener.accept();
|
||||||
|
const httpConn = Deno.startHttp(conn);
|
||||||
|
const evt = await httpConn.nextRequest();
|
||||||
|
assert(evt);
|
||||||
|
const { request, respondWith } = evt;
|
||||||
|
const reqBody = await request.text();
|
||||||
|
assertEquals("hello world", reqBody);
|
||||||
|
await respondWith(new Response(""));
|
||||||
|
|
||||||
|
// TODO(ry) If we don't call httpConn.nextRequest() here we get "error sending
|
||||||
|
// request for url (https://localhost:4501/): connection closed before
|
||||||
|
// message completed".
|
||||||
|
assertEquals(await httpConn.nextRequest(), null);
|
||||||
|
|
||||||
|
listener.close();
|
||||||
|
})();
|
||||||
|
|
||||||
|
const resp = await fetch("http://127.0.0.1:4501/", {
|
||||||
|
body: stream.readable,
|
||||||
|
method: "POST",
|
||||||
|
headers: { "connection": "close" },
|
||||||
|
});
|
||||||
|
|
||||||
|
await resp.arrayBuffer();
|
||||||
|
await promise;
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
unitTest({ perms: { net: true } }, async function httpServerStreamDuplex() {
|
||||||
|
const promise = (async () => {
|
||||||
|
const listener = Deno.listen({ port: 4501 });
|
||||||
|
const conn = await listener.accept();
|
||||||
|
const httpConn = Deno.startHttp(conn);
|
||||||
|
const evt = await httpConn.nextRequest();
|
||||||
|
assert(evt);
|
||||||
|
const { request, respondWith } = evt;
|
||||||
|
assert(request.body);
|
||||||
|
await respondWith(new Response(request.body));
|
||||||
|
httpConn.close();
|
||||||
|
listener.close();
|
||||||
|
})();
|
||||||
|
|
||||||
|
const ts = new TransformStream();
|
||||||
|
const writable = ts.writable.getWriter();
|
||||||
|
const resp = await fetch("http://127.0.0.1:4501/", {
|
||||||
|
method: "POST",
|
||||||
|
body: ts.readable,
|
||||||
|
});
|
||||||
|
assert(resp.body);
|
||||||
|
const reader = resp.body.getReader();
|
||||||
|
await writable.write(new Uint8Array([1]));
|
||||||
|
const chunk1 = await reader.read();
|
||||||
|
assert(!chunk1.done);
|
||||||
|
assertEquals(chunk1.value, new Uint8Array([1]));
|
||||||
|
await writable.write(new Uint8Array([2]));
|
||||||
|
const chunk2 = await reader.read();
|
||||||
|
assert(!chunk2.done);
|
||||||
|
assertEquals(chunk2.value, new Uint8Array([2]));
|
||||||
|
await writable.close();
|
||||||
|
const chunk3 = await reader.read();
|
||||||
|
assert(chunk3.done);
|
||||||
|
await promise;
|
||||||
|
});
|
||||||
|
|
||||||
|
unitTest({ perms: { net: true } }, async function httpServerClose() {
|
||||||
|
const listener = Deno.listen({ port: 4501 });
|
||||||
|
const client = await Deno.connect({ port: 4501 });
|
||||||
|
const httpConn = Deno.startHttp(await listener.accept());
|
||||||
|
client.close();
|
||||||
|
const evt = await httpConn.nextRequest();
|
||||||
|
assertEquals(evt, null);
|
||||||
|
// Note httpConn is automatically closed when "done" is reached.
|
||||||
|
listener.close();
|
||||||
|
});
|
||||||
|
|
||||||
|
unitTest({ perms: { net: true } }, async function httpServerInvalidMethod() {
|
||||||
|
const listener = Deno.listen({ port: 4501 });
|
||||||
|
const client = await Deno.connect({ port: 4501 });
|
||||||
|
const httpConn = Deno.startHttp(await listener.accept());
|
||||||
|
await client.write(new Uint8Array([1, 2, 3]));
|
||||||
|
await assertThrowsAsync(
|
||||||
|
async () => {
|
||||||
|
await httpConn.nextRequest();
|
||||||
|
},
|
||||||
|
Deno.errors.Http,
|
||||||
|
"invalid HTTP method parsed",
|
||||||
|
);
|
||||||
|
// Note httpConn is automatically closed when it errors.
|
||||||
|
client.close();
|
||||||
|
listener.close();
|
||||||
|
});
|
||||||
|
|
||||||
|
unitTest(
|
||||||
|
{ perms: { read: true, net: true } },
|
||||||
|
async function httpServerWithTls(): Promise<void> {
|
||||||
|
const hostname = "localhost";
|
||||||
|
const port = 4501;
|
||||||
|
|
||||||
|
const promise = (async () => {
|
||||||
|
const listener = Deno.listenTls({
|
||||||
|
hostname,
|
||||||
|
port,
|
||||||
|
certFile: "cli/tests/tls/localhost.crt",
|
||||||
|
keyFile: "cli/tests/tls/localhost.key",
|
||||||
|
});
|
||||||
|
const conn = await listener.accept();
|
||||||
|
const httpConn = Deno.startHttp(conn);
|
||||||
|
const evt = await httpConn.nextRequest();
|
||||||
|
assert(evt);
|
||||||
|
const { request, respondWith } = evt;
|
||||||
|
await respondWith(new Response("Hello World"));
|
||||||
|
|
||||||
|
// TODO(ry) If we don't call httpConn.nextRequest() here we get "error sending
|
||||||
|
// request for url (https://localhost:4501/): connection closed before
|
||||||
|
// message completed".
|
||||||
|
assertEquals(await httpConn.nextRequest(), null);
|
||||||
|
|
||||||
|
listener.close();
|
||||||
|
})();
|
||||||
|
|
||||||
|
const caData = Deno.readTextFileSync("cli/tests/tls/RootCA.pem");
|
||||||
|
const client = Deno.createHttpClient({ caData });
|
||||||
|
const resp = await fetch(`https://${hostname}:${port}/`, {
|
||||||
|
client,
|
||||||
|
headers: { "connection": "close" },
|
||||||
|
});
|
||||||
|
const respBody = await resp.text();
|
||||||
|
assertEquals("Hello World", respBody);
|
||||||
|
await promise;
|
||||||
|
client.close();
|
||||||
|
},
|
||||||
|
);
|
|
@ -30,6 +30,7 @@ import "./fs_events_test.ts";
|
||||||
import "./get_random_values_test.ts";
|
import "./get_random_values_test.ts";
|
||||||
import "./globals_test.ts";
|
import "./globals_test.ts";
|
||||||
import "./headers_test.ts";
|
import "./headers_test.ts";
|
||||||
|
import "./http_test.ts";
|
||||||
import "./internals_test.ts";
|
import "./internals_test.ts";
|
||||||
import "./io_test.ts";
|
import "./io_test.ts";
|
||||||
import "./link_test.ts";
|
import "./link_test.ts";
|
||||||
|
|
|
@ -20,5 +20,6 @@ where
|
||||||
F: std::future::Future<Output = R>,
|
F: std::future::Future<Output = R>,
|
||||||
{
|
{
|
||||||
let rt = create_basic_runtime();
|
let rt = create_basic_runtime();
|
||||||
rt.block_on(future)
|
let local = tokio::task::LocalSet::new();
|
||||||
|
local.block_on(&rt, future)
|
||||||
}
|
}
|
||||||
|
|
|
@ -46,11 +46,12 @@ deno_websocket = { path = "../op_crates/websocket", version = "0.7.1" }
|
||||||
deno_webgpu = { path = "../op_crates/webgpu", version = "0.3.1" }
|
deno_webgpu = { path = "../op_crates/webgpu", version = "0.3.1" }
|
||||||
|
|
||||||
atty = "0.2.14"
|
atty = "0.2.14"
|
||||||
|
bytes = "1"
|
||||||
dlopen = "0.1.8"
|
dlopen = "0.1.8"
|
||||||
encoding_rs = "0.8.28"
|
encoding_rs = "0.8.28"
|
||||||
filetime = "0.2.14"
|
filetime = "0.2.14"
|
||||||
http = "0.2.3"
|
http = "0.2.3"
|
||||||
hyper = { version = "0.14.5", features = ["server"] }
|
hyper = { version = "0.14.5", features = ["server", "stream", "http1", "http2", "runtime"] }
|
||||||
indexmap = "1.6.2"
|
indexmap = "1.6.2"
|
||||||
lazy_static = "1.4.0"
|
lazy_static = "1.4.0"
|
||||||
libc = "0.2.93"
|
libc = "0.2.93"
|
||||||
|
@ -63,6 +64,7 @@ serde = { version = "1.0.125", features = ["derive"] }
|
||||||
sys-info = "0.8.0"
|
sys-info = "0.8.0"
|
||||||
termcolor = "1.1.2"
|
termcolor = "1.1.2"
|
||||||
tokio = { version = "1.4.0", features = ["full"] }
|
tokio = { version = "1.4.0", features = ["full"] }
|
||||||
|
tokio-util = { version = "0.6", features = ["io"] }
|
||||||
tokio-rustls = "0.22.0"
|
tokio-rustls = "0.22.0"
|
||||||
uuid = { version = "0.8.2", features = ["v4"] }
|
uuid = { version = "0.8.2", features = ["v4"] }
|
||||||
webpki = "0.21.4"
|
webpki = "0.21.4"
|
||||||
|
|
|
@ -131,6 +131,10 @@ fn get_url_parse_error_class(_error: &url::ParseError) -> &'static str {
|
||||||
"URIError"
|
"URIError"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn get_hyper_error_class(_error: &hyper::Error) -> &'static str {
|
||||||
|
"Http"
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(unix)]
|
#[cfg(unix)]
|
||||||
fn get_nix_error_class(error: &nix::Error) -> &'static str {
|
fn get_nix_error_class(error: &nix::Error) -> &'static str {
|
||||||
use nix::errno::Errno::*;
|
use nix::errno::Errno::*;
|
||||||
|
@ -156,6 +160,7 @@ pub fn get_error_class_name(e: &AnyError) -> Option<&'static str> {
|
||||||
e.downcast_ref::<dlopen::Error>()
|
e.downcast_ref::<dlopen::Error>()
|
||||||
.map(get_dlopen_error_class)
|
.map(get_dlopen_error_class)
|
||||||
})
|
})
|
||||||
|
.or_else(|| e.downcast_ref::<hyper::Error>().map(get_hyper_error_class))
|
||||||
.or_else(|| {
|
.or_else(|| {
|
||||||
e.downcast_ref::<deno_core::Canceled>().map(|e| {
|
e.downcast_ref::<deno_core::Canceled>().map(|e| {
|
||||||
let io_err: io::Error = e.to_owned().into();
|
let io_err: io::Error = e.to_owned().into();
|
||||||
|
|
210
runtime/js/40_http.js
Normal file
210
runtime/js/40_http.js
Normal file
|
@ -0,0 +1,210 @@
|
||||||
|
// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
|
||||||
|
"use strict";
|
||||||
|
|
||||||
|
((window) => {
|
||||||
|
const { Request, dontValidateUrl, fastBody, Response } =
|
||||||
|
window.__bootstrap.fetch;
|
||||||
|
const { Headers } = window.__bootstrap.headers;
|
||||||
|
const errors = window.__bootstrap.errors.errors;
|
||||||
|
const core = window.Deno.core;
|
||||||
|
const { ReadableStream } = window.__bootstrap.streams;
|
||||||
|
|
||||||
|
function flatEntries(obj) {
|
||||||
|
const entries = [];
|
||||||
|
for (const key in obj) {
|
||||||
|
entries.push(key);
|
||||||
|
entries.push(obj[key]);
|
||||||
|
}
|
||||||
|
return entries;
|
||||||
|
}
|
||||||
|
|
||||||
|
function startHttp(conn) {
|
||||||
|
const rid = Deno.core.jsonOpSync("op_http_start", conn.rid);
|
||||||
|
return new HttpConn(rid);
|
||||||
|
}
|
||||||
|
|
||||||
|
class HttpConn {
|
||||||
|
#rid = 0;
|
||||||
|
|
||||||
|
constructor(rid) {
|
||||||
|
this.#rid = rid;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** @returns {number} */
|
||||||
|
get rid() {
|
||||||
|
return this.#rid;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** @returns {Promise<ResponseEvent | null>} */
|
||||||
|
async nextRequest() {
|
||||||
|
let nextRequest;
|
||||||
|
try {
|
||||||
|
nextRequest = await Deno.core.jsonOpAsync(
|
||||||
|
"op_http_request_next",
|
||||||
|
this.#rid,
|
||||||
|
);
|
||||||
|
} catch (error) {
|
||||||
|
if (error instanceof errors.BadResource) {
|
||||||
|
return null;
|
||||||
|
} else if (error instanceof errors.Interrupted) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
|
if (nextRequest === null) return null;
|
||||||
|
|
||||||
|
const [
|
||||||
|
requestBodyRid,
|
||||||
|
responseSenderRid,
|
||||||
|
method,
|
||||||
|
headersList,
|
||||||
|
url,
|
||||||
|
] = nextRequest;
|
||||||
|
|
||||||
|
/** @type {ReadableStream<Uint8Array> | undefined} */
|
||||||
|
let body = undefined;
|
||||||
|
if (typeof requestBodyRid === "number") {
|
||||||
|
body = createRequestBodyStream(requestBodyRid);
|
||||||
|
}
|
||||||
|
|
||||||
|
const request = new Request(url, {
|
||||||
|
body,
|
||||||
|
method,
|
||||||
|
headers: new Headers(headersList),
|
||||||
|
[dontValidateUrl]: true,
|
||||||
|
});
|
||||||
|
|
||||||
|
const respondWith = createRespondWith(responseSenderRid, this.#rid);
|
||||||
|
|
||||||
|
return { request, respondWith };
|
||||||
|
}
|
||||||
|
|
||||||
|
/** @returns {void} */
|
||||||
|
close() {
|
||||||
|
core.close(this.#rid);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Symbol.asyncIterator]() {
|
||||||
|
const httpConn = this;
|
||||||
|
return {
|
||||||
|
async next() {
|
||||||
|
const reqEvt = await httpConn.nextRequest();
|
||||||
|
if (reqEvt === null) return { value: undefined, done: true };
|
||||||
|
return { value: reqEvt, done: false };
|
||||||
|
},
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
function readRequest(requestRid, zeroCopyBuf) {
|
||||||
|
return Deno.core.jsonOpAsync(
|
||||||
|
"op_http_request_read",
|
||||||
|
requestRid,
|
||||||
|
zeroCopyBuf,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
function respond(responseSenderRid, resp, zeroCopyBuf) {
|
||||||
|
return Deno.core.jsonOpSync("op_http_response", [
|
||||||
|
responseSenderRid,
|
||||||
|
resp.status ?? 200,
|
||||||
|
flatEntries(resp.headers ?? {}),
|
||||||
|
], zeroCopyBuf);
|
||||||
|
}
|
||||||
|
|
||||||
|
function createRespondWith(responseSenderRid, connRid) {
|
||||||
|
return async function (resp) {
|
||||||
|
if (resp instanceof Promise) {
|
||||||
|
resp = await resp;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!(resp instanceof Response)) {
|
||||||
|
throw new TypeError(
|
||||||
|
"First argument to respondWith must be a Response or a promise resolving to a Response.",
|
||||||
|
);
|
||||||
|
}
|
||||||
|
// If response body is Uint8Array it will be sent synchronously
|
||||||
|
// in a single op, in other case a "response body" resource will be
|
||||||
|
// created and we'll be streaming it.
|
||||||
|
const body = resp[fastBody]();
|
||||||
|
let zeroCopyBuf;
|
||||||
|
if (body instanceof ArrayBuffer) {
|
||||||
|
zeroCopyBuf = new Uint8Array(body);
|
||||||
|
} else if (!body) {
|
||||||
|
zeroCopyBuf = new Uint8Array(0);
|
||||||
|
} else {
|
||||||
|
zeroCopyBuf = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
const responseBodyRid = respond(
|
||||||
|
responseSenderRid,
|
||||||
|
resp,
|
||||||
|
zeroCopyBuf,
|
||||||
|
);
|
||||||
|
|
||||||
|
// If `respond` returns a responseBodyRid, we should stream the body
|
||||||
|
// to that resource.
|
||||||
|
if (typeof responseBodyRid === "number") {
|
||||||
|
if (!body || !(body instanceof ReadableStream)) {
|
||||||
|
throw new Error(
|
||||||
|
"internal error: recieved responseBodyRid, but response has no body or is not a stream",
|
||||||
|
);
|
||||||
|
}
|
||||||
|
for await (const chunk of body) {
|
||||||
|
const data = new Uint8Array(
|
||||||
|
chunk.buffer,
|
||||||
|
chunk.byteOffset,
|
||||||
|
chunk.byteLength,
|
||||||
|
);
|
||||||
|
await Deno.core.jsonOpAsync(
|
||||||
|
"op_http_response_write",
|
||||||
|
responseBodyRid,
|
||||||
|
data,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Once all chunks are sent, and the request body is closed, we can close
|
||||||
|
// the response body.
|
||||||
|
await Deno.core.jsonOpAsync("op_http_response_close", responseBodyRid);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
function createRequestBodyStream(requestBodyRid) {
|
||||||
|
return new ReadableStream({
|
||||||
|
type: "bytes",
|
||||||
|
async pull(controller) {
|
||||||
|
try {
|
||||||
|
// This is the largest possible size for a single packet on a TLS
|
||||||
|
// stream.
|
||||||
|
const chunk = new Uint8Array(16 * 1024 + 256);
|
||||||
|
const read = await readRequest(
|
||||||
|
requestBodyRid,
|
||||||
|
chunk,
|
||||||
|
);
|
||||||
|
if (read > 0) {
|
||||||
|
// We read some data. Enqueue it onto the stream.
|
||||||
|
controller.enqueue(chunk.subarray(0, read));
|
||||||
|
} else {
|
||||||
|
// We have reached the end of the body, so we close the stream.
|
||||||
|
controller.close();
|
||||||
|
core.close(requestBodyRid);
|
||||||
|
}
|
||||||
|
} catch (err) {
|
||||||
|
// There was an error while reading a chunk of the body, so we
|
||||||
|
// error.
|
||||||
|
controller.error(err);
|
||||||
|
controller.close();
|
||||||
|
core.close(requestBodyRid);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
cancel() {
|
||||||
|
core.close(requestBodyRid);
|
||||||
|
},
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
window.__bootstrap.http = {
|
||||||
|
startHttp,
|
||||||
|
};
|
||||||
|
})(this);
|
|
@ -120,6 +120,7 @@
|
||||||
listen: __bootstrap.netUnstable.listen,
|
listen: __bootstrap.netUnstable.listen,
|
||||||
connect: __bootstrap.netUnstable.connect,
|
connect: __bootstrap.netUnstable.connect,
|
||||||
listenDatagram: __bootstrap.netUnstable.listenDatagram,
|
listenDatagram: __bootstrap.netUnstable.listenDatagram,
|
||||||
|
startHttp: __bootstrap.http.startHttp,
|
||||||
startTls: __bootstrap.tls.startTls,
|
startTls: __bootstrap.tls.startTls,
|
||||||
fstatSync: __bootstrap.fs.fstatSync,
|
fstatSync: __bootstrap.fs.fstatSync,
|
||||||
fstat: __bootstrap.fs.fstat,
|
fstat: __bootstrap.fs.fstat,
|
||||||
|
@ -132,5 +133,6 @@
|
||||||
utimeSync: __bootstrap.fs.utimeSync,
|
utimeSync: __bootstrap.fs.utimeSync,
|
||||||
HttpClient: __bootstrap.fetch.HttpClient,
|
HttpClient: __bootstrap.fetch.HttpClient,
|
||||||
createHttpClient: __bootstrap.fetch.createHttpClient,
|
createHttpClient: __bootstrap.fetch.createHttpClient,
|
||||||
|
http: __bootstrap.http,
|
||||||
};
|
};
|
||||||
})(this);
|
})(this);
|
||||||
|
|
523
runtime/ops/http.rs
Normal file
523
runtime/ops/http.rs
Normal file
|
@ -0,0 +1,523 @@
|
||||||
|
// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
|
||||||
|
|
||||||
|
use crate::ops::io::TcpStreamResource;
|
||||||
|
use crate::ops::io::TlsServerStreamResource;
|
||||||
|
use deno_core::error::bad_resource_id;
|
||||||
|
use deno_core::error::null_opbuf;
|
||||||
|
use deno_core::error::type_error;
|
||||||
|
use deno_core::error::AnyError;
|
||||||
|
use deno_core::futures::future::poll_fn;
|
||||||
|
use deno_core::futures::FutureExt;
|
||||||
|
use deno_core::futures::Stream;
|
||||||
|
use deno_core::futures::StreamExt;
|
||||||
|
use deno_core::AsyncRefCell;
|
||||||
|
use deno_core::CancelFuture;
|
||||||
|
use deno_core::CancelHandle;
|
||||||
|
use deno_core::CancelTryFuture;
|
||||||
|
use deno_core::OpState;
|
||||||
|
use deno_core::RcRef;
|
||||||
|
use deno_core::Resource;
|
||||||
|
use deno_core::ResourceId;
|
||||||
|
use deno_core::ZeroCopyBuf;
|
||||||
|
use hyper::body::HttpBody;
|
||||||
|
use hyper::http;
|
||||||
|
use hyper::server::conn::Connection;
|
||||||
|
use hyper::server::conn::Http;
|
||||||
|
use hyper::service::Service as HyperService;
|
||||||
|
use hyper::Body;
|
||||||
|
use hyper::Request;
|
||||||
|
use hyper::Response;
|
||||||
|
use serde::Deserialize;
|
||||||
|
use serde::Serialize;
|
||||||
|
use std::borrow::Cow;
|
||||||
|
use std::cell::RefCell;
|
||||||
|
use std::future::Future;
|
||||||
|
use std::pin::Pin;
|
||||||
|
use std::rc::Rc;
|
||||||
|
use std::task::Context;
|
||||||
|
use std::task::Poll;
|
||||||
|
use tokio::io::AsyncReadExt;
|
||||||
|
use tokio::net::TcpStream;
|
||||||
|
use tokio::sync::oneshot;
|
||||||
|
use tokio_rustls::server::TlsStream;
|
||||||
|
use tokio_util::io::StreamReader;
|
||||||
|
|
||||||
|
pub fn init(rt: &mut deno_core::JsRuntime) {
|
||||||
|
super::reg_json_sync(rt, "op_http_start", op_http_start);
|
||||||
|
|
||||||
|
super::reg_json_async(rt, "op_http_request_next", op_http_request_next);
|
||||||
|
super::reg_json_async(rt, "op_http_request_read", op_http_request_read);
|
||||||
|
|
||||||
|
super::reg_json_sync(rt, "op_http_response", op_http_response);
|
||||||
|
super::reg_json_async(rt, "op_http_response_write", op_http_response_write);
|
||||||
|
super::reg_json_async(rt, "op_http_response_close", op_http_response_close);
|
||||||
|
}
|
||||||
|
|
||||||
|
struct ServiceInner {
|
||||||
|
request: Request<Body>,
|
||||||
|
response_tx: oneshot::Sender<Response<Body>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Default)]
|
||||||
|
struct Service {
|
||||||
|
inner: Rc<RefCell<Option<ServiceInner>>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl HyperService<Request<Body>> for Service {
|
||||||
|
type Response = Response<Body>;
|
||||||
|
type Error = http::Error;
|
||||||
|
#[allow(clippy::type_complexity)]
|
||||||
|
type Future =
|
||||||
|
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>>>>;
|
||||||
|
|
||||||
|
fn poll_ready(
|
||||||
|
&mut self,
|
||||||
|
_cx: &mut Context<'_>,
|
||||||
|
) -> Poll<Result<(), Self::Error>> {
|
||||||
|
if self.inner.borrow().is_some() {
|
||||||
|
Poll::Pending
|
||||||
|
} else {
|
||||||
|
Poll::Ready(Ok(()))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn call(&mut self, req: Request<Body>) -> Self::Future {
|
||||||
|
let (resp_tx, resp_rx) = oneshot::channel();
|
||||||
|
self.inner.borrow_mut().replace(ServiceInner {
|
||||||
|
request: req,
|
||||||
|
response_tx: resp_tx,
|
||||||
|
});
|
||||||
|
|
||||||
|
async move { Ok(resp_rx.await.unwrap()) }.boxed_local()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
enum ConnType {
|
||||||
|
Tcp(Rc<RefCell<Connection<TcpStream, Service, LocalExecutor>>>),
|
||||||
|
Tls(Rc<RefCell<Connection<TlsStream<TcpStream>, Service, LocalExecutor>>>),
|
||||||
|
}
|
||||||
|
|
||||||
|
struct ConnResource {
|
||||||
|
hyper_connection: ConnType,
|
||||||
|
deno_service: Service,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ConnResource {
|
||||||
|
// TODO(ry) impl Future for ConnResource?
|
||||||
|
fn poll(&self, cx: &mut Context<'_>) -> Poll<Result<(), AnyError>> {
|
||||||
|
match &self.hyper_connection {
|
||||||
|
ConnType::Tcp(c) => c.borrow_mut().poll_unpin(cx),
|
||||||
|
ConnType::Tls(c) => c.borrow_mut().poll_unpin(cx),
|
||||||
|
}
|
||||||
|
.map_err(AnyError::from)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Resource for ConnResource {
|
||||||
|
fn name(&self) -> Cow<str> {
|
||||||
|
"httpConnection".into()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// We use a tuple instead of struct to avoid serialization overhead of the keys.
|
||||||
|
#[derive(Serialize)]
|
||||||
|
#[serde(rename_all = "camelCase")]
|
||||||
|
struct NextRequestResponse(
|
||||||
|
// request_body_rid:
|
||||||
|
Option<ResourceId>,
|
||||||
|
// response_sender_rid:
|
||||||
|
ResourceId,
|
||||||
|
// method:
|
||||||
|
String,
|
||||||
|
// headers:
|
||||||
|
Vec<(String, String)>,
|
||||||
|
// url:
|
||||||
|
String,
|
||||||
|
);
|
||||||
|
|
||||||
|
async fn op_http_request_next(
|
||||||
|
state: Rc<RefCell<OpState>>,
|
||||||
|
conn_rid: ResourceId,
|
||||||
|
_data: Option<ZeroCopyBuf>,
|
||||||
|
) -> Result<Option<NextRequestResponse>, AnyError> {
|
||||||
|
let conn_resource = state
|
||||||
|
.borrow()
|
||||||
|
.resource_table
|
||||||
|
.get::<ConnResource>(conn_rid)
|
||||||
|
.ok_or_else(bad_resource_id)?;
|
||||||
|
|
||||||
|
poll_fn(|cx| {
|
||||||
|
let connection_closed = match conn_resource.poll(cx) {
|
||||||
|
Poll::Pending => false,
|
||||||
|
Poll::Ready(Ok(())) => {
|
||||||
|
// close ConnResource
|
||||||
|
state
|
||||||
|
.borrow_mut()
|
||||||
|
.resource_table
|
||||||
|
.take::<ConnResource>(conn_rid)
|
||||||
|
.unwrap();
|
||||||
|
true
|
||||||
|
}
|
||||||
|
Poll::Ready(Err(e)) => {
|
||||||
|
// TODO(ry) close RequestResource associated with connection
|
||||||
|
// TODO(ry) close ResponseBodyResource associated with connection
|
||||||
|
// close ConnResource
|
||||||
|
state
|
||||||
|
.borrow_mut()
|
||||||
|
.resource_table
|
||||||
|
.take::<ConnResource>(conn_rid)
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
if should_ignore_error(&e) {
|
||||||
|
true
|
||||||
|
} else {
|
||||||
|
return Poll::Ready(Err(e));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Some(request_resource) =
|
||||||
|
conn_resource.deno_service.inner.borrow_mut().take()
|
||||||
|
{
|
||||||
|
let tx = request_resource.response_tx;
|
||||||
|
let req = request_resource.request;
|
||||||
|
let method = req.method().to_string();
|
||||||
|
|
||||||
|
let mut headers = Vec::with_capacity(req.headers().len());
|
||||||
|
for (name, value) in req.headers().iter() {
|
||||||
|
let name = name.to_string();
|
||||||
|
let value = value.to_str().unwrap_or("").to_string();
|
||||||
|
headers.push((name, value));
|
||||||
|
}
|
||||||
|
|
||||||
|
let url = req.uri().to_string();
|
||||||
|
|
||||||
|
let has_body = if let Some(exact_size) = req.size_hint().exact() {
|
||||||
|
exact_size > 0
|
||||||
|
} else {
|
||||||
|
true
|
||||||
|
};
|
||||||
|
|
||||||
|
let maybe_request_body_rid = if has_body {
|
||||||
|
let stream: BytesStream = Box::pin(req.into_body().map(|r| {
|
||||||
|
r.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))
|
||||||
|
}));
|
||||||
|
let stream_reader = StreamReader::new(stream);
|
||||||
|
let mut state = state.borrow_mut();
|
||||||
|
let request_body_rid = state.resource_table.add(RequestBodyResource {
|
||||||
|
conn_rid,
|
||||||
|
reader: AsyncRefCell::new(stream_reader),
|
||||||
|
cancel: CancelHandle::default(),
|
||||||
|
});
|
||||||
|
Some(request_body_rid)
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut state = state.borrow_mut();
|
||||||
|
let response_sender_rid =
|
||||||
|
state.resource_table.add(ResponseSenderResource {
|
||||||
|
sender: tx,
|
||||||
|
conn_rid,
|
||||||
|
});
|
||||||
|
|
||||||
|
Poll::Ready(Ok(Some(NextRequestResponse(
|
||||||
|
maybe_request_body_rid,
|
||||||
|
response_sender_rid,
|
||||||
|
method,
|
||||||
|
headers,
|
||||||
|
url,
|
||||||
|
))))
|
||||||
|
} else if connection_closed {
|
||||||
|
Poll::Ready(Ok(None))
|
||||||
|
} else {
|
||||||
|
Poll::Pending
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.map_err(AnyError::from)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn should_ignore_error(e: &AnyError) -> bool {
|
||||||
|
if let Some(e) = e.downcast_ref::<hyper::Error>() {
|
||||||
|
use std::error::Error;
|
||||||
|
if let Some(std_err) = e.source() {
|
||||||
|
if let Some(io_err) = std_err.downcast_ref::<std::io::Error>() {
|
||||||
|
if io_err.kind() == std::io::ErrorKind::NotConnected {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
false
|
||||||
|
}
|
||||||
|
|
||||||
|
fn op_http_start(
|
||||||
|
state: &mut OpState,
|
||||||
|
tcp_stream_rid: ResourceId,
|
||||||
|
_data: Option<ZeroCopyBuf>,
|
||||||
|
) -> Result<ResourceId, AnyError> {
|
||||||
|
let deno_service = Service::default();
|
||||||
|
|
||||||
|
if let Some(resource_rc) = state
|
||||||
|
.resource_table
|
||||||
|
.take::<TcpStreamResource>(tcp_stream_rid)
|
||||||
|
{
|
||||||
|
let resource = Rc::try_unwrap(resource_rc)
|
||||||
|
.expect("Only a single use of this resource should happen");
|
||||||
|
let (read_half, write_half) = resource.into_inner();
|
||||||
|
let tcp_stream = read_half.reunite(write_half)?;
|
||||||
|
let hyper_connection = Http::new()
|
||||||
|
.with_executor(LocalExecutor)
|
||||||
|
.serve_connection(tcp_stream, deno_service.clone());
|
||||||
|
let conn_resource = ConnResource {
|
||||||
|
hyper_connection: ConnType::Tcp(Rc::new(RefCell::new(hyper_connection))),
|
||||||
|
deno_service,
|
||||||
|
};
|
||||||
|
let rid = state.resource_table.add(conn_resource);
|
||||||
|
return Ok(rid);
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(resource_rc) = state
|
||||||
|
.resource_table
|
||||||
|
.take::<TlsServerStreamResource>(tcp_stream_rid)
|
||||||
|
{
|
||||||
|
let resource = Rc::try_unwrap(resource_rc)
|
||||||
|
.expect("Only a single use of this resource should happen");
|
||||||
|
let (read_half, write_half) = resource.into_inner();
|
||||||
|
let tls_stream = read_half.unsplit(write_half);
|
||||||
|
|
||||||
|
let hyper_connection = Http::new()
|
||||||
|
.with_executor(LocalExecutor)
|
||||||
|
.serve_connection(tls_stream, deno_service.clone());
|
||||||
|
let conn_resource = ConnResource {
|
||||||
|
hyper_connection: ConnType::Tls(Rc::new(RefCell::new(hyper_connection))),
|
||||||
|
deno_service,
|
||||||
|
};
|
||||||
|
let rid = state.resource_table.add(conn_resource);
|
||||||
|
return Ok(rid);
|
||||||
|
}
|
||||||
|
|
||||||
|
Err(bad_resource_id())
|
||||||
|
}
|
||||||
|
|
||||||
|
// We use a tuple instead of struct to avoid serialization overhead of the keys.
|
||||||
|
#[derive(Deserialize)]
|
||||||
|
struct RespondArgs(
|
||||||
|
// rid:
|
||||||
|
u32,
|
||||||
|
// status:
|
||||||
|
u16,
|
||||||
|
// headers:
|
||||||
|
Vec<String>,
|
||||||
|
);
|
||||||
|
|
||||||
|
fn op_http_response(
|
||||||
|
state: &mut OpState,
|
||||||
|
args: RespondArgs,
|
||||||
|
data: Option<ZeroCopyBuf>,
|
||||||
|
) -> Result<Option<ResourceId>, AnyError> {
|
||||||
|
let rid = args.0;
|
||||||
|
let status = args.1;
|
||||||
|
let headers = args.2;
|
||||||
|
|
||||||
|
let response_sender = state
|
||||||
|
.resource_table
|
||||||
|
.take::<ResponseSenderResource>(rid)
|
||||||
|
.ok_or_else(bad_resource_id)?;
|
||||||
|
let response_sender = Rc::try_unwrap(response_sender)
|
||||||
|
.ok()
|
||||||
|
.expect("multiple op_http_respond ongoing");
|
||||||
|
|
||||||
|
let mut builder = Response::builder().status(status);
|
||||||
|
|
||||||
|
debug_assert_eq!(headers.len() % 2, 0);
|
||||||
|
let headers_count = headers.len() / 2;
|
||||||
|
builder.headers_mut().unwrap().reserve(headers_count);
|
||||||
|
for i in 0..headers_count {
|
||||||
|
builder = builder.header(&headers[2 * i], &headers[2 * i + 1]);
|
||||||
|
}
|
||||||
|
|
||||||
|
let res;
|
||||||
|
let maybe_response_body_rid = if let Some(d) = data {
|
||||||
|
// If a body is passed, we use it, and don't return a body for streaming.
|
||||||
|
res = builder.body(Vec::from(&*d).into())?;
|
||||||
|
None
|
||||||
|
} else {
|
||||||
|
// If no body is passed, we return a writer for streaming the body.
|
||||||
|
let (sender, body) = Body::channel();
|
||||||
|
res = builder.body(body)?;
|
||||||
|
|
||||||
|
let response_body_rid = state.resource_table.add(ResponseBodyResource {
|
||||||
|
body: AsyncRefCell::new(sender),
|
||||||
|
cancel: CancelHandle::default(),
|
||||||
|
conn_rid: response_sender.conn_rid,
|
||||||
|
});
|
||||||
|
|
||||||
|
Some(response_body_rid)
|
||||||
|
};
|
||||||
|
|
||||||
|
// oneshot::Sender::send(v) returns |v| on error, not an error object.
|
||||||
|
// The only failure mode is the receiver already having dropped its end
|
||||||
|
// of the channel.
|
||||||
|
if response_sender.sender.send(res).is_err() {
|
||||||
|
return Err(type_error("internal communication error"));
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(maybe_response_body_rid)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn op_http_response_close(
|
||||||
|
state: Rc<RefCell<OpState>>,
|
||||||
|
rid: ResourceId,
|
||||||
|
_data: Option<ZeroCopyBuf>,
|
||||||
|
) -> Result<(), AnyError> {
|
||||||
|
let resource = state
|
||||||
|
.borrow_mut()
|
||||||
|
.resource_table
|
||||||
|
.take::<ResponseBodyResource>(rid)
|
||||||
|
.ok_or_else(bad_resource_id)?;
|
||||||
|
|
||||||
|
let conn_resource = state
|
||||||
|
.borrow()
|
||||||
|
.resource_table
|
||||||
|
.get::<ConnResource>(resource.conn_rid)
|
||||||
|
.ok_or_else(bad_resource_id)?;
|
||||||
|
drop(resource);
|
||||||
|
|
||||||
|
poll_fn(|cx| match conn_resource.poll(cx) {
|
||||||
|
Poll::Ready(x) => Poll::Ready(x),
|
||||||
|
Poll::Pending => Poll::Ready(Ok(())),
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn op_http_request_read(
|
||||||
|
state: Rc<RefCell<OpState>>,
|
||||||
|
rid: ResourceId,
|
||||||
|
data: Option<ZeroCopyBuf>,
|
||||||
|
) -> Result<usize, AnyError> {
|
||||||
|
let mut data = data.ok_or_else(null_opbuf)?;
|
||||||
|
|
||||||
|
let resource = state
|
||||||
|
.borrow()
|
||||||
|
.resource_table
|
||||||
|
.get::<RequestBodyResource>(rid as u32)
|
||||||
|
.ok_or_else(bad_resource_id)?;
|
||||||
|
|
||||||
|
let conn_resource = state
|
||||||
|
.borrow()
|
||||||
|
.resource_table
|
||||||
|
.get::<ConnResource>(resource.conn_rid)
|
||||||
|
.ok_or_else(bad_resource_id)?;
|
||||||
|
|
||||||
|
let mut reader = RcRef::map(&resource, |r| &r.reader).borrow_mut().await;
|
||||||
|
let cancel = RcRef::map(resource, |r| &r.cancel);
|
||||||
|
let mut read_fut = reader.read(&mut data).try_or_cancel(cancel).boxed_local();
|
||||||
|
|
||||||
|
poll_fn(|cx| {
|
||||||
|
if let Poll::Ready(Err(e)) = conn_resource.poll(cx) {
|
||||||
|
// close ConnResource
|
||||||
|
// close RequestResource associated with connection
|
||||||
|
// close ResponseBodyResource associated with connection
|
||||||
|
return Poll::Ready(Err(e));
|
||||||
|
}
|
||||||
|
|
||||||
|
read_fut.poll_unpin(cx).map_err(AnyError::from)
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn op_http_response_write(
|
||||||
|
state: Rc<RefCell<OpState>>,
|
||||||
|
rid: ResourceId,
|
||||||
|
data: Option<ZeroCopyBuf>,
|
||||||
|
) -> Result<(), AnyError> {
|
||||||
|
let buf = data.ok_or_else(null_opbuf)?;
|
||||||
|
let resource = state
|
||||||
|
.borrow()
|
||||||
|
.resource_table
|
||||||
|
.get::<ResponseBodyResource>(rid as u32)
|
||||||
|
.ok_or_else(bad_resource_id)?;
|
||||||
|
|
||||||
|
let conn_resource = state
|
||||||
|
.borrow()
|
||||||
|
.resource_table
|
||||||
|
.get::<ConnResource>(resource.conn_rid)
|
||||||
|
.ok_or_else(bad_resource_id)?;
|
||||||
|
|
||||||
|
let mut body = RcRef::map(&resource, |r| &r.body).borrow_mut().await;
|
||||||
|
let cancel = RcRef::map(resource, |r| &r.cancel);
|
||||||
|
|
||||||
|
let mut send_data_fut = body
|
||||||
|
.send_data(Vec::from(&*buf).into())
|
||||||
|
.or_cancel(cancel)
|
||||||
|
.boxed_local();
|
||||||
|
|
||||||
|
poll_fn(|cx| {
|
||||||
|
if let Poll::Ready(Err(e)) = conn_resource.poll(cx) {
|
||||||
|
// close ConnResource
|
||||||
|
// close RequestResource associated with connection
|
||||||
|
// close ResponseBodyResource associated with connection
|
||||||
|
return Poll::Ready(Err(e));
|
||||||
|
}
|
||||||
|
|
||||||
|
send_data_fut.poll_unpin(cx).map_err(AnyError::from)
|
||||||
|
})
|
||||||
|
.await?
|
||||||
|
.unwrap(); // panic on send_data error
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
type BytesStream =
|
||||||
|
Pin<Box<dyn Stream<Item = std::io::Result<bytes::Bytes>> + Unpin>>;
|
||||||
|
|
||||||
|
struct RequestBodyResource {
|
||||||
|
conn_rid: ResourceId,
|
||||||
|
reader: AsyncRefCell<StreamReader<BytesStream, bytes::Bytes>>,
|
||||||
|
cancel: CancelHandle,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Resource for RequestBodyResource {
|
||||||
|
fn name(&self) -> Cow<str> {
|
||||||
|
"requestBody".into()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct ResponseSenderResource {
|
||||||
|
sender: oneshot::Sender<Response<Body>>,
|
||||||
|
conn_rid: ResourceId,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Resource for ResponseSenderResource {
|
||||||
|
fn name(&self) -> Cow<str> {
|
||||||
|
"responseSender".into()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct ResponseBodyResource {
|
||||||
|
body: AsyncRefCell<hyper::body::Sender>,
|
||||||
|
cancel: CancelHandle,
|
||||||
|
conn_rid: ResourceId,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Resource for ResponseBodyResource {
|
||||||
|
fn name(&self) -> Cow<str> {
|
||||||
|
"responseBody".into()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Needed so hyper can use non Send futures
|
||||||
|
#[derive(Clone)]
|
||||||
|
struct LocalExecutor;
|
||||||
|
|
||||||
|
impl<Fut> hyper::rt::Executor<Fut> for LocalExecutor
|
||||||
|
where
|
||||||
|
Fut: Future + 'static,
|
||||||
|
Fut::Output: 'static,
|
||||||
|
{
|
||||||
|
fn execute(&self, fut: Fut) {
|
||||||
|
tokio::task::spawn_local(fut);
|
||||||
|
}
|
||||||
|
}
|
|
@ -5,6 +5,7 @@ pub mod fetch;
|
||||||
pub mod file;
|
pub mod file;
|
||||||
pub mod fs;
|
pub mod fs;
|
||||||
pub mod fs_events;
|
pub mod fs_events;
|
||||||
|
pub mod http;
|
||||||
pub mod io;
|
pub mod io;
|
||||||
pub mod net;
|
pub mod net;
|
||||||
#[cfg(unix)]
|
#[cfg(unix)]
|
||||||
|
|
|
@ -353,9 +353,9 @@ async fn op_connect(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
struct TcpListenerResource {
|
pub struct TcpListenerResource {
|
||||||
listener: AsyncRefCell<TcpListener>,
|
pub listener: AsyncRefCell<TcpListener>,
|
||||||
cancel: CancelHandle,
|
pub cancel: CancelHandle,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Resource for TcpListenerResource {
|
impl Resource for TcpListenerResource {
|
||||||
|
|
|
@ -138,6 +138,7 @@ impl MainWorker {
|
||||||
);
|
);
|
||||||
ops::fs_events::init(js_runtime);
|
ops::fs_events::init(js_runtime);
|
||||||
ops::fs::init(js_runtime);
|
ops::fs::init(js_runtime);
|
||||||
|
ops::http::init(js_runtime);
|
||||||
ops::io::init(js_runtime);
|
ops::io::init(js_runtime);
|
||||||
ops::net::init(js_runtime);
|
ops::net::init(js_runtime);
|
||||||
ops::os::init(js_runtime);
|
ops::os::init(js_runtime);
|
||||||
|
|
Loading…
Reference in a new issue