// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
use deno_core::error::bad_resource_id;
use deno_core::error::null_opbuf;
use deno_core::error::type_error;
use deno_core::error::AnyError;
use deno_core::futures::future::poll_fn;
use deno_core::futures::FutureExt;
use deno_core::futures::Stream;
use deno_core::futures::StreamExt;
use deno_core::include_js_files;
use deno_core::op_async;
use deno_core::op_sync;
use deno_core::AsyncRefCell;
use deno_core::ByteString;
use deno_core::CancelHandle;
use deno_core::CancelTryFuture;
use deno_core::Extension;
use deno_core::OpState;
use deno_core::RcRef;
use deno_core::Resource;
use deno_core::ResourceId;
use deno_core::ZeroCopyBuf;
use hyper::body::HttpBody;
use hyper::http;
use hyper::server::conn::Http;
use hyper::service::Service as HyperService;
use hyper::Body;
use hyper::Request;
use hyper::Response;
use serde::Deserialize;
use serde::Serialize;
use std::borrow::Cow;
use std::cell::RefCell;
use std::future::Future;
use std::net::SocketAddr;
use std::path::PathBuf;
use std::pin::Pin;
use std::rc::Rc;
use std::task::Context;
use std::task::Poll;
use tokio::io::AsyncRead;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWrite;
use tokio::sync::oneshot;
use tokio_util::io::StreamReader;
/// Returns the location of this crate's unstable TypeScript declaration file.
///
/// The path is `lib.deno_http.unstable.d.ts` resolved against the crate's
/// manifest directory, which is captured at compile time through
/// `CARGO_MANIFEST_DIR`.
pub fn get_unstable_declaration() -> PathBuf {
  let mut declaration_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
  declaration_path.push("lib.deno_http.unstable.d.ts");
  declaration_path
}
/// Assembles the HTTP extension.
///
/// The extension bundles the `01_http.js` script (registered under the
/// `deno:ext/http` prefix) together with the ops defined in this file. All ops
/// are asynchronous except `op_http_websocket_accept_header`, which is
/// registered as a synchronous op.
pub fn init() -> Extension {
  // JavaScript sources shipped with the extension.
  let js_sources = include_js_files!(
    prefix "deno:ext/http",
    "01_http.js",
  );

  // Name -> implementation table; the registration order is kept as-is.
  let op_table = vec![
    ("op_http_request_next", op_async(op_http_request_next)),
    ("op_http_request_read", op_async(op_http_request_read)),
    ("op_http_response", op_async(op_http_response)),
    ("op_http_response_write", op_async(op_http_response_write)),
    ("op_http_response_close", op_async(op_http_response_close)),
    (
      "op_http_websocket_accept_header",
      op_sync(op_http_websocket_accept_header),
    ),
    (
      "op_http_upgrade_websocket",
      op_async(op_http_upgrade_websocket),
    ),
  ];

  Extension::builder().js(js_sources).ops(op_table).build()
}
// One request handed over by `Service::call`: the incoming request paired with
// the one-shot sender through which its response is delivered back to hyper.
// NOTE(review): the generic arguments on these field types appear to have been
// lost (e.g. `oneshot::Sender>`); confirm against the upstream file.
struct ServiceInner {
// The request as passed to `Service::call`.
request: Request
,
// Sending half of the channel whose receiving half is awaited by the future
// returned from `Service::call`.
response_tx: oneshot::Sender>,
}
// The service object given to hyper for a connection. It acts as a single-slot
// mailbox: `call` stores the latest request in `inner`, and `poll_ready`
// reports `Pending` for as long as that slot is occupied.
// NOTE(review): the generic arguments of both field types appear to have been
// lost (`Rc>>`, `Rc`); confirm against the upstream file.
#[derive(Clone, Default)]
struct Service {
// Shared slot holding the pending `ServiceInner`, if any. It is inspected with
// `borrow()` and filled with `borrow_mut().replace(..)` below.
inner: Rc>>,
// Not used by the code visible in this impl; presumably used to wake the task
// waiting for the next request — TODO confirm against the op implementations.
waker: Rc,
}
// hyper `Service` implementation that does not handle requests itself. Each
// request is parked in `self.inner` together with a one-shot sender, and the
// returned future resolves once a response arrives on that channel.
// NOTE(review): several generic argument lists in this impl appear to have
// been lost (trait header, `Future` alias, `poll_ready` return type).
impl HyperService> for Service {
type Response = Response;
type Error = http::Error;
#[allow(clippy::type_complexity)]
type Future =
Pin>>>;
// Reports readiness only while the request slot is empty, so a new request is
// accepted only after the previously stored one has been taken out of `inner`.
// The context is deliberately unused (`_cx`): nothing in this method registers
// a wakeup for the `Pending` case.
fn poll_ready(
&mut self,
_cx: &mut Context<'_>,
) -> Poll> {
if self.inner.borrow().is_some() {
Poll::Pending
} else {
Poll::Ready(Ok(()))
}
}
// Stores `req` and a fresh response sender in the slot (replacing whatever was
// there) and returns a future that yields the response received on the
// matching receiver.
fn call(&mut self, req: Request) -> Self::Future {
let (resp_tx, resp_rx) = oneshot::channel();
self.inner.borrow_mut().replace(ServiceInner {
request: req,
response_tx: resp_tx,
});
// `unwrap` panics if receiving fails — presumably when the sender is dropped
// without a response having been sent; TODO confirm this cannot happen.
async move { Ok(resp_rx.await.unwrap()) }.boxed_local()
}
}
// Alias for the pinned future stored in `Conn::conn` and polled by
// `ConnResource::poll`.
// NOTE(review): the generic arguments of the aliased type appear to have been
// lost; confirm the full type against the upstream file.
type ConnFuture = Pin>>>;
// Per-connection state: metadata about the connection plus the shared
// connection future that drives it.
struct Conn {
// Presumably the URL scheme of the connection (e.g. "http"/"https") — TODO
// confirm against the code that constructs `Conn`.
scheme: &'static str,
// Socket address associated with the connection; which end it describes is not
// visible here — TODO confirm.
addr: SocketAddr,
// Shared handle to the connection future; `ConnResource::poll` borrows it
// mutably and polls it.
// NOTE(review): the generic arguments of `Rc` appear to have been lost.
conn: Rc>,
}
// Resource representing one HTTP connection (registered under the name
// "httpConnection", see the `Resource` impl).
struct ConnResource {
// Connection state; its `conn` future is driven by `ConnResource::poll`.
hyper_connection: Conn,
// The `Service` associated with this connection, through which incoming
// requests are handed over.
deno_service: Service,
// Cancelled when the resource is closed.
cancel: CancelHandle,
}
impl ConnResource {
// TODO(ry) impl Future for ConnResource?
// Polls the underlying connection future once with the given context and
// converts any error it yields into an `AnyError`.
// NOTE(review): the generic arguments of the `Poll` return type appear to have
// been lost; confirm against the upstream file.
fn poll(&self, cx: &mut Context<'_>) -> Poll> {
self
.hyper_connection
.conn
.borrow_mut()
.poll_unpin(cx)
.map_err(AnyError::from)
}
}
// Resource-table integration for `ConnResource`.
// NOTE(review): the generic arguments of `Cow` and `Rc` in the signatures
// below appear to have been lost; confirm against the upstream file.
impl Resource for ConnResource {
// Name under which this resource is reported: "httpConnection".
fn name(&self) -> Cow {
"httpConnection".into()
}
// Closing the resource triggers the cancel handle, so that operations tied to
// `self.cancel` can be aborted.
fn close(self: Rc) {
self.cancel.cancel()
}
}
// Value returned to JavaScript for the next incoming request.
// We use a tuple instead of struct to avoid serialization overhead of the keys.
// The comment above each field gives the name it would have had in a struct.
// NOTE(review): `rename_all` presumably has no effect on a tuple struct, since
// there are no field names to rename — confirm.
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
struct NextRequestResponse(
// request_rid:
// NOTE(review): the generic argument of `Option` appears to have been lost;
// presumably `ResourceId` — confirm against the upstream file.
Option,
// response_sender_rid:
ResourceId,
// method:
// This is a String rather than a ByteString because the HTTP method is only
// available as a str, which is guaranteed to be ASCII-only.
// NOTE(review): this comment previously named reqwest as the source of the
// method, but this file only imports hyper — confirm.
String,
// headers:
Vec<(ByteString, ByteString)>,
// url:
String,
);
async fn op_http_request_next(
state: Rc>,
conn_rid: ResourceId,
_: (),
) -> Result