diff --git a/cli/deno_error.rs b/cli/deno_error.rs index e024a396c6..3b7dbcde80 100644 --- a/cli/deno_error.rs +++ b/cli/deno_error.rs @@ -205,6 +205,18 @@ impl GetErrorKind for ReadlineError { } } +impl GetErrorKind for serde_json::error::Error { + fn kind(&self) -> ErrorKind { + use serde_json::error::*; + match self.classify() { + Category::Io => ErrorKind::InvalidInput, + Category::Syntax => ErrorKind::InvalidInput, + Category::Data => ErrorKind::InvalidData, + Category::Eof => ErrorKind::UnexpectedEof, + } + } +} + #[cfg(unix)] mod unix { use super::{ErrorKind, GetErrorKind}; @@ -251,6 +263,11 @@ impl GetErrorKind for dyn AnyError { .or_else(|| self.downcast_ref::().map(Get::kind)) .or_else(|| self.downcast_ref::().map(Get::kind)) .or_else(|| self.downcast_ref::().map(Get::kind)) + .or_else(|| { + self + .downcast_ref::() + .map(Get::kind) + }) .or_else(|| unix_error_kind(self)) .unwrap_or_else(|| { panic!("Can't get ErrorKind for {:?}", self); diff --git a/cli/msg.fbs b/cli/msg.fbs index 26ff612782..a7359c5272 100644 --- a/cli/msg.fbs +++ b/cli/msg.fbs @@ -1,37 +1,14 @@ union Any { - Accept, Chdir, Chmod, Chown, CopyFile, - CreateWorker, - CreateWorkerRes, Cwd, CwdRes, - Dial, - GetRandomValues, - GlobalTimer, - GlobalTimerRes, - GlobalTimerStop, - HostGetMessage, - HostGetMessageRes, - HostGetWorkerClosed, - HostPostMessage, - Kill, Link, - Listen, - ListenRes, MakeTempDir, MakeTempDirRes, - Metrics, - MetricsRes, Mkdir, - NewConn, - Now, - NowRes, - PermissionRevoke, - Permissions, - PermissionsRes, Read, ReadDir, ReadDirRes, @@ -40,25 +17,11 @@ union Any { ReadlinkRes, Remove, Rename, - ReplReadline, - ReplReadlineRes, - ReplStart, - ReplStartRes, - Resources, - ResourcesRes, - Run, - RunRes, - RunStatus, - RunStatusRes, Seek, - Shutdown, Stat, StatRes, Symlink, Truncate, - WorkerGetMessage, - WorkerGetMessageRes, - WorkerPostMessage, Write, WriteRes, } @@ -159,83 +122,15 @@ table FormatErrorRes { error: string; } -// Create worker as host -table CreateWorker { - specifier: string; - include_deno_namespace: bool; - has_source_code: bool; - source_code: string; -} - -table CreateWorkerRes { - rid: uint32; -} - -table HostGetWorkerClosed { - rid: uint32; -} - -// Get message from guest worker as host -table HostGetMessage { - rid: uint32; -} - -table HostGetMessageRes { - data: [ubyte]; -} - -// Post message to guest worker as host -table HostPostMessage { - rid: uint32; - // data passed thru the zero-copy data parameter. -} - -// Get message from host as guest worker -table WorkerGetMessage { - unused: int8; -} - -table WorkerGetMessageRes { - data: [ubyte]; -} - -// Post message to host as guest worker -table WorkerPostMessage { - // data passed thru the zero-copy data parameter. -} - table Chdir { directory: string; } -table GlobalTimer { - timeout: int; -} - -table GlobalTimerRes { } - -table GlobalTimerStop { } - table KeyValue { key: string; value: string; } -table Permissions {} - -table PermissionRevoke { - permission: string; -} - -table PermissionsRes { - run: bool; - read: bool; - write: bool; - net: bool; - env: bool; - hrtime: bool; -} - table MakeTempDir { dir: string; prefix: string; @@ -294,35 +189,6 @@ table ReadlinkRes { path: string; } -table ReplStart { - history_file: string; - // TODO add config -} - -table ReplStartRes { - rid: uint32; -} - -table ReplReadline { - rid: uint32; - prompt: string; -} - -table ReplReadlineRes { - line: string; -} - -table Resources {} - -table Resource { - rid: uint32; - repr: string; -} - -table ResourcesRes { - resources: [Resource]; -} - table Symlink { oldname: string; newname: string; @@ -373,99 +239,10 @@ table WriteRes { nbyte: uint; } -table Kill { - pid: int32; - signo: int32; -} - -table Shutdown { - rid: uint32; - how: uint; -} - -table Listen { - network: string; - address: string; -} - -table ListenRes { - rid: uint32; -} - -table Accept { - rid: uint32; -} - -table Dial { - network: string; - address: string; -} - -// Response to Accept and Dial. -table NewConn { - rid: uint32; - remote_addr: string; - local_addr: string; -} - -table Metrics {} - -table MetricsRes { - ops_dispatched: uint64; - ops_completed: uint64; - bytes_sent_control: uint64; - bytes_sent_data: uint64; - bytes_received: uint64; -} - -enum ProcessStdio: byte { Inherit, Piped, Null } - -table Run { - args: [string]; - cwd: string; - env: [KeyValue]; - stdin: ProcessStdio; - stdout: ProcessStdio; - stderr: ProcessStdio; - stdin_rid: uint32; - stdout_rid: uint32; - stderr_rid: uint32; -} - -table RunRes { - rid: uint32; - pid: uint32; - // The following stdio rids are only valid if "Piped" was specified for the - // corresponding stdio stream. The caller MUST issue a close op for all valid - // stdio streams. - stdin_rid: uint32; - stdout_rid: uint32; - stderr_rid: uint32; -} - -table RunStatus { - rid: uint32; -} - -table RunStatusRes { - got_signal: bool; - exit_code: int; - exit_signal: int; -} - -table Now {} - -table NowRes { - seconds: uint64; - subsec_nanos: uint32; -} - table Seek { rid: uint32; offset: int; whence: uint; } -table GetRandomValues {} - root_type Base; diff --git a/cli/msg.rs b/cli/msg.rs index 51726b5726..db4c771f89 100644 --- a/cli/msg.rs +++ b/cli/msg.rs @@ -1,22 +1,8 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. #![allow(dead_code)] #![cfg_attr(feature = "cargo-clippy", allow(clippy::all, clippy::pedantic))] -use crate::state; use flatbuffers; -use std::sync::atomic::Ordering; // GN_OUT_DIR is set either by build.rs (for the Cargo build), or by // build_extra/rust/run.py (for the GN+Ninja build). include!(concat!(env!("GN_OUT_DIR"), "/gen/cli/msg_generated.rs")); - -impl<'a> From<&'a state::Metrics> for MetricsResArgs { - fn from(m: &'a state::Metrics) -> Self { - MetricsResArgs { - ops_dispatched: m.ops_dispatched.load(Ordering::SeqCst) as u64, - ops_completed: m.ops_completed.load(Ordering::SeqCst) as u64, - bytes_sent_control: m.bytes_sent_control.load(Ordering::SeqCst) as u64, - bytes_sent_data: m.bytes_sent_data.load(Ordering::SeqCst) as u64, - bytes_received: m.bytes_received.load(Ordering::SeqCst) as u64, - } - } -} diff --git a/cli/ops/dispatch_flatbuffers.rs b/cli/ops/dispatch_flatbuffers.rs index c785d6c066..bd01587510 100644 --- a/cli/ops/dispatch_flatbuffers.rs +++ b/cli/ops/dispatch_flatbuffers.rs @@ -12,19 +12,6 @@ use super::fs::{ op_make_temp_dir, op_mkdir, op_read_dir, op_read_link, op_remove, op_rename, op_stat, op_symlink, op_truncate, }; -use super::metrics::op_metrics; -use super::net::{op_accept, op_dial, op_listen, op_shutdown}; -use super::performance::op_now; -use super::permissions::{op_permissions, op_revoke_permission}; -use super::process::{op_kill, op_run, op_run_status}; -use super::random::op_get_random_values; -use super::repl::{op_repl_readline, op_repl_start}; -use super::resources::op_resources; -use super::timers::{op_global_timer, op_global_timer_stop}; -use super::workers::{ - op_create_worker, op_host_get_message, op_host_get_worker_closed, - op_host_post_message, op_worker_get_message, op_worker_post_message, -}; type CliDispatchFn = fn( state: &ThreadSafeState, @@ -138,50 +125,24 @@ pub fn serialize_response( /// Standard ops set for most isolates pub fn op_selector_std(inner_type: msg::Any) -> Option { match inner_type { - msg::Any::Accept => Some(op_accept), msg::Any::Chdir => Some(op_chdir), msg::Any::Chmod => Some(op_chmod), msg::Any::Chown => Some(op_chown), msg::Any::CopyFile => Some(op_copy_file), - msg::Any::CreateWorker => Some(op_create_worker), msg::Any::Cwd => Some(op_cwd), - msg::Any::Dial => Some(op_dial), - msg::Any::GetRandomValues => Some(op_get_random_values), - msg::Any::GlobalTimer => Some(op_global_timer), - msg::Any::GlobalTimerStop => Some(op_global_timer_stop), - msg::Any::HostGetMessage => Some(op_host_get_message), - msg::Any::HostGetWorkerClosed => Some(op_host_get_worker_closed), - msg::Any::HostPostMessage => Some(op_host_post_message), - msg::Any::Kill => Some(op_kill), msg::Any::Link => Some(op_link), - msg::Any::Listen => Some(op_listen), msg::Any::MakeTempDir => Some(op_make_temp_dir), - msg::Any::Metrics => Some(op_metrics), msg::Any::Mkdir => Some(op_mkdir), - msg::Any::Now => Some(op_now), - msg::Any::PermissionRevoke => Some(op_revoke_permission), - msg::Any::Permissions => Some(op_permissions), msg::Any::Read => Some(op_read), msg::Any::ReadDir => Some(op_read_dir), msg::Any::Readlink => Some(op_read_link), msg::Any::Remove => Some(op_remove), msg::Any::Rename => Some(op_rename), - msg::Any::ReplReadline => Some(op_repl_readline), - msg::Any::ReplStart => Some(op_repl_start), - msg::Any::Resources => Some(op_resources), - msg::Any::Run => Some(op_run), - msg::Any::RunStatus => Some(op_run_status), - msg::Any::Shutdown => Some(op_shutdown), msg::Any::Stat => Some(op_stat), msg::Any::Symlink => Some(op_symlink), msg::Any::Truncate => Some(op_truncate), msg::Any::Write => Some(op_write), - // TODO(ry) split these out so that only the appropriate Workers can access - // them. - msg::Any::WorkerGetMessage => Some(op_worker_get_message), - msg::Any::WorkerPostMessage => Some(op_worker_post_message), - _ => None, } } diff --git a/cli/ops/metrics.rs b/cli/ops/metrics.rs index 76f36c3904..e1a23f6c81 100644 --- a/cli/ops/metrics.rs +++ b/cli/ops/metrics.rs @@ -1,31 +1,21 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. -use super::dispatch_flatbuffers::serialize_response; -use super::utils::*; -use crate::msg; +use super::dispatch_json::{JsonOp, Value}; use crate::state::ThreadSafeState; use deno::*; -use flatbuffers::FlatBufferBuilder; +use std::sync::atomic::Ordering; pub fn op_metrics( state: &ThreadSafeState, - base: &msg::Base<'_>, - data: Option, -) -> CliOpResult { - assert!(data.is_none()); - let cmd_id = base.cmd_id(); + _args: Value, + _zero_copy: Option, +) -> Result { + let m = &state.metrics; - let builder = &mut FlatBufferBuilder::new(); - let inner = msg::MetricsRes::create( - builder, - &msg::MetricsResArgs::from(&state.metrics), - ); - ok_buf(serialize_response( - cmd_id, - builder, - msg::BaseArgs { - inner: Some(inner.as_union_value()), - inner_type: msg::Any::MetricsRes, - ..Default::default() - }, - )) + Ok(JsonOp::Sync(json!({ + "opsDispatched": m.ops_dispatched.load(Ordering::SeqCst) as u64, + "opsCompleted": m.ops_completed.load(Ordering::SeqCst) as u64, + "bytesSentControl": m.bytes_sent_control.load(Ordering::SeqCst) as u64, + "bytesSentData": m.bytes_sent_data.load(Ordering::SeqCst) as u64, + "bytesReceived": m.bytes_received.load(Ordering::SeqCst) as u64 + }))) } diff --git a/cli/ops/mod.rs b/cli/ops/mod.rs index 6a80e610f0..4636754c9b 100644 --- a/cli/ops/mod.rs +++ b/cli/ops/mod.rs @@ -45,6 +45,29 @@ pub const OP_OPEN: OpId = 15; pub const OP_CLOSE: OpId = 16; pub const OP_SEEK: OpId = 17; pub const OP_FETCH: OpId = 18; +pub const OP_METRICS: OpId = 19; +pub const OP_REPL_START: OpId = 20; +pub const OP_REPL_READLINE: OpId = 21; +pub const OP_ACCEPT: OpId = 22; +pub const OP_DIAL: OpId = 23; +pub const OP_SHUTDOWN: OpId = 24; +pub const OP_LISTEN: OpId = 25; +pub const OP_RESOURCES: OpId = 26; +pub const OP_GET_RANDOM_VALUES: OpId = 27; +pub const OP_GLOBAL_TIMER_STOP: OpId = 28; +pub const OP_GLOBAL_TIMER: OpId = 29; +pub const OP_NOW: OpId = 30; +pub const OP_PERMISSIONS: OpId = 31; +pub const OP_REVOKE_PERMISSION: OpId = 32; +pub const OP_CREATE_WORKER: OpId = 33; +pub const OP_HOST_GET_WORKER_CLOSED: OpId = 34; +pub const OP_HOST_POST_MESSAGE: OpId = 35; +pub const OP_HOST_GET_MESSAGE: OpId = 36; +pub const OP_WORKER_POST_MESSAGE: OpId = 37; +pub const OP_WORKER_GET_MESSAGE: OpId = 38; +pub const OP_RUN: OpId = 39; +pub const OP_RUN_STATUS: OpId = 40; +pub const OP_KILL: OpId = 41; pub fn dispatch( state: &ThreadSafeState, @@ -112,9 +135,113 @@ pub fn dispatch( OP_SEEK => { dispatch_json::dispatch(files::op_seek, state, control, zero_copy) } + OP_METRICS => { + dispatch_json::dispatch(metrics::op_metrics, state, control, zero_copy) + } OP_FETCH => { dispatch_json::dispatch(fetch::op_fetch, state, control, zero_copy) } + OP_REPL_START => { + dispatch_json::dispatch(repl::op_repl_start, state, control, zero_copy) + } + OP_REPL_READLINE => { + dispatch_json::dispatch(repl::op_repl_readline, state, control, zero_copy) + } + OP_ACCEPT => { + dispatch_json::dispatch(net::op_accept, state, control, zero_copy) + } + OP_DIAL => dispatch_json::dispatch(net::op_dial, state, control, zero_copy), + OP_SHUTDOWN => { + dispatch_json::dispatch(net::op_shutdown, state, control, zero_copy) + } + OP_LISTEN => { + dispatch_json::dispatch(net::op_listen, state, control, zero_copy) + } + OP_RESOURCES => dispatch_json::dispatch( + resources::op_resources, + state, + control, + zero_copy, + ), + OP_GET_RANDOM_VALUES => dispatch_json::dispatch( + random::op_get_random_values, + state, + control, + zero_copy, + ), + OP_GLOBAL_TIMER_STOP => dispatch_json::dispatch( + timers::op_global_timer_stop, + state, + control, + zero_copy, + ), + OP_GLOBAL_TIMER => dispatch_json::dispatch( + timers::op_global_timer, + state, + control, + zero_copy, + ), + OP_NOW => { + dispatch_json::dispatch(performance::op_now, state, control, zero_copy) + } + OP_PERMISSIONS => dispatch_json::dispatch( + permissions::op_permissions, + state, + control, + zero_copy, + ), + OP_REVOKE_PERMISSION => dispatch_json::dispatch( + permissions::op_revoke_permission, + state, + control, + zero_copy, + ), + OP_CREATE_WORKER => dispatch_json::dispatch( + workers::op_create_worker, + state, + control, + zero_copy, + ), + OP_HOST_GET_WORKER_CLOSED => dispatch_json::dispatch( + workers::op_host_get_worker_closed, + state, + control, + zero_copy, + ), + OP_HOST_POST_MESSAGE => dispatch_json::dispatch( + workers::op_host_post_message, + state, + control, + zero_copy, + ), + OP_HOST_GET_MESSAGE => dispatch_json::dispatch( + workers::op_host_get_message, + state, + control, + zero_copy, + ), + // TODO: make sure these two ops are only accessible to appropriate Workers + OP_WORKER_POST_MESSAGE => dispatch_json::dispatch( + workers::op_worker_post_message, + state, + control, + zero_copy, + ), + OP_WORKER_GET_MESSAGE => dispatch_json::dispatch( + workers::op_worker_get_message, + state, + control, + zero_copy, + ), + OP_RUN => { + dispatch_json::dispatch(process::op_run, state, control, zero_copy) + } + OP_RUN_STATUS => { + dispatch_json::dispatch(process::op_run_status, state, control, zero_copy) + } + OP_KILL => { + dispatch_json::dispatch(process::op_kill, state, control, zero_copy) + } OP_FLATBUFFER => dispatch_flatbuffers::dispatch(state, control, zero_copy), _ => panic!("bad op_id"), }; diff --git a/cli/ops/net.rs b/cli/ops/net.rs index 5ce5624922..650127fadc 100644 --- a/cli/ops/net.rs +++ b/cli/ops/net.rs @@ -1,15 +1,12 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. -use super::dispatch_flatbuffers::serialize_response; -use super::utils::*; +use super::dispatch_json::{Deserialize, JsonOp, Value}; use crate::deno_error; -use crate::msg; use crate::resolve_addr::resolve_addr; use crate::resources; use crate::resources::Resource; use crate::state::ThreadSafeState; use crate::tokio_util; use deno::*; -use flatbuffers::FlatBufferBuilder; use futures::Future; use std; use std::convert::From; @@ -18,15 +15,18 @@ use tokio; use tokio::net::TcpListener; use tokio::net::TcpStream; +#[derive(Deserialize)] +struct AcceptArgs { + rid: i32, +} + pub fn op_accept( _state: &ThreadSafeState, - base: &msg::Base<'_>, - data: Option, -) -> CliOpResult { - assert!(data.is_none()); - let cmd_id = base.cmd_id(); - let inner = base.inner_as_accept().unwrap(); - let server_rid = inner.rid(); + args: Value, + _zero_copy: Option, +) -> Result { + let args: AcceptArgs = serde_json::from_value(args)?; + let server_rid = args.rid as u32; match resources::lookup(server_rid) { None => Err(deno_error::bad_resource()), @@ -34,55 +34,65 @@ pub fn op_accept( let op = tokio_util::accept(server_resource) .map_err(ErrBox::from) .and_then(move |(tcp_stream, _socket_addr)| { - new_conn(cmd_id, tcp_stream) + let tcp_stream_resource = resources::add_tcp_stream(tcp_stream); + futures::future::ok(json!({ + "rid": tcp_stream_resource.rid + })) }); - if base.sync() { - let buf = op.wait()?; - Ok(Op::Sync(buf)) - } else { - Ok(Op::Async(Box::new(op))) - } + + Ok(JsonOp::Async(Box::new(op))) } } } +#[derive(Deserialize)] +struct DialArgs { + network: String, + address: String, +} + pub fn op_dial( state: &ThreadSafeState, - base: &msg::Base<'_>, - data: Option, -) -> CliOpResult { - assert!(data.is_none()); - let cmd_id = base.cmd_id(); - let inner = base.inner_as_dial().unwrap(); - let network = inner.network().unwrap(); + args: Value, + _zero_copy: Option, +) -> Result { + let args: DialArgs = serde_json::from_value(args)?; + let network = args.network; assert_eq!(network, "tcp"); // TODO Support others. - let address = inner.address().unwrap(); + let address = args.address; state.check_net(&address)?; - let op = resolve_addr(address).and_then(move |addr| { - TcpStream::connect(&addr) - .map_err(ErrBox::from) - .and_then(move |tcp_stream| new_conn(cmd_id, tcp_stream)) + let op = resolve_addr(&address).and_then(move |addr| { + TcpStream::connect(&addr).map_err(ErrBox::from).and_then( + move |tcp_stream| { + let tcp_stream_resource = resources::add_tcp_stream(tcp_stream); + futures::future::ok(json!({ + "rid": tcp_stream_resource.rid + })) + }, + ) }); - if base.sync() { - let buf = op.wait()?; - Ok(Op::Sync(buf)) - } else { - Ok(Op::Async(Box::new(op))) - } + + Ok(JsonOp::Async(Box::new(op))) +} + +#[derive(Deserialize)] +struct ShutdownArgs { + rid: i32, + how: i32, } pub fn op_shutdown( _state: &ThreadSafeState, - base: &msg::Base<'_>, - data: Option, -) -> CliOpResult { - assert!(data.is_none()); - let inner = base.inner_as_shutdown().unwrap(); - let rid = inner.rid(); - let how = inner.how(); - match resources::lookup(rid) { + args: Value, + _zero_copy: Option, +) -> Result { + let args: ShutdownArgs = serde_json::from_value(args)?; + + let rid = args.rid; + let how = args.how; + match resources::lookup(rid as u32) { None => Err(deno_error::bad_resource()), Some(mut resource) => { let shutdown_mode = match how { @@ -90,67 +100,36 @@ pub fn op_shutdown( 1 => Shutdown::Write, _ => unimplemented!(), }; - blocking(base.sync(), move || { - // Use UFCS for disambiguation - Resource::shutdown(&mut resource, shutdown_mode)?; - Ok(empty_buf()) - }) + + // Use UFCS for disambiguation + Resource::shutdown(&mut resource, shutdown_mode)?; + Ok(JsonOp::Sync(json!({}))) } } } +#[derive(Deserialize)] +struct ListenArgs { + network: String, + address: String, +} + pub fn op_listen( state: &ThreadSafeState, - base: &msg::Base<'_>, - data: Option, -) -> CliOpResult { - assert!(data.is_none()); - let cmd_id = base.cmd_id(); - let inner = base.inner_as_listen().unwrap(); - let network = inner.network().unwrap(); + args: Value, + _zero_copy: Option, +) -> Result { + let args: ListenArgs = serde_json::from_value(args)?; + + let network = args.network; assert_eq!(network, "tcp"); - let address = inner.address().unwrap(); + let address = args.address; state.check_net(&address)?; - let addr = resolve_addr(address).wait()?; + let addr = resolve_addr(&address).wait()?; let listener = TcpListener::bind(&addr)?; let resource = resources::add_tcp_listener(listener); - let builder = &mut FlatBufferBuilder::new(); - let inner = - msg::ListenRes::create(builder, &msg::ListenResArgs { rid: resource.rid }); - let response_buf = serialize_response( - cmd_id, - builder, - msg::BaseArgs { - inner: Some(inner.as_union_value()), - inner_type: msg::Any::ListenRes, - ..Default::default() - }, - ); - ok_buf(response_buf) -} - -fn new_conn(cmd_id: u32, tcp_stream: TcpStream) -> Result { - let tcp_stream_resource = resources::add_tcp_stream(tcp_stream); - // TODO forward socket_addr to client. - - let builder = &mut FlatBufferBuilder::new(); - let inner = msg::NewConn::create( - builder, - &msg::NewConnArgs { - rid: tcp_stream_resource.rid, - ..Default::default() - }, - ); - Ok(serialize_response( - cmd_id, - builder, - msg::BaseArgs { - inner: Some(inner.as_union_value()), - inner_type: msg::Any::NewConn, - ..Default::default() - }, - )) + Ok(JsonOp::Sync(json!(resource.rid))) } diff --git a/cli/ops/performance.rs b/cli/ops/performance.rs index 94f6dbc387..090fc33232 100644 --- a/cli/ops/performance.rs +++ b/cli/ops/performance.rs @@ -1,10 +1,7 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. -use super::dispatch_flatbuffers::serialize_response; -use super::utils::*; -use crate::msg; +use super::dispatch_json::{JsonOp, Value}; use crate::state::ThreadSafeState; use deno::*; -use flatbuffers::FlatBufferBuilder; // Returns a milliseconds and nanoseconds subsec // since the start time of the deno runtime. @@ -12,10 +9,9 @@ use flatbuffers::FlatBufferBuilder; // nanoseconds are rounded on 2ms. pub fn op_now( state: &ThreadSafeState, - base: &msg::Base<'_>, - data: Option, -) -> CliOpResult { - assert!(data.is_none()); + _args: Value, + _zero_copy: Option, +) -> Result { let seconds = state.start_time.elapsed().as_secs(); let mut subsec_nanos = state.start_time.elapsed().subsec_nanos(); let reduced_time_precision = 2_000_000; // 2ms in nanoseconds @@ -27,22 +23,8 @@ pub fn op_now( subsec_nanos -= subsec_nanos % reduced_time_precision } - let builder = &mut FlatBufferBuilder::new(); - let inner = msg::NowRes::create( - builder, - &msg::NowResArgs { - seconds, - subsec_nanos, - }, - ); - - ok_buf(serialize_response( - base.cmd_id(), - builder, - msg::BaseArgs { - inner: Some(inner.as_union_value()), - inner_type: msg::Any::NowRes, - ..Default::default() - }, - )) + Ok(JsonOp::Sync(json!({ + "seconds": seconds, + "subsecNanos": subsec_nanos, + }))) } diff --git a/cli/ops/permissions.rs b/cli/ops/permissions.rs index 6249581fb0..5d14f39be5 100644 --- a/cli/ops/permissions.rs +++ b/cli/ops/permissions.rs @@ -1,50 +1,35 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. -use super::dispatch_flatbuffers::serialize_response; -use super::utils::*; -use crate::msg; +use super::dispatch_json::{Deserialize, JsonOp, Value}; use crate::state::ThreadSafeState; use deno::*; -use flatbuffers::FlatBufferBuilder; pub fn op_permissions( state: &ThreadSafeState, - base: &msg::Base<'_>, - data: Option, -) -> CliOpResult { - assert!(data.is_none()); - let cmd_id = base.cmd_id(); - let builder = &mut FlatBufferBuilder::new(); - let inner = msg::PermissionsRes::create( - builder, - &msg::PermissionsResArgs { - run: state.permissions.allows_run(), - read: state.permissions.allows_read(), - write: state.permissions.allows_write(), - net: state.permissions.allows_net(), - env: state.permissions.allows_env(), - hrtime: state.permissions.allows_hrtime(), - }, - ); - let response_buf = serialize_response( - cmd_id, - builder, - msg::BaseArgs { - inner: Some(inner.as_union_value()), - inner_type: msg::Any::PermissionsRes, - ..Default::default() - }, - ); - ok_buf(response_buf) + _args: Value, + _zero_copy: Option, +) -> Result { + Ok(JsonOp::Sync(json!({ + "run": state.permissions.allows_run(), + "read": state.permissions.allows_read(), + "write": state.permissions.allows_write(), + "net": state.permissions.allows_net(), + "env": state.permissions.allows_env(), + "hrtime": state.permissions.allows_hrtime(), + }))) +} + +#[derive(Deserialize)] +struct RevokePermissionArgs { + permission: String, } pub fn op_revoke_permission( state: &ThreadSafeState, - base: &msg::Base<'_>, - data: Option, -) -> CliOpResult { - assert!(data.is_none()); - let inner = base.inner_as_permission_revoke().unwrap(); - let permission = inner.permission().unwrap(); + args: Value, + _zero_copy: Option, +) -> Result { + let args: RevokePermissionArgs = serde_json::from_value(args)?; + let permission = args.permission.as_ref(); match permission { "run" => state.permissions.revoke_run(), "read" => state.permissions.revoke_read(), @@ -54,5 +39,6 @@ pub fn op_revoke_permission( "hrtime" => state.permissions.revoke_hrtime(), _ => Ok(()), }?; - ok_buf(empty_buf()) + + Ok(JsonOp::Sync(json!({}))) } diff --git a/cli/ops/process.rs b/cli/ops/process.rs index d7b326d143..8dff53c6e9 100644 --- a/cli/ops/process.rs +++ b/cli/ops/process.rs @@ -1,13 +1,9 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. -use super::dispatch_flatbuffers::serialize_response; -use super::utils::*; -use crate::deno_error; -use crate::msg; +use super::dispatch_json::{Deserialize, JsonOp, Value}; use crate::resources; use crate::signal::kill; use crate::state::ThreadSafeState; use deno::*; -use flatbuffers::FlatBufferBuilder; use futures; use futures::Future; use std; @@ -18,63 +14,72 @@ use tokio_process::CommandExt; #[cfg(unix)] use std::os::unix::process::ExitStatusExt; -fn subprocess_stdio_map(v: msg::ProcessStdio) -> std::process::Stdio { - match v { - msg::ProcessStdio::Inherit => std::process::Stdio::inherit(), - msg::ProcessStdio::Piped => std::process::Stdio::piped(), - msg::ProcessStdio::Null => std::process::Stdio::null(), +fn subprocess_stdio_map(s: &str) -> std::process::Stdio { + match s { + "inherit" => std::process::Stdio::inherit(), + "piped" => std::process::Stdio::piped(), + "null" => std::process::Stdio::null(), + _ => unreachable!(), } } +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +struct RunArgs { + args: Vec, + cwd: Option, + env: Vec<(String, String)>, + stdin: String, + stdout: String, + stderr: String, + stdin_rid: u32, + stdout_rid: u32, + stderr_rid: u32, +} + pub fn op_run( state: &ThreadSafeState, - base: &msg::Base<'_>, - data: Option, -) -> CliOpResult { - if !base.sync() { - return Err(deno_error::no_async_support()); - } - let cmd_id = base.cmd_id(); + args: Value, + _zero_copy: Option, +) -> Result { + let run_args: RunArgs = serde_json::from_value(args)?; state.check_run()?; - assert!(data.is_none()); - let inner = base.inner_as_run().unwrap(); - let args = inner.args().unwrap(); - let env = inner.env().unwrap(); - let cwd = inner.cwd(); + let args = run_args.args; + let env = run_args.env; + let cwd = run_args.cwd; - let mut c = Command::new(args.get(0)); + let mut c = Command::new(args.get(0).unwrap()); (1..args.len()).for_each(|i| { - let arg = args.get(i); + let arg = args.get(i).unwrap(); c.arg(arg); }); cwd.map(|d| c.current_dir(d)); - (0..env.len()).for_each(|i| { - let entry = env.get(i); - c.env(entry.key().unwrap(), entry.value().unwrap()); - }); + for (key, value) in &env { + c.env(key, value); + } // TODO: make this work with other resources, eg. sockets - let stdin_rid = inner.stdin_rid(); + let stdin_rid = run_args.stdin_rid; if stdin_rid > 0 { c.stdin(resources::get_file(stdin_rid)?); } else { - c.stdin(subprocess_stdio_map(inner.stdin())); + c.stdin(subprocess_stdio_map(run_args.stdin.as_ref())); } - let stdout_rid = inner.stdout_rid(); + let stdout_rid = run_args.stdout_rid; if stdout_rid > 0 { c.stdout(resources::get_file(stdout_rid)?); } else { - c.stdout(subprocess_stdio_map(inner.stdout())); + c.stdout(subprocess_stdio_map(run_args.stdout.as_ref())); } - let stderr_rid = inner.stderr_rid(); + let stderr_rid = run_args.stderr_rid; if stderr_rid > 0 { c.stderr(resources::get_file(stderr_rid)?); } else { - c.stderr(subprocess_stdio_map(inner.stderr())); + c.stderr(subprocess_stdio_map(run_args.stderr.as_ref())); } // Spawn the command. @@ -83,44 +88,28 @@ pub fn op_run( let pid = child.id(); let resources = resources::add_child(child); - let mut res_args = msg::RunResArgs { - rid: resources.child_rid, - pid, - ..Default::default() - }; + Ok(JsonOp::Sync(json!({ + "rid": resources.child_rid, + "pid": pid, + "stdinRid": resources.stdin_rid, + "stdoutRid": resources.stdout_rid, + "stderrRid": resources.stderr_rid, + }))) +} - if let Some(stdin_rid) = resources.stdin_rid { - res_args.stdin_rid = stdin_rid; - } - if let Some(stdout_rid) = resources.stdout_rid { - res_args.stdout_rid = stdout_rid; - } - if let Some(stderr_rid) = resources.stderr_rid { - res_args.stderr_rid = stderr_rid; - } - - let builder = &mut FlatBufferBuilder::new(); - let inner = msg::RunRes::create(builder, &res_args); - Ok(Op::Sync(serialize_response( - cmd_id, - builder, - msg::BaseArgs { - inner: Some(inner.as_union_value()), - inner_type: msg::Any::RunRes, - ..Default::default() - }, - ))) +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +struct RunStatusArgs { + rid: i32, } pub fn op_run_status( state: &ThreadSafeState, - base: &msg::Base<'_>, - data: Option, -) -> CliOpResult { - assert!(data.is_none()); - let cmd_id = base.cmd_id(); - let inner = base.inner_as_run_status().unwrap(); - let rid = inner.rid(); + args: Value, + _zero_copy: Option, +) -> Result { + let args: RunStatusArgs = serde_json::from_value(args)?; + let rid = args.rid as u32; state.check_run()?; @@ -139,44 +128,30 @@ pub fn op_run_status( .expect("Should have either an exit code or a signal."); let got_signal = signal.is_some(); - let builder = &mut FlatBufferBuilder::new(); - let inner = msg::RunStatusRes::create( - builder, - &msg::RunStatusResArgs { - got_signal, - exit_code: code.unwrap_or(-1), - exit_signal: signal.unwrap_or(-1), - }, - ); - Ok(serialize_response( - cmd_id, - builder, - msg::BaseArgs { - inner: Some(inner.as_union_value()), - inner_type: msg::Any::RunStatusRes, - ..Default::default() - }, - )) + futures::future::ok(json!({ + "gotSignal": got_signal, + "exitCode": code.unwrap_or(-1), + "exitSignal": signal.unwrap_or(-1), + })) }); - if base.sync() { - let buf = future.wait()?; - Ok(Op::Sync(buf)) - } else { - Ok(Op::Async(Box::new(future))) - } + + Ok(JsonOp::Async(Box::new(future))) +} + +#[derive(Deserialize)] +struct KillArgs { + pid: i32, + signo: i32, } pub fn op_kill( state: &ThreadSafeState, - base: &msg::Base<'_>, - data: Option, -) -> CliOpResult { + args: Value, + _zero_copy: Option, +) -> Result { state.check_run()?; - assert!(data.is_none()); - let inner = base.inner_as_kill().unwrap(); - let pid = inner.pid(); - let signo = inner.signo(); - kill(pid, signo)?; - ok_buf(empty_buf()) + let args: KillArgs = serde_json::from_value(args)?; + kill(args.pid, args.signo)?; + Ok(JsonOp::Sync(json!({}))) } diff --git a/cli/ops/random.rs b/cli/ops/random.rs index 0c302a0808..7470eab402 100644 --- a/cli/ops/random.rs +++ b/cli/ops/random.rs @@ -1,6 +1,5 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. -use super::utils::*; -use crate::msg; +use super::dispatch_json::{JsonOp, Value}; use crate::state::ThreadSafeState; use deno::*; use rand::thread_rng; @@ -8,16 +7,18 @@ use rand::Rng; pub fn op_get_random_values( state: &ThreadSafeState, - _base: &msg::Base<'_>, - data: Option, -) -> CliOpResult { + _args: Value, + zero_copy: Option, +) -> Result { + assert!(zero_copy.is_some()); + if let Some(ref seeded_rng) = state.seeded_rng { let mut rng = seeded_rng.lock().unwrap(); - rng.fill(&mut data.unwrap()[..]); + rng.fill(&mut zero_copy.unwrap()[..]); } else { let mut rng = thread_rng(); - rng.fill(&mut data.unwrap()[..]); + rng.fill(&mut zero_copy.unwrap()[..]); } - ok_buf(empty_buf()) + Ok(JsonOp::Sync(json!({}))) } diff --git a/cli/ops/repl.rs b/cli/ops/repl.rs index affe787391..7ab7509dea 100644 --- a/cli/ops/repl.rs +++ b/cli/ops/repl.rs @@ -1,78 +1,50 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. -use super::dispatch_flatbuffers::serialize_response; -use super::utils::blocking; -use super::utils::ok_buf; -use super::utils::CliOpResult; -use crate::msg; +use super::dispatch_json::{blocking_json, Deserialize, JsonOp, Value}; use crate::repl; use crate::resources; use crate::state::ThreadSafeState; use deno::*; -use flatbuffers::FlatBufferBuilder; + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +struct ReplStartArgs { + history_file: String, +} pub fn op_repl_start( state: &ThreadSafeState, - base: &msg::Base<'_>, - data: Option, -) -> CliOpResult { - assert!(data.is_none()); - let inner = base.inner_as_repl_start().unwrap(); - let cmd_id = base.cmd_id(); - let history_file = String::from(inner.history_file().unwrap()); + args: Value, + _zero_copy: Option, +) -> Result { + let args: ReplStartArgs = serde_json::from_value(args)?; - debug!("op_repl_start {}", history_file); - let history_path = repl::history_path(&state.dir, &history_file); + debug!("op_repl_start {}", args.history_file); + let history_path = repl::history_path(&state.dir, &args.history_file); let repl = repl::Repl::new(history_path); let resource = resources::add_repl(repl); - let builder = &mut FlatBufferBuilder::new(); - let inner = msg::ReplStartRes::create( - builder, - &msg::ReplStartResArgs { rid: resource.rid }, - ); - ok_buf(serialize_response( - cmd_id, - builder, - msg::BaseArgs { - inner: Some(inner.as_union_value()), - inner_type: msg::Any::ReplStartRes, - ..Default::default() - }, - )) + Ok(JsonOp::Sync(json!(resource.rid))) +} + +#[derive(Deserialize)] +struct ReplReadlineArgs { + rid: i32, + prompt: String, } pub fn op_repl_readline( _state: &ThreadSafeState, - base: &msg::Base<'_>, - data: Option, -) -> CliOpResult { - assert!(data.is_none()); - let inner = base.inner_as_repl_readline().unwrap(); - let cmd_id = base.cmd_id(); - let rid = inner.rid(); - let prompt = inner.prompt().unwrap().to_owned(); + args: Value, + _zero_copy: Option, +) -> Result { + let args: ReplReadlineArgs = serde_json::from_value(args)?; + let rid = args.rid; + let prompt = args.prompt; debug!("op_repl_readline {} {}", rid, prompt); - blocking(base.sync(), move || { - let repl = resources::get_repl(rid)?; + blocking_json(false, move || { + let repl = resources::get_repl(rid as u32)?; let line = repl.lock().unwrap().readline(&prompt)?; - - let builder = &mut FlatBufferBuilder::new(); - let line_off = builder.create_string(&line); - let inner = msg::ReplReadlineRes::create( - builder, - &msg::ReplReadlineResArgs { - line: Some(line_off), - }, - ); - Ok(serialize_response( - cmd_id, - builder, - msg::BaseArgs { - inner: Some(inner.as_union_value()), - inner_type: msg::Any::ReplReadlineRes, - ..Default::default() - }, - )) + Ok(json!(line)) }) } diff --git a/cli/ops/resources.rs b/cli/ops/resources.rs index 975d94490b..dafd01d08c 100644 --- a/cli/ops/resources.rs +++ b/cli/ops/resources.rs @@ -1,54 +1,14 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. -use super::dispatch_flatbuffers::serialize_response; -use super::utils::ok_buf; -use super::utils::CliOpResult; -use crate::msg; +use super::dispatch_json::{JsonOp, Value}; use crate::resources::table_entries; use crate::state::ThreadSafeState; use deno::*; -use flatbuffers::FlatBufferBuilder; pub fn op_resources( _state: &ThreadSafeState, - base: &msg::Base<'_>, - data: Option, -) -> CliOpResult { - assert!(data.is_none()); - let cmd_id = base.cmd_id(); - - let builder = &mut FlatBufferBuilder::new(); + _args: Value, + _zero_copy: Option, +) -> Result { let serialized_resources = table_entries(); - - let res: Vec<_> = serialized_resources - .iter() - .map(|(key, value)| { - let repr = builder.create_string(value); - - msg::Resource::create( - builder, - &msg::ResourceArgs { - rid: *key, - repr: Some(repr), - }, - ) - }) - .collect(); - - let resources = builder.create_vector(&res); - let inner = msg::ResourcesRes::create( - builder, - &msg::ResourcesResArgs { - resources: Some(resources), - }, - ); - - ok_buf(serialize_response( - cmd_id, - builder, - msg::BaseArgs { - inner: Some(inner.as_union_value()), - inner_type: msg::Any::ResourcesRes, - ..Default::default() - }, - )) + Ok(JsonOp::Sync(json!(serialized_resources))) } diff --git a/cli/ops/timers.rs b/cli/ops/timers.rs index 550d91f2c4..46217a1882 100644 --- a/cli/ops/timers.rs +++ b/cli/ops/timers.rs @@ -1,12 +1,7 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. -use super::dispatch_flatbuffers::serialize_response; -use super::utils::empty_buf; -use super::utils::CliOpResult; -use crate::deno_error; -use crate::msg; +use super::dispatch_json::{Deserialize, JsonOp, Value}; use crate::state::ThreadSafeState; use deno::*; -use flatbuffers::FlatBufferBuilder; use futures::Future; use std; use std::time::Duration; @@ -14,50 +9,34 @@ use std::time::Instant; pub fn op_global_timer_stop( state: &ThreadSafeState, - base: &msg::Base<'_>, - data: Option, -) -> CliOpResult { - if !base.sync() { - return Err(deno_error::no_async_support()); - } - assert!(data.is_none()); + _args: Value, + _zero_copy: Option, +) -> Result { let state = state; let mut t = state.global_timer.lock().unwrap(); t.cancel(); - Ok(Op::Sync(empty_buf())) + Ok(JsonOp::Sync(json!({}))) +} + +#[derive(Deserialize)] +struct GlobalTimerArgs { + timeout: u64, } pub fn op_global_timer( state: &ThreadSafeState, - base: &msg::Base<'_>, - data: Option, -) -> CliOpResult { - if base.sync() { - return Err(deno_error::no_sync_support()); - } - assert!(data.is_none()); - let cmd_id = base.cmd_id(); - let inner = base.inner_as_global_timer().unwrap(); - let val = inner.timeout(); - assert!(val >= 0); + args: Value, + _zero_copy: Option, +) -> Result { + let args: GlobalTimerArgs = serde_json::from_value(args)?; + let val = args.timeout; let state = state; let mut t = state.global_timer.lock().unwrap(); let deadline = Instant::now() + Duration::from_millis(val as u64); - let f = t.new_timeout(deadline); + let f = t + .new_timeout(deadline) + .then(move |_| futures::future::ok(json!({}))); - Ok(Op::Async(Box::new(f.then(move |_| { - let builder = &mut FlatBufferBuilder::new(); - let inner = - msg::GlobalTimerRes::create(builder, &msg::GlobalTimerResArgs {}); - Ok(serialize_response( - cmd_id, - builder, - msg::BaseArgs { - inner: Some(inner.as_union_value()), - inner_type: msg::Any::GlobalTimerRes, - ..Default::default() - }, - )) - })))) + Ok(JsonOp::Async(Box::new(f))) } diff --git a/cli/ops/workers.rs b/cli/ops/workers.rs index 1eb11420fd..4eeecd068c 100644 --- a/cli/ops/workers.rs +++ b/cli/ops/workers.rs @@ -1,17 +1,12 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. -use super::dispatch_flatbuffers::serialize_response; -use super::utils::ok_buf; -use super::utils::CliOpResult; -use crate::deno_error; +use super::dispatch_json::{Deserialize, JsonOp, Value}; use crate::deno_error::DenoError; use crate::deno_error::ErrorKind; -use crate::msg; use crate::resources; use crate::startup_data; use crate::state::ThreadSafeState; use crate::worker::Worker; use deno::*; -use flatbuffers::FlatBufferBuilder; use futures; use futures::Async; use futures::Future; @@ -39,48 +34,32 @@ impl Future for GetMessageFuture { /// Get message from host as guest worker pub fn op_worker_get_message( state: &ThreadSafeState, - base: &msg::Base<'_>, - data: Option, -) -> CliOpResult { - if base.sync() { - return Err(deno_error::no_sync_support()); - } - assert!(data.is_none()); - let cmd_id = base.cmd_id(); - + _args: Value, + _data: Option, +) -> Result { let op = GetMessageFuture { state: state.clone(), }; - let op = op.map_err(move |_| -> ErrBox { unimplemented!() }); - let op = op.and_then(move |maybe_buf| -> Result { - debug!("op_worker_get_message"); - let builder = &mut FlatBufferBuilder::new(); - let data = maybe_buf.as_ref().map(|buf| builder.create_vector(buf)); - let inner = msg::WorkerGetMessageRes::create( - builder, - &msg::WorkerGetMessageResArgs { data }, - ); - Ok(serialize_response( - cmd_id, - builder, - msg::BaseArgs { - inner: Some(inner.as_union_value()), - inner_type: msg::Any::WorkerGetMessageRes, - ..Default::default() - }, - )) - }); - Ok(Op::Async(Box::new(op))) + let op = op + .map_err(move |_| -> ErrBox { unimplemented!() }) + .and_then(move |maybe_buf| { + debug!("op_worker_get_message"); + + futures::future::ok(json!({ + "data": maybe_buf.map(|buf| buf.to_owned()) + })) + }); + + Ok(JsonOp::Async(Box::new(op))) } /// Post message to host as guest worker pub fn op_worker_post_message( state: &ThreadSafeState, - base: &msg::Base<'_>, + _args: Value, data: Option, -) -> CliOpResult { - let cmd_id = base.cmd_id(); +) -> Result { let d = Vec::from(data.unwrap().as_ref()).into_boxed_slice(); let tx = { @@ -90,33 +69,34 @@ pub fn op_worker_post_message( tx.send(d) .wait() .map_err(|e| DenoError::new(ErrorKind::Other, e.to_string()))?; - let builder = &mut FlatBufferBuilder::new(); - ok_buf(serialize_response( - cmd_id, - builder, - msg::BaseArgs { - ..Default::default() - }, - )) + Ok(JsonOp::Sync(json!({}))) +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +struct CreateWorkerArgs { + specifier: String, + include_deno_namespace: bool, + has_source_code: bool, + source_code: String, } /// Create worker as the host pub fn op_create_worker( state: &ThreadSafeState, - base: &msg::Base<'_>, - data: Option, -) -> CliOpResult { - assert!(data.is_none()); - let cmd_id = base.cmd_id(); - let inner = base.inner_as_create_worker().unwrap(); - let specifier = inner.specifier().unwrap(); + args: Value, + _data: Option, +) -> Result { + let args: CreateWorkerArgs = serde_json::from_value(args)?; + + let specifier = args.specifier.as_ref(); // Only include deno namespace if requested AND current worker // has included namespace (to avoid escalation). let include_deno_namespace = - inner.include_deno_namespace() && state.include_deno_namespace; - let has_source_code = inner.has_source_code(); - let source_code = inner.source_code().unwrap(); + args.include_deno_namespace && state.include_deno_namespace; + let has_source_code = args.has_source_code; + let source_code = args.source_code; let parent_state = state.clone(); @@ -150,24 +130,13 @@ pub fn op_create_worker( let exec_cb = move |worker: Worker| { let mut workers_tl = parent_state.workers.lock().unwrap(); workers_tl.insert(rid, worker.shared()); - let builder = &mut FlatBufferBuilder::new(); - let msg_inner = - msg::CreateWorkerRes::create(builder, &msg::CreateWorkerResArgs { rid }); - serialize_response( - cmd_id, - builder, - msg::BaseArgs { - inner: Some(msg_inner.as_union_value()), - inner_type: msg::Any::CreateWorkerRes, - ..Default::default() - }, - ) + json!(rid) }; // Has provided source code, execute immediately. if has_source_code { worker.execute(&source_code).unwrap(); - return ok_buf(exec_cb(worker)); + return Ok(JsonOp::Sync(exec_cb(worker))); } let op = worker @@ -175,22 +144,23 @@ pub fn op_create_worker( .and_then(move |()| Ok(exec_cb(worker))); let result = op.wait()?; - Ok(Op::Sync(result)) + Ok(JsonOp::Sync(result)) +} + +#[derive(Deserialize)] +struct HostGetWorkerClosedArgs { + rid: i32, } /// Return when the worker closes pub fn op_host_get_worker_closed( state: &ThreadSafeState, - base: &msg::Base<'_>, - data: Option, -) -> CliOpResult { - if base.sync() { - return Err(deno_error::no_sync_support()); - } - assert!(data.is_none()); - let cmd_id = base.cmd_id(); - let inner = base.inner_as_host_get_worker_closed().unwrap(); - let rid = inner.rid(); + args: Value, + _data: Option, +) -> Result { + let args: HostGetWorkerClosedArgs = serde_json::from_value(args)?; + + let rid = args.rid as u32; let state = state.clone(); let shared_worker_future = { @@ -199,79 +169,58 @@ pub fn op_host_get_worker_closed( worker.clone() }; - let op = Box::new(shared_worker_future.then(move |_result| { - let builder = &mut FlatBufferBuilder::new(); + let op = Box::new( + shared_worker_future.then(move |_result| futures::future::ok(json!({}))), + ); - Ok(serialize_response( - cmd_id, - builder, - msg::BaseArgs { - ..Default::default() - }, - )) - })); - Ok(Op::Async(Box::new(op))) + Ok(JsonOp::Async(Box::new(op))) +} + +#[derive(Deserialize)] +struct HostGetMessageArgs { + rid: i32, } /// Get message from guest worker as host pub fn op_host_get_message( _state: &ThreadSafeState, - base: &msg::Base<'_>, - data: Option, -) -> CliOpResult { - if base.sync() { - return Err(deno_error::no_sync_support()); - } - assert!(data.is_none()); - let cmd_id = base.cmd_id(); - let inner = base.inner_as_host_get_message().unwrap(); - let rid = inner.rid(); + args: Value, + _data: Option, +) -> Result { + let args: HostGetMessageArgs = serde_json::from_value(args)?; - let op = resources::get_message_from_worker(rid); - let op = op.map_err(move |_| -> ErrBox { unimplemented!() }); - let op = op.and_then(move |maybe_buf| -> Result { - let builder = &mut FlatBufferBuilder::new(); + let rid = args.rid as u32; + let op = resources::get_message_from_worker(rid) + .map_err(move |_| -> ErrBox { unimplemented!() }) + .and_then(move |maybe_buf| { + futures::future::ok(json!({ + "data": maybe_buf.map(|buf| buf.to_owned()) + })) + }); - let data = maybe_buf.as_ref().map(|buf| builder.create_vector(buf)); - let msg_inner = msg::HostGetMessageRes::create( - builder, - &msg::HostGetMessageResArgs { data }, - ); - Ok(serialize_response( - cmd_id, - builder, - msg::BaseArgs { - inner: Some(msg_inner.as_union_value()), - inner_type: msg::Any::HostGetMessageRes, - ..Default::default() - }, - )) - }); - Ok(Op::Async(Box::new(op))) + Ok(JsonOp::Async(Box::new(op))) +} + +#[derive(Deserialize)] +struct HostPostMessageArgs { + rid: i32, } /// Post message to guest worker as host pub fn op_host_post_message( _state: &ThreadSafeState, - base: &msg::Base<'_>, + args: Value, data: Option, -) -> CliOpResult { - let cmd_id = base.cmd_id(); - let inner = base.inner_as_host_post_message().unwrap(); - let rid = inner.rid(); +) -> Result { + let args: HostPostMessageArgs = serde_json::from_value(args)?; + + let rid = args.rid as u32; let d = Vec::from(data.unwrap().as_ref()).into_boxed_slice(); resources::post_message_to_worker(rid, d) .wait() .map_err(|e| DenoError::new(ErrorKind::Other, e.to_string()))?; - let builder = &mut FlatBufferBuilder::new(); - ok_buf(serialize_response( - cmd_id, - builder, - msg::BaseArgs { - ..Default::default() - }, - )) + Ok(JsonOp::Sync(json!({}))) } diff --git a/js/dispatch.ts b/js/dispatch.ts index b59274f91b..6c7551441a 100644 --- a/js/dispatch.ts +++ b/js/dispatch.ts @@ -23,6 +23,29 @@ export const OP_OPEN = 15; export const OP_CLOSE = 16; export const OP_SEEK = 17; export const OP_FETCH = 18; +export const OP_METRICS = 19; +export const OP_REPL_START = 20; +export const OP_REPL_READLINE = 21; +export const OP_ACCEPT = 22; +export const OP_DIAL = 23; +export const OP_SHUTDOWN = 24; +export const OP_LISTEN = 25; +export const OP_RESOURCES = 26; +export const OP_GET_RANDOM_VALUES = 27; +export const OP_GLOBAL_TIMER_STOP = 28; +export const OP_GLOBAL_TIMER = 29; +export const OP_NOW = 30; +export const OP_PERMISSIONS = 31; +export const OP_REVOKE_PERMISSION = 32; +export const OP_CREATE_WORKER = 33; +export const OP_HOST_GET_WORKER_CLOSED = 34; +export const OP_HOST_POST_MESSAGE = 35; +export const OP_HOST_GET_MESSAGE = 36; +export const OP_WORKER_POST_MESSAGE = 37; +export const OP_WORKER_GET_MESSAGE = 38; +export const OP_RUN = 39; +export const OP_RUN_STATUS = 40; +export const OP_KILL = 41; export function asyncMsgFromRust(opId: number, ui8: Uint8Array): void { switch (opId) { @@ -41,6 +64,15 @@ export function asyncMsgFromRust(opId: number, ui8: Uint8Array): void { case OP_OPEN: case OP_SEEK: case OP_FETCH: + case OP_REPL_START: + case OP_REPL_READLINE: + case OP_ACCEPT: + case OP_DIAL: + case OP_GLOBAL_TIMER: + case OP_HOST_GET_WORKER_CLOSED: + case OP_HOST_GET_MESSAGE: + case OP_WORKER_GET_MESSAGE: + case OP_RUN_STATUS: json.asyncMsgFromRust(opId, ui8); break; default: diff --git a/js/get_random_values.ts b/js/get_random_values.ts index d5c0828c55..154e77f753 100644 --- a/js/get_random_values.ts +++ b/js/get_random_values.ts @@ -1,15 +1,8 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. -import { sendSync, msg, flatbuffers } from "./dispatch_flatbuffers"; +import * as dispatch from "./dispatch"; +import { sendSync } from "./dispatch_json"; import { assert } from "./util"; -function req( - typedArray: ArrayBufferView -): [flatbuffers.Builder, msg.Any, flatbuffers.Offset, ArrayBufferView] { - const builder = flatbuffers.createBuilder(); - const inner = msg.GetRandomValues.createGetRandomValues(builder); - return [builder, msg.Any.GetRandomValues, inner, typedArray]; -} - /** Synchronously collects cryptographically secure random values. The * underlying CSPRNG in use is Rust's `rand::rngs::ThreadRng`. * @@ -28,6 +21,11 @@ export function getRandomValues< >(typedArray: T): T { assert(typedArray !== null, "Input must not be null"); assert(typedArray.length <= 65536, "Input must not be longer than 65536"); - sendSync(...req(typedArray as ArrayBufferView)); + const ui8 = new Uint8Array( + typedArray.buffer, + typedArray.byteOffset, + typedArray.byteLength + ); + sendSync(dispatch.OP_GET_RANDOM_VALUES, {}, ui8); return typedArray; } diff --git a/js/metrics.ts b/js/metrics.ts index e93e9528c7..48e3102e52 100644 --- a/js/metrics.ts +++ b/js/metrics.ts @@ -1,6 +1,6 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. -import { assert } from "./util"; -import { sendSync, msg, flatbuffers } from "./dispatch_flatbuffers"; +import * as dispatch from "./dispatch"; +import { sendSync } from "./dispatch_json"; export interface Metrics { opsDispatched: number; @@ -10,27 +10,6 @@ export interface Metrics { bytesReceived: number; } -function req(): [flatbuffers.Builder, msg.Any, flatbuffers.Offset] { - const builder = flatbuffers.createBuilder(); - const inner = msg.Metrics.createMetrics(builder); - return [builder, msg.Any.Metrics, inner]; -} - -function res(baseRes: null | msg.Base): Metrics { - assert(baseRes !== null); - assert(msg.Any.MetricsRes === baseRes!.innerType()); - const res = new msg.MetricsRes(); - assert(baseRes!.inner(res) !== null); - - return { - opsDispatched: res.opsDispatched().toFloat64(), - opsCompleted: res.opsCompleted().toFloat64(), - bytesSentControl: res.bytesSentControl().toFloat64(), - bytesSentData: res.bytesSentData().toFloat64(), - bytesReceived: res.bytesReceived().toFloat64() - }; -} - /** Receive metrics from the privileged side of Deno. * * > console.table(Deno.metrics()) @@ -45,5 +24,5 @@ function res(baseRes: null | msg.Base): Metrics { * └──────────────────┴────────┘ */ export function metrics(): Metrics { - return res(sendSync(...req())); + return sendSync(dispatch.OP_METRICS); } diff --git a/js/net.ts b/js/net.ts index 9c3bfbba50..b478ae6132 100644 --- a/js/net.ts +++ b/js/net.ts @@ -1,8 +1,9 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. import { EOF, Reader, Writer, Closer } from "./io"; -import { assert, notImplemented } from "./util"; -import { sendSync, sendAsync, msg, flatbuffers } from "./dispatch_flatbuffers"; +import { notImplemented } from "./util"; import { read, write, close } from "./files"; +import * as dispatch from "./dispatch"; +import { sendSync, sendAsync } from "./dispatch_json"; export type Network = "tcp"; // TODO support other types: @@ -36,10 +37,7 @@ enum ShutdownMode { } function shutdown(rid: number, how: ShutdownMode): void { - const builder = flatbuffers.createBuilder(); - const inner = msg.Shutdown.createShutdown(builder, rid, how); - const baseRes = sendSync(builder, msg.Any.Shutdown, inner); - assert(baseRes == null); + sendSync(dispatch.OP_SHUTDOWN, { rid, how }); } class ConnImpl implements Conn { @@ -80,14 +78,9 @@ class ListenerImpl implements Listener { constructor(readonly rid: number) {} async accept(): Promise { - const builder = flatbuffers.createBuilder(); - const inner = msg.Accept.createAccept(builder, this.rid); - const baseRes = await sendAsync(builder, msg.Any.Accept, inner); - assert(baseRes != null); - assert(msg.Any.NewConn === baseRes!.innerType()); - const res = new msg.NewConn(); - assert(baseRes!.inner(res) != null); - return new ConnImpl(res.rid(), res.remoteAddr()!, res.localAddr()!); + const res = await sendAsync(dispatch.OP_ACCEPT, { rid: this.rid }); + // TODO(bartlomieju): add remoteAddr and localAddr on Rust side + return new ConnImpl(res.rid, res.remoteAddr!, res.localAddr!); } close(): void { @@ -143,16 +136,8 @@ export interface Conn extends Reader, Writer, Closer { * See `dial()` for a description of the network and address parameters. */ export function listen(network: Network, address: string): Listener { - const builder = flatbuffers.createBuilder(); - const network_ = builder.createString(network); - const address_ = builder.createString(address); - const inner = msg.Listen.createListen(builder, network_, address_); - const baseRes = sendSync(builder, msg.Any.Listen, inner); - assert(baseRes != null); - assert(msg.Any.ListenRes === baseRes!.innerType()); - const res = new msg.ListenRes(); - assert(baseRes!.inner(res) != null); - return new ListenerImpl(res.rid()); + const rid = sendSync(dispatch.OP_LISTEN, { network, address }); + return new ListenerImpl(rid); } /** Dial connects to the address on the named network. @@ -183,16 +168,9 @@ export function listen(network: Network, address: string): Listener { * dial("tcp", ":80") */ export async function dial(network: Network, address: string): Promise { - const builder = flatbuffers.createBuilder(); - const network_ = builder.createString(network); - const address_ = builder.createString(address); - const inner = msg.Dial.createDial(builder, network_, address_); - const baseRes = await sendAsync(builder, msg.Any.Dial, inner); - assert(baseRes != null); - assert(msg.Any.NewConn === baseRes!.innerType()); - const res = new msg.NewConn(); - assert(baseRes!.inner(res) != null); - return new ConnImpl(res.rid(), res.remoteAddr()!, res.localAddr()!); + const res = await sendAsync(dispatch.OP_DIAL, { network, address }); + // TODO(bartlomieju): add remoteAddr and localAddr on Rust side + return new ConnImpl(res.rid, res.remoteAddr!, res.localAddr!); } /** **RESERVED** */ diff --git a/js/performance.ts b/js/performance.ts index 7aaa7ae45a..d2f339c467 100644 --- a/js/performance.ts +++ b/js/performance.ts @@ -1,6 +1,11 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. -import { sendSync, msg, flatbuffers } from "./dispatch_flatbuffers"; -import { assert } from "./util"; +import * as dispatch from "./dispatch"; +import { sendSync } from "./dispatch_json"; + +interface NowResponse { + seconds: number; + subsecNanos: number; +} export class Performance { /** Returns a current time from Deno's start in milliseconds. @@ -11,12 +16,7 @@ export class Performance { * console.log(`${t} ms since start!`); */ now(): number { - const builder = flatbuffers.createBuilder(); - const inner = msg.Now.createNow(builder); - const baseRes = sendSync(builder, msg.Any.Now, inner)!; - assert(msg.Any.NowRes === baseRes.innerType()); - const res = new msg.NowRes(); - assert(baseRes.inner(res) != null); - return res.seconds().toFloat64() * 1e3 + res.subsecNanos() / 1e6; + const res = sendSync(dispatch.OP_NOW) as NowResponse; + return res.seconds * 1e3 + res.subsecNanos / 1e6; } } diff --git a/js/permissions.ts b/js/permissions.ts index 822ae8cbd3..bc969f3a83 100644 --- a/js/permissions.ts +++ b/js/permissions.ts @@ -1,6 +1,6 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. -import { sendSync, msg, flatbuffers } from "./dispatch_flatbuffers"; -import { assert } from "./util"; +import * as dispatch from "./dispatch"; +import { sendSync } from "./dispatch_json"; /** Permissions as granted by the caller */ export interface Permissions { @@ -15,23 +15,6 @@ export interface Permissions { export type Permission = keyof Permissions; -function getReq(): [flatbuffers.Builder, msg.Any, flatbuffers.Offset] { - const builder = flatbuffers.createBuilder(); - const inner = msg.Permissions.createPermissions(builder); - return [builder, msg.Any.Permissions, inner]; -} - -function createPermissions(inner: msg.PermissionsRes): Permissions { - return { - read: inner.read(), - write: inner.write(), - net: inner.net(), - env: inner.env(), - run: inner.run(), - hrtime: inner.hrtime() - }; -} - /** Inspect granted permissions for the current program. * * if (Deno.permissions().read) { @@ -40,24 +23,7 @@ function createPermissions(inner: msg.PermissionsRes): Permissions { * } */ export function permissions(): Permissions { - const baseRes = sendSync(...getReq())!; - assert(msg.Any.PermissionsRes === baseRes.innerType()); - const res = new msg.PermissionsRes(); - assert(baseRes.inner(res) != null); - // TypeScript cannot track assertion above, therefore not null assertion - return createPermissions(res); -} - -function revokeReq( - permission: string -): [flatbuffers.Builder, msg.Any, flatbuffers.Offset] { - const builder = flatbuffers.createBuilder(); - const permission_ = builder.createString(permission); - const inner = msg.PermissionRevoke.createPermissionRevoke( - builder, - permission_ - ); - return [builder, msg.Any.PermissionRevoke, inner]; + return sendSync(dispatch.OP_PERMISSIONS) as Permissions; } /** Revoke a permission. When the permission was already revoked nothing changes @@ -69,5 +35,5 @@ function revokeReq( * Deno.readFile("example.test"); // -> error or permission prompt */ export function revokePermission(permission: Permission): void { - sendSync(...revokeReq(permission)); + sendSync(dispatch.OP_REVOKE_PERMISSION, { permission }); } diff --git a/js/process.ts b/js/process.ts index b2b6d47348..dd4f701035 100644 --- a/js/process.ts +++ b/js/process.ts @@ -1,6 +1,6 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. -import { sendSync, sendAsync, msg, flatbuffers } from "./dispatch_flatbuffers"; - +import { sendSync, sendAsync } from "./dispatch_json"; +import * as dispatch from "./dispatch"; import { File, close } from "./files"; import { ReadCloser, WriteCloser } from "./io"; import { readAll } from "./buffer"; @@ -31,21 +31,22 @@ export interface RunOptions { stdin?: ProcessStdio | number; } +interface RunStatusResponse { + gotSignal: boolean; + exitCode: number; + exitSignal: number; +} + async function runStatus(rid: number): Promise { - const builder = flatbuffers.createBuilder(); - const inner = msg.RunStatus.createRunStatus(builder, rid); + const res = (await sendAsync(dispatch.OP_RUN_STATUS, { + rid + })) as RunStatusResponse; - const baseRes = await sendAsync(builder, msg.Any.RunStatus, inner); - assert(baseRes != null); - assert(msg.Any.RunStatusRes === baseRes!.innerType()); - const res = new msg.RunStatusRes(); - assert(baseRes!.inner(res) != null); - - if (res.gotSignal()) { - const signal = res.exitSignal(); + if (res.gotSignal) { + const signal = res.exitSignal; return { signal, success: false }; } else { - const code = res.exitCode(); + const code = res.exitCode; return { code, success: code === 0 }; } } @@ -56,9 +57,7 @@ async function runStatus(rid: number): Promise { * Requires the `--allow-run` flag. */ export function kill(pid: number, signo: number): void { - const builder = flatbuffers.createBuilder(); - const inner = msg.Kill.createKill(builder, pid, signo); - sendSync(builder, msg.Any.Kill, inner); + sendSync(dispatch.OP_KILL, { pid, signo }); } export class Process { @@ -69,20 +68,20 @@ export class Process { readonly stderr?: ReadCloser; // @internal - constructor(res: msg.RunRes) { - this.rid = res.rid(); - this.pid = res.pid(); + constructor(res: RunResponse) { + this.rid = res.rid; + this.pid = res.pid; - if (res.stdinRid() > 0) { - this.stdin = new File(res.stdinRid()); + if (res.stdinRid && res.stdinRid > 0) { + this.stdin = new File(res.stdinRid); } - if (res.stdoutRid() > 0) { - this.stdout = new File(res.stdoutRid()); + if (res.stdoutRid && res.stdoutRid > 0) { + this.stdout = new File(res.stdoutRid); } - if (res.stderrRid() > 0) { - this.stderr = new File(res.stderrRid()); + if (res.stderrRid && res.stderrRid > 0) { + this.stderr = new File(res.stderrRid); } } @@ -135,14 +134,13 @@ export interface ProcessStatus { signal?: number; // TODO: Make this a string, e.g. 'SIGTERM'. } -function stdioMap(s: ProcessStdio): msg.ProcessStdio { +// TODO: this method is only used to validate proper option, probably can be renamed +function stdioMap(s: string): string { switch (s) { case "inherit": - return msg.ProcessStdio.Inherit; case "piped": - return msg.ProcessStdio.Piped; case "null": - return msg.ProcessStdio.Null; + return s; default: return unreachable(); } @@ -152,6 +150,13 @@ function isRid(arg: unknown): arg is number { return !isNaN(arg as number); } +interface RunResponse { + rid: number; + pid: number; + stdinRid: number | null; + stdoutRid: number | null; + stderrRid: number | null; +} /** * Spawns new subprocess. * @@ -166,71 +171,56 @@ function isRid(arg: unknown): arg is number { * they can be set to either `ProcessStdio` or `rid` of open file. */ export function run(opt: RunOptions): Process { - const builder = flatbuffers.createBuilder(); - const argsOffset = msg.Run.createArgsVector( - builder, - opt.args.map((a): number => builder.createString(a)) - ); - const cwdOffset = opt.cwd == null ? 0 : builder.createString(opt.cwd); - const kvOffset: flatbuffers.Offset[] = []; + assert(opt.args.length > 0); + let env: Array<[string, string]> = []; if (opt.env) { - for (const [key, val] of Object.entries(opt.env)) { - const keyOffset = builder.createString(key); - const valOffset = builder.createString(String(val)); - kvOffset.push(msg.KeyValue.createKeyValue(builder, keyOffset, valOffset)); - } + env = Array.from(Object.entries(opt.env)); } - const envOffset = msg.Run.createEnvVector(builder, kvOffset); - let stdInOffset = stdioMap("inherit"); - let stdOutOffset = stdioMap("inherit"); - let stdErrOffset = stdioMap("inherit"); - let stdinRidOffset = 0; - let stdoutRidOffset = 0; - let stderrRidOffset = 0; + let stdin = stdioMap("inherit"); + let stdout = stdioMap("inherit"); + let stderr = stdioMap("inherit"); + let stdinRid = 0; + let stdoutRid = 0; + let stderrRid = 0; if (opt.stdin) { if (isRid(opt.stdin)) { - stdinRidOffset = opt.stdin; + stdinRid = opt.stdin; } else { - stdInOffset = stdioMap(opt.stdin); + stdin = stdioMap(opt.stdin); } } if (opt.stdout) { if (isRid(opt.stdout)) { - stdoutRidOffset = opt.stdout; + stdoutRid = opt.stdout; } else { - stdOutOffset = stdioMap(opt.stdout); + stdout = stdioMap(opt.stdout); } } if (opt.stderr) { if (isRid(opt.stderr)) { - stderrRidOffset = opt.stderr; + stderrRid = opt.stderr; } else { - stdErrOffset = stdioMap(opt.stderr); + stderr = stdioMap(opt.stderr); } } - const inner = msg.Run.createRun( - builder, - argsOffset, - cwdOffset, - envOffset, - stdInOffset, - stdOutOffset, - stdErrOffset, - stdinRidOffset, - stdoutRidOffset, - stderrRidOffset - ); - const baseRes = sendSync(builder, msg.Any.Run, inner); - assert(baseRes != null); - assert(msg.Any.RunRes === baseRes!.innerType()); - const res = new msg.RunRes(); - assert(baseRes!.inner(res) != null); + const req = { + args: opt.args.map(String), + cwd: opt.cwd, + env, + stdin, + stdout, + stderr, + stdinRid, + stdoutRid, + stderrRid + }; + const res = sendSync(dispatch.OP_RUN, req) as RunResponse; return new Process(res); } diff --git a/js/repl.ts b/js/repl.ts index c971e44200..ac67006576 100644 --- a/js/repl.ts +++ b/js/repl.ts @@ -1,12 +1,12 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. -import { assert } from "./util"; import { close } from "./files"; -import { sendSync, sendAsync, msg, flatbuffers } from "./dispatch_flatbuffers"; import { exit } from "./os"; import { window } from "./window"; import { core } from "./core"; import { formatError } from "./format_error"; import { stringifyArgs } from "./console"; +import * as dispatch from "./dispatch"; +import { sendSync, sendAsync } from "./dispatch_json"; /** * REPL logging. @@ -43,34 +43,12 @@ const replCommands = { }; function startRepl(historyFile: string): number { - const builder = flatbuffers.createBuilder(); - const historyFile_ = builder.createString(historyFile); - const inner = msg.ReplStart.createReplStart(builder, historyFile_); - - const baseRes = sendSync(builder, msg.Any.ReplStart, inner); - assert(baseRes != null); - assert(msg.Any.ReplStartRes === baseRes!.innerType()); - const innerRes = new msg.ReplStartRes(); - assert(baseRes!.inner(innerRes) != null); - const rid = innerRes.rid(); - return rid; + return sendSync(dispatch.OP_REPL_START, { historyFile }); } // @internal export async function readline(rid: number, prompt: string): Promise { - const builder = flatbuffers.createBuilder(); - const prompt_ = builder.createString(prompt); - const inner = msg.ReplReadline.createReplReadline(builder, rid, prompt_); - - const baseRes = await sendAsync(builder, msg.Any.ReplReadline, inner); - - assert(baseRes != null); - assert(msg.Any.ReplReadlineRes === baseRes!.innerType()); - const innerRes = new msg.ReplReadlineRes(); - assert(baseRes!.inner(innerRes) != null); - const line = innerRes.line(); - assert(line !== null); - return line || ""; + return sendAsync(dispatch.OP_REPL_READLINE, { rid, prompt }); } // Error messages that allow users to continue input diff --git a/js/resources.ts b/js/resources.ts index 49093fab1a..6e2ec202b7 100644 --- a/js/resources.ts +++ b/js/resources.ts @@ -1,6 +1,6 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. -import { assert } from "./util"; -import { sendSync, msg, flatbuffers } from "./dispatch_flatbuffers"; +import * as dispatch from "./dispatch"; +import { sendSync } from "./dispatch_json"; export interface ResourceMap { [rid: number]: string; @@ -10,20 +10,10 @@ export interface ResourceMap { * representation. */ export function resources(): ResourceMap { - const builder = flatbuffers.createBuilder(); - const inner = msg.Resource.createResource(builder, 0, 0); - const baseRes = sendSync(builder, msg.Any.Resources, inner); - assert(baseRes !== null); - assert(msg.Any.ResourcesRes === baseRes!.innerType()); - const res = new msg.ResourcesRes(); - assert(baseRes!.inner(res) !== null); - + const res = sendSync(dispatch.OP_RESOURCES) as Array<[number, string]>; const resources: ResourceMap = {}; - - for (let i = 0; i < res.resourcesLength(); i++) { - const item = res.resources(i)!; - resources[item.rid()!] = item.repr()!; + for (const resourceTuple of res) { + resources[resourceTuple[0]] = resourceTuple[1]; } - return resources; } diff --git a/js/timers.ts b/js/timers.ts index cb0fd531c6..079e779c45 100644 --- a/js/timers.ts +++ b/js/timers.ts @@ -1,7 +1,8 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. import { assert } from "./util"; -import { sendAsync, sendSync, msg, flatbuffers } from "./dispatch_flatbuffers"; import { window } from "./window"; +import * as dispatch from "./dispatch"; +import { sendSync, sendAsync } from "./dispatch_json"; interface Timer { id: number; @@ -37,11 +38,8 @@ function getTime(): number { } function clearGlobalTimeout(): void { - const builder = flatbuffers.createBuilder(); - const inner = msg.GlobalTimerStop.createGlobalTimerStop(builder); globalTimeoutDue = null; - let res = sendSync(builder, msg.Any.GlobalTimerStop, inner); - assert(res == null); + sendSync(dispatch.OP_GLOBAL_TIMER_STOP); } async function setGlobalTimeout(due: number, now: number): Promise { @@ -52,12 +50,8 @@ async function setGlobalTimeout(due: number, now: number): Promise { assert(timeout >= 0); // Send message to the backend. - const builder = flatbuffers.createBuilder(); - msg.GlobalTimer.startGlobalTimer(builder); - msg.GlobalTimer.addTimeout(builder, timeout); - const inner = msg.GlobalTimer.endGlobalTimer(builder); globalTimeoutDue = due; - await sendAsync(builder, msg.Any.GlobalTimer, inner); + await sendAsync(dispatch.OP_GLOBAL_TIMER, { timeout }); // eslint-disable-next-line @typescript-eslint/no-use-before-define fireTimers(); } diff --git a/js/workers.ts b/js/workers.ts index e59e853c50..7bcbe6279a 100644 --- a/js/workers.ts +++ b/js/workers.ts @@ -1,7 +1,8 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. /* eslint-disable @typescript-eslint/no-explicit-any */ -import { sendAsync, sendSync, msg, flatbuffers } from "./dispatch_flatbuffers"; -import { assert, log } from "./util"; +import * as dispatch from "./dispatch"; +import { sendAsync, sendSync } from "./dispatch_json"; +import { log } from "./util"; import { TextDecoder, TextEncoder } from "./text_encoding"; import { window } from "./window"; import { blobURLMap } from "./url"; @@ -26,61 +27,28 @@ function createWorker( hasSourceCode: boolean, sourceCode: Uint8Array ): number { - const builder = flatbuffers.createBuilder(); - const specifier_ = builder.createString(specifier); - const sourceCode_ = builder.createString(sourceCode); - const inner = msg.CreateWorker.createCreateWorker( - builder, - specifier_, + return sendSync(dispatch.OP_CREATE_WORKER, { + specifier, includeDenoNamespace, hasSourceCode, - sourceCode_ - ); - const baseRes = sendSync(builder, msg.Any.CreateWorker, inner); - assert(baseRes != null); - assert( - msg.Any.CreateWorkerRes === baseRes!.innerType(), - `base.innerType() unexpectedly is ${baseRes!.innerType()}` - ); - const res = new msg.CreateWorkerRes(); - assert(baseRes!.inner(res) != null); - return res.rid(); + sourceCode: new TextDecoder().decode(sourceCode) + }); } async function hostGetWorkerClosed(rid: number): Promise { - const builder = flatbuffers.createBuilder(); - const inner = msg.HostGetWorkerClosed.createHostGetWorkerClosed(builder, rid); - await sendAsync(builder, msg.Any.HostGetWorkerClosed, inner); + await sendAsync(dispatch.OP_HOST_GET_WORKER_CLOSED, { rid }); } function hostPostMessage(rid: number, data: any): void { const dataIntArray = encodeMessage(data); - const builder = flatbuffers.createBuilder(); - const inner = msg.HostPostMessage.createHostPostMessage(builder, rid); - const baseRes = sendSync( - builder, - msg.Any.HostPostMessage, - inner, - dataIntArray - ); - assert(baseRes != null); + sendSync(dispatch.OP_HOST_POST_MESSAGE, { rid }, dataIntArray); } async function hostGetMessage(rid: number): Promise { - const builder = flatbuffers.createBuilder(); - const inner = msg.HostGetMessage.createHostGetMessage(builder, rid); - const baseRes = await sendAsync(builder, msg.Any.HostGetMessage, inner); - assert(baseRes != null); - assert( - msg.Any.HostGetMessageRes === baseRes!.innerType(), - `base.innerType() unexpectedly is ${baseRes!.innerType()}` - ); - const res = new msg.HostGetMessageRes(); - assert(baseRes!.inner(res) != null); + const res = await sendAsync(dispatch.OP_HOST_GET_MESSAGE, { rid }); - const dataArray = res.dataArray(); - if (dataArray != null) { - return decodeMessage(dataArray); + if (res.data != null) { + return decodeMessage(new Uint8Array(res.data)); } else { return null; } @@ -91,36 +59,15 @@ export let onmessage: (e: { data: any }) => void = (): void => {}; export function postMessage(data: any): void { const dataIntArray = encodeMessage(data); - const builder = flatbuffers.createBuilder(); - const inner = msg.WorkerPostMessage.createWorkerPostMessage(builder); - const baseRes = sendSync( - builder, - msg.Any.WorkerPostMessage, - inner, - dataIntArray - ); - assert(baseRes != null); + sendSync(dispatch.OP_WORKER_POST_MESSAGE, {}, dataIntArray); } export async function getMessage(): Promise { log("getMessage"); - const builder = flatbuffers.createBuilder(); - const inner = msg.WorkerGetMessage.createWorkerGetMessage( - builder, - 0 /* unused */ - ); - const baseRes = await sendAsync(builder, msg.Any.WorkerGetMessage, inner); - assert(baseRes != null); - assert( - msg.Any.WorkerGetMessageRes === baseRes!.innerType(), - `base.innerType() unexpectedly is ${baseRes!.innerType()}` - ); - const res = new msg.WorkerGetMessageRes(); - assert(baseRes!.inner(res) != null); + const res = await sendAsync(dispatch.OP_WORKER_GET_MESSAGE); - const dataArray = res.dataArray(); - if (dataArray != null) { - return decodeMessage(dataArray); + if (res.data != null) { + return decodeMessage(new Uint8Array(res.data)); } else { return null; }