1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-01-11 08:33:43 -05:00

refactor(core): Turn the wasm_streaming_feed binding into ops (#11985)

Async WebAssembly compilation was implemented by adding two
bindings: `set_wasm_streaming_callback`, which registered a callback to
be called whenever a streaming wasm compilation was started, and
`wasm_streaming_feed`, which let the JS callback modify the state of the
v8 wasm compiler.

`set_wasm_streaming_callback` cannot currently be implemented as
anything other than a binding, but `wasm_streaming_feed` does not really
need to use anything specific to bindings, and could indeed be
implemented as one or more ops. This PR does that, resulting in a
simplification of the relevant code.

There are three operations on the state of the v8 wasm compiler that
`wasm_streaming_feed` allowed: feeding new bytes into the compiler,
letting it know that there are no more bytes coming from the network,
and aborting the compilation. This PR provides `op_wasm_streaming_feed`
to feed new bytes into the compiler, and `op_wasm_streaming_abort` to
abort the compilation. It doesn't provide an op to let v8 know that the
response is finished, but closing the resource with `Deno.core.close()`
will achieve that.
This commit is contained in:
Andreu Botella 2021-09-13 14:27:54 +02:00 committed by GitHub
parent a95ca9dc70
commit 4d6f412b0b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 72 additions and 92 deletions

View file

@ -9,7 +9,6 @@ use crate::OpId;
use crate::OpPayload;
use crate::OpTable;
use crate::PromiseId;
use crate::ResourceId;
use crate::ZeroCopyBuf;
use log::debug;
use rusty_v8 as v8;
@ -20,7 +19,6 @@ use std::cell::RefCell;
use std::convert::TryFrom;
use std::convert::TryInto;
use std::option::Option;
use std::rc::Rc;
use url::Url;
use v8::HandleScope;
use v8::Local;
@ -73,9 +71,6 @@ lazy_static::lazy_static! {
},
v8::ExternalReference {
function: set_wasm_streaming_callback.map_fn_to()
},
v8::ExternalReference {
function: wasm_streaming_feed.map_fn_to()
}
]);
}
@ -160,8 +155,6 @@ pub fn initialize_context<'s>(
"setWasmStreamingCallback",
set_wasm_streaming_callback,
);
set_func(scope, core_val, "wasmStreamingFeed", wasm_streaming_feed);
// Direct bindings on `window`.
set_func(scope, global, "queueMicrotask", queue_microtask);
@ -535,14 +528,13 @@ fn call_console(
deno_console_method.call(scope, receiver.into(), &call_args);
}
struct WasmStreamingResource(RefCell<v8::WasmStreaming>);
impl crate::Resource for WasmStreamingResource {}
fn set_wasm_streaming_callback(
scope: &mut v8::HandleScope,
args: v8::FunctionCallbackArguments,
_rv: v8::ReturnValue,
) {
use crate::ops_builtin::WasmStreamingResource;
let state_rc = JsRuntime::state(scope);
let mut state = state_rc.borrow_mut();
@ -584,67 +576,6 @@ fn set_wasm_streaming_callback(
});
}
fn wasm_streaming_feed(
scope: &mut v8::HandleScope,
args: v8::FunctionCallbackArguments,
_rv: v8::ReturnValue,
) {
#[derive(Deserialize)]
#[serde(rename_all = "snake_case")]
enum MessageType {
Bytes,
Abort,
Finish,
}
let rid: ResourceId = match serde_v8::from_v8(scope, args.get(0)) {
Ok(rid) => rid,
Err(_) => return throw_type_error(scope, "Invalid argument"),
};
let message_type = match serde_v8::from_v8(scope, args.get(1)) {
Ok(message_type) => message_type,
Err(_) => return throw_type_error(scope, "Invalid argument"),
};
let wasm_streaming = {
let state_rc = JsRuntime::state(scope);
let state = state_rc.borrow();
// If message_type is not Bytes, we'll be consuming the WasmStreaming
// instance, so let's also remove it from the resource table.
let wasm_streaming: Result<Rc<WasmStreamingResource>, AnyError> =
match message_type {
MessageType::Bytes => state.op_state.borrow().resource_table.get(rid),
_ => state.op_state.borrow_mut().resource_table.take(rid),
};
match wasm_streaming {
Ok(wasm_streaming) => wasm_streaming,
Err(e) => return throw_type_error(scope, e.to_string()),
}
};
match message_type {
MessageType::Bytes => {
let bytes: ZeroCopyBuf = match serde_v8::from_v8(scope, args.get(2)) {
Ok(bytes) => bytes,
Err(_) => return throw_type_error(scope, "Invalid resource ID."),
};
wasm_streaming.0.borrow_mut().on_bytes_received(&bytes);
}
_ => {
// These types need to consume the WasmStreaming instance.
let wasm_streaming = match Rc::try_unwrap(wasm_streaming) {
Ok(streaming) => streaming.0.into_inner(),
Err(_) => panic!("Couldn't consume WasmStreamingResource."),
};
match message_type {
MessageType::Bytes => unreachable!(),
MessageType::Finish => wasm_streaming.finish(),
MessageType::Abort => wasm_streaming.abort(Some(args.get(2))),
}
}
}
}
fn encode(
scope: &mut v8::HandleScope,
args: v8::FunctionCallbackArguments,

View file

@ -56,25 +56,18 @@ declare namespace Deno {
* (`WebAssembly.compileStreaming` and `WebAssembly.instantiateStreaming`)
* are called in order to feed the source's bytes to the wasm compiler.
* The callback is called with the source argument passed to the streaming
* APIs and an rid to use with `Deno.core.wasmStreamingFeed`.
* APIs and an rid to use with the wasm streaming ops.
*
* The callback should eventually invoke the following ops:
* - `op_wasm_streaming_feed`. Feeds bytes from the wasm resource to the
* compiler. Takes the rid and a `Uint8Array`.
* - `op_wasm_streaming_abort`. Aborts the wasm compilation. Takes the rid
* and an exception. Invalidates the resource.
* - To indicate the end of the resource, use `Deno.core.close()` with the
* rid.
*/
function setWasmStreamingCallback(
cb: (source: any, rid: number) => void,
): void;
/**
* Affect the state of the WebAssembly streaming compiler, by either passing
* it bytes, aborting it, or indicating that all bytes were received.
* `rid` must be a resource ID that was passed to the callback set with
* `Deno.core.setWasmStreamingCallback`. Calling this function with `type`
* set to either "abort" or "finish" invalidates the rid.
*/
function wasmStreamingFeed(
rid: number,
type: "bytes",
bytes: Uint8Array,
): void;
function wasmStreamingFeed(rid: number, type: "abort", error: any): void;
function wasmStreamingFeed(rid: number, type: "finish"): void;
}
}

View file

@ -5,7 +5,11 @@ use crate::op_sync;
use crate::resources::ResourceId;
use crate::Extension;
use crate::OpState;
use crate::Resource;
use crate::ZeroCopyBuf;
use std::cell::RefCell;
use std::io::{stderr, stdout, Write};
use std::rc::Rc;
pub(crate) fn init_builtins() -> Extension {
Extension::builder()
@ -20,6 +24,8 @@ pub(crate) fn init_builtins() -> Extension {
("op_try_close", op_sync(op_try_close)),
("op_print", op_sync(op_print)),
("op_resources", op_sync(op_resources)),
("op_wasm_streaming_feed", op_sync(op_wasm_streaming_feed)),
("op_wasm_streaming_abort", op_sync(op_wasm_streaming_abort)),
])
.build()
}
@ -81,3 +87,53 @@ pub fn op_print(
}
Ok(())
}
pub struct WasmStreamingResource(pub(crate) RefCell<rusty_v8::WasmStreaming>);
impl Resource for WasmStreamingResource {
fn close(self: Rc<Self>) {
// At this point there are no clones of Rc<WasmStreamingResource> on the
// resource table, and no one should own a reference outside of the stack.
// Therefore, we can be sure `self` is the only reference.
if let Ok(wsr) = Rc::try_unwrap(self) {
wsr.0.into_inner().finish();
} else {
panic!("Couldn't consume WasmStreamingResource.");
}
}
}
/// Feed bytes to WasmStreamingResource.
pub fn op_wasm_streaming_feed(
state: &mut OpState,
rid: ResourceId,
bytes: ZeroCopyBuf,
) -> Result<(), AnyError> {
let wasm_streaming =
state.resource_table.get::<WasmStreamingResource>(rid)?;
wasm_streaming.0.borrow_mut().on_bytes_received(&bytes);
Ok(())
}
/// Abort a WasmStreamingResource.
pub fn op_wasm_streaming_abort(
state: &mut OpState,
rid: ResourceId,
exception: serde_v8::Value,
) -> Result<(), AnyError> {
let wasm_streaming =
state.resource_table.take::<WasmStreamingResource>(rid)?;
// At this point there are no clones of Rc<WasmStreamingResource> on the
// resource table, and no one should own a reference because we're never
// cloning them. So we can be sure `wasm_streaming` is the only reference.
if let Ok(wsr) = Rc::try_unwrap(wasm_streaming) {
wsr.0.into_inner().abort(Some(exception.v8_value));
} else {
panic!("Couldn't consume WasmStreamingResource.");
}
Ok(())
}

View file

@ -505,8 +505,8 @@
*
* @param {any} source The source parameter that the WebAssembly
* streaming API was called with.
* @param {number} rid An rid that can be used with
* `Deno.core.wasmStreamingFeed`.
* @param {number} rid An rid that represents the wasm streaming
* resource.
*/
function handleWasmStreaming(source, rid) {
// This implements part of
@ -543,15 +543,15 @@
while (true) {
const { value: chunk, done } = await reader.read();
if (done) break;
core.wasmStreamingFeed(rid, "bytes", chunk);
core.opSync("op_wasm_streaming_feed", rid, chunk);
}
}
// 2.7.
core.wasmStreamingFeed(rid, "finish");
core.close(rid);
} catch (err) {
// 2.8 and 3
core.wasmStreamingFeed(rid, "abort", err);
core.opSync("op_wasm_streaming_abort", rid, err);
}
})();
}