1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-01-03 04:48:52 -05:00

feat: support SharedArrayBuffer sharing between workers (#11040)

This commit adds support for sharing SABs between workers.
This commit is contained in:
Luca Casonato 2021-07-06 19:42:52 +02:00 committed by GitHub
parent 672a88f272
commit bdfad23dd0
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 152 additions and 0 deletions

View file

@ -123,6 +123,9 @@ fn create_web_worker_callback(
get_error_class_fn: Some(&crate::errors::get_error_class_name),
blob_store: program_state.blob_store.clone(),
broadcast_channel: program_state.broadcast_channel.clone(),
shared_array_buffer_store: Some(
program_state.shared_array_buffer_store.clone(),
),
};
let (mut worker, external_handle) = WebWorker::from_options(
@ -209,6 +212,9 @@ pub fn create_main_worker(
}),
blob_store: program_state.blob_store.clone(),
broadcast_channel: program_state.broadcast_channel.clone(),
shared_array_buffer_store: Some(
program_state.shared_array_buffer_store.clone(),
),
};
let mut worker = MainWorker::from_options(main_module, permissions, &options);

View file

@ -15,6 +15,7 @@ use crate::module_graph::TypeLib;
use crate::source_maps::SourceMapGetter;
use crate::specifier_handler::FetchHandler;
use crate::version;
use deno_core::SharedArrayBufferStore;
use deno_runtime::deno_broadcast_channel::InMemoryBroadcastChannel;
use deno_runtime::deno_web::BlobStore;
use deno_runtime::inspector_server::InspectorServer;
@ -55,6 +56,7 @@ pub struct ProgramState {
pub ca_data: Option<Vec<u8>>,
pub blob_store: BlobStore,
pub broadcast_channel: InMemoryBroadcastChannel,
pub shared_array_buffer_store: SharedArrayBufferStore,
}
impl ProgramState {
@ -81,6 +83,7 @@ impl ProgramState {
let blob_store = BlobStore::default();
let broadcast_channel = InMemoryBroadcastChannel::default();
let shared_array_buffer_store = SharedArrayBufferStore::default();
let file_fetcher = FileFetcher::new(
http_cache,
@ -148,6 +151,7 @@ impl ProgramState {
ca_data,
blob_store,
broadcast_channel,
shared_array_buffer_store,
};
Ok(Arc::new(program_state))
}

View file

@ -248,6 +248,7 @@ pub async fn run(
origin_storage_dir: None,
blob_store,
broadcast_channel,
shared_array_buffer_store: None,
};
let mut worker =
MainWorker::from_options(main_module.clone(), permissions, &options);

View file

@ -0,0 +1,9 @@
self.postMessage("ready");
globalThis.addEventListener("message", (e) => {
const bytes1 = new Uint8Array(e.data[0]);
const bytes2 = new Uint8Array(e.data[1]);
bytes1[0] = 1;
bytes2[0] = 2;
self.postMessage("done");
});

View file

@ -789,6 +789,34 @@ Deno.test({
},
});
Deno.test({
name: "worker SharedArrayBuffer",
fn: async function (): Promise<void> {
const promise = deferred();
const workerOptions: WorkerOptions = { type: "module" };
const w = new Worker(
new URL("shared_array_buffer.ts", import.meta.url).href,
workerOptions,
);
const sab1 = new SharedArrayBuffer(1);
const sab2 = new SharedArrayBuffer(1);
const bytes1 = new Uint8Array(sab1);
const bytes2 = new Uint8Array(sab2);
assertEquals(bytes1[0], 0);
assertEquals(bytes2[0], 0);
w.onmessage = (): void => {
w.postMessage([sab1, sab2]);
w.onmessage = (): void => {
assertEquals(bytes1[0], 1);
assertEquals(bytes2[0], 2);
promise.resolve();
};
};
await promise;
w.terminate();
},
});
Deno.test({
name: "Send MessagePorts from / to workers",
fn: async function (): Promise<void> {

View file

@ -22,7 +22,10 @@ use std::convert::TryInto;
use std::option::Option;
use std::rc::Rc;
use url::Url;
use v8::HandleScope;
use v8::Local;
use v8::MapFnTo;
use v8::SharedArrayBuffer;
lazy_static::lazy_static! {
pub static ref EXTERNAL_REFERENCES: v8::ExternalReferences =
@ -713,6 +716,22 @@ impl<'a> v8::ValueSerializerImpl for SerializeDeserialize<'a> {
scope.throw_exception(error);
}
fn get_shared_array_buffer_id<'s>(
&mut self,
scope: &mut HandleScope<'s>,
shared_array_buffer: Local<'s, SharedArrayBuffer>,
) -> Option<u32> {
let state_rc = JsRuntime::state(scope);
let state = state_rc.borrow_mut();
if let Some(shared_array_buffer_store) = &state.shared_array_buffer_store {
let backing_store = shared_array_buffer.get_backing_store();
let id = shared_array_buffer_store.insert(backing_store);
Some(id)
} else {
None
}
}
fn write_host_object<'s>(
&mut self,
scope: &mut v8::HandleScope<'s>,
@ -735,6 +754,23 @@ impl<'a> v8::ValueSerializerImpl for SerializeDeserialize<'a> {
}
impl<'a> v8::ValueDeserializerImpl for SerializeDeserialize<'a> {
fn get_shared_array_buffer_from_id<'s>(
&mut self,
scope: &mut HandleScope<'s>,
transfer_id: u32,
) -> Option<Local<'s, SharedArrayBuffer>> {
let state_rc = JsRuntime::state(scope);
let state = state_rc.borrow_mut();
if let Some(shared_array_buffer_store) = &state.shared_array_buffer_store {
let backing_store = shared_array_buffer_store.take(transfer_id)?;
let shared_array_buffer =
v8::SharedArrayBuffer::with_backing_store(scope, &backing_store);
Some(shared_array_buffer)
} else {
None
}
}
fn read_host_object<'s>(
&mut self,
scope: &mut v8::HandleScope<'s>,

View file

@ -57,6 +57,7 @@ pub use crate::modules::ModuleLoader;
pub use crate::modules::ModuleSource;
pub use crate::modules::ModuleSourceFuture;
pub use crate::modules::NoopModuleLoader;
pub use crate::runtime::SharedArrayBufferStore;
// TODO(bartlomieju): this struct should be implementation
// detail nad not be public
pub use crate::modules::RecursiveModuleLoad;

View file

@ -38,6 +38,8 @@ use std::mem::forget;
use std::option::Option;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::Once;
use std::task::Context;
use std::task::Poll;
@ -97,6 +99,36 @@ struct ModEvaluate {
sender: mpsc::Sender<Result<(), AnyError>>,
}
#[derive(Default, Clone)]
pub struct SharedArrayBufferStore(Arc<Mutex<SharedArrayBufferStoreInner>>);
#[derive(Default)]
pub struct SharedArrayBufferStoreInner {
buffers: HashMap<u32, v8::SharedRef<v8::BackingStore>>,
last_id: u32,
}
impl SharedArrayBufferStore {
pub(crate) fn insert(
&self,
backing_store: v8::SharedRef<v8::BackingStore>,
) -> u32 {
let mut buffers = self.0.lock().unwrap();
let last_id = buffers.last_id;
buffers.buffers.insert(last_id, backing_store);
buffers.last_id += 1;
last_id
}
pub(crate) fn take(
&self,
id: u32,
) -> Option<v8::SharedRef<v8::BackingStore>> {
let mut buffers = self.0.lock().unwrap();
buffers.buffers.remove(&id)
}
}
/// Internal state for JsRuntime which is stored in one of v8::Isolate's
/// embedder slots.
pub(crate) struct JsRuntimeState {
@ -116,6 +148,7 @@ pub(crate) struct JsRuntimeState {
pub(crate) pending_unref_ops: FuturesUnordered<PendingOpFuture>,
pub(crate) have_unpolled_ops: bool,
pub(crate) op_state: Rc<RefCell<OpState>>,
pub(crate) shared_array_buffer_store: Option<SharedArrayBufferStore>,
waker: AtomicWaker,
}
@ -204,6 +237,12 @@ pub struct RuntimeOptions {
/// V8 platform instance to use. Used when Deno initializes V8
/// (which it only does once), otherwise it's silenty dropped.
pub v8_platform: Option<v8::SharedRef<v8::Platform>>,
/// The buffer to use for transferring SharedArrayBuffers between isolates.
/// If multiple isolates should have the possibility of sharing
/// SharedArrayBuffers, they should use the same SharedArrayBufferStore. If no
/// SharedArrayBufferStore is specified, SharedArrayBuffer can not be serialized.
pub shared_array_buffer_store: Option<SharedArrayBufferStore>,
}
impl JsRuntime {
@ -294,6 +333,7 @@ impl JsRuntime {
js_error_create_fn,
pending_ops: FuturesUnordered::new(),
pending_unref_ops: FuturesUnordered::new(),
shared_array_buffer_store: options.shared_array_buffer_store,
op_state: op_state.clone(),
have_unpolled_ops: false,
waker: AtomicWaker::new(),

14
main.js Normal file
View file

@ -0,0 +1,14 @@
const worker = new Worker(new URL("./worker.js", import.meta.url), {
type: "module",
});
const sab = new SharedArrayBuffer(1);
console.log(new Uint8Array(sab));
setInterval(() => {
console.log(new Uint8Array(sab));
}, 100);
worker.onmessage = () => {
worker.postMessage(sab);
};

View file

@ -43,6 +43,7 @@ async fn main() -> Result<(), AnyError> {
origin_storage_dir: None,
blob_store: BlobStore::default(),
broadcast_channel: InMemoryBroadcastChannel::default(),
shared_array_buffer_store: None,
};
let js_path =

View file

@ -28,6 +28,7 @@ use deno_core::ModuleId;
use deno_core::ModuleLoader;
use deno_core::ModuleSpecifier;
use deno_core::RuntimeOptions;
use deno_core::SharedArrayBufferStore;
use deno_web::create_entangled_message_port;
use deno_web::BlobStore;
use deno_web::MessagePort;
@ -269,6 +270,7 @@ pub struct WebWorkerOptions {
pub get_error_class_fn: Option<GetErrorClassFn>,
pub blob_store: BlobStore,
pub broadcast_channel: InMemoryBroadcastChannel,
pub shared_array_buffer_store: Option<SharedArrayBufferStore>,
}
impl WebWorker {
@ -351,6 +353,7 @@ impl WebWorker {
startup_snapshot: Some(js::deno_isolate_init()),
js_error_create_fn: options.js_error_create_fn.clone(),
get_error_class_fn: options.get_error_class_fn,
shared_array_buffer_store: options.shared_array_buffer_store.clone(),
extensions,
..Default::default()
});

View file

@ -22,6 +22,7 @@ use deno_core::ModuleId;
use deno_core::ModuleLoader;
use deno_core::ModuleSpecifier;
use deno_core::RuntimeOptions;
use deno_core::SharedArrayBufferStore;
use deno_web::BlobStore;
use log::debug;
use std::env;
@ -70,6 +71,7 @@ pub struct WorkerOptions {
pub origin_storage_dir: Option<std::path::PathBuf>,
pub blob_store: BlobStore,
pub broadcast_channel: InMemoryBroadcastChannel,
pub shared_array_buffer_store: Option<SharedArrayBufferStore>,
}
impl MainWorker {
@ -136,6 +138,7 @@ impl MainWorker {
startup_snapshot: Some(js::deno_isolate_init()),
js_error_create_fn: options.js_error_create_fn.clone(),
get_error_class_fn: options.get_error_class_fn,
shared_array_buffer_store: options.shared_array_buffer_store.clone(),
extensions,
..Default::default()
});
@ -300,6 +303,7 @@ mod tests {
origin_storage_dir: None,
blob_store: BlobStore::default(),
broadcast_channel: InMemoryBroadcastChannel::default(),
shared_array_buffer_store: None,
};
MainWorker::from_options(main_module, permissions, &options)

5
worker.js Normal file
View file

@ -0,0 +1,5 @@
self.postMessage("ready");
globalThis.addEventListener("message", (e) => {
new Uint8Array(e.data)[0] = 1;
});