diff --git a/cli/tests/unit/dispatch_buffer_test.ts b/cli/tests/unit/dispatch_bin_test.ts similarity index 86% rename from cli/tests/unit/dispatch_buffer_test.ts rename to cli/tests/unit/dispatch_bin_test.ts index 0e213fe3b6..b2d96f3b35 100644 --- a/cli/tests/unit/dispatch_buffer_test.ts +++ b/cli/tests/unit/dispatch_bin_test.ts @@ -8,9 +8,9 @@ import { const readErrorStackPattern = new RegExp( `^.* - at handleError \\(.*10_dispatch_buffer\\.js:.*\\) - at bufferOpParseResult \\(.*10_dispatch_buffer\\.js:.*\\) - at Array. \\(.*10_dispatch_buffer\\.js:.*\\).*$`, + at handleError \\(.*core\\.js:.*\\) + at binOpParseResult \\(.*core\\.js:.*\\) + at asyncHandle \\(.*core\\.js:.*\\).*$`, "ms", ); @@ -33,7 +33,7 @@ declare global { } } -unitTest(function bufferOpsHeaderTooShort(): void { +unitTest(function binOpsHeaderTooShort(): void { for (const op of ["op_read_sync", "op_read_async"]) { const readOpId = Deno.core.ops()[op]; const res = Deno.core.send( diff --git a/cli/tests/unit/dispatch_json_test.ts b/cli/tests/unit/dispatch_json_test.ts index c283e20c99..3cb9506dd3 100644 --- a/cli/tests/unit/dispatch_json_test.ts +++ b/cli/tests/unit/dispatch_json_test.ts @@ -19,7 +19,7 @@ unitTest(function malformedJsonControlBuffer(): void { assertMatch(resObj.err.message, /\bexpected value\b/); }); -unitTest(function invalidPromiseId(): void { +unitTest(function invalidRequestId(): void { const opId = Deno.core.ops()["op_open_async"]; const reqBuf = new Uint8Array([0, 0, 0, 0, 0, 0, 0]); const resBuf = Deno.core.send(opId, reqBuf); @@ -28,5 +28,5 @@ unitTest(function invalidPromiseId(): void { console.error(resText); assertStrictEquals(resObj.ok, undefined); assertStrictEquals(resObj.err.className, "TypeError"); - assertMatch(resObj.err.message, /\bpromiseId\b/); + assertMatch(resObj.err.message, /\brequestId\b/); }); diff --git a/cli/tests/unit/unit_tests.ts b/cli/tests/unit/unit_tests.ts index 6277abdfe1..d804033662 100644 --- a/cli/tests/unit/unit_tests.ts +++ b/cli/tests/unit/unit_tests.ts @@ -15,7 +15,7 @@ import "./console_test.ts"; import "./copy_file_test.ts"; import "./custom_event_test.ts"; import "./dir_test.ts"; -import "./dispatch_buffer_test.ts"; +import "./dispatch_bin_test.ts"; import "./dispatch_json_test.ts"; import "./error_stack_test.ts"; import "./event_test.ts"; diff --git a/core/core.js b/core/core.js index f44bf253ef..2de8e1fffa 100644 --- a/core/core.js +++ b/core/core.js @@ -30,13 +30,22 @@ SharedQueue Binary Layout const core = window.Deno.core; const { recv, send } = core; + //////////////////////////////////////////////////////////////////////////////////////////// + ///////////////////////////////////////// Dispatch ///////////////////////////////////////// + //////////////////////////////////////////////////////////////////////////////////////////// + + const dispatch = send; + const dispatchByName = (opName, control, ...zeroCopy) => + dispatch(opsCache[opName], control, ...zeroCopy); + + //////////////////////////////////////////////////////////////////////////////////////////// + //////////////////////////////////// Shared array buffer /////////////////////////////////// + //////////////////////////////////////////////////////////////////////////////////////////// + let sharedBytes; let shared32; - let asyncHandlers; - let opsCache = {}; - const errorMap = {}; function init() { const shared = core.shared; @@ -45,6 +54,7 @@ SharedQueue Binary Layout assert(shared32 == null); sharedBytes = new Uint8Array(shared); shared32 = new Int32Array(shared); + asyncHandlers = []; // Callers should not call core.recv, use setAsyncHandler. recv(handleAsyncMsgFromRust); @@ -150,15 +160,43 @@ SharedQueue Binary Layout return [opId, buf]; } + //////////////////////////////////////////////////////////////////////////////////////////// + ////////////////////////////////////// Error handling ////////////////////////////////////// + //////////////////////////////////////////////////////////////////////////////////////////// + + const errorMap = {}; + + function registerErrorClass(errorName, className, args) { + if (typeof errorMap[errorName] !== "undefined") { + throw new TypeError(`Error class for "${errorName}" already registered`); + } + errorMap[errorName] = [className, args ?? []]; + } + + function handleError(className, message) { + if (typeof errorMap[className] === "undefined") { + return new Error( + `Unregistered error class: "${className}"\n` + + ` ${message}\n` + + ` Classes of errors returned from ops should be registered via Deno.core.registerErrorClass().`, + ); + } + + const [ErrorClass, args] = errorMap[className]; + return new ErrorClass(message, ...args); + } + + //////////////////////////////////////////////////////////////////////////////////////////// + ////////////////////////////////////// Async handling ////////////////////////////////////// + //////////////////////////////////////////////////////////////////////////////////////////// + + let asyncHandlers = []; + function setAsyncHandler(opId, cb) { assert(opId != null); asyncHandlers[opId] = cb; } - function setAsyncHandlerByName(opName, cb) { - setAsyncHandler(opsCache[opName], cb); - } - function handleAsyncMsgFromRust() { while (true) { const opIdBuf = shift(); @@ -166,108 +204,196 @@ SharedQueue Binary Layout break; } assert(asyncHandlers[opIdBuf[0]] != null); - asyncHandlers[opIdBuf[0]](opIdBuf[1]); + asyncHandlers[opIdBuf[0]](opIdBuf[1], true); } for (let i = 0; i < arguments.length; i += 2) { - asyncHandlers[arguments[i]](arguments[i + 1]); + asyncHandlers[arguments[i]](arguments[i + 1], false); } } - function dispatch(opName, control, ...zeroCopy) { - return send(opsCache[opName], control, ...zeroCopy); - } + //////////////////////////////////////////////////////////////////////////////////////////// + ///////////////////////////// General sync & async ops handling //////////////////////////// + //////////////////////////////////////////////////////////////////////////////////////////// - function registerErrorClass(errorName, className, args) { - if (typeof errorMap[errorName] !== "undefined") { - throw new TypeError(`Error class for "${errorName}" already registered`); - } - errorMap[errorName] = [className, args ?? []]; - } - - function getErrorClassAndArgs(errorName) { - return errorMap[errorName] ?? [undefined, []]; - } - - // Returns Uint8Array - function encodeJson(args) { - const s = JSON.stringify(args); - return core.encode(s); - } - - function decodeJson(ui8) { - const s = core.decode(ui8); - return JSON.parse(s); - } - - let nextPromiseId = 1; + let nextRequestId = 1; const promiseTable = {}; - function processResponse(res) { - if ("ok" in res) { - return res.ok; + function asyncHandle(u8Array, isCopyNeeded, opResultParser) { + const [requestId, result, error] = opResultParser(u8Array, isCopyNeeded); + if (error !== null) { + promiseTable[requestId][1](error); + } else { + promiseTable[requestId][0](result); } - const [ErrorClass, args] = getErrorClassAndArgs(res.err.className); - if (!ErrorClass) { - throw new Error( - `Unregistered error class: "${res.err.className}"\n ${res.err.message}\n Classes of errors returned from ops should be registered via Deno.core.registerErrorClass().`, - ); - } - throw new ErrorClass(res.err.message, ...args); + delete promiseTable[requestId]; } - async function jsonOpAsync(opName, args = null, ...zeroCopy) { - setAsyncHandler(opsCache[opName], jsonOpAsyncHandler); + function opAsync(opName, opRequestBuilder, opResultParser) { + const opId = opsCache[opName]; + // Make sure requests of this type are handled by the asyncHandler + // The asyncHandler's role is to call the "promiseTable[requestId]" function + if (typeof asyncHandlers[opId] === "undefined") { + asyncHandlers[opId] = (buffer, isCopyNeeded) => + asyncHandle(buffer, isCopyNeeded, opResultParser); + } - const promiseId = nextPromiseId++; - const reqBuf = core.encode("\0".repeat(8) + JSON.stringify(args)); - new DataView(reqBuf.buffer).setBigUint64(0, BigInt(promiseId)); - dispatch(opName, reqBuf, ...zeroCopy); - let resolve, reject; - const promise = new Promise((resolve_, reject_) => { - resolve = resolve_; - reject = reject_; + const requestId = nextRequestId++; + + // Create and store promise + const promise = new Promise((resolve, reject) => { + promiseTable[requestId] = [resolve, reject]; }); - promise.resolve = resolve; - promise.reject = reject; - promiseTable[promiseId] = promise; - return processResponse(await promise); + + // Synchronously dispatch async request + core.dispatch(opId, ...opRequestBuilder(requestId)); + + // Wait for async response + return promise; } - function jsonOpSync(opName, args = null, ...zeroCopy) { - const argsBuf = encodeJson(args); - const res = dispatch(opName, argsBuf, ...zeroCopy); - return processResponse(decodeJson(res)); + function opSync(opName, opRequestBuilder, opResultParser) { + const opId = opsCache[opName]; + const u8Array = core.dispatch(opId, ...opRequestBuilder()); + + const [_, result, error] = opResultParser(u8Array, false); + if (error !== null) throw error; + return result; } - function jsonOpAsyncHandler(buf) { - // Json Op. - const res = decodeJson(buf); - const promise = promiseTable[res.promiseId]; - delete promiseTable[res.promiseId]; - promise.resolve(res); + //////////////////////////////////////////////////////////////////////////////////////////// + ///////////////////////////////////// Bin ops handling ///////////////////////////////////// + //////////////////////////////////////////////////////////////////////////////////////////// + + const binRequestHeaderByteLength = 8 + 4; + const scratchBuffer = new ArrayBuffer(binRequestHeaderByteLength); + const scratchView = new DataView(scratchBuffer); + + function binOpBuildRequest(requestId, argument, zeroCopy) { + scratchView.setBigUint64(0, BigInt(requestId), true); + scratchView.setUint32(8, argument, true); + return [scratchView, ...zeroCopy]; + } + + function binOpParseResult(u8Array, isCopyNeeded) { + // Decode header value from u8Array + const headerByteLength = 8 + 2 * 4; + assert(u8Array.byteLength >= headerByteLength); + assert(u8Array.byteLength % 4 == 0); + const view = new DataView( + u8Array.buffer, + u8Array.byteOffset + u8Array.byteLength - headerByteLength, + headerByteLength, + ); + + const requestId = Number(view.getBigUint64(0, true)); + const status = view.getUint32(8, true); + const result = view.getUint32(12, true); + + // Error handling + if (status !== 0) { + const className = core.decode(u8Array.subarray(0, result)); + const message = core.decode(u8Array.subarray(result, -headerByteLength)) + .trim(); + + return [requestId, null, handleError(className, message)]; + } + + if (u8Array.byteLength === headerByteLength) { + return [requestId, result, null]; + } + + // Rest of response buffer is passed as reference or as a copy + let respBuffer = null; + if (isCopyNeeded) { + // Copy part of the response array (if sent through shared array buf) + respBuffer = u8Array.slice(0, result); + } else { + // Create view on existing array (if sent through overflow) + respBuffer = u8Array.subarray(0, result); + } + + return [requestId, respBuffer, null]; + } + + function binOpAsync(opName, argument = 0, ...zeroCopy) { + return opAsync( + opName, + (requestId) => binOpBuildRequest(requestId, argument, zeroCopy), + binOpParseResult, + ); + } + + function binOpSync(opName, argument = 0, ...zeroCopy) { + return opSync( + opName, + () => binOpBuildRequest(0, argument, zeroCopy), + binOpParseResult, + ); + } + + //////////////////////////////////////////////////////////////////////////////////////////// + ///////////////////////////////////// Json ops handling //////////////////////////////////// + //////////////////////////////////////////////////////////////////////////////////////////// + + const jsonRequestHeaderLength = 8; + + function jsonOpBuildRequest(requestId, argument, zeroCopy) { + const u8Array = core.encode( + "\0".repeat(jsonRequestHeaderLength) + JSON.stringify(argument), + ); + new DataView(u8Array.buffer).setBigUint64(0, BigInt(requestId), true); + return [u8Array, ...zeroCopy]; + } + + function jsonOpParseResult(u8Array, _) { + const data = JSON.parse(core.decode(u8Array)); + + if ("err" in data) { + return [ + data.requestId, + null, + handleError(data.err.className, data.err.message), + ]; + } + + return [data.requestId, data.ok, null]; + } + + function jsonOpAsync(opName, argument = null, ...zeroCopy) { + return opAsync( + opName, + (requestId) => jsonOpBuildRequest(requestId, argument, zeroCopy), + jsonOpParseResult, + ); + } + + function jsonOpSync(opName, argument = null, ...zeroCopy) { + return opSync( + opName, + () => [core.encode(JSON.stringify(argument)), ...zeroCopy], + jsonOpParseResult, + ); } function resources() { return jsonOpSync("op_resources"); } - function close(rid) { - jsonOpSync("op_close", { rid }); + return jsonOpSync("op_close", { rid }); } Object.assign(window.Deno.core, { jsonOpAsync, jsonOpSync, - setAsyncHandler, - setAsyncHandlerByName, - dispatch: send, - dispatchByName: dispatch, + binOpAsync, + binOpSync, + dispatch, + dispatchByName, ops, close, resources, registerErrorClass, - getErrorClassAndArgs, sharedQueueInit: init, // sharedQueue is private but exposed for testing. sharedQueue: { @@ -279,5 +405,7 @@ SharedQueue Binary Layout reset, shift, }, + // setAsyncHandler is private but exposed for testing. + setAsyncHandler, }); })(this); diff --git a/core/examples/http_bench_bin_ops.js b/core/examples/http_bench_bin_ops.js index f203664947..18f98419f1 100644 --- a/core/examples/http_bench_bin_ops.js +++ b/core/examples/http_bench_bin_ops.js @@ -8,85 +8,15 @@ const responseBuf = new Uint8Array( .split("") .map((c) => c.charCodeAt(0)), ); -const promiseMap = new Map(); -let nextPromiseId = 1; - -function assert(cond) { - if (!cond) { - throw Error("assert"); - } -} - -function createResolvable() { - let resolve; - let reject; - const promise = new Promise((res, rej) => { - resolve = res; - reject = rej; - }); - promise.resolve = resolve; - promise.reject = reject; - return promise; -} - -const scratch32 = new Int32Array(3); -const scratchBytes = new Uint8Array( - scratch32.buffer, - scratch32.byteOffset, - scratch32.byteLength, -); -assert(scratchBytes.byteLength === 3 * 4); - -function send(promiseId, opId, rid, ...zeroCopy) { - scratch32[0] = promiseId; - scratch32[1] = rid; - scratch32[2] = -1; - return Deno.core.dispatch(opId, scratchBytes, ...zeroCopy); -} - -/** Returns Promise */ -function sendAsync(opId, rid, ...zeroCopy) { - const promiseId = nextPromiseId++; - const p = createResolvable(); - const buf = send(promiseId, opId, rid, ...zeroCopy); - if (buf) { - const record = recordFromBuf(buf); - // Sync result. - p.resolve(record.result); - } else { - // Async result. - promiseMap.set(promiseId, p); - } - return p; -} - -/** Returns i32 number */ -function sendSync(opId, rid) { - const buf = send(0, opId, rid); - const record = recordFromBuf(buf); - return record[2]; -} - -function recordFromBuf(buf) { - assert(buf.byteLength === 3 * 4); - return new Int32Array(buf.buffer, buf.byteOffset, buf.byteLength / 4); -} - -function handleAsyncMsgFromRust(buf) { - const record = recordFromBuf(buf); - const p = promiseMap.get(record[0]); - promiseMap.delete(record[0]); - p.resolve(record[2]); -} /** Listens on 0.0.0.0:4500, returns rid. */ function listen() { - return sendSync(ops["listen"], -1); + return Deno.core.binOpSync("listen"); } /** Accepts a connection, returns rid. */ function accept(rid) { - return sendAsync(ops["accept"], rid); + return Deno.core.binOpAsync("accept", rid); } /** @@ -94,16 +24,16 @@ function accept(rid) { * Returns bytes read. */ function read(rid, data) { - return sendAsync(ops["read"], rid, data); + return Deno.core.binOpAsync("read", rid, data); } /** Writes a fixed HTTP response to the socket rid. Returns bytes written. */ function write(rid, data) { - return sendAsync(ops["write"], rid, data); + return Deno.core.binOpAsync("write", rid, data); } function close(rid) { - return sendSync(ops["close"], rid); + Deno.core.binOpSync("close", rid); } async function serve(rid) { @@ -121,16 +51,14 @@ async function serve(rid) { close(rid); } -let ops; - async function main() { - ops = Deno.core.ops(); - for (const opName in ops) { - Deno.core.setAsyncHandler(ops[opName], handleAsyncMsgFromRust); - } + Deno.core.ops(); + Deno.core.registerErrorClass("Error", Error); const listenerRid = listen(); - Deno.core.print(`http_bench_bin_ops listening on http://127.0.0.1:4544/\n`); + Deno.core.print( + `http_bench_bin_ops listening on http://127.0.0.1:4544/\n`, + ); for (;;) { const rid = await accept(listenerRid); diff --git a/core/examples/http_bench_bin_ops.rs b/core/examples/http_bench_bin_ops.rs index bc4ca4dce9..1f649b235d 100644 --- a/core/examples/http_bench_bin_ops.rs +++ b/core/examples/http_bench_bin_ops.rs @@ -3,30 +3,23 @@ #[macro_use] extern crate log; +use deno_core::error::bad_resource_id; +use deno_core::error::AnyError; use deno_core::AsyncRefCell; use deno_core::BufVec; use deno_core::CancelHandle; use deno_core::CancelTryFuture; use deno_core::JsRuntime; -use deno_core::Op; use deno_core::OpState; use deno_core::RcRef; use deno_core::Resource; use deno_core::ResourceId; use deno_core::ZeroCopyBuf; -use futures::future::FutureExt; -use futures::future::TryFuture; -use futures::future::TryFutureExt; use std::cell::RefCell; use std::convert::TryFrom; -use std::convert::TryInto; use std::env; -use std::fmt::Debug; use std::io::Error; -use std::io::ErrorKind; -use std::mem::size_of; use std::net::SocketAddr; -use std::ptr; use std::rc::Rc; use tokio::io::AsyncReadExt; use tokio::io::AsyncWriteExt; @@ -120,52 +113,21 @@ impl From for TcpStream { } } -#[derive(Copy, Clone, Debug, PartialEq)] -struct Record { - promise_id: u32, - rid: ResourceId, - result: i32, -} - -type RecordBuf = [u8; size_of::()]; - -impl From<&[u8]> for Record { - fn from(buf: &[u8]) -> Self { - assert_eq!(buf.len(), size_of::()); - unsafe { *(buf as *const _ as *const RecordBuf) }.into() - } -} - -impl From for Record { - fn from(buf: RecordBuf) -> Self { - unsafe { - #[allow(clippy::cast_ptr_alignment)] - ptr::read_unaligned(&buf as *const _ as *const Self) - } - } -} - -impl From for RecordBuf { - fn from(record: Record) -> Self { - unsafe { ptr::read(&record as *const _ as *const Self) } - } -} - fn create_js_runtime() -> JsRuntime { - let mut js_runtime = JsRuntime::new(Default::default()); - register_op_bin_sync(&mut js_runtime, "listen", op_listen); - register_op_bin_sync(&mut js_runtime, "close", op_close); - register_op_bin_async(&mut js_runtime, "accept", op_accept); - register_op_bin_async(&mut js_runtime, "read", op_read); - register_op_bin_async(&mut js_runtime, "write", op_write); - js_runtime + let mut runtime = JsRuntime::new(Default::default()); + runtime.register_op("listen", deno_core::bin_op_sync(op_listen)); + runtime.register_op("close", deno_core::bin_op_sync(op_close)); + runtime.register_op("accept", deno_core::bin_op_async(op_accept)); + runtime.register_op("read", deno_core::bin_op_async(op_read)); + runtime.register_op("write", deno_core::bin_op_async(op_write)); + runtime } fn op_listen( state: &mut OpState, _rid: ResourceId, _bufs: &mut [ZeroCopyBuf], -) -> Result { +) -> Result { debug!("listen"); let addr = "127.0.0.1:4544".parse::().unwrap(); let std_listener = std::net::TcpListener::bind(&addr)?; @@ -179,7 +141,7 @@ fn op_close( state: &mut OpState, rid: ResourceId, _bufs: &mut [ZeroCopyBuf], -) -> Result { +) -> Result { debug!("close rid={}", rid); state .resource_table @@ -192,7 +154,7 @@ async fn op_accept( state: Rc>, rid: ResourceId, _bufs: BufVec, -) -> Result { +) -> Result { debug!("accept rid={}", rid); let listener = state @@ -209,7 +171,7 @@ async fn op_read( state: Rc>, rid: ResourceId, mut bufs: BufVec, -) -> Result { +) -> Result { assert_eq!(bufs.len(), 1, "Invalid number of arguments"); debug!("read rid={}", rid); @@ -218,14 +180,15 @@ async fn op_read( .resource_table .get::(rid) .ok_or_else(bad_resource_id)?; - stream.read(&mut bufs[0]).await + let nread = stream.read(&mut bufs[0]).await?; + Ok(nread as u32) } async fn op_write( state: Rc>, rid: ResourceId, bufs: BufVec, -) -> Result { +) -> Result { assert_eq!(bufs.len(), 1, "Invalid number of arguments"); debug!("write rid={}", rid); @@ -234,70 +197,8 @@ async fn op_write( .resource_table .get::(rid) .ok_or_else(bad_resource_id)?; - stream.write(&bufs[0]).await -} - -fn register_op_bin_sync( - js_runtime: &mut JsRuntime, - name: &'static str, - op_fn: F, -) where - F: Fn(&mut OpState, u32, &mut [ZeroCopyBuf]) -> Result + 'static, -{ - let base_op_fn = move |state: Rc>, mut bufs: BufVec| -> Op { - let record = Record::from(bufs[0].as_ref()); - let is_sync = record.promise_id == 0; - assert!(is_sync); - - let zero_copy_bufs = &mut bufs[1..]; - let result: i32 = - match op_fn(&mut state.borrow_mut(), record.rid, zero_copy_bufs) { - Ok(r) => r as i32, - Err(_) => -1, - }; - let buf = RecordBuf::from(Record { result, ..record })[..].into(); - Op::Sync(buf) - }; - - js_runtime.register_op(name, base_op_fn); -} - -fn register_op_bin_async( - js_runtime: &mut JsRuntime, - name: &'static str, - op_fn: F, -) where - F: Fn(Rc>, u32, BufVec) -> R + Copy + 'static, - R: TryFuture, - R::Ok: TryInto, - >::Error: Debug, -{ - let base_op_fn = move |state: Rc>, bufs: BufVec| -> Op { - let mut bufs_iter = bufs.into_iter(); - let record_buf = bufs_iter.next().unwrap(); - let zero_copy_bufs = bufs_iter.collect::(); - - let record = Record::from(record_buf.as_ref()); - let is_sync = record.promise_id == 0; - assert!(!is_sync); - - let fut = async move { - let op = op_fn(state, record.rid, zero_copy_bufs); - let result = op - .map_ok(|r| r.try_into().expect("op result does not fit in i32")) - .unwrap_or_else(|_| -1) - .await; - RecordBuf::from(Record { result, ..record })[..].into() - }; - - Op::Async(fut.boxed_local()) - }; - - js_runtime.register_op(name, base_op_fn); -} - -fn bad_resource_id() -> Error { - Error::new(ErrorKind::NotFound, "bad resource id") + let nwritten = stream.write(&bufs[0]).await?; + Ok(nwritten as u32) } fn main() { @@ -329,18 +230,3 @@ fn main() { }; runtime.block_on(future).unwrap(); } - -#[test] -fn test_record_from() { - let expected = Record { - promise_id: 1, - rid: 3, - result: 4, - }; - let buf = RecordBuf::from(expected); - if cfg!(target_endian = "little") { - assert_eq!(buf, [1u8, 0, 0, 0, 3, 0, 0, 0, 4, 0, 0, 0]); - } - let actual = Record::from(buf); - assert_eq!(actual, expected); -} diff --git a/core/lib.rs b/core/lib.rs index deea9d2812..c65ed7aac6 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -15,6 +15,8 @@ mod module_specifier; mod modules; mod normalize_path; mod ops; +mod ops_bin; +mod ops_json; pub mod plugin_api; mod resources; mod runtime; @@ -58,8 +60,6 @@ pub use crate::modules::ModuleSourceFuture; pub use crate::modules::NoopModuleLoader; pub use crate::modules::RecursiveModuleLoad; pub use crate::normalize_path::normalize_path; -pub use crate::ops::json_op_async; -pub use crate::ops::json_op_sync; pub use crate::ops::op_close; pub use crate::ops::op_resources; pub use crate::ops::Op; @@ -68,6 +68,11 @@ pub use crate::ops::OpFn; pub use crate::ops::OpId; pub use crate::ops::OpState; pub use crate::ops::OpTable; +pub use crate::ops_bin::bin_op_async; +pub use crate::ops_bin::bin_op_sync; +pub use crate::ops_bin::ValueOrVector; +pub use crate::ops_json::json_op_async; +pub use crate::ops_json::json_op_sync; pub use crate::resources::Resource; pub use crate::resources::ResourceId; pub use crate::resources::ResourceTable; diff --git a/core/ops.rs b/core/ops.rs index eceab7febe..212a713adc 100644 --- a/core/ops.rs +++ b/core/ops.rs @@ -10,13 +10,10 @@ use crate::BufVec; use crate::ZeroCopyBuf; use futures::Future; use indexmap::IndexMap; -use serde::de::DeserializeOwned; -use serde::Serialize; use serde_json::json; use serde_json::Value; use std::cell::RefCell; use std::collections::HashMap; -use std::convert::TryInto; use std::iter::once; use std::ops::Deref; use std::ops::DerefMut; @@ -117,125 +114,6 @@ impl Default for OpTable { } } -/// Creates an op that passes data synchronously using JSON. -/// -/// The provided function `op_fn` has the following parameters: -/// * `&mut OpState`: the op state, can be used to read/write resources in the runtime from an op. -/// * `V`: the deserializable value that is passed to the Rust function. -/// * `&mut [ZeroCopyBuf]`: raw bytes passed along, usually not needed if the JSON value is used. -/// -/// `op_fn` returns a serializable value, which is directly returned to JavaScript. -/// -/// When registering an op like this... -/// ```ignore -/// let mut runtime = JsRuntime::new(...); -/// runtime.register_op("hello", deno_core::json_op_sync(Self::hello_op)); -/// ``` -/// -/// ...it can be invoked from JS using the provided name, for example: -/// ```js -/// Deno.core.ops(); -/// let result = Deno.core.jsonOpSync("function_name", args); -/// ``` -/// -/// The `Deno.core.ops()` statement is needed once before any op calls, for initialization. -/// A more complete example is available in the examples directory. -pub fn json_op_sync(op_fn: F) -> Box -where - F: Fn(&mut OpState, V, &mut [ZeroCopyBuf]) -> Result + 'static, - V: DeserializeOwned, - R: Serialize, -{ - Box::new(move |state: Rc>, mut bufs: BufVec| -> Op { - let result = serde_json::from_slice(&bufs[0]) - .map_err(AnyError::from) - .and_then(|args| op_fn(&mut state.borrow_mut(), args, &mut bufs[1..])); - let buf = - json_serialize_op_result(None, result, state.borrow().get_error_class_fn); - Op::Sync(buf) - }) -} - -/// Creates an op that passes data asynchronously using JSON. -/// -/// The provided function `op_fn` has the following parameters: -/// * `Rc`: the op state, can be used to read/write resources in the runtime from an op. -/// * `V`: the deserializable value that is passed to the Rust function. -/// * `BufVec`: raw bytes passed along, usually not needed if the JSON value is used. -/// -/// `op_fn` returns a future, whose output is a serializable value. This value will be asynchronously -/// returned to JavaScript. -/// -/// When registering an op like this... -/// ```ignore -/// let mut runtime = JsRuntime::new(...); -/// runtime.register_op("hello", deno_core::json_op_async(Self::hello_op)); -/// ``` -/// -/// ...it can be invoked from JS using the provided name, for example: -/// ```js -/// Deno.core.ops(); -/// let future = Deno.core.jsonOpAsync("function_name", args); -/// ``` -/// -/// The `Deno.core.ops()` statement is needed once before any op calls, for initialization. -/// A more complete example is available in the examples directory. -pub fn json_op_async(op_fn: F) -> Box -where - F: Fn(Rc>, V, BufVec) -> R + 'static, - V: DeserializeOwned, - R: Future> + 'static, - RV: Serialize, -{ - let try_dispatch_op = - move |state: Rc>, bufs: BufVec| -> Result { - let promise_id = bufs[0] - .get(0..8) - .map(|b| u64::from_be_bytes(b.try_into().unwrap())) - .ok_or_else(|| type_error("missing or invalid `promiseId`"))?; - let args = serde_json::from_slice(&bufs[0][8..])?; - let bufs = bufs[1..].into(); - use crate::futures::FutureExt; - let fut = op_fn(state.clone(), args, bufs).map(move |result| { - json_serialize_op_result( - Some(promise_id), - result, - state.borrow().get_error_class_fn, - ) - }); - Ok(Op::Async(Box::pin(fut))) - }; - - Box::new(move |state: Rc>, bufs: BufVec| -> Op { - match try_dispatch_op(state.clone(), bufs) { - Ok(op) => op, - Err(err) => Op::Sync(json_serialize_op_result( - None, - Err::<(), AnyError>(err), - state.borrow().get_error_class_fn, - )), - } - }) -} - -fn json_serialize_op_result( - promise_id: Option, - result: Result, - get_error_class_fn: crate::runtime::GetErrorClassFn, -) -> Box<[u8]> { - let value = match result { - Ok(v) => serde_json::json!({ "ok": v, "promiseId": promise_id }), - Err(err) => serde_json::json!({ - "promiseId": promise_id , - "err": { - "className": (get_error_class_fn)(&err), - "message": err.to_string(), - } - }), - }; - serde_json::to_vec(&value).unwrap().into_boxed_slice() -} - /// Return map of resources with id as key /// and string representation as value. /// diff --git a/runtime/ops/ops_buffer.rs b/core/ops_bin.rs similarity index 96% rename from runtime/ops/ops_buffer.rs rename to core/ops_bin.rs index 6998144cf6..053150bfd5 100644 --- a/runtime/ops/ops_buffer.rs +++ b/core/ops_bin.rs @@ -1,12 +1,12 @@ // Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. -use deno_core::error::AnyError; -use deno_core::futures::future::FutureExt; -use deno_core::BufVec; -use deno_core::Op; -use deno_core::OpFn; -use deno_core::OpState; -use deno_core::ZeroCopyBuf; +use crate::error::AnyError; +use crate::futures::future::FutureExt; +use crate::BufVec; +use crate::Op; +use crate::OpFn; +use crate::OpState; +use crate::ZeroCopyBuf; use std::boxed::Box; use std::cell::RefCell; use std::convert::TryInto; @@ -88,18 +88,18 @@ fn gen_padding_32bit(len: usize) -> &'static [u8] { /// When registering an op like this... /// ```ignore /// let mut runtime = JsRuntime::new(...); -/// runtime.register_op("hello", deno_core::buffer_op_sync(Self::hello_op)); +/// runtime.register_op("hello", deno_core::bin_op_sync(Self::hello_op)); /// ``` /// /// ...it can be invoked from JS using the provided name, for example: /// ```js /// Deno.core.ops(); -/// let result = Deno.core.bufferOpSync("function_name", args); +/// let result = Deno.core.binOpSync("function_name", args); /// ``` /// /// The `Deno.core.ops()` statement is needed once before any op calls, for initialization. /// A more complete example is available in the examples directory. -pub fn buffer_op_sync(op_fn: F) -> Box +pub fn bin_op_sync(op_fn: F) -> Box where F: Fn(&mut OpState, u32, &mut [ZeroCopyBuf]) -> Result + 'static, R: ValueOrVector, @@ -202,7 +202,7 @@ where /// /// The `Deno.core.ops()` statement is needed once before any op calls, for initialization. /// A more complete example is available in the examples directory. -pub fn buffer_op_async(op_fn: F) -> Box +pub fn bin_op_async(op_fn: F) -> Box where F: Fn(Rc>, u32, BufVec) -> R + 'static, R: Future> + 'static, diff --git a/core/ops_json.rs b/core/ops_json.rs new file mode 100644 index 0000000000..0ef91ed33c --- /dev/null +++ b/core/ops_json.rs @@ -0,0 +1,134 @@ +// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. + +use crate::error::type_error; +use crate::error::AnyError; +use crate::BufVec; +use crate::Op; +use crate::OpFn; +use crate::OpState; +use crate::ZeroCopyBuf; +use serde::de::DeserializeOwned; +use serde::Serialize; +use std::cell::RefCell; +use std::convert::TryInto; +use std::future::Future; +use std::rc::Rc; + +fn json_serialize_op_result( + request_id: Option, + result: Result, + get_error_class_fn: crate::runtime::GetErrorClassFn, +) -> Box<[u8]> { + let value = match result { + Ok(v) => serde_json::json!({ "ok": v, "requestId": request_id }), + Err(err) => serde_json::json!({ + "requestId": request_id, + "err": { + "className": (get_error_class_fn)(&err), + "message": err.to_string(), + } + }), + }; + serde_json::to_vec(&value).unwrap().into_boxed_slice() +} + +/// Creates an op that passes data synchronously using JSON. +/// +/// The provided function `op_fn` has the following parameters: +/// * `&mut OpState`: the op state, can be used to read/write resources in the runtime from an op. +/// * `V`: the deserializable value that is passed to the Rust function. +/// * `&mut [ZeroCopyBuf]`: raw bytes passed along, usually not needed if the JSON value is used. +/// +/// `op_fn` returns a serializable value, which is directly returned to JavaScript. +/// +/// When registering an op like this... +/// ```ignore +/// let mut runtime = JsRuntime::new(...); +/// runtime.register_op("hello", deno_core::json_op_sync(Self::hello_op)); +/// ``` +/// +/// ...it can be invoked from JS using the provided name, for example: +/// ```js +/// Deno.core.ops(); +/// let result = Deno.core.jsonOpSync("function_name", args); +/// ``` +/// +/// The `Deno.core.ops()` statement is needed once before any op calls, for initialization. +/// A more complete example is available in the examples directory. +pub fn json_op_sync(op_fn: F) -> Box +where + F: Fn(&mut OpState, V, &mut [ZeroCopyBuf]) -> Result + 'static, + V: DeserializeOwned, + R: Serialize, +{ + Box::new(move |state: Rc>, mut bufs: BufVec| -> Op { + let result = serde_json::from_slice(&bufs[0]) + .map_err(AnyError::from) + .and_then(|args| op_fn(&mut state.borrow_mut(), args, &mut bufs[1..])); + let buf = + json_serialize_op_result(None, result, state.borrow().get_error_class_fn); + Op::Sync(buf) + }) +} + +/// Creates an op that passes data asynchronously using JSON. +/// +/// The provided function `op_fn` has the following parameters: +/// * `Rc`: the op state, can be used to read/write resources in the runtime from an op. +/// * `V`: the deserializable value that is passed to the Rust function. +/// * `BufVec`: raw bytes passed along, usually not needed if the JSON value is used. +/// +/// `op_fn` returns a future, whose output is a serializable value. This value will be asynchronously +/// returned to JavaScript. +/// +/// When registering an op like this... +/// ```ignore +/// let mut runtime = JsRuntime::new(...); +/// runtime.register_op("hello", deno_core::json_op_async(Self::hello_op)); +/// ``` +/// +/// ...it can be invoked from JS using the provided name, for example: +/// ```js +/// Deno.core.ops(); +/// let future = Deno.core.jsonOpAsync("function_name", args); +/// ``` +/// +/// The `Deno.core.ops()` statement is needed once before any op calls, for initialization. +/// A more complete example is available in the examples directory. +pub fn json_op_async(op_fn: F) -> Box +where + F: Fn(Rc>, V, BufVec) -> R + 'static, + V: DeserializeOwned, + R: Future> + 'static, + RV: Serialize, +{ + let try_dispatch_op = + move |state: Rc>, bufs: BufVec| -> Result { + let request_id = bufs[0] + .get(0..8) + .map(|b| u64::from_le_bytes(b.try_into().unwrap())) + .ok_or_else(|| type_error("missing or invalid `requestId`"))?; + let args = serde_json::from_slice(&bufs[0][8..])?; + let bufs = bufs[1..].into(); + use crate::futures::FutureExt; + let fut = op_fn(state.clone(), args, bufs).map(move |result| { + json_serialize_op_result( + Some(request_id), + result, + state.borrow().get_error_class_fn, + ) + }); + Ok(Op::Async(Box::pin(fut))) + }; + + Box::new(move |state: Rc>, bufs: BufVec| -> Op { + match try_dispatch_op(state.clone(), bufs) { + Ok(op) => op, + Err(err) => Op::Sync(json_serialize_op_result( + None, + Err::<(), AnyError>(err), + state.borrow().get_error_class_fn, + )), + } + }) +} diff --git a/runtime/js/10_dispatch_buffer.js b/runtime/js/10_dispatch_buffer.js deleted file mode 100644 index 091fce504a..0000000000 --- a/runtime/js/10_dispatch_buffer.js +++ /dev/null @@ -1,150 +0,0 @@ -// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. -"use strict"; - -((window) => { - const core = window.Deno.core; - - function assert(cond) { - if (!cond) { - throw Error("assert"); - } - } - - //////////////////////////////////////////////////////////////////////////////////////////// - ////////////////////////////// General async handling ////////////////////////////////////// - //////////////////////////////////////////////////////////////////////////////////////////// - - // General Async response handling - let nextRequestId = 1; - const promiseTable = {}; - - function opAsync(opName, opRequestBuilder, opResultParser) { - // Make sure requests of this type are handled by the asyncHandler - // The asyncHandler's role is to call the "promiseTable[requestId]" function - core.setAsyncHandlerByName(opName, (bufUi8, _) => { - const [requestId, result, error] = opResultParser(bufUi8, true); - if (error !== null) { - promiseTable[requestId][1](error); - } else { - promiseTable[requestId][0](result); - } - delete promiseTable[requestId]; - }); - - const requestId = nextRequestId++; - - // Create and store promise - const promise = new Promise((resolve, reject) => { - promiseTable[requestId] = [resolve, reject]; - }); - - // Synchronously dispatch async request - core.dispatchByName(opName, ...opRequestBuilder(requestId)); - - // Wait for async response - return promise; - } - - function opSync(opName, opRequestBuilder, opResultParser) { - const rawResult = core.dispatchByName(opName, ...opRequestBuilder()); - - const [_, result, error] = opResultParser(rawResult, false); - if (error !== null) throw error; - return result; - } - - //////////////////////////////////////////////////////////////////////////////////////////// - /////////////////////////////////// Error handling ///////////////////////////////////////// - //////////////////////////////////////////////////////////////////////////////////////////// - - function handleError(className, message) { - const [ErrorClass, args] = core.getErrorClassAndArgs(className); - if (!ErrorClass) { - return new Error( - `Unregistered error class: "${className}"\n` + - ` ${message}\n` + - ` Classes of errors returned from ops should be registered via Deno.core.registerErrorClass().`, - ); - } - return new ErrorClass(message, ...args); - } - - //////////////////////////////////////////////////////////////////////////////////////////// - ///////////////////////////////// Buffer ops handling ////////////////////////////////////// - //////////////////////////////////////////////////////////////////////////////////////////// - - const scratchBytes = new ArrayBuffer(3 * 4); - const scratchView = new DataView( - scratchBytes, - scratchBytes.byteOffset, - scratchBytes.byteLength, - ); - - function bufferOpBuildRequest(requestId, argument, zeroCopy) { - scratchView.setBigUint64(0, BigInt(requestId), true); - scratchView.setUint32(8, argument, true); - return [scratchView, ...zeroCopy]; - } - - function bufferOpParseResult(bufUi8, isCopyNeeded) { - // Decode header value from ui8 buffer - const headerByteLength = 4 * 4; - assert(bufUi8.byteLength >= headerByteLength); - assert(bufUi8.byteLength % 4 == 0); - const view = new DataView( - bufUi8.buffer, - bufUi8.byteOffset + bufUi8.byteLength - headerByteLength, - headerByteLength, - ); - - const requestId = Number(view.getBigUint64(0, true)); - const status = view.getUint32(8, true); - const result = view.getUint32(12, true); - - // Error handling - if (status !== 0) { - const className = core.decode(bufUi8.subarray(0, result)); - const message = core.decode(bufUi8.subarray(result, -headerByteLength)) - .trim(); - - return [requestId, null, handleError(className, message)]; - } - - if (bufUi8.byteLength === headerByteLength) { - return [requestId, result, null]; - } - - // Rest of response buffer is passed as reference or as a copy - let respBuffer = null; - if (isCopyNeeded) { - // Copy part of the response array (if sent through shared array buf) - respBuffer = bufUi8.slice(0, result); - } else { - // Create view on existing array (if sent through overflow) - respBuffer = bufUi8.subarray(0, result); - } - - return [requestId, respBuffer, null]; - } - - function bufferOpAsync(opName, argument = 0, ...zeroCopy) { - return opAsync( - opName, - (requestId) => bufferOpBuildRequest(requestId, argument, zeroCopy), - bufferOpParseResult, - ); - } - - function bufferOpSync(opName, argument = 0, ...zeroCopy) { - return opSync( - opName, - () => bufferOpBuildRequest(0, argument, zeroCopy), - bufferOpParseResult, - ); - } - - window.__bootstrap.dispatchBuffer = { - bufferOpSync, - bufferOpAsync, - }; -})(this); diff --git a/runtime/js/11_timers.js b/runtime/js/11_timers.js index f076223885..7a0307c06d 100644 --- a/runtime/js/11_timers.js +++ b/runtime/js/11_timers.js @@ -4,7 +4,6 @@ ((window) => { const assert = window.__bootstrap.util.assert; const core = window.Deno.core; - const { bufferOpSync } = window.__bootstrap.dispatchBuffer; function opStopGlobalTimer() { core.jsonOpSync("op_global_timer_stop"); @@ -20,7 +19,7 @@ const nowBytes = new Uint8Array(8); function opNow() { - bufferOpSync("op_now", 0, nowBytes); + core.binOpSync("op_now", 0, nowBytes); return new DataView(nowBytes.buffer).getFloat64(); } diff --git a/runtime/js/12_io.js b/runtime/js/12_io.js index 09e87f990f..fe815c7ed0 100644 --- a/runtime/js/12_io.js +++ b/runtime/js/12_io.js @@ -6,8 +6,8 @@ "use strict"; ((window) => { + const core = window.Deno.core; const DEFAULT_BUFFER_SIZE = 32 * 1024; - const { bufferOpSync, bufferOpAsync } = window.__bootstrap.dispatchBuffer; // Seek whence values. // https://golang.org/pkg/io/#pkg-constants const SeekMode = { @@ -81,7 +81,7 @@ return 0; } - const nread = bufferOpSync("op_read_sync", rid, buffer); + const nread = core.binOpSync("op_read_sync", rid, buffer); if (nread < 0) { throw new Error("read error"); } @@ -97,7 +97,7 @@ return 0; } - const nread = await bufferOpAsync("op_read_async", rid, buffer); + const nread = await core.binOpAsync("op_read_async", rid, buffer); if (nread < 0) { throw new Error("read error"); } @@ -106,7 +106,7 @@ } function writeSync(rid, data) { - const result = bufferOpSync("op_write_sync", rid, data); + const result = core.binOpSync("op_write_sync", rid, data); if (result < 0) { throw new Error("write error"); } @@ -115,7 +115,7 @@ } async function write(rid, data) { - const result = await bufferOpAsync("op_write_async", rid, data); + const result = await core.binOpAsync("op_write_async", rid, data); if (result < 0) { throw new Error("write error"); } diff --git a/runtime/ops/io.rs b/runtime/ops/io.rs index e1520b2c5e..1260452b6f 100644 --- a/runtime/ops/io.rs +++ b/runtime/ops/io.rs @@ -99,11 +99,11 @@ lazy_static! { } pub fn init(rt: &mut JsRuntime) { - super::reg_buffer_async(rt, "op_read_async", op_read_async); - super::reg_buffer_async(rt, "op_write_async", op_write_async); + super::reg_bin_async(rt, "op_read_async", op_read_async); + super::reg_bin_async(rt, "op_write_async", op_write_async); - super::reg_buffer_sync(rt, "op_read_sync", op_read_sync); - super::reg_buffer_sync(rt, "op_write_sync", op_write_sync); + super::reg_bin_sync(rt, "op_read_sync", op_read_sync); + super::reg_bin_sync(rt, "op_write_sync", op_write_sync); super::reg_json_async(rt, "op_shutdown", op_shutdown); } diff --git a/runtime/ops/mod.rs b/runtime/ops/mod.rs index e082c5d3a7..2e94d99f56 100644 --- a/runtime/ops/mod.rs +++ b/runtime/ops/mod.rs @@ -8,7 +8,6 @@ pub mod io; pub mod net; #[cfg(unix)] mod net_unix; -mod ops_buffer; pub mod os; pub mod permissions; pub mod plugin; @@ -25,6 +24,8 @@ pub mod websocket; pub mod worker_host; use crate::metrics::metrics_op; +use deno_core::bin_op_async; +use deno_core::bin_op_sync; use deno_core::error::AnyError; use deno_core::json_op_async; use deno_core::json_op_sync; @@ -33,10 +34,8 @@ use deno_core::serde::Serialize; use deno_core::BufVec; use deno_core::JsRuntime; use deno_core::OpState; +use deno_core::ValueOrVector; use deno_core::ZeroCopyBuf; -use ops_buffer::buffer_op_async; -use ops_buffer::buffer_op_sync; -use ops_buffer::ValueOrVector; use std::cell::RefCell; use std::future::Future; use std::rc::Rc; @@ -63,24 +62,21 @@ where rt.register_op(name, metrics_op(name, json_op_sync(op_fn))); } -pub fn reg_buffer_async( - rt: &mut JsRuntime, - name: &'static str, - op_fn: F, -) where +pub fn reg_bin_async(rt: &mut JsRuntime, name: &'static str, op_fn: F) +where F: Fn(Rc>, u32, BufVec) -> R + 'static, R: Future> + 'static, RV: ValueOrVector, { - rt.register_op(name, metrics_op(name, buffer_op_async(op_fn))); + rt.register_op(name, metrics_op(name, bin_op_async(op_fn))); } -pub fn reg_buffer_sync(rt: &mut JsRuntime, name: &'static str, op_fn: F) +pub fn reg_bin_sync(rt: &mut JsRuntime, name: &'static str, op_fn: F) where F: Fn(&mut OpState, u32, &mut [ZeroCopyBuf]) -> Result + 'static, R: ValueOrVector, { - rt.register_op(name, metrics_op(name, buffer_op_sync(op_fn))); + rt.register_op(name, metrics_op(name, bin_op_sync(op_fn))); } /// `UnstableChecker` is a struct so it can be placed inside `GothamState`; diff --git a/runtime/ops/timers.rs b/runtime/ops/timers.rs index 445b7366cb..4395b4885c 100644 --- a/runtime/ops/timers.rs +++ b/runtime/ops/timers.rs @@ -77,7 +77,7 @@ pub fn init(rt: &mut deno_core::JsRuntime) { super::reg_json_sync(rt, "op_global_timer_stop", op_global_timer_stop); super::reg_json_sync(rt, "op_global_timer_start", op_global_timer_start); super::reg_json_async(rt, "op_global_timer", op_global_timer); - super::reg_buffer_sync(rt, "op_now", op_now); + super::reg_bin_sync(rt, "op_now", op_now); super::reg_json_sync(rt, "op_sleep_sync", op_sleep_sync); }