mirror of
https://github.com/denoland/deno.git
synced 2024-11-21 15:04:11 -05:00
Async op dispatcher support with 'stateful_json_op_(a)sync()' (#7095)
Closes: #7020
This commit is contained in:
parent
b308a774e8
commit
f6e9150b33
8 changed files with 193 additions and 48 deletions
|
@ -30,7 +30,7 @@ fn json_err(err: OpError) -> Value {
|
|||
})
|
||||
}
|
||||
|
||||
fn serialize_result(promise_id: Option<u64>, result: JsonResult) -> Buf {
|
||||
pub fn serialize_result(promise_id: Option<u64>, result: JsonResult) -> Buf {
|
||||
let value = match result {
|
||||
Ok(v) => json!({ "ok": v, "promiseId": promise_id }),
|
||||
Err(err) => json!({ "err": json_err(err), "promiseId": promise_id }),
|
||||
|
|
|
@ -6,21 +6,29 @@ use super::io::{FileMetadata, StreamResource, StreamResourceHolder};
|
|||
use crate::op_error::OpError;
|
||||
use crate::ops::dispatch_json::JsonResult;
|
||||
use crate::state::State;
|
||||
use deno_core::BufVec;
|
||||
use deno_core::CoreIsolate;
|
||||
use deno_core::CoreIsolateState;
|
||||
use deno_core::ResourceTable;
|
||||
use deno_core::ZeroCopyBuf;
|
||||
use futures::future::FutureExt;
|
||||
use std::cell::RefCell;
|
||||
use std::convert::From;
|
||||
use std::env::{current_dir, set_current_dir, temp_dir};
|
||||
use std::io;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::rc::Rc;
|
||||
use std::time::SystemTime;
|
||||
use std::time::UNIX_EPOCH;
|
||||
|
||||
use rand::{thread_rng, Rng};
|
||||
|
||||
pub fn init(i: &mut CoreIsolate, s: &State) {
|
||||
i.register_op("op_open", s.stateful_json_op2(op_open));
|
||||
let t = &CoreIsolate::state(i).borrow().resource_table.clone();
|
||||
|
||||
i.register_op("op_open_sync", s.stateful_json_op_sync(t, op_open_sync));
|
||||
i.register_op("op_open_async", s.stateful_json_op_async(t, op_open_async));
|
||||
|
||||
i.register_op("op_seek", s.stateful_json_op2(op_seek));
|
||||
i.register_op("op_fdatasync", s.stateful_json_op2(op_fdatasync));
|
||||
i.register_op("op_fsync", s.stateful_json_op2(op_fsync));
|
||||
|
@ -54,10 +62,9 @@ fn into_string(s: std::ffi::OsString) -> Result<String, OpError> {
|
|||
#[derive(Deserialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
struct OpenArgs {
|
||||
promise_id: Option<u64>,
|
||||
path: String,
|
||||
options: OpenOptions,
|
||||
mode: Option<u32>,
|
||||
options: OpenOptions,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Default, Debug)]
|
||||
|
@ -72,15 +79,12 @@ struct OpenOptions {
|
|||
create_new: bool,
|
||||
}
|
||||
|
||||
fn op_open(
|
||||
isolate_state: &mut CoreIsolateState,
|
||||
fn open_helper(
|
||||
state: &State,
|
||||
args: Value,
|
||||
_zero_copy: &mut [ZeroCopyBuf],
|
||||
) -> Result<JsonOp, OpError> {
|
||||
) -> Result<(PathBuf, std::fs::OpenOptions), OpError> {
|
||||
let args: OpenArgs = serde_json::from_value(args)?;
|
||||
let path = Path::new(&args.path).to_path_buf();
|
||||
let resource_table = isolate_state.resource_table.clone();
|
||||
|
||||
let mut open_options = std::fs::OpenOptions::new();
|
||||
|
||||
|
@ -113,37 +117,46 @@ fn op_open(
|
|||
.append(options.append)
|
||||
.create_new(options.create_new);
|
||||
|
||||
let is_sync = args.promise_id.is_none();
|
||||
Ok((path, open_options))
|
||||
}
|
||||
|
||||
if is_sync {
|
||||
let std_file = open_options.open(path)?;
|
||||
let tokio_file = tokio::fs::File::from_std(std_file);
|
||||
let mut resource_table = resource_table.borrow_mut();
|
||||
let rid = resource_table.add(
|
||||
"fsFile",
|
||||
Box::new(StreamResourceHolder::new(StreamResource::FsFile(Some((
|
||||
tokio_file,
|
||||
FileMetadata::default(),
|
||||
))))),
|
||||
);
|
||||
Ok(JsonOp::Sync(json!(rid)))
|
||||
} else {
|
||||
let fut = async move {
|
||||
let tokio_file = tokio::fs::OpenOptions::from(open_options)
|
||||
.open(path)
|
||||
.await?;
|
||||
let mut resource_table = resource_table.borrow_mut();
|
||||
let rid = resource_table.add(
|
||||
"fsFile",
|
||||
Box::new(StreamResourceHolder::new(StreamResource::FsFile(Some((
|
||||
tokio_file,
|
||||
FileMetadata::default(),
|
||||
))))),
|
||||
);
|
||||
Ok(json!(rid))
|
||||
};
|
||||
Ok(JsonOp::Async(fut.boxed_local()))
|
||||
}
|
||||
fn op_open_sync(
|
||||
state: &State,
|
||||
resource_table: &mut ResourceTable,
|
||||
args: Value,
|
||||
_zero_copy: &mut [ZeroCopyBuf],
|
||||
) -> Result<Value, OpError> {
|
||||
let (path, open_options) = open_helper(state, args)?;
|
||||
let std_file = open_options.open(path)?;
|
||||
let tokio_file = tokio::fs::File::from_std(std_file);
|
||||
let rid = resource_table.add(
|
||||
"fsFile",
|
||||
Box::new(StreamResourceHolder::new(StreamResource::FsFile(Some((
|
||||
tokio_file,
|
||||
FileMetadata::default(),
|
||||
))))),
|
||||
);
|
||||
Ok(json!(rid))
|
||||
}
|
||||
|
||||
async fn op_open_async(
|
||||
state: State,
|
||||
resource_table: Rc<RefCell<ResourceTable>>,
|
||||
args: Value,
|
||||
_zero_copy: BufVec,
|
||||
) -> Result<Value, OpError> {
|
||||
let (path, open_options) = open_helper(&state, args)?;
|
||||
let tokio_file = tokio::fs::OpenOptions::from(open_options)
|
||||
.open(path)
|
||||
.await?;
|
||||
let rid = resource_table.borrow_mut().add(
|
||||
"fsFile",
|
||||
Box::new(StreamResourceHolder::new(StreamResource::FsFile(Some((
|
||||
tokio_file,
|
||||
FileMetadata::default(),
|
||||
))))),
|
||||
);
|
||||
Ok(json!(rid))
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
|
|
|
@ -3,6 +3,7 @@ mod dispatch_json;
|
|||
mod dispatch_minimal;
|
||||
|
||||
pub use dispatch_json::json_op;
|
||||
pub use dispatch_json::serialize_result;
|
||||
pub use dispatch_json::JsonOp;
|
||||
pub use dispatch_json::JsonResult;
|
||||
pub use dispatch_minimal::minimal_op;
|
||||
|
|
|
@ -28,7 +28,10 @@
|
|||
) {
|
||||
checkOpenOptions(options);
|
||||
const mode = options?.mode;
|
||||
const rid = sendSync("op_open", { path: pathFromURL(path), options, mode });
|
||||
const rid = sendSync(
|
||||
"op_open_sync",
|
||||
{ path: pathFromURL(path), options, mode },
|
||||
);
|
||||
|
||||
return new File(rid);
|
||||
}
|
||||
|
@ -40,7 +43,7 @@
|
|||
checkOpenOptions(options);
|
||||
const mode = options?.mode;
|
||||
const rid = await sendAsync(
|
||||
"op_open",
|
||||
"op_open_async",
|
||||
{ path: pathFromURL(path), options, mode },
|
||||
);
|
||||
|
||||
|
|
91
cli/state.rs
91
cli/state.rs
|
@ -6,17 +6,21 @@ use crate::http_util::create_http_client;
|
|||
use crate::import_map::ImportMap;
|
||||
use crate::metrics::Metrics;
|
||||
use crate::op_error::OpError;
|
||||
use crate::ops::serialize_result;
|
||||
use crate::ops::JsonOp;
|
||||
use crate::ops::MinimalOp;
|
||||
use crate::permissions::Permissions;
|
||||
use crate::tsc::TargetLib;
|
||||
use crate::web_worker::WebWorkerHandle;
|
||||
use deno_core::Buf;
|
||||
use deno_core::BufVec;
|
||||
use deno_core::CoreIsolateState;
|
||||
use deno_core::ErrBox;
|
||||
use deno_core::ModuleLoadId;
|
||||
use deno_core::ModuleLoader;
|
||||
use deno_core::ModuleSpecifier;
|
||||
use deno_core::Op;
|
||||
use deno_core::ResourceTable;
|
||||
use deno_core::ZeroCopyBuf;
|
||||
use futures::future::FutureExt;
|
||||
use futures::Future;
|
||||
|
@ -75,6 +79,93 @@ impl State {
|
|||
self.core_op(json_op(self.stateful_op(dispatcher)))
|
||||
}
|
||||
|
||||
pub fn stateful_json_op_sync<D>(
|
||||
&self,
|
||||
resource_table: &Rc<RefCell<ResourceTable>>,
|
||||
dispatcher: D,
|
||||
) -> impl Fn(&mut deno_core::CoreIsolateState, &mut [ZeroCopyBuf]) -> Op
|
||||
where
|
||||
D: Fn(
|
||||
&State,
|
||||
&mut ResourceTable,
|
||||
Value,
|
||||
&mut [ZeroCopyBuf],
|
||||
) -> Result<Value, OpError>,
|
||||
{
|
||||
let state = self.clone();
|
||||
let resource_table = resource_table.clone();
|
||||
|
||||
move |_: &mut CoreIsolateState, bufs: &mut [ZeroCopyBuf]| {
|
||||
// The first buffer should contain JSON encoded op arguments; parse them.
|
||||
let args: Value = match serde_json::from_slice(&bufs[0]) {
|
||||
Ok(v) => v,
|
||||
Err(e) => {
|
||||
let e = OpError::from(e);
|
||||
return Op::Sync(serialize_result(None, Err(e)));
|
||||
}
|
||||
};
|
||||
|
||||
// Make a slice containing all buffers except for the first one.
|
||||
let zero_copy = &mut bufs[1..];
|
||||
|
||||
let result =
|
||||
dispatcher(&state, &mut *resource_table.borrow_mut(), args, zero_copy);
|
||||
|
||||
// Convert to Op.
|
||||
Op::Sync(serialize_result(None, result))
|
||||
}
|
||||
}
|
||||
|
||||
pub fn stateful_json_op_async<D, F>(
|
||||
&self,
|
||||
resource_table: &Rc<RefCell<ResourceTable>>,
|
||||
dispatcher: D,
|
||||
) -> impl Fn(&mut CoreIsolateState, &mut [ZeroCopyBuf]) -> Op
|
||||
where
|
||||
D: FnOnce(State, Rc<RefCell<ResourceTable>>, Value, BufVec) -> F + Clone,
|
||||
F: Future<Output = Result<Value, OpError>> + 'static,
|
||||
{
|
||||
let state = self.clone();
|
||||
let resource_table = resource_table.clone();
|
||||
|
||||
move |_: &mut CoreIsolateState, bufs: &mut [ZeroCopyBuf]| {
|
||||
// The first buffer should contain JSON encoded op arguments; parse them.
|
||||
let args: Value = match serde_json::from_slice(&bufs[0]) {
|
||||
Ok(v) => v,
|
||||
Err(e) => {
|
||||
let e = OpError::from(e);
|
||||
return Op::Sync(serialize_result(None, Err(e)));
|
||||
}
|
||||
};
|
||||
|
||||
// `args` should have a `promiseId` property with positive integer value.
|
||||
let promise_id = match args.get("promiseId").and_then(|v| v.as_u64()) {
|
||||
Some(i) => i,
|
||||
None => {
|
||||
let e = OpError::type_error("`promiseId` invalid/missing".to_owned());
|
||||
return Op::Sync(serialize_result(None, Err(e)));
|
||||
}
|
||||
};
|
||||
|
||||
// Take ownership of all buffers after the first one.
|
||||
let zero_copy: BufVec = bufs[1..].into();
|
||||
|
||||
// Call dispatcher to obtain op future.
|
||||
let fut = (dispatcher.clone())(
|
||||
state.clone(),
|
||||
resource_table.clone(),
|
||||
args,
|
||||
zero_copy,
|
||||
);
|
||||
|
||||
// Convert to Op.
|
||||
Op::Async(
|
||||
async move { serialize_result(Some(promise_id), fut.await) }
|
||||
.boxed_local(),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn stateful_json_op2<D>(
|
||||
&self,
|
||||
dispatcher: D,
|
||||
|
|
|
@ -1,4 +1,9 @@
|
|||
import { assert, unitTest, assertMatch, unreachable } from "./test_util.ts";
|
||||
import {
|
||||
assertStrictEquals,
|
||||
unitTest,
|
||||
assertMatch,
|
||||
unreachable,
|
||||
} from "./test_util.ts";
|
||||
|
||||
const openErrorStackPattern = new RegExp(
|
||||
`^.*
|
||||
|
@ -28,10 +33,38 @@ declare global {
|
|||
}
|
||||
|
||||
unitTest(function malformedJsonControlBuffer(): void {
|
||||
const opId = Deno.core.ops()["op_open"];
|
||||
const res = Deno.core.send(opId, new Uint8Array([1, 2, 3, 4, 5]));
|
||||
const resText = new TextDecoder().decode(res);
|
||||
const resJson = JSON.parse(resText);
|
||||
assert(!resJson.ok);
|
||||
assert(resJson.err);
|
||||
const opId = Deno.core.ops()["op_open_sync"];
|
||||
const argsBuf = new Uint8Array([1, 2, 3, 4, 5]);
|
||||
const resBuf = Deno.core.send(opId, argsBuf);
|
||||
const resText = new TextDecoder().decode(resBuf);
|
||||
const resObj = JSON.parse(resText);
|
||||
assertStrictEquals(resObj.ok, undefined);
|
||||
assertStrictEquals(resObj.err.kind, "TypeError");
|
||||
assertMatch(resObj.err.message, /\bexpected value\b/);
|
||||
});
|
||||
|
||||
unitTest(function invalidPromiseId(): void {
|
||||
const opId = Deno.core.ops()["op_open_async"];
|
||||
const argsObj = {
|
||||
promiseId: "1. NEIN!",
|
||||
path: "/tmp/P.I.S.C.I.X/yeah",
|
||||
mode: 0o666,
|
||||
options: {
|
||||
read: true,
|
||||
write: true,
|
||||
create: true,
|
||||
truncate: false,
|
||||
append: false,
|
||||
createNew: false,
|
||||
},
|
||||
};
|
||||
const argsText = JSON.stringify(argsObj);
|
||||
const argsBuf = new TextEncoder().encode(argsText);
|
||||
const resBuf = Deno.core.send(opId, argsBuf);
|
||||
const resText = new TextDecoder().decode(resBuf);
|
||||
const resObj = JSON.parse(resText);
|
||||
console.error(resText);
|
||||
assertStrictEquals(resObj.ok, undefined);
|
||||
assertStrictEquals(resObj.err.kind, "TypeError");
|
||||
assertMatch(resObj.err.message, /\bpromiseId\b/);
|
||||
});
|
||||
|
|
|
@ -51,6 +51,7 @@ pub use crate::ops::Op;
|
|||
pub use crate::ops::OpAsyncFuture;
|
||||
pub use crate::ops::OpId;
|
||||
pub use crate::resources::ResourceTable;
|
||||
pub use crate::zero_copy_buf::BufVec;
|
||||
pub use crate::zero_copy_buf::ZeroCopyBuf;
|
||||
|
||||
pub fn v8_version() -> &'static str {
|
||||
|
|
|
@ -1,8 +1,11 @@
|
|||
use crate::bindings;
|
||||
use rusty_v8 as v8;
|
||||
use smallvec::SmallVec;
|
||||
use std::ops::Deref;
|
||||
use std::ops::DerefMut;
|
||||
|
||||
pub type BufVec = SmallVec<[ZeroCopyBuf; 2]>;
|
||||
|
||||
/// A ZeroCopyBuf encapsulates a slice that's been borrowed from a JavaScript
|
||||
/// ArrayBuffer object. JavaScript objects can normally be garbage collected,
|
||||
/// but the existence of a ZeroCopyBuf inhibits this until it is dropped. It
|
||||
|
|
Loading…
Reference in a new issue