1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2024-11-21 15:04:11 -05:00

refactor(ext/http): use concrete error types (#26377)

This commit is contained in:
Leo Kettmeir 2024-10-18 15:57:12 -07:00 committed by GitHub
parent 8ca8174c81
commit 2c3900370a
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 305 additions and 130 deletions

View file

@ -19,7 +19,6 @@ use crate::service::SignallingRc;
use crate::websocket_upgrade::WebSocketUpgrade; use crate::websocket_upgrade::WebSocketUpgrade;
use crate::LocalExecutor; use crate::LocalExecutor;
use cache_control::CacheControl; use cache_control::CacheControl;
use deno_core::error::AnyError;
use deno_core::external; use deno_core::external;
use deno_core::futures::future::poll_fn; use deno_core::futures::future::poll_fn;
use deno_core::futures::TryFutureExt; use deno_core::futures::TryFutureExt;
@ -146,12 +145,32 @@ macro_rules! clone_external {
}}; }};
} }
#[derive(Debug, thiserror::Error)]
pub enum HttpNextError {
#[error(transparent)]
Resource(deno_core::error::AnyError),
#[error("{0}")]
Io(#[from] io::Error),
#[error(transparent)]
WebSocketUpgrade(crate::websocket_upgrade::WebSocketUpgradeError),
#[error("{0}")]
Hyper(#[from] hyper::Error),
#[error(transparent)]
JoinError(#[from] tokio::task::JoinError),
#[error(transparent)]
Canceled(#[from] deno_core::Canceled),
#[error(transparent)]
HttpPropertyExtractor(deno_core::error::AnyError),
#[error(transparent)]
UpgradeUnavailable(#[from] crate::service::UpgradeUnavailableError),
}
#[op2(fast)] #[op2(fast)]
#[smi] #[smi]
pub fn op_http_upgrade_raw( pub fn op_http_upgrade_raw(
state: &mut OpState, state: &mut OpState,
external: *const c_void, external: *const c_void,
) -> Result<ResourceId, AnyError> { ) -> Result<ResourceId, HttpNextError> {
// SAFETY: external is deleted before calling this op. // SAFETY: external is deleted before calling this op.
let http = unsafe { take_external!(external, "op_http_upgrade_raw") }; let http = unsafe { take_external!(external, "op_http_upgrade_raw") };
@ -177,7 +196,7 @@ pub fn op_http_upgrade_raw(
upgraded.write_all(&bytes).await?; upgraded.write_all(&bytes).await?;
break upgraded; break upgraded;
} }
Err(err) => return Err(err), Err(err) => return Err(HttpNextError::WebSocketUpgrade(err)),
} }
}; };
@ -193,7 +212,7 @@ pub fn op_http_upgrade_raw(
} }
read_tx.write_all(&buf[..read]).await?; read_tx.write_all(&buf[..read]).await?;
} }
Ok::<_, AnyError>(()) Ok::<_, HttpNextError>(())
}); });
spawn(async move { spawn(async move {
let mut buf = [0; 1024]; let mut buf = [0; 1024];
@ -204,7 +223,7 @@ pub fn op_http_upgrade_raw(
} }
upgraded_tx.write_all(&buf[..read]).await?; upgraded_tx.write_all(&buf[..read]).await?;
} }
Ok::<_, AnyError>(()) Ok::<_, HttpNextError>(())
}); });
Ok(()) Ok(())
@ -223,7 +242,7 @@ pub async fn op_http_upgrade_websocket_next(
state: Rc<RefCell<OpState>>, state: Rc<RefCell<OpState>>,
external: *const c_void, external: *const c_void,
#[serde] headers: Vec<(ByteString, ByteString)>, #[serde] headers: Vec<(ByteString, ByteString)>,
) -> Result<ResourceId, AnyError> { ) -> Result<ResourceId, HttpNextError> {
let http = let http =
// SAFETY: external is deleted before calling this op. // SAFETY: external is deleted before calling this op.
unsafe { take_external!(external, "op_http_upgrade_websocket_next") }; unsafe { take_external!(external, "op_http_upgrade_websocket_next") };
@ -690,7 +709,7 @@ pub async fn op_http_set_response_body_resource(
#[smi] stream_rid: ResourceId, #[smi] stream_rid: ResourceId,
auto_close: bool, auto_close: bool,
status: u16, status: u16,
) -> Result<bool, AnyError> { ) -> Result<bool, HttpNextError> {
let http = let http =
// SAFETY: op is called with external. // SAFETY: op is called with external.
unsafe { clone_external!(external, "op_http_set_response_body_resource") }; unsafe { clone_external!(external, "op_http_set_response_body_resource") };
@ -705,9 +724,15 @@ pub async fn op_http_set_response_body_resource(
let resource = { let resource = {
let mut state = state.borrow_mut(); let mut state = state.borrow_mut();
if auto_close { if auto_close {
state.resource_table.take_any(stream_rid)? state
.resource_table
.take_any(stream_rid)
.map_err(HttpNextError::Resource)?
} else { } else {
state.resource_table.get_any(stream_rid)? state
.resource_table
.get_any(stream_rid)
.map_err(HttpNextError::Resource)?
} }
}; };
@ -814,17 +839,17 @@ async fn serve_http2_autodetect(
io: impl HttpServeStream, io: impl HttpServeStream,
svc: impl HttpService<Incoming, ResBody = HttpRecordResponse> + 'static, svc: impl HttpService<Incoming, ResBody = HttpRecordResponse> + 'static,
cancel: Rc<CancelHandle>, cancel: Rc<CancelHandle>,
) -> Result<(), AnyError> { ) -> Result<(), HttpNextError> {
let prefix = NetworkStreamPrefixCheck::new(io, HTTP2_PREFIX); let prefix = NetworkStreamPrefixCheck::new(io, HTTP2_PREFIX);
let (matches, io) = prefix.match_prefix().await?; let (matches, io) = prefix.match_prefix().await?;
if matches { if matches {
serve_http2_unconditional(io, svc, cancel) serve_http2_unconditional(io, svc, cancel)
.await .await
.map_err(|e| e.into()) .map_err(HttpNextError::Hyper)
} else { } else {
serve_http11_unconditional(io, svc, cancel) serve_http11_unconditional(io, svc, cancel)
.await .await
.map_err(|e| e.into()) .map_err(HttpNextError::Hyper)
} }
} }
@ -833,7 +858,7 @@ fn serve_https(
request_info: HttpConnectionProperties, request_info: HttpConnectionProperties,
lifetime: HttpLifetime, lifetime: HttpLifetime,
tx: tokio::sync::mpsc::Sender<Rc<HttpRecord>>, tx: tokio::sync::mpsc::Sender<Rc<HttpRecord>>,
) -> JoinHandle<Result<(), AnyError>> { ) -> JoinHandle<Result<(), HttpNextError>> {
let HttpLifetime { let HttpLifetime {
server_state, server_state,
connection_cancel_handle, connection_cancel_handle,
@ -852,11 +877,11 @@ fn serve_https(
if Some(TLS_ALPN_HTTP_2) == handshake.as_deref() { if Some(TLS_ALPN_HTTP_2) == handshake.as_deref() {
serve_http2_unconditional(io, svc, listen_cancel_handle) serve_http2_unconditional(io, svc, listen_cancel_handle)
.await .await
.map_err(|e| e.into()) .map_err(HttpNextError::Hyper)
} else if Some(TLS_ALPN_HTTP_11) == handshake.as_deref() { } else if Some(TLS_ALPN_HTTP_11) == handshake.as_deref() {
serve_http11_unconditional(io, svc, listen_cancel_handle) serve_http11_unconditional(io, svc, listen_cancel_handle)
.await .await
.map_err(|e| e.into()) .map_err(HttpNextError::Hyper)
} else { } else {
serve_http2_autodetect(io, svc, listen_cancel_handle).await serve_http2_autodetect(io, svc, listen_cancel_handle).await
} }
@ -870,7 +895,7 @@ fn serve_http(
request_info: HttpConnectionProperties, request_info: HttpConnectionProperties,
lifetime: HttpLifetime, lifetime: HttpLifetime,
tx: tokio::sync::mpsc::Sender<Rc<HttpRecord>>, tx: tokio::sync::mpsc::Sender<Rc<HttpRecord>>,
) -> JoinHandle<Result<(), AnyError>> { ) -> JoinHandle<Result<(), HttpNextError>> {
let HttpLifetime { let HttpLifetime {
server_state, server_state,
connection_cancel_handle, connection_cancel_handle,
@ -891,7 +916,7 @@ fn serve_http_on<HTTP>(
listen_properties: &HttpListenProperties, listen_properties: &HttpListenProperties,
lifetime: HttpLifetime, lifetime: HttpLifetime,
tx: tokio::sync::mpsc::Sender<Rc<HttpRecord>>, tx: tokio::sync::mpsc::Sender<Rc<HttpRecord>>,
) -> JoinHandle<Result<(), AnyError>> ) -> JoinHandle<Result<(), HttpNextError>>
where where
HTTP: HttpPropertyExtractor, HTTP: HttpPropertyExtractor,
{ {
@ -922,7 +947,7 @@ struct HttpLifetime {
} }
struct HttpJoinHandle { struct HttpJoinHandle {
join_handle: AsyncRefCell<Option<JoinHandle<Result<(), AnyError>>>>, join_handle: AsyncRefCell<Option<JoinHandle<Result<(), HttpNextError>>>>,
connection_cancel_handle: Rc<CancelHandle>, connection_cancel_handle: Rc<CancelHandle>,
listen_cancel_handle: Rc<CancelHandle>, listen_cancel_handle: Rc<CancelHandle>,
rx: AsyncRefCell<tokio::sync::mpsc::Receiver<Rc<HttpRecord>>>, rx: AsyncRefCell<tokio::sync::mpsc::Receiver<Rc<HttpRecord>>>,
@ -982,12 +1007,13 @@ impl Drop for HttpJoinHandle {
pub fn op_http_serve<HTTP>( pub fn op_http_serve<HTTP>(
state: Rc<RefCell<OpState>>, state: Rc<RefCell<OpState>>,
#[smi] listener_rid: ResourceId, #[smi] listener_rid: ResourceId,
) -> Result<(ResourceId, &'static str, String), AnyError> ) -> Result<(ResourceId, &'static str, String), HttpNextError>
where where
HTTP: HttpPropertyExtractor, HTTP: HttpPropertyExtractor,
{ {
let listener = let listener =
HTTP::get_listener_for_rid(&mut state.borrow_mut(), listener_rid)?; HTTP::get_listener_for_rid(&mut state.borrow_mut(), listener_rid)
.map_err(HttpNextError::Resource)?;
let listen_properties = HTTP::listen_properties_from_listener(&listener)?; let listen_properties = HTTP::listen_properties_from_listener(&listener)?;
@ -1002,7 +1028,8 @@ where
loop { loop {
let conn = HTTP::accept_connection_from_listener(&listener) let conn = HTTP::accept_connection_from_listener(&listener)
.try_or_cancel(listen_cancel_clone.clone()) .try_or_cancel(listen_cancel_clone.clone())
.await?; .await
.map_err(HttpNextError::HttpPropertyExtractor)?;
serve_http_on::<HTTP>( serve_http_on::<HTTP>(
conn, conn,
&listen_properties_clone, &listen_properties_clone,
@ -1011,7 +1038,7 @@ where
); );
} }
#[allow(unreachable_code)] #[allow(unreachable_code)]
Ok::<_, AnyError>(()) Ok::<_, HttpNextError>(())
}); });
// Set the handle after we start the future // Set the handle after we start the future
@ -1031,25 +1058,25 @@ where
pub fn op_http_serve_on<HTTP>( pub fn op_http_serve_on<HTTP>(
state: Rc<RefCell<OpState>>, state: Rc<RefCell<OpState>>,
#[smi] connection_rid: ResourceId, #[smi] connection_rid: ResourceId,
) -> Result<(ResourceId, &'static str, String), AnyError> ) -> Result<(ResourceId, &'static str, String), HttpNextError>
where where
HTTP: HttpPropertyExtractor, HTTP: HttpPropertyExtractor,
{ {
let connection = let connection =
HTTP::get_connection_for_rid(&mut state.borrow_mut(), connection_rid)?; HTTP::get_connection_for_rid(&mut state.borrow_mut(), connection_rid)
.map_err(HttpNextError::Resource)?;
let listen_properties = HTTP::listen_properties_from_connection(&connection)?; let listen_properties = HTTP::listen_properties_from_connection(&connection)?;
let (tx, rx) = tokio::sync::mpsc::channel(10); let (tx, rx) = tokio::sync::mpsc::channel(10);
let resource: Rc<HttpJoinHandle> = Rc::new(HttpJoinHandle::new(rx)); let resource: Rc<HttpJoinHandle> = Rc::new(HttpJoinHandle::new(rx));
let handle: JoinHandle<Result<(), deno_core::anyhow::Error>> = let handle = serve_http_on::<HTTP>(
serve_http_on::<HTTP>( connection,
connection, &listen_properties,
&listen_properties, resource.lifetime(),
resource.lifetime(), tx,
tx, );
);
// Set the handle after we start the future // Set the handle after we start the future
*RcRef::map(&resource, |this| &this.join_handle) *RcRef::map(&resource, |this| &this.join_handle)
@ -1095,12 +1122,13 @@ pub fn op_http_try_wait(
pub async fn op_http_wait( pub async fn op_http_wait(
state: Rc<RefCell<OpState>>, state: Rc<RefCell<OpState>>,
#[smi] rid: ResourceId, #[smi] rid: ResourceId,
) -> Result<*const c_void, AnyError> { ) -> Result<*const c_void, HttpNextError> {
// We will get the join handle initially, as we might be consuming requests still // We will get the join handle initially, as we might be consuming requests still
let join_handle = state let join_handle = state
.borrow_mut() .borrow_mut()
.resource_table .resource_table
.get::<HttpJoinHandle>(rid)?; .get::<HttpJoinHandle>(rid)
.map_err(HttpNextError::Resource)?;
let cancel = join_handle.listen_cancel_handle(); let cancel = join_handle.listen_cancel_handle();
let next = async { let next = async {
@ -1127,13 +1155,12 @@ pub async fn op_http_wait(
// Filter out shutdown (ENOTCONN) errors // Filter out shutdown (ENOTCONN) errors
if let Err(err) = res { if let Err(err) = res {
if let Some(err) = err.source() { if let HttpNextError::Io(err) = &err {
if let Some(err) = err.downcast_ref::<io::Error>() { if err.kind() == io::ErrorKind::NotConnected {
if err.kind() == io::ErrorKind::NotConnected { return Ok(null());
return Ok(null());
}
} }
} }
return Err(err); return Err(err);
} }
@ -1146,7 +1173,7 @@ pub fn op_http_cancel(
state: &mut OpState, state: &mut OpState,
#[smi] rid: ResourceId, #[smi] rid: ResourceId,
graceful: bool, graceful: bool,
) -> Result<(), AnyError> { ) -> Result<(), deno_core::error::AnyError> {
let join_handle = state.resource_table.get::<HttpJoinHandle>(rid)?; let join_handle = state.resource_table.get::<HttpJoinHandle>(rid)?;
if graceful { if graceful {
@ -1166,11 +1193,12 @@ pub async fn op_http_close(
state: Rc<RefCell<OpState>>, state: Rc<RefCell<OpState>>,
#[smi] rid: ResourceId, #[smi] rid: ResourceId,
graceful: bool, graceful: bool,
) -> Result<(), AnyError> { ) -> Result<(), HttpNextError> {
let join_handle = state let join_handle = state
.borrow_mut() .borrow_mut()
.resource_table .resource_table
.take::<HttpJoinHandle>(rid)?; .take::<HttpJoinHandle>(rid)
.map_err(HttpNextError::Resource)?;
if graceful { if graceful {
http_general_trace!("graceful shutdown"); http_general_trace!("graceful shutdown");
@ -1216,23 +1244,26 @@ impl UpgradeStream {
} }
} }
async fn read(self: Rc<Self>, buf: &mut [u8]) -> Result<usize, AnyError> { async fn read(
self: Rc<Self>,
buf: &mut [u8],
) -> Result<usize, std::io::Error> {
let cancel_handle = RcRef::map(self.clone(), |this| &this.cancel_handle); let cancel_handle = RcRef::map(self.clone(), |this| &this.cancel_handle);
async { async {
let read = RcRef::map(self, |this| &this.read); let read = RcRef::map(self, |this| &this.read);
let mut read = read.borrow_mut().await; let mut read = read.borrow_mut().await;
Ok(Pin::new(&mut *read).read(buf).await?) Pin::new(&mut *read).read(buf).await
} }
.try_or_cancel(cancel_handle) .try_or_cancel(cancel_handle)
.await .await
} }
async fn write(self: Rc<Self>, buf: &[u8]) -> Result<usize, AnyError> { async fn write(self: Rc<Self>, buf: &[u8]) -> Result<usize, std::io::Error> {
let cancel_handle = RcRef::map(self.clone(), |this| &this.cancel_handle); let cancel_handle = RcRef::map(self.clone(), |this| &this.cancel_handle);
async { async {
let write = RcRef::map(self, |this| &this.write); let write = RcRef::map(self, |this| &this.write);
let mut write = write.borrow_mut().await; let mut write = write.borrow_mut().await;
Ok(Pin::new(&mut *write).write(buf).await?) Pin::new(&mut *write).write(buf).await
} }
.try_or_cancel(cancel_handle) .try_or_cancel(cancel_handle)
.await .await
@ -1242,7 +1273,7 @@ impl UpgradeStream {
self: Rc<Self>, self: Rc<Self>,
buf1: &[u8], buf1: &[u8],
buf2: &[u8], buf2: &[u8],
) -> Result<usize, AnyError> { ) -> Result<usize, std::io::Error> {
let mut wr = RcRef::map(self, |r| &r.write).borrow_mut().await; let mut wr = RcRef::map(self, |r| &r.write).borrow_mut().await;
let total = buf1.len() + buf2.len(); let total = buf1.len() + buf2.len();
@ -1295,9 +1326,12 @@ pub async fn op_raw_write_vectored(
#[smi] rid: ResourceId, #[smi] rid: ResourceId,
#[buffer] buf1: JsBuffer, #[buffer] buf1: JsBuffer,
#[buffer] buf2: JsBuffer, #[buffer] buf2: JsBuffer,
) -> Result<usize, AnyError> { ) -> Result<usize, HttpNextError> {
let resource: Rc<UpgradeStream> = let resource: Rc<UpgradeStream> = state
state.borrow().resource_table.get::<UpgradeStream>(rid)?; .borrow()
.resource_table
.get::<UpgradeStream>(rid)
.map_err(HttpNextError::Resource)?;
let nwritten = resource.write_vectored(&buf1, &buf2).await?; let nwritten = resource.write_vectored(&buf1, &buf2).await?;
Ok(nwritten) Ok(nwritten)
} }

View file

@ -6,8 +6,6 @@ use async_compression::Level;
use base64::prelude::BASE64_STANDARD; use base64::prelude::BASE64_STANDARD;
use base64::Engine; use base64::Engine;
use cache_control::CacheControl; use cache_control::CacheControl;
use deno_core::error::custom_error;
use deno_core::error::AnyError;
use deno_core::futures::channel::mpsc; use deno_core::futures::channel::mpsc;
use deno_core::futures::channel::oneshot; use deno_core::futures::channel::oneshot;
use deno_core::futures::future::pending; use deno_core::futures::future::pending;
@ -89,11 +87,14 @@ mod service;
mod websocket_upgrade; mod websocket_upgrade;
use fly_accept_encoding::Encoding; use fly_accept_encoding::Encoding;
pub use http_next::HttpNextError;
pub use request_properties::DefaultHttpPropertyExtractor; pub use request_properties::DefaultHttpPropertyExtractor;
pub use request_properties::HttpConnectionProperties; pub use request_properties::HttpConnectionProperties;
pub use request_properties::HttpListenProperties; pub use request_properties::HttpListenProperties;
pub use request_properties::HttpPropertyExtractor; pub use request_properties::HttpPropertyExtractor;
pub use request_properties::HttpRequestProperties; pub use request_properties::HttpRequestProperties;
pub use service::UpgradeUnavailableError;
pub use websocket_upgrade::WebSocketUpgradeError;
deno_core::extension!( deno_core::extension!(
deno_http, deno_http,
@ -134,6 +135,38 @@ deno_core::extension!(
esm = ["00_serve.ts", "01_http.js", "02_websocket.ts"], esm = ["00_serve.ts", "01_http.js", "02_websocket.ts"],
); );
#[derive(Debug, thiserror::Error)]
pub enum HttpError {
#[error(transparent)]
Resource(deno_core::error::AnyError),
#[error(transparent)]
Canceled(#[from] deno_core::Canceled),
#[error("{0}")]
HyperV014(#[source] Arc<hyper_v014::Error>),
#[error("{0}")]
InvalidHeaderName(#[from] hyper_v014::header::InvalidHeaderName),
#[error("{0}")]
InvalidHeaderValue(#[from] hyper_v014::header::InvalidHeaderValue),
#[error("{0}")]
Http(#[from] hyper_v014::http::Error),
#[error("response headers already sent")]
ResponseHeadersAlreadySent,
#[error("connection closed while sending response")]
ConnectionClosedWhileSendingResponse,
#[error("already in use")]
AlreadyInUse,
#[error("{0}")]
Io(#[from] std::io::Error),
#[error("no response headers")]
NoResponseHeaders,
#[error("response already completed")]
ResponseAlreadyCompleted,
#[error("cannot upgrade because request body was used")]
UpgradeBodyUsed,
#[error(transparent)]
Other(deno_core::error::AnyError),
}
pub enum HttpSocketAddr { pub enum HttpSocketAddr {
IpSocket(std::net::SocketAddr), IpSocket(std::net::SocketAddr),
#[cfg(unix)] #[cfg(unix)]
@ -216,7 +249,7 @@ impl HttpConnResource {
String, String,
String, String,
)>, )>,
AnyError, HttpError,
> { > {
let fut = async { let fut = async {
let (request_tx, request_rx) = oneshot::channel(); let (request_tx, request_rx) = oneshot::channel();
@ -259,8 +292,8 @@ impl HttpConnResource {
} }
/// A future that completes when this HTTP connection is closed or errors. /// A future that completes when this HTTP connection is closed or errors.
async fn closed(&self) -> Result<(), AnyError> { async fn closed(&self) -> Result<(), HttpError> {
self.closed_fut.clone().map_err(AnyError::from).await self.closed_fut.clone().map_err(HttpError::HyperV014).await
} }
} }
@ -280,14 +313,13 @@ pub fn http_create_conn_resource<S, A>(
io: S, io: S,
addr: A, addr: A,
scheme: &'static str, scheme: &'static str,
) -> Result<ResourceId, AnyError> ) -> ResourceId
where where
S: AsyncRead + AsyncWrite + Unpin + Send + 'static, S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
A: Into<HttpSocketAddr>, A: Into<HttpSocketAddr>,
{ {
let conn = HttpConnResource::new(io, scheme, addr.into()); let conn = HttpConnResource::new(io, scheme, addr.into());
let rid = state.resource_table.add(conn); state.resource_table.add(conn)
Ok(rid)
} }
/// An object that implements the `hyper::Service` trait, through which Hyper /// An object that implements the `hyper::Service` trait, through which Hyper
@ -423,7 +455,9 @@ impl Resource for HttpStreamReadResource {
// safely call `await` on it without creating a race condition. // safely call `await` on it without creating a race condition.
Some(_) => match body.as_mut().next().await.unwrap() { Some(_) => match body.as_mut().next().await.unwrap() {
Ok(chunk) => assert!(chunk.is_empty()), Ok(chunk) => assert!(chunk.is_empty()),
Err(err) => break Err(AnyError::from(err)), Err(err) => {
break Err(HttpError::HyperV014(Arc::new(err)).into())
}
}, },
None => break Ok(BufView::empty()), None => break Ok(BufView::empty()),
} }
@ -545,8 +579,12 @@ struct NextRequestResponse(
async fn op_http_accept( async fn op_http_accept(
state: Rc<RefCell<OpState>>, state: Rc<RefCell<OpState>>,
#[smi] rid: ResourceId, #[smi] rid: ResourceId,
) -> Result<Option<NextRequestResponse>, AnyError> { ) -> Result<Option<NextRequestResponse>, HttpError> {
let conn = state.borrow().resource_table.get::<HttpConnResource>(rid)?; let conn = state
.borrow()
.resource_table
.get::<HttpConnResource>(rid)
.map_err(HttpError::Resource)?;
match conn.accept().await { match conn.accept().await {
Ok(Some((read_stream, write_stream, method, url))) => { Ok(Some((read_stream, write_stream, method, url))) => {
@ -657,11 +695,12 @@ async fn op_http_write_headers(
#[smi] status: u16, #[smi] status: u16,
#[serde] headers: Vec<(ByteString, ByteString)>, #[serde] headers: Vec<(ByteString, ByteString)>,
#[serde] data: Option<StringOrBuffer>, #[serde] data: Option<StringOrBuffer>,
) -> Result<(), AnyError> { ) -> Result<(), HttpError> {
let stream = state let stream = state
.borrow_mut() .borrow_mut()
.resource_table .resource_table
.get::<HttpStreamWriteResource>(rid)?; .get::<HttpStreamWriteResource>(rid)
.map_err(HttpError::Resource)?;
// Track supported encoding // Track supported encoding
let encoding = stream.accept_encoding; let encoding = stream.accept_encoding;
@ -708,14 +747,14 @@ async fn op_http_write_headers(
let mut old_wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await; let mut old_wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await;
let response_tx = match replace(&mut *old_wr, new_wr) { let response_tx = match replace(&mut *old_wr, new_wr) {
HttpResponseWriter::Headers(response_tx) => response_tx, HttpResponseWriter::Headers(response_tx) => response_tx,
_ => return Err(http_error("response headers already sent")), _ => return Err(HttpError::ResponseHeadersAlreadySent),
}; };
match response_tx.send(body) { match response_tx.send(body) {
Ok(_) => Ok(()), Ok(_) => Ok(()),
Err(_) => { Err(_) => {
stream.conn.closed().await?; stream.conn.closed().await?;
Err(http_error("connection closed while sending response")) Err(HttpError::ConnectionClosedWhileSendingResponse)
} }
} }
} }
@ -725,11 +764,14 @@ async fn op_http_write_headers(
fn op_http_headers( fn op_http_headers(
state: &mut OpState, state: &mut OpState,
#[smi] rid: u32, #[smi] rid: u32,
) -> Result<Vec<(ByteString, ByteString)>, AnyError> { ) -> Result<Vec<(ByteString, ByteString)>, HttpError> {
let stream = state.resource_table.get::<HttpStreamReadResource>(rid)?; let stream = state
.resource_table
.get::<HttpStreamReadResource>(rid)
.map_err(HttpError::Resource)?;
let rd = RcRef::map(&stream, |r| &r.rd) let rd = RcRef::map(&stream, |r| &r.rd)
.try_borrow() .try_borrow()
.ok_or_else(|| http_error("already in use"))?; .ok_or(HttpError::AlreadyInUse)?;
match &*rd { match &*rd {
HttpRequestReader::Headers(request) => Ok(req_headers(request.headers())), HttpRequestReader::Headers(request) => Ok(req_headers(request.headers())),
HttpRequestReader::Body(headers, _) => Ok(req_headers(headers)), HttpRequestReader::Body(headers, _) => Ok(req_headers(headers)),
@ -741,7 +783,7 @@ fn http_response(
data: Option<StringOrBuffer>, data: Option<StringOrBuffer>,
compressing: bool, compressing: bool,
encoding: Encoding, encoding: Encoding,
) -> Result<(HttpResponseWriter, hyper_v014::Body), AnyError> { ) -> Result<(HttpResponseWriter, hyper_v014::Body), HttpError> {
// Gzip, after level 1, doesn't produce significant size difference. // Gzip, after level 1, doesn't produce significant size difference.
// This default matches nginx default gzip compression level (1): // This default matches nginx default gzip compression level (1):
// https://nginx.org/en/docs/http/ngx_http_gzip_module.html#gzip_comp_level // https://nginx.org/en/docs/http/ngx_http_gzip_module.html#gzip_comp_level
@ -878,25 +920,34 @@ async fn op_http_write_resource(
state: Rc<RefCell<OpState>>, state: Rc<RefCell<OpState>>,
#[smi] rid: ResourceId, #[smi] rid: ResourceId,
#[smi] stream: ResourceId, #[smi] stream: ResourceId,
) -> Result<(), AnyError> { ) -> Result<(), HttpError> {
let http_stream = state let http_stream = state
.borrow() .borrow()
.resource_table .resource_table
.get::<HttpStreamWriteResource>(rid)?; .get::<HttpStreamWriteResource>(rid)
.map_err(HttpError::Resource)?;
let mut wr = RcRef::map(&http_stream, |r| &r.wr).borrow_mut().await; let mut wr = RcRef::map(&http_stream, |r| &r.wr).borrow_mut().await;
let resource = state.borrow().resource_table.get_any(stream)?; let resource = state
.borrow()
.resource_table
.get_any(stream)
.map_err(HttpError::Resource)?;
loop { loop {
match *wr { match *wr {
HttpResponseWriter::Headers(_) => { HttpResponseWriter::Headers(_) => {
return Err(http_error("no response headers")) return Err(HttpError::NoResponseHeaders)
} }
HttpResponseWriter::Closed => { HttpResponseWriter::Closed => {
return Err(http_error("response already completed")) return Err(HttpError::ResponseAlreadyCompleted)
} }
_ => {} _ => {}
}; };
let view = resource.clone().read(64 * 1024).await?; // 64KB let view = resource
.clone()
.read(64 * 1024)
.await
.map_err(HttpError::Other)?; // 64KB
if view.is_empty() { if view.is_empty() {
break; break;
} }
@ -937,16 +988,17 @@ async fn op_http_write(
state: Rc<RefCell<OpState>>, state: Rc<RefCell<OpState>>,
#[smi] rid: ResourceId, #[smi] rid: ResourceId,
#[buffer] buf: JsBuffer, #[buffer] buf: JsBuffer,
) -> Result<(), AnyError> { ) -> Result<(), HttpError> {
let stream = state let stream = state
.borrow() .borrow()
.resource_table .resource_table
.get::<HttpStreamWriteResource>(rid)?; .get::<HttpStreamWriteResource>(rid)
.map_err(HttpError::Resource)?;
let mut wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await; let mut wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await;
match &mut *wr { match &mut *wr {
HttpResponseWriter::Headers(_) => Err(http_error("no response headers")), HttpResponseWriter::Headers(_) => Err(HttpError::NoResponseHeaders),
HttpResponseWriter::Closed => Err(http_error("response already completed")), HttpResponseWriter::Closed => Err(HttpError::ResponseAlreadyCompleted),
HttpResponseWriter::Body { writer, .. } => { HttpResponseWriter::Body { writer, .. } => {
let mut result = writer.write_all(&buf).await; let mut result = writer.write_all(&buf).await;
if result.is_ok() { if result.is_ok() {
@ -961,7 +1013,7 @@ async fn op_http_write(
stream.conn.closed().await?; stream.conn.closed().await?;
// If there was no connection error, drop body_tx. // If there was no connection error, drop body_tx.
*wr = HttpResponseWriter::Closed; *wr = HttpResponseWriter::Closed;
Err(http_error("response already completed")) Err(HttpError::ResponseAlreadyCompleted)
} }
} }
} }
@ -975,7 +1027,7 @@ async fn op_http_write(
stream.conn.closed().await?; stream.conn.closed().await?;
// If there was no connection error, drop body_tx. // If there was no connection error, drop body_tx.
*wr = HttpResponseWriter::Closed; *wr = HttpResponseWriter::Closed;
Err(http_error("response already completed")) Err(HttpError::ResponseAlreadyCompleted)
} }
} }
} }
@ -989,11 +1041,12 @@ async fn op_http_write(
async fn op_http_shutdown( async fn op_http_shutdown(
state: Rc<RefCell<OpState>>, state: Rc<RefCell<OpState>>,
#[smi] rid: ResourceId, #[smi] rid: ResourceId,
) -> Result<(), AnyError> { ) -> Result<(), HttpError> {
let stream = state let stream = state
.borrow() .borrow()
.resource_table .resource_table
.get::<HttpStreamWriteResource>(rid)?; .get::<HttpStreamWriteResource>(rid)
.map_err(HttpError::Resource)?;
let mut wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await; let mut wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await;
let wr = take(&mut *wr); let wr = take(&mut *wr);
match wr { match wr {
@ -1022,14 +1075,12 @@ async fn op_http_shutdown(
#[op2] #[op2]
#[string] #[string]
fn op_http_websocket_accept_header( fn op_http_websocket_accept_header(#[string] key: String) -> String {
#[string] key: String,
) -> Result<String, AnyError> {
let digest = ring::digest::digest( let digest = ring::digest::digest(
&ring::digest::SHA1_FOR_LEGACY_USE_ONLY, &ring::digest::SHA1_FOR_LEGACY_USE_ONLY,
format!("{key}258EAFA5-E914-47DA-95CA-C5AB0DC85B11").as_bytes(), format!("{key}258EAFA5-E914-47DA-95CA-C5AB0DC85B11").as_bytes(),
); );
Ok(BASE64_STANDARD.encode(digest)) BASE64_STANDARD.encode(digest)
} }
#[op2(async)] #[op2(async)]
@ -1037,22 +1088,24 @@ fn op_http_websocket_accept_header(
async fn op_http_upgrade_websocket( async fn op_http_upgrade_websocket(
state: Rc<RefCell<OpState>>, state: Rc<RefCell<OpState>>,
#[smi] rid: ResourceId, #[smi] rid: ResourceId,
) -> Result<ResourceId, AnyError> { ) -> Result<ResourceId, HttpError> {
let stream = state let stream = state
.borrow_mut() .borrow_mut()
.resource_table .resource_table
.get::<HttpStreamReadResource>(rid)?; .get::<HttpStreamReadResource>(rid)
.map_err(HttpError::Resource)?;
let mut rd = RcRef::map(&stream, |r| &r.rd).borrow_mut().await; let mut rd = RcRef::map(&stream, |r| &r.rd).borrow_mut().await;
let request = match &mut *rd { let request = match &mut *rd {
HttpRequestReader::Headers(request) => request, HttpRequestReader::Headers(request) => request,
_ => { _ => return Err(HttpError::UpgradeBodyUsed),
return Err(http_error("cannot upgrade because request body was used"))
}
}; };
let (transport, bytes) = let (transport, bytes) = extract_network_stream(
extract_network_stream(hyper_v014::upgrade::on(request).await?); hyper_v014::upgrade::on(request)
.await
.map_err(|err| HttpError::HyperV014(Arc::new(err)))?,
);
Ok(ws_create_server_stream( Ok(ws_create_server_stream(
&mut state.borrow_mut(), &mut state.borrow_mut(),
transport, transport,
@ -1084,10 +1137,6 @@ where
} }
} }
fn http_error(message: &'static str) -> AnyError {
custom_error("Http", message)
}
/// Filters out the ever-surprising 'shutdown ENOTCONN' errors. /// Filters out the ever-surprising 'shutdown ENOTCONN' errors.
fn filter_enotconn( fn filter_enotconn(
result: Result<(), hyper_v014::Error>, result: Result<(), hyper_v014::Error>,

View file

@ -1,9 +1,9 @@
// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license. // Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
use bytes::Bytes; use bytes::Bytes;
use deno_core::error::AnyError;
use deno_core::futures::stream::Peekable; use deno_core::futures::stream::Peekable;
use deno_core::futures::Stream; use deno_core::futures::Stream;
use deno_core::futures::StreamExt; use deno_core::futures::StreamExt;
use deno_core::futures::TryFutureExt;
use deno_core::AsyncRefCell; use deno_core::AsyncRefCell;
use deno_core::AsyncResult; use deno_core::AsyncResult;
use deno_core::BufView; use deno_core::BufView;
@ -22,7 +22,7 @@ use std::task::Poll;
struct ReadFuture(Incoming); struct ReadFuture(Incoming);
impl Stream for ReadFuture { impl Stream for ReadFuture {
type Item = Result<Bytes, AnyError>; type Item = Result<Bytes, hyper::Error>;
fn poll_next( fn poll_next(
self: Pin<&mut Self>, self: Pin<&mut Self>,
@ -37,13 +37,13 @@ impl Stream for ReadFuture {
if let Ok(data) = frame.into_data() { if let Ok(data) = frame.into_data() {
// Ensure that we never yield an empty frame // Ensure that we never yield an empty frame
if !data.is_empty() { if !data.is_empty() {
break Poll::Ready(Some(Ok::<_, AnyError>(data))); break Poll::Ready(Some(Ok(data)));
} }
} }
// Loop again so we don't lose the waker // Loop again so we don't lose the waker
continue; continue;
} }
Some(Err(e)) => Poll::Ready(Some(Err(e.into()))), Some(Err(e)) => Poll::Ready(Some(Err(e))),
None => Poll::Ready(None), None => Poll::Ready(None),
}; };
} }
@ -58,7 +58,7 @@ impl HttpRequestBody {
Self(AsyncRefCell::new(ReadFuture(body).peekable()), size_hint) Self(AsyncRefCell::new(ReadFuture(body).peekable()), size_hint)
} }
async fn read(self: Rc<Self>, limit: usize) -> Result<BufView, AnyError> { async fn read(self: Rc<Self>, limit: usize) -> Result<BufView, hyper::Error> {
let peekable = RcRef::map(self, |this| &this.0); let peekable = RcRef::map(self, |this| &this.0);
let mut peekable = peekable.borrow_mut().await; let mut peekable = peekable.borrow_mut().await;
match Pin::new(&mut *peekable).peek_mut().await { match Pin::new(&mut *peekable).peek_mut().await {
@ -82,7 +82,7 @@ impl Resource for HttpRequestBody {
} }
fn read(self: Rc<Self>, limit: usize) -> AsyncResult<BufView> { fn read(self: Rc<Self>, limit: usize) -> AsyncResult<BufView> {
Box::pin(HttpRequestBody::read(self, limit)) Box::pin(HttpRequestBody::read(self, limit).map_err(Into::into))
} }
fn size_hint(&self) -> (u64, Option<u64>) { fn size_hint(&self) -> (u64, Option<u64>) {

View file

@ -2,7 +2,6 @@
use crate::request_properties::HttpConnectionProperties; use crate::request_properties::HttpConnectionProperties;
use crate::response_body::ResponseBytesInner; use crate::response_body::ResponseBytesInner;
use crate::response_body::ResponseStreamResult; use crate::response_body::ResponseStreamResult;
use deno_core::error::AnyError;
use deno_core::futures::ready; use deno_core::futures::ready;
use deno_core::BufView; use deno_core::BufView;
use deno_core::OpState; use deno_core::OpState;
@ -206,6 +205,10 @@ pub(crate) async fn handle_request(
Ok(response) Ok(response)
} }
#[derive(Debug, thiserror::Error)]
#[error("upgrade unavailable")]
pub struct UpgradeUnavailableError;
struct HttpRecordInner { struct HttpRecordInner {
server_state: SignallingRc<HttpServerState>, server_state: SignallingRc<HttpServerState>,
request_info: HttpConnectionProperties, request_info: HttpConnectionProperties,
@ -344,14 +347,14 @@ impl HttpRecord {
} }
/// Perform the Hyper upgrade on this record. /// Perform the Hyper upgrade on this record.
pub fn upgrade(&self) -> Result<OnUpgrade, AnyError> { pub fn upgrade(&self) -> Result<OnUpgrade, UpgradeUnavailableError> {
// Manually perform the upgrade. We're peeking into hyper's underlying machinery here a bit // Manually perform the upgrade. We're peeking into hyper's underlying machinery here a bit
self self
.self_mut() .self_mut()
.request_parts .request_parts
.extensions .extensions
.remove::<OnUpgrade>() .remove::<OnUpgrade>()
.ok_or_else(|| AnyError::msg("upgrade unavailable")) .ok_or(UpgradeUnavailableError)
} }
/// Take the Hyper body from this record. /// Take the Hyper body from this record.
@ -515,7 +518,7 @@ pub struct HttpRecordResponse(ManuallyDrop<Rc<HttpRecord>>);
impl Body for HttpRecordResponse { impl Body for HttpRecordResponse {
type Data = BufView; type Data = BufView;
type Error = AnyError; type Error = deno_core::error::AnyError;
fn poll_frame( fn poll_frame(
self: Pin<&mut Self>, self: Pin<&mut Self>,
@ -640,7 +643,7 @@ mod tests {
} }
#[tokio::test] #[tokio::test]
async fn test_handle_request() -> Result<(), AnyError> { async fn test_handle_request() -> Result<(), deno_core::error::AnyError> {
let (tx, mut rx) = tokio::sync::mpsc::channel(10); let (tx, mut rx) = tokio::sync::mpsc::channel(10);
let server_state = HttpServerState::new(); let server_state = HttpServerState::new();
let server_state_check = server_state.clone(); let server_state_check = server_state.clone();

View file

@ -4,7 +4,6 @@ use std::marker::PhantomData;
use bytes::Bytes; use bytes::Bytes;
use bytes::BytesMut; use bytes::BytesMut;
use deno_core::error::AnyError;
use httparse::Status; use httparse::Status;
use hyper::header::HeaderName; use hyper::header::HeaderName;
use hyper::header::HeaderValue; use hyper::header::HeaderValue;
@ -13,12 +12,30 @@ use memmem::Searcher;
use memmem::TwoWaySearcher; use memmem::TwoWaySearcher;
use once_cell::sync::OnceCell; use once_cell::sync::OnceCell;
use crate::http_error; #[derive(Debug, thiserror::Error)]
pub enum WebSocketUpgradeError {
#[error("invalid headers")]
InvalidHeaders,
#[error("{0}")]
HttpParse(#[from] httparse::Error),
#[error("{0}")]
Http(#[from] http::Error),
#[error("{0}")]
Utf8(#[from] std::str::Utf8Error),
#[error("{0}")]
InvalidHeaderName(#[from] http::header::InvalidHeaderName),
#[error("{0}")]
InvalidHeaderValue(#[from] http::header::InvalidHeaderValue),
#[error("invalid HTTP status line")]
InvalidHttpStatusLine,
#[error("attempted to write to completed upgrade buffer")]
UpgradeBufferAlreadyCompleted,
}
/// Given a buffer that ends in `\n\n` or `\r\n\r\n`, returns a parsed [`Request<Body>`]. /// Given a buffer that ends in `\n\n` or `\r\n\r\n`, returns a parsed [`Request<Body>`].
fn parse_response<T: Default>( fn parse_response<T: Default>(
header_bytes: &[u8], header_bytes: &[u8],
) -> Result<(usize, Response<T>), AnyError> { ) -> Result<(usize, Response<T>), WebSocketUpgradeError> {
let mut headers = [httparse::EMPTY_HEADER; 16]; let mut headers = [httparse::EMPTY_HEADER; 16];
let status = httparse::parse_headers(header_bytes, &mut headers)?; let status = httparse::parse_headers(header_bytes, &mut headers)?;
match status { match status {
@ -32,7 +49,7 @@ fn parse_response<T: Default>(
} }
Ok((index, resp)) Ok((index, resp))
} }
_ => Err(http_error("invalid headers")), _ => Err(WebSocketUpgradeError::InvalidHeaders),
} }
} }
@ -69,11 +86,14 @@ pub struct WebSocketUpgrade<T: Default> {
impl<T: Default> WebSocketUpgrade<T> { impl<T: Default> WebSocketUpgrade<T> {
/// Ensures that the status line starts with "HTTP/1.1 101 " which matches all of the node.js /// Ensures that the status line starts with "HTTP/1.1 101 " which matches all of the node.js
/// WebSocket libraries that are known. We don't care about the trailing status text. /// WebSocket libraries that are known. We don't care about the trailing status text.
fn validate_status(&self, status: &[u8]) -> Result<(), AnyError> { fn validate_status(
&self,
status: &[u8],
) -> Result<(), WebSocketUpgradeError> {
if status.starts_with(b"HTTP/1.1 101 ") { if status.starts_with(b"HTTP/1.1 101 ") {
Ok(()) Ok(())
} else { } else {
Err(http_error("invalid HTTP status line")) Err(WebSocketUpgradeError::InvalidHttpStatusLine)
} }
} }
@ -82,7 +102,7 @@ impl<T: Default> WebSocketUpgrade<T> {
pub fn write( pub fn write(
&mut self, &mut self,
bytes: &[u8], bytes: &[u8],
) -> Result<Option<(Response<T>, Bytes)>, AnyError> { ) -> Result<Option<(Response<T>, Bytes)>, WebSocketUpgradeError> {
use WebSocketUpgradeState::*; use WebSocketUpgradeState::*;
match self.state { match self.state {
@ -142,9 +162,7 @@ impl<T: Default> WebSocketUpgrade<T> {
Ok(None) Ok(None)
} }
} }
Complete => { Complete => Err(WebSocketUpgradeError::UpgradeBufferAlreadyCompleted),
Err(http_error("attempted to write to completed upgrade buffer"))
}
} }
} }
} }
@ -157,8 +175,8 @@ mod tests {
type ExpectedResponseAndHead = Option<(Response<Body>, &'static [u8])>; type ExpectedResponseAndHead = Option<(Response<Body>, &'static [u8])>;
fn assert_response( fn assert_response(
result: Result<Option<(Response<Body>, Bytes)>, AnyError>, result: Result<Option<(Response<Body>, Bytes)>, WebSocketUpgradeError>,
expected: Result<ExpectedResponseAndHead, &'static str>, expected: Result<ExpectedResponseAndHead, WebSocketUpgradeError>,
chunk_info: Option<(usize, usize)>, chunk_info: Option<(usize, usize)>,
) { ) {
let formatted = format!("{result:?}"); let formatted = format!("{result:?}");
@ -189,8 +207,8 @@ mod tests {
"Expected Ok(None), was {formatted}", "Expected Ok(None), was {formatted}",
), ),
Err(e) => assert_eq!( Err(e) => assert_eq!(
e, format!("{e:?}"),
result.err().map(|e| format!("{e:?}")).unwrap_or_default(), format!("{:?}", result.unwrap_err()),
"Expected error, was {formatted}", "Expected error, was {formatted}",
), ),
} }
@ -198,7 +216,7 @@ mod tests {
fn validate_upgrade_all_at_once( fn validate_upgrade_all_at_once(
s: &str, s: &str,
expected: Result<ExpectedResponseAndHead, &'static str>, expected: Result<ExpectedResponseAndHead, WebSocketUpgradeError>,
) { ) {
let mut upgrade = WebSocketUpgrade::default(); let mut upgrade = WebSocketUpgrade::default();
let res = upgrade.write(s.as_bytes()); let res = upgrade.write(s.as_bytes());
@ -209,7 +227,7 @@ mod tests {
fn validate_upgrade_chunks( fn validate_upgrade_chunks(
s: &str, s: &str,
size: usize, size: usize,
expected: Result<ExpectedResponseAndHead, &'static str>, expected: Result<ExpectedResponseAndHead, WebSocketUpgradeError>,
) { ) {
let chunk_info = Some((s.as_bytes().len(), size)); let chunk_info = Some((s.as_bytes().len(), size));
let mut upgrade = WebSocketUpgrade::default(); let mut upgrade = WebSocketUpgrade::default();
@ -226,7 +244,7 @@ mod tests {
fn validate_upgrade( fn validate_upgrade(
s: &str, s: &str,
expected: fn() -> Result<ExpectedResponseAndHead, &'static str>, expected: fn() -> Result<ExpectedResponseAndHead, WebSocketUpgradeError>,
) { ) {
validate_upgrade_all_at_once(s, expected()); validate_upgrade_all_at_once(s, expected());
validate_upgrade_chunks(s, 1, expected()); validate_upgrade_chunks(s, 1, expected());
@ -315,7 +333,7 @@ mod tests {
#[test] #[test]
fn upgrade_invalid_status() { fn upgrade_invalid_status() {
validate_upgrade("HTTP/1.1 200 OK\nConnection: Upgrade\n\n", || { validate_upgrade("HTTP/1.1 200 OK\nConnection: Upgrade\n\n", || {
Err("invalid HTTP status line") Err(WebSocketUpgradeError::InvalidHttpStatusLine)
}); });
} }
@ -327,7 +345,11 @@ mod tests {
.join("\n"); .join("\n");
validate_upgrade( validate_upgrade(
&format!("HTTP/1.1 101 Switching Protocols\n{headers}\n\n"), &format!("HTTP/1.1 101 Switching Protocols\n{headers}\n\n"),
|| Err("too many headers"), || {
Err(WebSocketUpgradeError::HttpParse(
httparse::Error::TooManyHeaders,
))
},
); );
} }
} }

View file

@ -29,6 +29,9 @@ use deno_ffi::IRError;
use deno_ffi::ReprError; use deno_ffi::ReprError;
use deno_ffi::StaticError; use deno_ffi::StaticError;
use deno_fs::FsOpsError; use deno_fs::FsOpsError;
use deno_http::HttpError;
use deno_http::HttpNextError;
use deno_http::WebSocketUpgradeError;
use deno_io::fs::FsError; use deno_io::fs::FsError;
use deno_kv::KvCheckError; use deno_kv::KvCheckError;
use deno_kv::KvError; use deno_kv::KvError;
@ -682,6 +685,59 @@ fn get_net_map_error(error: &deno_net::io::MapError) -> &'static str {
} }
} }
fn get_http_error(error: &HttpError) -> &'static str {
match error {
HttpError::Canceled(e) => {
let io_err: io::Error = e.to_owned().into();
get_io_error_class(&io_err)
}
HttpError::HyperV014(e) => get_hyper_v014_error_class(e),
HttpError::InvalidHeaderName(_) => "Error",
HttpError::InvalidHeaderValue(_) => "Error",
HttpError::Http(_) => "Error",
HttpError::ResponseHeadersAlreadySent => "Http",
HttpError::ConnectionClosedWhileSendingResponse => "Http",
HttpError::AlreadyInUse => "Http",
HttpError::Io(e) => get_io_error_class(e),
HttpError::NoResponseHeaders => "Http",
HttpError::ResponseAlreadyCompleted => "Http",
HttpError::UpgradeBodyUsed => "Http",
HttpError::Resource(e) | HttpError::Other(e) => {
get_error_class_name(e).unwrap_or("Error")
}
}
}
fn get_http_next_error(error: &HttpNextError) -> &'static str {
match error {
HttpNextError::Io(e) => get_io_error_class(e),
HttpNextError::WebSocketUpgrade(e) => get_websocket_upgrade_error(e),
HttpNextError::Hyper(e) => get_hyper_error_class(e),
HttpNextError::JoinError(_) => "Error",
HttpNextError::Canceled(e) => {
let io_err: io::Error = e.to_owned().into();
get_io_error_class(&io_err)
}
HttpNextError::UpgradeUnavailable(_) => "Error",
HttpNextError::HttpPropertyExtractor(e) | HttpNextError::Resource(e) => {
get_error_class_name(e).unwrap_or("Error")
}
}
}
fn get_websocket_upgrade_error(error: &WebSocketUpgradeError) -> &'static str {
match error {
WebSocketUpgradeError::InvalidHeaders => "Http",
WebSocketUpgradeError::HttpParse(_) => "Error",
WebSocketUpgradeError::Http(_) => "Error",
WebSocketUpgradeError::Utf8(_) => "Error",
WebSocketUpgradeError::InvalidHeaderName(_) => "Error",
WebSocketUpgradeError::InvalidHeaderValue(_) => "Error",
WebSocketUpgradeError::InvalidHttpStatusLine => "Http",
WebSocketUpgradeError::UpgradeBufferAlreadyCompleted => "Http",
}
}
pub fn get_error_class_name(e: &AnyError) -> Option<&'static str> { pub fn get_error_class_name(e: &AnyError) -> Option<&'static str> {
deno_core::error::get_custom_error_class(e) deno_core::error::get_custom_error_class(e)
.or_else(|| deno_webgpu::error::get_error_class_name(e)) .or_else(|| deno_webgpu::error::get_error_class_name(e))
@ -702,6 +758,12 @@ pub fn get_error_class_name(e: &AnyError) -> Option<&'static str> {
.or_else(|| e.downcast_ref::<BlobError>().map(get_web_blob_error_class)) .or_else(|| e.downcast_ref::<BlobError>().map(get_web_blob_error_class))
.or_else(|| e.downcast_ref::<IRError>().map(|_| "TypeError")) .or_else(|| e.downcast_ref::<IRError>().map(|_| "TypeError"))
.or_else(|| e.downcast_ref::<ReprError>().map(get_ffi_repr_error_class)) .or_else(|| e.downcast_ref::<ReprError>().map(get_ffi_repr_error_class))
.or_else(|| e.downcast_ref::<HttpError>().map(get_http_error))
.or_else(|| e.downcast_ref::<HttpNextError>().map(get_http_next_error))
.or_else(|| {
e.downcast_ref::<WebSocketUpgradeError>()
.map(get_websocket_upgrade_error)
})
.or_else(|| e.downcast_ref::<FsOpsError>().map(get_fs_error)) .or_else(|| e.downcast_ref::<FsOpsError>().map(get_fs_error))
.or_else(|| { .or_else(|| {
e.downcast_ref::<DlfcnError>() e.downcast_ref::<DlfcnError>()

View file

@ -34,7 +34,7 @@ fn op_http_start(
let (read_half, write_half) = resource.into_inner(); let (read_half, write_half) = resource.into_inner();
let tcp_stream = read_half.reunite(write_half)?; let tcp_stream = read_half.reunite(write_half)?;
let addr = tcp_stream.local_addr()?; let addr = tcp_stream.local_addr()?;
return http_create_conn_resource(state, tcp_stream, addr, "http"); return Ok(http_create_conn_resource(state, tcp_stream, addr, "http"));
} }
if let Ok(resource_rc) = state if let Ok(resource_rc) = state
@ -49,7 +49,7 @@ fn op_http_start(
let (read_half, write_half) = resource.into_inner(); let (read_half, write_half) = resource.into_inner();
let tls_stream = read_half.unsplit(write_half); let tls_stream = read_half.unsplit(write_half);
let addr = tls_stream.local_addr()?; let addr = tls_stream.local_addr()?;
return http_create_conn_resource(state, tls_stream, addr, "https"); return Ok(http_create_conn_resource(state, tls_stream, addr, "https"));
} }
#[cfg(unix)] #[cfg(unix)]
@ -65,7 +65,12 @@ fn op_http_start(
let (read_half, write_half) = resource.into_inner(); let (read_half, write_half) = resource.into_inner();
let unix_stream = read_half.reunite(write_half)?; let unix_stream = read_half.reunite(write_half)?;
let addr = unix_stream.local_addr()?; let addr = unix_stream.local_addr()?;
return http_create_conn_resource(state, unix_stream, addr, "http+unix"); return Ok(http_create_conn_resource(
state,
unix_stream,
addr,
"http+unix",
));
} }
Err(bad_resource_id()) Err(bad_resource_id())