diff --git a/Cargo.lock b/Cargo.lock index 5a14c5b21e..97ae4d21ca 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -653,6 +653,7 @@ name = "deno_runtime" version = "0.10.1" dependencies = [ "atty", + "bytes", "deno_console", "deno_core", "deno_crypto", @@ -684,6 +685,7 @@ dependencies = [ "test_util", "tokio", "tokio-rustls", + "tokio-util", "trust-dns-proto", "trust-dns-resolver", "uuid", diff --git a/cli/bench/deno_http_native.js b/cli/bench/deno_http_native.js new file mode 100644 index 0000000000..fa779be211 --- /dev/null +++ b/cli/bench/deno_http_native.js @@ -0,0 +1,17 @@ +// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. + +const addr = Deno.args[0] || "127.0.0.1:4500"; +const [hostname, port] = addr.split(":"); +const listener = Deno.listen({ hostname, port: Number(port) }); +console.log("Server listening on", addr); + +const body = Deno.core.encode("Hello World"); + +for await (const conn of listener) { + (async () => { + const requests = Deno.startHttp(conn); + for await (const { respondWith } of requests) { + respondWith(new Response(body)); + } + })(); +} diff --git a/cli/bench/http.rs b/cli/bench/http.rs index 952f3f19b5..690e26cf49 100644 --- a/cli/bench/http.rs +++ b/cli/bench/http.rs @@ -33,6 +33,7 @@ pub(crate) fn benchmark( res.insert("deno_tcp".to_string(), deno_tcp(deno_exe)?); // res.insert("deno_udp".to_string(), deno_udp(deno_exe)?); res.insert("deno_http".to_string(), deno_http(deno_exe)?); + res.insert("deno_http_native".to_string(), deno_http_native(deno_exe)?); // TODO(ry) deno_proxy disabled to make fetch() standards compliant. // res.insert("deno_proxy".to_string(), deno_http_proxy(deno_exe) hyper_hello_exe)) res.insert( @@ -200,6 +201,25 @@ fn deno_http(deno_exe: &str) -> Result { ) } +fn deno_http_native(deno_exe: &str) -> Result { + let port = get_port(); + println!("http_benchmark testing DENO using native bindings."); + run( + &[ + deno_exe, + "run", + "--allow-net", + "--reload", + "--unstable", + "cli/bench/deno_http_native.js", + &server_addr(port), + ], + port, + None, + None, + ) +} + #[allow(dead_code)] fn deno_http_proxy( deno_exe: &str, diff --git a/cli/dts/lib.deno.unstable.d.ts b/cli/dts/lib.deno.unstable.d.ts index 0833c33018..9dbe6817f1 100644 --- a/cli/dts/lib.deno.unstable.d.ts +++ b/cli/dts/lib.deno.unstable.d.ts @@ -1196,6 +1196,33 @@ declare namespace Deno { bytesSentData: number; bytesReceived: number; } + + export interface RequestEvent { + readonly request: Request; + respondWith(r: Response | Promise): void; + } + + export interface HttpConn extends AsyncIterable { + readonly rid: number; + + nextRequest(): Promise; + close(): void; + } + + /** **UNSTABLE**: new API, yet to be vetted. + * + * Parse HTTP requests from the given connection + * + * ```ts + * const httpConn = await Deno.startHttp(conn); + * const { request, respondWith } = await httpConn.next(); + * respondWith(new Response("Hello World")); + * ``` + * + * If `httpConn.next()` encounters an error or returns `done == true` then + * the underlying HttpConn resource is closed automatically. + */ + export function startHttp(conn: Conn): HttpConn; } declare function fetch( diff --git a/cli/tests/unit/http_test.ts b/cli/tests/unit/http_test.ts new file mode 100644 index 0000000000..fc85301426 --- /dev/null +++ b/cli/tests/unit/http_test.ts @@ -0,0 +1,203 @@ +// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. +import { + assert, + assertEquals, + assertThrowsAsync, + unitTest, +} from "./test_util.ts"; +import { BufReader, BufWriter } from "../../../test_util/std/io/bufio.ts"; +import { TextProtoReader } from "../../../test_util/std/textproto/mod.ts"; + +unitTest({ perms: { net: true } }, async function httpServerBasic() { + const promise = (async () => { + const listener = Deno.listen({ port: 4501 }); + for await (const conn of listener) { + const httpConn = Deno.startHttp(conn); + for await (const { request, respondWith } of httpConn) { + assertEquals(await request.text(), ""); + respondWith(new Response("Hello World")); + } + break; + } + })(); + + const resp = await fetch("http://127.0.0.1:4501/", { + headers: { "connection": "close" }, + }); + const text = await resp.text(); + assertEquals(text, "Hello World"); + await promise; +}); + +unitTest( + { perms: { net: true } }, + async function httpServerStreamResponse() { + const stream = new TransformStream(); + const writer = stream.writable.getWriter(); + writer.write(new TextEncoder().encode("hello ")); + writer.write(new TextEncoder().encode("world")); + writer.close(); + + const promise = (async () => { + const listener = Deno.listen({ port: 4501 }); + const conn = await listener.accept(); + const httpConn = Deno.startHttp(conn); + const evt = await httpConn.nextRequest(); + assert(evt); + const { request, respondWith } = evt; + assert(!request.body); + await respondWith(new Response(stream.readable)); + httpConn.close(); + listener.close(); + })(); + + const resp = await fetch("http://127.0.0.1:4501/"); + const respBody = await resp.text(); + assertEquals("hello world", respBody); + await promise; + }, +); + +unitTest( + { perms: { net: true } }, + async function httpServerStreamRequest() { + const stream = new TransformStream(); + const writer = stream.writable.getWriter(); + writer.write(new TextEncoder().encode("hello ")); + writer.write(new TextEncoder().encode("world")); + writer.close(); + + const promise = (async () => { + const listener = Deno.listen({ port: 4501 }); + const conn = await listener.accept(); + const httpConn = Deno.startHttp(conn); + const evt = await httpConn.nextRequest(); + assert(evt); + const { request, respondWith } = evt; + const reqBody = await request.text(); + assertEquals("hello world", reqBody); + await respondWith(new Response("")); + + // TODO(ry) If we don't call httpConn.nextRequest() here we get "error sending + // request for url (https://localhost:4501/): connection closed before + // message completed". + assertEquals(await httpConn.nextRequest(), null); + + listener.close(); + })(); + + const resp = await fetch("http://127.0.0.1:4501/", { + body: stream.readable, + method: "POST", + headers: { "connection": "close" }, + }); + + await resp.arrayBuffer(); + await promise; + }, +); + +unitTest({ perms: { net: true } }, async function httpServerStreamDuplex() { + const promise = (async () => { + const listener = Deno.listen({ port: 4501 }); + const conn = await listener.accept(); + const httpConn = Deno.startHttp(conn); + const evt = await httpConn.nextRequest(); + assert(evt); + const { request, respondWith } = evt; + assert(request.body); + await respondWith(new Response(request.body)); + httpConn.close(); + listener.close(); + })(); + + const ts = new TransformStream(); + const writable = ts.writable.getWriter(); + const resp = await fetch("http://127.0.0.1:4501/", { + method: "POST", + body: ts.readable, + }); + assert(resp.body); + const reader = resp.body.getReader(); + await writable.write(new Uint8Array([1])); + const chunk1 = await reader.read(); + assert(!chunk1.done); + assertEquals(chunk1.value, new Uint8Array([1])); + await writable.write(new Uint8Array([2])); + const chunk2 = await reader.read(); + assert(!chunk2.done); + assertEquals(chunk2.value, new Uint8Array([2])); + await writable.close(); + const chunk3 = await reader.read(); + assert(chunk3.done); + await promise; +}); + +unitTest({ perms: { net: true } }, async function httpServerClose() { + const listener = Deno.listen({ port: 4501 }); + const client = await Deno.connect({ port: 4501 }); + const httpConn = Deno.startHttp(await listener.accept()); + client.close(); + const evt = await httpConn.nextRequest(); + assertEquals(evt, null); + // Note httpConn is automatically closed when "done" is reached. + listener.close(); +}); + +unitTest({ perms: { net: true } }, async function httpServerInvalidMethod() { + const listener = Deno.listen({ port: 4501 }); + const client = await Deno.connect({ port: 4501 }); + const httpConn = Deno.startHttp(await listener.accept()); + await client.write(new Uint8Array([1, 2, 3])); + await assertThrowsAsync( + async () => { + await httpConn.nextRequest(); + }, + Deno.errors.Http, + "invalid HTTP method parsed", + ); + // Note httpConn is automatically closed when it errors. + client.close(); + listener.close(); +}); + +unitTest( + { perms: { read: true, net: true } }, + async function httpServerWithTls(): Promise { + const hostname = "localhost"; + const port = 4501; + + const promise = (async () => { + const listener = Deno.listenTls({ + hostname, + port, + certFile: "cli/tests/tls/localhost.crt", + keyFile: "cli/tests/tls/localhost.key", + }); + const conn = await listener.accept(); + const httpConn = Deno.startHttp(conn); + const evt = await httpConn.nextRequest(); + assert(evt); + const { request, respondWith } = evt; + await respondWith(new Response("Hello World")); + + // TODO(ry) If we don't call httpConn.nextRequest() here we get "error sending + // request for url (https://localhost:4501/): connection closed before + // message completed". + assertEquals(await httpConn.nextRequest(), null); + + listener.close(); + })(); + + const caData = Deno.readTextFileSync("cli/tests/tls/RootCA.pem"); + const client = Deno.createHttpClient({ caData }); + const resp = await fetch(`https://${hostname}:${port}/`, { + client, + headers: { "connection": "close" }, + }); + const respBody = await resp.text(); + assertEquals("Hello World", respBody); + await promise; + client.close(); + }, +); diff --git a/cli/tests/unit/unit_tests.ts b/cli/tests/unit/unit_tests.ts index a736e97ca0..0dcbfe80bb 100644 --- a/cli/tests/unit/unit_tests.ts +++ b/cli/tests/unit/unit_tests.ts @@ -30,6 +30,7 @@ import "./fs_events_test.ts"; import "./get_random_values_test.ts"; import "./globals_test.ts"; import "./headers_test.ts"; +import "./http_test.ts"; import "./internals_test.ts"; import "./io_test.ts"; import "./link_test.ts"; diff --git a/cli/tokio_util.rs b/cli/tokio_util.rs index 5ee45325d4..695b948025 100644 --- a/cli/tokio_util.rs +++ b/cli/tokio_util.rs @@ -20,5 +20,6 @@ where F: std::future::Future, { let rt = create_basic_runtime(); - rt.block_on(future) + let local = tokio::task::LocalSet::new(); + local.block_on(&rt, future) } diff --git a/runtime/Cargo.toml b/runtime/Cargo.toml index 864b6b7f31..eda9f2bf75 100644 --- a/runtime/Cargo.toml +++ b/runtime/Cargo.toml @@ -46,11 +46,12 @@ deno_websocket = { path = "../op_crates/websocket", version = "0.7.1" } deno_webgpu = { path = "../op_crates/webgpu", version = "0.3.1" } atty = "0.2.14" +bytes = "1" dlopen = "0.1.8" encoding_rs = "0.8.28" filetime = "0.2.14" http = "0.2.3" -hyper = { version = "0.14.5", features = ["server"] } +hyper = { version = "0.14.5", features = ["server", "stream", "http1", "http2", "runtime"] } indexmap = "1.6.2" lazy_static = "1.4.0" libc = "0.2.93" @@ -63,6 +64,7 @@ serde = { version = "1.0.125", features = ["derive"] } sys-info = "0.8.0" termcolor = "1.1.2" tokio = { version = "1.4.0", features = ["full"] } +tokio-util = { version = "0.6", features = ["io"] } tokio-rustls = "0.22.0" uuid = { version = "0.8.2", features = ["v4"] } webpki = "0.21.4" diff --git a/runtime/errors.rs b/runtime/errors.rs index a8152b0758..7bb109fb9d 100644 --- a/runtime/errors.rs +++ b/runtime/errors.rs @@ -131,6 +131,10 @@ fn get_url_parse_error_class(_error: &url::ParseError) -> &'static str { "URIError" } +fn get_hyper_error_class(_error: &hyper::Error) -> &'static str { + "Http" +} + #[cfg(unix)] fn get_nix_error_class(error: &nix::Error) -> &'static str { use nix::errno::Errno::*; @@ -156,6 +160,7 @@ pub fn get_error_class_name(e: &AnyError) -> Option<&'static str> { e.downcast_ref::() .map(get_dlopen_error_class) }) + .or_else(|| e.downcast_ref::().map(get_hyper_error_class)) .or_else(|| { e.downcast_ref::().map(|e| { let io_err: io::Error = e.to_owned().into(); diff --git a/runtime/js/40_http.js b/runtime/js/40_http.js new file mode 100644 index 0000000000..cfb015edd7 --- /dev/null +++ b/runtime/js/40_http.js @@ -0,0 +1,210 @@ +// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. +"use strict"; + +((window) => { + const { Request, dontValidateUrl, fastBody, Response } = + window.__bootstrap.fetch; + const { Headers } = window.__bootstrap.headers; + const errors = window.__bootstrap.errors.errors; + const core = window.Deno.core; + const { ReadableStream } = window.__bootstrap.streams; + + function flatEntries(obj) { + const entries = []; + for (const key in obj) { + entries.push(key); + entries.push(obj[key]); + } + return entries; + } + + function startHttp(conn) { + const rid = Deno.core.jsonOpSync("op_http_start", conn.rid); + return new HttpConn(rid); + } + + class HttpConn { + #rid = 0; + + constructor(rid) { + this.#rid = rid; + } + + /** @returns {number} */ + get rid() { + return this.#rid; + } + + /** @returns {Promise} */ + async nextRequest() { + let nextRequest; + try { + nextRequest = await Deno.core.jsonOpAsync( + "op_http_request_next", + this.#rid, + ); + } catch (error) { + if (error instanceof errors.BadResource) { + return null; + } else if (error instanceof errors.Interrupted) { + return null; + } + throw error; + } + if (nextRequest === null) return null; + + const [ + requestBodyRid, + responseSenderRid, + method, + headersList, + url, + ] = nextRequest; + + /** @type {ReadableStream | undefined} */ + let body = undefined; + if (typeof requestBodyRid === "number") { + body = createRequestBodyStream(requestBodyRid); + } + + const request = new Request(url, { + body, + method, + headers: new Headers(headersList), + [dontValidateUrl]: true, + }); + + const respondWith = createRespondWith(responseSenderRid, this.#rid); + + return { request, respondWith }; + } + + /** @returns {void} */ + close() { + core.close(this.#rid); + } + + [Symbol.asyncIterator]() { + const httpConn = this; + return { + async next() { + const reqEvt = await httpConn.nextRequest(); + if (reqEvt === null) return { value: undefined, done: true }; + return { value: reqEvt, done: false }; + }, + }; + } + } + + function readRequest(requestRid, zeroCopyBuf) { + return Deno.core.jsonOpAsync( + "op_http_request_read", + requestRid, + zeroCopyBuf, + ); + } + + function respond(responseSenderRid, resp, zeroCopyBuf) { + return Deno.core.jsonOpSync("op_http_response", [ + responseSenderRid, + resp.status ?? 200, + flatEntries(resp.headers ?? {}), + ], zeroCopyBuf); + } + + function createRespondWith(responseSenderRid, connRid) { + return async function (resp) { + if (resp instanceof Promise) { + resp = await resp; + } + + if (!(resp instanceof Response)) { + throw new TypeError( + "First argument to respondWith must be a Response or a promise resolving to a Response.", + ); + } + // If response body is Uint8Array it will be sent synchronously + // in a single op, in other case a "response body" resource will be + // created and we'll be streaming it. + const body = resp[fastBody](); + let zeroCopyBuf; + if (body instanceof ArrayBuffer) { + zeroCopyBuf = new Uint8Array(body); + } else if (!body) { + zeroCopyBuf = new Uint8Array(0); + } else { + zeroCopyBuf = null; + } + + const responseBodyRid = respond( + responseSenderRid, + resp, + zeroCopyBuf, + ); + + // If `respond` returns a responseBodyRid, we should stream the body + // to that resource. + if (typeof responseBodyRid === "number") { + if (!body || !(body instanceof ReadableStream)) { + throw new Error( + "internal error: recieved responseBodyRid, but response has no body or is not a stream", + ); + } + for await (const chunk of body) { + const data = new Uint8Array( + chunk.buffer, + chunk.byteOffset, + chunk.byteLength, + ); + await Deno.core.jsonOpAsync( + "op_http_response_write", + responseBodyRid, + data, + ); + } + + // Once all chunks are sent, and the request body is closed, we can close + // the response body. + await Deno.core.jsonOpAsync("op_http_response_close", responseBodyRid); + } + }; + } + + function createRequestBodyStream(requestBodyRid) { + return new ReadableStream({ + type: "bytes", + async pull(controller) { + try { + // This is the largest possible size for a single packet on a TLS + // stream. + const chunk = new Uint8Array(16 * 1024 + 256); + const read = await readRequest( + requestBodyRid, + chunk, + ); + if (read > 0) { + // We read some data. Enqueue it onto the stream. + controller.enqueue(chunk.subarray(0, read)); + } else { + // We have reached the end of the body, so we close the stream. + controller.close(); + core.close(requestBodyRid); + } + } catch (err) { + // There was an error while reading a chunk of the body, so we + // error. + controller.error(err); + controller.close(); + core.close(requestBodyRid); + } + }, + cancel() { + core.close(requestBodyRid); + }, + }); + } + + window.__bootstrap.http = { + startHttp, + }; +})(this); diff --git a/runtime/js/90_deno_ns.js b/runtime/js/90_deno_ns.js index d820d08964..84eb69ef06 100644 --- a/runtime/js/90_deno_ns.js +++ b/runtime/js/90_deno_ns.js @@ -120,6 +120,7 @@ listen: __bootstrap.netUnstable.listen, connect: __bootstrap.netUnstable.connect, listenDatagram: __bootstrap.netUnstable.listenDatagram, + startHttp: __bootstrap.http.startHttp, startTls: __bootstrap.tls.startTls, fstatSync: __bootstrap.fs.fstatSync, fstat: __bootstrap.fs.fstat, @@ -132,5 +133,6 @@ utimeSync: __bootstrap.fs.utimeSync, HttpClient: __bootstrap.fetch.HttpClient, createHttpClient: __bootstrap.fetch.createHttpClient, + http: __bootstrap.http, }; })(this); diff --git a/runtime/ops/http.rs b/runtime/ops/http.rs new file mode 100644 index 0000000000..9cf4ff9d51 --- /dev/null +++ b/runtime/ops/http.rs @@ -0,0 +1,523 @@ +// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. + +use crate::ops::io::TcpStreamResource; +use crate::ops::io::TlsServerStreamResource; +use deno_core::error::bad_resource_id; +use deno_core::error::null_opbuf; +use deno_core::error::type_error; +use deno_core::error::AnyError; +use deno_core::futures::future::poll_fn; +use deno_core::futures::FutureExt; +use deno_core::futures::Stream; +use deno_core::futures::StreamExt; +use deno_core::AsyncRefCell; +use deno_core::CancelFuture; +use deno_core::CancelHandle; +use deno_core::CancelTryFuture; +use deno_core::OpState; +use deno_core::RcRef; +use deno_core::Resource; +use deno_core::ResourceId; +use deno_core::ZeroCopyBuf; +use hyper::body::HttpBody; +use hyper::http; +use hyper::server::conn::Connection; +use hyper::server::conn::Http; +use hyper::service::Service as HyperService; +use hyper::Body; +use hyper::Request; +use hyper::Response; +use serde::Deserialize; +use serde::Serialize; +use std::borrow::Cow; +use std::cell::RefCell; +use std::future::Future; +use std::pin::Pin; +use std::rc::Rc; +use std::task::Context; +use std::task::Poll; +use tokio::io::AsyncReadExt; +use tokio::net::TcpStream; +use tokio::sync::oneshot; +use tokio_rustls::server::TlsStream; +use tokio_util::io::StreamReader; + +pub fn init(rt: &mut deno_core::JsRuntime) { + super::reg_json_sync(rt, "op_http_start", op_http_start); + + super::reg_json_async(rt, "op_http_request_next", op_http_request_next); + super::reg_json_async(rt, "op_http_request_read", op_http_request_read); + + super::reg_json_sync(rt, "op_http_response", op_http_response); + super::reg_json_async(rt, "op_http_response_write", op_http_response_write); + super::reg_json_async(rt, "op_http_response_close", op_http_response_close); +} + +struct ServiceInner { + request: Request, + response_tx: oneshot::Sender>, +} + +#[derive(Clone, Default)] +struct Service { + inner: Rc>>, +} + +impl HyperService> for Service { + type Response = Response; + type Error = http::Error; + #[allow(clippy::type_complexity)] + type Future = + Pin>>>; + + fn poll_ready( + &mut self, + _cx: &mut Context<'_>, + ) -> Poll> { + if self.inner.borrow().is_some() { + Poll::Pending + } else { + Poll::Ready(Ok(())) + } + } + + fn call(&mut self, req: Request) -> Self::Future { + let (resp_tx, resp_rx) = oneshot::channel(); + self.inner.borrow_mut().replace(ServiceInner { + request: req, + response_tx: resp_tx, + }); + + async move { Ok(resp_rx.await.unwrap()) }.boxed_local() + } +} + +enum ConnType { + Tcp(Rc>>), + Tls(Rc, Service, LocalExecutor>>>), +} + +struct ConnResource { + hyper_connection: ConnType, + deno_service: Service, +} + +impl ConnResource { + // TODO(ry) impl Future for ConnResource? + fn poll(&self, cx: &mut Context<'_>) -> Poll> { + match &self.hyper_connection { + ConnType::Tcp(c) => c.borrow_mut().poll_unpin(cx), + ConnType::Tls(c) => c.borrow_mut().poll_unpin(cx), + } + .map_err(AnyError::from) + } +} + +impl Resource for ConnResource { + fn name(&self) -> Cow { + "httpConnection".into() + } +} + +// We use a tuple instead of struct to avoid serialization overhead of the keys. +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +struct NextRequestResponse( + // request_body_rid: + Option, + // response_sender_rid: + ResourceId, + // method: + String, + // headers: + Vec<(String, String)>, + // url: + String, +); + +async fn op_http_request_next( + state: Rc>, + conn_rid: ResourceId, + _data: Option, +) -> Result, AnyError> { + let conn_resource = state + .borrow() + .resource_table + .get::(conn_rid) + .ok_or_else(bad_resource_id)?; + + poll_fn(|cx| { + let connection_closed = match conn_resource.poll(cx) { + Poll::Pending => false, + Poll::Ready(Ok(())) => { + // close ConnResource + state + .borrow_mut() + .resource_table + .take::(conn_rid) + .unwrap(); + true + } + Poll::Ready(Err(e)) => { + // TODO(ry) close RequestResource associated with connection + // TODO(ry) close ResponseBodyResource associated with connection + // close ConnResource + state + .borrow_mut() + .resource_table + .take::(conn_rid) + .unwrap(); + + if should_ignore_error(&e) { + true + } else { + return Poll::Ready(Err(e)); + } + } + }; + + if let Some(request_resource) = + conn_resource.deno_service.inner.borrow_mut().take() + { + let tx = request_resource.response_tx; + let req = request_resource.request; + let method = req.method().to_string(); + + let mut headers = Vec::with_capacity(req.headers().len()); + for (name, value) in req.headers().iter() { + let name = name.to_string(); + let value = value.to_str().unwrap_or("").to_string(); + headers.push((name, value)); + } + + let url = req.uri().to_string(); + + let has_body = if let Some(exact_size) = req.size_hint().exact() { + exact_size > 0 + } else { + true + }; + + let maybe_request_body_rid = if has_body { + let stream: BytesStream = Box::pin(req.into_body().map(|r| { + r.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err)) + })); + let stream_reader = StreamReader::new(stream); + let mut state = state.borrow_mut(); + let request_body_rid = state.resource_table.add(RequestBodyResource { + conn_rid, + reader: AsyncRefCell::new(stream_reader), + cancel: CancelHandle::default(), + }); + Some(request_body_rid) + } else { + None + }; + + let mut state = state.borrow_mut(); + let response_sender_rid = + state.resource_table.add(ResponseSenderResource { + sender: tx, + conn_rid, + }); + + Poll::Ready(Ok(Some(NextRequestResponse( + maybe_request_body_rid, + response_sender_rid, + method, + headers, + url, + )))) + } else if connection_closed { + Poll::Ready(Ok(None)) + } else { + Poll::Pending + } + }) + .await + .map_err(AnyError::from) +} + +fn should_ignore_error(e: &AnyError) -> bool { + if let Some(e) = e.downcast_ref::() { + use std::error::Error; + if let Some(std_err) = e.source() { + if let Some(io_err) = std_err.downcast_ref::() { + if io_err.kind() == std::io::ErrorKind::NotConnected { + return true; + } + } + } + } + false +} + +fn op_http_start( + state: &mut OpState, + tcp_stream_rid: ResourceId, + _data: Option, +) -> Result { + let deno_service = Service::default(); + + if let Some(resource_rc) = state + .resource_table + .take::(tcp_stream_rid) + { + let resource = Rc::try_unwrap(resource_rc) + .expect("Only a single use of this resource should happen"); + let (read_half, write_half) = resource.into_inner(); + let tcp_stream = read_half.reunite(write_half)?; + let hyper_connection = Http::new() + .with_executor(LocalExecutor) + .serve_connection(tcp_stream, deno_service.clone()); + let conn_resource = ConnResource { + hyper_connection: ConnType::Tcp(Rc::new(RefCell::new(hyper_connection))), + deno_service, + }; + let rid = state.resource_table.add(conn_resource); + return Ok(rid); + } + + if let Some(resource_rc) = state + .resource_table + .take::(tcp_stream_rid) + { + let resource = Rc::try_unwrap(resource_rc) + .expect("Only a single use of this resource should happen"); + let (read_half, write_half) = resource.into_inner(); + let tls_stream = read_half.unsplit(write_half); + + let hyper_connection = Http::new() + .with_executor(LocalExecutor) + .serve_connection(tls_stream, deno_service.clone()); + let conn_resource = ConnResource { + hyper_connection: ConnType::Tls(Rc::new(RefCell::new(hyper_connection))), + deno_service, + }; + let rid = state.resource_table.add(conn_resource); + return Ok(rid); + } + + Err(bad_resource_id()) +} + +// We use a tuple instead of struct to avoid serialization overhead of the keys. +#[derive(Deserialize)] +struct RespondArgs( + // rid: + u32, + // status: + u16, + // headers: + Vec, +); + +fn op_http_response( + state: &mut OpState, + args: RespondArgs, + data: Option, +) -> Result, AnyError> { + let rid = args.0; + let status = args.1; + let headers = args.2; + + let response_sender = state + .resource_table + .take::(rid) + .ok_or_else(bad_resource_id)?; + let response_sender = Rc::try_unwrap(response_sender) + .ok() + .expect("multiple op_http_respond ongoing"); + + let mut builder = Response::builder().status(status); + + debug_assert_eq!(headers.len() % 2, 0); + let headers_count = headers.len() / 2; + builder.headers_mut().unwrap().reserve(headers_count); + for i in 0..headers_count { + builder = builder.header(&headers[2 * i], &headers[2 * i + 1]); + } + + let res; + let maybe_response_body_rid = if let Some(d) = data { + // If a body is passed, we use it, and don't return a body for streaming. + res = builder.body(Vec::from(&*d).into())?; + None + } else { + // If no body is passed, we return a writer for streaming the body. + let (sender, body) = Body::channel(); + res = builder.body(body)?; + + let response_body_rid = state.resource_table.add(ResponseBodyResource { + body: AsyncRefCell::new(sender), + cancel: CancelHandle::default(), + conn_rid: response_sender.conn_rid, + }); + + Some(response_body_rid) + }; + + // oneshot::Sender::send(v) returns |v| on error, not an error object. + // The only failure mode is the receiver already having dropped its end + // of the channel. + if response_sender.sender.send(res).is_err() { + return Err(type_error("internal communication error")); + } + + Ok(maybe_response_body_rid) +} + +async fn op_http_response_close( + state: Rc>, + rid: ResourceId, + _data: Option, +) -> Result<(), AnyError> { + let resource = state + .borrow_mut() + .resource_table + .take::(rid) + .ok_or_else(bad_resource_id)?; + + let conn_resource = state + .borrow() + .resource_table + .get::(resource.conn_rid) + .ok_or_else(bad_resource_id)?; + drop(resource); + + poll_fn(|cx| match conn_resource.poll(cx) { + Poll::Ready(x) => Poll::Ready(x), + Poll::Pending => Poll::Ready(Ok(())), + }) + .await +} + +async fn op_http_request_read( + state: Rc>, + rid: ResourceId, + data: Option, +) -> Result { + let mut data = data.ok_or_else(null_opbuf)?; + + let resource = state + .borrow() + .resource_table + .get::(rid as u32) + .ok_or_else(bad_resource_id)?; + + let conn_resource = state + .borrow() + .resource_table + .get::(resource.conn_rid) + .ok_or_else(bad_resource_id)?; + + let mut reader = RcRef::map(&resource, |r| &r.reader).borrow_mut().await; + let cancel = RcRef::map(resource, |r| &r.cancel); + let mut read_fut = reader.read(&mut data).try_or_cancel(cancel).boxed_local(); + + poll_fn(|cx| { + if let Poll::Ready(Err(e)) = conn_resource.poll(cx) { + // close ConnResource + // close RequestResource associated with connection + // close ResponseBodyResource associated with connection + return Poll::Ready(Err(e)); + } + + read_fut.poll_unpin(cx).map_err(AnyError::from) + }) + .await +} + +async fn op_http_response_write( + state: Rc>, + rid: ResourceId, + data: Option, +) -> Result<(), AnyError> { + let buf = data.ok_or_else(null_opbuf)?; + let resource = state + .borrow() + .resource_table + .get::(rid as u32) + .ok_or_else(bad_resource_id)?; + + let conn_resource = state + .borrow() + .resource_table + .get::(resource.conn_rid) + .ok_or_else(bad_resource_id)?; + + let mut body = RcRef::map(&resource, |r| &r.body).borrow_mut().await; + let cancel = RcRef::map(resource, |r| &r.cancel); + + let mut send_data_fut = body + .send_data(Vec::from(&*buf).into()) + .or_cancel(cancel) + .boxed_local(); + + poll_fn(|cx| { + if let Poll::Ready(Err(e)) = conn_resource.poll(cx) { + // close ConnResource + // close RequestResource associated with connection + // close ResponseBodyResource associated with connection + return Poll::Ready(Err(e)); + } + + send_data_fut.poll_unpin(cx).map_err(AnyError::from) + }) + .await? + .unwrap(); // panic on send_data error + + Ok(()) +} + +type BytesStream = + Pin> + Unpin>>; + +struct RequestBodyResource { + conn_rid: ResourceId, + reader: AsyncRefCell>, + cancel: CancelHandle, +} + +impl Resource for RequestBodyResource { + fn name(&self) -> Cow { + "requestBody".into() + } +} + +struct ResponseSenderResource { + sender: oneshot::Sender>, + conn_rid: ResourceId, +} + +impl Resource for ResponseSenderResource { + fn name(&self) -> Cow { + "responseSender".into() + } +} + +struct ResponseBodyResource { + body: AsyncRefCell, + cancel: CancelHandle, + conn_rid: ResourceId, +} + +impl Resource for ResponseBodyResource { + fn name(&self) -> Cow { + "responseBody".into() + } +} + +// Needed so hyper can use non Send futures +#[derive(Clone)] +struct LocalExecutor; + +impl hyper::rt::Executor for LocalExecutor +where + Fut: Future + 'static, + Fut::Output: 'static, +{ + fn execute(&self, fut: Fut) { + tokio::task::spawn_local(fut); + } +} diff --git a/runtime/ops/mod.rs b/runtime/ops/mod.rs index 67eae27b2f..d9bd2ba835 100644 --- a/runtime/ops/mod.rs +++ b/runtime/ops/mod.rs @@ -5,6 +5,7 @@ pub mod fetch; pub mod file; pub mod fs; pub mod fs_events; +pub mod http; pub mod io; pub mod net; #[cfg(unix)] diff --git a/runtime/ops/net.rs b/runtime/ops/net.rs index 48431ef22b..4c38d2293e 100644 --- a/runtime/ops/net.rs +++ b/runtime/ops/net.rs @@ -353,9 +353,9 @@ async fn op_connect( } } -struct TcpListenerResource { - listener: AsyncRefCell, - cancel: CancelHandle, +pub struct TcpListenerResource { + pub listener: AsyncRefCell, + pub cancel: CancelHandle, } impl Resource for TcpListenerResource { diff --git a/runtime/worker.rs b/runtime/worker.rs index 982198d65c..5db542b42d 100644 --- a/runtime/worker.rs +++ b/runtime/worker.rs @@ -138,6 +138,7 @@ impl MainWorker { ); ops::fs_events::init(js_runtime); ops::fs::init(js_runtime); + ops::http::init(js_runtime); ops::io::init(js_runtime); ops::net::init(js_runtime); ops::os::init(js_runtime);