// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. mod in_memory_broadcast_channel; pub use in_memory_broadcast_channel::InMemoryBroadcastChannel; use async_trait::async_trait; use deno_core::error::bad_resource_id; use deno_core::error::AnyError; use deno_core::include_js_files; use deno_core::op_async; use deno_core::op_sync; use deno_core::Extension; use deno_core::OpState; use deno_core::Resource; use deno_core::ResourceId; use deno_core::ZeroCopyBuf; use std::cell::RefCell; use std::path::PathBuf; use std::rc::Rc; #[async_trait] pub trait BroadcastChannel: Clone { type Resource: Resource; fn subscribe(&self) -> Result; fn unsubscribe(&self, resource: &Self::Resource) -> Result<(), AnyError>; async fn send( &self, resource: &Self::Resource, name: String, data: Vec, ) -> Result<(), AnyError>; async fn recv( &self, resource: &Self::Resource, ) -> Result, AnyError>; } pub type Message = (String, Vec); struct Unstable(bool); // --unstable pub fn op_broadcast_subscribe( state: &mut OpState, _args: (), _buf: (), ) -> Result { let unstable = state.borrow::().0; if !unstable { eprintln!( "Unstable API 'BroadcastChannel'. The --unstable flag must be provided.", ); std::process::exit(70); } let bc = state.borrow::(); let resource = bc.subscribe()?; Ok(state.resource_table.add(resource)) } pub fn op_broadcast_unsubscribe( state: &mut OpState, rid: ResourceId, _buf: (), ) -> Result<(), AnyError> { let resource = state .resource_table .get::(rid) .ok_or_else(bad_resource_id)?; let bc = state.borrow::(); bc.unsubscribe(&resource) } pub async fn op_broadcast_send( state: Rc>, (rid, name): (ResourceId, String), buf: ZeroCopyBuf, ) -> Result<(), AnyError> { let resource = state .borrow() .resource_table .get::(rid) .ok_or_else(bad_resource_id)?; let bc = state.borrow().borrow::().clone(); bc.send(&resource, name, buf.to_vec()).await } pub async fn op_broadcast_recv( state: Rc>, rid: ResourceId, _buf: (), ) -> Result, AnyError> { let resource = state .borrow() .resource_table .get::(rid) .ok_or_else(bad_resource_id)?; let bc = state.borrow().borrow::().clone(); bc.recv(&resource).await } pub fn init( bc: BC, unstable: bool, ) -> Extension { Extension::builder() .js(include_js_files!( prefix "deno:extensions/broadcast_channel", "01_broadcast_channel.js", )) .ops(vec![ ( "op_broadcast_subscribe", op_sync(op_broadcast_subscribe::), ), ( "op_broadcast_unsubscribe", op_sync(op_broadcast_unsubscribe::), ), ("op_broadcast_send", op_async(op_broadcast_send::)), ("op_broadcast_recv", op_async(op_broadcast_recv::)), ]) .state(move |state| { state.put(bc.clone()); state.put(Unstable(unstable)); Ok(()) }) .build() } pub fn get_declaration() -> PathBuf { PathBuf::from(env!("CARGO_MANIFEST_DIR")) .join("lib.deno_broadcast_channel.d.ts") }