1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2024-12-22 15:24:46 -05:00

refactor(core): Strongly typed deserialization of JSON ops (#9423)

This PR makes json_op_sync/async generic to all Deserialize/Serialize types
instead of the loosely-typed serde_json::Value. Since serde_json::Value
implements Deserialize/Serialize, very little existing code needs to be updated,
however as json_op_sync/async are now generic, type inference is broken in some
cases (see cli/build.rs:146). I've found this reduces a good bit of boilerplate,
as seen in the updated deno_core examples.

This change may also reduce serialization and deserialization overhead as serde
has a better idea of what types it is working with. I am currently working on
benchmarks to confirm this and I will update this PR with my findings.
This commit is contained in:
Jared Beller 2021-02-13 11:56:56 -05:00 committed by GitHub
parent af460fc464
commit b50691efed
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 67 additions and 109 deletions

View file

@ -8,6 +8,7 @@ use deno_core::JsRuntime;
use deno_core::RuntimeOptions; use deno_core::RuntimeOptions;
use regex::Regex; use regex::Regex;
use serde::Deserialize; use serde::Deserialize;
use serde_json::Value;
use std::collections::HashMap; use std::collections::HashMap;
use std::env; use std::env;
use std::path::Path; use std::path::Path;
@ -142,7 +143,7 @@ fn create_compiler_snapshot(
}); });
js_runtime.register_op( js_runtime.register_op(
"op_build_info", "op_build_info",
json_op_sync(move |_state, _args, _bufs| { json_op_sync(move |_state, _args: Value, _bufs| {
Ok(json!({ Ok(json!({
"buildSpecifier": build_specifier, "buildSpecifier": build_specifier,
"libs": build_libs, "libs": build_libs,

View file

@ -21,22 +21,8 @@ unitTest(function malformedJsonControlBuffer(): void {
unitTest(function invalidPromiseId(): void { unitTest(function invalidPromiseId(): void {
const opId = Deno.core.ops()["op_open_async"]; const opId = Deno.core.ops()["op_open_async"];
const argsObj = { const reqBuf = new Uint8Array([0, 0, 0, 0, 0, 0, 0]);
promiseId: "1. NEIN!", const resBuf = Deno.core.send(opId, reqBuf);
path: "/tmp/P.I.S.C.I.X/yeah",
mode: 0o666,
options: {
read: true,
write: true,
create: true,
truncate: false,
append: false,
createNew: false,
},
};
const argsText = JSON.stringify(argsObj);
const argsBuf = new TextEncoder().encode(argsText);
const resBuf = Deno.core.send(opId, argsBuf);
const resText = new TextDecoder().decode(resBuf); const resText = new TextDecoder().decode(resBuf);
const resObj = JSON.parse(resText); const resObj = JSON.parse(resText);
console.error(resText); console.error(resText);

View file

@ -213,12 +213,13 @@ SharedQueue Binary Layout
throw new ErrorClass(res.err.message); throw new ErrorClass(res.err.message);
} }
async function jsonOpAsync(opName, args = {}, ...zeroCopy) { async function jsonOpAsync(opName, args = null, ...zeroCopy) {
setAsyncHandler(opsCache[opName], jsonOpAsyncHandler); setAsyncHandler(opsCache[opName], jsonOpAsyncHandler);
args.promiseId = nextPromiseId++; const promiseId = nextPromiseId++;
const argsBuf = encodeJson(args); const reqBuf = core.encode("\0".repeat(8) + JSON.stringify(args));
dispatch(opName, argsBuf, ...zeroCopy); new DataView(reqBuf.buffer).setBigUint64(0, BigInt(promiseId));
dispatch(opName, reqBuf, ...zeroCopy);
let resolve, reject; let resolve, reject;
const promise = new Promise((resolve_, reject_) => { const promise = new Promise((resolve_, reject_) => {
resolve = resolve_; resolve = resolve_;
@ -226,11 +227,11 @@ SharedQueue Binary Layout
}); });
promise.resolve = resolve; promise.resolve = resolve;
promise.reject = reject; promise.reject = reject;
promiseTable[args.promiseId] = promise; promiseTable[promiseId] = promise;
return processResponse(await promise); return processResponse(await promise);
} }
function jsonOpSync(opName, args = {}, ...zeroCopy) { function jsonOpSync(opName, args = null, ...zeroCopy) {
const argsBuf = encodeJson(args); const argsBuf = encodeJson(args);
const res = dispatch(opName, argsBuf, ...zeroCopy); const res = dispatch(opName, argsBuf, ...zeroCopy);
return processResponse(decodeJson(res)); return processResponse(decodeJson(res));

View file

@ -50,27 +50,13 @@ fn main() {
// The json_op_sync function automatically deserializes // The json_op_sync function automatically deserializes
// the first ZeroCopyBuf and serializes the return value // the first ZeroCopyBuf and serializes the return value
// to reduce boilerplate // to reduce boilerplate
json_op_sync(|_state, json, zero_copy| { json_op_sync(|_state, json: Vec<f64>, zero_copy| {
// We check that we only got the JSON value, // We check that we only got the JSON value.
// and that it's of the right type.
if !zero_copy.is_empty() { if !zero_copy.is_empty() {
Err(anyhow!("Expected exactly one argument")) Err(anyhow!("Expected exactly one argument"))
} else if !json.is_array() {
Err(anyhow!("Argument is not of type array"))
} else if !json
.as_array()
.unwrap()
.iter()
.all(|value| value.is_number())
{
Err(anyhow!("Argument is not array of numbers"))
} else { } else {
// And if everything checks out we do our actual task // And if we did, do our actual task
let sum = json let sum = json.iter().fold(0.0, |a, v| a + v);
.as_array()
.unwrap()
.iter()
.fold(0.0, |a, v| a + v.as_f64().unwrap());
// Finally we return a JSON value // Finally we return a JSON value
Ok(Value::from(sum)) Ok(Value::from(sum))

View file

@ -11,7 +11,7 @@ const responseBuf = new Uint8Array(
/** Listens on 0.0.0.0:4500, returns rid. */ /** Listens on 0.0.0.0:4500, returns rid. */
function listen() { function listen() {
const { rid } = Deno.core.jsonOpSync("listen", {}); const { rid } = Deno.core.jsonOpSync("listen");
return rid; return rid;
} }

View file

@ -14,10 +14,11 @@ use deno_core::OpState;
use deno_core::RcRef; use deno_core::RcRef;
use deno_core::Resource; use deno_core::Resource;
use deno_core::ZeroCopyBuf; use deno_core::ZeroCopyBuf;
use serde::Deserialize;
use serde::Serialize;
use serde_json::Value; use serde_json::Value;
use std::cell::RefCell; use std::cell::RefCell;
use std::convert::TryFrom; use std::convert::TryFrom;
use std::convert::TryInto;
use std::env; use std::env;
use std::io::Error; use std::io::Error;
use std::net::SocketAddr; use std::net::SocketAddr;
@ -124,83 +125,67 @@ fn create_js_runtime() -> JsRuntime {
runtime runtime
} }
#[derive(Deserialize, Serialize)]
struct ResourceId {
rid: u32,
}
fn op_listen( fn op_listen(
state: &mut OpState, state: &mut OpState,
_args: Value, _args: (),
_bufs: &mut [ZeroCopyBuf], _bufs: &mut [ZeroCopyBuf],
) -> Result<Value, AnyError> { ) -> Result<ResourceId, AnyError> {
debug!("listen"); debug!("listen");
let addr = "127.0.0.1:4544".parse::<SocketAddr>().unwrap(); let addr = "127.0.0.1:4544".parse::<SocketAddr>().unwrap();
let std_listener = std::net::TcpListener::bind(&addr)?; let std_listener = std::net::TcpListener::bind(&addr)?;
std_listener.set_nonblocking(true)?; std_listener.set_nonblocking(true)?;
let listener = TcpListener::try_from(std_listener)?; let listener = TcpListener::try_from(std_listener)?;
let rid = state.resource_table.add(listener); let rid = state.resource_table.add(listener);
Ok(serde_json::json!({ "rid": rid })) Ok(ResourceId { rid })
} }
fn op_close( fn op_close(
state: &mut OpState, state: &mut OpState,
args: Value, args: ResourceId,
_buf: &mut [ZeroCopyBuf], _buf: &mut [ZeroCopyBuf],
) -> Result<Value, AnyError> { ) -> Result<(), AnyError> {
let rid: u32 = args debug!("close rid={}", args.rid);
.get("rid")
.unwrap()
.as_u64()
.unwrap()
.try_into()
.unwrap();
debug!("close rid={}", rid);
state state
.resource_table .resource_table
.close(rid) .close(args.rid)
.map(|_| serde_json::json!(())) .map(|_| ())
.ok_or_else(bad_resource_id) .ok_or_else(bad_resource_id)
} }
async fn op_accept( async fn op_accept(
state: Rc<RefCell<OpState>>, state: Rc<RefCell<OpState>>,
args: Value, args: ResourceId,
_bufs: BufVec, _bufs: BufVec,
) -> Result<Value, AnyError> { ) -> Result<ResourceId, AnyError> {
let rid: u32 = args debug!("accept rid={}", args.rid);
.get("rid")
.unwrap()
.as_u64()
.unwrap()
.try_into()
.unwrap();
debug!("accept rid={}", rid);
let listener = state let listener = state
.borrow() .borrow()
.resource_table .resource_table
.get::<TcpListener>(rid) .get::<TcpListener>(args.rid)
.ok_or_else(bad_resource_id)?; .ok_or_else(bad_resource_id)?;
let stream = listener.accept().await?; let stream = listener.accept().await?;
let rid = state.borrow_mut().resource_table.add(stream); let rid = state.borrow_mut().resource_table.add(stream);
Ok(serde_json::json!({ "rid": rid })) Ok(ResourceId { rid })
} }
async fn op_read( async fn op_read(
state: Rc<RefCell<OpState>>, state: Rc<RefCell<OpState>>,
args: Value, args: ResourceId,
mut bufs: BufVec, mut bufs: BufVec,
) -> Result<Value, AnyError> { ) -> Result<Value, AnyError> {
assert_eq!(bufs.len(), 1, "Invalid number of arguments"); assert_eq!(bufs.len(), 1, "Invalid number of arguments");
let rid: u32 = args debug!("read rid={}", args.rid);
.get("rid")
.unwrap()
.as_u64()
.unwrap()
.try_into()
.unwrap();
debug!("read rid={}", rid);
let stream = state let stream = state
.borrow() .borrow()
.resource_table .resource_table
.get::<TcpStream>(rid) .get::<TcpStream>(args.rid)
.ok_or_else(bad_resource_id)?; .ok_or_else(bad_resource_id)?;
let nread = stream.read(&mut bufs[0]).await?; let nread = stream.read(&mut bufs[0]).await?;
Ok(serde_json::json!({ "nread": nread })) Ok(serde_json::json!({ "nread": nread }))
@ -208,23 +193,16 @@ async fn op_read(
async fn op_write( async fn op_write(
state: Rc<RefCell<OpState>>, state: Rc<RefCell<OpState>>,
args: Value, args: ResourceId,
bufs: BufVec, bufs: BufVec,
) -> Result<Value, AnyError> { ) -> Result<Value, AnyError> {
assert_eq!(bufs.len(), 1, "Invalid number of arguments"); assert_eq!(bufs.len(), 1, "Invalid number of arguments");
let rid: u32 = args debug!("write rid={}", args.rid);
.get("rid")
.unwrap()
.as_u64()
.unwrap()
.try_into()
.unwrap();
debug!("write rid={}", rid);
let stream = state let stream = state
.borrow() .borrow()
.resource_table .resource_table
.get::<TcpStream>(rid) .get::<TcpStream>(args.rid)
.ok_or_else(bad_resource_id)?; .ok_or_else(bad_resource_id)?;
let nwritten = stream.write(&bufs[0]).await?; let nwritten = stream.write(&bufs[0]).await?;
Ok(serde_json::json!({ "nwritten": nwritten })) Ok(serde_json::json!({ "nwritten": nwritten }))

View file

@ -10,14 +10,14 @@ declare namespace Deno {
/** Send a JSON op to Rust, and synchronously receive the result. */ /** Send a JSON op to Rust, and synchronously receive the result. */
function jsonOpSync( function jsonOpSync(
opName: string, opName: string,
args: any, args?: any,
...zeroCopy: Uint8Array[] ...zeroCopy: Uint8Array[]
): any; ): any;
/** Send a JSON op to Rust, and asynchronously receive the result. */ /** Send a JSON op to Rust, and asynchronously receive the result. */
function jsonOpAsync( function jsonOpAsync(
opName: string, opName: string,
args: any, args?: any,
...zeroCopy: Uint8Array[] ...zeroCopy: Uint8Array[]
): Promise<any>; ): Promise<any>;

View file

@ -10,10 +10,13 @@ use crate::BufVec;
use crate::ZeroCopyBuf; use crate::ZeroCopyBuf;
use futures::Future; use futures::Future;
use indexmap::IndexMap; use indexmap::IndexMap;
use serde::de::DeserializeOwned;
use serde::Serialize;
use serde_json::json; use serde_json::json;
use serde_json::Value; use serde_json::Value;
use std::cell::RefCell; use std::cell::RefCell;
use std::collections::HashMap; use std::collections::HashMap;
use std::convert::TryInto;
use std::iter::once; use std::iter::once;
use std::ops::Deref; use std::ops::Deref;
use std::ops::DerefMut; use std::ops::DerefMut;
@ -118,10 +121,10 @@ impl Default for OpTable {
/// ///
/// The provided function `op_fn` has the following parameters: /// The provided function `op_fn` has the following parameters:
/// * `&mut OpState`: the op state, can be used to read/write resources in the runtime from an op. /// * `&mut OpState`: the op state, can be used to read/write resources in the runtime from an op.
/// * `Value`: the JSON value that is passed to the Rust function. /// * `V`: the deserializable value that is passed to the Rust function.
/// * `&mut [ZeroCopyBuf]`: raw bytes passed along, usually not needed if the JSON value is used. /// * `&mut [ZeroCopyBuf]`: raw bytes passed along, usually not needed if the JSON value is used.
/// ///
/// `op_fn` returns a JSON value, which is directly returned to JavaScript. /// `op_fn` returns a serializable value, which is directly returned to JavaScript.
/// ///
/// When registering an op like this... /// When registering an op like this...
/// ```ignore /// ```ignore
@ -137,10 +140,11 @@ impl Default for OpTable {
/// ///
/// The `Deno.core.ops()` statement is needed once before any op calls, for initialization. /// The `Deno.core.ops()` statement is needed once before any op calls, for initialization.
/// A more complete example is available in the examples directory. /// A more complete example is available in the examples directory.
pub fn json_op_sync<F>(op_fn: F) -> Box<OpFn> pub fn json_op_sync<F, V, R>(op_fn: F) -> Box<OpFn>
where where
F: Fn(&mut OpState, Value, &mut [ZeroCopyBuf]) -> Result<Value, AnyError> F: Fn(&mut OpState, V, &mut [ZeroCopyBuf]) -> Result<R, AnyError> + 'static,
+ 'static, V: DeserializeOwned,
R: Serialize,
{ {
Box::new(move |state: Rc<RefCell<OpState>>, mut bufs: BufVec| -> Op { Box::new(move |state: Rc<RefCell<OpState>>, mut bufs: BufVec| -> Op {
let result = serde_json::from_slice(&bufs[0]) let result = serde_json::from_slice(&bufs[0])
@ -156,10 +160,10 @@ where
/// ///
/// The provided function `op_fn` has the following parameters: /// The provided function `op_fn` has the following parameters:
/// * `Rc<RefCell<OpState>`: the op state, can be used to read/write resources in the runtime from an op. /// * `Rc<RefCell<OpState>`: the op state, can be used to read/write resources in the runtime from an op.
/// * `Value`: the JSON value that is passed to the Rust function. /// * `V`: the deserializable value that is passed to the Rust function.
/// * `BufVec`: raw bytes passed along, usually not needed if the JSON value is used. /// * `BufVec`: raw bytes passed along, usually not needed if the JSON value is used.
/// ///
/// `op_fn` returns a future, whose output is a JSON value. This value will be asynchronously /// `op_fn` returns a future, whose output is a serializable value. This value will be asynchronously
/// returned to JavaScript. /// returned to JavaScript.
/// ///
/// When registering an op like this... /// When registering an op like this...
@ -176,18 +180,20 @@ where
/// ///
/// The `Deno.core.ops()` statement is needed once before any op calls, for initialization. /// The `Deno.core.ops()` statement is needed once before any op calls, for initialization.
/// A more complete example is available in the examples directory. /// A more complete example is available in the examples directory.
pub fn json_op_async<F, R>(op_fn: F) -> Box<OpFn> pub fn json_op_async<F, V, R, RV>(op_fn: F) -> Box<OpFn>
where where
F: Fn(Rc<RefCell<OpState>>, Value, BufVec) -> R + 'static, F: Fn(Rc<RefCell<OpState>>, V, BufVec) -> R + 'static,
R: Future<Output = Result<Value, AnyError>> + 'static, V: DeserializeOwned,
R: Future<Output = Result<RV, AnyError>> + 'static,
RV: Serialize,
{ {
let try_dispatch_op = let try_dispatch_op =
move |state: Rc<RefCell<OpState>>, bufs: BufVec| -> Result<Op, AnyError> { move |state: Rc<RefCell<OpState>>, bufs: BufVec| -> Result<Op, AnyError> {
let args: Value = serde_json::from_slice(&bufs[0])?; let promise_id = bufs[0]
let promise_id = args .get(0..8)
.get("promiseId") .map(|b| u64::from_be_bytes(b.try_into().unwrap()))
.and_then(Value::as_u64)
.ok_or_else(|| type_error("missing or invalid `promiseId`"))?; .ok_or_else(|| type_error("missing or invalid `promiseId`"))?;
let args = serde_json::from_slice(&bufs[0][8..])?;
let bufs = bufs[1..].into(); let bufs = bufs[1..].into();
use crate::futures::FutureExt; use crate::futures::FutureExt;
let fut = op_fn(state.clone(), args, bufs).map(move |result| { let fut = op_fn(state.clone(), args, bufs).map(move |result| {
@ -205,16 +211,16 @@ where
Ok(op) => op, Ok(op) => op,
Err(err) => Op::Sync(json_serialize_op_result( Err(err) => Op::Sync(json_serialize_op_result(
None, None,
Err(err), Err::<(), AnyError>(err),
state.borrow().get_error_class_fn, state.borrow().get_error_class_fn,
)), )),
} }
}) })
} }
fn json_serialize_op_result( fn json_serialize_op_result<R: Serialize>(
promise_id: Option<u64>, promise_id: Option<u64>,
result: Result<serde_json::Value, AnyError>, result: Result<R, AnyError>,
get_error_class_fn: crate::runtime::GetErrorClassFn, get_error_class_fn: crate::runtime::GetErrorClassFn,
) -> Box<[u8]> { ) -> Box<[u8]> {
let value = match result { let value = match result {