1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2024-11-29 16:30:56 -05:00
denoland-deno/ext/node/ops/http2.rs
Rano | Ranadeep 702f7ee89c
fix(std/http2): release window capacity back to remote stream (#24576)
This PR adds logic to release window capacity after reading the chunks
from the stream. Without it, a large response (more than `u16::MAX` bytes)
may fill up the window capacity, and the whole response can't be read.

Closes https://github.com/denoland/deno/issues/24552
Closes https://github.com/denoland/deno/issues/24305
2024-07-15 17:10:59 +05:30

555 lines
14 KiB
Rust

// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
use std::borrow::Cow;
use std::cell::RefCell;
use std::collections::HashMap;
use std::rc::Rc;
use std::task::Poll;
use bytes::Bytes;
use deno_core::error::AnyError;
use deno_core::futures::future::poll_fn;
use deno_core::op2;
use deno_core::serde::Serialize;
use deno_core::AsyncRefCell;
use deno_core::BufView;
use deno_core::ByteString;
use deno_core::CancelFuture;
use deno_core::CancelHandle;
use deno_core::JsBuffer;
use deno_core::OpState;
use deno_core::RcRef;
use deno_core::Resource;
use deno_core::ResourceId;
use deno_net::raw::take_network_stream_resource;
use deno_net::raw::NetworkStream;
use h2;
use h2::Reason;
use h2::RecvStream;
use http;
use http::request::Parts;
use http::HeaderMap;
use http::Response;
use http::StatusCode;
use reqwest::header::HeaderName;
use reqwest::header::HeaderValue;
use url::Url;
/// Resource holding the request-sending handle of an HTTP/2 client
/// connection together with the URL it was created for.
pub struct Http2Client {
  /// Handle used to issue new requests on the connection.
  pub client: AsyncRefCell<h2::client::SendRequest<BufView>>,
  /// Base URL; the `:path` pseudo-header of each request is joined onto it.
  pub url: Url,
}

impl Resource for Http2Client {
  /// Name under which this resource is reported.
  fn name(&self) -> Cow<str> {
    Cow::Borrowed("http2Client")
  }
}
/// Resource holding the connection half produced by the HTTP/2 client
/// handshake, plus the handle used to cancel polling of it.
#[derive(Debug)]
pub struct Http2ClientConn {
  /// Connection future that `op_http2_poll_client_connection` awaits.
  pub conn: AsyncRefCell<h2::client::Connection<NetworkStream, BufView>>,
  /// Cancelled when the resource is closed.
  cancel_handle: CancelHandle,
}

impl Resource for Http2ClientConn {
  /// Name under which this resource is reported.
  fn name(&self) -> Cow<str> {
    Cow::Borrowed("http2ClientConnection")
  }

  /// Closing the resource cancels the handle that
  /// `op_http2_poll_client_connection` races against the connection future.
  fn close(self: Rc<Self>) {
    self.cancel_handle.cancel();
  }
}
/// Resource representing one in-flight client request.
#[derive(Debug)]
pub struct Http2ClientStream {
  /// Future awaited by `op_http2_client_get_response` to obtain the response.
  pub response: AsyncRefCell<h2::client::ResponseFuture>,
  /// Send half of the stream; used to send data, trailers and resets.
  pub stream: AsyncRefCell<h2::SendStream<BufView>>,
}

impl Resource for Http2ClientStream {
  /// Name under which this resource is reported.
  fn name(&self) -> Cow<str> {
    Cow::Borrowed("http2ClientStream")
  }
}
/// Resource wrapping the receiving half of a stream together with a oneshot
/// channel over which trailers are handed from the body reader to
/// `op_http2_client_get_response_trailers`.
///
/// Despite its name, `op_http2_accept` also uses this type for the body of an
/// incoming server-side request.
#[derive(Debug)]
pub struct Http2ClientResponseBody {
  /// Stream from which data chunks and trailers are polled.
  pub body: AsyncRefCell<h2::RecvStream>,
  /// Receiving end of the trailers channel; taken by the trailers op.
  pub trailers_rx:
    AsyncRefCell<Option<tokio::sync::oneshot::Receiver<Option<HeaderMap>>>>,
  /// Sending end of the trailers channel; taken by the body-chunk op when
  /// trailers arrive or the stream ends.
  pub trailers_tx:
    AsyncRefCell<Option<tokio::sync::oneshot::Sender<Option<HeaderMap>>>>,
}

impl Resource for Http2ClientResponseBody {
  /// Name under which this resource is reported.
  fn name(&self) -> Cow<str> {
    Cow::Borrowed("http2ClientResponseBody")
  }
}
/// Resource holding a server-side HTTP/2 connection from which
/// `op_http2_accept` accepts incoming requests.
#[derive(Debug)]
pub struct Http2ServerConnection {
  /// Connection produced by the server handshake in `op_http2_listen`.
  pub conn: AsyncRefCell<h2::server::Connection<NetworkStream, BufView>>,
}

impl Resource for Http2ServerConnection {
  /// Name under which this resource is reported.
  fn name(&self) -> Cow<str> {
    Cow::Borrowed("http2ServerConnection")
  }
}
/// Resource holding the handle used to answer one accepted server request.
pub struct Http2ServerSendResponse {
  /// Handle on which `op_http2_send_response` sends the response head.
  pub send_response: AsyncRefCell<h2::server::SendResponse<BufView>>,
}

impl Resource for Http2ServerSendResponse {
  /// Name under which this resource is reported.
  fn name(&self) -> Cow<str> {
    Cow::Borrowed("http2ServerSendResponse")
  }
}
/// Performs the HTTP/2 client handshake over an existing network stream.
///
/// The network stream resource `rid` is taken out of the resource table,
/// `url` is parsed, and the handshake is awaited. On success two new
/// resources are registered and their ids returned as
/// `(client_rid, conn_rid)`: an [`Http2Client`] and an [`Http2ClientConn`].
///
/// # Errors
///
/// Fails if `rid` cannot be taken as a network stream, if `url` does not
/// parse, or if the handshake fails.
#[op2(async)]
#[serde]
pub async fn op_http2_connect(
  state: Rc<RefCell<OpState>>,
  #[smi] rid: ResourceId,
  #[string] url: String,
) -> Result<(ResourceId, ResourceId), AnyError> {
  // No permission check necessary because we're using an existing connection
  let network_stream =
    take_network_stream_resource(&mut state.borrow_mut().resource_table, rid)?;
  let base_url = Url::parse(&url)?;

  let (send_request, connection) =
    h2::client::Builder::new().handshake(network_stream).await?;

  // Register both halves under a single borrow of the op state.
  let mut op_state = state.borrow_mut();
  let client_rid = op_state.resource_table.add(Http2Client {
    client: AsyncRefCell::new(send_request),
    url: base_url,
  });
  let conn_rid = op_state.resource_table.add(Http2ClientConn {
    conn: AsyncRefCell::new(connection),
    cancel_handle: CancelHandle::new(),
  });
  Ok((client_rid, conn_rid))
}
/// Performs the HTTP/2 server handshake over an existing network stream.
///
/// The network stream resource `rid` is taken out of the resource table and
/// the resulting connection is registered as an [`Http2ServerConnection`],
/// whose resource id is returned.
///
/// # Errors
///
/// Fails if `rid` cannot be taken as a network stream or if the handshake
/// fails.
#[op2(async)]
#[smi]
pub async fn op_http2_listen(
  state: Rc<RefCell<OpState>>,
  #[smi] rid: ResourceId,
) -> Result<ResourceId, AnyError> {
  let network_stream = {
    let mut op_state = state.borrow_mut();
    take_network_stream_resource(&mut op_state.resource_table, rid)?
  };
  let connection = h2::server::Builder::new()
    .handshake(network_stream)
    .await?;
  let server_conn = Http2ServerConnection {
    conn: AsyncRefCell::new(connection),
  };
  let conn_rid = state.borrow_mut().resource_table.add(server_conn);
  Ok(conn_rid)
}
/// Accepts the next incoming request on a server connection.
///
/// Returns `Ok(None)` when `accept` yields nothing. Otherwise returns
/// `(headers, body_rid, response_rid)` where `headers` starts with the four
/// pseudo-headers `:method`, `:scheme`, `:path` and `:authority` (in that
/// order) followed by the regular request headers, `body_rid` identifies the
/// request body resource and `response_rid` identifies the handle used to
/// send the response.
///
/// # Errors
///
/// Fails if `rid` is not an [`Http2ServerConnection`] or if accepting the
/// request fails.
#[op2(async)]
#[serde]
pub async fn op_http2_accept(
  state: Rc<RefCell<OpState>>,
  #[smi] rid: ResourceId,
) -> Result<
  Option<(Vec<(ByteString, ByteString)>, ResourceId, ResourceId)>,
  AnyError,
> {
  let resource = state
    .borrow()
    .resource_table
    .get::<Http2ServerConnection>(rid)?;
  let mut conn = RcRef::map(&resource, |r| &r.conn).borrow_mut().await;

  let Some(accepted) = conn.accept().await else {
    return Ok(None);
  };
  let (request, send_response) = accepted?;
  let (parts, body) = request.into_parts();

  // The body resource is registered first so that resource ids are handed
  // out in the same order as before: body, then response handle.
  let (trailers_tx, trailers_rx) = tokio::sync::oneshot::channel();
  let body_rid = state
    .borrow_mut()
    .resource_table
    .add(Http2ClientResponseBody {
      body: AsyncRefCell::new(body),
      trailers_rx: AsyncRefCell::new(Some(trailers_rx)),
      trailers_tx: AsyncRefCell::new(Some(trailers_tx)),
    });

  let Parts {
    uri,
    method,
    headers,
    ..
  } = parts;

  // Four pseudo-headers are always emitted ahead of the regular headers.
  let mut req_headers: Vec<(ByteString, ByteString)> =
    Vec::with_capacity(headers.len() + 4);
  req_headers.extend([
    (ByteString::from(":method"), ByteString::from(method.as_str())),
    (
      ByteString::from(":scheme"),
      ByteString::from(uri.scheme().map(|s| s.as_str()).unwrap_or("http")),
    ),
    (
      ByteString::from(":path"),
      ByteString::from(uri.path_and_query().map(|p| p.as_str()).unwrap_or("")),
    ),
    (
      ByteString::from(":authority"),
      ByteString::from(uri.authority().map(|a| a.as_str()).unwrap_or("")),
    ),
  ]);
  req_headers.extend(headers.iter().map(|(key, val)| {
    (
      ByteString::from(key.as_str()),
      ByteString::from(val.as_bytes()),
    )
  }));

  let response_rid =
    state
      .borrow_mut()
      .resource_table
      .add(Http2ServerSendResponse {
        send_response: AsyncRefCell::new(send_response),
      });
  Ok(Some((req_headers, body_rid, response_rid)))
}
/// Sends the response head (status and headers) for an accepted request.
///
/// `status` is applied only when it is a valid HTTP status code; otherwise
/// the response keeps the default status of `Response::new`. Header names
/// must already be lowercase.
///
/// Returns the `rid` that was passed in together with the numeric id of the
/// HTTP/2 stream the response was sent on.
///
/// # Errors
///
/// Fails if `rid` is not an [`Http2ServerSendResponse`], if any header name
/// or value is invalid, or if sending the response fails.
#[op2(async)]
#[serde]
pub async fn op_http2_send_response(
  state: Rc<RefCell<OpState>>,
  #[smi] rid: ResourceId,
  #[smi] status: u16,
  #[serde] headers: Vec<(ByteString, ByteString)>,
) -> Result<(ResourceId, u32), AnyError> {
  let resource = state
    .borrow()
    .resource_table
    .get::<Http2ServerSendResponse>(rid)?;
  let mut send_response = RcRef::map(resource, |r| &r.send_response)
    .borrow_mut()
    .await;
  let mut response = Response::new(());
  if let Ok(status) = StatusCode::from_u16(status) {
    *response.status_mut() = status;
  }
  for (name, value) in headers {
    // Headers come from the caller; report invalid ones as an error instead
    // of panicking.
    response.headers_mut().append(
      HeaderName::from_lowercase(&name)?,
      HeaderValue::from_bytes(&value)?,
    );
  }
  let stream = send_response.send_response(response, false)?;
  let stream_id = stream.stream_id();
  Ok((rid, stream_id.into()))
}
/// Drives the client connection future until it completes or until the
/// resource's cancel handle is cancelled.
///
/// Cancellation is not treated as an error: the op resolves with `Ok(())`.
///
/// # Errors
///
/// Fails if `rid` is not an [`Http2ClientConn`] or if the connection future
/// itself resolves with an error.
#[op2(async)]
pub async fn op_http2_poll_client_connection(
  state: Rc<RefCell<OpState>>,
  #[smi] rid: ResourceId,
) -> Result<(), AnyError> {
  let resource = state.borrow().resource_table.get::<Http2ClientConn>(rid)?;
  let cancel_handle = RcRef::map(resource.clone(), |this| &this.cancel_handle);
  let mut conn = RcRef::map(resource, |this| &this.conn).borrow_mut().await;

  // TODO(bartlomieju): probably need a better mechanism for closing the connection
  // An `Err` from `or_cancel` means the poll was cancelled; that case is
  // deliberately ignored.
  if let Ok(outcome) = (&mut *conn).or_cancel(cancel_handle).await {
    outcome?;
  }
  Ok(())
}
/// Starts a new request on an HTTP/2 client connection.
///
/// The request URI is the client's base URL joined with the `:path`
/// pseudo-header (default `/`); the method comes from `:method` (default
/// `GET`). `:authority` and `:scheme` are removed from `pseudo_headers` but
/// not used yet. Header names must already be lowercase.
///
/// Returns the resource id of the new [`Http2ClientStream`] together with
/// the numeric HTTP/2 stream id.
///
/// # Errors
///
/// Fails if `client_rid` is not an [`Http2Client`], if the path cannot be
/// joined onto the base URL, if the method, URI, or any header name or value
/// is invalid, if the connection does not become ready, or if the request
/// cannot be sent.
#[op2(async)]
#[serde]
pub async fn op_http2_client_request(
  state: Rc<RefCell<OpState>>,
  #[smi] client_rid: ResourceId,
  // TODO(bartlomieju): maybe use a vector with fixed layout to save sending
  // 4 strings of keys?
  #[serde] mut pseudo_headers: HashMap<String, String>,
  #[serde] headers: Vec<(ByteString, ByteString)>,
) -> Result<(ResourceId, u32), AnyError> {
  let resource = state
    .borrow()
    .resource_table
    .get::<Http2Client>(client_rid)?;

  let pseudo_path = pseudo_headers
    .remove(":path")
    .unwrap_or_else(|| "/".to_string());
  let pseudo_method = pseudo_headers
    .remove(":method")
    .unwrap_or_else(|| "GET".to_string());
  // TODO(bartlomieju): handle all pseudo-headers (:authority, :scheme)
  let _pseudo_authority = pseudo_headers
    .remove(":authority")
    .unwrap_or_else(|| "/".to_string());
  let _pseudo_scheme = pseudo_headers
    .remove(":scheme")
    .unwrap_or_else(|| "http".to_string());

  let url = resource.url.join(&pseudo_path)?;

  // Finish the builder before touching the headers: an invalid method or URI
  // then surfaces as an error here instead of a panic on `headers_mut()`.
  let mut request = http::Request::builder()
    .uri(url.as_str())
    .method(pseudo_method.as_str())
    .body(())?;
  for (name, value) in headers {
    // Headers come from the caller; report invalid ones as an error instead
    // of panicking.
    request.headers_mut().append(
      HeaderName::from_lowercase(&name)?,
      HeaderValue::from_bytes(&value)?,
    );
  }

  let mut client = RcRef::map(&resource, |r| &r.client).borrow_mut().await;
  poll_fn(|cx| client.poll_ready(cx)).await?;
  let (response, stream) = client.send_request(request, false)?;
  let stream_id = stream.stream_id();
  let stream_rid = state.borrow_mut().resource_table.add(Http2ClientStream {
    response: AsyncRefCell::new(response),
    stream: AsyncRefCell::new(stream),
  });
  Ok((stream_rid, stream_id.into()))
}
#[op2(async)]
pub async fn op_http2_client_send_data(
state: Rc<RefCell<OpState>>,
#[smi] stream_rid: ResourceId,
#[buffer] data: JsBuffer,
end_of_stream: bool,
) -> Result<(), AnyError> {
let resource = state
.borrow()
.resource_table
.get::<Http2ClientStream>(stream_rid)?;
let mut stream = RcRef::map(&resource, |r| &r.stream).borrow_mut().await;
stream.send_data(data.to_vec().into(), end_of_stream)?;
Ok(())
}
#[op2(async)]
pub async fn op_http2_client_reset_stream(
state: Rc<RefCell<OpState>>,
#[smi] stream_rid: ResourceId,
#[smi] code: u32,
) -> Result<(), AnyError> {
let resource = state
.borrow()
.resource_table
.get::<Http2ClientStream>(stream_rid)?;
let mut stream = RcRef::map(&resource, |r| &r.stream).borrow_mut().await;
stream.send_reset(h2::Reason::from(code));
Ok(())
}
/// Sends trailers on a client stream.
///
/// Trailers are collected with `insert`, so when the same name appears more
/// than once only the last value is kept.
///
/// # Errors
///
/// Fails if `stream_rid` is not an [`Http2ClientStream`], if any trailer
/// name or value is invalid, or if the stream rejects the trailers.
#[op2(async)]
pub async fn op_http2_client_send_trailers(
  state: Rc<RefCell<OpState>>,
  #[smi] stream_rid: ResourceId,
  #[serde] trailers: Vec<(ByteString, ByteString)>,
) -> Result<(), AnyError> {
  let resource = state
    .borrow()
    .resource_table
    .get::<Http2ClientStream>(stream_rid)?;
  let mut stream = RcRef::map(&resource, |r| &r.stream).borrow_mut().await;
  let mut trailers_map = http::HeaderMap::new();
  for (name, value) in trailers {
    // Trailers come from the caller; report invalid ones as an error instead
    // of panicking.
    trailers_map.insert(
      HeaderName::from_bytes(&name)?,
      HeaderValue::from_bytes(&value)?,
    );
  }
  stream.send_trailers(trailers_map)?;
  Ok(())
}
/// Response head returned by `op_http2_client_get_response`.
///
/// Serialized with camelCase field names (`headers`, `bodyRid`,
/// `statusCode`).
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct Http2ClientResponse {
// Response headers as `(name, value)` byte-string pairs.
headers: Vec<(ByteString, ByteString)>,
// Resource id of the `Http2ClientResponseBody` registered for this response.
body_rid: ResourceId,
// Numeric HTTP status code of the response.
status_code: u16,
}
#[op2(async)]
#[serde]
pub async fn op_http2_client_get_response(
state: Rc<RefCell<OpState>>,
#[smi] stream_rid: ResourceId,
) -> Result<(Http2ClientResponse, bool), AnyError> {
let resource = state
.borrow()
.resource_table
.get::<Http2ClientStream>(stream_rid)?;
let mut response_future =
RcRef::map(&resource, |r| &r.response).borrow_mut().await;
let response = (&mut *response_future).await?;
let (parts, body) = response.into_parts();
let status = parts.status;
let mut res_headers = Vec::new();
for (key, val) in parts.headers.iter() {
res_headers.push((key.as_str().into(), val.as_bytes().into()));
}
let end_stream = body.is_end_stream();
let (trailers_tx, trailers_rx) = tokio::sync::oneshot::channel();
let body_rid =
state
.borrow_mut()
.resource_table
.add(Http2ClientResponseBody {
body: AsyncRefCell::new(body),
trailers_rx: AsyncRefCell::new(Some(trailers_rx)),
trailers_tx: AsyncRefCell::new(Some(trailers_tx)),
});
Ok((
Http2ClientResponse {
headers: res_headers,
body_rid,
status_code: status.into(),
},
end_stream,
))
}
/// Outcome of one successful poll of a receive stream by
/// `poll_data_or_trailers`.
enum DataOrTrailers {
// A chunk of body data.
Data(Bytes),
// The trailers of the stream.
Trailers(HeaderMap),
// Trailer polling completed without any trailers.
Eof,
}
/// Polls `body` once for either trailers or the next data chunk.
///
/// Trailers are polled first: a ready result becomes
/// [`DataOrTrailers::Trailers`] when trailers are present and
/// [`DataOrTrailers::Eof`] when there are none. Otherwise the data side is
/// polled; a ready chunk has its length released back to the stream's
/// flow-control window before being returned as [`DataOrTrailers::Data`].
/// In every other case the result is `Poll::Pending`.
fn poll_data_or_trailers(
  cx: &mut std::task::Context,
  body: &mut RecvStream,
) -> Poll<Result<DataOrTrailers, h2::Error>> {
  if let Poll::Ready(trailers) = body.poll_trailers(cx) {
    let event = match trailers? {
      Some(trailers) => DataOrTrailers::Trailers(trailers),
      None => DataOrTrailers::Eof,
    };
    return Poll::Ready(Ok(event));
  }

  match body.poll_data(cx) {
    Poll::Ready(Some(chunk)) => {
      let chunk = chunk?;
      // Give the consumed bytes back so the remote can keep sending.
      body.flow_control().release_capacity(chunk.len())?;
      Poll::Ready(Ok(DataOrTrailers::Data(chunk)))
    }
    // If `poll_data` returns `Ready(None)`, poll one more time to check for
    // trailers. Return pending here as poll_data will keep the waker.
    Poll::Ready(None) | Poll::Pending => Poll::Pending,
  }
}
/// Reads the next chunk of a response body.
///
/// Returns `(chunk, finished, cancelled)`:
/// - `(Some(bytes), false, false)` for a data chunk,
/// - `(None, true, false)` once the stream has ended,
/// - `(None, false, true)` when the stream failed with reason `CANCEL`.
///
/// Trailers encountered while reading are forwarded over the resource's
/// trailers channel and reading continues. At end of stream the sending half
/// of that channel is dropped without sending.
///
/// # Errors
///
/// Fails if `body_rid` is not an [`Http2ClientResponseBody`] or if the stream
/// reports an error other than `CANCEL`.
#[op2(async)]
#[serde]
pub async fn op_http2_client_get_response_body_chunk(
  state: Rc<RefCell<OpState>>,
  #[smi] body_rid: ResourceId,
) -> Result<(Option<Vec<u8>>, bool, bool), AnyError> {
  let resource = state
    .borrow()
    .resource_table
    .get::<Http2ClientResponseBody>(body_rid)?;
  let mut body = RcRef::map(&resource, |r| &r.body).borrow_mut().await;

  loop {
    let event =
      match poll_fn(|cx| poll_data_or_trailers(cx, &mut body)).await {
        Ok(event) => event,
        // A cancelled stream is reported through the third tuple element.
        Err(err) if err.reason() == Some(Reason::CANCEL) => {
          return Ok((None, false, true));
        }
        Err(err) => return Err(err.into()),
      };

    match event {
      DataOrTrailers::Data(data) => {
        return Ok((Some(data.to_vec()), false, false));
      }
      DataOrTrailers::Trailers(trailers) => {
        let sender = RcRef::map(&resource, |r| &r.trailers_tx)
          .borrow_mut()
          .await
          .take();
        if let Some(sender) = sender {
          // The receiver may already be gone; that is not an error.
          _ = sender.send(Some(trailers));
        }
        // Keep polling after forwarding the trailers.
      }
      DataOrTrailers::Eof => {
        // Dropping the sender completes the channel without trailers.
        RcRef::map(&resource, |r| &r.trailers_tx)
          .borrow_mut()
          .await
          .take();
        return Ok((None, true, false));
      }
    }
  }
}
/// Waits for the trailers of a response body.
///
/// Takes the receiving half of the resource's trailers channel and awaits
/// it. Returns `Ok(None)` when the receiver has already been taken, when the
/// sender was dropped without sending, or when `None` was sent; otherwise
/// returns the trailers as `(name, value)` pairs.
///
/// # Errors
///
/// Fails if `body_rid` is not an [`Http2ClientResponseBody`].
#[op2(async)]
#[serde]
pub async fn op_http2_client_get_response_trailers(
  state: Rc<RefCell<OpState>>,
  #[smi] body_rid: ResourceId,
) -> Result<Option<Vec<(ByteString, ByteString)>>, AnyError> {
  let resource = state
    .borrow()
    .resource_table
    .get::<Http2ClientResponseBody>(body_rid)?;
  let receiver = RcRef::map(&resource, |r| &r.trailers_rx)
    .borrow_mut()
    .await
    .take();

  let Some(receiver) = receiver else {
    return Ok(None);
  };
  let Ok(Some(trailers)) = receiver.await else {
    return Ok(None);
  };

  let mut pairs = Vec::with_capacity(trailers.len());
  pairs.extend(trailers.iter().map(|(key, value)| {
    (
      ByteString::from(key.as_str()),
      ByteString::from(value.as_bytes()),
    )
  }));
  Ok(Some(pairs))
}