diff --git a/cli/main.rs b/cli/main.rs index a61f945301..eb7b4d3333 100644 --- a/cli/main.rs +++ b/cli/main.rs @@ -123,6 +123,9 @@ fn create_web_worker_callback( get_error_class_fn: Some(&crate::errors::get_error_class_name), blob_store: program_state.blob_store.clone(), broadcast_channel: program_state.broadcast_channel.clone(), + shared_array_buffer_store: Some( + program_state.shared_array_buffer_store.clone(), + ), }; let (mut worker, external_handle) = WebWorker::from_options( @@ -209,6 +212,9 @@ pub fn create_main_worker( }), blob_store: program_state.blob_store.clone(), broadcast_channel: program_state.broadcast_channel.clone(), + shared_array_buffer_store: Some( + program_state.shared_array_buffer_store.clone(), + ), }; let mut worker = MainWorker::from_options(main_module, permissions, &options); diff --git a/cli/program_state.rs b/cli/program_state.rs index becc8faa37..5bb60906e6 100644 --- a/cli/program_state.rs +++ b/cli/program_state.rs @@ -15,6 +15,7 @@ use crate::module_graph::TypeLib; use crate::source_maps::SourceMapGetter; use crate::specifier_handler::FetchHandler; use crate::version; +use deno_core::SharedArrayBufferStore; use deno_runtime::deno_broadcast_channel::InMemoryBroadcastChannel; use deno_runtime::deno_web::BlobStore; use deno_runtime::inspector_server::InspectorServer; @@ -55,6 +56,7 @@ pub struct ProgramState { pub ca_data: Option>, pub blob_store: BlobStore, pub broadcast_channel: InMemoryBroadcastChannel, + pub shared_array_buffer_store: SharedArrayBufferStore, } impl ProgramState { @@ -81,6 +83,7 @@ impl ProgramState { let blob_store = BlobStore::default(); let broadcast_channel = InMemoryBroadcastChannel::default(); + let shared_array_buffer_store = SharedArrayBufferStore::default(); let file_fetcher = FileFetcher::new( http_cache, @@ -148,6 +151,7 @@ impl ProgramState { ca_data, blob_store, broadcast_channel, + shared_array_buffer_store, }; Ok(Arc::new(program_state)) } diff --git a/cli/standalone.rs b/cli/standalone.rs index 74e5de1ca6..c8918563f9 100644 --- a/cli/standalone.rs +++ b/cli/standalone.rs @@ -248,6 +248,7 @@ pub async fn run( origin_storage_dir: None, blob_store, broadcast_channel, + shared_array_buffer_store: None, }; let mut worker = MainWorker::from_options(main_module.clone(), permissions, &options); diff --git a/cli/tests/workers/shared_array_buffer.ts b/cli/tests/workers/shared_array_buffer.ts new file mode 100644 index 0000000000..4af95863a4 --- /dev/null +++ b/cli/tests/workers/shared_array_buffer.ts @@ -0,0 +1,9 @@ +self.postMessage("ready"); + +globalThis.addEventListener("message", (e) => { + const bytes1 = new Uint8Array(e.data[0]); + const bytes2 = new Uint8Array(e.data[1]); + bytes1[0] = 1; + bytes2[0] = 2; + self.postMessage("done"); +}); diff --git a/cli/tests/workers/test.ts b/cli/tests/workers/test.ts index b37b7aeb14..d35dbec823 100644 --- a/cli/tests/workers/test.ts +++ b/cli/tests/workers/test.ts @@ -789,6 +789,34 @@ Deno.test({ }, }); +Deno.test({ + name: "worker SharedArrayBuffer", + fn: async function (): Promise { + const promise = deferred(); + const workerOptions: WorkerOptions = { type: "module" }; + const w = new Worker( + new URL("shared_array_buffer.ts", import.meta.url).href, + workerOptions, + ); + const sab1 = new SharedArrayBuffer(1); + const sab2 = new SharedArrayBuffer(1); + const bytes1 = new Uint8Array(sab1); + const bytes2 = new Uint8Array(sab2); + assertEquals(bytes1[0], 0); + assertEquals(bytes2[0], 0); + w.onmessage = (): void => { + w.postMessage([sab1, sab2]); + w.onmessage = (): void => { + assertEquals(bytes1[0], 1); + assertEquals(bytes2[0], 2); + promise.resolve(); + }; + }; + await promise; + w.terminate(); + }, +}); + Deno.test({ name: "Send MessagePorts from / to workers", fn: async function (): Promise { diff --git a/core/bindings.rs b/core/bindings.rs index 143ccda9b2..d8337322d4 100644 --- a/core/bindings.rs +++ b/core/bindings.rs @@ -22,7 +22,10 @@ use std::convert::TryInto; use std::option::Option; use std::rc::Rc; use url::Url; +use v8::HandleScope; +use v8::Local; use v8::MapFnTo; +use v8::SharedArrayBuffer; lazy_static::lazy_static! { pub static ref EXTERNAL_REFERENCES: v8::ExternalReferences = @@ -713,6 +716,22 @@ impl<'a> v8::ValueSerializerImpl for SerializeDeserialize<'a> { scope.throw_exception(error); } + fn get_shared_array_buffer_id<'s>( + &mut self, + scope: &mut HandleScope<'s>, + shared_array_buffer: Local<'s, SharedArrayBuffer>, + ) -> Option { + let state_rc = JsRuntime::state(scope); + let state = state_rc.borrow_mut(); + if let Some(shared_array_buffer_store) = &state.shared_array_buffer_store { + let backing_store = shared_array_buffer.get_backing_store(); + let id = shared_array_buffer_store.insert(backing_store); + Some(id) + } else { + None + } + } + fn write_host_object<'s>( &mut self, scope: &mut v8::HandleScope<'s>, @@ -735,6 +754,23 @@ impl<'a> v8::ValueSerializerImpl for SerializeDeserialize<'a> { } impl<'a> v8::ValueDeserializerImpl for SerializeDeserialize<'a> { + fn get_shared_array_buffer_from_id<'s>( + &mut self, + scope: &mut HandleScope<'s>, + transfer_id: u32, + ) -> Option> { + let state_rc = JsRuntime::state(scope); + let state = state_rc.borrow_mut(); + if let Some(shared_array_buffer_store) = &state.shared_array_buffer_store { + let backing_store = shared_array_buffer_store.take(transfer_id)?; + let shared_array_buffer = + v8::SharedArrayBuffer::with_backing_store(scope, &backing_store); + Some(shared_array_buffer) + } else { + None + } + } + fn read_host_object<'s>( &mut self, scope: &mut v8::HandleScope<'s>, diff --git a/core/lib.rs b/core/lib.rs index 4a9a213f48..8c8861c799 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -57,6 +57,7 @@ pub use crate::modules::ModuleLoader; pub use crate::modules::ModuleSource; pub use crate::modules::ModuleSourceFuture; pub use crate::modules::NoopModuleLoader; +pub use crate::runtime::SharedArrayBufferStore; // TODO(bartlomieju): this struct should be implementation // detail nad not be public pub use crate::modules::RecursiveModuleLoad; diff --git a/core/runtime.rs b/core/runtime.rs index 48003c811d..cf43c2adce 100644 --- a/core/runtime.rs +++ b/core/runtime.rs @@ -38,6 +38,8 @@ use std::mem::forget; use std::option::Option; use std::pin::Pin; use std::rc::Rc; +use std::sync::Arc; +use std::sync::Mutex; use std::sync::Once; use std::task::Context; use std::task::Poll; @@ -97,6 +99,36 @@ struct ModEvaluate { sender: mpsc::Sender>, } +#[derive(Default, Clone)] +pub struct SharedArrayBufferStore(Arc>); + +#[derive(Default)] +pub struct SharedArrayBufferStoreInner { + buffers: HashMap>, + last_id: u32, +} + +impl SharedArrayBufferStore { + pub(crate) fn insert( + &self, + backing_store: v8::SharedRef, + ) -> u32 { + let mut buffers = self.0.lock().unwrap(); + let last_id = buffers.last_id; + buffers.buffers.insert(last_id, backing_store); + buffers.last_id += 1; + last_id + } + + pub(crate) fn take( + &self, + id: u32, + ) -> Option> { + let mut buffers = self.0.lock().unwrap(); + buffers.buffers.remove(&id) + } +} + /// Internal state for JsRuntime which is stored in one of v8::Isolate's /// embedder slots. pub(crate) struct JsRuntimeState { @@ -116,6 +148,7 @@ pub(crate) struct JsRuntimeState { pub(crate) pending_unref_ops: FuturesUnordered, pub(crate) have_unpolled_ops: bool, pub(crate) op_state: Rc>, + pub(crate) shared_array_buffer_store: Option, waker: AtomicWaker, } @@ -204,6 +237,12 @@ pub struct RuntimeOptions { /// V8 platform instance to use. Used when Deno initializes V8 /// (which it only does once), otherwise it's silenty dropped. pub v8_platform: Option>, + + /// The buffer to use for transferring SharedArrayBuffers between isolates. + /// If multiple isolates should have the possibility of sharing + /// SharedArrayBuffers, they should use the same SharedArrayBufferStore. If no + /// SharedArrayBufferStore is specified, SharedArrayBuffer can not be serialized. + pub shared_array_buffer_store: Option, } impl JsRuntime { @@ -294,6 +333,7 @@ impl JsRuntime { js_error_create_fn, pending_ops: FuturesUnordered::new(), pending_unref_ops: FuturesUnordered::new(), + shared_array_buffer_store: options.shared_array_buffer_store, op_state: op_state.clone(), have_unpolled_ops: false, waker: AtomicWaker::new(), diff --git a/main.js b/main.js new file mode 100644 index 0000000000..feb6c5a1ec --- /dev/null +++ b/main.js @@ -0,0 +1,14 @@ +const worker = new Worker(new URL("./worker.js", import.meta.url), { + type: "module", +}); + +const sab = new SharedArrayBuffer(1); +console.log(new Uint8Array(sab)); + +setInterval(() => { + console.log(new Uint8Array(sab)); +}, 100); + +worker.onmessage = () => { + worker.postMessage(sab); +}; diff --git a/runtime/examples/hello_runtime.rs b/runtime/examples/hello_runtime.rs index 047b8991b8..7078090373 100644 --- a/runtime/examples/hello_runtime.rs +++ b/runtime/examples/hello_runtime.rs @@ -43,6 +43,7 @@ async fn main() -> Result<(), AnyError> { origin_storage_dir: None, blob_store: BlobStore::default(), broadcast_channel: InMemoryBroadcastChannel::default(), + shared_array_buffer_store: None, }; let js_path = diff --git a/runtime/web_worker.rs b/runtime/web_worker.rs index 55e1070023..f8aadf4c2c 100644 --- a/runtime/web_worker.rs +++ b/runtime/web_worker.rs @@ -28,6 +28,7 @@ use deno_core::ModuleId; use deno_core::ModuleLoader; use deno_core::ModuleSpecifier; use deno_core::RuntimeOptions; +use deno_core::SharedArrayBufferStore; use deno_web::create_entangled_message_port; use deno_web::BlobStore; use deno_web::MessagePort; @@ -269,6 +270,7 @@ pub struct WebWorkerOptions { pub get_error_class_fn: Option, pub blob_store: BlobStore, pub broadcast_channel: InMemoryBroadcastChannel, + pub shared_array_buffer_store: Option, } impl WebWorker { @@ -351,6 +353,7 @@ impl WebWorker { startup_snapshot: Some(js::deno_isolate_init()), js_error_create_fn: options.js_error_create_fn.clone(), get_error_class_fn: options.get_error_class_fn, + shared_array_buffer_store: options.shared_array_buffer_store.clone(), extensions, ..Default::default() }); diff --git a/runtime/worker.rs b/runtime/worker.rs index 555fc89d2e..04c2941461 100644 --- a/runtime/worker.rs +++ b/runtime/worker.rs @@ -22,6 +22,7 @@ use deno_core::ModuleId; use deno_core::ModuleLoader; use deno_core::ModuleSpecifier; use deno_core::RuntimeOptions; +use deno_core::SharedArrayBufferStore; use deno_web::BlobStore; use log::debug; use std::env; @@ -70,6 +71,7 @@ pub struct WorkerOptions { pub origin_storage_dir: Option, pub blob_store: BlobStore, pub broadcast_channel: InMemoryBroadcastChannel, + pub shared_array_buffer_store: Option, } impl MainWorker { @@ -136,6 +138,7 @@ impl MainWorker { startup_snapshot: Some(js::deno_isolate_init()), js_error_create_fn: options.js_error_create_fn.clone(), get_error_class_fn: options.get_error_class_fn, + shared_array_buffer_store: options.shared_array_buffer_store.clone(), extensions, ..Default::default() }); @@ -300,6 +303,7 @@ mod tests { origin_storage_dir: None, blob_store: BlobStore::default(), broadcast_channel: InMemoryBroadcastChannel::default(), + shared_array_buffer_store: None, }; MainWorker::from_options(main_module, permissions, &options) diff --git a/worker.js b/worker.js new file mode 100644 index 0000000000..c1b8602049 --- /dev/null +++ b/worker.js @@ -0,0 +1,5 @@ +self.postMessage("ready"); + +globalThis.addEventListener("message", (e) => { + new Uint8Array(e.data)[0] = 1; +});