mirror of
https://github.com/denoland/deno.git
synced 2025-01-11 08:33:43 -05:00
cleanup(ext/http): simplify op_http_request_next (#11691)
* cleanup(ext/http): simplify op_http_request_next Keep op_http_request_next's high-level logic simple, factor out NextRequestResponse building to prepare_next_request() for improved readability & maintainability * cleanup(ext/http): break prepare_next_request() into meaningful sub-funcs
This commit is contained in:
parent
f90231924d
commit
8fa46a7b44
1 changed files with 145 additions and 128 deletions
273
ext/http/lib.rs
273
ext/http/lib.rs
|
@ -182,143 +182,40 @@ async fn op_http_request_next(
|
|||
|
||||
poll_fn(|cx| {
|
||||
conn_resource.deno_service.waker.register(cx.waker());
|
||||
let connection_closed = match conn_resource.poll(cx) {
|
||||
Poll::Pending => false,
|
||||
Poll::Ready(Ok(())) => {
|
||||
// try to close ConnResource, but don't unwrap as it might
|
||||
// already be closed
|
||||
let _ = state
|
||||
.borrow_mut()
|
||||
.resource_table
|
||||
.take::<ConnResource>(conn_rid);
|
||||
true
|
||||
}
|
||||
|
||||
// Check if conn is open/close/errored
|
||||
let (conn_closed, conn_result) = match conn_resource.poll(cx) {
|
||||
Poll::Pending => (false, Ok(())),
|
||||
Poll::Ready(Ok(())) => (true, Ok(())),
|
||||
Poll::Ready(Err(e)) => {
|
||||
// TODO(ry) close RequestResource associated with connection
|
||||
// TODO(ry) close ResponseBodyResource associated with connection
|
||||
// try to close ConnResource, but don't unwrap as it might
|
||||
// already be closed
|
||||
let _ = state
|
||||
.borrow_mut()
|
||||
.resource_table
|
||||
.take::<ConnResource>(conn_rid);
|
||||
if should_ignore_error(&e) {
|
||||
true
|
||||
(true, Ok(()))
|
||||
} else {
|
||||
return Poll::Ready(Err(e));
|
||||
(true, Err(e))
|
||||
}
|
||||
}
|
||||
};
|
||||
if let Some(request_resource) =
|
||||
conn_resource.deno_service.inner.borrow_mut().take()
|
||||
{
|
||||
let tx = request_resource.response_tx;
|
||||
let req = request_resource.request;
|
||||
let method = req.method().to_string();
|
||||
// Drop conn resource if closed
|
||||
if conn_closed {
|
||||
// TODO(ry) close RequestResource associated with connection
|
||||
// TODO(ry) close ResponseBodyResource associated with connection
|
||||
// try to close ConnResource, but don't unwrap as it might
|
||||
// already be closed
|
||||
let _ = state
|
||||
.borrow_mut()
|
||||
.resource_table
|
||||
.take::<ConnResource>(conn_rid);
|
||||
|
||||
// We treat cookies specially, because we don't want them to get them
|
||||
// mangled by the `Headers` object in JS. What we do is take all cookie
|
||||
// headers and concat them into a single cookie header, seperated by
|
||||
// semicolons.
|
||||
let mut total_cookie_length = 0;
|
||||
let mut cookies = vec![];
|
||||
|
||||
let mut headers = Vec::with_capacity(req.headers().len());
|
||||
for (name, value) in req.headers().iter() {
|
||||
if name == hyper::header::COOKIE {
|
||||
let bytes = value.as_bytes();
|
||||
total_cookie_length += bytes.len();
|
||||
cookies.push(bytes);
|
||||
} else {
|
||||
let name: &[u8] = name.as_ref();
|
||||
let value = value.as_bytes();
|
||||
headers
|
||||
.push((ByteString(name.to_owned()), ByteString(value.to_owned())));
|
||||
}
|
||||
}
|
||||
|
||||
if !cookies.is_empty() {
|
||||
let cookie_count = cookies.len();
|
||||
total_cookie_length += (cookie_count * 2) - 2;
|
||||
let mut bytes = Vec::with_capacity(total_cookie_length);
|
||||
for (i, cookie) in cookies.into_iter().enumerate() {
|
||||
bytes.extend(cookie);
|
||||
if i != cookie_count - 1 {
|
||||
bytes.extend("; ".as_bytes());
|
||||
}
|
||||
}
|
||||
headers.push((
|
||||
ByteString("cookie".as_bytes().to_owned()),
|
||||
ByteString(bytes),
|
||||
));
|
||||
}
|
||||
|
||||
let url = {
|
||||
let scheme = &conn_resource.hyper_connection.scheme;
|
||||
let host: Cow<str> = if let Some(host) = req.uri().host() {
|
||||
Cow::Borrowed(host)
|
||||
} else if let Some(host) = req.headers().get("HOST") {
|
||||
Cow::Borrowed(host.to_str()?)
|
||||
} else {
|
||||
Cow::Owned(conn_resource.hyper_connection.addr.to_string())
|
||||
};
|
||||
let path = req.uri().path_and_query().map_or("/", |p| p.as_str());
|
||||
format!("{}://{}{}", scheme, host, path)
|
||||
};
|
||||
|
||||
let is_websocket_request = req
|
||||
.headers()
|
||||
.get_all(hyper::header::CONNECTION)
|
||||
.iter()
|
||||
.any(|v| {
|
||||
v.to_str()
|
||||
.map(|s| s.to_lowercase().contains("upgrade"))
|
||||
.unwrap_or(false)
|
||||
})
|
||||
&& req
|
||||
.headers()
|
||||
.get_all(hyper::header::UPGRADE)
|
||||
.iter()
|
||||
.any(|v| {
|
||||
v.to_str()
|
||||
.map(|s| s.to_lowercase().contains("websocket"))
|
||||
.unwrap_or(false)
|
||||
});
|
||||
|
||||
let has_body = if let Some(exact_size) = req.size_hint().exact() {
|
||||
exact_size > 0
|
||||
} else {
|
||||
true
|
||||
};
|
||||
|
||||
let maybe_request_rid = if is_websocket_request || has_body {
|
||||
let mut state = state.borrow_mut();
|
||||
let request_rid = state.resource_table.add(RequestResource {
|
||||
conn_rid,
|
||||
inner: AsyncRefCell::new(RequestOrStreamReader::Request(Some(req))),
|
||||
cancel: CancelHandle::default(),
|
||||
});
|
||||
Some(request_rid)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
// Fail with err if unexpected conn error, early return None otherwise
|
||||
return Poll::Ready(conn_result.map(|_| None));
|
||||
}
|
||||
|
||||
if let Some(inner) = conn_resource.deno_service.inner.borrow_mut().take() {
|
||||
let Conn { scheme, addr, .. } = conn_resource.hyper_connection;
|
||||
let mut state = state.borrow_mut();
|
||||
let response_sender_rid =
|
||||
state.resource_table.add(ResponseSenderResource {
|
||||
sender: tx,
|
||||
conn_rid,
|
||||
});
|
||||
|
||||
Poll::Ready(Ok(Some(NextRequestResponse(
|
||||
maybe_request_rid,
|
||||
response_sender_rid,
|
||||
method,
|
||||
headers,
|
||||
url,
|
||||
))))
|
||||
} else if connection_closed {
|
||||
Poll::Ready(Ok(None))
|
||||
let next =
|
||||
prepare_next_request(&mut state, conn_rid, inner, scheme, addr)?;
|
||||
Poll::Ready(Ok(Some(next)))
|
||||
} else {
|
||||
Poll::Pending
|
||||
}
|
||||
|
@ -328,6 +225,126 @@ async fn op_http_request_next(
|
|||
.map_err(AnyError::from)
|
||||
}
|
||||
|
||||
fn prepare_next_request(
|
||||
state: &mut OpState,
|
||||
conn_rid: ResourceId,
|
||||
request_resource: ServiceInner,
|
||||
scheme: &'static str,
|
||||
addr: SocketAddr,
|
||||
) -> Result<NextRequestResponse, AnyError> {
|
||||
let tx = request_resource.response_tx;
|
||||
let req = request_resource.request;
|
||||
let method = req.method().to_string();
|
||||
let headers = req_headers(&req);
|
||||
let url = req_url(&req, scheme, addr)?;
|
||||
|
||||
let is_websocket = is_websocket_request(&req);
|
||||
let has_body = if let Some(exact_size) = req.size_hint().exact() {
|
||||
exact_size > 0
|
||||
} else {
|
||||
true
|
||||
};
|
||||
|
||||
let maybe_request_rid = if is_websocket || has_body {
|
||||
let request_rid = state.resource_table.add(RequestResource {
|
||||
conn_rid,
|
||||
inner: AsyncRefCell::new(RequestOrStreamReader::Request(Some(req))),
|
||||
cancel: CancelHandle::default(),
|
||||
});
|
||||
Some(request_rid)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let response_sender_rid = state.resource_table.add(ResponseSenderResource {
|
||||
sender: tx,
|
||||
conn_rid,
|
||||
});
|
||||
|
||||
Ok(NextRequestResponse(
|
||||
maybe_request_rid,
|
||||
response_sender_rid,
|
||||
method,
|
||||
headers,
|
||||
url,
|
||||
))
|
||||
}
|
||||
|
||||
fn req_url(
|
||||
req: &hyper::Request<hyper::Body>,
|
||||
scheme: &'static str,
|
||||
addr: SocketAddr,
|
||||
) -> Result<String, AnyError> {
|
||||
let host: Cow<str> = if let Some(host) = req.uri().host() {
|
||||
Cow::Borrowed(host)
|
||||
} else if let Some(host) = req.headers().get("HOST") {
|
||||
Cow::Borrowed(host.to_str()?)
|
||||
} else {
|
||||
Cow::Owned(addr.to_string())
|
||||
};
|
||||
let path = req.uri().path_and_query().map_or("/", |p| p.as_str());
|
||||
Ok(format!("{}://{}{}", scheme, host, path))
|
||||
}
|
||||
|
||||
fn req_headers(
|
||||
req: &hyper::Request<hyper::Body>,
|
||||
) -> Vec<(ByteString, ByteString)> {
|
||||
// We treat cookies specially, because we don't want them to get them
|
||||
// mangled by the `Headers` object in JS. What we do is take all cookie
|
||||
// headers and concat them into a single cookie header, seperated by
|
||||
// semicolons.
|
||||
let mut total_cookie_length = 0;
|
||||
let mut cookies = vec![];
|
||||
|
||||
let mut headers = Vec::with_capacity(req.headers().len());
|
||||
for (name, value) in req.headers().iter() {
|
||||
if name == hyper::header::COOKIE {
|
||||
let bytes = value.as_bytes();
|
||||
total_cookie_length += bytes.len();
|
||||
cookies.push(bytes);
|
||||
} else {
|
||||
let name: &[u8] = name.as_ref();
|
||||
let value = value.as_bytes();
|
||||
headers.push((ByteString(name.to_owned()), ByteString(value.to_owned())));
|
||||
}
|
||||
}
|
||||
|
||||
if !cookies.is_empty() {
|
||||
let cookie_count = cookies.len();
|
||||
total_cookie_length += (cookie_count * 2) - 2;
|
||||
let mut bytes = Vec::with_capacity(total_cookie_length);
|
||||
for (i, cookie) in cookies.into_iter().enumerate() {
|
||||
bytes.extend(cookie);
|
||||
if i != cookie_count - 1 {
|
||||
bytes.extend("; ".as_bytes());
|
||||
}
|
||||
}
|
||||
headers.push((
|
||||
ByteString("cookie".as_bytes().to_owned()),
|
||||
ByteString(bytes),
|
||||
));
|
||||
}
|
||||
|
||||
headers
|
||||
}
|
||||
|
||||
fn is_websocket_request(req: &hyper::Request<hyper::Body>) -> bool {
|
||||
req_header_contains(req, hyper::header::CONNECTION, "upgrade")
|
||||
&& req_header_contains(req, hyper::header::UPGRADE, "websocket")
|
||||
}
|
||||
|
||||
fn req_header_contains(
|
||||
req: &hyper::Request<hyper::Body>,
|
||||
key: impl hyper::header::AsHeaderName,
|
||||
value: &str,
|
||||
) -> bool {
|
||||
req.headers().get_all(key).iter().any(|v| {
|
||||
v.to_str()
|
||||
.map(|s| s.to_lowercase().contains(value))
|
||||
.unwrap_or(false)
|
||||
})
|
||||
}
|
||||
|
||||
fn should_ignore_error(e: &AnyError) -> bool {
|
||||
if let Some(e) = e.downcast_ref::<hyper::Error>() {
|
||||
use std::error::Error;
|
||||
|
|
Loading…
Reference in a new issue