// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
use deno_core::error::bad_resource_id;
use deno_core::error::null_opbuf;
use deno_core::error::type_error;
use deno_core::error::AnyError;
use deno_core::futures::future::poll_fn;
use deno_core::futures::FutureExt;
use deno_core::futures::Stream;
use deno_core::futures::StreamExt;
use deno_core::include_js_files;
use deno_core::op_async;
use deno_core::op_sync;
use deno_core::AsyncRefCell;
use deno_core::ByteString;
use deno_core::CancelHandle;
use deno_core::CancelTryFuture;
use deno_core::Extension;
use deno_core::OpState;
use deno_core::RcRef;
use deno_core::Resource;
use deno_core::ResourceId;
use deno_core::ZeroCopyBuf;
use hyper::body::HttpBody;
use hyper::http;
use hyper::server::conn::Http;
use hyper::service::Service as HyperService;
use hyper::Body;
use hyper::Request;
use hyper::Response;
use serde::Deserialize;
use serde::Serialize;
use std::borrow::Cow;
use std::cell::RefCell;
use std::future::Future;
use std::net::SocketAddr;
use std::path::PathBuf;
use std::pin::Pin;
use std::rc::Rc;
use std::task::Context;
use std::task::Poll;
use tokio::io::AsyncRead;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWrite;
use tokio::sync::oneshot;
use tokio_util::io::StreamReader;
/// Returns the path of the unstable TypeScript declaration file, resolved
/// against the directory that contained this crate's manifest at build time.
pub fn get_unstable_declaration() -> PathBuf {
  let mut declaration = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
  declaration.push("lib.deno_http.unstable.d.ts");
  declaration
}
/// Builds the HTTP extension: the bundled `01_http.js` source plus every op
/// defined in this file, registered under its public op name.
pub fn init() -> Extension {
  let js_sources = include_js_files!(
    prefix "deno:ext/http",
    "01_http.js",
  );

  let http_ops = vec![
    ("op_http_request_next", op_async(op_http_request_next)),
    ("op_http_request_read", op_async(op_http_request_read)),
    ("op_http_response", op_async(op_http_response)),
    ("op_http_response_write", op_async(op_http_response_write)),
    ("op_http_response_close", op_async(op_http_response_close)),
    (
      "op_http_websocket_accept_header",
      op_sync(op_http_websocket_accept_header),
    ),
    (
      "op_http_upgrade_websocket",
      op_async(op_http_upgrade_websocket),
    ),
  ];

  Extension::builder().js(js_sources).ops(http_ops).build()
}
/// One in-flight HTTP exchange: the request handed to `Service::call`, paired
/// with the channel through which its response is delivered.
struct ServiceInner {
/// The incoming request, exactly as it was passed to `Service::call`.
request: Request
,
/// Sending half of the oneshot channel created in `Service::call`; the future
/// returned by `call` awaits the receiving half.
response_tx: oneshot::Sender>,
}
/// hyper service that does not answer requests itself but parks them so that
/// the ops in this file can pick them up.
///
/// Both fields are reference counted, so a clone shares the same request slot
/// and the same waker as the value it was cloned from.
#[derive(Clone, Default)]
struct Service {
/// Slot for the request that is waiting to be picked up. `poll_ready` reports
/// `Pending` while it is occupied; `op_http_request_next` empties it.
inner: Rc>>,
/// Registered by `op_http_request_next` and woken by `op_http_response` /
/// `op_http_response_close` once a response has been handed off.
waker: Rc,
}
impl HyperService> for Service {
  type Response = Response;
  type Error = http::Error;
  #[allow(clippy::type_complexity)]
  type Future =
    Pin>>>;

  /// Reports the service as ready only while the request slot is empty, so a
  /// parked request is never overwritten by the next one.
  ///
  /// Note that `_cx` is unused: no waker is registered when `Pending` is
  /// returned, so progress relies on the connection being polled again by one
  /// of the ops.
  fn poll_ready(
    &mut self,
    _cx: &mut Context<'_>,
  ) -> Poll> {
    if self.inner.borrow().is_some() {
      Poll::Pending
    } else {
      Poll::Ready(Ok(()))
    }
  }

  /// Parks `req` in the shared slot together with a fresh response channel and
  /// returns a future that resolves once a response is sent through it.
  fn call(&mut self, req: Request) -> Self::Future {
    let (resp_tx, resp_rx) = oneshot::channel();
    self.inner.borrow_mut().replace(ServiceInner {
      request: req,
      response_tx: resp_tx,
    });
    async move {
      // The sender can be dropped without ever sending (for example when the
      // response sender resource is discarded, or an op bails out early after
      // taking it). Answer with an empty 500 in that case instead of
      // panicking inside the connection future.
      resp_rx.await.or_else(|_| {
        Response::builder()
          .status(http::StatusCode::INTERNAL_SERVER_ERROR)
          .body(Body::empty())
      })
    }
    .boxed_local()
  }
}
/// Pinned future alias for a served connection.
// NOTE(review): presumably this is the type stored in `Conn::conn` and polled by
// `ConnResource::poll` — confirm against the full declaration.
type ConnFuture = Pin>>>;
/// A served connection plus the metadata needed to reconstruct request URLs.
struct Conn {
/// URL scheme passed to `start_http`; becomes the scheme part of every
/// request URL built by `req_url`.
scheme: &'static str,
/// Address passed to `start_http`; `req_url` falls back to it when a request
/// carries no host information.
addr: SocketAddr,
/// The connection future created in `start_http`, polled through
/// `ConnResource::poll`.
conn: Rc>,
}
/// Resource-table entry representing one served HTTP connection.
struct ConnResource {
/// The connection future together with the scheme and address it was started
/// with.
hyper_connection: Conn,
/// A clone of the service given to hyper in `start_http`; the ops take
/// pending requests out of it and use its waker.
deno_service: Service,
/// Cancelled when the resource is closed, which aborts a pending
/// `op_http_request_next` on this connection.
cancel: CancelHandle,
}
impl ConnResource {
  // TODO(ry) impl Future for ConnResource?
  /// Polls the underlying connection future once, converting its error (if
  /// any) into an `AnyError`.
  fn poll(&self, cx: &mut Context<'_>) -> Poll> {
    let mut connection = self.hyper_connection.conn.borrow_mut();
    connection.poll_unpin(cx).map_err(AnyError::from)
  }
}
impl Resource for ConnResource {
  /// Name under which this resource is reported.
  fn name(&self) -> Cow {
    Cow::Borrowed("httpConnection")
  }

  /// Closing the resource triggers its cancel handle.
  fn close(self: Rc) {
    self.cancel.cancel();
  }
}
// We use a tuple instead of struct to avoid serialization overhead of the keys.
// Elements are therefore identified by position only; the comment above each
// one gives the name it stands for.
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
struct NextRequestResponse(
// request_rid:
// `None` when `prepare_next_request` decided no request resource is needed
// (the body is known to be empty and the request is not a websocket upgrade).
Option,
// response_sender_rid:
// Resource id of the `ResponseSenderResource` to pass to `op_http_response`.
ResourceId,
// method:
// This is a String rather than a ByteString because hyper only exposes the
// method as a str, which is guaranteed to be ASCII-only.
String,
// headers:
// Name/value pairs as produced by `req_headers`.
Vec<(ByteString, ByteString)>,
// url:
// Full request URL as produced by `req_url`.
String,
);
/// Waits for the next request on the connection identified by `conn_rid`.
///
/// Resolves with `None` once the connection has finished (cleanly, or with an
/// error accepted by `should_ignore_error`), with the prepared request data
/// when the service has a request parked, and with an error for any other
/// connection failure. Closing the connection resource cancels the wait.
async fn op_http_request_next(
state: Rc>,
conn_rid: ResourceId,
_: (),
) -> Result, AnyError> {
let conn_resource = state
.borrow()
.resource_table
.get::(conn_rid)?;
let cancel = RcRef::map(conn_resource.clone(), |r| &r.cancel);
poll_fn(|cx| {
// Register first so that a wake-up issued by one of the response ops while
// we are polling below is not lost.
conn_resource.deno_service.waker.register(cx.waker());
// Check if conn is open/close/errored
let (conn_closed, conn_result) = match conn_resource.poll(cx) {
Poll::Pending => (false, Ok(())),
Poll::Ready(Ok(())) => (true, Ok(())),
Poll::Ready(Err(e)) => {
if should_ignore_error(&e) {
(true, Ok(()))
} else {
(true, Err(e))
}
}
};
// Drop conn resource if closed
if conn_closed {
// TODO(ry) close RequestResource associated with connection
// TODO(ry) close ResponseBodyResource associated with connection
// try to close ConnResource, but don't unwrap as it might
// already be closed
let _ = state
.borrow_mut()
.resource_table
.take::(conn_rid);
// Fail with err if unexpected conn error, early return None otherwise
return Poll::Ready(conn_result.map(|_| None));
}
// Polling the connection above is what lets hyper call `Service::call`, so
// the slot is inspected only afterwards. Taking the request empties the
// slot, which makes `Service::poll_ready` report ready again.
if let Some(inner) = conn_resource.deno_service.inner.borrow_mut().take() {
let Conn { scheme, addr, .. } = conn_resource.hyper_connection;
let mut state = state.borrow_mut();
let next =
prepare_next_request(&mut state, conn_rid, inner, scheme, addr)?;
Poll::Ready(Ok(Some(next)))
} else {
Poll::Pending
}
})
.try_or_cancel(cancel)
.await
.map_err(AnyError::from)
}
fn prepare_next_request(
state: &mut OpState,
conn_rid: ResourceId,
request_resource: ServiceInner,
scheme: &'static str,
addr: SocketAddr,
) -> Result {
let tx = request_resource.response_tx;
let req = request_resource.request;
let method = req.method().to_string();
let headers = req_headers(&req);
let url = req_url(&req, scheme, addr)?;
let is_websocket = is_websocket_request(&req);
let has_body = if let Some(exact_size) = req.size_hint().exact() {
exact_size > 0
} else {
true
};
let maybe_request_rid = if is_websocket || has_body {
let request_rid = state.resource_table.add(RequestResource {
conn_rid,
inner: AsyncRefCell::new(RequestOrStreamReader::Request(Some(req))),
cancel: CancelHandle::default(),
});
Some(request_rid)
} else {
None
};
let response_sender_rid = state.resource_table.add(ResponseSenderResource {
sender: tx,
conn_rid,
});
Ok(NextRequestResponse(
maybe_request_rid,
response_sender_rid,
method,
headers,
url,
))
}
/// Builds the absolute URL of `req` as `scheme://host/path?query`.
///
/// The host is taken from the request URI if it has one, otherwise from the
/// `HOST` header, otherwise from `addr`. A missing path defaults to `/`.
/// Fails if the `HOST` header is consulted and cannot be read as a string.
fn req_url(
  req: &hyper::Request,
  scheme: &'static str,
  addr: SocketAddr,
) -> Result {
  let uri = req.uri();
  let host = match (uri.host(), req.headers().get("HOST")) {
    (Some(uri_host), _) => Cow::Borrowed(uri_host),
    (None, Some(header_value)) => Cow::Borrowed(header_value.to_str()?),
    (None, None) => Cow::Owned(addr.to_string()),
  };
  let path = match uri.path_and_query() {
    Some(path_and_query) => path_and_query.as_str(),
    None => "/",
  };
  Ok(format!("{}://{}{}", scheme, host, path))
}
/// Collects the request headers as raw name/value byte pairs.
///
/// Cookies are treated specially because we don't want them to be mangled by
/// the `Headers` object in JS: all cookie headers are merged into a single
/// `cookie` entry, separated by `"; "`, which is appended after every other
/// header.
fn req_headers(
  req: &hyper::Request,
) -> Vec<(ByteString, ByteString)> {
  let separator: &[u8] = b"; ";
  let mut cookie_values: Vec<&[u8]> = Vec::new();
  let mut pairs = Vec::with_capacity(req.headers().len());

  for (name, value) in req.headers() {
    if name == hyper::header::COOKIE {
      cookie_values.push(value.as_bytes());
      continue;
    }
    let name_bytes = name.as_str().as_bytes().to_vec();
    let value_bytes = value.as_bytes().to_vec();
    pairs.push((ByteString(name_bytes), ByteString(value_bytes)));
  }

  if !cookie_values.is_empty() {
    let merged = cookie_values.join(separator);
    pairs.push((ByteString(b"cookie".to_vec()), ByteString(merged)));
  }

  pairs
}
/// Returns `true` when the request carries both a `Connection` header
/// containing `upgrade` and an `Upgrade` header containing `websocket`.
fn is_websocket_request(req: &hyper::Request) -> bool {
  let wants_upgrade =
    req_header_contains(req, hyper::header::CONNECTION, "upgrade");
  wants_upgrade && req_header_contains(req, hyper::header::UPGRADE, "websocket")
}
/// Returns `true` if any value of header `key` contains `value` as a substring
/// after lowercasing the header value.
///
/// `value` is expected to be lowercase already. Header values that cannot be
/// read as a string are skipped.
fn req_header_contains(
  req: &hyper::Request,
  key: impl hyper::header::AsHeaderName,
  value: &str,
) -> bool {
  for header_value in req.headers().get_all(key).iter() {
    if let Ok(text) = header_value.to_str() {
      if text.to_lowercase().contains(value) {
        return true;
      }
    }
  }
  false
}
/// Decides whether a connection error should be treated as a normal close.
///
/// That is the case only when the error downcasts successfully, its source
/// downcasts in turn, and the resulting I/O error has kind `NotConnected`.
fn should_ignore_error(e: &AnyError) -> bool {
  use std::error::Error;

  e.downcast_ref::()
    .and_then(|err| err.source())
    .and_then(|source| source.downcast_ref::())
    .map_or(false, |io_err| {
      io_err.kind() == std::io::ErrorKind::NotConnected
    })
}
/// Starts serving HTTP on `io` and registers the connection in the resource
/// table.
///
/// `addr` and `scheme` are stored with the connection and later used to build
/// request URLs. Returns the resource id of the new `ConnResource`.
pub fn start_http(
  state: &mut OpState,
  io: IO,
  addr: SocketAddr,
  scheme: &'static str,
) -> Result {
  let service = Service::default();

  // hyper gets a clone of the service; the resource keeps the original, and
  // both share the same request slot and waker.
  let served = Http::new()
    .with_executor(LocalExecutor)
    .serve_connection(io, service.clone())
    .with_upgrades();
  let pinned = Pin::new(Box::new(served));

  let resource = ConnResource {
    hyper_connection: Conn {
      scheme,
      addr,
      conn: Rc::new(RefCell::new(pinned)),
    },
    deno_service: service,
    cancel: CancelHandle::default(),
  };

  Ok(state.resource_table.add(resource))
}
// We use a tuple instead of struct to avoid serialization overhead of the keys.
// Elements are therefore identified by position only; the comment above each
// one gives the name it stands for.
#[derive(Deserialize)]
struct RespondArgs(
// rid:
// Resource id of the `ResponseSenderResource` for the request being answered.
u32,
// status:
// HTTP status code of the response.
u16,
// headers:
// Response headers as name/value pairs.
Vec<(ByteString, ByteString)>,
);
/// Sends the status line and headers of a response, and its body when `data`
/// is given.
///
/// Consumes the `ResponseSenderResource` named in `args`. Returns `None` when
/// the whole body was passed in `data`, or the resource id of a new
/// `ResponseBodyResource` through which the body can be streamed with
/// `op_http_response_write` and finished with `op_http_response_close`.
///
/// Fails if either resource id is unknown, if the response cannot be built,
/// if the receiving side of the response channel is gone, or if polling the
/// connection reports an error.
async fn op_http_response(
state: Rc>,
args: RespondArgs,
data: Option,
) -> Result, AnyError> {
let RespondArgs(rid, status, headers) = args;
// The sender is removed from the table up front, so a response can be sent at
// most once per request.
let response_sender = state
.borrow_mut()
.resource_table
.take::(rid)?;
// Sending consumes the sender, so exclusive ownership is required here; this
// panics if some other reference to the resource is still alive.
let response_sender = Rc::try_unwrap(response_sender)
.ok()
.expect("multiple op_http_respond ongoing");
let conn_rid = response_sender.conn_rid;
let conn_resource = state
.borrow()
.resource_table
.get::(conn_rid)?;
let mut builder = Response::builder().status(status);
builder.headers_mut().unwrap().reserve(headers.len());
for (key, value) in &headers {
builder = builder.header(key.as_ref(), value.as_ref());
}
let res;
let maybe_response_body_rid = if let Some(d) = data {
// If a body is passed, we use it, and don't return a body for streaming.
res = builder.body(Vec::from(&*d).into())?;
None
} else {
// If no body is passed, we return a writer for streaming the body.
let (sender, body) = Body::channel();
res = builder.body(body)?;
let response_body_rid =
state.borrow_mut().resource_table.add(ResponseBodyResource {
body: AsyncRefCell::new(sender),
conn_rid,
});
Some(response_body_rid)
};
// oneshot::Sender::send(v) returns |v| on error, not an error object.
// The only failure mode is the receiver already having dropped its end
// of the channel.
if response_sender.sender.send(res).is_err() {
// Don't leave the just-created body resource behind in the table.
if let Some(rid) = maybe_response_body_rid {
let _ = state
.borrow_mut()
.resource_table
.take::(rid);
}
return Err(type_error("internal communication error"));
}
// Poll the connection exactly once. `Pending` is mapped to success so this op
// never waits for the connection to finish; if the connection did finish,
// its resource is closed (ignoring the case where it is already gone).
let result = poll_fn(|cx| match conn_resource.poll(cx) {
Poll::Ready(x) => {
state.borrow_mut().resource_table.close(conn_rid).ok();
Poll::Ready(x)
}
Poll::Pending => Poll::Ready(Ok(())),
})
.await;
if let Err(e) = result {
// Same cleanup as above: the streaming body resource is of no use once the
// connection has failed.
if let Some(rid) = maybe_response_body_rid {
let _ = state
.borrow_mut()
.resource_table
.take::(rid);
}
return Err(e);
}
// With a complete body there is nothing left to stream, so wake the task
// registered in `op_http_request_next` right away. For streamed bodies this
// is done by `op_http_response_close` instead.
if maybe_response_body_rid.is_none() {
conn_resource.deno_service.waker.wake();
}
Ok(maybe_response_body_rid)
}
/// Finishes a streamed response body.
///
/// Removes the `ResponseBodyResource` `rid` from the resource table and drops
/// it, polls the connection once, and wakes the task registered in
/// `op_http_request_next`. Fails if either resource id is unknown or if the
/// connection reports an error.
async fn op_http_response_close(
state: Rc>,
rid: ResourceId,
_: (),
) -> Result<(), AnyError> {
let resource = state
.borrow_mut()
.resource_table
.take::(rid)?;
let conn_resource = state
.borrow()
.resource_table
.get::(resource.conn_rid)?;
// Drop the body resource before polling the connection below.
// NOTE(review): presumably dropping the sender is what signals end-of-body to
// hyper — confirm.
drop(resource);
// Poll exactly once; `Pending` is mapped to success so the op does not wait
// for the connection to finish.
let r = poll_fn(|cx| match conn_resource.poll(cx) {
Poll::Ready(x) => Poll::Ready(x),
Poll::Pending => Poll::Ready(Ok(())),
})
.await;
// Woken regardless of the poll result.
conn_resource.deno_service.waker.wake();
r
}
/// Reads request body bytes of `RequestResource` `rid` into `data`.
///
/// Resolves with the value produced by the reader's `read` call on `data`.
/// Fails if `data` is missing, if a resource id is unknown, if the read
/// fails or is cancelled by closing the request resource, or if the
/// connection reports an error while the read is pending.
async fn op_http_request_read(
state: Rc>,
rid: ResourceId,
data: Option,
) -> Result {
let mut data = data.ok_or_else(null_opbuf)?;
let resource = state
.borrow()
.resource_table
.get::(rid as u32)?;
let conn_resource = state
.borrow()
.resource_table
.get::(resource.conn_rid)?;
let mut inner = RcRef::map(resource.clone(), |r| &r.inner)
.borrow_mut()
.await;
// On the first read the stored request is converted, once, into a stream
// reader over its body; later reads reuse that reader.
if let RequestOrStreamReader::Request(req) = &mut *inner {
let req = req.take().unwrap();
let stream: BytesStream = Box::pin(req.into_body().map(|r| {
r.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))
}));
let reader = StreamReader::new(stream);
*inner = RequestOrStreamReader::StreamReader(reader);
};
// The conversion above guarantees the `StreamReader` variant at this point.
let reader = match &mut *inner {
RequestOrStreamReader::StreamReader(reader) => reader,
_ => unreachable!(),
};
let cancel = RcRef::map(resource, |r| &r.cancel);
let mut read_fut = reader.read(&mut data).try_or_cancel(cancel).boxed_local();
poll_fn(|cx| {
// The connection is polled on every wake-up before the read itself; a
// connection error takes precedence over the read result.
if let Poll::Ready(Err(e)) = conn_resource.poll(cx) {
// close ConnResource
// close RequestResource associated with connection
// close ResponseBodyResource associated with connection
return Poll::Ready(Err(e));
}
read_fut.poll_unpin(cx).map_err(AnyError::from)
})
.await
}
/// Writes one chunk of a streamed response body.
///
/// `rid` names the `ResponseBodyResource` returned by `op_http_response`;
/// `data` is copied and sent through its body sender. Fails if `data` is
/// missing, if a resource id is unknown, if sending fails, or if the
/// connection reports an error.
async fn op_http_response_write(
state: Rc>,
rid: ResourceId,
data: Option,
) -> Result<(), AnyError> {
let buf = data.ok_or_else(null_opbuf)?;
let resource = state
.borrow()
.resource_table
.get::(rid as u32)?;
let conn_resource = state
.borrow()
.resource_table
.get::(resource.conn_rid)?;
let mut body = RcRef::map(&resource, |r| &r.body).borrow_mut().await;
let mut send_data_fut = body.send_data(Vec::from(&*buf).into()).boxed_local();
poll_fn(|cx| {
// The send is polled first and its result held back until the connection
// has been polled too; a connection error overrides the send result.
let r = send_data_fut.poll_unpin(cx).map_err(AnyError::from);
// Poll connection so the data is flushed
if let Poll::Ready(Err(e)) = conn_resource.poll(cx) {
// close ConnResource
// close RequestResource associated with connection
// close ResponseBodyResource associated with connection
return Poll::Ready(Err(e));
}
r
})
.await?;
Ok(())
}
/// Computes the websocket accept value for `key`: the base64 encoding of the
/// SHA-1 digest of `key` followed by the fixed websocket GUID.
fn op_http_websocket_accept_header(
  _: &mut OpState,
  key: String,
  _: (),
) -> Result {
  let material = format!("{}258EAFA5-E914-47DA-95CA-C5AB0DC85B11", key);
  let sha1 = ring::digest::digest(
    &ring::digest::SHA1_FOR_LEGACY_USE_ONLY,
    material.as_bytes(),
  );
  Ok(base64::encode(sha1))
}
/// Upgrades the request held by `RequestResource` `rid` to a websocket.
///
/// The request resource is removed from the resource table. On success a
/// server-side `WsStreamResource` is registered and its resource id returned.
/// Fails if `rid` is unknown, if the hyper upgrade fails, or with a
/// bad-resource-id error when the request has already been turned into a body
/// reader by `op_http_request_read`.
async fn op_http_upgrade_websocket(
state: Rc>,
rid: ResourceId,
_: (),
) -> Result {
// `take` (not `get`): the request resource is gone from the table whether or
// not the upgrade succeeds.
let req_resource = state
.borrow_mut()
.resource_table
.take::(rid)?;
let mut inner = RcRef::map(&req_resource, |r| &r.inner).borrow_mut().await;
if let RequestOrStreamReader::Request(req) = inner.as_mut() {
// Resolves once hyper has completed the protocol upgrade for this request.
let upgraded = hyper::upgrade::on(req.as_mut().unwrap()).await?;
let stream =
deno_websocket::tokio_tungstenite::WebSocketStream::from_raw_socket(
upgraded,
deno_websocket::tokio_tungstenite::tungstenite::protocol::Role::Server,
None,
)
.await;
// Split into independent halves so each can sit behind its own cell.
let (ws_tx, ws_rx) = stream.split();
let rid =
state
.borrow_mut()
.resource_table
.add(deno_websocket::WsStreamResource {
stream: deno_websocket::WebSocketStreamType::Server {
rx: AsyncRefCell::new(ws_rx),
tx: AsyncRefCell::new(ws_tx),
},
cancel: Default::default(),
});
Ok(rid)
} else {
// The body is already being read as a stream; upgrading is not possible.
Err(bad_resource_id())
}
}
/// Pinned stream alias used for the request body in `op_http_request_read`,
/// where body errors are mapped to `std::io::Error` before being wrapped in a
/// `StreamReader`.
type BytesStream =
Pin> + Unpin>>;
/// The two states of a request resource: still the untouched request, or
/// already converted into a reader over its body.
enum RequestOrStreamReader {
/// The request as received. Holds `None` only transiently, while
/// `op_http_request_read` moves the request out to build the reader.
Request(Option>),
/// Reader over the request body, installed by the first
/// `op_http_request_read` call.
StreamReader(StreamReader),
}
/// Resource-table entry for a request whose body can be read or which can be
/// upgraded to a websocket.
struct RequestResource {
/// Resource id of the connection this request arrived on.
conn_rid: ResourceId,
/// The request, or the body reader it has been converted into.
inner: AsyncRefCell,
/// Cancelled when the resource is closed, which aborts a pending
/// `op_http_request_read`.
cancel: CancelHandle,
}
impl Resource for RequestResource {
  /// Name under which this resource is reported.
  fn name(&self) -> Cow {
    Cow::Borrowed("request")
  }

  /// Closing the resource triggers its cancel handle.
  fn close(self: Rc) {
    self.cancel.cancel();
  }
}
/// Resource-table entry holding the channel through which the response to one
/// request is delivered; consumed by `op_http_response`.
struct ResponseSenderResource {
/// Sending half of the oneshot channel created in `Service::call`.
sender: oneshot::Sender>,
/// Resource id of the connection the request arrived on.
conn_rid: ResourceId,
}
impl Resource for ResponseSenderResource {
  /// Name under which this resource is reported.
  fn name(&self) -> Cow {
    Cow::Borrowed("responseSender")
  }
}
/// Resource-table entry for a response body that is streamed chunk by chunk;
/// created by `op_http_response` when no body data is passed.
struct ResponseBodyResource {
/// Sending half returned by `Body::channel()`; written to by
/// `op_http_response_write`.
body: AsyncRefCell,
/// Resource id of the connection the response is sent on.
conn_rid: ResourceId,
}
impl Resource for ResponseBodyResource {
  /// Name under which this resource is reported.
  fn name(&self) -> Cow {
    Cow::Borrowed("responseBody")
  }
}
// Needed so hyper can use non Send futures
/// Executor passed to hyper in `start_http`; it spawns futures with
/// `tokio::task::spawn_local`.
#[derive(Clone)]
struct LocalExecutor;
impl hyper::rt::Executor for LocalExecutor
where
  Fut: Future + 'static,
  Fut::Output: 'static,
{
  /// Spawns `fut` on the current thread's local task set and detaches it.
  fn execute(&self, fut: Fut) {
    // The join handle is not needed; dropping it leaves the task running.
    drop(tokio::task::spawn_local(fut));
  }
}