1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2024-11-21 15:04:11 -05:00

feat(core): Ops can take several zero copy buffers (#4788)

This commit is contained in:
Valentin Anger 2020-06-01 20:20:47 +02:00 committed by GitHub
parent 12d741c2fe
commit becbb56b19
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
41 changed files with 322 additions and 215 deletions

View file

@ -80,7 +80,7 @@ declare global {
dispatch(
opId: number,
control: Uint8Array,
zeroCopy?: ArrayBufferView | null
...zeroCopy: ArrayBufferView[]
): Uint8Array | null;
setAsyncHandler(opId: number, cb: (msg: Uint8Array) => void): void;
sharedQueue: {
@ -99,7 +99,7 @@ declare global {
send(
opId: number,
control: null | ArrayBufferView,
data?: ArrayBufferView
...data: ArrayBufferView[]
): null | Uint8Array;
setMacrotaskCallback(cb: () => boolean): void;

View file

@ -59,12 +59,12 @@ export function asyncMsgFromRust(resUi8: Uint8Array): void {
export function sendSync(
opName: string,
args: object = {},
zeroCopy?: Uint8Array
...zeroCopy: Uint8Array[]
): Ok {
const opId = OPS_CACHE[opName];
util.log("sendSync", opName, opId);
const argsUi8 = encode(args);
const resUi8 = core.dispatch(opId, argsUi8, zeroCopy);
const resUi8 = core.dispatch(opId, argsUi8, ...zeroCopy);
util.assert(resUi8 != null);
const res = decode(resUi8);
@ -75,7 +75,7 @@ export function sendSync(
export async function sendAsync(
opName: string,
args: object = {},
zeroCopy?: Uint8Array
...zeroCopy: Uint8Array[]
): Promise<Ok> {
const opId = OPS_CACHE[opName];
util.log("sendAsync", opName, opId);
@ -84,7 +84,7 @@ export async function sendAsync(
const promise = util.createResolvable<Ok>();
const argsUi8 = encode(args);
const buf = core.dispatch(opId, argsUi8, zeroCopy);
const buf = core.dispatch(opId, argsUi8, ...zeroCopy);
if (buf) {
// Sync result.
const res = decode(buf);

View file

@ -24,5 +24,5 @@ export function fetch(
zeroCopy = new Uint8Array(body.buffer, body.byteOffset, body.byteLength);
}
return sendAsync("op_fetch", args, zeroCopy);
return sendAsync("op_fetch", args, ...(zeroCopy ? [zeroCopy] : []));
}

View file

@ -46,17 +46,17 @@ struct AsyncArgs {
pub fn json_op<D>(
d: D,
) -> impl Fn(&mut CoreIsolateState, &[u8], Option<ZeroCopyBuf>) -> Op
) -> impl Fn(&mut CoreIsolateState, &[u8], &mut [ZeroCopyBuf]) -> Op
where
D: Fn(
&mut CoreIsolateState,
Value,
Option<ZeroCopyBuf>,
&mut [ZeroCopyBuf],
) -> Result<JsonOp, OpError>,
{
move |isolate_state: &mut CoreIsolateState,
control: &[u8],
zero_copy: Option<ZeroCopyBuf>| {
zero_copy: &mut [ZeroCopyBuf]| {
let async_args: AsyncArgs = match serde_json::from_slice(control) {
Ok(args) => args,
Err(e) => {

View file

@ -116,13 +116,13 @@ fn test_parse_min_record() {
pub fn minimal_op<D>(
d: D,
) -> impl Fn(&mut CoreIsolateState, &[u8], Option<ZeroCopyBuf>) -> Op
) -> impl Fn(&mut CoreIsolateState, &[u8], &mut [ZeroCopyBuf]) -> Op
where
D: Fn(&mut CoreIsolateState, bool, i32, Option<ZeroCopyBuf>) -> MinimalOp,
D: Fn(&mut CoreIsolateState, bool, i32, &mut [ZeroCopyBuf]) -> MinimalOp,
{
move |isolate_state: &mut CoreIsolateState,
control: &[u8],
zero_copy: Option<ZeroCopyBuf>| {
zero_copy: &mut [ZeroCopyBuf]| {
let mut record = match parse_min_record(control) {
Some(r) => r,
None => {

View file

@ -31,7 +31,7 @@ struct ApplySourceMap {
fn op_apply_source_map(
state: &State,
args: Value,
_zero_copy: Option<ZeroCopyBuf>,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, OpError> {
let args: ApplySourceMap = serde_json::from_value(args)?;
@ -55,7 +55,7 @@ fn op_apply_source_map(
fn op_format_diagnostic(
_state: &State,
args: Value,
_zero_copy: Option<ZeroCopyBuf>,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, OpError> {
let diagnostic = serde_json::from_value::<Diagnostic>(args)?;
Ok(JsonOp::Sync(json!(diagnostic.to_string())))

View file

@ -28,7 +28,7 @@ pub fn op_fetch(
isolate_state: &mut CoreIsolateState,
state: &State,
args: Value,
data: Option<ZeroCopyBuf>,
data: &mut [ZeroCopyBuf],
) -> Result<JsonOp, OpError> {
let args: FetchArgs = serde_json::from_value(args)?;
let url = args.url;
@ -57,8 +57,10 @@ pub fn op_fetch(
let mut request = client.request(method, url_);
if let Some(buf) = data {
request = request.body(Vec::from(&*buf));
match data.len() {
0 => {}
1 => request = request.body(Vec::from(&*data[0])),
_ => panic!("Invalid number of arguments"),
}
for (key, value) in args.headers {

View file

@ -72,7 +72,7 @@ fn op_open(
isolate_state: &mut CoreIsolateState,
state: &State,
args: Value,
_zero_copy: Option<ZeroCopyBuf>,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, OpError> {
let args: OpenArgs = serde_json::from_value(args)?;
let path = Path::new(&args.path).to_path_buf();
@ -155,7 +155,7 @@ fn op_seek(
isolate_state: &mut CoreIsolateState,
_state: &State,
args: Value,
_zero_copy: Option<ZeroCopyBuf>,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, OpError> {
use std::io::{Seek, SeekFrom};
let args: SeekArgs = serde_json::from_value(args)?;
@ -212,7 +212,7 @@ struct UmaskArgs {
fn op_umask(
state: &State,
args: Value,
_zero_copy: Option<ZeroCopyBuf>,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, OpError> {
state.check_unstable("Deno.umask");
let args: UmaskArgs = serde_json::from_value(args)?;
@ -250,7 +250,7 @@ struct ChdirArgs {
fn op_chdir(
state: &State,
args: Value,
_zero_copy: Option<ZeroCopyBuf>,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, OpError> {
let args: ChdirArgs = serde_json::from_value(args)?;
let d = PathBuf::from(&args.directory);
@ -271,7 +271,7 @@ struct MkdirArgs {
fn op_mkdir(
state: &State,
args: Value,
_zero_copy: Option<ZeroCopyBuf>,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, OpError> {
let args: MkdirArgs = serde_json::from_value(args)?;
let path = Path::new(&args.path).to_path_buf();
@ -305,7 +305,7 @@ struct ChmodArgs {
fn op_chmod(
state: &State,
args: Value,
_zero_copy: Option<ZeroCopyBuf>,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, OpError> {
let args: ChmodArgs = serde_json::from_value(args)?;
let path = Path::new(&args.path).to_path_buf();
@ -345,7 +345,7 @@ struct ChownArgs {
fn op_chown(
state: &State,
args: Value,
_zero_copy: Option<ZeroCopyBuf>,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, OpError> {
let args: ChownArgs = serde_json::from_value(args)?;
let path = Path::new(&args.path).to_path_buf();
@ -384,7 +384,7 @@ struct RemoveArgs {
fn op_remove(
state: &State,
args: Value,
_zero_copy: Option<ZeroCopyBuf>,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, OpError> {
let args: RemoveArgs = serde_json::from_value(args)?;
let path = PathBuf::from(&args.path);
@ -438,7 +438,7 @@ struct CopyFileArgs {
fn op_copy_file(
state: &State,
args: Value,
_zero_copy: Option<ZeroCopyBuf>,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, OpError> {
let args: CopyFileArgs = serde_json::from_value(args)?;
let from = PathBuf::from(&args.from);
@ -532,7 +532,7 @@ struct StatArgs {
fn op_stat(
state: &State,
args: Value,
_zero_copy: Option<ZeroCopyBuf>,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, OpError> {
let args: StatArgs = serde_json::from_value(args)?;
let path = PathBuf::from(&args.path);
@ -562,7 +562,7 @@ struct RealpathArgs {
fn op_realpath(
state: &State,
args: Value,
_zero_copy: Option<ZeroCopyBuf>,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, OpError> {
let args: RealpathArgs = serde_json::from_value(args)?;
let path = PathBuf::from(&args.path);
@ -597,7 +597,7 @@ struct ReadDirArgs {
fn op_read_dir(
state: &State,
args: Value,
_zero_copy: Option<ZeroCopyBuf>,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, OpError> {
let args: ReadDirArgs = serde_json::from_value(args)?;
let path = PathBuf::from(&args.path);
@ -640,7 +640,7 @@ struct RenameArgs {
fn op_rename(
state: &State,
args: Value,
_zero_copy: Option<ZeroCopyBuf>,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, OpError> {
let args: RenameArgs = serde_json::from_value(args)?;
let oldpath = PathBuf::from(&args.oldpath);
@ -669,7 +669,7 @@ struct LinkArgs {
fn op_link(
state: &State,
args: Value,
_zero_copy: Option<ZeroCopyBuf>,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, OpError> {
state.check_unstable("Deno.link");
let args: LinkArgs = serde_json::from_value(args)?;
@ -707,7 +707,7 @@ struct SymlinkOptions {
fn op_symlink(
state: &State,
args: Value,
_zero_copy: Option<ZeroCopyBuf>,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, OpError> {
state.check_unstable("Deno.symlink");
let args: SymlinkArgs = serde_json::from_value(args)?;
@ -767,7 +767,7 @@ struct ReadLinkArgs {
fn op_read_link(
state: &State,
args: Value,
_zero_copy: Option<ZeroCopyBuf>,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, OpError> {
let args: ReadLinkArgs = serde_json::from_value(args)?;
let path = PathBuf::from(&args.path);
@ -794,7 +794,7 @@ struct TruncateArgs {
fn op_truncate(
state: &State,
args: Value,
_zero_copy: Option<ZeroCopyBuf>,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, OpError> {
let args: TruncateArgs = serde_json::from_value(args)?;
let path = PathBuf::from(&args.path);
@ -868,7 +868,7 @@ struct MakeTempArgs {
fn op_make_temp_dir(
state: &State,
args: Value,
_zero_copy: Option<ZeroCopyBuf>,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, OpError> {
let args: MakeTempArgs = serde_json::from_value(args)?;
@ -899,7 +899,7 @@ fn op_make_temp_dir(
fn op_make_temp_file(
state: &State,
args: Value,
_zero_copy: Option<ZeroCopyBuf>,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, OpError> {
let args: MakeTempArgs = serde_json::from_value(args)?;
@ -939,7 +939,7 @@ struct UtimeArgs {
fn op_utime(
state: &State,
args: Value,
_zero_copy: Option<ZeroCopyBuf>,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, OpError> {
state.check_unstable("Deno.utime");
@ -959,7 +959,7 @@ fn op_utime(
fn op_cwd(
state: &State,
_args: Value,
_zero_copy: Option<ZeroCopyBuf>,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, OpError> {
let path = current_dir()?;
state.check_read_blind(&path, "CWD")?;

View file

@ -66,7 +66,7 @@ pub fn op_fs_events_open(
isolate_state: &mut CoreIsolateState,
state: &State,
args: Value,
_zero_copy: Option<ZeroCopyBuf>,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, OpError> {
#[derive(Deserialize)]
struct OpenArgs {
@ -104,7 +104,7 @@ pub fn op_fs_events_poll(
isolate_state: &mut CoreIsolateState,
_state: &State,
args: Value,
_zero_copy: Option<ZeroCopyBuf>,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, OpError> {
#[derive(Deserialize)]
struct PollArgs {

View file

@ -211,16 +211,16 @@ pub fn op_read(
_state: &State,
is_sync: bool,
rid: i32,
zero_copy: Option<ZeroCopyBuf>,
zero_copy: &mut [ZeroCopyBuf],
) -> MinimalOp {
debug!("read rid={}", rid);
if zero_copy.is_none() {
return MinimalOp::Sync(Err(no_buffer_specified()));
match zero_copy.len() {
0 => return MinimalOp::Sync(Err(no_buffer_specified())),
1 => {}
_ => panic!("Invalid number of arguments"),
}
let resource_table = isolate_state.resource_table.clone();
let mut buf = zero_copy.unwrap();
if is_sync {
MinimalOp::Sync({
// First we look up the rid in the resource table.
@ -229,7 +229,7 @@ pub fn op_read(
Ok(std_file) => {
use std::io::Read;
std_file
.read(&mut buf)
.read(&mut zero_copy[0])
.map(|n: usize| n as i32)
.map_err(OpError::from)
}
@ -239,6 +239,7 @@ pub fn op_read(
})
})
} else {
let mut zero_copy = zero_copy[0].clone();
MinimalOp::Async(
poll_fn(move |cx| {
let mut resource_table = resource_table.borrow_mut();
@ -249,7 +250,7 @@ pub fn op_read(
let mut task_tracker_id: Option<usize> = None;
let nread = match resource_holder
.resource
.poll_read(cx, &mut buf.as_mut()[..])
.poll_read(cx, &mut zero_copy)
.map_err(OpError::from)
{
Poll::Ready(t) => {
@ -335,15 +336,15 @@ pub fn op_write(
_state: &State,
is_sync: bool,
rid: i32,
zero_copy: Option<ZeroCopyBuf>,
zero_copy: &mut [ZeroCopyBuf],
) -> MinimalOp {
debug!("write rid={}", rid);
if zero_copy.is_none() {
return MinimalOp::Sync(Err(no_buffer_specified()));
match zero_copy.len() {
0 => return MinimalOp::Sync(Err(no_buffer_specified())),
1 => {}
_ => panic!("Invalid number of arguments"),
}
let buf = zero_copy.unwrap();
if is_sync {
MinimalOp::Sync({
// First we look up the rid in the resource table.
@ -352,7 +353,7 @@ pub fn op_write(
Ok(std_file) => {
use std::io::Write;
std_file
.write(&buf)
.write(&zero_copy[0])
.map(|nwritten: usize| nwritten as i32)
.map_err(OpError::from)
}
@ -362,6 +363,7 @@ pub fn op_write(
})
})
} else {
let zero_copy = zero_copy[0].clone();
let resource_table = isolate_state.resource_table.clone();
MinimalOp::Async(
async move {
@ -370,7 +372,7 @@ pub fn op_write(
let resource_holder = resource_table
.get_mut::<StreamResourceHolder>(rid as u32)
.ok_or_else(OpError::bad_resource_id)?;
resource_holder.resource.poll_write(cx, &buf.as_ref()[..])
resource_holder.resource.poll_write(cx, &zero_copy)
})
.await?;

View file

@ -40,7 +40,7 @@ struct AcceptArgs {
fn accept_tcp(
isolate_state: &mut CoreIsolateState,
args: AcceptArgs,
_zero_copy: Option<ZeroCopyBuf>,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, OpError> {
let rid = args.rid as u32;
let resource_table = isolate_state.resource_table.clone();
@ -101,7 +101,7 @@ fn op_accept(
isolate_state: &mut CoreIsolateState,
_state: &State,
args: Value,
zero_copy: Option<ZeroCopyBuf>,
zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, OpError> {
let args: AcceptArgs = serde_json::from_value(args)?;
match args.transport.as_str() {
@ -125,9 +125,10 @@ fn receive_udp(
isolate_state: &mut CoreIsolateState,
_state: &State,
args: ReceiveArgs,
zero_copy: Option<ZeroCopyBuf>,
zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, OpError> {
let mut buf = zero_copy.unwrap();
assert_eq!(zero_copy.len(), 1, "Invalid number of arguments");
let mut zero_copy = zero_copy[0].clone();
let rid = args.rid as u32;
@ -142,7 +143,9 @@ fn receive_udp(
OpError::bad_resource("Socket has been closed".to_string())
})?;
let socket = &mut resource.socket;
socket.poll_recv_from(cx, &mut buf).map_err(OpError::from)
socket
.poll_recv_from(cx, &mut zero_copy)
.map_err(OpError::from)
});
let (size, remote_addr) = receive_fut.await?;
Ok(json!({
@ -162,9 +165,10 @@ fn op_receive(
isolate_state: &mut CoreIsolateState,
state: &State,
args: Value,
zero_copy: Option<ZeroCopyBuf>,
zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, OpError> {
assert!(zero_copy.is_some());
assert_eq!(zero_copy.len(), 1, "Invalid number of arguments");
let args: ReceiveArgs = serde_json::from_value(args)?;
match args.transport.as_str() {
"udp" => receive_udp(isolate_state, state, args, zero_copy),
@ -191,10 +195,11 @@ fn op_send(
isolate_state: &mut CoreIsolateState,
state: &State,
args: Value,
zero_copy: Option<ZeroCopyBuf>,
zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, OpError> {
assert!(zero_copy.is_some());
let buf = zero_copy.unwrap();
assert_eq!(zero_copy.len(), 1, "Invalid number of arguments");
let zero_copy = zero_copy[0].clone();
let resource_table = isolate_state.resource_table.clone();
match serde_json::from_value(args)? {
SendArgs {
@ -213,7 +218,7 @@ fn op_send(
})?;
let socket = &mut resource.socket;
let addr = resolve_addr(&args.hostname, args.port)?;
socket.send_to(&buf, addr).await?;
socket.send_to(&zero_copy, addr).await?;
Ok(json!({}))
};
@ -237,7 +242,7 @@ fn op_send(
let socket = &mut resource.socket;
socket
.send_to(&buf, &resource.local_addr.as_pathname().unwrap())
.send_to(&zero_copy, &resource.local_addr.as_pathname().unwrap())
.await?;
Ok(json!({}))
@ -260,7 +265,7 @@ fn op_connect(
isolate_state: &mut CoreIsolateState,
state: &State,
args: Value,
_zero_copy: Option<ZeroCopyBuf>,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, OpError> {
let resource_table = isolate_state.resource_table.clone();
match serde_json::from_value(args)? {
@ -346,7 +351,7 @@ fn op_shutdown(
isolate_state: &mut CoreIsolateState,
state: &State,
args: Value,
_zero_copy: Option<ZeroCopyBuf>,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, OpError> {
state.check_unstable("Deno.shutdown");
@ -488,7 +493,7 @@ fn op_listen(
isolate_state: &mut CoreIsolateState,
state: &State,
args: Value,
_zero_copy: Option<ZeroCopyBuf>,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, OpError> {
let mut resource_table = isolate_state.resource_table.borrow_mut();
match serde_json::from_value(args)? {

View file

@ -29,7 +29,7 @@ pub struct UnixListenArgs {
pub fn accept_unix(
isolate_state: &mut CoreIsolateState,
rid: u32,
_zero_copy: Option<ZeroCopyBuf>,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, OpError> {
let resource_table = isolate_state.resource_table.clone();
{
@ -80,9 +80,10 @@ pub fn accept_unix(
pub fn receive_unix_packet(
isolate_state: &mut CoreIsolateState,
rid: u32,
zero_copy: Option<ZeroCopyBuf>,
zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, OpError> {
let mut buf = zero_copy.unwrap();
assert_eq!(zero_copy.len(), 1, "Invalid number of arguments");
let mut zero_copy = zero_copy[0].clone();
let resource_table = isolate_state.resource_table.clone();
let op = async move {
@ -92,7 +93,7 @@ pub fn receive_unix_packet(
.ok_or_else(|| {
OpError::bad_resource("Socket has been closed".to_string())
})?;
let (size, remote_addr) = resource.socket.recv_from(&mut buf).await?;
let (size, remote_addr) = resource.socket.recv_from(&mut zero_copy).await?;
Ok(json!({
"size": size,
"remoteAddr": {

View file

@ -29,7 +29,7 @@ struct GetDirArgs {
fn op_get_dir(
state: &State,
args: Value,
_zero_copy: Option<ZeroCopyBuf>,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, OpError> {
state.check_unstable("Deno.dir");
state.check_env()?;
@ -80,7 +80,7 @@ fn op_get_dir(
fn op_exec_path(
state: &State,
_args: Value,
_zero_copy: Option<ZeroCopyBuf>,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, OpError> {
let current_exe = env::current_exe().unwrap();
state.check_read_blind(&current_exe, "exec_path")?;
@ -100,7 +100,7 @@ struct SetEnv {
fn op_set_env(
state: &State,
args: Value,
_zero_copy: Option<ZeroCopyBuf>,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, OpError> {
let args: SetEnv = serde_json::from_value(args)?;
state.check_env()?;
@ -111,7 +111,7 @@ fn op_set_env(
fn op_env(
state: &State,
_args: Value,
_zero_copy: Option<ZeroCopyBuf>,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, OpError> {
state.check_env()?;
let v = env::vars().collect::<HashMap<String, String>>();
@ -126,7 +126,7 @@ struct GetEnv {
fn op_get_env(
state: &State,
args: Value,
_zero_copy: Option<ZeroCopyBuf>,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, OpError> {
let args: GetEnv = serde_json::from_value(args)?;
state.check_env()?;
@ -145,7 +145,7 @@ struct Exit {
fn op_exit(
_s: &State,
args: Value,
_zero_copy: Option<ZeroCopyBuf>,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, OpError> {
let args: Exit = serde_json::from_value(args)?;
std::process::exit(args.code)
@ -154,7 +154,7 @@ fn op_exit(
fn op_loadavg(
state: &State,
_args: Value,
_zero_copy: Option<ZeroCopyBuf>,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, OpError> {
state.check_unstable("Deno.loadavg");
state.check_env()?;
@ -171,7 +171,7 @@ fn op_loadavg(
fn op_hostname(
state: &State,
_args: Value,
_zero_copy: Option<ZeroCopyBuf>,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, OpError> {
state.check_unstable("Deno.hostname");
state.check_env()?;
@ -182,7 +182,7 @@ fn op_hostname(
fn op_os_release(
state: &State,
_args: Value,
_zero_copy: Option<ZeroCopyBuf>,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, OpError> {
state.check_unstable("Deno.osRelease");
state.check_env()?;

View file

@ -31,7 +31,7 @@ struct PermissionArgs {
pub fn op_query_permission(
state: &State,
args: Value,
_zero_copy: Option<ZeroCopyBuf>,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, OpError> {
let args: PermissionArgs = serde_json::from_value(args)?;
let state = state.borrow();
@ -47,7 +47,7 @@ pub fn op_query_permission(
pub fn op_revoke_permission(
state: &State,
args: Value,
_zero_copy: Option<ZeroCopyBuf>,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, OpError> {
let args: PermissionArgs = serde_json::from_value(args)?;
let mut state = state.borrow_mut();
@ -74,7 +74,7 @@ pub fn op_revoke_permission(
pub fn op_request_permission(
state: &State,
args: Value,
_zero_copy: Option<ZeroCopyBuf>,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, OpError> {
let args: PermissionArgs = serde_json::from_value(args)?;
let mut state = state.borrow_mut();

View file

@ -37,7 +37,7 @@ pub fn op_open_plugin(
isolate_state: &mut CoreIsolateState,
state: &State,
args: Value,
_zero_copy: Option<ZeroCopyBuf>,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, OpError> {
state.check_unstable("Deno.openPlugin");
let args: OpenPluginArgs = serde_json::from_value(args).unwrap();

View file

@ -64,7 +64,7 @@ fn op_run(
isolate_state: &mut CoreIsolateState,
state: &State,
args: Value,
_zero_copy: Option<ZeroCopyBuf>,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, OpError> {
let run_args: RunArgs = serde_json::from_value(args)?;
@ -178,7 +178,7 @@ fn op_run_status(
isolate_state: &mut CoreIsolateState,
state: &State,
args: Value,
_zero_copy: Option<ZeroCopyBuf>,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, OpError> {
let args: RunStatusArgs = serde_json::from_value(args)?;
let rid = args.rid as u32;
@ -228,7 +228,7 @@ struct KillArgs {
fn op_kill(
state: &State,
args: Value,
_zero_copy: Option<ZeroCopyBuf>,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, OpError> {
state.check_unstable("Deno.kill");
state.check_run()?;

View file

@ -17,15 +17,15 @@ pub fn init(i: &mut CoreIsolate, s: &State) {
fn op_get_random_values(
state: &State,
_args: Value,
zero_copy: Option<ZeroCopyBuf>,
zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, OpError> {
assert!(zero_copy.is_some());
assert_eq!(zero_copy.len(), 1);
if let Some(ref mut seeded_rng) = state.borrow_mut().seeded_rng {
seeded_rng.fill(&mut zero_copy.unwrap()[..]);
seeded_rng.fill(&mut *zero_copy[0]);
} else {
let mut rng = thread_rng();
rng.fill(&mut zero_copy.unwrap()[..]);
rng.fill(&mut *zero_copy[0]);
}
Ok(JsonOp::Sync(json!({})))

View file

@ -27,7 +27,7 @@ fn op_repl_start(
isolate_state: &mut CoreIsolateState,
state: &State,
args: Value,
_zero_copy: Option<ZeroCopyBuf>,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, OpError> {
let args: ReplStartArgs = serde_json::from_value(args)?;
debug!("op_repl_start {}", args.history_file);
@ -50,7 +50,7 @@ fn op_repl_readline(
isolate_state: &mut CoreIsolateState,
_state: &State,
args: Value,
_zero_copy: Option<ZeroCopyBuf>,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, OpError> {
let args: ReplReadlineArgs = serde_json::from_value(args)?;
let rid = args.rid as u32;

View file

@ -15,7 +15,7 @@ fn op_resources(
isolate_state: &mut CoreIsolateState,
_state: &State,
_args: Value,
_zero_copy: Option<ZeroCopyBuf>,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, OpError> {
let serialized_resources = isolate_state.resource_table.borrow().entries();
Ok(JsonOp::Sync(json!(serialized_resources)))
@ -26,7 +26,7 @@ fn op_close(
isolate_state: &mut CoreIsolateState,
_state: &State,
args: Value,
_zero_copy: Option<ZeroCopyBuf>,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, OpError> {
#[derive(Deserialize)]
struct CloseArgs {

View file

@ -17,7 +17,7 @@ pub fn init(i: &mut CoreIsolate, s: &State) {
fn op_start(
state: &State,
_args: Value,
_zero_copy: Option<ZeroCopyBuf>,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, OpError> {
let state = state.borrow();
let gs = &state.global_state;
@ -42,7 +42,7 @@ fn op_start(
fn op_metrics(
state: &State,
_args: Value,
_zero_copy: Option<ZeroCopyBuf>,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, OpError> {
let state = state.borrow();
let m = &state.metrics;

View file

@ -26,7 +26,7 @@ struct CompileArgs {
fn op_compile(
state: &State,
args: Value,
_zero_copy: Option<ZeroCopyBuf>,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, OpError> {
state.check_unstable("Deno.compile");
let args: CompileArgs = serde_json::from_value(args)?;
@ -57,7 +57,7 @@ struct TranspileArgs {
fn op_transpile(
state: &State,
args: Value,
_zero_copy: Option<ZeroCopyBuf>,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, OpError> {
state.check_unstable("Deno.transpile");
let args: TranspileArgs = serde_json::from_value(args)?;

View file

@ -43,7 +43,7 @@ fn op_signal_bind(
isolate_state: &mut CoreIsolateState,
state: &State,
args: Value,
_zero_copy: Option<ZeroCopyBuf>,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, OpError> {
state.check_unstable("Deno.signal");
let args: BindSignalArgs = serde_json::from_value(args)?;
@ -65,7 +65,7 @@ fn op_signal_poll(
isolate_state: &mut CoreIsolateState,
state: &State,
args: Value,
_zero_copy: Option<ZeroCopyBuf>,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, OpError> {
state.check_unstable("Deno.signal");
let args: SignalArgs = serde_json::from_value(args)?;
@ -92,7 +92,7 @@ pub fn op_signal_unbind(
isolate_state: &mut CoreIsolateState,
state: &State,
args: Value,
_zero_copy: Option<ZeroCopyBuf>,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, OpError> {
state.check_unstable("Deno.signal");
let args: SignalArgs = serde_json::from_value(args)?;
@ -117,7 +117,7 @@ pub fn op_signal_bind(
_isolate_state: &mut CoreIsolateState,
_state: &State,
_args: Value,
_zero_copy: Option<ZeroCopyBuf>,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, OpError> {
unimplemented!();
}
@ -127,7 +127,7 @@ fn op_signal_unbind(
_isolate_state: &mut CoreIsolateState,
_state: &State,
_args: Value,
_zero_copy: Option<ZeroCopyBuf>,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, OpError> {
unimplemented!();
}
@ -137,7 +137,7 @@ fn op_signal_poll(
_isolate_state: &mut CoreIsolateState,
_state: &State,
_args: Value,
_zero_copy: Option<ZeroCopyBuf>,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, OpError> {
unimplemented!();
}

View file

@ -20,7 +20,7 @@ pub fn init(i: &mut CoreIsolate, s: &State) {
fn op_global_timer_stop(
state: &State,
_args: Value,
_zero_copy: Option<ZeroCopyBuf>,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, OpError> {
let mut state = state.borrow_mut();
state.global_timer.cancel();
@ -35,7 +35,7 @@ struct GlobalTimerArgs {
fn op_global_timer(
state: &State,
args: Value,
_zero_copy: Option<ZeroCopyBuf>,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, OpError> {
let args: GlobalTimerArgs = serde_json::from_value(args)?;
let val = args.timeout;
@ -57,7 +57,7 @@ fn op_global_timer(
fn op_now(
state: &State,
_args: Value,
_zero_copy: Option<ZeroCopyBuf>,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, OpError> {
let state = state.borrow();
let seconds = state.start_time.elapsed().as_secs();

View file

@ -57,7 +57,7 @@ pub fn op_start_tls(
isolate_state: &mut CoreIsolateState,
state: &State,
args: Value,
_zero_copy: Option<ZeroCopyBuf>,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, OpError> {
state.check_unstable("Deno.startTls");
let args: StartTLSArgs = serde_json::from_value(args)?;
@ -136,7 +136,7 @@ pub fn op_connect_tls(
isolate_state: &mut CoreIsolateState,
state: &State,
args: Value,
_zero_copy: Option<ZeroCopyBuf>,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, OpError> {
let args: ConnectTLSArgs = serde_json::from_value(args)?;
let cert_file = args.cert_file.clone();
@ -310,7 +310,7 @@ fn op_listen_tls(
isolate_state: &mut CoreIsolateState,
state: &State,
args: Value,
_zero_copy: Option<ZeroCopyBuf>,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, OpError> {
let args: ListenTlsArgs = serde_json::from_value(args)?;
assert_eq!(args.transport, "tcp");
@ -360,7 +360,7 @@ fn op_accept_tls(
isolate_state: &mut CoreIsolateState,
_state: &State,
args: Value,
_zero_copy: Option<ZeroCopyBuf>,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, OpError> {
let args: AcceptTlsArgs = serde_json::from_value(args)?;
let rid = args.rid as u32;

View file

@ -50,7 +50,7 @@ pub fn op_set_raw(
isolate_state: &mut CoreIsolateState,
state: &State,
args: Value,
_zero_copy: Option<ZeroCopyBuf>,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, OpError> {
state.check_unstable("Deno.setRaw");
let args: SetRawArgs = serde_json::from_value(args)?;
@ -219,7 +219,7 @@ pub fn op_isatty(
isolate_state: &mut CoreIsolateState,
_state: &State,
args: Value,
_zero_copy: Option<ZeroCopyBuf>,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, OpError> {
let args: IsattyArgs = serde_json::from_value(args)?;
let rid = args.rid;

View file

@ -17,18 +17,18 @@ pub fn web_worker_op<D>(
) -> impl Fn(
&mut CoreIsolateState,
Value,
Option<ZeroCopyBuf>,
&mut [ZeroCopyBuf],
) -> Result<JsonOp, OpError>
where
D: Fn(
&mpsc::Sender<WorkerEvent>,
Value,
Option<ZeroCopyBuf>,
&mut [ZeroCopyBuf],
) -> Result<JsonOp, OpError>,
{
move |_isolate_state: &mut CoreIsolateState,
args: Value,
zero_copy: Option<ZeroCopyBuf>|
zero_copy: &mut [ZeroCopyBuf]|
-> Result<JsonOp, OpError> { dispatcher(&sender, args, zero_copy) }
}
@ -39,19 +39,19 @@ pub fn web_worker_op2<D>(
) -> impl Fn(
&mut CoreIsolateState,
Value,
Option<ZeroCopyBuf>,
&mut [ZeroCopyBuf],
) -> Result<JsonOp, OpError>
where
D: Fn(
WebWorkerHandle,
&mpsc::Sender<WorkerEvent>,
Value,
Option<ZeroCopyBuf>,
&mut [ZeroCopyBuf],
) -> Result<JsonOp, OpError>,
{
move |_isolate_state: &mut CoreIsolateState,
args: Value,
zero_copy: Option<ZeroCopyBuf>|
zero_copy: &mut [ZeroCopyBuf]|
-> Result<JsonOp, OpError> {
dispatcher(handle.clone(), &sender, args, zero_copy)
}
@ -84,9 +84,10 @@ pub fn init(
fn op_worker_post_message(
sender: &mpsc::Sender<WorkerEvent>,
_args: Value,
data: Option<ZeroCopyBuf>,
data: &mut [ZeroCopyBuf],
) -> Result<JsonOp, OpError> {
let d = Vec::from(data.unwrap().as_ref()).into_boxed_slice();
assert_eq!(data.len(), 1, "Invalid number of arguments");
let d = Vec::from(&*data[0]).into_boxed_slice();
let mut sender = sender.clone();
sender
.try_send(WorkerEvent::Message(d))
@ -99,7 +100,7 @@ fn op_worker_close(
handle: WebWorkerHandle,
sender: &mpsc::Sender<WorkerEvent>,
_args: Value,
_data: Option<ZeroCopyBuf>,
_data: &mut [ZeroCopyBuf],
) -> Result<JsonOp, OpError> {
let mut sender = sender.clone();
// Notify parent that we're finished

View file

@ -174,7 +174,7 @@ struct CreateWorkerArgs {
fn op_create_worker(
state: &State,
args: Value,
_data: Option<ZeroCopyBuf>,
_data: &mut [ZeroCopyBuf],
) -> Result<JsonOp, OpError> {
let args: CreateWorkerArgs = serde_json::from_value(args)?;
@ -230,7 +230,7 @@ struct WorkerArgs {
fn op_host_terminate_worker(
state: &State,
args: Value,
_data: Option<ZeroCopyBuf>,
_data: &mut [ZeroCopyBuf],
) -> Result<JsonOp, OpError> {
let args: WorkerArgs = serde_json::from_value(args)?;
let id = args.id as u32;
@ -296,7 +296,7 @@ fn serialize_worker_event(event: WorkerEvent) -> Value {
fn op_host_get_message(
state: &State,
args: Value,
_data: Option<ZeroCopyBuf>,
_data: &mut [ZeroCopyBuf],
) -> Result<JsonOp, OpError> {
let args: WorkerArgs = serde_json::from_value(args)?;
let id = args.id as u32;
@ -345,11 +345,12 @@ fn op_host_get_message(
fn op_host_post_message(
state: &State,
args: Value,
data: Option<ZeroCopyBuf>,
data: &mut [ZeroCopyBuf],
) -> Result<JsonOp, OpError> {
assert_eq!(data.len(), 1, "Invalid number of arguments");
let args: WorkerArgs = serde_json::from_value(args)?;
let id = args.id as u32;
let msg = Vec::from(data.unwrap().as_ref()).into_boxed_slice();
let msg = Vec::from(&*data[0]).into_boxed_slice();
debug!("post message to worker {}", id);
let state = state.borrow();

View file

@ -65,9 +65,9 @@ impl State {
pub fn stateful_json_op<D>(
&self,
dispatcher: D,
) -> impl Fn(&mut deno_core::CoreIsolateState, &[u8], Option<ZeroCopyBuf>) -> Op
) -> impl Fn(&mut deno_core::CoreIsolateState, &[u8], &mut [ZeroCopyBuf]) -> Op
where
D: Fn(&State, Value, Option<ZeroCopyBuf>) -> Result<JsonOp, OpError>,
D: Fn(&State, Value, &mut [ZeroCopyBuf]) -> Result<JsonOp, OpError>,
{
use crate::ops::json_op;
self.core_op(json_op(self.stateful_op(dispatcher)))
@ -76,13 +76,13 @@ impl State {
pub fn stateful_json_op2<D>(
&self,
dispatcher: D,
) -> impl Fn(&mut deno_core::CoreIsolateState, &[u8], Option<ZeroCopyBuf>) -> Op
) -> impl Fn(&mut deno_core::CoreIsolateState, &[u8], &mut [ZeroCopyBuf]) -> Op
where
D: Fn(
&mut deno_core::CoreIsolateState,
&State,
Value,
Option<ZeroCopyBuf>,
&mut [ZeroCopyBuf],
) -> Result<JsonOp, OpError>,
{
use crate::ops::json_op;
@ -95,19 +95,19 @@ impl State {
pub fn core_op<D>(
&self,
dispatcher: D,
) -> impl Fn(&mut deno_core::CoreIsolateState, &[u8], Option<ZeroCopyBuf>) -> Op
) -> impl Fn(&mut deno_core::CoreIsolateState, &[u8], &mut [ZeroCopyBuf]) -> Op
where
D: Fn(&mut deno_core::CoreIsolateState, &[u8], Option<ZeroCopyBuf>) -> Op,
D: Fn(&mut deno_core::CoreIsolateState, &[u8], &mut [ZeroCopyBuf]) -> Op,
{
let state = self.clone();
move |isolate_state: &mut deno_core::CoreIsolateState,
control: &[u8],
zero_copy: Option<ZeroCopyBuf>|
zero_copy: &mut [ZeroCopyBuf]|
-> Op {
let bytes_sent_control = control.len() as u64;
let bytes_sent_zero_copy =
zero_copy.as_ref().map(|b| b.len()).unwrap_or(0) as u64;
zero_copy.iter().map(|b| b.len()).sum::<usize>() as u64;
let op = dispatcher(isolate_state, control, zero_copy);
@ -155,14 +155,14 @@ impl State {
pub fn stateful_minimal_op2<D>(
&self,
dispatcher: D,
) -> impl Fn(&mut deno_core::CoreIsolateState, &[u8], Option<ZeroCopyBuf>) -> Op
) -> impl Fn(&mut deno_core::CoreIsolateState, &[u8], &mut [ZeroCopyBuf]) -> Op
where
D: Fn(
&mut deno_core::CoreIsolateState,
&State,
bool,
i32,
Option<ZeroCopyBuf>,
&mut [ZeroCopyBuf],
) -> MinimalOp,
{
let state = self.clone();
@ -170,7 +170,7 @@ impl State {
move |isolate_state: &mut deno_core::CoreIsolateState,
is_sync: bool,
rid: i32,
zero_copy: Option<ZeroCopyBuf>|
zero_copy: &mut [ZeroCopyBuf]|
-> MinimalOp {
dispatcher(isolate_state, &state, is_sync, rid, zero_copy)
},
@ -188,15 +188,15 @@ impl State {
) -> impl Fn(
&mut deno_core::CoreIsolateState,
Value,
Option<ZeroCopyBuf>,
&mut [ZeroCopyBuf],
) -> Result<JsonOp, OpError>
where
D: Fn(&State, Value, Option<ZeroCopyBuf>) -> Result<JsonOp, OpError>,
D: Fn(&State, Value, &mut [ZeroCopyBuf]) -> Result<JsonOp, OpError>,
{
let state = self.clone();
move |_isolate_state: &mut deno_core::CoreIsolateState,
args: Value,
zero_copy: Option<ZeroCopyBuf>|
zero_copy: &mut [ZeroCopyBuf]|
-> Result<JsonOp, OpError> { dispatcher(&state, args, zero_copy) }
}
@ -206,20 +206,20 @@ impl State {
) -> impl Fn(
&mut deno_core::CoreIsolateState,
Value,
Option<ZeroCopyBuf>,
&mut [ZeroCopyBuf],
) -> Result<JsonOp, OpError>
where
D: Fn(
&mut deno_core::CoreIsolateState,
&State,
Value,
Option<ZeroCopyBuf>,
&mut [ZeroCopyBuf],
) -> Result<JsonOp, OpError>,
{
let state = self.clone();
move |isolate_state: &mut deno_core::CoreIsolateState,
args: Value,
zero_copy: Option<ZeroCopyBuf>|
zero_copy: &mut [ZeroCopyBuf]|
-> Result<JsonOp, OpError> {
dispatcher(isolate_state, &state, args, zero_copy)
}

View file

@ -452,17 +452,50 @@ fn send(
Err(_) => &[],
};
let zero_copy: Option<ZeroCopyBuf> =
v8::Local::<v8::ArrayBufferView>::try_from(args.get(2))
.map(ZeroCopyBuf::new)
.ok();
let state_rc = CoreIsolate::state(scope.isolate());
let mut state = state_rc.borrow_mut();
assert!(!state.global_context.is_empty());
let mut buf_iter = (2..args.length()).map(|idx| {
v8::Local::<v8::ArrayBufferView>::try_from(args.get(idx))
.map(ZeroCopyBuf::new)
.map_err(|err| {
let msg = format!("Invalid argument at position {}: {}", idx, err);
let msg = v8::String::new(scope, &msg).unwrap();
v8::Exception::type_error(scope, msg)
})
});
let mut buf_one: ZeroCopyBuf;
let mut buf_vec: Vec<ZeroCopyBuf>;
// Collect all ArrayBufferView's
let buf_iter_result = match buf_iter.len() {
0 => Ok(&mut [][..]),
1 => match buf_iter.next().unwrap() {
Ok(buf) => {
buf_one = buf;
Ok(std::slice::from_mut(&mut buf_one))
}
Err(err) => Err(err),
},
_ => match buf_iter.collect::<Result<Vec<_>, _>>() {
Ok(v) => {
buf_vec = v;
Ok(&mut buf_vec[..])
}
Err(err) => Err(err),
},
};
// If response is empty then it's either async op or exception was thrown
let maybe_response = state.dispatch_op(scope, op_id, control, zero_copy);
let maybe_response = match buf_iter_result {
Ok(bufs) => state.dispatch_op(scope, op_id, control, bufs),
Err(exc) => {
scope.isolate().throw_exception(exc);
return;
}
};
if let Some(response) = maybe_response {
// Synchronous response.

View file

@ -59,7 +59,7 @@ SharedQueue Binary Layout
function ops() {
// op id 0 is a special value to retrieve the map of registered ops.
const opsMapBytes = send(0, new Uint8Array([]), null);
const opsMapBytes = send(0, new Uint8Array([]));
const opsMapJson = String.fromCharCode.apply(null, opsMapBytes);
return JSON.parse(opsMapJson);
}
@ -181,13 +181,9 @@ SharedQueue Binary Layout
}
}
function dispatch(opId, control, zeroCopy = null) {
return send(opId, control, zeroCopy);
}
Object.assign(window.Deno.core, {
setAsyncHandler,
dispatch,
dispatch: send,
ops,
// sharedQueue is private but exposed for testing.
sharedQueue: {

View file

@ -366,7 +366,7 @@ impl CoreIsolate {
/// Requires runtime to explicitly ask for op ids before using any of the ops.
pub fn register_op<F>(&mut self, name: &str, op: F) -> OpId
where
F: Fn(&mut CoreIsolateState, &[u8], Option<ZeroCopyBuf>) -> Op + 'static,
F: Fn(&mut CoreIsolateState, &[u8], &mut [ZeroCopyBuf]) -> Op + 'static,
{
let state_rc = Self::state(self);
let mut state = state_rc.borrow_mut();
@ -484,7 +484,7 @@ impl CoreIsolateState {
/// Requires runtime to explicitly ask for op ids before using any of the ops.
pub fn register_op<F>(&mut self, name: &str, op: F) -> OpId
where
F: Fn(&mut CoreIsolateState, &[u8], Option<ZeroCopyBuf>) -> Op + 'static,
F: Fn(&mut CoreIsolateState, &[u8], &mut [ZeroCopyBuf]) -> Op + 'static,
{
self.op_registry.register(name, op)
}
@ -504,10 +504,10 @@ impl CoreIsolateState {
scope: &mut impl v8::ToLocal<'s>,
op_id: OpId,
control_buf: &[u8],
zero_copy_buf: Option<ZeroCopyBuf>,
zero_copy_bufs: &mut [ZeroCopyBuf],
) -> Option<(OpId, Box<[u8]>)> {
let op = if let Some(dispatcher) = self.op_registry.get(op_id) {
dispatcher(self, control_buf, zero_copy_buf)
dispatcher(self, control_buf, zero_copy_bufs)
} else {
let message =
v8::String::new(scope, &format!("Unknown op id: {}", op_id)).unwrap();
@ -718,6 +718,7 @@ pub mod tests {
pub enum Mode {
Async,
AsyncUnref,
AsyncZeroCopy(u8),
OverflowReqSync,
OverflowResSync,
OverflowReqAsync,
@ -732,7 +733,7 @@ pub mod tests {
let dispatcher = move |_state: &mut CoreIsolateState,
control: &[u8],
_zero_copy: Option<ZeroCopyBuf>|
zero_copy: &mut [ZeroCopyBuf]|
-> Op {
dispatch_count_.fetch_add(1, Ordering::Relaxed);
match mode {
@ -752,6 +753,18 @@ pub mod tests {
};
Op::AsyncUnref(fut.boxed())
}
Mode::AsyncZeroCopy(count) => {
assert_eq!(control.len(), 1);
assert_eq!(control[0], 24);
assert_eq!(zero_copy.len(), count as usize);
zero_copy.iter().enumerate().for_each(|(idx, buf)| {
assert_eq!(buf.len(), 1);
assert_eq!(idx, buf[0] as usize);
});
let buf = vec![43u8].into_boxed_slice();
Op::Async(futures::future::ready(buf).boxed())
}
Mode::OverflowReqSync => {
assert_eq!(control.len(), 100 * 1024 * 1024);
let buf = vec![43u8].into_boxed_slice();
@ -816,6 +829,48 @@ pub mod tests {
assert_eq!(dispatch_count.load(Ordering::Relaxed), 2);
}
#[test]
fn test_dispatch_no_zero_copy_buf() {
let (mut isolate, dispatch_count) = setup(Mode::AsyncZeroCopy(0));
js_check(isolate.execute(
"filename.js",
r#"
let control = new Uint8Array([24]);
Deno.core.send(1, control);
"#,
));
assert_eq!(dispatch_count.load(Ordering::Relaxed), 1);
}
#[test]
fn test_dispatch_one_zero_copy_buf() {
let (mut isolate, dispatch_count) = setup(Mode::AsyncZeroCopy(1));
js_check(isolate.execute(
"filename.js",
r#"
let control = new Uint8Array([24]);
let zero_copy = new Uint8Array([0]);
Deno.core.send(1, control, zero_copy);
"#,
));
assert_eq!(dispatch_count.load(Ordering::Relaxed), 1);
}
#[test]
fn test_dispatch_two_zero_copy_bufs() {
let (mut isolate, dispatch_count) = setup(Mode::AsyncZeroCopy(2));
js_check(isolate.execute(
"filename.js",
r#"
let control = new Uint8Array([24]);
let zero_copy_a = new Uint8Array([0]);
let zero_copy_b = new Uint8Array([1]);
Deno.core.send(1, control, zero_copy_a, zero_copy_b);
"#,
));
assert_eq!(dispatch_count.load(Ordering::Relaxed), 1);
}
#[test]
fn test_poll_async_delayed_ops() {
run_in_task(|cx| {

View file

@ -712,7 +712,7 @@ pub mod tests {
let dispatcher = move |_state: &mut CoreIsolateState,
control: &[u8],
_zero_copy: Option<ZeroCopyBuf>|
_zero_copy: &mut [ZeroCopyBuf]|
-> Op {
dispatch_count_.fetch_add(1, Ordering::Relaxed);
assert_eq!(control.len(), 1);

View file

@ -36,18 +36,18 @@ const scratchBytes = new Uint8Array(
);
assert(scratchBytes.byteLength === 3 * 4);
function send(promiseId, opId, rid, zeroCopy = null) {
function send(promiseId, opId, rid, ...zeroCopy) {
scratch32[0] = promiseId;
scratch32[1] = rid;
scratch32[2] = -1;
return Deno.core.dispatch(opId, scratchBytes, zeroCopy);
return Deno.core.dispatch(opId, scratchBytes, ...zeroCopy);
}
/** Returns Promise<number> */
function sendAsync(opId, rid, zeroCopy = null) {
function sendAsync(opId, rid, ...zeroCopy) {
const promiseId = nextPromiseId++;
const p = createResolvable();
const buf = send(promiseId, opId, rid, zeroCopy);
const buf = send(promiseId, opId, rid, ...zeroCopy);
if (buf) {
const record = recordFromBuf(buf);
// Sync result.

View file

@ -113,19 +113,19 @@ impl Isolate {
fn register_sync_op<F>(&mut self, name: &'static str, handler: F)
where
F: 'static + Fn(State, u32, Option<ZeroCopyBuf>) -> Result<u32, Error>,
F: 'static + Fn(State, u32, &mut [ZeroCopyBuf]) -> Result<u32, Error>,
{
let state = self.state.clone();
let core_handler = move |_isolate_state: &mut CoreIsolateState,
control_buf: &[u8],
zero_copy_buf: Option<ZeroCopyBuf>|
zero_copy_bufs: &mut [ZeroCopyBuf]|
-> Op {
let state = state.clone();
let record = Record::from(control_buf);
let is_sync = record.promise_id == 0;
assert!(is_sync);
let result: i32 = match handler(state, record.rid, zero_copy_buf) {
let result: i32 = match handler(state, record.rid, zero_copy_bufs) {
Ok(r) => r as i32,
Err(_) => -1,
};
@ -139,7 +139,7 @@ impl Isolate {
fn register_op<F>(
&mut self,
name: &'static str,
handler: impl Fn(State, u32, Option<ZeroCopyBuf>) -> F + Copy + 'static,
handler: impl Fn(State, u32, &mut [ZeroCopyBuf]) -> F + Copy + 'static,
) where
F: TryFuture,
F::Ok: TryInto<i32>,
@ -148,15 +148,16 @@ impl Isolate {
let state = self.state.clone();
let core_handler = move |_isolate_state: &mut CoreIsolateState,
control_buf: &[u8],
zero_copy_buf: Option<ZeroCopyBuf>|
zero_copy_bufs: &mut [ZeroCopyBuf]|
-> Op {
let state = state.clone();
let record = Record::from(control_buf);
let is_sync = record.promise_id == 0;
assert!(!is_sync);
let mut zero_copy = zero_copy_bufs.to_vec();
let fut = async move {
let op = handler(state, record.rid, zero_copy_buf);
let op = handler(state, record.rid, &mut zero_copy);
let result = op
.map_ok(|r| r.try_into().expect("op result does not fit in i32"))
.unwrap_or_else(|_| -1)
@ -182,7 +183,7 @@ impl Future for Isolate {
fn op_close(
state: State,
rid: u32,
_buf: Option<ZeroCopyBuf>,
_buf: &mut [ZeroCopyBuf],
) -> Result<u32, Error> {
debug!("close rid={}", rid);
let resource_table = &mut state.borrow_mut().resource_table;
@ -195,7 +196,7 @@ fn op_close(
fn op_listen(
state: State,
_rid: u32,
_buf: Option<ZeroCopyBuf>,
_buf: &mut [ZeroCopyBuf],
) -> Result<u32, Error> {
debug!("listen");
let addr = "127.0.0.1:4544".parse::<SocketAddr>().unwrap();
@ -209,7 +210,7 @@ fn op_listen(
fn op_accept(
state: State,
rid: u32,
_buf: Option<ZeroCopyBuf>,
_buf: &mut [ZeroCopyBuf],
) -> impl TryFuture<Ok = u32, Error = Error> {
debug!("accept rid={}", rid);
@ -227,9 +228,11 @@ fn op_accept(
fn op_read(
state: State,
rid: u32,
buf: Option<ZeroCopyBuf>,
bufs: &mut [ZeroCopyBuf],
) -> impl TryFuture<Ok = usize, Error = Error> {
let mut buf = buf.unwrap();
assert_eq!(bufs.len(), 1, "Invalid number of arguments");
let mut buf = bufs[0].clone();
debug!("read rid={}", rid);
poll_fn(move |cx| {
@ -244,9 +247,10 @@ fn op_read(
fn op_write(
state: State,
rid: u32,
buf: Option<ZeroCopyBuf>,
bufs: &mut [ZeroCopyBuf],
) -> impl TryFuture<Ok = usize, Error = Error> {
let buf = buf.unwrap();
assert_eq!(bufs.len(), 1, "Invalid number of arguments");
let buf = bufs[0].clone();
debug!("write rid={}", rid);
poll_fn(move |cx| {

View file

@ -22,7 +22,7 @@ pub enum Op {
/// Main type describing op
pub type OpDispatcher =
dyn Fn(&mut CoreIsolateState, &[u8], Option<ZeroCopyBuf>) -> Op + 'static;
dyn Fn(&mut CoreIsolateState, &[u8], &mut [ZeroCopyBuf]) -> Op + 'static;
#[derive(Default)]
pub struct OpRegistry {
@ -43,7 +43,7 @@ impl OpRegistry {
pub fn register<F>(&mut self, name: &str, op: F) -> OpId
where
F: Fn(&mut CoreIsolateState, &[u8], Option<ZeroCopyBuf>) -> Op + 'static,
F: Fn(&mut CoreIsolateState, &[u8], &mut [ZeroCopyBuf]) -> Op + 'static,
{
let op_id = self.dispatchers.len() as u32;
@ -92,7 +92,7 @@ fn test_op_registry() {
let dispatch = op_registry.get(test_id).unwrap();
let state_rc = CoreIsolate::state(&isolate);
let mut state = state_rc.borrow_mut();
let res = dispatch(&mut state, &[], None);
let res = dispatch(&mut state, &[], &mut []);
if let Op::Sync(buf) = res {
assert_eq!(buf.len(), 0);
} else {
@ -139,7 +139,7 @@ fn register_op_during_call() {
{
let state_rc = CoreIsolate::state(&isolate);
let mut state = state_rc.borrow_mut();
dispatcher1(&mut state, &[], None);
dispatcher1(&mut state, &[], &mut []);
}
let mut expected = HashMap::new();
@ -157,7 +157,7 @@ fn register_op_during_call() {
};
let state_rc = CoreIsolate::state(&isolate);
let mut state = state_rc.borrow_mut();
let res = dispatcher2(&mut state, &[], None);
let res = dispatcher2(&mut state, &[], &mut []);
if let Op::Sync(buf) = res {
assert_eq!(buf.len(), 0);
} else {

View file

@ -15,8 +15,7 @@ pub use crate::ZeroCopyBuf;
pub type InitFn = fn(&mut dyn Interface);
pub type DispatchOpFn =
fn(&mut dyn Interface, &[u8], Option<ZeroCopyBuf>) -> Op;
pub type DispatchOpFn = fn(&mut dyn Interface, &[u8], &mut [ZeroCopyBuf]) -> Op;
pub trait Interface {
fn register_op(&mut self, name: &str, dispatcher: DispatchOpFn) -> OpId;

View file

@ -8,6 +8,7 @@ use std::ops::DerefMut;
/// but the existence of a ZeroCopyBuf inhibits this until it is dropped. It
/// behaves much like an Arc<[u8]>, although a ZeroCopyBuf currently can't be
/// cloned.
#[derive(Clone)]
pub struct ZeroCopyBuf {
backing_store: v8::SharedRef<v8::BackingStore>,
byte_offset: usize,

View file

@ -50,15 +50,15 @@ pub struct TSState {
fn compiler_op<D>(
ts_state: Arc<Mutex<TSState>>,
dispatcher: D,
) -> impl Fn(&mut CoreIsolateState, &[u8], Option<ZeroCopyBuf>) -> Op
) -> impl Fn(&mut CoreIsolateState, &[u8], &mut [ZeroCopyBuf]) -> Op
where
D: Fn(&mut TSState, &[u8]) -> Op,
{
move |_state: &mut CoreIsolateState,
control: &[u8],
zero_copy_buf: Option<ZeroCopyBuf>|
zero_copy_bufs: &mut [ZeroCopyBuf]|
-> Op {
assert!(zero_copy_buf.is_none()); // zero_copy_buf unused in compiler.
assert!(zero_copy_bufs.is_empty()); // zero_copy_bufs unused in compiler.
let mut s = ts_state.lock().unwrap();
dispatcher(&mut s, control)
}
@ -332,15 +332,15 @@ pub fn trace_serializer() {
/// CoreIsolate.
pub fn op_fetch_asset<S: ::std::hash::BuildHasher>(
custom_assets: HashMap<String, PathBuf, S>,
) -> impl Fn(&mut CoreIsolateState, &[u8], Option<ZeroCopyBuf>) -> Op {
) -> impl Fn(&mut CoreIsolateState, &[u8], &mut [ZeroCopyBuf]) -> Op {
for (_, path) in custom_assets.iter() {
println!("cargo:rerun-if-changed={}", path.display());
}
move |_state: &mut CoreIsolateState,
control: &[u8],
zero_copy_buf: Option<ZeroCopyBuf>|
zero_copy_bufs: &mut [ZeroCopyBuf]|
-> Op {
assert!(zero_copy_buf.is_none()); // zero_copy_buf unused in this op.
assert!(zero_copy_bufs.is_empty()); // zero_copy_bufs unused in this op.
let name = std::str::from_utf8(control).unwrap();
let asset_code = if let Some(source_code) = get_asset(name) {

View file

@ -13,15 +13,16 @@ pub fn deno_plugin_init(interface: &mut dyn Interface) {
fn op_test_sync(
_interface: &mut dyn Interface,
data: &[u8],
zero_copy: Option<ZeroCopyBuf>,
zero_copy: &mut [ZeroCopyBuf],
) -> Op {
if let Some(buf) = zero_copy {
let data_str = std::str::from_utf8(&data[..]).unwrap();
let data_str = std::str::from_utf8(&data[..]).unwrap();
let zero_copy = zero_copy.to_vec();
if !zero_copy.is_empty() {
println!("Hello from plugin. data: {}", data_str);
}
for (idx, buf) in zero_copy.iter().enumerate() {
let buf_str = std::str::from_utf8(&buf[..]).unwrap();
println!(
"Hello from plugin. data: {} | zero_copy: {}",
data_str, buf_str
);
println!("zero_copy[{}]: {}", idx, buf_str);
}
let result = b"test";
let result_box: Buf = Box::new(*result);
@ -31,16 +32,17 @@ fn op_test_sync(
fn op_test_async(
_interface: &mut dyn Interface,
data: &[u8],
zero_copy: Option<ZeroCopyBuf>,
zero_copy: &mut [ZeroCopyBuf],
) -> Op {
let data_str = std::str::from_utf8(&data[..]).unwrap().to_string();
let zero_copy = zero_copy.to_vec();
if !zero_copy.is_empty() {
let data_str = std::str::from_utf8(&data[..]).unwrap().to_string();
println!("Hello from plugin. data: {}", data_str);
}
let fut = async move {
if let Some(buf) = zero_copy {
for (idx, buf) in zero_copy.iter().enumerate() {
let buf_str = std::str::from_utf8(&buf[..]).unwrap();
println!(
"Hello from plugin. data: {} | zero_copy: {}",
data_str, buf_str
);
println!("zero_copy[{}]: {}", idx, buf_str);
}
let (tx, rx) = futures::channel::oneshot::channel::<Result<(), ()>>();
std::thread::spawn(move || {

View file

@ -57,7 +57,7 @@ fn basic() {
println!("stderr {}", stderr);
}
assert!(output.status.success());
let expected = "Hello from plugin. data: test | zero_copy: test\nPlugin Sync Response: test\nHello from plugin. data: test | zero_copy: test\nPlugin Async Response: test\n";
let expected = "Hello from plugin. data: test\nzero_copy[0]: test\nzero_copy[1]: 123\nzero_copy[2]: cba\nPlugin Sync Response: test\nHello from plugin. data: test\nzero_copy[0]: test\nzero_copy[1]: 123\nPlugin Async Response: test\n";
assert_eq!(stdout, expected);
assert_eq!(stderr, "");
}

View file

@ -33,7 +33,9 @@ function runTestSync() {
const response = Deno.core.dispatch(
testSync,
new Uint8Array([116, 101, 115, 116]),
new Uint8Array([116, 101, 115, 116])
new Uint8Array([116, 101, 115, 116]),
new Uint8Array([49, 50, 51]),
new Uint8Array([99, 98, 97])
);
console.log(`Plugin Sync Response: ${textDecoder.decode(response)}`);
@ -47,7 +49,8 @@ function runTestAsync() {
const response = Deno.core.dispatch(
testAsync,
new Uint8Array([116, 101, 115, 116]),
new Uint8Array([116, 101, 115, 116])
new Uint8Array([116, 101, 115, 116]),
new Uint8Array([49, 50, 51])
);
if (response != null || response != undefined) {
@ -80,9 +83,11 @@ function runTestPluginClose() {
const preStr = JSON.stringify(resourcesPre, null, 2);
const postStr = JSON.stringify(resourcesPost, null, 2);
if (preStr !== postStr) {
throw new Error(`Difference in open resources before openPlugin and after Plugin.close():
throw new Error(
`Difference in open resources before openPlugin and after Plugin.close():
Before: ${preStr}
After: ${postStr}`);
After: ${postStr}`
);
}
}