From 3df8f1650039e9453056d516744e755d6be8801b Mon Sep 17 00:00:00 2001 From: Leo Kettmeir Date: Sat, 12 Oct 2024 08:20:17 -0700 Subject: [PATCH] refactor(ext/broadcastchannel): use concrete error type (#26105) --- Cargo.lock | 9 +-- ext/broadcast_channel/Cargo.toml | 1 + .../in_memory_broadcast_channel.rs | 13 ++-- ext/broadcast_channel/lib.rs | 70 +++++++++++++++---- runtime/errors.rs | 18 +++++ 5 files changed, 90 insertions(+), 21 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0e6b4fb6ac..a166c6f232 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1340,6 +1340,7 @@ version = "0.165.0" dependencies = [ "async-trait", "deno_core", + "thiserror", "tokio", "uuid", ] @@ -7153,18 +7154,18 @@ dependencies = [ [[package]] name = "thiserror" -version = "1.0.61" +version = "1.0.64" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c546c80d6be4bc6a00c0f01730c08df82eaa7a7a61f11d656526506112cc1709" +checksum = "d50af8abc119fb8bb6dbabcfa89656f46f84aa0ac7688088608076ad2b459a84" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.61" +version = "1.0.64" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "46c3384250002a6d5af4d114f2845d37b57521033f30d5c3f46c4d70e1197533" +checksum = "08904e7672f5eb876eaaf87e0ce17857500934f4981c4a0ab2b4aa98baac7fc3" dependencies = [ "proc-macro2", "quote", diff --git a/ext/broadcast_channel/Cargo.toml b/ext/broadcast_channel/Cargo.toml index 3caf7b9efd..b19c4ce151 100644 --- a/ext/broadcast_channel/Cargo.toml +++ b/ext/broadcast_channel/Cargo.toml @@ -16,5 +16,6 @@ path = "lib.rs" [dependencies] async-trait.workspace = true deno_core.workspace = true +thiserror.workspace = true tokio.workspace = true uuid.workspace = true diff --git a/ext/broadcast_channel/in_memory_broadcast_channel.rs b/ext/broadcast_channel/in_memory_broadcast_channel.rs index 00b52a9d60..61dc68e17d 100644 --- a/ext/broadcast_channel/in_memory_broadcast_channel.rs +++ b/ext/broadcast_channel/in_memory_broadcast_channel.rs @@ -3,13 +3,13 @@ use std::sync::Arc; use async_trait::async_trait; -use deno_core::error::AnyError; use deno_core::parking_lot::Mutex; use tokio::sync::broadcast; use tokio::sync::mpsc; use uuid::Uuid; use crate::BroadcastChannel; +use crate::BroadcastChannelError; #[derive(Clone)] pub struct InMemoryBroadcastChannel(Arc>>); @@ -41,7 +41,7 @@ impl Default for InMemoryBroadcastChannel { impl BroadcastChannel for InMemoryBroadcastChannel { type Resource = InMemoryBroadcastChannelResource; - fn subscribe(&self) -> Result { + fn subscribe(&self) -> Result { let (cancel_tx, cancel_rx) = mpsc::unbounded_channel(); let broadcast_rx = self.0.lock().subscribe(); let rx = tokio::sync::Mutex::new((broadcast_rx, cancel_rx)); @@ -53,7 +53,10 @@ impl BroadcastChannel for InMemoryBroadcastChannel { }) } - fn unsubscribe(&self, resource: &Self::Resource) -> Result<(), AnyError> { + fn unsubscribe( + &self, + resource: &Self::Resource, + ) -> Result<(), BroadcastChannelError> { Ok(resource.cancel_tx.send(())?) } @@ -62,7 +65,7 @@ impl BroadcastChannel for InMemoryBroadcastChannel { resource: &Self::Resource, name: String, data: Vec, - ) -> Result<(), AnyError> { + ) -> Result<(), BroadcastChannelError> { let name = Arc::new(name); let data = Arc::new(data); let uuid = resource.uuid; @@ -73,7 +76,7 @@ impl BroadcastChannel for InMemoryBroadcastChannel { async fn recv( &self, resource: &Self::Resource, - ) -> Result, AnyError> { + ) -> Result, BroadcastChannelError> { let mut g = resource.rx.lock().await; let (broadcast_rx, cancel_rx) = &mut *g; loop { diff --git a/ext/broadcast_channel/lib.rs b/ext/broadcast_channel/lib.rs index 47c48656d8..c1de118a36 100644 --- a/ext/broadcast_channel/lib.rs +++ b/ext/broadcast_channel/lib.rs @@ -10,34 +10,69 @@ use std::path::PathBuf; use std::rc::Rc; use async_trait::async_trait; -use deno_core::error::AnyError; use deno_core::op2; use deno_core::JsBuffer; use deno_core::OpState; use deno_core::Resource; use deno_core::ResourceId; +use tokio::sync::broadcast::error::SendError as BroadcastSendError; +use tokio::sync::mpsc::error::SendError as MpscSendError; pub const UNSTABLE_FEATURE_NAME: &str = "broadcast-channel"; +#[derive(Debug, thiserror::Error)] +pub enum BroadcastChannelError { + #[error(transparent)] + Resource(deno_core::error::AnyError), + #[error(transparent)] + MPSCSendError(MpscSendError>), + #[error(transparent)] + BroadcastSendError( + BroadcastSendError>, + ), + #[error(transparent)] + Other(deno_core::error::AnyError), +} + +impl From> + for BroadcastChannelError +{ + fn from(value: MpscSendError) -> Self { + BroadcastChannelError::MPSCSendError(MpscSendError(Box::new(value.0))) + } +} +impl From> + for BroadcastChannelError +{ + fn from(value: BroadcastSendError) -> Self { + BroadcastChannelError::BroadcastSendError(BroadcastSendError(Box::new( + value.0, + ))) + } +} + #[async_trait] pub trait BroadcastChannel: Clone { type Resource: Resource; - fn subscribe(&self) -> Result; + fn subscribe(&self) -> Result; - fn unsubscribe(&self, resource: &Self::Resource) -> Result<(), AnyError>; + fn unsubscribe( + &self, + resource: &Self::Resource, + ) -> Result<(), BroadcastChannelError>; async fn send( &self, resource: &Self::Resource, name: String, data: Vec, - ) -> Result<(), AnyError>; + ) -> Result<(), BroadcastChannelError>; async fn recv( &self, resource: &Self::Resource, - ) -> Result, AnyError>; + ) -> Result, BroadcastChannelError>; } pub type Message = (String, Vec); @@ -46,7 +81,7 @@ pub type Message = (String, Vec); #[smi] pub fn op_broadcast_subscribe( state: &mut OpState, -) -> Result +) -> Result where BC: BroadcastChannel + 'static, { @@ -62,11 +97,14 @@ where pub fn op_broadcast_unsubscribe( state: &mut OpState, #[smi] rid: ResourceId, -) -> Result<(), AnyError> +) -> Result<(), BroadcastChannelError> where BC: BroadcastChannel + 'static, { - let resource = state.resource_table.get::(rid)?; + let resource = state + .resource_table + .get::(rid) + .map_err(BroadcastChannelError::Resource)?; let bc = state.borrow::(); bc.unsubscribe(&resource) } @@ -77,11 +115,15 @@ pub async fn op_broadcast_send( #[smi] rid: ResourceId, #[string] name: String, #[buffer] buf: JsBuffer, -) -> Result<(), AnyError> +) -> Result<(), BroadcastChannelError> where BC: BroadcastChannel + 'static, { - let resource = state.borrow().resource_table.get::(rid)?; + let resource = state + .borrow() + .resource_table + .get::(rid) + .map_err(BroadcastChannelError::Resource)?; let bc = state.borrow().borrow::().clone(); bc.send(&resource, name, buf.to_vec()).await } @@ -91,11 +133,15 @@ where pub async fn op_broadcast_recv( state: Rc>, #[smi] rid: ResourceId, -) -> Result, AnyError> +) -> Result, BroadcastChannelError> where BC: BroadcastChannel + 'static, { - let resource = state.borrow().resource_table.get::(rid)?; + let resource = state + .borrow() + .resource_table + .get::(rid) + .map_err(BroadcastChannelError::Resource)?; let bc = state.borrow().borrow::().clone(); bc.recv(&resource).await } diff --git a/runtime/errors.rs b/runtime/errors.rs index 694402773e..476aae63b9 100644 --- a/runtime/errors.rs +++ b/runtime/errors.rs @@ -9,6 +9,7 @@ //! Diagnostics are compile-time type errors, whereas JsErrors are runtime //! exceptions. +use deno_broadcast_channel::BroadcastChannelError; use deno_core::error::AnyError; use deno_core::serde_json; use deno_core::url; @@ -153,12 +154,29 @@ pub fn get_nix_error_class(error: &nix::Error) -> &'static str { } } +fn get_broadcast_channel_error(error: &BroadcastChannelError) -> &'static str { + match error { + BroadcastChannelError::Resource(err) => { + deno_core::error::get_custom_error_class(err).unwrap() + } + BroadcastChannelError::MPSCSendError(_) => "Error", + BroadcastChannelError::BroadcastSendError(_) => "Error", + BroadcastChannelError::Other(err) => { + get_error_class_name(err).unwrap_or("Error") + } + } +} + pub fn get_error_class_name(e: &AnyError) -> Option<&'static str> { deno_core::error::get_custom_error_class(e) .or_else(|| deno_webgpu::error::get_error_class_name(e)) .or_else(|| deno_web::get_error_class_name(e)) .or_else(|| deno_webstorage::get_not_supported_error_class_name(e)) .or_else(|| deno_websocket::get_network_error_class_name(e)) + .or_else(|| { + e.downcast_ref::() + .map(get_broadcast_channel_error) + }) .or_else(|| { e.downcast_ref::() .map(get_dlopen_error_class)