1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-01-11 08:33:43 -05:00

refactor: new optimized op-layer using serde_v8 (#9843)

- Improves op performance.
- Handle op-metadata (errors, promise IDs) explicitly in the op-layer vs
  per op-encoding (aka: out-of-payload).
- Remove shared queue & custom "asyncHandlers", all async values are
  returned in batches via js_recv_cb.
- The op-layer should be thought of as simple function calls with little
  indirection or translation besides the conceptually straightforward
  serde_v8 bijections.
- Preserve concepts of json/bin/min as semantic groups of their
  inputs/outputs instead of their op-encoding strategy, preserving these
  groups will also facilitate partial transitions over to v8 Fast API for the
  "min" and "bin" groups
This commit is contained in:
Aaron O'Mullan 2021-03-31 16:37:38 +02:00 committed by GitHub
parent 6dc3549a81
commit fec1b2a5a4
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
33 changed files with 794 additions and 1811 deletions

10
Cargo.lock generated
View file

@ -565,6 +565,7 @@ version = "0.82.0"
dependencies = [
"anyhow",
"bencher",
"erased-serde",
"futures",
"indexmap",
"lazy_static",
@ -897,6 +898,15 @@ dependencies = [
"termcolor",
]
[[package]]
name = "erased-serde"
version = "0.3.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0465971a8cc1fa2455c8465aaa377131e1f1cf4983280f474a13e68793aa770c"
dependencies = [
"serde",
]
[[package]]
name = "errno"
version = "0.1.8"

View file

@ -1387,11 +1387,12 @@ fn cache_snapshot(
Ok(())
}
// buffer-less json_sync ops
fn op<F, V, R>(op_fn: F) -> Box<OpFn>
where
F: Fn(&mut State, V) -> Result<R, AnyError> + 'static,
V: de::DeserializeOwned,
R: Serialize,
R: Serialize + 'static,
{
json_op_sync(move |s, args, _bufs| {
let state = s.borrow_mut::<State>();

View file

@ -8,9 +8,9 @@ import {
const readErrorStackPattern = new RegExp(
`^.*
at handleError \\(.*core\\.js:.*\\)
at binOpParseResult \\(.*core\\.js:.*\\)
at asyncHandle \\(.*core\\.js:.*\\).*$`,
at processErr \\(.*core\\.js:.*\\)
at opAsyncHandler \\(.*core\\.js:.*\\)
at handleAsyncMsgFromRust \\(.*core\\.js:.*\\).*$`,
"ms",
);
@ -32,45 +32,3 @@ declare global {
var core: any; // eslint-disable-line no-var
}
}
unitTest(function binOpsHeaderTooShort(): void {
for (const op of ["op_read_sync", "op_read_async"]) {
const readOpId = Deno.core.ops()[op];
const res = Deno.core.send(
readOpId,
new Uint8Array([
1,
2,
3,
4,
5,
6,
7,
8,
9,
10,
11,
]),
);
const headerByteLength = 4 * 4;
assert(res.byteLength > headerByteLength);
const view = new DataView(
res.buffer,
res.byteOffset + res.byteLength - headerByteLength,
headerByteLength,
);
const requestId = Number(view.getBigUint64(0, true));
const status = view.getUint32(8, true);
const result = view.getUint32(12, true);
assert(requestId === 0);
assert(status !== 0);
assertEquals(new TextDecoder().decode(res.slice(0, result)), "TypeError");
assertEquals(
new TextDecoder().decode(res.slice(result, -headerByteLength)).trim(),
"Unparsable control buffer",
);
}
});

View file

@ -1,32 +0,0 @@
import { assertMatch, assertStrictEquals, unitTest } from "./test_util.ts";
declare global {
// deno-lint-ignore no-namespace
namespace Deno {
// deno-lint-ignore no-explicit-any
var core: any; // eslint-disable-line no-var
}
}
unitTest(function malformedJsonControlBuffer(): void {
const opId = Deno.core.ops()["op_open_sync"];
const argsBuf = new Uint8Array([1, 2, 3, 4, 5]);
const resBuf = Deno.core.send(opId, argsBuf);
const resText = new TextDecoder().decode(resBuf);
const resObj = JSON.parse(resText);
assertStrictEquals(resObj.ok, undefined);
assertStrictEquals(resObj.err.className, "SyntaxError");
assertMatch(resObj.err.message, /\bexpected value\b/);
});
unitTest(function invalidRequestId(): void {
const opId = Deno.core.ops()["op_open_async"];
const reqBuf = new Uint8Array([0, 0, 0, 0, 0, 0, 0]);
const resBuf = Deno.core.send(opId, reqBuf);
const resText = new TextDecoder().decode(resBuf);
const resObj = JSON.parse(resText);
console.error(resText);
assertStrictEquals(resObj.ok, undefined);
assertStrictEquals(resObj.err.className, "TypeError");
assertMatch(resObj.err.message, /\brequestId\b/);
});

View file

@ -7,35 +7,38 @@ unitTest(async function metrics(): Promise<void> {
const dataMsg = new Uint8Array([13, 13, 13]); // "\r\r\r",
await Deno.stdout.write(dataMsg);
// WARNING: bytesReceived & bytesSentControl are now always zero
// following https://github.com/denoland/deno/pull/9843
const m1 = Deno.metrics();
assert(m1.opsDispatched > 0);
assert(m1.opsCompleted > 0);
assert(m1.bytesSentControl > 0);
assert(m1.bytesSentControl === 0);
assert(m1.bytesSentData >= 0);
assert(m1.bytesReceived > 0);
assert(m1.bytesReceived === 0);
const m1OpWrite = m1.ops["op_write_async"];
assert(m1OpWrite.opsDispatchedAsync > 0);
assert(m1OpWrite.opsCompletedAsync > 0);
assert(m1OpWrite.bytesSentControl > 0);
assert(m1OpWrite.bytesSentControl === 0);
assert(m1OpWrite.bytesSentData >= 0);
assert(m1OpWrite.bytesReceived > 0);
assert(m1OpWrite.bytesReceived === 0);
await Deno.stdout.write(dataMsg);
const m2 = Deno.metrics();
assert(m2.opsDispatchedAsync > m1.opsDispatchedAsync);
assert(m2.opsCompletedAsync > m1.opsCompletedAsync);
assert(m2.bytesSentControl > m1.bytesSentControl);
assert(m2.bytesSentControl === m1.bytesSentControl);
assert(m2.bytesSentData >= m1.bytesSentData + dataMsg.byteLength);
assert(m2.bytesReceived > m1.bytesReceived);
assert(m2.bytesReceived === m1.bytesReceived);
const m2OpWrite = m2.ops["op_write_async"];
assert(m2OpWrite.opsDispatchedAsync > m1OpWrite.opsDispatchedAsync);
assert(m2OpWrite.opsCompletedAsync > m1OpWrite.opsCompletedAsync);
assert(m2OpWrite.bytesSentControl > m1OpWrite.bytesSentControl);
assert(m2OpWrite.bytesSentControl === m1OpWrite.bytesSentControl);
assert(
m2OpWrite.bytesSentData >= m1OpWrite.bytesSentData + dataMsg.byteLength,
);
assert(m2OpWrite.bytesReceived > m1OpWrite.bytesReceived);
assert(m2OpWrite.bytesReceived === m1OpWrite.bytesReceived);
});
unitTest(

View file

@ -16,7 +16,6 @@ import "./copy_file_test.ts";
import "./custom_event_test.ts";
import "./dir_test.ts";
import "./dispatch_bin_test.ts";
import "./dispatch_json_test.ts";
import "./error_stack_test.ts";
import "./event_test.ts";
import "./event_target_test.ts";

View file

@ -14,6 +14,7 @@ path = "lib.rs"
[dependencies]
anyhow = "1.0.38"
erased-serde = "0.3.13"
futures = "0.3.12"
indexmap = "1.6.1"
lazy_static = "1.4.0"

View file

@ -5,12 +5,14 @@ use deno_core::json_op_sync;
use deno_core::v8;
use deno_core::JsRuntime;
use deno_core::Op;
use deno_core::OpResponse;
fn create_js_runtime() -> JsRuntime {
let mut runtime = JsRuntime::new(Default::default());
runtime.register_op("pi_bin", bin_op_sync(|_, _, _| Ok(314159)));
runtime.register_op("pi_json", json_op_sync(|_, _: (), _| Ok(314159)));
runtime.register_op("nop", |_, _| Op::Sync(Box::new(9_u64.to_le_bytes())));
runtime
.register_op("nop", |_, _, _| Op::Sync(OpResponse::Value(Box::new(9))));
// Init ops
runtime
@ -43,7 +45,7 @@ fn bench_op_pi_bin(b: &mut Bencher) {
bench_runtime_js(
b,
r#"for(let i=0; i < 1e3; i++) {
Deno.core.binOpSync("pi_bin", 0);
Deno.core.binOpSync("pi_bin", 0, nopView);
}"#,
);
}
@ -61,7 +63,7 @@ fn bench_op_nop(b: &mut Bencher) {
bench_runtime_js(
b,
r#"for(let i=0; i < 1e3; i++) {
Deno.core.dispatchByName("nop", nopView);
Deno.core.dispatchByName("nop", null, null, nopView);
}"#,
);
}

View file

@ -1,11 +1,13 @@
// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
use crate::error::AnyError;
use crate::runtime::JsRuntimeState;
use crate::JsRuntime;
use crate::Op;
use crate::OpId;
use crate::OpPayload;
use crate::OpResponse;
use crate::OpTable;
use crate::PromiseId;
use crate::ZeroCopyBuf;
use futures::future::FutureExt;
use rusty_v8 as v8;
@ -37,9 +39,6 @@ lazy_static::lazy_static! {
v8::ExternalReference {
function: eval_context.map_fn_to()
},
v8::ExternalReference {
getter: shared_getter.map_fn_to()
},
v8::ExternalReference {
function: queue_microtask.map_fn_to()
},
@ -142,9 +141,6 @@ pub fn initialize_context<'s>(
set_func(scope, core_val, "getProxyDetails", get_proxy_details);
set_func(scope, core_val, "heapStats", heap_stats);
let shared_key = v8::String::new(scope, "shared").unwrap();
core_val.set_accessor(scope, shared_key.into(), shared_getter);
// Direct bindings on `window`.
set_func(scope, global, "queueMicrotask", queue_microtask);
@ -380,59 +376,86 @@ fn send<'s>(
let mut state = state_rc.borrow_mut();
let op_id = match v8::Local::<v8::Integer>::try_from(args.get(0))
.map(|l| l.value() as OpId)
.map_err(AnyError::from)
.and_then(|l| OpId::try_from(l.value()).map_err(AnyError::from))
{
Ok(op_id) => op_id,
Err(err) => {
let msg = format!("invalid op id: {}", err);
let msg = v8::String::new(scope, &msg).unwrap();
let exc = v8::Exception::type_error(scope, msg);
scope.throw_exception(exc);
throw_type_error(scope, format!("invalid op id: {}", err));
return;
}
};
let buf_iter = (1..args.length()).map(|idx| {
v8::Local::<v8::ArrayBufferView>::try_from(args.get(idx))
// send(0) returns obj of all ops, handle as special case
if op_id == 0 {
// TODO: Serialize as HashMap when serde_v8 supports maps ...
let ops = OpTable::op_entries(state.op_state.clone());
rv.set(to_v8(scope, ops).unwrap());
return;
}
// PromiseId
let arg1 = args.get(1);
let promise_id = if arg1.is_null_or_undefined() {
Ok(0) // Accept null or undefined as 0
} else {
// Otherwise expect int
v8::Local::<v8::Integer>::try_from(arg1)
.map(|l| l.value() as PromiseId)
.map_err(AnyError::from)
};
// Fail if promise id invalid (not null/undefined or int)
let promise_id: PromiseId = match promise_id {
Ok(promise_id) => promise_id,
Err(err) => {
throw_type_error(scope, format!("invalid promise id: {}", err));
return;
}
};
// Structured args
let v = args.get(2);
// Buf arg (optional)
let arg3 = args.get(3);
let buf: Option<ZeroCopyBuf> = if arg3.is_null_or_undefined() {
None
} else {
match v8::Local::<v8::ArrayBufferView>::try_from(arg3)
.map(|view| ZeroCopyBuf::new(scope, view))
.map_err(|err| {
let msg = format!("Invalid argument at position {}: {}", idx, err);
let msg = v8::String::new(scope, &msg).unwrap();
v8::Exception::type_error(scope, msg)
})
});
let bufs = match buf_iter.collect::<Result<_, _>>() {
Ok(bufs) => bufs,
Err(exc) => {
scope.throw_exception(exc);
.map_err(AnyError::from)
{
Ok(buf) => Some(buf),
Err(err) => {
throw_type_error(scope, format!("Err with buf arg: {}", err));
return;
}
}
};
let op = OpTable::route_op(op_id, state.op_state.clone(), bufs);
assert_eq!(state.shared.size(), 0);
let payload = OpPayload::new(scope, v);
let op = OpTable::route_op(op_id, state.op_state.clone(), payload, buf);
match op {
Op::Sync(buf) if !buf.is_empty() => {
Op::Sync(resp) => match resp {
OpResponse::Value(v) => {
rv.set(to_v8(scope, v).unwrap());
}
OpResponse::Buffer(buf) => {
rv.set(boxed_slice_to_uint8array(scope, buf).into());
}
Op::Sync(_) => {}
},
Op::Async(fut) => {
let fut2 = fut.map(move |buf| (op_id, buf));
let fut2 = fut.map(move |resp| (promise_id, resp));
state.pending_ops.push(fut2.boxed_local());
state.have_unpolled_ops = true;
}
Op::AsyncUnref(fut) => {
let fut2 = fut.map(move |buf| (op_id, buf));
let fut2 = fut.map(move |resp| (promise_id, resp));
state.pending_unref_ops.push(fut2.boxed_local());
state.have_unpolled_ops = true;
}
Op::NotFound => {
let msg = format!("Unknown op id: {}", op_id);
let msg = v8::String::new(scope, &msg).unwrap();
let exc = v8::Exception::type_error(scope, msg);
scope.throw_exception(exc);
throw_type_error(scope, format!("Unknown op id: {}", op_id));
}
}
}
@ -711,33 +734,6 @@ fn queue_microtask(
};
}
fn shared_getter(
scope: &mut v8::HandleScope,
_name: v8::Local<v8::Name>,
_args: v8::PropertyCallbackArguments,
mut rv: v8::ReturnValue,
) {
let state_rc = JsRuntime::state(scope);
let mut state = state_rc.borrow_mut();
let JsRuntimeState {
shared_ab, shared, ..
} = &mut *state;
// Lazily initialize the persistent external ArrayBuffer.
let shared_ab = match shared_ab {
Some(ref ab) => v8::Local::new(scope, ab),
slot @ None => {
let ab = v8::SharedArrayBuffer::with_backing_store(
scope,
shared.get_backing_store(),
);
slot.replace(v8::Global::new(scope, ab));
ab
}
};
rv.set(shared_ab.into())
}
// Called by V8 during `Isolate::mod_instantiate`.
pub fn module_resolve_callback<'s>(
context: v8::Local<'s, v8::Context>,

View file

@ -1,171 +1,37 @@
// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
/*
SharedQueue Binary Layout
+-------------------------------+-------------------------------+
| NUM_RECORDS (32) |
+---------------------------------------------------------------+
| NUM_SHIFTED_OFF (32) |
+---------------------------------------------------------------+
| HEAD (32) |
+---------------------------------------------------------------+
| OFFSETS (32) |
+---------------------------------------------------------------+
| RECORD_ENDS (*MAX_RECORDS) ...
+---------------------------------------------------------------+
| RECORDS (*MAX_RECORDS) ...
+---------------------------------------------------------------+
*/
"use strict";
((window) => {
const MAX_RECORDS = 100;
const INDEX_NUM_RECORDS = 0;
const INDEX_NUM_SHIFTED_OFF = 1;
const INDEX_HEAD = 2;
const INDEX_OFFSETS = 3;
const INDEX_RECORDS = INDEX_OFFSETS + 2 * MAX_RECORDS;
const HEAD_INIT = 4 * INDEX_RECORDS;
// Available on start due to bindings.
const core = window.Deno.core;
const { recv, send } = core;
////////////////////////////////////////////////////////////////////////////////////////////
///////////////////////////////////////// Dispatch /////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////////////////
const dispatch = send;
const dispatchByName = (opName, control, ...zeroCopy) =>
dispatch(opsCache[opName], control, ...zeroCopy);
////////////////////////////////////////////////////////////////////////////////////////////
//////////////////////////////////// Shared array buffer ///////////////////////////////////
////////////////////////////////////////////////////////////////////////////////////////////
let sharedBytes;
let shared32;
let opsCache = {};
const errorMap = {};
let nextPromiseId = 1;
const promiseTable = new Map();
function init() {
const shared = core.shared;
assert(shared.byteLength > 0);
assert(sharedBytes == null);
assert(shared32 == null);
sharedBytes = new Uint8Array(shared);
shared32 = new Int32Array(shared);
asyncHandlers = [];
// Callers should not call core.recv, use setAsyncHandler.
recv(handleAsyncMsgFromRust);
}
function ops() {
// op id 0 is a special value to retrieve the map of registered ops.
const opsMapBytes = send(0);
const opsMapJson = String.fromCharCode.apply(null, opsMapBytes);
opsCache = JSON.parse(opsMapJson);
return { ...opsCache };
const newOpsCache = Object.fromEntries(send(0));
opsCache = Object.freeze(newOpsCache);
return opsCache;
}
function assert(cond) {
if (!cond) {
throw Error("assert");
function handleAsyncMsgFromRust() {
for (let i = 0; i < arguments.length; i += 2) {
opAsyncHandler(arguments[i], arguments[i + 1]);
}
}
function reset() {
shared32[INDEX_NUM_RECORDS] = 0;
shared32[INDEX_NUM_SHIFTED_OFF] = 0;
shared32[INDEX_HEAD] = HEAD_INIT;
function dispatch(opName, promiseId, control, zeroCopy) {
return send(opsCache[opName], promiseId, control, zeroCopy);
}
function head() {
return shared32[INDEX_HEAD];
}
function numRecords() {
return shared32[INDEX_NUM_RECORDS];
}
function size() {
return shared32[INDEX_NUM_RECORDS] - shared32[INDEX_NUM_SHIFTED_OFF];
}
function setMeta(index, end, opId) {
shared32[INDEX_OFFSETS + 2 * index] = end;
shared32[INDEX_OFFSETS + 2 * index + 1] = opId;
}
function getMeta(index) {
if (index >= numRecords()) {
return null;
}
const buf = shared32[INDEX_OFFSETS + 2 * index];
const opId = shared32[INDEX_OFFSETS + 2 * index + 1];
return [opId, buf];
}
function getOffset(index) {
if (index >= numRecords()) {
return null;
}
if (index == 0) {
return HEAD_INIT;
}
const prevEnd = shared32[INDEX_OFFSETS + 2 * (index - 1)];
return (prevEnd + 3) & ~3;
}
function push(opId, buf) {
const off = head();
const end = off + buf.byteLength;
const alignedEnd = (end + 3) & ~3;
const index = numRecords();
const shouldNotPush = alignedEnd > shared32.byteLength ||
index >= MAX_RECORDS;
if (shouldNotPush) {
// console.log("shared_queue.js push fail");
return false;
}
setMeta(index, end, opId);
assert(alignedEnd % 4 === 0);
assert(end - off == buf.byteLength);
sharedBytes.set(buf, off);
shared32[INDEX_NUM_RECORDS] += 1;
shared32[INDEX_HEAD] = alignedEnd;
return true;
}
/// Returns null if empty.
function shift() {
const i = shared32[INDEX_NUM_SHIFTED_OFF];
if (size() == 0) {
assert(i == 0);
return null;
}
const off = getOffset(i);
const [opId, end] = getMeta(i);
if (size() > 1) {
shared32[INDEX_NUM_SHIFTED_OFF] += 1;
} else {
reset();
}
assert(off != null);
assert(end != null);
const buf = sharedBytes.subarray(off, end);
return [opId, buf];
}
////////////////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////// Error handling //////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////////////////
const errorMap = {};
function registerErrorClass(errorName, className, args) {
if (typeof errorMap[errorName] !== "undefined") {
throw new TypeError(`Error class for "${errorName}" already registered`);
@ -173,239 +39,86 @@ SharedQueue Binary Layout
errorMap[errorName] = [className, args ?? []];
}
function handleError(className, message) {
if (typeof errorMap[className] === "undefined") {
function getErrorClassAndArgs(errorName) {
return errorMap[errorName] ?? [undefined, []];
}
function processResponse(res) {
// const [ok, err] = res;
if (res[1] === null) {
return res[0];
}
throw processErr(res[1]);
}
function processErr(err) {
const [ErrorClass, args] = getErrorClassAndArgs(err.className);
if (!ErrorClass) {
return new Error(
`Unregistered error class: "${className}"\n` +
` ${message}\n` +
` Classes of errors returned from ops should be registered via Deno.core.registerErrorClass().`,
`Unregistered error class: "${err.className}"\n ${err.message}\n Classes of errors returned from ops should be registered via Deno.core.registerErrorClass().`,
);
}
const [ErrorClass, args] = errorMap[className];
return new ErrorClass(message, ...args);
return new ErrorClass(err.message, ...args);
}
////////////////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////// Async handling //////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////////////////
let asyncHandlers = [];
function setAsyncHandler(opId, cb) {
assert(opId != null);
asyncHandlers[opId] = cb;
}
function handleAsyncMsgFromRust() {
while (true) {
const opIdBuf = shift();
if (opIdBuf == null) {
break;
}
assert(asyncHandlers[opIdBuf[0]] != null);
asyncHandlers[opIdBuf[0]](opIdBuf[1], true);
}
for (let i = 0; i < arguments.length; i += 2) {
asyncHandlers[arguments[i]](arguments[i + 1], false);
}
}
////////////////////////////////////////////////////////////////////////////////////////////
///////////////////////////// General sync & async ops handling ////////////////////////////
////////////////////////////////////////////////////////////////////////////////////////////
let nextRequestId = 1;
const promiseTable = {};
function asyncHandle(u8Array, isCopyNeeded, opResultParser) {
const [requestId, result, error] = opResultParser(u8Array, isCopyNeeded);
if (error !== null) {
promiseTable[requestId][1](error);
} else {
promiseTable[requestId][0](result);
}
delete promiseTable[requestId];
}
function opAsync(opName, opRequestBuilder, opResultParser) {
const opId = opsCache[opName];
// Make sure requests of this type are handled by the asyncHandler
// The asyncHandler's role is to call the "promiseTable[requestId]" function
if (typeof asyncHandlers[opId] === "undefined") {
asyncHandlers[opId] = (buffer, isCopyNeeded) =>
asyncHandle(buffer, isCopyNeeded, opResultParser);
}
const requestId = nextRequestId++;
// Create and store promise
const promise = new Promise((resolve, reject) => {
promiseTable[requestId] = [resolve, reject];
function jsonOpAsync(opName, args = null, zeroCopy = null) {
const promiseId = nextPromiseId++;
const maybeError = dispatch(opName, promiseId, args, zeroCopy);
// Handle sync error (e.g: error parsing args)
if (maybeError) processResponse(maybeError);
let resolve, reject;
const promise = new Promise((resolve_, reject_) => {
resolve = resolve_;
reject = reject_;
});
// Synchronously dispatch async request
core.dispatch(opId, ...opRequestBuilder(requestId));
// Wait for async response
promise.resolve = resolve;
promise.reject = reject;
promiseTable.set(promiseId, promise);
return promise;
}
function opSync(opName, opRequestBuilder, opResultParser) {
const opId = opsCache[opName];
const u8Array = core.dispatch(opId, ...opRequestBuilder());
const [_, result, error] = opResultParser(u8Array, false);
if (error !== null) throw error;
return result;
function jsonOpSync(opName, args = null, zeroCopy = null) {
return processResponse(dispatch(opName, null, args, zeroCopy));
}
////////////////////////////////////////////////////////////////////////////////////////////
///////////////////////////////////// Bin ops handling /////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////////////////
const binRequestHeaderByteLength = 8 + 4;
const scratchBuffer = new ArrayBuffer(binRequestHeaderByteLength);
const scratchView = new DataView(scratchBuffer);
function binOpBuildRequest(requestId, argument, zeroCopy) {
scratchView.setBigUint64(0, BigInt(requestId), true);
scratchView.setUint32(8, argument, true);
return [scratchView, ...zeroCopy];
}
function binOpParseResult(u8Array, isCopyNeeded) {
// Decode header value from u8Array
const headerByteLength = 8 + 2 * 4;
assert(u8Array.byteLength >= headerByteLength);
assert(u8Array.byteLength % 4 == 0);
const view = new DataView(
u8Array.buffer,
u8Array.byteOffset + u8Array.byteLength - headerByteLength,
headerByteLength,
);
const requestId = Number(view.getBigUint64(0, true));
const status = view.getUint32(8, true);
const result = view.getUint32(12, true);
// Error handling
if (status !== 0) {
const className = core.decode(u8Array.subarray(0, result));
const message = core.decode(u8Array.subarray(result, -headerByteLength))
.trim();
return [requestId, null, handleError(className, message)];
}
if (u8Array.byteLength === headerByteLength) {
return [requestId, result, null];
}
// Rest of response buffer is passed as reference or as a copy
let respBuffer = null;
if (isCopyNeeded) {
// Copy part of the response array (if sent through shared array buf)
respBuffer = u8Array.slice(0, result);
function opAsyncHandler(promiseId, res) {
// const [ok, err] = res;
const promise = promiseTable.get(promiseId);
promiseTable.delete(promiseId);
if (!res[1]) {
promise.resolve(res[0]);
} else {
// Create view on existing array (if sent through overflow)
respBuffer = u8Array.subarray(0, result);
promise.reject(processErr(res[1]));
}
}
return [requestId, respBuffer, null];
function binOpSync(opName, args = null, zeroCopy = null) {
return jsonOpSync(opName, args, zeroCopy);
}
function binOpAsync(opName, argument = 0, ...zeroCopy) {
return opAsync(
opName,
(requestId) => binOpBuildRequest(requestId, argument, zeroCopy),
binOpParseResult,
);
}
function binOpSync(opName, argument = 0, ...zeroCopy) {
return opSync(
opName,
() => binOpBuildRequest(0, argument, zeroCopy),
binOpParseResult,
);
}
////////////////////////////////////////////////////////////////////////////////////////////
///////////////////////////////////// Json ops handling ////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////////////////
const jsonRequestHeaderLength = 8;
function jsonOpBuildRequest(requestId, argument, zeroCopy) {
const u8Array = core.encode(
"\0".repeat(jsonRequestHeaderLength) + JSON.stringify(argument),
);
new DataView(u8Array.buffer).setBigUint64(0, BigInt(requestId), true);
return [u8Array, ...zeroCopy];
}
function jsonOpParseResult(u8Array, _) {
const data = JSON.parse(core.decode(u8Array));
if ("err" in data) {
return [
data.requestId,
null,
handleError(data.err.className, data.err.message),
];
}
return [data.requestId, data.ok, null];
}
function jsonOpAsync(opName, argument = null, ...zeroCopy) {
return opAsync(
opName,
(requestId) => jsonOpBuildRequest(requestId, argument, zeroCopy),
jsonOpParseResult,
);
}
function jsonOpSync(opName, argument = null, ...zeroCopy) {
return opSync(
opName,
() => [core.encode(JSON.stringify(argument)), ...zeroCopy],
jsonOpParseResult,
);
function binOpAsync(opName, args = null, zeroCopy = null) {
return jsonOpAsync(opName, args, zeroCopy);
}
function resources() {
return jsonOpSync("op_resources");
}
function close(rid) {
return jsonOpSync("op_close", { rid });
jsonOpSync("op_close", { rid });
}
Object.assign(window.Deno.core, {
jsonOpAsync,
jsonOpSync,
binOpAsync,
binOpSync,
dispatch,
dispatchByName,
jsonOpAsync,
jsonOpSync,
dispatch: send,
dispatchByName: dispatch,
ops,
close,
resources,
registerErrorClass,
sharedQueueInit: init,
// sharedQueue is private but exposed for testing.
sharedQueue: {
MAX_RECORDS,
head,
numRecords,
size,
push,
reset,
shift,
},
// setAsyncHandler is private but exposed for testing.
setAsyncHandler,
init,
});
})(this);

View file

@ -1,88 +0,0 @@
// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
"use strict";
function assert(cond) {
if (!cond) {
throw Error("assert");
}
}
// Check overflow (corresponds to full_records test in rust)
function fullRecords(q) {
q.reset();
const oneByte = new Uint8Array([42]);
for (let i = 0; i < q.MAX_RECORDS; i++) {
assert(q.push(1, oneByte));
}
assert(!q.push(1, oneByte));
const [opId, r] = q.shift();
assert(opId == 1);
assert(r.byteLength == 1);
assert(r[0] == 42);
// Even if we shift one off, we still cannot push a new record.
assert(!q.push(1, oneByte));
}
function main() {
const q = Deno.core.sharedQueue;
const h = q.head();
assert(h > 0);
// This record's len is not divisible by
// 4 so after pushing it to the queue,
// next record offset should be aligned to 4.
let r = new Uint8Array([1, 2, 3, 4, 5]);
const len = r.byteLength + h;
assert(q.push(1, r));
// Record should be aligned to 4 bytes
assert(q.head() == len + 3);
r = new Uint8Array([6, 7]);
assert(q.push(1, r));
r = new Uint8Array([8, 9, 10, 11]);
assert(q.push(1, r));
assert(q.numRecords() == 3);
assert(q.size() == 3);
let opId;
[opId, r] = q.shift();
assert(r.byteLength == 5);
assert(r[0] == 1);
assert(r[1] == 2);
assert(r[2] == 3);
assert(r[3] == 4);
assert(r[4] == 5);
assert(q.numRecords() == 3);
assert(q.size() == 2);
[opId, r] = q.shift();
assert(r.byteLength == 2);
assert(r[0] == 6);
assert(r[1] == 7);
assert(q.numRecords() == 3);
assert(q.size() == 1);
[opId, r] = q.shift();
assert(opId == 1);
assert(r.byteLength == 4);
assert(r[0] == 8);
assert(r[1] == 9);
assert(r[2] == 10);
assert(r[3] == 11);
assert(q.numRecords() == 0);
assert(q.size() == 0);
assert(q.shift() == null);
assert(q.shift() == null);
assert(q.numRecords() == 0);
assert(q.size() == 0);
fullRecords(q);
Deno.core.print("shared_queue_test.js ok\n");
q.reset();
}
main();

View file

@ -2,11 +2,8 @@
//! This example shows you how to define ops in Rust and then call them from
//! JavaScript.
use anyhow::anyhow;
use deno_core::json_op_sync;
use deno_core::JsRuntime;
use deno_core::Op;
use serde_json::Value;
use std::io::Write;
fn main() {
@ -21,53 +18,50 @@ fn main() {
//
// The second one just transforms some input and returns it to JavaScript.
// Register the op for outputting bytes to stdout.
// Register the op for outputting a string to stdout.
// It can be invoked with Deno.core.dispatch and the id this method returns
// or Deno.core.dispatchByName and the name provided.
runtime.register_op(
"op_print",
// The op_fn callback takes a state object OpState
// and a vector of ZeroCopyBuf's, which are mutable references
// to ArrayBuffer's in JavaScript.
|_state, zero_copy| {
// The op_fn callback takes a state object OpState,
// a structured arg of type `T` and an optional ZeroCopyBuf,
// a mutable reference to a JavaScript ArrayBuffer
json_op_sync(|_state, msg: Option<String>, zero_copy| {
let mut out = std::io::stdout();
// Write msg to stdout
if let Some(msg) = msg {
out.write_all(msg.as_bytes()).unwrap();
}
// Write the contents of every buffer to stdout
for buf in zero_copy {
out.write_all(&buf).unwrap();
}
Op::Sync(Box::new([])) // No meaningful result
},
Ok(()) // No meaningful result
}),
);
// Register the JSON op for summing a number array.
// A JSON op is just an op where the first ZeroCopyBuf is a serialized JSON
// value, the return value is also a serialized JSON value. It can be invoked
// with Deno.core.jsonOpSync and the name.
runtime.register_op(
"op_sum",
// The json_op_sync function automatically deserializes
// the first ZeroCopyBuf and serializes the return value
// to reduce boilerplate
json_op_sync(|_state, json: Vec<f64>, zero_copy| {
// We check that we only got the JSON value.
if !zero_copy.is_empty() {
Err(anyhow!("Expected exactly one argument"))
} else {
// And if we did, do our actual task
let sum = json.iter().fold(0.0, |a, v| a + v);
// Finally we return a JSON value
Ok(Value::from(sum))
}
json_op_sync(|_state, nums: Vec<f64>, _| {
// Sum inputs
let sum = nums.iter().fold(0.0, |a, v| a + v);
// return as a Result<f64, AnyError>
Ok(sum)
}),
);
// Now we see how to invoke the ops we just defined. The runtime automatically
// contains a Deno.core object with several functions for interacting with it.
// You can find its definition in core.js.
runtime.execute(
runtime
.execute(
"<init>",
r#"
// First we initialize the ops cache.
@ -78,14 +72,15 @@ Deno.core.ops();
// our op_print op to display the stringified argument.
const _newline = new Uint8Array([10]);
function print(value) {
Deno.core.dispatchByName('op_print', Deno.core.encode(value.toString()), _newline);
Deno.core.dispatchByName('op_print', 0, value.toString(), _newline);
}
// Finally we register the error class used by op_sum
// so that it throws the correct class.
Deno.core.registerErrorClass('Error', Error);
"#,
).unwrap();
)
.unwrap();
// Now we can finally use this in an example.
runtime

View file

@ -9,14 +9,19 @@ const responseBuf = new Uint8Array(
.map((c) => c.charCodeAt(0)),
);
// This buffer exists purely to avoid trigerring the bin-op buf assert
// in practice all deno bin ops accept buffers, this bench is an exception
// TODO(@AaronO): remove once we drop variadic BufVec compat
const nopBuffer = new Uint8Array();
/** Listens on 0.0.0.0:4500, returns rid. */
function listen() {
return Deno.core.binOpSync("listen");
return Deno.core.binOpSync("listen", 0, nopBuffer);
}
/** Accepts a connection, returns rid. */
function accept(rid) {
return Deno.core.binOpAsync("accept", rid);
return Deno.core.binOpAsync("accept", rid, nopBuffer);
}
/**
@ -33,7 +38,7 @@ function write(rid, data) {
}
function close(rid) {
Deno.core.binOpSync("close", rid);
Deno.core.binOpSync("close", rid, nopBuffer);
}
async function serve(rid) {

View file

@ -11,33 +11,29 @@ const responseBuf = new Uint8Array(
/** Listens on 0.0.0.0:4500, returns rid. */
function listen() {
const { rid } = Deno.core.jsonOpSync("listen");
return rid;
return Deno.core.jsonOpSync("listen");
}
/** Accepts a connection, returns rid. */
async function accept(serverRid) {
const { rid } = await Deno.core.jsonOpAsync("accept", { rid: serverRid });
return rid;
function accept(serverRid) {
return Deno.core.jsonOpAsync("accept", serverRid);
}
/**
* Reads a packet from the rid, presumably an http request. data is ignored.
* Returns bytes read.
*/
async function read(rid, data) {
const { nread } = await Deno.core.jsonOpAsync("read", { rid }, data);
return nread;
function read(rid, data) {
return Deno.core.jsonOpAsync("read", rid, data);
}
/** Writes a fixed HTTP response to the socket rid. Returns bytes written. */
async function write(rid, data) {
const { nwritten } = await Deno.core.jsonOpAsync("write", { rid }, data);
return nwritten;
function write(rid, data) {
return Deno.core.jsonOpAsync("write", rid, data);
}
function close(rid) {
Deno.core.jsonOpSync("close", { rid });
Deno.core.jsonOpSync("close", rid);
}
async function serve(rid) {

View file

@ -9,10 +9,8 @@ use deno_core::JsRuntime;
use deno_core::OpState;
use deno_core::RcRef;
use deno_core::Resource;
use deno_core::ResourceId;
use deno_core::ZeroCopyBuf;
use serde::Deserialize;
use serde::Serialize;
use serde_json::Value;
use std::cell::RefCell;
use std::convert::TryFrom;
use std::env;
@ -121,11 +119,6 @@ fn create_js_runtime() -> JsRuntime {
runtime
}
#[derive(Deserialize, Serialize)]
struct ResourceId {
rid: u32,
}
fn op_listen(
state: &mut OpState,
_args: (),
@ -137,71 +130,71 @@ fn op_listen(
std_listener.set_nonblocking(true)?;
let listener = TcpListener::try_from(std_listener)?;
let rid = state.resource_table.add(listener);
Ok(ResourceId { rid })
Ok(rid)
}
fn op_close(
state: &mut OpState,
args: ResourceId,
rid: ResourceId,
_buf: &mut [ZeroCopyBuf],
) -> Result<(), AnyError> {
log::debug!("close rid={}", args.rid);
log::debug!("close rid={}", rid);
state
.resource_table
.close(args.rid)
.close(rid)
.map(|_| ())
.ok_or_else(bad_resource_id)
}
async fn op_accept(
state: Rc<RefCell<OpState>>,
args: ResourceId,
rid: ResourceId,
_bufs: BufVec,
) -> Result<ResourceId, AnyError> {
log::debug!("accept rid={}", args.rid);
log::debug!("accept rid={}", rid);
let listener = state
.borrow()
.resource_table
.get::<TcpListener>(args.rid)
.get::<TcpListener>(rid)
.ok_or_else(bad_resource_id)?;
let stream = listener.accept().await?;
let rid = state.borrow_mut().resource_table.add(stream);
Ok(ResourceId { rid })
Ok(rid)
}
async fn op_read(
state: Rc<RefCell<OpState>>,
args: ResourceId,
rid: ResourceId,
mut bufs: BufVec,
) -> Result<Value, AnyError> {
) -> Result<usize, AnyError> {
assert_eq!(bufs.len(), 1, "Invalid number of arguments");
log::debug!("read rid={}", args.rid);
log::debug!("read rid={}", rid);
let stream = state
.borrow()
.resource_table
.get::<TcpStream>(args.rid)
.get::<TcpStream>(rid)
.ok_or_else(bad_resource_id)?;
let nread = stream.read(&mut bufs[0]).await?;
Ok(serde_json::json!({ "nread": nread }))
Ok(nread)
}
async fn op_write(
state: Rc<RefCell<OpState>>,
args: ResourceId,
rid: ResourceId,
bufs: BufVec,
) -> Result<Value, AnyError> {
) -> Result<usize, AnyError> {
assert_eq!(bufs.len(), 1, "Invalid number of arguments");
log::debug!("write rid={}", args.rid);
log::debug!("write rid={}", rid);
let stream = state
.borrow()
.resource_table
.get::<TcpStream>(args.rid)
.get::<TcpStream>(rid)
.ok_or_else(bad_resource_id)?;
let nwritten = stream.write(&bufs[0]).await?;
Ok(serde_json::json!({ "nwritten": nwritten }))
Ok(nwritten)
}
fn main() {

View file

@ -11,14 +11,14 @@ declare namespace Deno {
function jsonOpSync(
opName: string,
args?: any,
...zeroCopy: Uint8Array[]
zeroCopy?: Uint8Array,
): any;
/** Send a JSON op to Rust, and asynchronously receive the result. */
function jsonOpAsync(
opName: string,
args?: any,
...zeroCopy: Uint8Array[]
zeroCopy?: Uint8Array,
): Promise<any>;
/**

View file

@ -14,7 +14,6 @@ mod ops_json;
pub mod plugin_api;
mod resources;
mod runtime;
mod shared_queue;
mod zero_copy_buf;
// Re-exports
@ -56,12 +55,17 @@ pub use crate::modules::RecursiveModuleLoad;
pub use crate::normalize_path::normalize_path;
pub use crate::ops::op_close;
pub use crate::ops::op_resources;
pub use crate::ops::serialize_op_result;
pub use crate::ops::Op;
pub use crate::ops::OpAsyncFuture;
pub use crate::ops::OpFn;
pub use crate::ops::OpId;
pub use crate::ops::OpPayload;
pub use crate::ops::OpResponse;
pub use crate::ops::OpState;
pub use crate::ops::OpTable;
pub use crate::ops::PromiseId;
pub use crate::ops::Serializable;
pub use crate::ops_bin::bin_op_async;
pub use crate::ops_bin::bin_op_sync;
pub use crate::ops_bin::ValueOrVector;

View file

@ -6,10 +6,12 @@ use crate::error::AnyError;
use crate::gotham_state::GothamState;
use crate::resources::ResourceTable;
use crate::runtime::GetErrorClassFn;
use crate::BufVec;
use crate::ZeroCopyBuf;
use futures::Future;
use indexmap::IndexMap;
use rusty_v8 as v8;
use serde::de::DeserializeOwned;
use serde::Serialize;
use serde_json::json;
use serde_json::Value;
use std::cell::RefCell;
@ -20,12 +22,50 @@ use std::ops::DerefMut;
use std::pin::Pin;
use std::rc::Rc;
pub type OpAsyncFuture = Pin<Box<dyn Future<Output = Box<[u8]>>>>;
pub type OpFn = dyn Fn(Rc<RefCell<OpState>>, BufVec) -> Op + 'static;
pub use erased_serde::Serialize as Serializable;
pub type PromiseId = u64;
pub type OpAsyncFuture = Pin<Box<dyn Future<Output = OpResponse>>>;
pub type OpFn =
dyn Fn(Rc<RefCell<OpState>>, OpPayload, Option<ZeroCopyBuf>) -> Op + 'static;
pub type OpId = usize;
pub struct OpPayload<'a, 'b, 'c> {
pub(crate) scope: Option<&'a mut v8::HandleScope<'b>>,
pub(crate) value: Option<v8::Local<'c, v8::Value>>,
}
impl<'a, 'b, 'c> OpPayload<'a, 'b, 'c> {
pub fn new(
scope: &'a mut v8::HandleScope<'b>,
value: v8::Local<'c, v8::Value>,
) -> Self {
Self {
scope: Some(scope),
value: Some(value),
}
}
pub fn empty() -> Self {
Self {
scope: None,
value: None,
}
}
pub fn deserialize<T: DeserializeOwned>(self) -> Result<T, AnyError> {
serde_v8::from_v8(self.scope.unwrap(), self.value.unwrap())
.map_err(AnyError::from)
.map_err(|e| type_error(format!("Error parsing args: {}", e)))
}
}
pub enum OpResponse {
Value(Box<dyn Serializable>),
Buffer(Box<[u8]>),
}
pub enum Op {
Sync(Box<[u8]>),
Sync(OpResponse),
Async(OpAsyncFuture),
/// AsyncUnref is the variation of Async, which doesn't block the program
/// exiting.
@ -33,6 +73,32 @@ pub enum Op {
NotFound,
}
#[derive(Serialize)]
pub struct OpResult<R>(Option<R>, Option<OpError>);
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct OpError {
class_name: &'static str,
message: String,
}
pub fn serialize_op_result<R: Serialize + 'static>(
result: Result<R, AnyError>,
state: Rc<RefCell<OpState>>,
) -> OpResponse {
OpResponse::Value(Box::new(match result {
Ok(v) => OpResult::<R>(Some(v), None),
Err(err) => OpResult::<R>(
None,
Some(OpError {
class_name: (state.borrow().get_error_class_fn)(&err),
message: err.to_string(),
}),
),
}))
}
/// Maintains the resources and ops inside a JS runtime.
pub struct OpState {
pub resource_table: ResourceTable,
@ -73,24 +139,23 @@ pub struct OpTable(IndexMap<String, Rc<OpFn>>);
impl OpTable {
pub fn register_op<F>(&mut self, name: &str, op_fn: F) -> OpId
where
F: Fn(Rc<RefCell<OpState>>, BufVec) -> Op + 'static,
F: Fn(Rc<RefCell<OpState>>, OpPayload, Option<ZeroCopyBuf>) -> Op + 'static,
{
let (op_id, prev) = self.0.insert_full(name.to_owned(), Rc::new(op_fn));
assert!(prev.is_none());
op_id
}
pub fn op_entries(state: Rc<RefCell<OpState>>) -> Vec<(String, OpId)> {
state.borrow().op_table.0.keys().cloned().zip(0..).collect()
}
pub fn route_op(
op_id: OpId,
state: Rc<RefCell<OpState>>,
bufs: BufVec,
payload: OpPayload,
buf: Option<ZeroCopyBuf>,
) -> Op {
if op_id == 0 {
let ops: HashMap<String, OpId> =
state.borrow().op_table.0.keys().cloned().zip(0..).collect();
let buf = serde_json::to_vec(&ops).map(Into::into).unwrap();
Op::Sync(buf)
} else {
let op_fn = state
.borrow()
.op_table
@ -98,16 +163,19 @@ impl OpTable {
.get_index(op_id)
.map(|(_, op_fn)| op_fn.clone());
match op_fn {
Some(f) => (f)(state, bufs),
Some(f) => (f)(state, payload, buf),
None => Op::NotFound,
}
}
}
}
impl Default for OpTable {
fn default() -> Self {
fn dummy(_state: Rc<RefCell<OpState>>, _bufs: BufVec) -> Op {
fn dummy(
_state: Rc<RefCell<OpState>>,
_p: OpPayload,
_b: Option<ZeroCopyBuf>,
) -> Op {
unreachable!()
}
Self(once(("ops".to_owned(), Rc::new(dummy) as _)).collect())
@ -164,24 +232,36 @@ mod tests {
let bar_id;
{
let op_table = &mut state.borrow_mut().op_table;
foo_id = op_table.register_op("foo", |_, _| Op::Sync(b"oof!"[..].into()));
foo_id = op_table.register_op("foo", |_, _, _| {
Op::Sync(OpResponse::Buffer(b"oof!"[..].into()))
});
assert_eq!(foo_id, 1);
bar_id = op_table.register_op("bar", |_, _| Op::Sync(b"rab!"[..].into()));
bar_id = op_table.register_op("bar", |_, _, _| {
Op::Sync(OpResponse::Buffer(b"rab!"[..].into()))
});
assert_eq!(bar_id, 2);
}
let foo_res = OpTable::route_op(foo_id, state.clone(), Default::default());
assert!(matches!(foo_res, Op::Sync(buf) if &*buf == b"oof!"));
let bar_res = OpTable::route_op(bar_id, state.clone(), Default::default());
assert!(matches!(bar_res, Op::Sync(buf) if &*buf == b"rab!"));
let foo_res = OpTable::route_op(
foo_id,
state.clone(),
OpPayload::empty(),
Default::default(),
);
assert!(
matches!(foo_res, Op::Sync(OpResponse::Buffer(buf)) if &*buf == b"oof!")
);
let bar_res = OpTable::route_op(
bar_id,
state.clone(),
OpPayload::empty(),
Default::default(),
);
assert!(
matches!(bar_res, Op::Sync(OpResponse::Buffer(buf)) if &*buf == b"rab!")
);
let catalog_res = OpTable::route_op(0, state, Default::default());
let mut catalog_entries = match catalog_res {
Op::Sync(buf) => serde_json::from_slice::<HashMap<String, OpId>>(&buf)
.map(|map| map.into_iter().collect::<Vec<_>>())
.unwrap(),
_ => panic!("unexpected `Op` variant"),
};
let mut catalog_entries = OpTable::op_entries(state);
catalog_entries.sort_by(|(_, id1), (_, id2)| id1.partial_cmp(id2).unwrap());
assert_eq!(
catalog_entries,

View file

@ -1,54 +1,23 @@
// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
use crate::error::type_error;
use crate::error::AnyError;
use crate::futures::future::FutureExt;
use crate::serialize_op_result;
use crate::BufVec;
use crate::Op;
use crate::OpFn;
use crate::OpPayload;
use crate::OpResponse;
use crate::OpState;
use crate::ZeroCopyBuf;
use std::boxed::Box;
use std::cell::RefCell;
use std::convert::TryInto;
use std::future::Future;
use std::rc::Rc;
#[derive(Copy, Clone, Debug, PartialEq)]
pub struct RequestHeader {
pub request_id: u64,
pub argument: u32,
}
impl RequestHeader {
pub fn from_raw(bytes: &[u8]) -> Option<Self> {
if bytes.len() < 3 * 4 {
return None;
}
Some(Self {
request_id: u64::from_le_bytes(bytes[0..8].try_into().unwrap()),
argument: u32::from_le_bytes(bytes[8..12].try_into().unwrap()),
})
}
}
#[derive(Copy, Clone, Debug, PartialEq)]
pub struct ResponseHeader {
pub request_id: u64,
pub status: u32,
pub result: u32,
}
impl From<ResponseHeader> for [u8; 16] {
fn from(r: ResponseHeader) -> Self {
let mut resp_header = [0u8; 16];
resp_header[0..8].copy_from_slice(&r.request_id.to_le_bytes());
resp_header[8..12].copy_from_slice(&r.status.to_le_bytes());
resp_header[12..16].copy_from_slice(&r.result.to_le_bytes());
resp_header
}
}
// TODO: rewrite this, to have consistent buffer returns
// possibly via direct serde_v8 support
pub trait ValueOrVector {
fn value(&self) -> u32;
fn vector(self) -> Option<Vec<u8>>;
@ -72,10 +41,6 @@ impl ValueOrVector for u32 {
}
}
fn gen_padding_32bit(len: usize) -> &'static [u8] {
&[b' ', b' ', b' '][0..(4 - (len & 3)) & 3]
}
/// Creates an op that passes data synchronously using raw ui8 buffer.
///
/// The provided function `op_fn` has the following parameters:
@ -104,80 +69,49 @@ where
F: Fn(&mut OpState, u32, &mut [ZeroCopyBuf]) -> Result<R, AnyError> + 'static,
R: ValueOrVector,
{
Box::new(move |state: Rc<RefCell<OpState>>, bufs: BufVec| -> Op {
let mut bufs_iter = bufs.into_iter();
let record_buf = bufs_iter.next().expect("Expected record at position 0");
let mut zero_copy = bufs_iter.collect::<BufVec>();
Box::new(move |state, payload, buf| -> Op {
let min_arg: u32 = payload.deserialize().unwrap();
// For sig compat map Option<ZeroCopyBuf> to BufVec
let mut bufs: BufVec = match buf {
Some(b) => vec![b],
None => vec![],
}
.into();
// Bin op buffer arg assert
if bufs.is_empty() {
return Op::Sync(serialize_bin_result::<u32>(
Err(type_error("bin-ops require a non-null buffer arg")),
state,
));
}
let req_header = match RequestHeader::from_raw(&record_buf) {
Some(r) => r,
None => {
let error_class = b"TypeError";
let error_message = b"Unparsable control buffer";
let len = error_class.len() + error_message.len();
let padding = gen_padding_32bit(len);
let resp_header = ResponseHeader {
request_id: 0,
status: 1,
result: error_class.len() as u32,
};
return Op::Sync(
error_class
.iter()
.chain(error_message.iter())
.chain(padding)
.chain(&Into::<[u8; 16]>::into(resp_header))
.cloned()
.collect(),
);
}
};
match op_fn(&mut state.borrow_mut(), req_header.argument, &mut zero_copy) {
Ok(possibly_vector) => {
let resp_header = ResponseHeader {
request_id: req_header.request_id,
status: 0,
result: possibly_vector.value(),
};
let resp_encoded_header = Into::<[u8; 16]>::into(resp_header);
let resp_vector = match possibly_vector.vector() {
Some(mut vector) => {
let padding = gen_padding_32bit(vector.len());
vector.extend(padding);
vector.extend(&resp_encoded_header);
vector
}
None => resp_encoded_header.to_vec(),
};
Op::Sync(resp_vector.into_boxed_slice())
}
Err(error) => {
let error_class =
(state.borrow().get_error_class_fn)(&error).as_bytes();
let error_message = error.to_string().as_bytes().to_owned();
let len = error_class.len() + error_message.len();
let padding = gen_padding_32bit(len);
let resp_header = ResponseHeader {
request_id: req_header.request_id,
status: 1,
result: error_class.len() as u32,
};
return Op::Sync(
error_class
.iter()
.chain(error_message.iter())
.chain(padding)
.chain(&Into::<[u8; 16]>::into(resp_header))
.cloned()
.collect(),
);
}
}
let result = op_fn(&mut state.borrow_mut(), min_arg, &mut bufs);
Op::Sync(serialize_bin_result(result, state))
})
}
// wraps serialize_op_result but handles ValueOrVector
fn serialize_bin_result<R>(
result: Result<R, AnyError>,
state: Rc<RefCell<OpState>>,
) -> OpResponse
where
R: ValueOrVector,
{
match result {
Ok(v) => {
let min_val = v.value();
match v.vector() {
// Warning! this is incorrect, but buffers aren't use ATM, will fix in future PR
Some(vec) => OpResponse::Buffer(vec.into()),
// u32
None => serialize_op_result(Ok(min_val), state),
}
}
Err(e) => serialize_op_result::<()>(Err(e), state),
}
}
/// Creates an op that passes data asynchronously using raw ui8 buffer.
///
/// The provided function `op_fn` has the following parameters:
@ -208,170 +142,30 @@ where
R: Future<Output = Result<RV, AnyError>> + 'static,
RV: ValueOrVector,
{
Box::new(move |state: Rc<RefCell<OpState>>, bufs: BufVec| -> Op {
let mut bufs_iter = bufs.into_iter();
let record_buf = bufs_iter.next().expect("Expected record at position 0");
let zero_copy = bufs_iter.collect::<BufVec>();
Box::new(
move |state: Rc<RefCell<OpState>>,
p: OpPayload,
b: Option<ZeroCopyBuf>|
-> Op {
let min_arg: u32 = p.deserialize().unwrap();
// For sig compat map Option<ZeroCopyBuf> to BufVec
let bufs: BufVec = match b {
Some(b) => vec![b],
None => vec![],
}
.into();
// Bin op buffer arg assert
if bufs.is_empty() {
return Op::Sync(serialize_bin_result::<u32>(
Err(type_error("bin-ops require a non-null buffer arg")),
state,
));
}
let req_header = match RequestHeader::from_raw(&record_buf) {
Some(r) => r,
None => {
let error_class = b"TypeError";
let error_message = b"Unparsable control buffer";
let len = error_class.len() + error_message.len();
let padding = gen_padding_32bit(len);
let resp_header = ResponseHeader {
request_id: 0,
status: 1,
result: error_class.len() as u32,
};
return Op::Sync(
error_class
.iter()
.chain(error_message.iter())
.chain(padding)
.chain(&Into::<[u8; 16]>::into(resp_header))
.cloned()
.collect(),
);
}
};
let fut =
op_fn(state.clone(), req_header.argument, zero_copy).map(move |result| {
match result {
Ok(possibly_vector) => {
let resp_header = ResponseHeader {
request_id: req_header.request_id,
status: 0,
result: possibly_vector.value(),
};
let resp_encoded_header = Into::<[u8; 16]>::into(resp_header);
let resp_vector = match possibly_vector.vector() {
Some(mut vector) => {
let padding = gen_padding_32bit(vector.len());
vector.extend(padding);
vector.extend(&resp_encoded_header);
vector
}
None => resp_encoded_header.to_vec(),
};
resp_vector.into_boxed_slice()
}
Err(error) => {
let error_class =
(state.borrow().get_error_class_fn)(&error).as_bytes();
let error_message = error.to_string().as_bytes().to_owned();
let len = error_class.len() + error_message.len();
let padding = gen_padding_32bit(len);
let resp_header = ResponseHeader {
request_id: req_header.request_id,
status: 1,
result: error_class.len() as u32,
};
error_class
.iter()
.chain(error_message.iter())
.chain(padding)
.chain(&Into::<[u8; 16]>::into(resp_header))
.cloned()
.collect()
}
}
});
let fut = op_fn(state.clone(), min_arg, bufs)
.map(move |result| serialize_bin_result(result, state));
let temp = Box::pin(fut);
Op::Async(temp)
})
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn padding() {
assert_eq!(gen_padding_32bit(0), &[] as &[u8]);
assert_eq!(gen_padding_32bit(1), &[b' ', b' ', b' ']);
assert_eq!(gen_padding_32bit(2), &[b' ', b' ']);
assert_eq!(gen_padding_32bit(3), &[b' ']);
assert_eq!(gen_padding_32bit(4), &[] as &[u8]);
assert_eq!(gen_padding_32bit(5), &[b' ', b' ', b' ']);
}
#[test]
fn response_header_to_bytes() {
// Max size of an js Number is 1^53 - 1, so use this value as max for 64bit ´request_id´
let resp_header = ResponseHeader {
request_id: 0x0102030405060708u64,
status: 0x090A0B0Cu32,
result: 0x0D0E0F10u32,
};
// All numbers are always little-endian encoded, as the js side also wants this to be fixed
assert_eq!(
&Into::<[u8; 16]>::into(resp_header),
&[8, 7, 6, 5, 4, 3, 2, 1, 12, 11, 10, 9, 16, 15, 14, 13]
);
}
#[test]
fn response_header_to_bytes_max_value() {
// Max size of an js Number is 1^53 - 1, so use this value as max for 64bit ´request_id´
let resp_header = ResponseHeader {
request_id: (1u64 << 53u64) - 1u64,
status: 0xFFFFFFFFu32,
result: 0xFFFFFFFFu32,
};
// All numbers are always little-endian encoded, as the js side also wants this to be fixed
assert_eq!(
&Into::<[u8; 16]>::into(resp_header),
&[
255, 255, 255, 255, 255, 255, 31, 0, 255, 255, 255, 255, 255, 255, 255,
255
]
);
}
#[test]
fn request_header_from_bytes() {
let req_header =
RequestHeader::from_raw(&[8, 7, 6, 5, 4, 3, 2, 1, 12, 11, 10, 9])
.unwrap();
assert_eq!(req_header.request_id, 0x0102030405060708u64);
assert_eq!(req_header.argument, 0x090A0B0Cu32);
}
#[test]
fn request_header_from_bytes_max_value() {
let req_header = RequestHeader::from_raw(&[
255, 255, 255, 255, 255, 255, 31, 0, 255, 255, 255, 255,
])
.unwrap();
assert_eq!(req_header.request_id, (1u64 << 53u64) - 1u64);
assert_eq!(req_header.argument, 0xFFFFFFFFu32);
}
#[test]
fn request_header_from_bytes_too_short() {
let req_header =
RequestHeader::from_raw(&[8, 7, 6, 5, 4, 3, 2, 1, 12, 11, 10]);
assert_eq!(req_header, None);
}
#[test]
fn request_header_from_bytes_long() {
let req_header = RequestHeader::from_raw(&[
8, 7, 6, 5, 4, 3, 2, 1, 12, 11, 10, 9, 13, 14, 15, 16, 17, 18, 19, 20, 21,
])
.unwrap();
assert_eq!(req_header.request_id, 0x0102030405060708u64);
assert_eq!(req_header.argument, 0x090A0B0Cu32);
}
},
)
}

View file

@ -1,37 +1,19 @@
// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
use crate::error::type_error;
use crate::error::AnyError;
use crate::serialize_op_result;
use crate::BufVec;
use crate::Op;
use crate::OpFn;
use crate::OpPayload;
use crate::OpState;
use crate::ZeroCopyBuf;
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::cell::RefCell;
use std::convert::TryInto;
use std::future::Future;
use std::rc::Rc;
fn json_serialize_op_result<R: Serialize>(
request_id: Option<u64>,
result: Result<R, AnyError>,
get_error_class_fn: crate::runtime::GetErrorClassFn,
) -> Box<[u8]> {
let value = match result {
Ok(v) => serde_json::json!({ "ok": v, "requestId": request_id }),
Err(err) => serde_json::json!({
"requestId": request_id,
"err": {
"className": (get_error_class_fn)(&err),
"message": err.to_string(),
}
}),
};
serde_json::to_vec(&value).unwrap().into_boxed_slice()
}
/// Creates an op that passes data synchronously using JSON.
///
/// The provided function `op_fn` has the following parameters:
@ -59,15 +41,20 @@ pub fn json_op_sync<F, V, R>(op_fn: F) -> Box<OpFn>
where
F: Fn(&mut OpState, V, &mut [ZeroCopyBuf]) -> Result<R, AnyError> + 'static,
V: DeserializeOwned,
R: Serialize,
R: Serialize + 'static,
{
Box::new(move |state: Rc<RefCell<OpState>>, mut bufs: BufVec| -> Op {
let result = serde_json::from_slice(&bufs[0])
.map_err(AnyError::from)
.and_then(|args| op_fn(&mut state.borrow_mut(), args, &mut bufs[1..]));
let buf =
json_serialize_op_result(None, result, state.borrow().get_error_class_fn);
Op::Sync(buf)
Box::new(move |state, payload, buf: Option<ZeroCopyBuf>| -> Op {
// For sig compat map Option<ZeroCopyBuf> to BufVec
let mut bufs: BufVec = match buf {
Some(b) => vec![b],
None => vec![],
}
.into();
let result = payload
.deserialize()
.and_then(|args| op_fn(&mut state.borrow_mut(), args, &mut bufs));
Op::Sync(serialize_op_result(result, state))
})
}
@ -100,35 +87,38 @@ where
F: Fn(Rc<RefCell<OpState>>, V, BufVec) -> R + 'static,
V: DeserializeOwned,
R: Future<Output = Result<RV, AnyError>> + 'static,
RV: Serialize,
RV: Serialize + 'static,
{
let try_dispatch_op =
move |state: Rc<RefCell<OpState>>, bufs: BufVec| -> Result<Op, AnyError> {
let request_id = bufs[0]
.get(0..8)
.map(|b| u64::from_le_bytes(b.try_into().unwrap()))
.ok_or_else(|| type_error("missing or invalid `requestId`"))?;
let args = serde_json::from_slice(&bufs[0][8..])?;
let bufs = bufs[1..].into();
let try_dispatch_op = move |state: Rc<RefCell<OpState>>,
p: OpPayload,
b: Option<ZeroCopyBuf>|
-> Result<Op, AnyError> {
// For sig compat map Option<ZeroCopyBuf> to BufVec
let bufs: BufVec = match b {
Some(b) => vec![b],
None => vec![],
}
.into();
// Parse args
let args = p.deserialize()?;
use crate::futures::FutureExt;
let fut = op_fn(state.clone(), args, bufs).map(move |result| {
json_serialize_op_result(
Some(request_id),
result,
state.borrow().get_error_class_fn,
)
});
let fut = op_fn(state.clone(), args, bufs)
.map(move |result| serialize_op_result(result, state));
Ok(Op::Async(Box::pin(fut)))
};
Box::new(move |state: Rc<RefCell<OpState>>, bufs: BufVec| -> Op {
match try_dispatch_op(state.clone(), bufs) {
Box::new(
move |state: Rc<RefCell<OpState>>,
p: OpPayload,
b: Option<ZeroCopyBuf>|
-> Op {
match try_dispatch_op(state.clone(), p, b) {
Ok(op) => op,
Err(err) => Op::Sync(json_serialize_op_result(
None,
Err::<(), AnyError>(err),
state.borrow().get_error_class_fn,
)),
Err(err) => {
Op::Sync(serialize_op_result(Err::<(), AnyError>(err), state))
}
})
}
},
)
}

View file

@ -10,6 +10,7 @@
pub use crate::Op;
pub use crate::OpId;
pub use crate::OpResponse;
pub use crate::ZeroCopyBuf;
pub type InitFn = fn(&mut dyn Interface);

View file

@ -20,10 +20,11 @@ use crate::modules::NoopModuleLoader;
use crate::modules::PrepareLoadFuture;
use crate::modules::RecursiveModuleLoad;
use crate::ops::*;
use crate::shared_queue::SharedQueue;
use crate::shared_queue::RECOMMENDED_SIZE;
use crate::BufVec;
use crate::OpPayload;
use crate::OpResponse;
use crate::OpState;
use crate::PromiseId;
use crate::ZeroCopyBuf;
use futures::channel::mpsc;
use futures::future::poll_fn;
use futures::stream::FuturesUnordered;
@ -45,7 +46,7 @@ use std::sync::Once;
use std::task::Context;
use std::task::Poll;
type PendingOpFuture = Pin<Box<dyn Future<Output = (OpId, Box<[u8]>)>>>;
type PendingOpFuture = Pin<Box<dyn Future<Output = (PromiseId, OpResponse)>>>;
pub enum Snapshot {
Static(&'static [u8]),
@ -99,7 +100,6 @@ struct ModEvaluate {
/// embedder slots.
pub(crate) struct JsRuntimeState {
pub global_context: Option<v8::Global<v8::Context>>,
pub(crate) shared_ab: Option<v8::Global<v8::SharedArrayBuffer>>,
pub(crate) js_recv_cb: Option<v8::Global<v8::Function>>,
pub(crate) js_macrotask_cb: Option<v8::Global<v8::Function>>,
pub(crate) pending_promise_exceptions:
@ -107,7 +107,6 @@ pub(crate) struct JsRuntimeState {
pending_dyn_mod_evaluate: HashMap<ModuleLoadId, DynImportModEvaluate>,
pending_mod_evaluate: Option<ModEvaluate>,
pub(crate) js_error_create_fn: Rc<JsErrorCreateFn>,
pub(crate) shared: SharedQueue,
pub(crate) pending_ops: FuturesUnordered<PendingOpFuture>,
pub(crate) pending_unref_ops: FuturesUnordered<PendingOpFuture>,
pub(crate) have_unpolled_ops: bool,
@ -276,11 +275,9 @@ impl JsRuntime {
pending_promise_exceptions: HashMap::new(),
pending_dyn_mod_evaluate: HashMap::new(),
pending_mod_evaluate: None,
shared_ab: None,
js_recv_cb: None,
js_macrotask_cb: None,
js_error_create_fn,
shared: SharedQueue::new(RECOMMENDED_SIZE),
pending_ops: FuturesUnordered::new(),
pending_unref_ops: FuturesUnordered::new(),
op_state: Rc::new(RefCell::new(op_state)),
@ -305,7 +302,7 @@ impl JsRuntime {
}
if !options.will_snapshot {
js_runtime.shared_queue_init();
js_runtime.core_js_init();
}
js_runtime
@ -350,16 +347,13 @@ impl JsRuntime {
.unwrap();
}
/// Executes a JavaScript code to initialize shared queue binding
/// between Rust and JS.
/// Executes JavaScript code to initialize core.js,
/// specifically the js_recv_cb setter
///
/// This function mustn't be called during snapshotting.
fn shared_queue_init(&mut self) {
fn core_js_init(&mut self) {
self
.execute(
"deno:core/shared_queue_init.js",
"Deno.core.sharedQueueInit()",
)
.execute("deno:core/init.js", "Deno.core.init()")
.unwrap();
}
@ -448,7 +442,7 @@ impl JsRuntime {
/// * [json_op_async()](fn.json_op_async.html)
pub fn register_op<F>(&mut self, name: &str, op_fn: F) -> OpId
where
F: Fn(Rc<RefCell<OpState>>, BufVec) -> Op + 'static,
F: Fn(Rc<RefCell<OpState>>, OpPayload, Option<ZeroCopyBuf>) -> Op + 'static,
{
Self::state(self.v8_isolate())
.borrow_mut()
@ -516,8 +510,8 @@ impl JsRuntime {
// Ops
{
let overflow_response = self.poll_pending_ops(cx);
self.async_op_response(overflow_response)?;
let async_responses = self.poll_pending_ops(cx);
self.async_op_response(async_responses)?;
self.drain_macrotasks()?;
self.check_promise_exceptions()?;
}
@ -1325,9 +1319,12 @@ impl JsRuntime {
self.mod_instantiate(root_id).map(|_| root_id)
}
fn poll_pending_ops(&mut self, cx: &mut Context) -> Vec<(OpId, Box<[u8]>)> {
fn poll_pending_ops(
&mut self,
cx: &mut Context,
) -> Vec<(PromiseId, OpResponse)> {
let state_rc = Self::state(self.v8_isolate());
let mut overflow_response: Vec<(OpId, Box<[u8]>)> = Vec::new();
let mut async_responses: Vec<(PromiseId, OpResponse)> = Vec::new();
let mut state = state_rc.borrow_mut();
@ -1339,11 +1336,8 @@ impl JsRuntime {
match pending_r {
Poll::Ready(None) => break,
Poll::Pending => break,
Poll::Ready(Some((op_id, buf))) => {
let successful_push = state.shared.push(op_id, &buf);
if !successful_push {
overflow_response.push((op_id, buf));
}
Poll::Ready(Some((promise_id, resp))) => {
async_responses.push((promise_id, resp));
}
};
}
@ -1353,16 +1347,13 @@ impl JsRuntime {
match unref_r {
Poll::Ready(None) => break,
Poll::Pending => break,
Poll::Ready(Some((op_id, buf))) => {
let successful_push = state.shared.push(op_id, &buf);
if !successful_push {
overflow_response.push((op_id, buf));
}
Poll::Ready(Some((promise_id, resp))) => {
async_responses.push((promise_id, resp));
}
};
}
overflow_response
async_responses
}
fn check_promise_exceptions(&mut self) -> Result<(), AnyError> {
@ -1391,17 +1382,15 @@ impl JsRuntime {
exception_to_err_result(scope, exception, true)
}
// Respond using shared queue and optionally overflown response
// Send finished responses to JS
fn async_op_response(
&mut self,
overflown_responses: Vec<(OpId, Box<[u8]>)>,
async_responses: Vec<(PromiseId, OpResponse)>,
) -> Result<(), AnyError> {
let state_rc = Self::state(self.v8_isolate());
let shared_queue_size = state_rc.borrow().shared.size();
let overflown_responses_size = overflown_responses.len();
if shared_queue_size == 0 && overflown_responses_size == 0 {
let async_responses_size = async_responses.len();
if async_responses_size == 0 {
return Ok(());
}
@ -1422,26 +1411,32 @@ impl JsRuntime {
let tc_scope = &mut v8::TryCatch::new(scope);
// We return async responses to JS in unbounded batches (may change),
// each batch is a flat vector of tuples:
// `[promise_id1, op_result1, promise_id2, op_result2, ...]`
// promise_id is a simple integer, op_result is an ops::OpResult
// which contains a value OR an error, encoded as a tuple.
// This batch is received in JS via the special `arguments` variable
// and then each tuple is used to resolve or reject promises
let mut args: Vec<v8::Local<v8::Value>> =
Vec::with_capacity(2 * overflown_responses_size);
for overflown_response in overflown_responses {
let (op_id, buf) = overflown_response;
args.push(v8::Integer::new(tc_scope, op_id as i32).into());
args.push(bindings::boxed_slice_to_uint8array(tc_scope, buf).into());
Vec::with_capacity(2 * async_responses_size);
for overflown_response in async_responses {
let (promise_id, resp) = overflown_response;
args.push(v8::Integer::new(tc_scope, promise_id as i32).into());
args.push(match resp {
OpResponse::Value(value) => serde_v8::to_v8(tc_scope, value).unwrap(),
OpResponse::Buffer(buf) => {
bindings::boxed_slice_to_uint8array(tc_scope, buf).into()
}
});
}
if shared_queue_size > 0 || overflown_responses_size > 0 {
if async_responses_size > 0 {
js_recv_cb.call(tc_scope, global, args.as_slice());
}
match tc_scope.exception() {
None => {
// The other side should have shifted off all the messages.
let shared_queue_size = state_rc.borrow().shared.size();
assert_eq!(shared_queue_size, 0);
Ok(())
}
None => Ok(()),
Some(exception) => exception_to_err_result(tc_scope, exception, false),
}
}
@ -1485,7 +1480,6 @@ impl JsRuntime {
pub mod tests {
use super::*;
use crate::modules::ModuleSourceFuture;
use crate::BufVec;
use futures::future::lazy;
use futures::FutureExt;
use std::io;
@ -1501,31 +1495,10 @@ pub mod tests {
futures::executor::block_on(lazy(move |cx| f(cx)));
}
fn poll_until_ready(
runtime: &mut JsRuntime,
max_poll_count: usize,
) -> Result<(), AnyError> {
let mut cx = Context::from_waker(futures::task::noop_waker_ref());
for _ in 0..max_poll_count {
match runtime.poll_event_loop(&mut cx) {
Poll::Pending => continue,
Poll::Ready(val) => return val,
}
}
panic!(
"JsRuntime still not ready after polling {} times.",
max_poll_count
)
}
enum Mode {
Async,
AsyncUnref,
AsyncZeroCopy(u8),
OverflowReqSync,
OverflowResSync,
OverflowReqAsync,
OverflowResAsync,
AsyncZeroCopy(bool),
}
struct TestState {
@ -1533,68 +1506,39 @@ pub mod tests {
dispatch_count: Arc<AtomicUsize>,
}
fn dispatch(op_state: Rc<RefCell<OpState>>, bufs: BufVec) -> Op {
fn dispatch(
op_state: Rc<RefCell<OpState>>,
payload: OpPayload,
buf: Option<ZeroCopyBuf>,
) -> Op {
let op_state_ = op_state.borrow();
let test_state = op_state_.borrow::<TestState>();
test_state.dispatch_count.fetch_add(1, Ordering::Relaxed);
match test_state.mode {
Mode::Async => {
assert_eq!(bufs.len(), 1);
assert_eq!(bufs[0].len(), 1);
assert_eq!(bufs[0][0], 42);
let buf = vec![43u8].into_boxed_slice();
Op::Async(futures::future::ready(buf).boxed())
let control: u8 = payload.deserialize().unwrap();
assert_eq!(control, 42);
let resp = OpResponse::Value(Box::new(43));
Op::Async(Box::pin(futures::future::ready(resp)))
}
Mode::AsyncUnref => {
assert_eq!(bufs.len(), 1);
assert_eq!(bufs[0].len(), 1);
assert_eq!(bufs[0][0], 42);
let control: u8 = payload.deserialize().unwrap();
assert_eq!(control, 42);
let fut = async {
// This future never finish.
futures::future::pending::<()>().await;
vec![43u8].into_boxed_slice()
OpResponse::Value(Box::new(43))
};
Op::AsyncUnref(fut.boxed())
Op::AsyncUnref(Box::pin(fut))
}
Mode::AsyncZeroCopy(count) => {
assert_eq!(bufs.len(), count as usize);
bufs.iter().enumerate().for_each(|(idx, buf)| {
Mode::AsyncZeroCopy(has_buffer) => {
assert_eq!(buf.is_some(), has_buffer);
if let Some(buf) = buf {
assert_eq!(buf.len(), 1);
assert_eq!(idx, buf[0] as usize);
});
}
let buf = vec![43u8].into_boxed_slice();
Op::Async(futures::future::ready(buf).boxed())
}
Mode::OverflowReqSync => {
assert_eq!(bufs.len(), 1);
assert_eq!(bufs[0].len(), 100 * 1024 * 1024);
let buf = vec![43u8].into_boxed_slice();
Op::Sync(buf)
}
Mode::OverflowResSync => {
assert_eq!(bufs.len(), 1);
assert_eq!(bufs[0].len(), 1);
assert_eq!(bufs[0][0], 42);
let mut vec = vec![0u8; 100 * 1024 * 1024];
vec[0] = 99;
let buf = vec.into_boxed_slice();
Op::Sync(buf)
}
Mode::OverflowReqAsync => {
assert_eq!(bufs.len(), 1);
assert_eq!(bufs[0].len(), 100 * 1024 * 1024);
let buf = vec![43u8].into_boxed_slice();
Op::Async(futures::future::ready(buf).boxed())
}
Mode::OverflowResAsync => {
assert_eq!(bufs.len(), 1);
assert_eq!(bufs[0].len(), 1);
assert_eq!(bufs[0][0], 42);
let mut vec = vec![0u8; 100 * 1024 * 1024];
vec[0] = 4;
let buf = vec.into_boxed_slice();
Op::Async(futures::future::ready(buf).boxed())
let resp = OpResponse::Value(Box::new(43));
Op::Async(Box::pin(futures::future::ready(resp)))
}
}
}
@ -1633,10 +1577,10 @@ pub mod tests {
.execute(
"filename.js",
r#"
let control = new Uint8Array([42]);
Deno.core.send(1, control);
let control = 42;
Deno.core.send(1, null, control);
async function main() {
Deno.core.send(1, control);
Deno.core.send(1, null, control);
}
main();
"#,
@ -1647,7 +1591,7 @@ pub mod tests {
#[test]
fn test_dispatch_no_zero_copy_buf() {
let (mut runtime, dispatch_count) = setup(Mode::AsyncZeroCopy(0));
let (mut runtime, dispatch_count) = setup(Mode::AsyncZeroCopy(false));
runtime
.execute(
"filename.js",
@ -1661,14 +1605,13 @@ pub mod tests {
#[test]
fn test_dispatch_stack_zero_copy_bufs() {
let (mut runtime, dispatch_count) = setup(Mode::AsyncZeroCopy(2));
let (mut runtime, dispatch_count) = setup(Mode::AsyncZeroCopy(true));
runtime
.execute(
"filename.js",
r#"
let zero_copy_a = new Uint8Array([0]);
let zero_copy_b = new Uint8Array([1]);
Deno.core.send(1, zero_copy_a, zero_copy_b);
Deno.core.send(1, null, null, zero_copy_a);
"#,
)
.unwrap();
@ -1676,23 +1619,7 @@ pub mod tests {
}
#[test]
fn test_dispatch_heap_zero_copy_bufs() {
let (mut runtime, dispatch_count) = setup(Mode::AsyncZeroCopy(5));
runtime.execute(
"filename.js",
r#"
let zero_copy_a = new Uint8Array([0]);
let zero_copy_b = new Uint8Array([1]);
let zero_copy_c = new Uint8Array([2]);
let zero_copy_d = new Uint8Array([3]);
let zero_copy_e = new Uint8Array([4]);
Deno.core.send(1, zero_copy_a, zero_copy_b, zero_copy_c, zero_copy_d, zero_copy_e);
"#,
).unwrap();
assert_eq!(dispatch_count.load(Ordering::Relaxed), 1);
}
#[test]
#[ignore] // TODO(ry) re-enable? setAsyncHandler has been removed
fn test_poll_async_delayed_ops() {
run_in_task(|cx| {
let (mut runtime, dispatch_count) = setup(Mode::Async);
@ -1714,8 +1641,8 @@ pub mod tests {
"check1.js",
r#"
assert(nrecv == 0);
let control = new Uint8Array([42]);
Deno.core.send(1, control);
let control = 42;
Deno.core.send(1, null, control);
assert(nrecv == 0);
"#,
)
@ -1728,7 +1655,7 @@ pub mod tests {
"check2.js",
r#"
assert(nrecv == 1);
Deno.core.send(1, control);
Deno.core.send(1, null, control);
assert(nrecv == 1);
"#,
)
@ -1743,6 +1670,7 @@ pub mod tests {
}
#[test]
#[ignore] // TODO(ry) re-enable? setAsyncHandler has been removed
fn test_poll_async_optional_ops() {
run_in_task(|cx| {
let (mut runtime, dispatch_count) = setup(Mode::AsyncUnref);
@ -1754,8 +1682,8 @@ pub mod tests {
// This handler will never be called
assert(false);
});
let control = new Uint8Array([42]);
Deno.core.send(1, control);
let control = 42;
Deno.core.send(1, null, control);
"#,
)
.unwrap();
@ -1817,262 +1745,10 @@ pub mod tests {
v8_isolate_handle.terminate_execution();
}
#[test]
fn overflow_req_sync() {
let (mut runtime, dispatch_count) = setup(Mode::OverflowReqSync);
runtime
.execute(
"overflow_req_sync.js",
r#"
let asyncRecv = 0;
Deno.core.setAsyncHandler(1, (buf) => { asyncRecv++ });
// Large message that will overflow the shared space.
let control = new Uint8Array(100 * 1024 * 1024);
let response = Deno.core.dispatch(1, control);
assert(response instanceof Uint8Array);
assert(response.length == 1);
assert(response[0] == 43);
assert(asyncRecv == 0);
"#,
)
.unwrap();
assert_eq!(dispatch_count.load(Ordering::Relaxed), 1);
}
#[test]
fn overflow_res_sync() {
// TODO(ry) This test is quite slow due to memcpy-ing 100MB into JS. We
// should optimize this.
let (mut runtime, dispatch_count) = setup(Mode::OverflowResSync);
runtime
.execute(
"overflow_res_sync.js",
r#"
let asyncRecv = 0;
Deno.core.setAsyncHandler(1, (buf) => { asyncRecv++ });
// Large message that will overflow the shared space.
let control = new Uint8Array([42]);
let response = Deno.core.dispatch(1, control);
assert(response instanceof Uint8Array);
assert(response.length == 100 * 1024 * 1024);
assert(response[0] == 99);
assert(asyncRecv == 0);
"#,
)
.unwrap();
assert_eq!(dispatch_count.load(Ordering::Relaxed), 1);
}
#[test]
fn overflow_req_async() {
run_in_task(|cx| {
let (mut runtime, dispatch_count) = setup(Mode::OverflowReqAsync);
runtime
.execute(
"overflow_req_async.js",
r#"
let asyncRecv = 0;
Deno.core.setAsyncHandler(1, (buf) => {
assert(buf.byteLength === 1);
assert(buf[0] === 43);
asyncRecv++;
});
// Large message that will overflow the shared space.
let control = new Uint8Array(100 * 1024 * 1024);
let response = Deno.core.dispatch(1, control);
// Async messages always have null response.
assert(response == null);
assert(asyncRecv == 0);
"#,
)
.unwrap();
assert_eq!(dispatch_count.load(Ordering::Relaxed), 1);
assert!(matches!(runtime.poll_event_loop(cx), Poll::Ready(Ok(_))));
runtime
.execute("check.js", "assert(asyncRecv == 1);")
.unwrap();
});
}
#[test]
fn overflow_res_async_combined_with_unref() {
run_in_task(|cx| {
let mut runtime = JsRuntime::new(Default::default());
runtime.register_op(
"test1",
|_op_state: Rc<RefCell<OpState>>, _bufs: BufVec| -> Op {
let mut vec = vec![0u8; 100 * 1024 * 1024];
vec[0] = 4;
let buf = vec.into_boxed_slice();
Op::Async(futures::future::ready(buf).boxed())
},
);
runtime.register_op(
"test2",
|_op_state: Rc<RefCell<OpState>>, _bufs: BufVec| -> Op {
let mut vec = vec![0u8; 100 * 1024 * 1024];
vec[0] = 4;
let buf = vec.into_boxed_slice();
Op::AsyncUnref(futures::future::ready(buf).boxed())
},
);
runtime
.execute(
"overflow_res_async_combined_with_unref.js",
r#"
function assert(cond) {
if (!cond) {
throw Error("assert");
}
}
let asyncRecv = 0;
Deno.core.setAsyncHandler(1, (buf) => {
assert(buf.byteLength === 100 * 1024 * 1024);
assert(buf[0] === 4);
asyncRecv++;
});
Deno.core.setAsyncHandler(2, (buf) => {
assert(buf.byteLength === 100 * 1024 * 1024);
assert(buf[0] === 4);
asyncRecv++;
});
let control = new Uint8Array(1);
let response1 = Deno.core.dispatch(1, control);
// Async messages always have null response.
assert(response1 == null);
assert(asyncRecv == 0);
let response2 = Deno.core.dispatch(2, control);
// Async messages always have null response.
assert(response2 == null);
assert(asyncRecv == 0);
"#,
)
.unwrap();
assert!(matches!(runtime.poll_event_loop(cx), Poll::Ready(Ok(_))));
runtime
.execute("check.js", "assert(asyncRecv == 2);")
.unwrap();
});
}
#[test]
fn overflow_res_async() {
run_in_task(|_cx| {
// TODO(ry) This test is quite slow due to memcpy-ing 100MB into JS. We
// should optimize this.
let (mut runtime, dispatch_count) = setup(Mode::OverflowResAsync);
runtime
.execute(
"overflow_res_async.js",
r#"
let asyncRecv = 0;
Deno.core.setAsyncHandler(1, (buf) => {
assert(buf.byteLength === 100 * 1024 * 1024);
assert(buf[0] === 4);
asyncRecv++;
});
// Large message that will overflow the shared space.
let control = new Uint8Array([42]);
let response = Deno.core.dispatch(1, control);
assert(response == null);
assert(asyncRecv == 0);
"#,
)
.unwrap();
assert_eq!(dispatch_count.load(Ordering::Relaxed), 1);
poll_until_ready(&mut runtime, 3).unwrap();
runtime
.execute("check.js", "assert(asyncRecv == 1);")
.unwrap();
});
}
#[test]
fn overflow_res_multiple_dispatch_async() {
// TODO(ry) This test is quite slow due to memcpy-ing 100MB into JS. We
// should optimize this.
run_in_task(|_cx| {
let (mut runtime, dispatch_count) = setup(Mode::OverflowResAsync);
runtime
.execute(
"overflow_res_multiple_dispatch_async.js",
r#"
let asyncRecv = 0;
Deno.core.setAsyncHandler(1, (buf) => {
assert(buf.byteLength === 100 * 1024 * 1024);
assert(buf[0] === 4);
asyncRecv++;
});
// Large message that will overflow the shared space.
let control = new Uint8Array([42]);
let response = Deno.core.dispatch(1, control);
assert(response == null);
assert(asyncRecv == 0);
// Dispatch another message to verify that pending ops
// are done even if shared space overflows
Deno.core.dispatch(1, control);
"#,
)
.unwrap();
assert_eq!(dispatch_count.load(Ordering::Relaxed), 2);
poll_until_ready(&mut runtime, 3).unwrap();
runtime
.execute("check.js", "assert(asyncRecv == 2);")
.unwrap();
});
}
#[test]
fn shared_queue_not_empty_when_js_error() {
run_in_task(|_cx| {
let dispatch_count = Arc::new(AtomicUsize::new(0));
let mut runtime = JsRuntime::new(Default::default());
let op_state = runtime.op_state();
op_state.borrow_mut().put(TestState {
mode: Mode::Async,
dispatch_count: dispatch_count.clone(),
});
runtime.register_op("test", dispatch);
runtime
.execute(
"shared_queue_not_empty_when_js_error.js",
r#"
const assert = (cond) => {if (!cond) throw Error("assert")};
let asyncRecv = 0;
Deno.core.setAsyncHandler(1, (buf) => {
asyncRecv++;
throw Error('x');
});
Deno.core.dispatch(1, new Uint8Array([42]));
Deno.core.dispatch(1, new Uint8Array([42]));
"#,
)
.unwrap();
assert_eq!(dispatch_count.load(Ordering::Relaxed), 2);
if poll_until_ready(&mut runtime, 3).is_ok() {
panic!("Thrown error was not detected!")
}
runtime
.execute("check.js", "assert(asyncRecv == 1);")
.unwrap();
let state_rc = JsRuntime::state(runtime.v8_isolate());
let shared_queue_size = state_rc.borrow().shared.size();
assert_eq!(shared_queue_size, 1);
});
}
#[test]
fn test_pre_dispatch() {
run_in_task(|mut cx| {
let (mut runtime, _dispatch_count) = setup(Mode::OverflowResAsync);
let (mut runtime, _dispatch_count) = setup(Mode::Async);
runtime
.execute(
"bad_op_id.js",
@ -2093,19 +1769,6 @@ pub mod tests {
});
}
#[test]
fn core_test_js() {
run_in_task(|mut cx| {
let (mut runtime, _dispatch_count) = setup(Mode::Async);
runtime
.execute("core_test.js", include_str!("core_test.js"))
.unwrap();
if let Poll::Ready(Err(_)) = runtime.poll_event_loop(&mut cx) {
unreachable!();
}
});
}
#[test]
fn syntax_error() {
let mut runtime = JsRuntime::new(Default::default());
@ -2315,13 +1978,12 @@ pub mod tests {
let dispatch_count = Arc::new(AtomicUsize::new(0));
let dispatch_count_ = dispatch_count.clone();
let dispatcher = move |_state: Rc<RefCell<OpState>>, bufs: BufVec| -> Op {
let dispatcher = move |_state, payload: OpPayload, _buf| -> Op {
dispatch_count_.fetch_add(1, Ordering::Relaxed);
assert_eq!(bufs.len(), 1);
assert_eq!(bufs[0].len(), 1);
assert_eq!(bufs[0][0], 42);
let buf = [43u8, 0, 0, 0][..].into();
Op::Async(futures::future::ready(buf).boxed())
let control: u8 = payload.deserialize().unwrap();
assert_eq!(control, 42);
let resp = OpResponse::Value(Box::new(43));
Op::Async(Box::pin(futures::future::ready(resp)))
};
let mut runtime = JsRuntime::new(RuntimeOptions {
@ -2353,8 +2015,8 @@ pub mod tests {
r#"
import { b } from './b.js'
if (b() != 'b') throw Error();
let control = new Uint8Array([42]);
Deno.core.send(1, control);
let control = 42;
Deno.core.send(1, null, control);
"#,
)
.unwrap();

View file

@ -1,313 +0,0 @@
// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
/*
SharedQueue Binary Layout
+-------------------------------+-------------------------------+
| NUM_RECORDS (32) |
+---------------------------------------------------------------+
| NUM_SHIFTED_OFF (32) |
+---------------------------------------------------------------+
| HEAD (32) |
+---------------------------------------------------------------+
| OFFSETS (32) |
+---------------------------------------------------------------+
| RECORD_ENDS (*MAX_RECORDS) ...
+---------------------------------------------------------------+
| RECORDS (*MAX_RECORDS) ...
+---------------------------------------------------------------+
*/
use crate::bindings;
use crate::ops::OpId;
use log::debug;
use rusty_v8 as v8;
use std::convert::TryInto;
const MAX_RECORDS: usize = 100;
/// Total number of records added.
const INDEX_NUM_RECORDS: usize = 0;
/// Number of records that have been shifted off.
const INDEX_NUM_SHIFTED_OFF: usize = 1;
/// The head is the number of initialized bytes in SharedQueue.
/// It grows monotonically.
const INDEX_HEAD: usize = 2;
const INDEX_OFFSETS: usize = 3;
const INDEX_RECORDS: usize = INDEX_OFFSETS + 2 * MAX_RECORDS;
/// Byte offset of where the records begin. Also where the head starts.
const HEAD_INIT: usize = 4 * INDEX_RECORDS;
/// A rough guess at how big we should make the shared buffer in bytes.
pub const RECOMMENDED_SIZE: usize = 128 * MAX_RECORDS;
pub struct SharedQueue {
buf: v8::SharedRef<v8::BackingStore>,
}
impl SharedQueue {
pub fn new(len: usize) -> Self {
let buf = vec![0; HEAD_INIT + len].into_boxed_slice();
let buf = v8::SharedArrayBuffer::new_backing_store_from_boxed_slice(buf);
let mut q = Self {
buf: buf.make_shared(),
};
q.reset();
q
}
pub fn get_backing_store(&mut self) -> &mut v8::SharedRef<v8::BackingStore> {
&mut self.buf
}
pub fn bytes(&self) -> &[u8] {
unsafe {
bindings::get_backing_store_slice(&self.buf, 0, self.buf.byte_length())
}
}
pub fn bytes_mut(&mut self) -> &mut [u8] {
unsafe {
bindings::get_backing_store_slice_mut(
&self.buf,
0,
self.buf.byte_length(),
)
}
}
fn reset(&mut self) {
debug!("rust:shared_queue:reset");
let s: &mut [u32] = self.as_u32_slice_mut();
s[INDEX_NUM_RECORDS] = 0;
s[INDEX_NUM_SHIFTED_OFF] = 0;
s[INDEX_HEAD] = HEAD_INIT as u32;
}
fn as_u32_slice(&self) -> &[u32] {
let p = self.bytes().as_ptr();
// Assert pointer is 32 bit aligned before casting.
assert_eq!((p as usize) % std::mem::align_of::<u32>(), 0);
#[allow(clippy::cast_ptr_alignment)]
let p32 = p as *const u32;
unsafe { std::slice::from_raw_parts(p32, self.bytes().len() / 4) }
}
fn as_u32_slice_mut(&mut self) -> &mut [u32] {
let p = self.bytes_mut().as_mut_ptr();
// Assert pointer is 32 bit aligned before casting.
assert_eq!((p as usize) % std::mem::align_of::<u32>(), 0);
#[allow(clippy::cast_ptr_alignment)]
let p32 = p as *mut u32;
unsafe { std::slice::from_raw_parts_mut(p32, self.bytes().len() / 4) }
}
pub fn size(&self) -> usize {
let s = self.as_u32_slice();
(s[INDEX_NUM_RECORDS] - s[INDEX_NUM_SHIFTED_OFF]) as usize
}
fn num_records(&self) -> usize {
let s = self.as_u32_slice();
s[INDEX_NUM_RECORDS] as usize
}
fn head(&self) -> usize {
let s = self.as_u32_slice();
s[INDEX_HEAD] as usize
}
fn num_shifted_off(&self) -> usize {
let s = self.as_u32_slice();
s[INDEX_NUM_SHIFTED_OFF] as usize
}
fn set_meta(&mut self, index: usize, end: usize, op_id: OpId) {
let s = self.as_u32_slice_mut();
s[INDEX_OFFSETS + 2 * index] = end as u32;
s[INDEX_OFFSETS + 2 * index + 1] = op_id.try_into().unwrap();
}
#[cfg(test)]
fn get_meta(&self, index: usize) -> Option<(OpId, usize)> {
if index < self.num_records() {
let s = self.as_u32_slice();
let end = s[INDEX_OFFSETS + 2 * index] as usize;
let op_id = s[INDEX_OFFSETS + 2 * index + 1] as OpId;
Some((op_id, end))
} else {
None
}
}
#[cfg(test)]
fn get_offset(&self, index: usize) -> Option<usize> {
if index < self.num_records() {
Some(if index == 0 {
HEAD_INIT
} else {
let s = self.as_u32_slice();
let prev_end = s[INDEX_OFFSETS + 2 * (index - 1)] as usize;
(prev_end + 3) & !3
})
} else {
None
}
}
/// Returns none if empty.
#[cfg(test)]
pub fn shift(&mut self) -> Option<(OpId, &[u8])> {
let u32_slice = self.as_u32_slice();
let i = u32_slice[INDEX_NUM_SHIFTED_OFF] as usize;
if self.size() == 0 {
assert_eq!(i, 0);
return None;
}
let off = self.get_offset(i).unwrap();
let (op_id, end) = self.get_meta(i).unwrap();
if self.size() > 1 {
let u32_slice = self.as_u32_slice_mut();
u32_slice[INDEX_NUM_SHIFTED_OFF] += 1;
} else {
self.reset();
}
println!(
"rust:shared_queue:shift: num_records={}, num_shifted_off={}, head={}",
self.num_records(),
self.num_shifted_off(),
self.head()
);
Some((op_id, &self.bytes()[off..end]))
}
/// Because JS-side may cast popped message to Int32Array it is required
/// that every message is aligned to 4-bytes.
pub fn push(&mut self, op_id: OpId, record: &[u8]) -> bool {
let off = self.head();
assert_eq!(off % 4, 0);
let end = off + record.len();
let aligned_end = (end + 3) & !3;
debug!(
"rust:shared_queue:pre-push: op={}, off={}, end={}, len={}, aligned_end={}",
op_id,
off,
end,
record.len(),
aligned_end,
);
let index = self.num_records();
if aligned_end > self.bytes().len() || index >= MAX_RECORDS {
debug!("WARNING the sharedQueue overflowed");
return false;
}
assert_eq!(aligned_end % 4, 0);
self.set_meta(index, end, op_id);
assert_eq!(end - off, record.len());
self.bytes_mut()[off..end].copy_from_slice(record);
let u32_slice = self.as_u32_slice_mut();
u32_slice[INDEX_NUM_RECORDS] += 1;
u32_slice[INDEX_HEAD] = aligned_end as u32;
debug!(
"rust:shared_queue:push: num_records={}, num_shifted_off={}, head={}",
self.num_records(),
self.num_shifted_off(),
self.head()
);
true
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn basic() {
let mut q = SharedQueue::new(RECOMMENDED_SIZE);
let h = q.head();
assert!(h > 0);
let r = vec![1u8, 2, 3, 4].into_boxed_slice();
let len = r.len() + h;
assert!(q.push(0, &r));
assert_eq!(q.head(), len);
let r = vec![5, 6, 7, 8].into_boxed_slice();
assert!(q.push(0, &r));
let r = vec![9, 10, 11, 12].into_boxed_slice();
assert!(q.push(0, &r));
assert_eq!(q.num_records(), 3);
assert_eq!(q.size(), 3);
let (_op_id, r) = q.shift().unwrap();
assert_eq!(r, vec![1, 2, 3, 4].as_slice());
assert_eq!(q.num_records(), 3);
assert_eq!(q.size(), 2);
let (_op_id, r) = q.shift().unwrap();
assert_eq!(r, vec![5, 6, 7, 8].as_slice());
assert_eq!(q.num_records(), 3);
assert_eq!(q.size(), 1);
let (_op_id, r) = q.shift().unwrap();
assert_eq!(r, vec![9, 10, 11, 12].as_slice());
assert_eq!(q.num_records(), 0);
assert_eq!(q.size(), 0);
assert!(q.shift().is_none());
assert!(q.shift().is_none());
assert_eq!(q.num_records(), 0);
assert_eq!(q.size(), 0);
}
fn alloc_buf(byte_length: usize) -> Box<[u8]> {
vec![0; byte_length].into_boxed_slice()
}
#[test]
fn overflow() {
let mut q = SharedQueue::new(RECOMMENDED_SIZE);
assert!(q.push(0, &alloc_buf(RECOMMENDED_SIZE - 5)));
assert_eq!(q.size(), 1);
assert!(!q.push(0, &alloc_buf(6)));
assert_eq!(q.size(), 1);
assert!(q.push(0, &alloc_buf(1)));
assert_eq!(q.size(), 2);
let (_op_id, buf) = q.shift().unwrap();
assert_eq!(buf.len(), RECOMMENDED_SIZE - 5);
assert_eq!(q.size(), 1);
assert!(!q.push(0, &alloc_buf(1)));
let (_op_id, buf) = q.shift().unwrap();
assert_eq!(buf.len(), 1);
assert_eq!(q.size(), 0);
}
#[test]
fn full_records() {
let mut q = SharedQueue::new(RECOMMENDED_SIZE);
for _ in 0..MAX_RECORDS {
assert!(q.push(0, &alloc_buf(1)))
}
assert_eq!(q.push(0, &alloc_buf(1)), false);
// Even if we shift one off, we still cannot push a new record.
let _ignored = q.shift().unwrap();
assert_eq!(q.push(0, &alloc_buf(1)), false);
}
#[test]
fn allow_any_buf_length() {
let mut q = SharedQueue::new(RECOMMENDED_SIZE);
// Check that `record` that has length not a multiple of 4 will
// not cause panic. Still make sure that records are always
// aligned to 4 bytes.
for i in 1..9 {
q.push(0, &alloc_buf(i));
assert_eq!(q.num_records(), i);
assert_eq!(q.head() % 4, 0);
}
}
}

View file

@ -101,27 +101,27 @@ impl OpMetrics {
}
}
use deno_core::BufVec;
use deno_core::Op;
use deno_core::OpFn;
use deno_core::OpState;
use std::cell::RefCell;
use std::collections::HashMap;
use std::rc::Rc;
pub fn metrics_op(name: &'static str, op_fn: Box<OpFn>) -> Box<OpFn> {
Box::new(move |op_state: Rc<RefCell<OpState>>, bufs: BufVec| -> Op {
Box::new(move |op_state, payload, buf| -> Op {
// TODOs:
// * The 'bytes' metrics seem pretty useless, especially now that the
// distinction between 'control' and 'data' buffers has become blurry.
// * Tracking completion of async ops currently makes us put the boxed
// future into _another_ box. Keeping some counters may not be expensive
// in itself, but adding a heap allocation for every metric seems bad.
let mut buf_len_iter = bufs.iter().map(|buf| buf.len());
let bytes_sent_control = buf_len_iter.next().unwrap_or(0);
let bytes_sent_data = buf_len_iter.sum();
let op = (op_fn)(op_state.clone(), bufs);
// TODO: remove this, doesn't make a ton of sense
let bytes_sent_control = 0;
let bytes_sent_data = match buf {
Some(ref b) => b.len(),
None => 0,
};
let op = (op_fn)(op_state.clone(), payload, buf);
let op_state_ = op_state.clone();
let mut s = op_state.borrow_mut();
@ -138,17 +138,17 @@ pub fn metrics_op(name: &'static str, op_fn: Box<OpFn>) -> Box<OpFn> {
match op {
Op::Sync(buf) => {
metrics.op_sync(bytes_sent_control, bytes_sent_data, buf.len());
metrics.op_sync(bytes_sent_control, bytes_sent_data, 0);
Op::Sync(buf)
}
Op::Async(fut) => {
metrics.op_dispatched_async(bytes_sent_control, bytes_sent_data);
let fut = fut
.inspect(move |buf| {
.inspect(move |_resp| {
let mut s = op_state_.borrow_mut();
let runtime_metrics = s.borrow_mut::<RuntimeMetrics>();
let metrics = runtime_metrics.ops.get_mut(name).unwrap();
metrics.op_completed_async(buf.len());
metrics.op_completed_async(0);
})
.boxed_local();
Op::Async(fut)
@ -156,11 +156,11 @@ pub fn metrics_op(name: &'static str, op_fn: Box<OpFn>) -> Box<OpFn> {
Op::AsyncUnref(fut) => {
metrics.op_dispatched_async_unref(bytes_sent_control, bytes_sent_data);
let fut = fut
.inspect(move |buf| {
.inspect(move |_resp| {
let mut s = op_state_.borrow_mut();
let runtime_metrics = s.borrow_mut::<RuntimeMetrics>();
let metrics = runtime_metrics.ops.get_mut(name).unwrap();
metrics.op_completed_async_unref(buf.len());
metrics.op_completed_async_unref(0);
})
.boxed_local();
Op::AsyncUnref(fut)

View file

@ -48,7 +48,7 @@ pub fn reg_json_async<F, V, R, RV>(
F: Fn(Rc<RefCell<OpState>>, V, BufVec) -> R + 'static,
V: DeserializeOwned,
R: Future<Output = Result<RV, AnyError>> + 'static,
RV: Serialize,
RV: Serialize + 'static,
{
rt.register_op(name, metrics_op(name, json_op_async(op_fn)));
}
@ -57,7 +57,7 @@ pub fn reg_json_sync<F, V, R>(rt: &mut JsRuntime, name: &'static str, op_fn: F)
where
F: Fn(&mut OpState, V, &mut [ZeroCopyBuf]) -> Result<R, AnyError> + 'static,
V: DeserializeOwned,
R: Serialize,
R: Serialize + 'static,
{
rt.register_op(name, metrics_op(name, json_op_sync(op_fn)));
}

View file

@ -10,6 +10,7 @@ use deno_core::BufVec;
use deno_core::JsRuntime;
use deno_core::Op;
use deno_core::OpAsyncFuture;
use deno_core::OpFn;
use deno_core::OpId;
use deno_core::OpState;
use deno_core::Resource;
@ -18,7 +19,6 @@ use dlopen::symbor::Library;
use log::debug;
use serde::Deserialize;
use std::borrow::Cow;
use std::cell::RefCell;
use std::path::PathBuf;
use std::pin::Pin;
use std::rc::Rc;
@ -110,11 +110,17 @@ impl<'a> plugin_api::Interface for PluginInterface<'a> {
dispatch_op_fn: plugin_api::DispatchOpFn,
) -> OpId {
let plugin_lib = self.plugin_lib.clone();
let plugin_op_fn = move |state_rc: Rc<RefCell<OpState>>,
mut zero_copy: BufVec| {
let plugin_op_fn: Box<OpFn> = Box::new(move |state_rc, _payload, buf| {
// For sig compat map Option<ZeroCopyBuf> to BufVec
let mut bufs: BufVec = match buf {
Some(b) => vec![b],
None => vec![],
}
.into();
let mut state = state_rc.borrow_mut();
let mut interface = PluginInterface::new(&mut state, &plugin_lib);
let op = dispatch_op_fn(&mut interface, &mut zero_copy);
let op = dispatch_op_fn(&mut interface, &mut bufs);
match op {
sync_op @ Op::Sync(..) => sync_op,
Op::Async(fut) => Op::Async(PluginOpAsyncFuture::new(&plugin_lib, fut)),
@ -123,13 +129,10 @@ impl<'a> plugin_api::Interface for PluginInterface<'a> {
}
_ => unreachable!(),
}
};
});
self.state.op_table.register_op(
name,
metrics_op(
Box::leak(Box::new(name.to_string())),
Box::new(plugin_op_fn),
),
metrics_op(Box::leak(Box::new(name.to_string())), plugin_op_fn),
)
}
}

View file

@ -255,6 +255,14 @@ impl<'de> de::Visitor<'de> for ParseBooleanOrStringVec {
formatter.write_str("a vector of strings or a boolean")
}
// visit_unit maps undefined/missing values to false
fn visit_unit<E>(self) -> Result<UnaryPermissionBase, E>
where
E: de::Error,
{
self.visit_bool(false)
}
fn visit_bool<E>(self, v: bool) -> Result<UnaryPermissionBase, E>
where
E: de::Error,

View file

@ -92,7 +92,16 @@ impl<'de, 'a, 'b, 's, 'x> de::Deserializer<'de>
match ValueType::from_v8(self.input) {
ValueType::Null => self.deserialize_unit(visitor),
ValueType::Bool => self.deserialize_bool(visitor),
ValueType::Number => self.deserialize_f64(visitor),
// Handle floats & ints separately to work with loosely-typed serde_json
ValueType::Number => {
if self.input.is_uint32() {
self.deserialize_u32(visitor)
} else if self.input.is_int32() {
self.deserialize_i32(visitor)
} else {
self.deserialize_f64(visitor)
}
}
ValueType::String => self.deserialize_string(visitor),
ValueType::Array => self.deserialize_seq(visitor),
ValueType::Object => self.deserialize_map(visitor),
@ -103,11 +112,8 @@ impl<'de, 'a, 'b, 's, 'x> de::Deserializer<'de>
where
V: Visitor<'de>,
{
if self.input.is_boolean() {
visitor.visit_bool(self.input.boolean_value(&mut self.scope))
} else {
Err(Error::ExpectedBoolean)
}
// Relaxed typechecking, will map all non-true vals to false
visitor.visit_bool(self.input.is_true())
}
deserialize_signed!(deserialize_i8, visit_i8, i8);
@ -148,7 +154,12 @@ impl<'de, 'a, 'b, 's, 'x> de::Deserializer<'de>
V: Visitor<'de>,
{
if self.input.is_string() {
let string = self.input.to_rust_string_lossy(self.scope);
// TODO(@AaronO): implement a `.to_rust_string -> Option<String>` in rusty-v8
let v8_string = v8::Local::<v8::String>::try_from(self.input).unwrap();
let string = match v8_to_rust_string(self.scope, v8_string) {
Some(string) => string,
None => return Err(Error::ExpectedUtf8),
};
visitor.visit_string(string)
} else {
Err(Error::ExpectedString)
@ -209,7 +220,8 @@ impl<'de, 'a, 'b, 's, 'x> de::Deserializer<'de>
where
V: Visitor<'de>,
{
let arr = v8::Local::<v8::Array>::try_from(self.input).unwrap();
let arr = v8::Local::<v8::Array>::try_from(self.input)
.map_err(|_| Error::ExpectedArray)?;
let len = arr.length();
let obj = v8::Local::<v8::Object>::from(arr);
let seq = SeqAccess {
@ -261,8 +273,13 @@ impl<'de, 'a, 'b, 's, 'x> de::Deserializer<'de>
Some(names) => from_v8(self.scope, names.into()).unwrap(),
None => vec![],
};
let keys: Vec<v8::Local<v8::Value>> =
keys.drain(..).map(|x| x.into()).collect();
let keys: Vec<v8::Local<v8::Value>> = keys
.drain(..)
.map(|x| x.into())
// Filter keys to drop keys whose value is undefined
// TODO: optimize, since this doubles our get calls
.filter(|key| !obj.get(self.scope, *key).unwrap().is_undefined())
.collect();
let map = MapAccess {
obj,
@ -305,16 +322,51 @@ impl<'de, 'a, 'b, 's, 'x> de::Deserializer<'de>
visitor.visit_map(map)
}
/// To be compatible with `serde-json`, we expect enums to be:
/// - `"Variant"`: strings for unit variants, i.e: Enum::Variant
/// - `{ Variant: payload }`: single K/V pairs, converted to `Enum::Variant { payload }`
fn deserialize_enum<V>(
self,
_name: &str,
_variants: &'static [&'static str],
_visitor: V,
visitor: V,
) -> Result<V::Value>
where
V: Visitor<'de>,
{
unimplemented!();
// Unit variant
if self.input.is_string() {
let payload = v8::undefined(self.scope).into();
visitor.visit_enum(EnumAccess {
scope: self.scope,
tag: self.input,
payload,
})
}
// Struct or tuple variant
else if self.input.is_object() {
// Assume object
let obj = v8::Local::<v8::Object>::try_from(self.input).unwrap();
// Unpack single-key
let tag = {
let prop_names = obj.get_own_property_names(self.scope);
let prop_names = prop_names.ok_or(Error::ExpectedEnum)?;
if prop_names.length() != 1 {
return Err(Error::LengthMismatch);
}
prop_names.get_index(self.scope, 0).unwrap()
};
let payload = obj.get(self.scope, tag).unwrap();
visitor.visit_enum(EnumAccess {
scope: self.scope,
tag,
payload,
})
} else {
// TODO: improve error
Err(Error::ExpectedEnum)
}
}
// An identifier in Serde is the type that identifies a field of a struct or
@ -483,3 +535,85 @@ impl<'de> de::SeqAccess<'de> for SeqAccess<'_, '_, '_> {
}
}
}
struct EnumAccess<'a, 'b, 's> {
tag: v8::Local<'a, v8::Value>,
payload: v8::Local<'a, v8::Value>,
scope: &'b mut v8::HandleScope<'s>,
// p1: std::marker::PhantomData<&'x ()>,
}
impl<'de, 'a, 'b, 's, 'x> de::EnumAccess<'de> for EnumAccess<'a, 'b, 's> {
type Error = Error;
type Variant = VariantDeserializer<'a, 'b, 's>;
fn variant_seed<V: de::DeserializeSeed<'de>>(
self,
seed: V,
) -> Result<(V::Value, Self::Variant)> {
let seed = {
let mut dtag = Deserializer::new(self.scope, self.tag, None);
seed.deserialize(&mut dtag)
};
let dpayload = VariantDeserializer::<'a, 'b, 's> {
scope: self.scope,
value: self.payload,
};
Ok((seed?, dpayload))
}
}
struct VariantDeserializer<'a, 'b, 's> {
value: v8::Local<'a, v8::Value>,
scope: &'b mut v8::HandleScope<'s>,
}
impl<'de, 'a, 'b, 's> de::VariantAccess<'de>
for VariantDeserializer<'a, 'b, 's>
{
type Error = Error;
fn unit_variant(self) -> Result<()> {
let mut d = Deserializer::new(self.scope, self.value, None);
de::Deserialize::deserialize(&mut d)
}
fn newtype_variant_seed<T: de::DeserializeSeed<'de>>(
self,
seed: T,
) -> Result<T::Value> {
let mut d = Deserializer::new(self.scope, self.value, None);
seed.deserialize(&mut d)
}
fn tuple_variant<V: de::Visitor<'de>>(
self,
len: usize,
visitor: V,
) -> Result<V::Value> {
let mut d = Deserializer::new(self.scope, self.value, None);
de::Deserializer::deserialize_tuple(&mut d, len, visitor)
}
fn struct_variant<V: de::Visitor<'de>>(
self,
fields: &'static [&'static str],
visitor: V,
) -> Result<V::Value> {
let mut d = Deserializer::new(self.scope, self.value, None);
de::Deserializer::deserialize_struct(&mut d, "", fields, visitor)
}
}
// Like v8::String::to_rust_string_lossy except returns None on non-utf8
fn v8_to_rust_string(
scope: &mut v8::HandleScope,
s: v8::Local<v8::String>,
) -> Option<String> {
let string = s.to_rust_string_lossy(scope);
match string.find(std::char::REPLACEMENT_CHARACTER) {
Some(_) => None,
None => Some(string),
}
}

View file

@ -17,6 +17,8 @@ pub enum Error {
ExpectedMap,
ExpectedEnum,
ExpectedUtf8,
LengthMismatch,
}

View file

@ -12,6 +12,22 @@ struct MathOp {
pub operator: Option<String>,
}
#[derive(Debug, PartialEq, Deserialize)]
enum EnumUnit {
A,
B,
C,
}
#[derive(Debug, PartialEq, Deserialize)]
enum EnumPayloads {
UInt(u64),
Int(i64),
Float(f64),
Point { x: i64, y: i64 },
Tuple(bool, i64, ()),
}
fn dedo(
code: &str,
f: impl FnOnce(&mut v8::HandleScope, v8::Local<v8::Value>),
@ -73,6 +89,43 @@ detest!(
}
);
// Unit enums
detest!(de_enum_unit_a, EnumUnit, "'A'", EnumUnit::A);
detest!(de_enum_unit_b, EnumUnit, "'B'", EnumUnit::B);
detest!(de_enum_unit_c, EnumUnit, "'C'", EnumUnit::C);
// Enums with payloads (tuples & struct)
detest!(
de_enum_payload_int,
EnumPayloads,
"({ Int: -123 })",
EnumPayloads::Int(-123)
);
detest!(
de_enum_payload_uint,
EnumPayloads,
"({ UInt: 123 })",
EnumPayloads::UInt(123)
);
detest!(
de_enum_payload_float,
EnumPayloads,
"({ Float: 1.23 })",
EnumPayloads::Float(1.23)
);
detest!(
de_enum_payload_point,
EnumPayloads,
"({ Point: { x: 1, y: 2 } })",
EnumPayloads::Point { x: 1, y: 2 }
);
detest!(
de_enum_payload_tuple,
EnumPayloads,
"({ Tuple: [true, 123, null ] })",
EnumPayloads::Tuple(true, 123, ())
);
#[test]
fn de_f64() {
dedo("12345.0", |scope, v| {
@ -114,7 +167,7 @@ detest!(
de_json_int,
serde_json::Value,
"123",
serde_json::Value::Number(serde_json::Number::from_f64(123.0).unwrap())
serde_json::Value::Number(serde_json::Number::from(123))
);
detest!(
de_json_float,
@ -156,7 +209,7 @@ detest!(
vec![
(
"a".to_string(),
serde_json::Value::Number(serde_json::Number::from_f64(1.0).unwrap()),
serde_json::Value::Number(serde_json::Number::from(1)),
),
(
"b".to_string(),

View file

@ -2,6 +2,7 @@
use deno_core::plugin_api::Interface;
use deno_core::plugin_api::Op;
use deno_core::plugin_api::OpResponse;
use deno_core::plugin_api::ZeroCopyBuf;
use futures::future::FutureExt;
@ -25,7 +26,7 @@ fn op_test_sync(
}
let result = b"test";
let result_box: Box<[u8]> = Box::new(*result);
Op::Sync(result_box)
Op::Sync(OpResponse::Buffer(result_box))
}
fn op_test_async(
@ -49,7 +50,7 @@ fn op_test_async(
assert!(rx.await.is_ok());
let result = b"test";
let result_box: Box<[u8]> = Box::new(*result);
result_box
OpResponse::Buffer(result_box)
};
Op::Async(fut.boxed())

View file

@ -10,6 +10,11 @@ const BUILD_VARIANT: &str = "debug";
const BUILD_VARIANT: &str = "release";
#[test]
// TODO: re-enable after adapting plugins to new op-layer
// see:
// - https://github.com/denoland/deno/pull/9843
// - https://github.com/denoland/deno/pull/9850
#[ignore]
fn basic() {
let mut build_plugin_base = Command::new("cargo");
let mut build_plugin =

View file

@ -633,6 +633,7 @@
],
"idlharness.any.js": false,
"url-constructor.any.js": [
"Parsing: <https://x/<2F>?<3F>#<23>> against <about:blank>",
"Parsing: <http://example.com/\ud800𐟾\udfff﷐﷏﷯ﷰ￾￿?\ud800𐟾\udfff﷐﷏﷯ﷰ￾￿> against <about:blank>",
"Parsing: <file://%43%7C> against <about:blank>",
"Parsing: <file://%43|> against <about:blank>",
@ -686,7 +687,8 @@
"Parsing: <path> against <non-spec:/..//p>"
],
"url-origin.any.js": [
"Origin parsing: <http://example.com/\ud800𐟾\udfff﷐﷏﷯ﷰ￾￿?\ud800𐟾\udfff﷐﷏﷯ﷰ￾￿> against <about:blank>"
"Origin parsing: <http://example.com/\ud800𐟾\udfff﷐﷏﷯ﷰ￾￿?\ud800𐟾\udfff﷐﷏﷯ﷰ￾￿> against <about:blank>",
"Origin parsing: <https://x/<2F>?<3F>#<23>> against <about:blank>"
],
"url-searchparams.any.js": true,
"url-setters-stripping.any.js": [
@ -742,7 +744,12 @@
"urlsearchparams-getall.any.js": true,
"urlsearchparams-has.any.js": true,
"urlsearchparams-set.any.js": true,
"urlsearchparams-sort.any.js": true,
"urlsearchparams-sort.any.js": [
"Parse and sort: <20>=x&&<26>=a",
"URL parse and sort: <20>=x&&<26>=a",
"Parse and sort: é&e<>&é",
"URL parse and sort: é&e<>&é"
],
"urlsearchparams-stringifier.any.js": true
},
"fetch": {