1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2024-11-21 15:04:11 -05:00

refactor: migrate ops to new dispatch wrapper (#7118)

This commit is contained in:
Bartek Iwańczuk 2020-08-28 17:08:24 +02:00 committed by GitHub
parent 31f32ed8c4
commit 7e946858a4
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
25 changed files with 1854 additions and 1253 deletions

View file

@ -119,15 +119,3 @@ where
}
}
}
pub fn blocking_json<F>(is_sync: bool, f: F) -> Result<JsonOp, ErrBox>
where
F: 'static + Send + FnOnce() -> JsonResult,
{
if is_sync {
Ok(JsonOp::Sync(f()?))
} else {
let fut = async move { tokio::task::spawn_blocking(f).await.unwrap() };
Ok(JsonOp::Async(fut.boxed_local()))
}
}

View file

@ -1,23 +1,26 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
use super::dispatch_json::{Deserialize, JsonOp, Value};
use super::dispatch_json::{Deserialize, Value};
use crate::diagnostics::Diagnostic;
use crate::source_maps::get_orig_position;
use crate::source_maps::CachedMaps;
use crate::state::State;
use deno_core::CoreIsolate;
use deno_core::ErrBox;
use deno_core::ResourceTable;
use deno_core::ZeroCopyBuf;
use std::collections::HashMap;
use std::rc::Rc;
pub fn init(i: &mut CoreIsolate, s: &Rc<State>) {
let t = &CoreIsolate::state(i).borrow().resource_table.clone();
i.register_op(
"op_apply_source_map",
s.stateful_json_op(op_apply_source_map),
s.stateful_json_op_sync(t, op_apply_source_map),
);
i.register_op(
"op_format_diagnostic",
s.stateful_json_op(op_format_diagnostic),
s.stateful_json_op_sync(t, op_format_diagnostic),
);
}
@ -30,10 +33,11 @@ struct ApplySourceMap {
}
fn op_apply_source_map(
state: &Rc<State>,
state: &State,
_resource_table: &mut ResourceTable,
args: Value,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, ErrBox> {
) -> Result<Value, ErrBox> {
let args: ApplySourceMap = serde_json::from_value(args)?;
let mut mappings_map: CachedMaps = HashMap::new();
@ -46,18 +50,19 @@ fn op_apply_source_map(
&state.global_state.ts_compiler,
);
Ok(JsonOp::Sync(json!({
Ok(json!({
"fileName": orig_file_name,
"lineNumber": orig_line_number as u32,
"columnNumber": orig_column_number as u32,
})))
}))
}
fn op_format_diagnostic(
_state: &Rc<State>,
_state: &State,
_resource_table: &mut ResourceTable,
args: Value,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, ErrBox> {
) -> Result<Value, ErrBox> {
let diagnostic = serde_json::from_value::<Diagnostic>(args)?;
Ok(JsonOp::Sync(json!(diagnostic.to_string())))
Ok(json!(diagnostic.to_string()))
}

View file

@ -1,26 +1,29 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
use super::dispatch_json::{Deserialize, JsonOp, Value};
use super::dispatch_json::{Deserialize, Value};
use super::io::{StreamResource, StreamResourceHolder};
use crate::http_util::{create_http_client, HttpBody};
use crate::state::State;
use deno_core::BufVec;
use deno_core::CoreIsolate;
use deno_core::CoreIsolateState;
use deno_core::ErrBox;
use deno_core::ResourceTable;
use deno_core::ZeroCopyBuf;
use futures::future::FutureExt;
use http::header::HeaderName;
use http::header::HeaderValue;
use http::Method;
use reqwest::Client;
use std::cell::RefCell;
use std::convert::From;
use std::path::PathBuf;
use std::rc::Rc;
pub fn init(i: &mut CoreIsolate, s: &Rc<State>) {
i.register_op("op_fetch", s.stateful_json_op2(op_fetch));
let t = &CoreIsolate::state(i).borrow().resource_table.clone();
i.register_op("op_fetch", s.stateful_json_op_async(t, op_fetch));
i.register_op(
"op_create_http_client",
s.stateful_json_op2(op_create_http_client),
s.stateful_json_op_sync(t, op_create_http_client),
);
}
@ -33,25 +36,25 @@ struct FetchArgs {
client_rid: Option<u32>,
}
pub fn op_fetch(
isolate_state: &mut CoreIsolateState,
state: &Rc<State>,
async fn op_fetch(
state: Rc<State>,
resource_table: Rc<RefCell<ResourceTable>>,
args: Value,
data: &mut [ZeroCopyBuf],
) -> Result<JsonOp, ErrBox> {
data: BufVec,
) -> Result<Value, ErrBox> {
let args: FetchArgs = serde_json::from_value(args)?;
let url = args.url;
let resource_table_ = isolate_state.resource_table.borrow();
let resource_table2 = resource_table.clone();
let mut client_ref_mut;
let client = if let Some(rid) = args.client_rid {
let resource_table_ = resource_table.borrow();
let r = resource_table_
.get::<HttpClientResource>(rid)
.ok_or_else(ErrBox::bad_resource_id)?;
&r.client
r.client.clone()
} else {
client_ref_mut = state.http_client.borrow_mut();
&mut *client_ref_mut
let client_ref = state.http_client.borrow_mut();
client_ref.clone()
};
let method = match args.method {
@ -87,36 +90,32 @@ pub fn op_fetch(
}
debug!("Before fetch {}", url);
let resource_table = isolate_state.resource_table.clone();
let future = async move {
let res = request.send().await?;
debug!("Fetch response {}", url);
let status = res.status();
let mut res_headers = Vec::new();
for (key, val) in res.headers().iter() {
res_headers.push((key.to_string(), val.to_str().unwrap().to_owned()));
}
let res = request.send().await?;
let body = HttpBody::from(res);
let mut resource_table = resource_table.borrow_mut();
let rid = resource_table.add(
"httpBody",
Box::new(StreamResourceHolder::new(StreamResource::HttpBody(
Box::new(body),
))),
);
debug!("Fetch response {}", url);
let status = res.status();
let mut res_headers = Vec::new();
for (key, val) in res.headers().iter() {
res_headers.push((key.to_string(), val.to_str().unwrap().to_owned()));
}
let json_res = json!({
"bodyRid": rid,
"status": status.as_u16(),
"statusText": status.canonical_reason().unwrap_or(""),
"headers": res_headers
});
let body = HttpBody::from(res);
let mut resource_table = resource_table2.borrow_mut();
let rid = resource_table.add(
"httpBody",
Box::new(StreamResourceHolder::new(StreamResource::HttpBody(
Box::new(body),
))),
);
Ok(json_res)
};
let json_res = json!({
"bodyRid": rid,
"status": status.as_u16(),
"statusText": status.canonical_reason().unwrap_or(""),
"headers": res_headers
});
Ok(JsonOp::Async(future.boxed_local()))
Ok(json_res)
}
struct HttpClientResource {
@ -137,13 +136,12 @@ struct CreateHttpClientOptions {
}
fn op_create_http_client(
isolate_state: &mut CoreIsolateState,
state: &Rc<State>,
state: &State,
resource_table: &mut ResourceTable,
args: Value,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, ErrBox> {
) -> Result<Value, ErrBox> {
let args: CreateHttpClientOptions = serde_json::from_value(args)?;
let mut resource_table = isolate_state.resource_table.borrow_mut();
if let Some(ca_file) = args.ca_file.clone() {
state.check_read(&PathBuf::from(ca_file))?;
@ -153,5 +151,5 @@ fn op_create_http_client(
let rid =
resource_table.add("httpClient", Box::new(HttpClientResource::new(client)));
Ok(JsonOp::Sync(json!(rid)))
Ok(json!(rid))
}

File diff suppressed because it is too large Load diff

View file

@ -1,12 +1,12 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
use super::dispatch_json::{Deserialize, JsonOp, Value};
use super::dispatch_json::{Deserialize, Value};
use crate::state::State;
use deno_core::BufVec;
use deno_core::CoreIsolate;
use deno_core::CoreIsolateState;
use deno_core::ErrBox;
use deno_core::ResourceTable;
use deno_core::ZeroCopyBuf;
use futures::future::poll_fn;
use futures::future::FutureExt;
use notify::event::Event as NotifyEvent;
use notify::Error as NotifyError;
use notify::EventKind;
@ -14,14 +14,23 @@ use notify::RecommendedWatcher;
use notify::RecursiveMode;
use notify::Watcher;
use serde::Serialize;
use std::cell::RefCell;
use std::convert::From;
use std::path::PathBuf;
use std::rc::Rc;
use tokio::sync::mpsc;
pub fn init(i: &mut CoreIsolate, s: &Rc<State>) {
i.register_op("op_fs_events_open", s.stateful_json_op2(op_fs_events_open));
i.register_op("op_fs_events_poll", s.stateful_json_op2(op_fs_events_poll));
let t = &CoreIsolate::state(i).borrow().resource_table.clone();
i.register_op(
"op_fs_events_open",
s.stateful_json_op_sync(t, op_fs_events_open),
);
i.register_op(
"op_fs_events_poll",
s.stateful_json_op_async(t, op_fs_events_poll),
);
}
struct FsEventsResource {
@ -62,12 +71,12 @@ impl From<NotifyEvent> for FsEvent {
}
}
pub fn op_fs_events_open(
isolate_state: &mut CoreIsolateState,
state: &Rc<State>,
fn op_fs_events_open(
state: &State,
resource_table: &mut ResourceTable,
args: Value,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, ErrBox> {
) -> Result<Value, ErrBox> {
#[derive(Deserialize)]
struct OpenArgs {
recursive: bool,
@ -94,24 +103,22 @@ pub fn op_fs_events_open(
watcher.watch(path, recursive_mode)?;
}
let resource = FsEventsResource { watcher, receiver };
let mut resource_table = isolate_state.resource_table.borrow_mut();
let rid = resource_table.add("fsEvents", Box::new(resource));
Ok(JsonOp::Sync(json!(rid)))
Ok(json!(rid))
}
pub fn op_fs_events_poll(
isolate_state: &mut CoreIsolateState,
_state: &Rc<State>,
async fn op_fs_events_poll(
_state: Rc<State>,
resource_table: Rc<RefCell<ResourceTable>>,
args: Value,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, ErrBox> {
_zero_copy: BufVec,
) -> Result<Value, ErrBox> {
#[derive(Deserialize)]
struct PollArgs {
rid: u32,
}
let PollArgs { rid } = serde_json::from_value(args)?;
let resource_table = isolate_state.resource_table.clone();
let f = poll_fn(move |cx| {
poll_fn(move |cx| {
let mut resource_table = resource_table.borrow_mut();
let watcher = resource_table
.get_mut::<FsEventsResource>(rid)
@ -124,6 +131,6 @@ pub fn op_fs_events_poll(
Some(Err(err)) => Err(err),
None => Ok(json!({ "done": true })),
})
});
Ok(JsonOp::Async(f.boxed_local()))
})
.await
}

View file

@ -2,16 +2,21 @@
//! https://url.spec.whatwg.org/#idna
use super::dispatch_json::{Deserialize, JsonOp, Value};
use super::dispatch_json::{Deserialize, Value};
use crate::state::State;
use deno_core::CoreIsolate;
use deno_core::ErrBox;
use deno_core::ResourceTable;
use deno_core::ZeroCopyBuf;
use idna::{domain_to_ascii, domain_to_ascii_strict};
use std::rc::Rc;
pub fn init(i: &mut CoreIsolate, s: &Rc<State>) {
i.register_op("op_domain_to_ascii", s.stateful_json_op(op_domain_to_ascii));
let t = &CoreIsolate::state(i).borrow().resource_table.clone();
i.register_op(
"op_domain_to_ascii",
s.stateful_json_op_sync(t, op_domain_to_ascii),
);
}
#[derive(Deserialize)]
@ -22,10 +27,11 @@ struct DomainToAscii {
}
fn op_domain_to_ascii(
_state: &Rc<State>,
_state: &State,
_resource_table: &mut ResourceTable,
args: Value,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, ErrBox> {
) -> Result<Value, ErrBox> {
let args: DomainToAscii = serde_json::from_value(args)?;
if args.be_strict {
domain_to_ascii_strict(args.domain.as_str())
@ -36,5 +42,5 @@ fn op_domain_to_ascii(
let message = format!("Invalid IDNA encoded domain name: {:?}", err);
ErrBox::new("URIError", message)
})
.map(|domain| JsonOp::Sync(json!(domain)))
.map(|domain| json!(domain))
}

View file

@ -1,15 +1,15 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
use super::dispatch_json::{Deserialize, JsonOp, Value};
use super::dispatch_json::{Deserialize, Value};
use super::io::{StreamResource, StreamResourceHolder};
use crate::resolve_addr::resolve_addr;
use crate::state::State;
use deno_core::BufVec;
use deno_core::CoreIsolate;
use deno_core::CoreIsolateState;
use deno_core::ErrBox;
use deno_core::ResourceTable;
use deno_core::ZeroCopyBuf;
use futures::future::poll_fn;
use futures::future::FutureExt;
use std::cell::RefCell;
use std::net::Shutdown;
use std::net::SocketAddr;
use std::rc::Rc;
@ -23,15 +23,20 @@ use tokio::net::UdpSocket;
use super::net_unix;
pub fn init(i: &mut CoreIsolate, s: &Rc<State>) {
i.register_op("op_accept", s.stateful_json_op2(op_accept));
i.register_op("op_connect", s.stateful_json_op2(op_connect));
i.register_op("op_shutdown", s.stateful_json_op2(op_shutdown));
i.register_op("op_listen", s.stateful_json_op2(op_listen));
let t = &CoreIsolate::state(i).borrow().resource_table.clone();
i.register_op("op_accept", s.stateful_json_op_async(t, op_accept));
i.register_op("op_connect", s.stateful_json_op_async(t, op_connect));
i.register_op("op_shutdown", s.stateful_json_op_sync(t, op_shutdown));
i.register_op("op_listen", s.stateful_json_op_sync(t, op_listen));
i.register_op(
"op_datagram_receive",
s.stateful_json_op2(op_datagram_receive),
s.stateful_json_op_async(t, op_datagram_receive),
);
i.register_op(
"op_datagram_send",
s.stateful_json_op_async(t, op_datagram_send),
);
i.register_op("op_datagram_send", s.stateful_json_op2(op_datagram_send));
}
#[derive(Deserialize)]
@ -40,75 +45,72 @@ struct AcceptArgs {
transport: String,
}
fn accept_tcp(
isolate_state: &mut CoreIsolateState,
async fn accept_tcp(
resource_table: Rc<RefCell<ResourceTable>>,
args: AcceptArgs,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, ErrBox> {
_zero_copy: BufVec,
) -> Result<Value, ErrBox> {
let rid = args.rid as u32;
let resource_table = isolate_state.resource_table.clone();
let op = async move {
let accept_fut = poll_fn(|cx| {
let mut resource_table = resource_table.borrow_mut();
let listener_resource = resource_table
.get_mut::<TcpListenerResource>(rid)
.ok_or_else(|| ErrBox::bad_resource("Listener has been closed"))?;
let listener = &mut listener_resource.listener;
match listener.poll_accept(cx).map_err(ErrBox::from) {
Poll::Ready(Ok((stream, addr))) => {
listener_resource.untrack_task();
Poll::Ready(Ok((stream, addr)))
}
Poll::Pending => {
listener_resource.track_task(cx)?;
Poll::Pending
}
Poll::Ready(Err(e)) => {
listener_resource.untrack_task();
Poll::Ready(Err(e))
}
}
});
let (tcp_stream, _socket_addr) = accept_fut.await?;
let local_addr = tcp_stream.local_addr()?;
let remote_addr = tcp_stream.peer_addr()?;
let accept_fut = poll_fn(|cx| {
let mut resource_table = resource_table.borrow_mut();
let rid = resource_table.add(
"tcpStream",
Box::new(StreamResourceHolder::new(StreamResource::TcpStream(Some(
tcp_stream,
)))),
);
Ok(json!({
"rid": rid,
"localAddr": {
"hostname": local_addr.ip().to_string(),
"port": local_addr.port(),
"transport": "tcp",
},
"remoteAddr": {
"hostname": remote_addr.ip().to_string(),
"port": remote_addr.port(),
"transport": "tcp",
let listener_resource = resource_table
.get_mut::<TcpListenerResource>(rid)
.ok_or_else(|| ErrBox::bad_resource("Listener has been closed"))?;
let listener = &mut listener_resource.listener;
match listener.poll_accept(cx).map_err(ErrBox::from) {
Poll::Ready(Ok((stream, addr))) => {
listener_resource.untrack_task();
Poll::Ready(Ok((stream, addr)))
}
}))
};
Ok(JsonOp::Async(op.boxed_local()))
Poll::Pending => {
listener_resource.track_task(cx)?;
Poll::Pending
}
Poll::Ready(Err(e)) => {
listener_resource.untrack_task();
Poll::Ready(Err(e))
}
}
});
let (tcp_stream, _socket_addr) = accept_fut.await?;
let local_addr = tcp_stream.local_addr()?;
let remote_addr = tcp_stream.peer_addr()?;
let mut resource_table = resource_table.borrow_mut();
let rid = resource_table.add(
"tcpStream",
Box::new(StreamResourceHolder::new(StreamResource::TcpStream(Some(
tcp_stream,
)))),
);
Ok(json!({
"rid": rid,
"localAddr": {
"hostname": local_addr.ip().to_string(),
"port": local_addr.port(),
"transport": "tcp",
},
"remoteAddr": {
"hostname": remote_addr.ip().to_string(),
"port": remote_addr.port(),
"transport": "tcp",
}
}))
}
fn op_accept(
isolate_state: &mut CoreIsolateState,
_state: &Rc<State>,
async fn op_accept(
_state: Rc<State>,
resource_table: Rc<RefCell<ResourceTable>>,
args: Value,
zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, ErrBox> {
zero_copy: BufVec,
) -> Result<Value, ErrBox> {
let args: AcceptArgs = serde_json::from_value(args)?;
match args.transport.as_str() {
"tcp" => accept_tcp(isolate_state, args, zero_copy),
"tcp" => accept_tcp(resource_table, args, zero_copy).await,
#[cfg(unix)]
"unix" => net_unix::accept_unix(isolate_state, args.rid as u32, zero_copy),
"unix" => {
net_unix::accept_unix(resource_table, args.rid as u32, zero_copy).await
}
_ => Err(ErrBox::error(format!(
"Unsupported transport protocol {}",
args.transport
@ -122,58 +124,53 @@ struct ReceiveArgs {
transport: String,
}
fn receive_udp(
isolate_state: &mut CoreIsolateState,
async fn receive_udp(
resource_table: Rc<RefCell<ResourceTable>>,
_state: &Rc<State>,
args: ReceiveArgs,
zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, ErrBox> {
zero_copy: BufVec,
) -> Result<Value, ErrBox> {
assert_eq!(zero_copy.len(), 1, "Invalid number of arguments");
let mut zero_copy = zero_copy[0].clone();
let rid = args.rid as u32;
let resource_table = isolate_state.resource_table.clone();
let op = async move {
let receive_fut = poll_fn(|cx| {
let mut resource_table = resource_table.borrow_mut();
let resource = resource_table
.get_mut::<UdpSocketResource>(rid)
.ok_or_else(|| ErrBox::bad_resource("Socket has been closed"))?;
let socket = &mut resource.socket;
socket
.poll_recv_from(cx, &mut zero_copy)
.map_err(ErrBox::from)
});
let (size, remote_addr) = receive_fut.await?;
Ok(json!({
"size": size,
"remoteAddr": {
"hostname": remote_addr.ip().to_string(),
"port": remote_addr.port(),
"transport": "udp",
}
}))
};
Ok(JsonOp::Async(op.boxed_local()))
let receive_fut = poll_fn(|cx| {
let mut resource_table = resource_table.borrow_mut();
let resource = resource_table
.get_mut::<UdpSocketResource>(rid)
.ok_or_else(|| ErrBox::bad_resource("Socket has been closed"))?;
let socket = &mut resource.socket;
socket
.poll_recv_from(cx, &mut zero_copy)
.map_err(ErrBox::from)
});
let (size, remote_addr) = receive_fut.await?;
Ok(json!({
"size": size,
"remoteAddr": {
"hostname": remote_addr.ip().to_string(),
"port": remote_addr.port(),
"transport": "udp",
}
}))
}
fn op_datagram_receive(
isolate_state: &mut CoreIsolateState,
state: &Rc<State>,
async fn op_datagram_receive(
state: Rc<State>,
resource_table: Rc<RefCell<ResourceTable>>,
args: Value,
zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, ErrBox> {
zero_copy: BufVec,
) -> Result<Value, ErrBox> {
assert_eq!(zero_copy.len(), 1, "Invalid number of arguments");
let args: ReceiveArgs = serde_json::from_value(args)?;
match args.transport.as_str() {
"udp" => receive_udp(isolate_state, state, args, zero_copy),
"udp" => receive_udp(resource_table, &state, args, zero_copy).await,
#[cfg(unix)]
"unixpacket" => {
net_unix::receive_unix_packet(isolate_state, args.rid as u32, zero_copy)
net_unix::receive_unix_packet(resource_table, args.rid as u32, zero_copy)
.await
}
_ => Err(ErrBox::error(format!(
"Unsupported transport protocol {}",
@ -190,16 +187,15 @@ struct SendArgs {
transport_args: ArgsEnum,
}
fn op_datagram_send(
isolate_state: &mut CoreIsolateState,
state: &Rc<State>,
async fn op_datagram_send(
state: Rc<State>,
resource_table: Rc<RefCell<ResourceTable>>,
args: Value,
zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, ErrBox> {
zero_copy: BufVec,
) -> Result<Value, ErrBox> {
assert_eq!(zero_copy.len(), 1, "Invalid number of arguments");
let zero_copy = zero_copy[0].clone();
let resource_table = isolate_state.resource_table.clone();
match serde_json::from_value(args)? {
SendArgs {
rid,
@ -208,7 +204,7 @@ fn op_datagram_send(
} if transport == "udp" => {
state.check_net(&args.hostname, args.port)?;
let addr = resolve_addr(&args.hostname, args.port)?;
let f = poll_fn(move |cx| {
poll_fn(move |cx| {
let mut resource_table = resource_table.borrow_mut();
let resource = resource_table
.get_mut::<UdpSocketResource>(rid as u32)
@ -218,8 +214,8 @@ fn op_datagram_send(
.poll_send_to(cx, &zero_copy, &addr)
.map_ok(|byte_length| json!(byte_length))
.map_err(ErrBox::from)
});
Ok(JsonOp::Async(f.boxed_local()))
})
.await
}
#[cfg(unix)]
SendArgs {
@ -229,22 +225,16 @@ fn op_datagram_send(
} if transport == "unixpacket" => {
let address_path = net_unix::Path::new(&args.path);
state.check_read(&address_path)?;
let op = async move {
let mut resource_table = resource_table.borrow_mut();
let resource = resource_table
.get_mut::<net_unix::UnixDatagramResource>(rid as u32)
.ok_or_else(|| {
ErrBox::new("NotConnected", "Socket has been closed")
})?;
let socket = &mut resource.socket;
let byte_length = socket
.send_to(&zero_copy, &resource.local_addr.as_pathname().unwrap())
.await?;
let mut resource_table = resource_table.borrow_mut();
let resource = resource_table
.get_mut::<net_unix::UnixDatagramResource>(rid as u32)
.ok_or_else(|| ErrBox::new("NotConnected", "Socket has been closed"))?;
let socket = &mut resource.socket;
let byte_length = socket
.send_to(&zero_copy, &resource.local_addr.as_pathname().unwrap())
.await?;
Ok(json!(byte_length))
};
Ok(JsonOp::Async(op.boxed_local()))
Ok(json!(byte_length))
}
_ => Err(ErrBox::type_error("Wrong argument format!")),
}
@ -257,46 +247,42 @@ struct ConnectArgs {
transport_args: ArgsEnum,
}
fn op_connect(
isolate_state: &mut CoreIsolateState,
state: &Rc<State>,
async fn op_connect(
state: Rc<State>,
resource_table: Rc<RefCell<ResourceTable>>,
args: Value,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, ErrBox> {
let resource_table = isolate_state.resource_table.clone();
_zero_copy: BufVec,
) -> Result<Value, ErrBox> {
match serde_json::from_value(args)? {
ConnectArgs {
transport,
transport_args: ArgsEnum::Ip(args),
} if transport == "tcp" => {
state.check_net(&args.hostname, args.port)?;
let op = async move {
let addr = resolve_addr(&args.hostname, args.port)?;
let tcp_stream = TcpStream::connect(&addr).await?;
let local_addr = tcp_stream.local_addr()?;
let remote_addr = tcp_stream.peer_addr()?;
let mut resource_table = resource_table.borrow_mut();
let rid = resource_table.add(
"tcpStream",
Box::new(StreamResourceHolder::new(StreamResource::TcpStream(Some(
tcp_stream,
)))),
);
Ok(json!({
"rid": rid,
"localAddr": {
"hostname": local_addr.ip().to_string(),
"port": local_addr.port(),
"transport": transport,
},
"remoteAddr": {
"hostname": remote_addr.ip().to_string(),
"port": remote_addr.port(),
"transport": transport,
}
}))
};
Ok(JsonOp::Async(op.boxed_local()))
let addr = resolve_addr(&args.hostname, args.port)?;
let tcp_stream = TcpStream::connect(&addr).await?;
let local_addr = tcp_stream.local_addr()?;
let remote_addr = tcp_stream.peer_addr()?;
let mut resource_table = resource_table.borrow_mut();
let rid = resource_table.add(
"tcpStream",
Box::new(StreamResourceHolder::new(StreamResource::TcpStream(Some(
tcp_stream,
)))),
);
Ok(json!({
"rid": rid,
"localAddr": {
"hostname": local_addr.ip().to_string(),
"port": local_addr.port(),
"transport": transport,
},
"remoteAddr": {
"hostname": remote_addr.ip().to_string(),
"port": remote_addr.port(),
"transport": transport,
}
}))
}
#[cfg(unix)]
ConnectArgs {
@ -306,32 +292,29 @@ fn op_connect(
let address_path = net_unix::Path::new(&args.path);
state.check_unstable("Deno.connect");
state.check_read(&address_path)?;
let op = async move {
let path = args.path;
let unix_stream =
net_unix::UnixStream::connect(net_unix::Path::new(&path)).await?;
let local_addr = unix_stream.local_addr()?;
let remote_addr = unix_stream.peer_addr()?;
let mut resource_table = resource_table.borrow_mut();
let rid = resource_table.add(
"unixStream",
Box::new(StreamResourceHolder::new(StreamResource::UnixStream(
unix_stream,
))),
);
Ok(json!({
"rid": rid,
"localAddr": {
"path": local_addr.as_pathname(),
"transport": transport,
},
"remoteAddr": {
"path": remote_addr.as_pathname(),
"transport": transport,
}
}))
};
Ok(JsonOp::Async(op.boxed_local()))
let path = args.path;
let unix_stream =
net_unix::UnixStream::connect(net_unix::Path::new(&path)).await?;
let local_addr = unix_stream.local_addr()?;
let remote_addr = unix_stream.peer_addr()?;
let mut resource_table = resource_table.borrow_mut();
let rid = resource_table.add(
"unixStream",
Box::new(StreamResourceHolder::new(StreamResource::UnixStream(
unix_stream,
))),
);
Ok(json!({
"rid": rid,
"localAddr": {
"path": local_addr.as_pathname(),
"transport": transport,
},
"remoteAddr": {
"path": remote_addr.as_pathname(),
"transport": transport,
}
}))
}
_ => Err(ErrBox::type_error("Wrong argument format!")),
}
@ -344,11 +327,11 @@ struct ShutdownArgs {
}
fn op_shutdown(
isolate_state: &mut CoreIsolateState,
state: &Rc<State>,
state: &State,
resource_table: &mut ResourceTable,
args: Value,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, ErrBox> {
) -> Result<Value, ErrBox> {
state.check_unstable("Deno.shutdown");
let args: ShutdownArgs = serde_json::from_value(args)?;
@ -362,7 +345,6 @@ fn op_shutdown(
_ => unimplemented!(),
};
let mut resource_table = isolate_state.resource_table.borrow_mut();
let resource_holder = resource_table
.get_mut::<StreamResourceHolder>(rid)
.ok_or_else(ErrBox::bad_resource_id)?;
@ -377,7 +359,7 @@ fn op_shutdown(
_ => return Err(ErrBox::bad_resource_id()),
}
Ok(JsonOp::Sync(json!({})))
Ok(json!({}))
}
#[allow(dead_code)]
@ -485,12 +467,11 @@ fn listen_udp(
}
fn op_listen(
isolate_state: &mut CoreIsolateState,
state: &Rc<State>,
state: &State,
resource_table: &mut ResourceTable,
args: Value,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, ErrBox> {
let mut resource_table = isolate_state.resource_table.borrow_mut();
) -> Result<Value, ErrBox> {
match serde_json::from_value(args)? {
ListenArgs {
transport,
@ -502,9 +483,9 @@ fn op_listen(
state.check_net(&args.hostname, args.port)?;
let addr = resolve_addr(&args.hostname, args.port)?;
let (rid, local_addr) = if transport == "tcp" {
listen_tcp(&mut resource_table, addr)?
listen_tcp(resource_table, addr)?
} else {
listen_udp(&mut resource_table, addr)?
listen_udp(resource_table, addr)?
};
debug!(
"New listener {} {}:{}",
@ -512,14 +493,14 @@ fn op_listen(
local_addr.ip().to_string(),
local_addr.port()
);
Ok(JsonOp::Sync(json!({
Ok(json!({
"rid": rid,
"localAddr": {
"hostname": local_addr.ip().to_string(),
"port": local_addr.port(),
"transport": transport,
},
})))
}))
}
#[cfg(unix)]
ListenArgs {
@ -536,22 +517,22 @@ fn op_listen(
state.check_read(&address_path)?;
state.check_write(&address_path)?;
let (rid, local_addr) = if transport == "unix" {
net_unix::listen_unix(&mut resource_table, &address_path)?
net_unix::listen_unix(resource_table, &address_path)?
} else {
net_unix::listen_unix_packet(&mut resource_table, &address_path)?
net_unix::listen_unix_packet(resource_table, &address_path)?
};
debug!(
"New listener {} {}",
rid,
local_addr.as_pathname().unwrap().display(),
);
Ok(JsonOp::Sync(json!({
Ok(json!({
"rid": rid,
"localAddr": {
"path": local_addr.as_pathname(),
"transport": transport,
},
})))
}))
}
#[cfg(unix)]
_ => Err(ErrBox::type_error("Wrong argument format!")),

View file

@ -1,13 +1,13 @@
use super::dispatch_json::{Deserialize, JsonOp};
use super::dispatch_json::{Deserialize, Value};
use super::io::{StreamResource, StreamResourceHolder};
use deno_core::CoreIsolateState;
use deno_core::BufVec;
use deno_core::ErrBox;
use deno_core::ResourceTable;
use deno_core::ZeroCopyBuf;
use futures::future::FutureExt;
use std::cell::RefCell;
use std::fs::remove_file;
use std::os::unix;
pub use std::path::Path;
use std::rc::Rc;
use tokio::net::UnixDatagram;
use tokio::net::UnixListener;
pub use tokio::net::UnixStream;
@ -26,80 +26,63 @@ pub struct UnixListenArgs {
pub path: String,
}
pub fn accept_unix(
isolate_state: &mut CoreIsolateState,
pub async fn accept_unix(
resource_table: Rc<RefCell<ResourceTable>>,
rid: u32,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, ErrBox> {
let resource_table = isolate_state.resource_table.clone();
{
let _ = resource_table
.borrow()
.get::<UnixListenerResource>(rid)
.ok_or_else(ErrBox::bad_resource_id)?;
}
let op = async move {
let mut resource_table_ = resource_table.borrow_mut();
let listener_resource = {
resource_table_
.get_mut::<UnixListenerResource>(rid)
.ok_or_else(|| ErrBox::bad_resource("Listener has been closed"))?
};
let (unix_stream, _socket_addr) =
listener_resource.listener.accept().await?;
drop(resource_table_);
let local_addr = unix_stream.local_addr()?;
let remote_addr = unix_stream.peer_addr()?;
let mut resource_table_ = resource_table.borrow_mut();
let rid = resource_table_.add(
"unixStream",
Box::new(StreamResourceHolder::new(StreamResource::UnixStream(
unix_stream,
))),
);
Ok(json!({
"rid": rid,
"localAddr": {
"path": local_addr.as_pathname(),
"transport": "unix",
},
"remoteAddr": {
"path": remote_addr.as_pathname(),
"transport": "unix",
}
}))
_zero_copy: BufVec,
) -> Result<Value, ErrBox> {
let mut resource_table_ = resource_table.borrow_mut();
let listener_resource = {
resource_table_
.get_mut::<UnixListenerResource>(rid)
.ok_or_else(|| ErrBox::bad_resource("Listener has been closed"))?
};
Ok(JsonOp::Async(op.boxed_local()))
let (unix_stream, _socket_addr) = listener_resource.listener.accept().await?;
drop(resource_table_);
let local_addr = unix_stream.local_addr()?;
let remote_addr = unix_stream.peer_addr()?;
let mut resource_table_ = resource_table.borrow_mut();
let rid = resource_table_.add(
"unixStream",
Box::new(StreamResourceHolder::new(StreamResource::UnixStream(
unix_stream,
))),
);
Ok(json!({
"rid": rid,
"localAddr": {
"path": local_addr.as_pathname(),
"transport": "unix",
},
"remoteAddr": {
"path": remote_addr.as_pathname(),
"transport": "unix",
}
}))
}
pub fn receive_unix_packet(
isolate_state: &mut CoreIsolateState,
pub async fn receive_unix_packet(
resource_table: Rc<RefCell<ResourceTable>>,
rid: u32,
zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, ErrBox> {
zero_copy: BufVec,
) -> Result<Value, ErrBox> {
assert_eq!(zero_copy.len(), 1, "Invalid number of arguments");
let mut zero_copy = zero_copy[0].clone();
let resource_table = isolate_state.resource_table.clone();
let op = async move {
let mut resource_table_ = resource_table.borrow_mut();
let resource = resource_table_
.get_mut::<UnixDatagramResource>(rid)
.ok_or_else(|| ErrBox::bad_resource("Socket has been closed"))?;
let (size, remote_addr) = resource.socket.recv_from(&mut zero_copy).await?;
Ok(json!({
"size": size,
"remoteAddr": {
"path": remote_addr.as_pathname(),
"transport": "unixpacket",
}
}))
};
Ok(JsonOp::Async(op.boxed_local()))
let mut resource_table_ = resource_table.borrow_mut();
let resource = resource_table_
.get_mut::<UnixDatagramResource>(rid)
.ok_or_else(|| ErrBox::bad_resource("Socket has been closed"))?;
let (size, remote_addr) = resource.socket.recv_from(&mut zero_copy).await?;
Ok(json!({
"size": size,
"remoteAddr": {
"path": remote_addr.as_pathname(),
"transport": "unixpacket",
}
}))
}
pub fn listen_unix(

View file

@ -1,8 +1,9 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
use super::dispatch_json::{Deserialize, JsonOp, Value};
use super::dispatch_json::{Deserialize, Value};
use crate::state::State;
use deno_core::CoreIsolate;
use deno_core::ErrBox;
use deno_core::ResourceTable;
use deno_core::ZeroCopyBuf;
use std::collections::HashMap;
use std::env;
@ -10,29 +11,32 @@ use std::rc::Rc;
use url::Url;
pub fn init(i: &mut CoreIsolate, s: &Rc<State>) {
i.register_op("op_exit", s.stateful_json_op(op_exit));
i.register_op("op_env", s.stateful_json_op(op_env));
i.register_op("op_exec_path", s.stateful_json_op(op_exec_path));
i.register_op("op_set_env", s.stateful_json_op(op_set_env));
i.register_op("op_get_env", s.stateful_json_op(op_get_env));
i.register_op("op_delete_env", s.stateful_json_op(op_delete_env));
i.register_op("op_hostname", s.stateful_json_op(op_hostname));
i.register_op("op_loadavg", s.stateful_json_op(op_loadavg));
i.register_op("op_os_release", s.stateful_json_op(op_os_release));
let t = &CoreIsolate::state(i).borrow().resource_table.clone();
i.register_op("op_exit", s.stateful_json_op_sync(t, op_exit));
i.register_op("op_env", s.stateful_json_op_sync(t, op_env));
i.register_op("op_exec_path", s.stateful_json_op_sync(t, op_exec_path));
i.register_op("op_set_env", s.stateful_json_op_sync(t, op_set_env));
i.register_op("op_get_env", s.stateful_json_op_sync(t, op_get_env));
i.register_op("op_delete_env", s.stateful_json_op_sync(t, op_delete_env));
i.register_op("op_hostname", s.stateful_json_op_sync(t, op_hostname));
i.register_op("op_loadavg", s.stateful_json_op_sync(t, op_loadavg));
i.register_op("op_os_release", s.stateful_json_op_sync(t, op_os_release));
}
fn op_exec_path(
state: &Rc<State>,
state: &State,
_resource_table: &mut ResourceTable,
_args: Value,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, ErrBox> {
) -> Result<Value, ErrBox> {
let current_exe = env::current_exe().unwrap();
state.check_read_blind(&current_exe, "exec_path")?;
// Now apply URL parser to current exe to get fully resolved path, otherwise
// we might get `./` and `../` bits in `exec_path`
let exe_url = Url::from_file_path(current_exe).unwrap();
let path = exe_url.to_file_path().unwrap();
Ok(JsonOp::Sync(json!(path)))
Ok(json!(path))
}
#[derive(Deserialize)]
@ -42,24 +46,26 @@ struct SetEnv {
}
fn op_set_env(
state: &Rc<State>,
state: &State,
_resource_table: &mut ResourceTable,
args: Value,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, ErrBox> {
) -> Result<Value, ErrBox> {
let args: SetEnv = serde_json::from_value(args)?;
state.check_env()?;
env::set_var(args.key, args.value);
Ok(JsonOp::Sync(json!({})))
Ok(json!({}))
}
fn op_env(
state: &Rc<State>,
state: &State,
_resource_table: &mut ResourceTable,
_args: Value,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, ErrBox> {
) -> Result<Value, ErrBox> {
state.check_env()?;
let v = env::vars().collect::<HashMap<String, String>>();
Ok(JsonOp::Sync(json!(v)))
Ok(json!(v))
}
#[derive(Deserialize)]
@ -68,17 +74,18 @@ struct GetEnv {
}
fn op_get_env(
state: &Rc<State>,
state: &State,
_resource_table: &mut ResourceTable,
args: Value,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, ErrBox> {
) -> Result<Value, ErrBox> {
let args: GetEnv = serde_json::from_value(args)?;
state.check_env()?;
let r = match env::var(args.key) {
Err(env::VarError::NotPresent) => json!([]),
v => json!([v?]),
};
Ok(JsonOp::Sync(r))
Ok(r)
}
#[derive(Deserialize)]
@ -87,14 +94,15 @@ struct DeleteEnv {
}
fn op_delete_env(
state: &Rc<State>,
state: &State,
_resource_table: &mut ResourceTable,
args: Value,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, ErrBox> {
) -> Result<Value, ErrBox> {
let args: DeleteEnv = serde_json::from_value(args)?;
state.check_env()?;
env::remove_var(args.key);
Ok(JsonOp::Sync(json!({})))
Ok(json!({}))
}
#[derive(Deserialize)]
@ -103,49 +111,49 @@ struct Exit {
}
fn op_exit(
_s: &Rc<State>,
_state: &State,
_resource_table: &mut ResourceTable,
args: Value,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, ErrBox> {
) -> Result<Value, ErrBox> {
let args: Exit = serde_json::from_value(args)?;
std::process::exit(args.code)
}
fn op_loadavg(
state: &Rc<State>,
state: &State,
_resource_table: &mut ResourceTable,
_args: Value,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, ErrBox> {
) -> Result<Value, ErrBox> {
state.check_unstable("Deno.loadavg");
state.check_env()?;
match sys_info::loadavg() {
Ok(loadavg) => Ok(JsonOp::Sync(json!([
loadavg.one,
loadavg.five,
loadavg.fifteen
]))),
Err(_) => Ok(JsonOp::Sync(json!([0f64, 0f64, 0f64]))),
Ok(loadavg) => Ok(json!([loadavg.one, loadavg.five, loadavg.fifteen])),
Err(_) => Ok(json!([0f64, 0f64, 0f64])),
}
}
fn op_hostname(
state: &Rc<State>,
state: &State,
_resource_table: &mut ResourceTable,
_args: Value,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, ErrBox> {
) -> Result<Value, ErrBox> {
state.check_unstable("Deno.hostname");
state.check_env()?;
let hostname = sys_info::hostname().unwrap_or_else(|_| "".to_string());
Ok(JsonOp::Sync(json!(hostname)))
Ok(json!(hostname))
}
fn op_os_release(
state: &Rc<State>,
state: &State,
_resource_table: &mut ResourceTable,
_args: Value,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, ErrBox> {
) -> Result<Value, ErrBox> {
state.check_unstable("Deno.osRelease");
state.check_env()?;
let release = sys_info::os_release().unwrap_or_else(|_| "".to_string());
Ok(JsonOp::Sync(json!(release)))
Ok(json!(release))
}

View file

@ -1,24 +1,27 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
use super::dispatch_json::{Deserialize, JsonOp, Value};
use super::dispatch_json::{Deserialize, Value};
use crate::state::State;
use deno_core::CoreIsolate;
use deno_core::ErrBox;
use deno_core::ResourceTable;
use deno_core::ZeroCopyBuf;
use std::path::Path;
use std::rc::Rc;
pub fn init(i: &mut CoreIsolate, s: &Rc<State>) {
let t = &CoreIsolate::state(i).borrow().resource_table.clone();
i.register_op(
"op_query_permission",
s.stateful_json_op(op_query_permission),
s.stateful_json_op_sync(t, op_query_permission),
);
i.register_op(
"op_revoke_permission",
s.stateful_json_op(op_revoke_permission),
s.stateful_json_op_sync(t, op_revoke_permission),
);
i.register_op(
"op_request_permission",
s.stateful_json_op(op_request_permission),
s.stateful_json_op_sync(t, op_request_permission),
);
}
@ -30,10 +33,11 @@ struct PermissionArgs {
}
pub fn op_query_permission(
state: &Rc<State>,
state: &State,
_resource_table: &mut ResourceTable,
args: Value,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, ErrBox> {
) -> Result<Value, ErrBox> {
let args: PermissionArgs = serde_json::from_value(args)?;
let permissions = state.permissions.borrow();
let path = args.path.as_deref();
@ -52,14 +56,15 @@ pub fn op_query_permission(
))
}
};
Ok(JsonOp::Sync(json!({ "state": perm.to_string() })))
Ok(json!({ "state": perm.to_string() }))
}
pub fn op_revoke_permission(
state: &Rc<State>,
state: &State,
_resource_table: &mut ResourceTable,
args: Value,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, ErrBox> {
) -> Result<Value, ErrBox> {
let args: PermissionArgs = serde_json::from_value(args)?;
let mut permissions = state.permissions.borrow_mut();
let path = args.path.as_deref();
@ -78,14 +83,15 @@ pub fn op_revoke_permission(
))
}
};
Ok(JsonOp::Sync(json!({ "state": perm.to_string() })))
Ok(json!({ "state": perm.to_string() }))
}
pub fn op_request_permission(
state: &Rc<State>,
state: &State,
_resource_table: &mut ResourceTable,
args: Value,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, ErrBox> {
) -> Result<Value, ErrBox> {
let args: PermissionArgs = serde_json::from_value(args)?;
let permissions = &mut state.permissions.borrow_mut();
let path = args.path.as_deref();
@ -104,5 +110,5 @@ pub fn op_request_permission(
))
}
};
Ok(JsonOp::Sync(json!({ "state": perm.to_string() })))
Ok(json!({ "state": perm.to_string() }))
}

View file

@ -2,7 +2,6 @@
use crate::ops::dispatch_json::Deserialize;
use crate::ops::dispatch_json::JsonOp;
use crate::ops::dispatch_json::Value;
use crate::ops::json_op;
use crate::state::State;
use deno_core::plugin_api;
use deno_core::CoreIsolate;
@ -21,10 +20,7 @@ use std::task::Context;
use std::task::Poll;
pub fn init(i: &mut CoreIsolate, s: &Rc<State>) {
i.register_op(
"op_open_plugin",
s.core_op(json_op(s.stateful_op2(op_open_plugin))),
);
i.register_op("op_open_plugin", s.stateful_json_op2(op_open_plugin));
}
#[derive(Deserialize)]

View file

@ -1,15 +1,16 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
use super::dispatch_json::{Deserialize, JsonOp, Value};
use super::dispatch_json::{Deserialize, Value};
use super::io::{std_file_resource, StreamResource, StreamResourceHolder};
use crate::signal::kill;
use crate::state::State;
use deno_core::BufVec;
use deno_core::CoreIsolate;
use deno_core::CoreIsolateState;
use deno_core::ErrBox;
use deno_core::ResourceTable;
use deno_core::ZeroCopyBuf;
use futures::future::poll_fn;
use futures::future::FutureExt;
use std::cell::RefCell;
use std::rc::Rc;
use tokio::process::Command;
@ -17,9 +18,11 @@ use tokio::process::Command;
use std::os::unix::process::ExitStatusExt;
pub fn init(i: &mut CoreIsolate, s: &Rc<State>) {
i.register_op("op_run", s.stateful_json_op2(op_run));
i.register_op("op_run_status", s.stateful_json_op2(op_run_status));
i.register_op("op_kill", s.stateful_json_op(op_kill));
let t = &CoreIsolate::state(i).borrow().resource_table.clone();
i.register_op("op_run", s.stateful_json_op_sync(t, op_run));
i.register_op("op_run_status", s.stateful_json_op_async(t, op_run_status));
i.register_op("op_kill", s.stateful_json_op_sync(t, op_kill));
}
fn clone_file(
@ -60,15 +63,14 @@ struct ChildResource {
}
fn op_run(
isolate_state: &mut CoreIsolateState,
state: &Rc<State>,
state: &State,
resource_table: &mut ResourceTable,
args: Value,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, ErrBox> {
) -> Result<Value, ErrBox> {
let run_args: RunArgs = serde_json::from_value(args)?;
state.check_run()?;
let mut resource_table = isolate_state.resource_table.borrow_mut();
let args = run_args.cmd;
let env = run_args.env;
@ -88,21 +90,21 @@ fn op_run(
if run_args.stdin != "" {
c.stdin(subprocess_stdio_map(run_args.stdin.as_ref())?);
} else {
let file = clone_file(run_args.stdin_rid, &mut resource_table)?;
let file = clone_file(run_args.stdin_rid, resource_table)?;
c.stdin(file);
}
if run_args.stdout != "" {
c.stdout(subprocess_stdio_map(run_args.stdout.as_ref())?);
} else {
let file = clone_file(run_args.stdout_rid, &mut resource_table)?;
let file = clone_file(run_args.stdout_rid, resource_table)?;
c.stdout(file);
}
if run_args.stderr != "" {
c.stderr(subprocess_stdio_map(run_args.stderr.as_ref())?);
} else {
let file = clone_file(run_args.stderr_rid, &mut resource_table)?;
let file = clone_file(run_args.stderr_rid, resource_table)?;
c.stderr(file);
}
@ -155,13 +157,13 @@ fn op_run(
let child_resource = ChildResource { child };
let child_rid = resource_table.add("child", Box::new(child_resource));
Ok(JsonOp::Sync(json!({
Ok(json!({
"rid": child_rid,
"pid": pid,
"stdinRid": stdin_rid,
"stdoutRid": stdout_rid,
"stderrRid": stderr_rid,
})))
}))
}
#[derive(Deserialize)]
@ -170,49 +172,44 @@ struct RunStatusArgs {
rid: i32,
}
fn op_run_status(
isolate_state: &mut CoreIsolateState,
state: &Rc<State>,
async fn op_run_status(
state: Rc<State>,
resource_table: Rc<RefCell<ResourceTable>>,
args: Value,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, ErrBox> {
_zero_copy: BufVec,
) -> Result<Value, ErrBox> {
let args: RunStatusArgs = serde_json::from_value(args)?;
let rid = args.rid as u32;
state.check_run()?;
let resource_table = isolate_state.resource_table.clone();
let future = async move {
let run_status = poll_fn(|cx| {
let mut resource_table = resource_table.borrow_mut();
let child_resource = resource_table
.get_mut::<ChildResource>(rid)
.ok_or_else(ErrBox::bad_resource_id)?;
let child = &mut child_resource.child;
child.poll_unpin(cx).map_err(ErrBox::from)
})
.await?;
let run_status = poll_fn(|cx| {
let mut resource_table = resource_table.borrow_mut();
let child_resource = resource_table
.get_mut::<ChildResource>(rid)
.ok_or_else(ErrBox::bad_resource_id)?;
let child = &mut child_resource.child;
child.poll_unpin(cx).map_err(ErrBox::from)
})
.await?;
let code = run_status.code();
let code = run_status.code();
#[cfg(unix)]
let signal = run_status.signal();
#[cfg(not(unix))]
let signal = None;
#[cfg(unix)]
let signal = run_status.signal();
#[cfg(not(unix))]
let signal = None;
code
.or(signal)
.expect("Should have either an exit code or a signal.");
let got_signal = signal.is_some();
code
.or(signal)
.expect("Should have either an exit code or a signal.");
let got_signal = signal.is_some();
Ok(json!({
"gotSignal": got_signal,
"exitCode": code.unwrap_or(-1),
"exitSignal": signal.unwrap_or(-1),
}))
};
Ok(JsonOp::Async(future.boxed_local()))
Ok(json!({
"gotSignal": got_signal,
"exitCode": code.unwrap_or(-1),
"exitSignal": signal.unwrap_or(-1),
}))
}
#[derive(Deserialize)]
@ -222,14 +219,15 @@ struct KillArgs {
}
fn op_kill(
state: &Rc<State>,
state: &State,
_resource_table: &mut ResourceTable,
args: Value,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, ErrBox> {
) -> Result<Value, ErrBox> {
state.check_unstable("Deno.kill");
state.check_run()?;
let args: KillArgs = serde_json::from_value(args)?;
kill(args.pid, args.signo)?;
Ok(JsonOp::Sync(json!({})))
Ok(json!({}))
}

View file

@ -1,25 +1,29 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
use super::dispatch_json::{JsonOp, Value};
use super::dispatch_json::Value;
use crate::state::State;
use deno_core::CoreIsolate;
use deno_core::ErrBox;
use deno_core::ResourceTable;
use deno_core::ZeroCopyBuf;
use rand::thread_rng;
use rand::Rng;
use std::rc::Rc;
pub fn init(i: &mut CoreIsolate, s: &Rc<State>) {
let t = &CoreIsolate::state(i).borrow().resource_table.clone();
i.register_op(
"op_get_random_values",
s.stateful_json_op(op_get_random_values),
s.stateful_json_op_sync(t, op_get_random_values),
);
}
fn op_get_random_values(
state: &Rc<State>,
state: &State,
_resource_table: &mut ResourceTable,
_args: Value,
zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, ErrBox> {
) -> Result<Value, ErrBox> {
assert_eq!(zero_copy.len(), 1);
if let Some(seeded_rng) = &state.seeded_rng {
@ -29,5 +33,5 @@ fn op_get_random_values(
rng.fill(&mut *zero_copy[0]);
}
Ok(JsonOp::Sync(json!({})))
Ok(json!({}))
}

View file

@ -1,19 +1,26 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
use super::dispatch_json::{blocking_json, Deserialize, JsonOp, Value};
use super::dispatch_json::{Deserialize, Value};
use crate::repl;
use crate::repl::Repl;
use crate::state::State;
use deno_core::BufVec;
use deno_core::CoreIsolate;
use deno_core::CoreIsolateState;
use deno_core::ErrBox;
use deno_core::ResourceTable;
use deno_core::ZeroCopyBuf;
use std::cell::RefCell;
use std::rc::Rc;
use std::sync::Arc;
use std::sync::Mutex;
pub fn init(i: &mut CoreIsolate, s: &Rc<State>) {
i.register_op("op_repl_start", s.stateful_json_op2(op_repl_start));
i.register_op("op_repl_readline", s.stateful_json_op2(op_repl_readline));
let t = &CoreIsolate::state(i).borrow().resource_table.clone();
i.register_op("op_repl_start", s.stateful_json_op_sync(t, op_repl_start));
i.register_op(
"op_repl_readline",
s.stateful_json_op_async(t, op_repl_readline),
);
}
struct ReplResource(Arc<Mutex<Repl>>);
@ -25,20 +32,19 @@ struct ReplStartArgs {
}
fn op_repl_start(
isolate_state: &mut CoreIsolateState,
state: &Rc<State>,
state: &State,
resource_table: &mut ResourceTable,
args: Value,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, ErrBox> {
) -> Result<Value, ErrBox> {
let args: ReplStartArgs = serde_json::from_value(args)?;
debug!("op_repl_start {}", args.history_file);
let history_path =
repl::history_path(&state.global_state.dir, &args.history_file);
let repl = repl::Repl::new(history_path);
let resource = ReplResource(Arc::new(Mutex::new(repl)));
let mut resource_table = isolate_state.resource_table.borrow_mut();
let rid = resource_table.add("repl", Box::new(resource));
Ok(JsonOp::Sync(json!(rid)))
Ok(json!(rid))
}
#[derive(Deserialize)]
@ -47,24 +53,26 @@ struct ReplReadlineArgs {
prompt: String,
}
fn op_repl_readline(
isolate_state: &mut CoreIsolateState,
_state: &Rc<State>,
async fn op_repl_readline(
_state: Rc<State>,
resource_table: Rc<RefCell<ResourceTable>>,
args: Value,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, ErrBox> {
_zero_copy: BufVec,
) -> Result<Value, ErrBox> {
let args: ReplReadlineArgs = serde_json::from_value(args)?;
let rid = args.rid as u32;
let prompt = args.prompt;
debug!("op_repl_readline {} {}", rid, prompt);
let resource_table = isolate_state.resource_table.borrow();
let resource_table = resource_table.borrow();
let resource = resource_table
.get::<ReplResource>(rid)
.ok_or_else(ErrBox::bad_resource_id)?;
let repl = resource.0.clone();
blocking_json(false, move || {
drop(resource_table);
tokio::task::spawn_blocking(move || {
let line = repl.lock().unwrap().readline(&prompt)?;
Ok(json!(line))
})
.await
.unwrap()
}

View file

@ -1,42 +1,43 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
use super::dispatch_json::{Deserialize, JsonOp, Value};
use super::dispatch_json::{Deserialize, Value};
use crate::state::State;
use deno_core::CoreIsolate;
use deno_core::CoreIsolateState;
use deno_core::ErrBox;
use deno_core::ResourceTable;
use deno_core::ZeroCopyBuf;
use std::rc::Rc;
pub fn init(i: &mut CoreIsolate, s: &Rc<State>) {
i.register_op("op_resources", s.stateful_json_op2(op_resources));
i.register_op("op_close", s.stateful_json_op2(op_close));
let t = &CoreIsolate::state(i).borrow().resource_table.clone();
i.register_op("op_resources", s.stateful_json_op_sync(t, op_resources));
i.register_op("op_close", s.stateful_json_op_sync(t, op_close));
}
fn op_resources(
isolate_state: &mut CoreIsolateState,
_state: &Rc<State>,
_state: &State,
resource_table: &mut ResourceTable,
_args: Value,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, ErrBox> {
let serialized_resources = isolate_state.resource_table.borrow().entries();
Ok(JsonOp::Sync(json!(serialized_resources)))
) -> Result<Value, ErrBox> {
let serialized_resources = resource_table.entries();
Ok(json!(serialized_resources))
}
/// op_close removes a resource from the resource table.
fn op_close(
isolate_state: &mut CoreIsolateState,
_state: &Rc<State>,
_state: &State,
resource_table: &mut ResourceTable,
args: Value,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, ErrBox> {
) -> Result<Value, ErrBox> {
#[derive(Deserialize)]
struct CloseArgs {
rid: i32,
}
let args: CloseArgs = serde_json::from_value(args)?;
let mut resource_table = isolate_state.resource_table.borrow_mut();
resource_table
.close(args.rid as u32)
.ok_or_else(ErrBox::bad_resource_id)?;
Ok(JsonOp::Sync(json!({})))
Ok(json!({}))
}

View file

@ -1,5 +1,5 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
use super::dispatch_json::{JsonOp, Value};
use super::dispatch_json::Value;
use crate::colors;
use crate::state::State;
use crate::version;
@ -7,24 +7,28 @@ use crate::DenoSubcommand;
use deno_core::CoreIsolate;
use deno_core::ErrBox;
use deno_core::ModuleSpecifier;
use deno_core::ResourceTable;
use deno_core::ZeroCopyBuf;
use std::env;
use std::rc::Rc;
pub fn init(i: &mut CoreIsolate, s: &Rc<State>) {
i.register_op("op_start", s.stateful_json_op(op_start));
i.register_op("op_main_module", s.stateful_json_op(op_main_module));
i.register_op("op_metrics", s.stateful_json_op(op_metrics));
let t = &CoreIsolate::state(i).borrow().resource_table.clone();
i.register_op("op_start", s.stateful_json_op_sync(t, op_start));
i.register_op("op_main_module", s.stateful_json_op_sync(t, op_main_module));
i.register_op("op_metrics", s.stateful_json_op_sync(t, op_metrics));
}
fn op_start(
state: &Rc<State>,
state: &State,
_resource_table: &mut ResourceTable,
_args: Value,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, ErrBox> {
) -> Result<Value, ErrBox> {
let gs = &state.global_state;
Ok(JsonOp::Sync(json!({
Ok(json!({
// TODO(bartlomieju): `cwd` field is not used in JS, remove?
"args": gs.flags.argv.clone(),
"cwd": &env::current_dir().unwrap(),
@ -39,31 +43,33 @@ fn op_start(
"unstableFlag": gs.flags.unstable,
"v8Version": version::v8(),
"versionFlag": gs.flags.version,
})))
}))
}
fn op_main_module(
state: &Rc<State>,
state: &State,
_resource_table: &mut ResourceTable,
_args: Value,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, ErrBox> {
) -> Result<Value, ErrBox> {
let main = &state.main_module.to_string();
let main_url = ModuleSpecifier::resolve_url_or_path(&main)?;
if main_url.as_url().scheme() == "file" {
let main_path = std::env::current_dir().unwrap().join(main_url.to_string());
state.check_read_blind(&main_path, "main_module")?;
}
Ok(JsonOp::Sync(json!(&main)))
Ok(json!(&main))
}
fn op_metrics(
state: &Rc<State>,
state: &State,
_resource_table: &mut ResourceTable,
_args: Value,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, ErrBox> {
) -> Result<Value, ErrBox> {
let m = &state.metrics.borrow();
Ok(JsonOp::Sync(json!({
Ok(json!({
"opsDispatched": m.ops_dispatched,
"opsDispatchedSync": m.ops_dispatched_sync,
"opsDispatchedAsync": m.ops_dispatched_async,
@ -75,7 +81,7 @@ fn op_metrics(
"bytesSentControl": m.bytes_sent_control,
"bytesSentData": m.bytes_sent_data,
"bytesReceived": m.bytes_received
})))
}))
}
fn ppid() -> Value {

View file

@ -1,19 +1,23 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
use super::dispatch_json::{Deserialize, JsonOp, Value};
use super::dispatch_json::{Deserialize, Value};
use crate::futures::FutureExt;
use crate::state::State;
use crate::tsc::runtime_bundle;
use crate::tsc::runtime_compile;
use crate::tsc::runtime_transpile;
use deno_core::BufVec;
use deno_core::CoreIsolate;
use deno_core::ErrBox;
use deno_core::ZeroCopyBuf;
use deno_core::ResourceTable;
use std::cell::RefCell;
use std::collections::HashMap;
use std::rc::Rc;
pub fn init(i: &mut CoreIsolate, s: &Rc<State>) {
i.register_op("op_compile", s.stateful_json_op(op_compile));
i.register_op("op_transpile", s.stateful_json_op(op_transpile));
let t = &CoreIsolate::state(i).borrow().resource_table.clone();
i.register_op("op_compile", s.stateful_json_op_async(t, op_compile));
i.register_op("op_transpile", s.stateful_json_op_async(t, op_transpile));
}
#[derive(Deserialize, Debug)]
@ -25,40 +29,37 @@ struct CompileArgs {
options: Option<String>,
}
fn op_compile(
state: &Rc<State>,
async fn op_compile(
state: Rc<State>,
_resource_table: Rc<RefCell<ResourceTable>>,
args: Value,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, ErrBox> {
_data: BufVec,
) -> Result<Value, ErrBox> {
state.check_unstable("Deno.compile");
let args: CompileArgs = serde_json::from_value(args)?;
let global_state = state.global_state.clone();
let permissions = state.permissions.borrow().clone();
let fut = async move {
let fut = if args.bundle {
runtime_bundle(
&global_state,
permissions,
&args.root_name,
&args.sources,
&args.options,
)
.boxed_local()
} else {
runtime_compile(
&global_state,
permissions,
&args.root_name,
&args.sources,
&args.options,
)
.boxed_local()
};
fut.await
}
.boxed_local();
Ok(JsonOp::Async(fut))
let fut = if args.bundle {
runtime_bundle(
&global_state,
permissions,
&args.root_name,
&args.sources,
&args.options,
)
.boxed_local()
} else {
runtime_compile(
&global_state,
permissions,
&args.root_name,
&args.sources,
&args.options,
)
.boxed_local()
};
let result = fut.await?;
Ok(result)
}
#[derive(Deserialize, Debug)]
@ -67,19 +68,18 @@ struct TranspileArgs {
options: Option<String>,
}
fn op_transpile(
state: &Rc<State>,
async fn op_transpile(
state: Rc<State>,
_resource_table: Rc<RefCell<ResourceTable>>,
args: Value,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, ErrBox> {
_data: BufVec,
) -> Result<Value, ErrBox> {
state.check_unstable("Deno.transpile");
let args: TranspileArgs = serde_json::from_value(args)?;
let global_state = state.global_state.clone();
let permissions = state.permissions.borrow().clone();
let fut = async move {
let result =
runtime_transpile(&global_state, permissions, &args.sources, &args.options)
.await
}
.boxed_local();
Ok(JsonOp::Async(fut))
.await?;
Ok(result)
}

View file

@ -1,26 +1,35 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
use super::dispatch_json::{JsonOp, Value};
use super::dispatch_json::Value;
use crate::state::State;
use deno_core::BufVec;
use deno_core::CoreIsolate;
use deno_core::CoreIsolateState;
use deno_core::ErrBox;
use deno_core::ResourceTable;
use deno_core::ZeroCopyBuf;
use std::cell::RefCell;
use std::rc::Rc;
#[cfg(unix)]
use super::dispatch_json::Deserialize;
#[cfg(unix)]
#[cfg(unix)]
use futures::future::{poll_fn, FutureExt};
use futures::future::poll_fn;
#[cfg(unix)]
use std::task::Waker;
#[cfg(unix)]
use tokio::signal::unix::{signal, Signal, SignalKind};
pub fn init(i: &mut CoreIsolate, s: &Rc<State>) {
i.register_op("op_signal_bind", s.stateful_json_op2(op_signal_bind));
i.register_op("op_signal_unbind", s.stateful_json_op2(op_signal_unbind));
i.register_op("op_signal_poll", s.stateful_json_op2(op_signal_poll));
let t = &CoreIsolate::state(i).borrow().resource_table.clone();
i.register_op("op_signal_bind", s.stateful_json_op_sync(t, op_signal_bind));
i.register_op(
"op_signal_unbind",
s.stateful_json_op_sync(t, op_signal_unbind),
);
i.register_op(
"op_signal_poll",
s.stateful_json_op_async(t, op_signal_poll),
);
}
#[cfg(unix)]
@ -42,14 +51,13 @@ struct SignalArgs {
#[cfg(unix)]
fn op_signal_bind(
isolate_state: &mut CoreIsolateState,
state: &Rc<State>,
state: &State,
resource_table: &mut ResourceTable,
args: Value,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, ErrBox> {
) -> Result<Value, ErrBox> {
state.check_unstable("Deno.signal");
let args: BindSignalArgs = serde_json::from_value(args)?;
let mut resource_table = isolate_state.resource_table.borrow_mut();
let rid = resource_table.add(
"signal",
Box::new(SignalStreamResource(
@ -57,22 +65,21 @@ fn op_signal_bind(
None,
)),
);
Ok(JsonOp::Sync(json!({
Ok(json!({
"rid": rid,
})))
}))
}
#[cfg(unix)]
fn op_signal_poll(
isolate_state: &mut CoreIsolateState,
state: &Rc<State>,
async fn op_signal_poll(
state: Rc<State>,
resource_table: Rc<RefCell<ResourceTable>>,
args: Value,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, ErrBox> {
_zero_copy: BufVec,
) -> Result<Value, ErrBox> {
state.check_unstable("Deno.signal");
let args: SignalArgs = serde_json::from_value(args)?;
let rid = args.rid as u32;
let resource_table = isolate_state.resource_table.clone();
let future = poll_fn(move |cx| {
let mut resource_table = resource_table.borrow_mut();
@ -83,23 +90,21 @@ fn op_signal_poll(
return signal.0.poll_recv(cx);
}
std::task::Poll::Ready(None)
})
.then(|result| async move { Ok(json!({ "done": result.is_none() })) });
Ok(JsonOp::AsyncUnref(future.boxed_local()))
});
let result = future.await;
Ok(json!({ "done": result.is_none() }))
}
#[cfg(unix)]
pub fn op_signal_unbind(
isolate_state: &mut CoreIsolateState,
state: &Rc<State>,
state: &State,
resource_table: &mut ResourceTable,
args: Value,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, ErrBox> {
) -> Result<Value, ErrBox> {
state.check_unstable("Deno.signal");
let args: SignalArgs = serde_json::from_value(args)?;
let rid = args.rid as u32;
let mut resource_table = isolate_state.resource_table.borrow_mut();
let resource = resource_table.get::<SignalStreamResource>(rid);
if let Some(signal) = resource {
if let Some(waker) = &signal.1 {
@ -111,35 +116,35 @@ pub fn op_signal_unbind(
resource_table
.close(rid)
.ok_or_else(ErrBox::bad_resource_id)?;
Ok(JsonOp::Sync(json!({})))
Ok(json!({}))
}
#[cfg(not(unix))]
pub fn op_signal_bind(
_isolate_state: &mut CoreIsolateState,
_state: &Rc<State>,
_state: &State,
_resource_table: &mut ResourceTable,
_args: Value,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, ErrBox> {
) -> Result<Value, ErrBox> {
unimplemented!();
}
#[cfg(not(unix))]
fn op_signal_unbind(
_isolate_state: &mut CoreIsolateState,
_state: &Rc<State>,
_state: &State,
_resource_table: &mut ResourceTable,
_args: Value,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, ErrBox> {
) -> Result<Value, ErrBox> {
unimplemented!();
}
#[cfg(not(unix))]
fn op_signal_poll(
_isolate_state: &mut CoreIsolateState,
_state: &Rc<State>,
async fn op_signal_poll(
_state: Rc<State>,
_resource_table: Rc<RefCell<ResourceTable>>,
_args: Value,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, ErrBox> {
_zero_copy: BufVec,
) -> Result<Value, ErrBox> {
unimplemented!();
}

View file

@ -1,30 +1,39 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
use super::dispatch_json::{Deserialize, JsonOp, Value};
use super::dispatch_json::{Deserialize, Value};
use crate::state::State;
use deno_core::BufVec;
use deno_core::CoreIsolate;
use deno_core::ErrBox;
use deno_core::ResourceTable;
use deno_core::ZeroCopyBuf;
use futures::future::FutureExt;
use std::cell::RefCell;
use std::rc::Rc;
use std::time::Duration;
use std::time::Instant;
pub fn init(i: &mut CoreIsolate, s: &Rc<State>) {
let t = &CoreIsolate::state(i).borrow().resource_table.clone();
i.register_op(
"op_global_timer_stop",
s.stateful_json_op(op_global_timer_stop),
s.stateful_json_op_sync(t, op_global_timer_stop),
);
i.register_op("op_global_timer", s.stateful_json_op(op_global_timer));
i.register_op("op_now", s.stateful_json_op(op_now));
i.register_op(
"op_global_timer",
s.stateful_json_op_async(t, op_global_timer),
);
i.register_op("op_now", s.stateful_json_op_sync(t, op_now));
}
fn op_global_timer_stop(
state: &Rc<State>,
state: &State,
_resource_table: &mut ResourceTable,
_args: Value,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, ErrBox> {
) -> Result<Value, ErrBox> {
state.global_timer.borrow_mut().cancel();
Ok(JsonOp::Sync(json!({})))
Ok(json!({}))
}
#[derive(Deserialize)]
@ -32,22 +41,23 @@ struct GlobalTimerArgs {
timeout: u64,
}
fn op_global_timer(
state: &Rc<State>,
async fn op_global_timer(
state: Rc<State>,
_resource_table: Rc<RefCell<ResourceTable>>,
args: Value,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, ErrBox> {
_zero_copy: BufVec,
) -> Result<Value, ErrBox> {
let args: GlobalTimerArgs = serde_json::from_value(args)?;
let val = args.timeout;
let deadline = Instant::now() + Duration::from_millis(val);
let f = state
let timer_fut = state
.global_timer
.borrow_mut()
.new_timeout(deadline)
.then(move |_| futures::future::ok(json!({})));
Ok(JsonOp::Async(f.boxed_local()))
.boxed_local();
let _ = timer_fut.await;
Ok(json!({}))
}
// Returns a milliseconds and nanoseconds subsec
@ -55,10 +65,11 @@ fn op_global_timer(
// If the High precision flag is not set, the
// nanoseconds are rounded on 2ms.
fn op_now(
state: &Rc<State>,
state: &State,
_resource_table: &mut ResourceTable,
_args: Value,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, ErrBox> {
) -> Result<Value, ErrBox> {
let seconds = state.start_time.elapsed().as_secs();
let mut subsec_nanos = state.start_time.elapsed().subsec_nanos();
let reduced_time_precision = 2_000_000; // 2ms in nanoseconds
@ -70,8 +81,8 @@ fn op_now(
subsec_nanos -= subsec_nanos % reduced_time_precision;
}
Ok(JsonOp::Sync(json!({
Ok(json!({
"seconds": seconds,
"subsecNanos": subsec_nanos,
})))
}))
}

View file

@ -1,14 +1,15 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
use super::dispatch_json::{Deserialize, JsonOp, Value};
use super::dispatch_json::{Deserialize, Value};
use super::io::{StreamResource, StreamResourceHolder};
use crate::resolve_addr::resolve_addr;
use crate::state::State;
use deno_core::BufVec;
use deno_core::CoreIsolate;
use deno_core::CoreIsolateState;
use deno_core::ErrBox;
use deno_core::ResourceTable;
use deno_core::ZeroCopyBuf;
use futures::future::poll_fn;
use futures::future::FutureExt;
use std::cell::RefCell;
use std::convert::From;
use std::fs::File;
use std::io::BufReader;
@ -31,10 +32,15 @@ use tokio_rustls::{
use webpki::DNSNameRef;
pub fn init(i: &mut CoreIsolate, s: &Rc<State>) {
i.register_op("op_start_tls", s.stateful_json_op2(op_start_tls));
i.register_op("op_connect_tls", s.stateful_json_op2(op_connect_tls));
i.register_op("op_listen_tls", s.stateful_json_op2(op_listen_tls));
i.register_op("op_accept_tls", s.stateful_json_op2(op_accept_tls));
let t = &CoreIsolate::state(i).borrow().resource_table.clone();
i.register_op("op_start_tls", s.stateful_json_op_async(t, op_start_tls));
i.register_op(
"op_connect_tls",
s.stateful_json_op_async(t, op_connect_tls),
);
i.register_op("op_listen_tls", s.stateful_json_op_sync(t, op_listen_tls));
i.register_op("op_accept_tls", s.stateful_json_op_async(t, op_accept_tls));
}
#[derive(Deserialize)]
@ -54,17 +60,16 @@ struct StartTLSArgs {
hostname: String,
}
pub fn op_start_tls(
isolate_state: &mut CoreIsolateState,
state: &Rc<State>,
async fn op_start_tls(
state: Rc<State>,
resource_table: Rc<RefCell<ResourceTable>>,
args: Value,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, ErrBox> {
_zero_copy: BufVec,
) -> Result<Value, ErrBox> {
state.check_unstable("Deno.startTls");
let args: StartTLSArgs = serde_json::from_value(args)?;
let rid = args.rid as u32;
let cert_file = args.cert_file.clone();
let resource_table = isolate_state.resource_table.clone();
let mut domain = args.hostname;
if domain.is_empty() {
@ -76,85 +81,18 @@ pub fn op_start_tls(
state.check_read(Path::new(&path))?;
}
let op = async move {
let mut resource_holder = {
let mut resource_table_ = resource_table.borrow_mut();
match resource_table_.remove::<StreamResourceHolder>(rid) {
Some(resource) => *resource,
None => return Err(ErrBox::bad_resource_id()),
}
};
if let StreamResource::TcpStream(ref mut tcp_stream) =
resource_holder.resource
{
let tcp_stream = tcp_stream.take().unwrap();
let local_addr = tcp_stream.local_addr()?;
let remote_addr = tcp_stream.peer_addr()?;
let mut config = ClientConfig::new();
config
.root_store
.add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS);
if let Some(path) = cert_file {
let key_file = File::open(path)?;
let reader = &mut BufReader::new(key_file);
config.root_store.add_pem_file(reader).unwrap();
}
let tls_connector = TlsConnector::from(Arc::new(config));
let dnsname =
DNSNameRef::try_from_ascii_str(&domain).expect("Invalid DNS lookup");
let tls_stream = tls_connector.connect(dnsname, tcp_stream).await?;
let mut resource_table_ = resource_table.borrow_mut();
let rid = resource_table_.add(
"clientTlsStream",
Box::new(StreamResourceHolder::new(StreamResource::ClientTlsStream(
Box::new(tls_stream),
))),
);
Ok(json!({
"rid": rid,
"localAddr": {
"hostname": local_addr.ip().to_string(),
"port": local_addr.port(),
"transport": "tcp",
},
"remoteAddr": {
"hostname": remote_addr.ip().to_string(),
"port": remote_addr.port(),
"transport": "tcp",
}
}))
} else {
Err(ErrBox::bad_resource_id())
let mut resource_holder = {
let mut resource_table_ = resource_table.borrow_mut();
match resource_table_.remove::<StreamResourceHolder>(rid) {
Some(resource) => *resource,
None => return Err(ErrBox::bad_resource_id()),
}
};
Ok(JsonOp::Async(op.boxed_local()))
}
pub fn op_connect_tls(
isolate_state: &mut CoreIsolateState,
state: &Rc<State>,
args: Value,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, ErrBox> {
let args: ConnectTLSArgs = serde_json::from_value(args)?;
let cert_file = args.cert_file.clone();
let resource_table = isolate_state.resource_table.clone();
state.check_net(&args.hostname, args.port)?;
if let Some(path) = cert_file.clone() {
state.check_read(Path::new(&path))?;
}
let mut domain = args.hostname.clone();
if domain.is_empty() {
domain.push_str("localhost");
}
let op = async move {
let addr = resolve_addr(&args.hostname, args.port)?;
let tcp_stream = TcpStream::connect(&addr).await?;
if let StreamResource::TcpStream(ref mut tcp_stream) =
resource_holder.resource
{
let tcp_stream = tcp_stream.take().unwrap();
let local_addr = tcp_stream.local_addr()?;
let remote_addr = tcp_stream.peer_addr()?;
let mut config = ClientConfig::new();
@ -166,10 +104,12 @@ pub fn op_connect_tls(
let reader = &mut BufReader::new(key_file);
config.root_store.add_pem_file(reader).unwrap();
}
let tls_connector = TlsConnector::from(Arc::new(config));
let dnsname =
DNSNameRef::try_from_ascii_str(&domain).expect("Invalid DNS lookup");
let tls_stream = tls_connector.connect(dnsname, tcp_stream).await?;
let mut resource_table_ = resource_table.borrow_mut();
let rid = resource_table_.add(
"clientTlsStream",
@ -182,17 +122,74 @@ pub fn op_connect_tls(
"localAddr": {
"hostname": local_addr.ip().to_string(),
"port": local_addr.port(),
"transport": args.transport,
"transport": "tcp",
},
"remoteAddr": {
"hostname": remote_addr.ip().to_string(),
"port": remote_addr.port(),
"transport": args.transport,
"transport": "tcp",
}
}))
};
} else {
Err(ErrBox::bad_resource_id())
}
}
Ok(JsonOp::Async(op.boxed_local()))
async fn op_connect_tls(
state: Rc<State>,
resource_table: Rc<RefCell<ResourceTable>>,
args: Value,
_zero_copy: BufVec,
) -> Result<Value, ErrBox> {
let args: ConnectTLSArgs = serde_json::from_value(args)?;
let cert_file = args.cert_file.clone();
state.check_net(&args.hostname, args.port)?;
if let Some(path) = cert_file.clone() {
state.check_read(Path::new(&path))?;
}
let mut domain = args.hostname.clone();
if domain.is_empty() {
domain.push_str("localhost");
}
let addr = resolve_addr(&args.hostname, args.port)?;
let tcp_stream = TcpStream::connect(&addr).await?;
let local_addr = tcp_stream.local_addr()?;
let remote_addr = tcp_stream.peer_addr()?;
let mut config = ClientConfig::new();
config
.root_store
.add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS);
if let Some(path) = cert_file {
let key_file = File::open(path)?;
let reader = &mut BufReader::new(key_file);
config.root_store.add_pem_file(reader).unwrap();
}
let tls_connector = TlsConnector::from(Arc::new(config));
let dnsname =
DNSNameRef::try_from_ascii_str(&domain).expect("Invalid DNS lookup");
let tls_stream = tls_connector.connect(dnsname, tcp_stream).await?;
let mut resource_table_ = resource_table.borrow_mut();
let rid = resource_table_.add(
"clientTlsStream",
Box::new(StreamResourceHolder::new(StreamResource::ClientTlsStream(
Box::new(tls_stream),
))),
);
Ok(json!({
"rid": rid,
"localAddr": {
"hostname": local_addr.ip().to_string(),
"port": local_addr.port(),
"transport": args.transport,
},
"remoteAddr": {
"hostname": remote_addr.ip().to_string(),
"port": remote_addr.port(),
"transport": args.transport,
}
}))
}
fn load_certs(path: &str) -> Result<Vec<Certificate>, ErrBox> {
@ -308,11 +305,11 @@ struct ListenTlsArgs {
}
fn op_listen_tls(
isolate_state: &mut CoreIsolateState,
state: &Rc<State>,
state: &State,
resource_table: &mut ResourceTable,
args: Value,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, ErrBox> {
) -> Result<Value, ErrBox> {
let args: ListenTlsArgs = serde_json::from_value(args)?;
assert_eq!(args.transport, "tcp");
@ -339,17 +336,16 @@ fn op_listen_tls(
local_addr,
};
let mut resource_table = isolate_state.resource_table.borrow_mut();
let rid = resource_table.add("tlsListener", Box::new(tls_listener_resource));
Ok(JsonOp::Sync(json!({
Ok(json!({
"rid": rid,
"localAddr": {
"hostname": local_addr.ip().to_string(),
"port": local_addr.port(),
"transport": args.transport,
},
})))
}))
}
#[derive(Deserialize)]
@ -357,72 +353,67 @@ struct AcceptTlsArgs {
rid: i32,
}
fn op_accept_tls(
isolate_state: &mut CoreIsolateState,
_state: &Rc<State>,
async fn op_accept_tls(
_state: Rc<State>,
resource_table: Rc<RefCell<ResourceTable>>,
args: Value,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, ErrBox> {
_zero_copy: BufVec,
) -> Result<Value, ErrBox> {
let args: AcceptTlsArgs = serde_json::from_value(args)?;
let rid = args.rid as u32;
let resource_table = isolate_state.resource_table.clone();
let op = async move {
let accept_fut = poll_fn(|cx| {
let mut resource_table = resource_table.borrow_mut();
let listener_resource = resource_table
.get_mut::<TlsListenerResource>(rid)
.ok_or_else(|| ErrBox::bad_resource("Listener has been closed"))?;
let listener = &mut listener_resource.listener;
match listener.poll_accept(cx).map_err(ErrBox::from) {
Poll::Ready(Ok((stream, addr))) => {
listener_resource.untrack_task();
Poll::Ready(Ok((stream, addr)))
}
Poll::Pending => {
listener_resource.track_task(cx)?;
Poll::Pending
}
Poll::Ready(Err(e)) => {
listener_resource.untrack_task();
Poll::Ready(Err(e))
}
let accept_fut = poll_fn(|cx| {
let mut resource_table = resource_table.borrow_mut();
let listener_resource = resource_table
.get_mut::<TlsListenerResource>(rid)
.ok_or_else(|| ErrBox::bad_resource("Listener has been closed"))?;
let listener = &mut listener_resource.listener;
match listener.poll_accept(cx).map_err(ErrBox::from) {
Poll::Ready(Ok((stream, addr))) => {
listener_resource.untrack_task();
Poll::Ready(Ok((stream, addr)))
}
});
let (tcp_stream, _socket_addr) = accept_fut.await?;
let local_addr = tcp_stream.local_addr()?;
let remote_addr = tcp_stream.peer_addr()?;
let tls_acceptor = {
let resource_table = resource_table.borrow();
let resource = resource_table
.get::<TlsListenerResource>(rid)
.ok_or_else(ErrBox::bad_resource_id)
.expect("Can't find tls listener");
resource.tls_acceptor.clone()
};
let tls_stream = tls_acceptor.accept(tcp_stream).await?;
let rid = {
let mut resource_table = resource_table.borrow_mut();
resource_table.add(
"serverTlsStream",
Box::new(StreamResourceHolder::new(StreamResource::ServerTlsStream(
Box::new(tls_stream),
))),
)
};
Ok(json!({
"rid": rid,
"localAddr": {
"transport": "tcp",
"hostname": local_addr.ip().to_string(),
"port": local_addr.port()
},
"remoteAddr": {
"transport": "tcp",
"hostname": remote_addr.ip().to_string(),
"port": remote_addr.port()
Poll::Pending => {
listener_resource.track_task(cx)?;
Poll::Pending
}
}))
Poll::Ready(Err(e)) => {
listener_resource.untrack_task();
Poll::Ready(Err(e))
}
}
});
let (tcp_stream, _socket_addr) = accept_fut.await?;
let local_addr = tcp_stream.local_addr()?;
let remote_addr = tcp_stream.peer_addr()?;
let tls_acceptor = {
let resource_table = resource_table.borrow();
let resource = resource_table
.get::<TlsListenerResource>(rid)
.ok_or_else(ErrBox::bad_resource_id)
.expect("Can't find tls listener");
resource.tls_acceptor.clone()
};
Ok(JsonOp::Async(op.boxed_local()))
let tls_stream = tls_acceptor.accept(tcp_stream).await?;
let rid = {
let mut resource_table = resource_table.borrow_mut();
resource_table.add(
"serverTlsStream",
Box::new(StreamResourceHolder::new(StreamResource::ServerTlsStream(
Box::new(tls_stream),
))),
)
};
Ok(json!({
"rid": rid,
"localAddr": {
"transport": "tcp",
"hostname": local_addr.ip().to_string(),
"port": local_addr.port()
},
"remoteAddr": {
"transport": "tcp",
"hostname": remote_addr.ip().to_string(),
"port": remote_addr.port()
}
}))
}

View file

@ -1,10 +1,10 @@
use super::dispatch_json::JsonOp;
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
use super::io::std_file_resource;
use super::io::{StreamResource, StreamResourceHolder};
use crate::state::State;
use deno_core::CoreIsolate;
use deno_core::CoreIsolateState;
use deno_core::ErrBox;
use deno_core::ResourceTable;
use deno_core::ZeroCopyBuf;
#[cfg(unix)]
use nix::sys::termios;
@ -37,9 +37,14 @@ fn get_windows_handle(
}
pub fn init(i: &mut CoreIsolate, s: &Rc<State>) {
i.register_op("op_set_raw", s.stateful_json_op2(op_set_raw));
i.register_op("op_isatty", s.stateful_json_op2(op_isatty));
i.register_op("op_console_size", s.stateful_json_op2(op_console_size));
let t = &CoreIsolate::state(i).borrow().resource_table.clone();
i.register_op("op_set_raw", s.stateful_json_op_sync(t, op_set_raw));
i.register_op("op_isatty", s.stateful_json_op_sync(t, op_isatty));
i.register_op(
"op_console_size",
s.stateful_json_op_sync(t, op_console_size),
);
}
#[derive(Deserialize)]
@ -48,12 +53,12 @@ struct SetRawArgs {
mode: bool,
}
pub fn op_set_raw(
isolate_state: &mut CoreIsolateState,
state: &Rc<State>,
fn op_set_raw(
state: &State,
resource_table: &mut ResourceTable,
args: Value,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, ErrBox> {
) -> Result<Value, ErrBox> {
state.check_unstable("Deno.setRaw");
let args: SetRawArgs = serde_json::from_value(args)?;
let rid = args.rid;
@ -70,7 +75,6 @@ pub fn op_set_raw(
use winapi::shared::minwindef::FALSE;
use winapi::um::{consoleapi, handleapi};
let mut resource_table = isolate_state.resource_table.borrow_mut();
let resource_holder = resource_table.get_mut::<StreamResourceHolder>(rid);
if resource_holder.is_none() {
return Err(ErrBox::bad_resource_id());
@ -130,13 +134,12 @@ pub fn op_set_raw(
return Err(ErrBox::last_os_error());
}
Ok(JsonOp::Sync(json!({})))
Ok(json!({}))
}
#[cfg(unix)]
{
use std::os::unix::io::AsRawFd;
let mut resource_table = isolate_state.resource_table.borrow_mut();
let resource_holder = resource_table.get_mut::<StreamResourceHolder>(rid);
if resource_holder.is_none() {
return Err(ErrBox::bad_resource_id());
@ -161,7 +164,7 @@ pub fn op_set_raw(
if maybe_tty_mode.is_some() {
// Already raw. Skip.
return Ok(JsonOp::Sync(json!({})));
return Ok(json!({}));
}
let original_mode = termios::tcgetattr(raw_fd)?;
@ -184,7 +187,7 @@ pub fn op_set_raw(
raw.control_chars[termios::SpecialCharacterIndices::VMIN as usize] = 1;
raw.control_chars[termios::SpecialCharacterIndices::VTIME as usize] = 0;
termios::tcsetattr(raw_fd, termios::SetArg::TCSADRAIN, &raw)?;
Ok(JsonOp::Sync(json!({})))
Ok(json!({}))
} else {
// Try restore saved mode.
let (raw_fd, maybe_tty_mode) =
@ -207,7 +210,7 @@ pub fn op_set_raw(
termios::tcsetattr(raw_fd, termios::SetArg::TCSADRAIN, &mode)?;
}
Ok(JsonOp::Sync(json!({})))
Ok(json!({}))
}
}
}
@ -217,18 +220,17 @@ struct IsattyArgs {
rid: u32,
}
pub fn op_isatty(
isolate_state: &mut CoreIsolateState,
_state: &Rc<State>,
fn op_isatty(
_state: &State,
resource_table: &mut ResourceTable,
args: Value,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, ErrBox> {
) -> Result<Value, ErrBox> {
let args: IsattyArgs = serde_json::from_value(args)?;
let rid = args.rid;
let mut resource_table = isolate_state.resource_table.borrow_mut();
let isatty: bool =
std_file_resource(&mut resource_table, rid as u32, move |r| match r {
std_file_resource(resource_table, rid as u32, move |r| match r {
Ok(std_file) => {
#[cfg(windows)]
{
@ -250,7 +252,7 @@ pub fn op_isatty(
Err(StreamResource::Stdin(..)) => Ok(atty::is(atty::Stream::Stdin)),
_ => Ok(false),
})?;
Ok(JsonOp::Sync(json!(isatty)))
Ok(json!(isatty))
}
#[derive(Deserialize)]
@ -264,65 +266,63 @@ struct ConsoleSize {
rows: u32,
}
pub fn op_console_size(
isolate_state: &mut CoreIsolateState,
state: &Rc<State>,
fn op_console_size(
state: &State,
resource_table: &mut ResourceTable,
args: Value,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<JsonOp, ErrBox> {
) -> Result<Value, ErrBox> {
state.check_unstable("Deno.consoleSize");
let args: ConsoleSizeArgs = serde_json::from_value(args)?;
let rid = args.rid;
let mut resource_table = isolate_state.resource_table.borrow_mut();
let size =
std_file_resource(&mut resource_table, rid as u32, move |r| match r {
Ok(std_file) => {
#[cfg(windows)]
{
use std::os::windows::io::AsRawHandle;
let handle = std_file.as_raw_handle();
let size = std_file_resource(resource_table, rid as u32, move |r| match r {
Ok(std_file) => {
#[cfg(windows)]
{
use std::os::windows::io::AsRawHandle;
let handle = std_file.as_raw_handle();
unsafe {
let mut bufinfo: winapi::um::wincon::CONSOLE_SCREEN_BUFFER_INFO =
std::mem::zeroed();
unsafe {
let mut bufinfo: winapi::um::wincon::CONSOLE_SCREEN_BUFFER_INFO =
std::mem::zeroed();
if winapi::um::wincon::GetConsoleScreenBufferInfo(
handle,
&mut bufinfo,
) == 0
{
return Err(ErrBox::last_os_error());
}
Ok(ConsoleSize {
columns: bufinfo.dwSize.X as u32,
rows: bufinfo.dwSize.Y as u32,
})
if winapi::um::wincon::GetConsoleScreenBufferInfo(
handle,
&mut bufinfo,
) == 0
{
return Err(ErrBox::last_os_error());
}
}
#[cfg(unix)]
{
use std::os::unix::io::AsRawFd;
let fd = std_file.as_raw_fd();
unsafe {
let mut size: libc::winsize = std::mem::zeroed();
if libc::ioctl(fd, libc::TIOCGWINSZ, &mut size as *mut _) != 0 {
return Err(ErrBox::last_os_error());
}
// TODO (caspervonb) return a tuple instead
Ok(ConsoleSize {
columns: size.ws_col as u32,
rows: size.ws_row as u32,
})
}
Ok(ConsoleSize {
columns: bufinfo.dwSize.X as u32,
rows: bufinfo.dwSize.Y as u32,
})
}
}
Err(_) => Err(ErrBox::bad_resource_id()),
})?;
Ok(JsonOp::Sync(json!(size)))
#[cfg(unix)]
{
use std::os::unix::io::AsRawFd;
let fd = std_file.as_raw_fd();
unsafe {
let mut size: libc::winsize = std::mem::zeroed();
if libc::ioctl(fd, libc::TIOCGWINSZ, &mut size as *mut _) != 0 {
return Err(ErrBox::last_os_error());
}
// TODO (caspervonb) return a tuple instead
Ok(ConsoleSize {
columns: size.ws_col as u32,
rows: size.ws_row as u32,
})
}
}
}
Err(_) => Err(ErrBox::bad_resource_id()),
})?;
Ok(json!(size))
}

View file

@ -1,5 +1,5 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
use super::dispatch_json::{Deserialize, JsonOp, Value};
use super::dispatch_json::{Deserialize, Value};
use crate::fmt_errors::JSError;
use crate::global_state::GlobalState;
use crate::ops::io::get_stdio;
@ -10,29 +10,37 @@ use crate::tokio_util::create_basic_runtime;
use crate::web_worker::WebWorker;
use crate::web_worker::WebWorkerHandle;
use crate::worker::WorkerEvent;
use deno_core::BufVec;
use deno_core::CoreIsolate;
use deno_core::ErrBox;
use deno_core::ModuleSpecifier;
use deno_core::ResourceTable;
use deno_core::ZeroCopyBuf;
use futures::future::FutureExt;
use std::cell::RefCell;
use std::convert::From;
use std::rc::Rc;
use std::sync::Arc;
use std::thread::JoinHandle;
pub fn init(i: &mut CoreIsolate, s: &Rc<State>) {
i.register_op("op_create_worker", s.stateful_json_op(op_create_worker));
let t = &CoreIsolate::state(i).borrow().resource_table.clone();
i.register_op(
"op_create_worker",
s.stateful_json_op_sync(t, op_create_worker),
);
i.register_op(
"op_host_terminate_worker",
s.stateful_json_op(op_host_terminate_worker),
s.stateful_json_op_sync(t, op_host_terminate_worker),
);
i.register_op(
"op_host_post_message",
s.stateful_json_op(op_host_post_message),
s.stateful_json_op_sync(t, op_host_post_message),
);
i.register_op(
"op_host_get_message",
s.stateful_json_op(op_host_get_message),
s.stateful_json_op_async(t, op_host_get_message),
);
}
@ -180,10 +188,11 @@ struct CreateWorkerArgs {
/// Create worker as the host
fn op_create_worker(
state: &Rc<State>,
state: &State,
_resource_table: &mut ResourceTable,
args: Value,
_data: &mut [ZeroCopyBuf],
) -> Result<JsonOp, ErrBox> {
) -> Result<Value, ErrBox> {
let args: CreateWorkerArgs = serde_json::from_value(args)?;
let specifier = args.specifier.clone();
@ -197,7 +206,6 @@ fn op_create_worker(
if use_deno_namespace {
state.check_unstable("Worker.deno");
}
let parent_state = state.clone();
let global_state = state.global_state.clone();
let permissions = state.permissions.borrow().clone();
let worker_id = state.next_worker_id.get();
@ -217,12 +225,12 @@ fn op_create_worker(
)?;
// At this point all interactions with worker happen using thread
// safe handler returned from previous function call
parent_state
state
.workers
.borrow_mut()
.insert(worker_id, (join_handle, worker_handle));
Ok(JsonOp::Sync(json!({ "id": worker_id })))
Ok(json!({ "id": worker_id }))
}
#[derive(Deserialize)]
@ -231,10 +239,11 @@ struct WorkerArgs {
}
fn op_host_terminate_worker(
state: &Rc<State>,
state: &State,
_resource_table: &mut ResourceTable,
args: Value,
_data: &mut [ZeroCopyBuf],
) -> Result<JsonOp, ErrBox> {
) -> Result<Value, ErrBox> {
let args: WorkerArgs = serde_json::from_value(args)?;
let id = args.id as u32;
let (join_handle, worker_handle) = state
@ -244,7 +253,7 @@ fn op_host_terminate_worker(
.expect("No worker handle found");
worker_handle.terminate();
join_handle.join().expect("Panic in worker thread");
Ok(JsonOp::Sync(json!({})))
Ok(json!({}))
}
fn serialize_worker_event(event: WorkerEvent) -> Value {
@ -298,52 +307,61 @@ fn serialize_worker_event(event: WorkerEvent) -> Value {
}
/// Get message from guest worker as host
fn op_host_get_message(
state: &Rc<State>,
async fn op_host_get_message(
state: Rc<State>,
_resource_table: Rc<RefCell<ResourceTable>>,
args: Value,
_data: &mut [ZeroCopyBuf],
) -> Result<JsonOp, ErrBox> {
_zero_copy: BufVec,
) -> Result<Value, ErrBox> {
let args: WorkerArgs = serde_json::from_value(args)?;
let id = args.id as u32;
let state = state.clone();
let worker_handle = state.workers.borrow()[&id].1.clone();
let op = async move {
let response = match worker_handle.get_event().await? {
Some(event) => {
// Terminal error means that worker should be removed from worker table.
if let WorkerEvent::TerminalError(_) = &event {
if let Some((join_handle, mut worker_handle)) =
state.workers.borrow_mut().remove(&id)
{
worker_handle.sender.close_channel();
join_handle.join().expect("Worker thread panicked");
}
}
serialize_worker_event(event)
}
None => {
// Worker shuts down
let mut workers = state.workers.borrow_mut();
// Try to remove worker from workers table - NOTE: `Worker.terminate()` might have been called
// already meaning that we won't find worker in table - in that case ignore.
if let Some((join_handle, mut worker_handle)) = workers.remove(&id) {
let workers_table = state.workers.borrow();
let maybe_handle = workers_table.get(&id);
let worker_handle = if let Some(handle) = maybe_handle {
handle.1.clone()
} else {
// If handle was not found it means worker has already shutdown
return Ok(json!({ "type": "close" }));
};
drop(workers_table);
let response = match worker_handle.get_event().await? {
Some(event) => {
// Terminal error means that worker should be removed from worker table.
if let WorkerEvent::TerminalError(_) = &event {
if let Some((join_handle, mut worker_handle)) =
state.workers.borrow_mut().remove(&id)
{
worker_handle.sender.close_channel();
join_handle.join().expect("Worker thread panicked");
}
json!({ "type": "close" })
}
};
Ok(response)
serialize_worker_event(event)
}
None => {
// Worker shuts down
let mut workers = state.workers.borrow_mut();
// Try to remove worker from workers table - NOTE: `Worker.terminate()` might have been called
// already meaning that we won't find worker in table - in that case ignore.
if let Some((join_handle, mut worker_handle)) = workers.remove(&id) {
worker_handle.sender.close_channel();
join_handle.join().expect("Worker thread panicked");
}
json!({ "type": "close" })
}
};
Ok(JsonOp::Async(op.boxed_local()))
Ok(response)
}
/// Post message to guest worker as host
fn op_host_post_message(
state: &Rc<State>,
state: &State,
_resource_table: &mut ResourceTable,
args: Value,
data: &mut [ZeroCopyBuf],
) -> Result<JsonOp, ErrBox> {
) -> Result<Value, ErrBox> {
assert_eq!(data.len(), 1, "Invalid number of arguments");
let args: WorkerArgs = serde_json::from_value(args)?;
let id = args.id as u32;
@ -353,5 +371,5 @@ fn op_host_post_message(
let workers = state.workers.borrow();
let worker_handle = workers[&id].1.clone();
worker_handle.post_message(msg)?;
Ok(JsonOp::Sync(json!({})))
Ok(json!({}))
}

View file

@ -11,7 +11,7 @@
offset,
whence,
) {
return sendSync("op_seek", { rid, offset, whence });
return sendSync("op_seek_sync", { rid, offset, whence });
}
function seek(
@ -19,7 +19,7 @@
offset,
whence,
) {
return sendAsync("op_seek", { rid, offset, whence });
return sendAsync("op_seek_async", { rid, offset, whence });
}
function openSync(

View file

@ -6,11 +6,11 @@
const build = window.__bootstrap.build.build;
function chmodSync(path, mode) {
sendSync("op_chmod", { path: pathFromURL(path), mode });
sendSync("op_chmod_sync", { path: pathFromURL(path), mode });
}
async function chmod(path, mode) {
await sendAsync("op_chmod", { path: pathFromURL(path), mode });
await sendAsync("op_chmod_async", { path: pathFromURL(path), mode });
}
function chownSync(
@ -18,7 +18,7 @@
uid,
gid,
) {
sendSync("op_chown", { path: pathFromURL(path), uid, gid });
sendSync("op_chown_sync", { path: pathFromURL(path), uid, gid });
}
async function chown(
@ -26,14 +26,14 @@
uid,
gid,
) {
await sendAsync("op_chown", { path: pathFromURL(path), uid, gid });
await sendAsync("op_chown_async", { path: pathFromURL(path), uid, gid });
}
function copyFileSync(
fromPath,
toPath,
) {
sendSync("op_copy_file", {
sendSync("op_copy_file_sync", {
from: pathFromURL(fromPath),
to: pathFromURL(toPath),
});
@ -43,7 +43,7 @@
fromPath,
toPath,
) {
await sendAsync("op_copy_file", {
await sendAsync("op_copy_file_async", {
from: pathFromURL(fromPath),
to: pathFromURL(toPath),
});
@ -58,19 +58,19 @@
}
function makeTempDirSync(options = {}) {
return sendSync("op_make_temp_dir", options);
return sendSync("op_make_temp_dir_sync", options);
}
function makeTempDir(options = {}) {
return sendAsync("op_make_temp_dir", options);
return sendAsync("op_make_temp_dir_async", options);
}
function makeTempFileSync(options = {}) {
return sendSync("op_make_temp_file", options);
return sendSync("op_make_temp_file_sync", options);
}
function makeTempFile(options = {}) {
return sendAsync("op_make_temp_file", options);
return sendAsync("op_make_temp_file_async", options);
}
function mkdirArgs(path, options) {
@ -87,14 +87,14 @@
}
function mkdirSync(path, options) {
sendSync("op_mkdir", mkdirArgs(path, options));
sendSync("op_mkdir_sync", mkdirArgs(path, options));
}
async function mkdir(
path,
options,
) {
await sendAsync("op_mkdir", mkdirArgs(path, options));
await sendAsync("op_mkdir_async", mkdirArgs(path, options));
}
function res(response) {
@ -102,15 +102,16 @@
}
function readDirSync(path) {
return res(sendSync("op_read_dir", { path: pathFromURL(path) }))[
return res(sendSync("op_read_dir_sync", { path: pathFromURL(path) }))[
Symbol.iterator
]();
}
function readDir(path) {
const array = sendAsync("op_read_dir", { path: pathFromURL(path) }).then(
res,
);
const array = sendAsync("op_read_dir_async", { path: pathFromURL(path) })
.then(
res,
);
return {
async *[Symbol.asyncIterator]() {
yield* await array;
@ -119,26 +120,26 @@
}
function readLinkSync(path) {
return sendSync("op_read_link", { path });
return sendSync("op_read_link_sync", { path });
}
function readLink(path) {
return sendAsync("op_read_link", { path });
return sendAsync("op_read_link_async", { path });
}
function realPathSync(path) {
return sendSync("op_realpath", { path });
return sendSync("op_realpath_sync", { path });
}
function realPath(path) {
return sendAsync("op_realpath", { path });
return sendAsync("op_realpath_async", { path });
}
function removeSync(
path,
options = {},
) {
sendSync("op_remove", {
sendSync("op_remove_sync", {
path: pathFromURL(path),
recursive: !!options.recursive,
});
@ -148,18 +149,18 @@
path,
options = {},
) {
await sendAsync("op_remove", {
await sendAsync("op_remove_async", {
path: pathFromURL(path),
recursive: !!options.recursive,
});
}
function renameSync(oldpath, newpath) {
sendSync("op_rename", { oldpath, newpath });
sendSync("op_rename_sync", { oldpath, newpath });
}
async function rename(oldpath, newpath) {
await sendAsync("op_rename", { oldpath, newpath });
await sendAsync("op_rename_async", { oldpath, newpath });
}
function parseFileInfo(response) {
@ -188,15 +189,15 @@
}
function fstatSync(rid) {
return parseFileInfo(sendSync("op_fstat", { rid }));
return parseFileInfo(sendSync("op_fstat_sync", { rid }));
}
async function fstat(rid) {
return parseFileInfo(await sendAsync("op_fstat", { rid }));
return parseFileInfo(await sendAsync("op_fstat_async", { rid }));
}
async function lstat(path) {
const res = await sendAsync("op_stat", {
const res = await sendAsync("op_stat_async", {
path: pathFromURL(path),
lstat: true,
});
@ -204,7 +205,7 @@
}
function lstatSync(path) {
const res = sendSync("op_stat", {
const res = sendSync("op_stat_sync", {
path: pathFromURL(path),
lstat: true,
});
@ -212,7 +213,7 @@
}
async function stat(path) {
const res = await sendAsync("op_stat", {
const res = await sendAsync("op_stat_async", {
path: pathFromURL(path),
lstat: false,
});
@ -220,7 +221,7 @@
}
function statSync(path) {
const res = sendSync("op_stat", {
const res = sendSync("op_stat_sync", {
path: pathFromURL(path),
lstat: false,
});
@ -236,19 +237,19 @@
}
function ftruncateSync(rid, len) {
sendSync("op_ftruncate", { rid, len: coerceLen(len) });
sendSync("op_ftruncate_sync", { rid, len: coerceLen(len) });
}
async function ftruncate(rid, len) {
await sendAsync("op_ftruncate", { rid, len: coerceLen(len) });
await sendAsync("op_ftruncate_async", { rid, len: coerceLen(len) });
}
function truncateSync(path, len) {
sendSync("op_truncate", { path, len: coerceLen(len) });
sendSync("op_truncate_sync", { path, len: coerceLen(len) });
}
async function truncate(path, len) {
await sendAsync("op_truncate", { path, len: coerceLen(len) });
await sendAsync("op_truncate_async", { path, len: coerceLen(len) });
}
function umask(mask) {
@ -256,11 +257,11 @@
}
function linkSync(oldpath, newpath) {
sendSync("op_link", { oldpath, newpath });
sendSync("op_link_sync", { oldpath, newpath });
}
async function link(oldpath, newpath) {
await sendAsync("op_link", { oldpath, newpath });
await sendAsync("op_link_async", { oldpath, newpath });
}
function toSecondsFromEpoch(v) {
@ -272,7 +273,7 @@
atime,
mtime,
) {
sendSync("op_utime", {
sendSync("op_utime_sync", {
path,
// TODO(ry) split atime, mtime into [seconds, nanoseconds] tuple
atime: toSecondsFromEpoch(atime),
@ -285,7 +286,7 @@
atime,
mtime,
) {
await sendAsync("op_utime", {
await sendAsync("op_utime_async", {
path,
// TODO(ry) split atime, mtime into [seconds, nanoseconds] tuple
atime: toSecondsFromEpoch(atime),
@ -298,7 +299,7 @@
newpath,
options,
) {
sendSync("op_symlink", { oldpath, newpath, options });
sendSync("op_symlink_sync", { oldpath, newpath, options });
}
async function symlink(
@ -306,23 +307,23 @@
newpath,
options,
) {
await sendAsync("op_symlink", { oldpath, newpath, options });
await sendAsync("op_symlink_async", { oldpath, newpath, options });
}
function fdatasyncSync(rid) {
sendSync("op_fdatasync", { rid });
sendSync("op_fdatasync_sync", { rid });
}
async function fdatasync(rid) {
await sendAsync("op_fdatasync", { rid });
await sendAsync("op_fdatasync_async", { rid });
}
function fsyncSync(rid) {
sendSync("op_fsync", { rid });
sendSync("op_fsync_sync", { rid });
}
async function fsync(rid) {
await sendAsync("op_fsync", { rid });
await sendAsync("op_fsync_async", { rid });
}
window.__bootstrap.fs = {

View file

@ -58,17 +58,6 @@ pub struct State {
}
impl State {
pub fn stateful_json_op<D>(
self: &Rc<Self>,
dispatcher: D,
) -> impl Fn(&mut deno_core::CoreIsolateState, &mut [ZeroCopyBuf]) -> Op
where
D: Fn(&Rc<State>, Value, &mut [ZeroCopyBuf]) -> Result<JsonOp, ErrBox>,
{
use crate::ops::json_op;
self.core_op(json_op(self.stateful_op(dispatcher)))
}
pub fn stateful_json_op_sync<D>(
self: &Rc<Self>,
resource_table: &Rc<RefCell<ResourceTable>>,
@ -85,7 +74,8 @@ impl State {
let state = self.clone();
let resource_table = resource_table.clone();
move |isolate_state: &mut CoreIsolateState, bufs: &mut [ZeroCopyBuf]| {
let f = move |isolate_state: &mut CoreIsolateState,
bufs: &mut [ZeroCopyBuf]| {
let get_error_class_fn = isolate_state.get_error_class_fn;
// The first buffer should contain JSON encoded op arguments; parse them.
@ -108,7 +98,8 @@ impl State {
// Convert to Op.
Op::Sync(serialize_result(None, result, get_error_class_fn))
}
};
self.core_op(f)
}
pub fn stateful_json_op_async<D, F>(
@ -124,7 +115,8 @@ impl State {
let state = self.clone();
let resource_table = resource_table.clone();
move |isolate_state: &mut CoreIsolateState, bufs: &mut [ZeroCopyBuf]| {
let f = move |isolate_state: &mut CoreIsolateState,
bufs: &mut [ZeroCopyBuf]| {
let get_error_class_fn = isolate_state.get_error_class_fn;
// The first buffer should contain JSON encoded op arguments; parse them.
@ -163,9 +155,12 @@ impl State {
}
.boxed_local(),
)
}
};
self.core_op(f)
}
// TODO(bartlomieju): remove me - still used by `op_open_plugin` which
// needs access to isolate_state
pub fn stateful_json_op2<D>(
self: &Rc<Self>,
dispatcher: D,
@ -185,7 +180,7 @@ impl State {
/// Wrap core `OpDispatcher` to collect metrics.
// TODO(ry) this should be private. Is called by stateful_json_op or
// stateful_minimal_op
pub fn core_op<D>(
pub(crate) fn core_op<D>(
self: &Rc<Self>,
dispatcher: D,
) -> impl Fn(&mut deno_core::CoreIsolateState, &mut [ZeroCopyBuf]) -> Op
@ -324,7 +319,7 @@ impl State {
///
/// This is intentionally a non-recoverable check so that people cannot probe
/// for unstable APIs from stable programs.
pub fn check_unstable(self: &Rc<Self>, api_name: &str) {
pub fn check_unstable(&self, api_name: &str) {
// TODO(ry) Maybe use IsolateHandle::terminate_execution here to provide a
// stack trace in JS.
if !self.global_state.flags.unstable {