diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index ae75f9b6e7..130d3fd17f 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -45,9 +45,10 @@ jobs: # e.g. a flaky test. # Don't fast-fail on tag build because publishing binaries shouldn't be # prevented if any of the stages fail (which can be a false negative). - fail-fast: ${{ github.event_name == 'pull_request' || - (github.ref != 'refs/heads/main' && - !startsWith(github.ref, 'refs/tags/')) }} + # fail-fast: ${{ github.event_name == 'pull_request' || + # (github.ref != 'refs/heads/main' && + # !startsWith(github.ref, 'refs/tags/')) }} + fail-fast: false env: CARGO_TERM_COLOR: always @@ -268,7 +269,7 @@ jobs: # when https://github.com/actions/cache/pull/489 (or 571) is merged. uses: actions/cache@03e00da99d75a2204924908e1cca7902cafce66b if: (matrix.profile == 'release' || matrix.profile == 'fastci') && - github.ref == 'refs/heads/main' + github.ref == 'refs/heads/main' with: path: | ./target @@ -615,9 +616,9 @@ jobs: publish-canary: name: publish canary runs-on: ubuntu-20.04 - needs: ['build'] + needs: ["build"] if: github.repository == 'denoland/deno' && - (github.ref == 'refs/heads/main' || startsWith(github.ref, 'refs/tags/')) + (github.ref == 'refs/heads/main' || startsWith(github.ref, 'refs/tags/')) steps: - name: Setup gcloud uses: google-github-actions/setup-gcloud@master diff --git a/cli/tests/unit/http_test.ts b/cli/tests/unit/http_test.ts index 080b94a1d0..486f708794 100644 --- a/cli/tests/unit/http_test.ts +++ b/cli/tests/unit/http_test.ts @@ -865,6 +865,7 @@ unitTest( const writer = writable.getWriter(); async function writeResponse() { + await delay(50); await writer.write( new TextEncoder().encode( "written to the writable side of a TransformStream", diff --git a/ext/http/01_http.js b/ext/http/01_http.js index 4e6b8fc003..fc0b5bda4b 100644 --- a/ext/http/01_http.js +++ b/ext/http/01_http.js @@ -27,6 +27,7 @@ Set, SetPrototypeAdd, SetPrototypeDelete, + SetPrototypeHas, SetPrototypeValues, StringPrototypeIncludes, StringPrototypeToLowerCase, @@ -42,6 +43,8 @@ class HttpConn { #rid = 0; + #closed = false; + // This set holds resource ids of resources // that were created during lifecycle of this request. // When the connection is closed these resources should be closed @@ -66,6 +69,7 @@ this.#rid, ); } catch (error) { + this.close(); // A connection error seen here would cause disrupted responses to throw // a generic `BadResource` error. Instead store this error and replace // those with it. @@ -79,26 +83,26 @@ } throw error; } - if (nextRequest === null) return null; + if (nextRequest == null) { + this.close(); + return null; + } const [ - requestRid, - responseSenderRid, + streamRid, method, headersList, url, ] = nextRequest; + SetPrototypeAdd(this.managedResources, streamRid); /** @type {ReadableStream | undefined} */ let body = null; - if (typeof requestRid === "number") { - SetPrototypeAdd(this.managedResources, requestRid); - // There might be a body, but we don't expose it for GET/HEAD requests. - // It will be closed automatically once the request has been handled and - // the response has been sent. - if (method !== "GET" && method !== "HEAD") { - body = createRequestBodyStream(this, requestRid); - } + // There might be a body, but we don't expose it for GET/HEAD requests. + // It will be closed automatically once the request has been handled and + // the response has been sent. + if (method !== "GET" && method !== "HEAD") { + body = createRequestBodyStream(streamRid); } const innerRequest = newInnerRequest( @@ -111,22 +115,21 @@ const signal = abortSignal.newSignal(); const request = fromInnerRequest(innerRequest, signal, "immutable"); - SetPrototypeAdd(this.managedResources, responseSenderRid); - const respondWith = createRespondWith( - this, - responseSenderRid, - requestRid, - ); + const respondWith = createRespondWith(this, streamRid); return { request, respondWith }; } /** @returns {void} */ close() { - for (const rid of SetPrototypeValues(this.managedResources)) { - core.tryClose(rid); + if (!this.#closed) { + this.#closed = true; + core.close(this.#rid); + for (const rid of SetPrototypeValues(this.managedResources)) { + SetPrototypeDelete(this.managedResources, rid); + core.close(rid); + } } - core.close(this.#rid); } [SymbolAsyncIterator]() { @@ -136,91 +139,85 @@ async next() { const reqEvt = await httpConn.nextRequest(); // Change with caution, current form avoids a v8 deopt - return { value: reqEvt, done: reqEvt === null }; + return { value: reqEvt ?? undefined, done: reqEvt === null }; }, }; } } - function readRequest(requestRid, zeroCopyBuf) { - return core.opAsync( - "op_http_request_read", - requestRid, - zeroCopyBuf, - ); + function readRequest(streamRid, buf) { + return core.opAsync("op_http_request_read", streamRid, buf); } - function createRespondWith(httpConn, responseSenderRid, requestRid) { + function createRespondWith(httpConn, streamRid) { return async function respondWith(resp) { - if (resp instanceof Promise) { - resp = await resp; - } + try { + if (resp instanceof Promise) { + resp = await resp; + } - if (!(resp instanceof Response)) { - throw new TypeError( - "First argument to respondWith must be a Response or a promise resolving to a Response.", - ); - } + if (!(resp instanceof Response)) { + throw new TypeError( + "First argument to respondWith must be a Response or a promise resolving to a Response.", + ); + } - const innerResp = toInnerResponse(resp); + const innerResp = toInnerResponse(resp); - // If response body length is known, it will be sent synchronously in a - // single op, in other case a "response body" resource will be created and - // we'll be streaming it. - /** @type {ReadableStream | Uint8Array | null} */ - let respBody = null; - if (innerResp.body !== null) { - if (innerResp.body.unusable()) throw new TypeError("Body is unusable."); - if (innerResp.body.streamOrStatic instanceof ReadableStream) { - if ( - innerResp.body.length === null || - innerResp.body.source instanceof Blob - ) { - respBody = innerResp.body.stream; - } else { - const reader = innerResp.body.stream.getReader(); - const r1 = await reader.read(); - if (r1.done) { - respBody = new Uint8Array(0); + // If response body length is known, it will be sent synchronously in a + // single op, in other case a "response body" resource will be created and + // we'll be streaming it. + /** @type {ReadableStream | Uint8Array | null} */ + let respBody = null; + if (innerResp.body !== null) { + if (innerResp.body.unusable()) { + throw new TypeError("Body is unusable."); + } + if (innerResp.body.streamOrStatic instanceof ReadableStream) { + if ( + innerResp.body.length === null || + innerResp.body.source instanceof Blob + ) { + respBody = innerResp.body.stream; } else { - respBody = r1.value; - const r2 = await reader.read(); - if (!r2.done) throw new TypeError("Unreachable"); + const reader = innerResp.body.stream.getReader(); + const r1 = await reader.read(); + if (r1.done) { + respBody = new Uint8Array(0); + } else { + respBody = r1.value; + const r2 = await reader.read(); + if (!r2.done) throw new TypeError("Unreachable"); + } } + } else { + innerResp.body.streamOrStatic.consumed = true; + respBody = innerResp.body.streamOrStatic.body; } } else { - innerResp.body.streamOrStatic.consumed = true; - respBody = innerResp.body.streamOrStatic.body; + respBody = new Uint8Array(0); } - } else { - respBody = new Uint8Array(0); - } + const isStreamingResponseBody = !(respBody instanceof Uint8Array); - SetPrototypeDelete(httpConn.managedResources, responseSenderRid); - let responseBodyRid; - try { - responseBodyRid = await core.opAsync("op_http_response", [ - responseSenderRid, - innerResp.status ?? 200, - innerResp.headerList, - ], respBody instanceof Uint8Array ? respBody : null); - } catch (error) { - const connError = httpConn[connErrorSymbol]; - if (error instanceof BadResource && connError != null) { - // deno-lint-ignore no-ex-assign - error = new connError.constructor(connError.message); - } - if (respBody !== null && respBody instanceof ReadableStream) { - await respBody.cancel(error); - } - throw error; - } - - // If `respond` returns a responseBodyRid, we should stream the body - // to that resource. - if (responseBodyRid !== null) { - SetPrototypeAdd(httpConn.managedResources, responseBodyRid); try { + await core.opAsync("op_http_response", [ + streamRid, + innerResp.status ?? 200, + innerResp.headerList, + ], isStreamingResponseBody ? null : respBody); + } catch (error) { + const connError = httpConn[connErrorSymbol]; + if (error instanceof BadResource && connError != null) { + // deno-lint-ignore no-ex-assign + error = new connError.constructor(connError.message); + } + if (respBody !== null && respBody instanceof ReadableStream) { + await respBody.cancel(error); + } + throw error; + } + + if (isStreamingResponseBody) { if (respBody === null || !(respBody instanceof ReadableStream)) { throw new TypeError("Unreachable"); } @@ -233,11 +230,7 @@ break; } try { - await core.opAsync( - "op_http_response_write", - responseBodyRid, - value, - ); + await core.opAsync("op_http_response_write", streamRid, value); } catch (error) { const connError = httpConn[connErrorSymbol]; if (error instanceof BadResource && connError != null) { @@ -248,61 +241,55 @@ throw error; } } - } finally { - // Once all chunks are sent, and the request body is closed, we can - // close the response body. - SetPrototypeDelete(httpConn.managedResources, responseBodyRid); try { - await core.opAsync("op_http_response_close", responseBodyRid); - } catch { /* pass */ } + await core.opAsync("op_http_response_close", streamRid); + } catch (error) { + await reader.cancel(error); + throw error; + } } - } - const ws = resp[_ws]; - if (ws) { - if (typeof requestRid !== "number") { - throw new TypeError( - "This request can not be upgraded to a websocket connection.", + const ws = resp[_ws]; + if (ws) { + const wsRid = await core.opAsync( + "op_http_upgrade_websocket", + streamRid, ); + ws[_rid] = wsRid; + ws[_protocol] = resp.headers.get("sec-websocket-protocol"); + + httpConn.close(); + + if (ws[_readyState] === WebSocket.CLOSING) { + await core.opAsync("op_ws_close", { rid: wsRid }); + + ws[_readyState] = WebSocket.CLOSED; + + const errEvent = new ErrorEvent("error"); + ws.dispatchEvent(errEvent); + + const event = new CloseEvent("close"); + ws.dispatchEvent(event); + + core.tryClose(wsRid); + } else { + ws[_readyState] = WebSocket.OPEN; + const event = new Event("open"); + ws.dispatchEvent(event); + + ws[_eventLoop](); + } } - - const wsRid = await core.opAsync( - "op_http_upgrade_websocket", - requestRid, - ); - ws[_rid] = wsRid; - ws[_protocol] = resp.headers.get("sec-websocket-protocol"); - - if (ws[_readyState] === WebSocket.CLOSING) { - await core.opAsync("op_ws_close", { rid: wsRid }); - - ws[_readyState] = WebSocket.CLOSED; - - const errEvent = new ErrorEvent("error"); - ws.dispatchEvent(errEvent); - - const event = new CloseEvent("close"); - ws.dispatchEvent(event); - - core.tryClose(wsRid); - } else { - ws[_readyState] = WebSocket.OPEN; - const event = new Event("open"); - ws.dispatchEvent(event); - - ws[_eventLoop](); + } finally { + if (SetPrototypeHas(httpConn.managedResources, streamRid)) { + SetPrototypeDelete(httpConn.managedResources, streamRid); + core.close(streamRid); } - } else if (typeof requestRid === "number") { - // Try to close "request" resource. It might have been already consumed, - // but if it hasn't been we need to close it here to avoid resource - // leak. - SetPrototypeDelete(httpConn.managedResources, requestRid); - core.tryClose(requestRid); } }; } - function createRequestBodyStream(httpConn, requestRid) { + function createRequestBodyStream(streamRid) { return new ReadableStream({ type: "bytes", async pull(controller) { @@ -310,32 +297,21 @@ // This is the largest possible size for a single packet on a TLS // stream. const chunk = new Uint8Array(16 * 1024 + 256); - const read = await readRequest( - requestRid, - chunk, - ); + const read = await readRequest(streamRid, chunk); if (read > 0) { // We read some data. Enqueue it onto the stream. controller.enqueue(TypedArrayPrototypeSubarray(chunk, 0, read)); } else { // We have reached the end of the body, so we close the stream. controller.close(); - SetPrototypeDelete(httpConn.managedResources, requestRid); - core.close(requestRid); } } catch (err) { // There was an error while reading a chunk of the body, so we // error. controller.error(err); controller.close(); - SetPrototypeDelete(httpConn.managedResources, requestRid); - core.close(requestRid); } }, - cancel() { - SetPrototypeDelete(httpConn.managedResources, requestRid); - core.close(requestRid); - }, }); } @@ -369,7 +345,6 @@ } const accept = core.opSync("op_http_websocket_accept_header", websocketKey); - const r = newInnerResponse(101); r.headerList = [ ["upgrade", "websocket"], diff --git a/ext/http/lib.rs b/ext/http/lib.rs index f040ce1045..ee9ef1246b 100644 --- a/ext/http/lib.rs +++ b/ext/http/lib.rs @@ -1,17 +1,28 @@ // Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. -use deno_core::error::bad_resource_id; -use deno_core::error::type_error; +use bytes::Bytes; +use deno_core::error::custom_error; use deno_core::error::AnyError; -use deno_core::futures::future::poll_fn; +use deno_core::futures::channel::mpsc; +use deno_core::futures::channel::oneshot; +use deno_core::futures::future::pending; +use deno_core::futures::future::select; +use deno_core::futures::future::Either; +use deno_core::futures::future::RemoteHandle; +use deno_core::futures::future::Shared; +use deno_core::futures::never::Never; +use deno_core::futures::pin_mut; +use deno_core::futures::ready; +use deno_core::futures::stream::Peekable; use deno_core::futures::FutureExt; -use deno_core::futures::Stream; use deno_core::futures::StreamExt; +use deno_core::futures::TryFutureExt; use deno_core::include_js_files; use deno_core::op_async; use deno_core::op_sync; use deno_core::AsyncRefCell; use deno_core::ByteString; +use deno_core::CancelFuture; use deno_core::CancelHandle; use deno_core::CancelTryFuture; use deno_core::Extension; @@ -20,33 +31,31 @@ use deno_core::RcRef; use deno_core::Resource; use deno_core::ResourceId; use deno_core::ZeroCopyBuf; -use hyper::body::HttpBody; -use hyper::header::CONNECTION; -use hyper::header::SEC_WEBSOCKET_KEY; -use hyper::header::SEC_WEBSOCKET_VERSION; -use hyper::header::UPGRADE; -use hyper::http; +use deno_websocket::ws_create_server_stream; use hyper::server::conn::Http; -use hyper::service::Service as HyperService; +use hyper::service::Service; use hyper::Body; -use hyper::Method; use hyper::Request; use hyper::Response; use serde::Deserialize; use serde::Serialize; use std::borrow::Cow; use std::cell::RefCell; +use std::cmp::min; +use std::error::Error; use std::future::Future; +use std::io; +use std::mem::replace; +use std::mem::take; use std::net::SocketAddr; use std::pin::Pin; use std::rc::Rc; +use std::sync::Arc; use std::task::Context; use std::task::Poll; use tokio::io::AsyncRead; -use tokio::io::AsyncReadExt; use tokio::io::AsyncWrite; -use tokio::sync::oneshot; -use tokio_util::io::StreamReader; +use tokio::task::spawn_local; pub fn init() -> Extension { Extension::builder() @@ -72,86 +81,265 @@ pub fn init() -> Extension { .build() } -struct ServiceInner { - request: Request, - response_tx: oneshot::Sender>, -} - -#[derive(Clone, Default)] -struct Service { - inner: Rc>>, - waker: Rc, -} - -impl HyperService> for Service { - type Response = Response; - type Error = http::Error; - #[allow(clippy::type_complexity)] - type Future = - Pin>>>; - - fn poll_ready( - &mut self, - _cx: &mut Context<'_>, - ) -> Poll> { - if self.inner.borrow().is_some() { - Poll::Pending - } else { - Poll::Ready(Ok(())) - } - } - - fn call(&mut self, req: Request) -> Self::Future { - let (resp_tx, resp_rx) = oneshot::channel(); - self.inner.borrow_mut().replace(ServiceInner { - request: req, - response_tx: resp_tx, - }); - - async move { - resp_rx.await.or_else(|_| - // Fallback dummy response in case sender was dropped due to closed conn - Response::builder() - .status(hyper::StatusCode::INTERNAL_SERVER_ERROR) - .body(vec![].into())) - } - .boxed_local() - } -} - -type ConnFuture = Pin>>>; - -struct Conn { - scheme: &'static str, +struct HttpConnResource { addr: SocketAddr, - conn: Rc>, + scheme: &'static str, + acceptors_tx: mpsc::UnboundedSender, + close_result_fut: Shared>>>, + cancel_handle: Rc, // Cancels accept ops. } -struct ConnResource { - hyper_connection: Conn, - deno_service: Service, - cancel: CancelHandle, -} +impl HttpConnResource { + fn new(io: S, scheme: &'static str, addr: SocketAddr) -> Self + where + S: AsyncRead + AsyncWrite + Unpin + Send + 'static, + { + let (acceptors_tx, acceptors_rx) = mpsc::unbounded::(); + let service = HttpListenerService::new(acceptors_rx); -impl ConnResource { - // TODO(ry) impl Future for ConnResource? - fn poll(&self, cx: &mut Context<'_>) -> Poll> { - self - .hyper_connection - .conn - .borrow_mut() - .poll_unpin(cx) - .map_err(AnyError::from) + let conn_fut = Http::new() + .with_executor(LocalExecutor) + .serve_connection(io, service) + .with_upgrades(); + + // When the cancel handle is used, the connection shuts down gracefully. + // No new HTTP streams will be accepted, but existing streams will be able + // to continue operating and eventually shut down cleanly. + let cancel_handle = CancelHandle::new_rc(); + let shutdown_fut = pending::().or_cancel(&cancel_handle).fuse(); + + // A local task that polls the hyper connection future to completion. + let task_fut = async move { + pin_mut!(shutdown_fut); + pin_mut!(conn_fut); + let result = match select(conn_fut, shutdown_fut).await { + Either::Left((result, _)) => result, + Either::Right((_, mut conn_fut)) => { + conn_fut.as_mut().graceful_shutdown(); + conn_fut.await + } + }; + swallow_not_connected_error(result).map_err(Arc::from) + }; + let (task_fut, close_result_fut) = task_fut.remote_handle(); + let output_fut = close_result_fut.shared(); + spawn_local(task_fut); + + Self { + addr, + scheme, + acceptors_tx, + close_result_fut: output_fut, + cancel_handle, + } + } + + // Accepts a new incoming HTTP request. + async fn accept( + self: &Rc, + ) -> Result, AnyError> { + let fut = async { + let (request_tx, request_rx) = oneshot::channel(); + let (response_tx, response_rx) = oneshot::channel(); + + let acceptor = HttpAcceptor::new(request_tx, response_rx); + self.acceptors_tx.unbounded_send(acceptor).ok()?; + + let request = request_rx.await.ok()?; + let stream = HttpStreamResource::new(self, request, response_tx); + Some(stream) + }; + + async { + match fut.await { + Some(stream) => Ok(Some(stream)), + // Return the connection error, if any. + None => self.get_closed().map_ok(|_| None).await, + } + } + .try_or_cancel(&self.cancel_handle) + .await + } + + /// A future that completes when this HTTP connection is closed or errors. + async fn get_closed(&self) -> Result<(), AnyError> { + self.close_result_fut.clone().map_err(AnyError::from).await + } + + fn scheme(&self) -> &'static str { + self.scheme + } + + fn addr(&self) -> SocketAddr { + self.addr } } -impl Resource for ConnResource { +/// Filters out the ever-surprising 'shutdown ENOTCONN' errors. +fn swallow_not_connected_error( + result: Result<(), hyper::Error>, +) -> Result<(), hyper::Error> { + if result + .as_ref() + .err() + .and_then(|err| err.source()) + .and_then(|err| err.downcast_ref::()) + .filter(|err| err.kind() == io::ErrorKind::NotConnected) + .is_some() + { + Ok(()) + } else { + result + } +} + +impl Resource for HttpConnResource { fn name(&self) -> Cow { - "httpConnection".into() + "httpConn".into() } fn close(self: Rc) { - self.cancel.cancel() + self.cancel_handle.cancel(); + } +} + +/// Creates a new HttpConn resource which uses `io` as its transport. +pub fn http_create_conn_resource( + state: &mut OpState, + io: S, + addr: SocketAddr, + scheme: &'static str, +) -> Result +where + S: AsyncRead + AsyncWrite + Unpin + Send + 'static, +{ + let listener = HttpConnResource::new(io, scheme, addr); + let rid = state.resource_table.add(listener); + Ok(rid) +} + +/// An object that implements the `hyper::Service` trait, through which Hyper +/// delivers incoming HTTP requests. +struct HttpListenerService { + acceptor_rx: Peekable>, +} + +impl HttpListenerService { + fn new(acceptor_rx: mpsc::UnboundedReceiver) -> Self { + let acceptor_rx = acceptor_rx.peekable(); + Self { acceptor_rx } + } +} + +impl Service> for HttpListenerService { + type Response = Response; + type Error = oneshot::Canceled; + type Future = oneshot::Receiver>; + + fn poll_ready( + &mut self, + cx: &mut Context<'_>, + ) -> Poll> { + let acceptor_rx = Pin::new(&mut self.acceptor_rx); + let result = ready!(acceptor_rx.poll_peek(cx)) + .map(|_| ()) + .ok_or(oneshot::Canceled); + Poll::Ready(result) + } + + fn call(&mut self, request: Request) -> Self::Future { + let acceptor = self.acceptor_rx.next().now_or_never().flatten().unwrap(); + acceptor.call(request) + } +} + +/// A pair of one-shot channels which first transfer a HTTP request from the +/// Hyper service to the HttpConn resource, and then take the Response back to +/// the service. +struct HttpAcceptor { + request_tx: oneshot::Sender>, + response_rx: oneshot::Receiver>, +} + +impl HttpAcceptor { + fn new( + request_tx: oneshot::Sender>, + response_rx: oneshot::Receiver>, + ) -> Self { + Self { + request_tx, + response_rx, + } + } + + fn call(self, request: Request) -> oneshot::Receiver> { + let Self { + request_tx, + response_rx, + } = self; + request_tx + .send(request) + .map(|_| response_rx) + .unwrap_or_else(|_| oneshot::channel().1) // Make new canceled receiver. + } +} + +/// A resource representing a single HTTP request/response stream. +struct HttpStreamResource { + conn: Rc, + rd: AsyncRefCell, + wr: AsyncRefCell, + cancel_handle: CancelHandle, +} + +impl HttpStreamResource { + fn new( + conn: &Rc, + request: Request, + response_tx: oneshot::Sender>, + ) -> Self { + Self { + conn: conn.clone(), + rd: HttpRequestReader::Headers(request).into(), + wr: HttpResponseWriter::Headers(response_tx).into(), + cancel_handle: CancelHandle::new(), + } + } +} + +impl Resource for HttpStreamResource { + fn name(&self) -> Cow { + "httpStream".into() + } + + fn close(self: Rc) { + self.cancel_handle.cancel(); + } +} + +/// The read half of an HTTP stream. +enum HttpRequestReader { + Headers(Request), + Body(Peekable), + Closed, +} + +impl Default for HttpRequestReader { + fn default() -> Self { + Self::Closed + } +} + +/// The write half of an HTTP stream. +enum HttpResponseWriter { + Headers(oneshot::Sender>), + Body(hyper::body::Sender), + Closed, +} + +impl Default for HttpResponseWriter { + fn default() -> Self { + Self::Closed } } @@ -159,9 +347,7 @@ impl Resource for ConnResource { #[derive(Serialize)] #[serde(rename_all = "camelCase")] struct NextRequestResponse( - // request_rid: - Option, - // response_sender_rid: + // stream_rid: ResourceId, // method: // This is a String rather than a ByteString because reqwest will only return @@ -175,109 +361,38 @@ struct NextRequestResponse( async fn op_http_request_next( state: Rc>, - conn_rid: ResourceId, + rid: ResourceId, _: (), ) -> Result, AnyError> { - let conn_resource = state - .borrow() - .resource_table - .get::(conn_rid)?; + let conn = state.borrow().resource_table.get::(rid)?; - let cancel = RcRef::map(conn_resource.clone(), |r| &r.cancel); - - poll_fn(|cx| { - conn_resource.deno_service.waker.register(cx.waker()); - - // Check if conn is open/close/errored - let (conn_closed, conn_result) = match conn_resource.poll(cx) { - Poll::Pending => (false, Ok(())), - Poll::Ready(Ok(())) => (true, Ok(())), - Poll::Ready(Err(e)) => { - if should_ignore_error(&e) { - (true, Ok(())) - } else { - (true, Err(e)) - } - } - }; - // Drop conn resource if closed - if conn_closed { - // TODO(ry) close RequestResource associated with connection - // TODO(ry) close ResponseBodyResource associated with connection - // try to close ConnResource, but don't unwrap as it might - // already be closed - let _ = state - .borrow_mut() - .resource_table - .take::(conn_rid); - - // Fail with err if unexpected conn error, early return None otherwise - return Poll::Ready(conn_result.map(|_| None)); - } - - if let Some(inner) = conn_resource.deno_service.inner.borrow_mut().take() { - let Conn { scheme, addr, .. } = conn_resource.hyper_connection; - let mut state = state.borrow_mut(); - let next = - prepare_next_request(&mut state, conn_rid, inner, scheme, addr)?; - Poll::Ready(Ok(Some(next))) - } else { - Poll::Pending - } - }) - .try_or_cancel(cancel) - .await - .map_err(AnyError::from) -} - -fn prepare_next_request( - state: &mut OpState, - conn_rid: ResourceId, - request_resource: ServiceInner, - scheme: &'static str, - addr: SocketAddr, -) -> Result { - let tx = request_resource.response_tx; - let req = request_resource.request; - let method = req.method().to_string(); - let headers = req_headers(&req); - let url = req_url(&req, scheme, addr)?; - - let is_websocket = is_websocket_request(&req); - let can_have_body = !matches!(*req.method(), Method::GET | Method::HEAD); - let has_body = - is_websocket || (can_have_body && req.size_hint().exact() != Some(0)); - - let maybe_request_rid = if has_body { - let request_rid = state.resource_table.add(RequestResource { - conn_rid, - inner: AsyncRefCell::new(RequestOrStreamReader::Request(Some(req))), - cancel: CancelHandle::default(), - }); - Some(request_rid) - } else { - None + let stream = match conn.accept().await { + Ok(Some(stream)) => Rc::new(stream), + Ok(None) => return Ok(None), + Err(err) => return Err(err), }; - let response_sender_rid = state.resource_table.add(ResponseSenderResource { - sender: tx, - conn_rid, - }); + let rd = RcRef::map(&stream, |r| &r.rd).borrow().await; + let request = match &*rd { + HttpRequestReader::Headers(request) => request, + _ => unreachable!(), + }; - Ok(NextRequestResponse( - maybe_request_rid, - response_sender_rid, - method, - headers, - url, - )) + let method = request.method().to_string(); + let headers = req_headers(request); + let url = req_url(request, conn.scheme(), conn.addr()); + + let stream_rid = state.borrow_mut().resource_table.add_rc(stream); + + let r = NextRequestResponse(stream_rid, method, headers, url); + Ok(Some(r)) } fn req_url( req: &hyper::Request, scheme: &'static str, addr: SocketAddr, -) -> Result { +) -> String { let host: Cow = if let Some(auth) = req.uri().authority() { match addr.port() { 443 if scheme == "https" => Cow::Borrowed(auth.host()), @@ -287,12 +402,22 @@ fn req_url( } else if let Some(host) = req.uri().host() { Cow::Borrowed(host) } else if let Some(host) = req.headers().get("HOST") { - Cow::Borrowed(host.to_str()?) + match host.to_str() { + Ok(host) => Cow::Borrowed(host), + Err(_) => Cow::Owned( + host + .as_bytes() + .iter() + .cloned() + .map(char::from) + .collect::(), + ), + } } else { Cow::Owned(addr.to_string()) }; let path = req.uri().path_and_query().map_or("/", |p| p.as_str()); - Ok([scheme, "://", &host, path].concat()) + [scheme, "://", &host, path].concat() } fn req_headers( @@ -326,66 +451,6 @@ fn req_headers( headers } -fn is_websocket_request(req: &hyper::Request) -> bool { - req.version() == hyper::Version::HTTP_11 - && req.method() == hyper::Method::GET - && req.headers().contains_key(&SEC_WEBSOCKET_KEY) - && header(req.headers(), &SEC_WEBSOCKET_VERSION) == b"13" - && header(req.headers(), &UPGRADE).eq_ignore_ascii_case(b"websocket") - && header(req.headers(), &CONNECTION) - .split(|c| *c == b' ' || *c == b',') - .any(|token| token.eq_ignore_ascii_case(b"upgrade")) -} - -fn header<'a>( - h: &'a hyper::http::HeaderMap, - name: &hyper::header::HeaderName, -) -> &'a [u8] { - h.get(name) - .map(hyper::header::HeaderValue::as_bytes) - .unwrap_or_default() -} - -fn should_ignore_error(e: &AnyError) -> bool { - if let Some(e) = e.downcast_ref::() { - use std::error::Error; - if let Some(std_err) = e.source() { - if let Some(io_err) = std_err.downcast_ref::() { - if io_err.kind() == std::io::ErrorKind::NotConnected { - return true; - } - } - } - } - false -} - -pub fn start_http( - state: &mut OpState, - io: IO, - addr: SocketAddr, - scheme: &'static str, -) -> Result { - let deno_service = Service::default(); - - let hyper_connection = Http::new() - .with_executor(LocalExecutor) - .serve_connection(io, deno_service.clone()) - .with_upgrades(); - let conn = Pin::new(Box::new(hyper_connection)); - let conn_resource = ConnResource { - hyper_connection: Conn { - scheme, - addr, - conn: Rc::new(RefCell::new(conn)), - }, - deno_service, - cancel: CancelHandle::default(), - }; - let rid = state.resource_table.add(conn_resource); - Ok(rid) -} - // We use a tuple instead of struct to avoid serialization overhead of the keys. #[derive(Deserialize)] struct RespondArgs( @@ -400,24 +465,13 @@ struct RespondArgs( async fn op_http_response( state: Rc>, args: RespondArgs, - data: Option, -) -> Result, AnyError> { + body_buf: Option, +) -> Result<(), AnyError> { let RespondArgs(rid, status, headers) = args; - - let response_sender = state + let stream = state .borrow_mut() .resource_table - .take::(rid)?; - let response_sender = Rc::try_unwrap(response_sender) - .ok() - .expect("multiple op_http_respond ongoing"); - - let conn_rid = response_sender.conn_rid; - - let conn_resource = state - .borrow() - .resource_table - .get::(conn_rid)?; + .get::(rid)?; let mut builder = Response::builder().status(status); @@ -426,175 +480,143 @@ async fn op_http_response( builder = builder.header(key.as_ref(), value.as_ref()); } - let res; - let maybe_response_body_rid = if let Some(d) = data { - // If a body is passed, we use it, and don't return a body for streaming. - res = builder.body(Vec::from(&*d).into())?; - None - } else { - // If no body is passed, we return a writer for streaming the body. - let (sender, body) = Body::channel(); - res = builder.body(body)?; + let body: Response; + let new_wr: HttpResponseWriter; - let response_body_rid = - state.borrow_mut().resource_table.add(ResponseBodyResource { - body: AsyncRefCell::new(sender), - conn_rid, - }); - - Some(response_body_rid) - }; - - // oneshot::Sender::send(v) returns |v| on error, not an error object. - // The only failure mode is the receiver already having dropped its end - // of the channel. - if response_sender.sender.send(res).is_err() { - if let Some(rid) = maybe_response_body_rid { - let _ = state - .borrow_mut() - .resource_table - .take::(rid); + match body_buf { + Some(body_buf) => { + // If a buffer was passed, we use it to construct a response body. + // TODO(piscisaureus): get rid of this buffer copy. + let body_bytes = Bytes::copy_from_slice(&body_buf[..]); + body = builder.body(Body::from(body_bytes))?; + new_wr = HttpResponseWriter::Closed; + } + None => { + // If no buffer was passed, the caller will stream the response body. + let (body_tx, body_rx) = Body::channel(); + body = builder.body(body_rx)?; + new_wr = HttpResponseWriter::Body(body_tx); } - return Err(type_error("internal communication error")); } - let result = poll_fn(|cx| match conn_resource.poll(cx) { - Poll::Ready(x) => { - state.borrow_mut().resource_table.close(conn_rid).ok(); - Poll::Ready(x) - } - Poll::Pending => Poll::Ready(Ok(())), - }) - .await; - - if let Err(e) = result { - if let Some(rid) = maybe_response_body_rid { - let _ = state - .borrow_mut() - .resource_table - .take::(rid); - } - return Err(e); - } - - if maybe_response_body_rid.is_none() { - conn_resource.deno_service.waker.wake(); - } - Ok(maybe_response_body_rid) -} - -async fn op_http_response_close( - state: Rc>, - rid: ResourceId, - _: (), -) -> Result<(), AnyError> { - let resource = state - .borrow_mut() - .resource_table - .take::(rid)?; - - let conn_resource = state - .borrow() - .resource_table - .get::(resource.conn_rid)?; - drop(resource); - - let r = poll_fn(|cx| match conn_resource.poll(cx) { - Poll::Ready(x) => Poll::Ready(x), - Poll::Pending => Poll::Ready(Ok(())), - }) - .await; - conn_resource.deno_service.waker.wake(); - r -} - -async fn op_http_request_read( - state: Rc>, - rid: ResourceId, - mut data: ZeroCopyBuf, -) -> Result { - let resource = state - .borrow() - .resource_table - .get::(rid as u32)?; - - let conn_resource = state - .borrow() - .resource_table - .get::(resource.conn_rid)?; - - let mut inner = RcRef::map(resource.clone(), |r| &r.inner) - .borrow_mut() - .await; - - if let RequestOrStreamReader::Request(req) = &mut *inner { - let req = req.take().unwrap(); - let stream: BytesStream = Box::pin(req.into_body().map(|r| { - r.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err)) - })); - let reader = StreamReader::new(stream); - *inner = RequestOrStreamReader::StreamReader(reader); + let mut old_wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await; + let response_tx = match replace(&mut *old_wr, new_wr) { + HttpResponseWriter::Headers(response_tx) => response_tx, + _ => return Err(custom_error("Http", "response headers already sent")), }; - let reader = match &mut *inner { - RequestOrStreamReader::StreamReader(reader) => reader, - _ => unreachable!(), - }; - - let cancel = RcRef::map(resource, |r| &r.cancel); - - let mut read_fut = reader.read(&mut data).try_or_cancel(cancel).boxed_local(); - - poll_fn(|cx| { - if let Poll::Ready(Err(e)) = conn_resource.poll(cx) { - // close ConnResource - // close RequestResource associated with connection - // close ResponseBodyResource associated with connection - return Poll::Ready(Err(e)); + match response_tx.send(body) { + Ok(_) => Ok(()), + Err(_) => { + stream.conn.get_closed().await?; + unreachable!(); } - - read_fut.poll_unpin(cx).map_err(AnyError::from) - }) - .await + } } async fn op_http_response_write( state: Rc>, rid: ResourceId, - data: ZeroCopyBuf, + buf: ZeroCopyBuf, ) -> Result<(), AnyError> { - let resource = state + let stream = state .borrow() .resource_table - .get::(rid as u32)?; + .get::(rid)?; + let mut wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await; - let conn_resource = state - .borrow() - .resource_table - .get::(resource.conn_rid)?; + loop { + let body_tx = match &mut *wr { + HttpResponseWriter::Body(body_tx) => body_tx, + HttpResponseWriter::Headers(_) => { + break Err(custom_error("Http", "no response headers")) + } + HttpResponseWriter::Closed => { + break Err(custom_error("Http", "response already completed")) + } + }; - let mut body = RcRef::map(&resource, |r| &r.body).borrow_mut().await; - - let mut send_data_fut = body.send_data(data.to_vec().into()).boxed_local(); - - poll_fn(|cx| { - let r = send_data_fut.poll_unpin(cx).map_err(AnyError::from); - - // Poll connection so the data is flushed - if let Poll::Ready(Err(e)) = conn_resource.poll(cx) { - // close ConnResource - // close RequestResource associated with connection - // close ResponseBodyResource associated with connection - return Poll::Ready(Err(e)); + // TODO(piscisaureus): pass zero-copy buf directly to `send_data()` below. + let bytes = Bytes::copy_from_slice(&buf[..]); + match body_tx.send_data(bytes).await { + Ok(_) => break Ok(()), + Err(err) => { + // Don't return "channel closed", that's an implementation detail. + // Pull up the failure associated with the connection instead. + assert!(err.is_closed()); + stream.conn.get_closed().await?; + // If there was no connection error, drop body_tx. + take(&mut *wr); + } } + } +} - r - }) - .await?; - +// Gracefully close the write half of the HTTP stream. Note that this does not +// remove the HTTP stream resource from the resource table; it still has to be +// closed with `Deno.core.close()`. +async fn op_http_response_close( + state: Rc>, + rid: ResourceId, + _: (), +) -> Result<(), AnyError> { + let stream = state + .borrow() + .resource_table + .get::(rid)?; + let mut wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await; + take(&mut *wr); Ok(()) } +async fn op_http_request_read( + state: Rc>, + rid: ResourceId, + mut buf: ZeroCopyBuf, +) -> Result { + let stream = state + .borrow_mut() + .resource_table + .get::(rid)?; + let mut rd = RcRef::map(&stream, |r| &r.rd).borrow_mut().await; + + let body = loop { + match &mut *rd { + HttpRequestReader::Headers(_) => {} + HttpRequestReader::Body(body) => break body, + HttpRequestReader::Closed => return Ok(0), + } + match take(&mut *rd) { + HttpRequestReader::Headers(request) => { + let body = request.into_body().peekable(); + *rd = HttpRequestReader::Body(body); + } + _ => unreachable!(), + }; + }; + + let fut = async { + let mut body = Pin::new(body); + loop { + match body.as_mut().peek_mut().await { + Some(Ok(chunk)) if !chunk.is_empty() => { + let len = min(buf.len(), chunk.len()); + buf[..len].copy_from_slice(&chunk.split_to(len)); + break Ok(len); + } + Some(_) => match body.as_mut().next().await.unwrap() { + Ok(chunk) => assert!(chunk.is_empty()), + Err(err) => break Err(AnyError::from(err)), + }, + None => break Ok(0), + } + } + }; + + let cancel_handle = RcRef::map(&stream, |r| &r.cancel_handle); + fut.try_or_cancel(cancel_handle).await +} + fn op_http_websocket_accept_header( _: &mut OpState, key: String, @@ -612,86 +634,25 @@ async fn op_http_upgrade_websocket( rid: ResourceId, _: (), ) -> Result { - let req_resource = state + let stream = state .borrow_mut() .resource_table - .take::(rid)?; + .get::(rid)?; + let mut rd = RcRef::map(&stream, |r| &r.rd).borrow_mut().await; - let mut inner = RcRef::map(&req_resource, |r| &r.inner).borrow_mut().await; + let request = match &mut *rd { + HttpRequestReader::Headers(request) => request, + _ => { + return Err(custom_error( + "Http", + "cannot upgrade because request body was used", + )) + } + }; - if let RequestOrStreamReader::Request(req) = inner.as_mut() { - let upgraded = hyper::upgrade::on(req.as_mut().unwrap()).await?; - let stream = - deno_websocket::tokio_tungstenite::WebSocketStream::from_raw_socket( - upgraded, - deno_websocket::tokio_tungstenite::tungstenite::protocol::Role::Server, - None, - ) - .await; - - let (ws_tx, ws_rx) = stream.split(); - let rid = - state - .borrow_mut() - .resource_table - .add(deno_websocket::WsStreamResource { - stream: deno_websocket::WebSocketStreamType::Server { - rx: AsyncRefCell::new(ws_rx), - tx: AsyncRefCell::new(ws_tx), - }, - cancel: Default::default(), - }); - - Ok(rid) - } else { - Err(bad_resource_id()) - } -} - -type BytesStream = - Pin> + Unpin>>; - -enum RequestOrStreamReader { - Request(Option>), - StreamReader(StreamReader), -} - -struct RequestResource { - conn_rid: ResourceId, - inner: AsyncRefCell, - cancel: CancelHandle, -} - -impl Resource for RequestResource { - fn name(&self) -> Cow { - "request".into() - } - - fn close(self: Rc) { - self.cancel.cancel() - } -} - -struct ResponseSenderResource { - sender: oneshot::Sender>, - conn_rid: ResourceId, -} - -impl Resource for ResponseSenderResource { - fn name(&self) -> Cow { - "responseSender".into() - } -} - -struct ResponseBodyResource { - body: AsyncRefCell, - conn_rid: ResourceId, -} - -impl Resource for ResponseBodyResource { - fn name(&self) -> Cow { - "responseBody".into() - } + let transport = hyper::upgrade::on(request).await?; + let ws_rid = ws_create_server_stream(&state, transport).await?; + Ok(ws_rid) } // Needed so hyper can use non Send futures @@ -704,6 +665,6 @@ where Fut::Output: 'static, { fn execute(&self, fut: Fut) { - tokio::task::spawn_local(fut); + spawn_local(fut); } } diff --git a/ext/websocket/lib.rs b/ext/websocket/lib.rs index d469b5aaf1..ba626a45a4 100644 --- a/ext/websocket/lib.rs +++ b/ext/websocket/lib.rs @@ -34,12 +34,13 @@ use std::sync::Arc; use tokio::net::TcpStream; use tokio_rustls::rustls::RootCertStore; use tokio_rustls::TlsConnector; +use tokio_tungstenite::client_async; use tokio_tungstenite::tungstenite::{ handshake::client::Response, protocol::frame::coding::CloseCode, - protocol::CloseFrame, Message, + protocol::CloseFrame, protocol::Role, Message, }; use tokio_tungstenite::MaybeTlsStream; -use tokio_tungstenite::{client_async, WebSocketStream}; +use tokio_tungstenite::WebSocketStream; pub use tokio_tungstenite; // Re-export tokio_tungstenite @@ -72,6 +73,27 @@ pub enum WebSocketStreamType { }, } +pub async fn ws_create_server_stream( + state: &Rc>, + transport: hyper::upgrade::Upgraded, +) -> Result { + let ws_stream = + WebSocketStream::from_raw_socket(transport, Role::Server, None).await; + let (ws_tx, ws_rx) = ws_stream.split(); + + let ws_resource = WsStreamResource { + stream: WebSocketStreamType::Server { + tx: AsyncRefCell::new(ws_tx), + rx: AsyncRefCell::new(ws_rx), + }, + cancel: Default::default(), + }; + + let resource_table = &mut state.borrow_mut().resource_table; + let rid = resource_table.add(ws_resource); + Ok(rid) +} + pub struct WsStreamResource { pub stream: WebSocketStreamType, // When a `WsStreamResource` resource is closed, all pending 'read' ops are diff --git a/runtime/errors.rs b/runtime/errors.rs index fe6e711931..1491161d35 100644 --- a/runtime/errors.rs +++ b/runtime/errors.rs @@ -17,6 +17,7 @@ use deno_fetch::reqwest; use std::env; use std::error::Error; use std::io; +use std::sync::Arc; fn get_dlopen_error_class(error: &dlopen::Error) -> &'static str { use dlopen::Error::*; @@ -163,6 +164,10 @@ pub fn get_error_class_name(e: &AnyError) -> Option<&'static str> { .map(get_dlopen_error_class) }) .or_else(|| e.downcast_ref::().map(get_hyper_error_class)) + .or_else(|| { + e.downcast_ref::>() + .map(|e| get_hyper_error_class(&**e)) + }) .or_else(|| { e.downcast_ref::().map(|e| { let io_err: io::Error = e.to_owned().into(); diff --git a/runtime/ops/http.rs b/runtime/ops/http.rs index 683dc1a576..fddac92612 100644 --- a/runtime/ops/http.rs +++ b/runtime/ops/http.rs @@ -6,6 +6,7 @@ use deno_core::op_sync; use deno_core::Extension; use deno_core::OpState; use deno_core::ResourceId; +use deno_http::http_create_conn_resource; use deno_net::io::TcpStreamResource; use deno_net::ops_tls::TlsStreamResource; @@ -29,7 +30,7 @@ fn op_http_start( let (read_half, write_half) = resource.into_inner(); let tcp_stream = read_half.reunite(write_half)?; let addr = tcp_stream.local_addr()?; - return deno_http::start_http(state, tcp_stream, addr, "http"); + return http_create_conn_resource(state, tcp_stream, addr, "http"); } if let Ok(resource_rc) = state @@ -41,7 +42,7 @@ fn op_http_start( let (read_half, write_half) = resource.into_inner(); let tls_stream = read_half.reunite(write_half); let addr = tls_stream.get_ref().0.local_addr()?; - return deno_http::start_http(state, tls_stream, addr, "https"); + return http_create_conn_resource(state, tls_stream, addr, "https"); } Err(bad_resource_id())