1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-01-12 00:54:02 -05:00

refactor: per-worker resource table (#3306)

- removes global `RESOURCE_TABLE` - resource tables are now created per `Worker`
  in `State`
- renames `CliResource` to `StreamResource` and moves all logic related
  to it to `cli/ops/io.rs`
- removes `cli/resources.rs`
- adds `state` argument to `op_read` and `op_write` and consequently adds
  `stateful_minimal_op` to `State`
- IMPORTANT NOTE: workers don't have access to process stdio - this is
  caused by fact that dropping worker would close stdout for process
  (because it's constructed from raw handle, which closes underlying file
  descriptor on drop)
This commit is contained in:
Bartek Iwańczuk 2019-11-14 04:16:57 +01:00 committed by Ry Dahl
parent af448e864c
commit fd62379eaf
14 changed files with 358 additions and 336 deletions

View file

@ -43,7 +43,6 @@ pub mod permissions;
mod progress;
mod repl;
pub mod resolve_addr;
pub mod resources;
mod shell;
mod signal;
pub mod source_maps;
@ -57,6 +56,7 @@ pub mod worker;
use crate::deno_error::js_check;
use crate::deno_error::print_err_and_exit;
use crate::global_state::ThreadSafeGlobalState;
use crate::ops::io::get_stdio;
use crate::progress::Progress;
use crate::state::ThreadSafeState;
use crate::worker::Worker;
@ -128,6 +128,15 @@ fn create_worker_and_state(
.map_err(deno_error::print_err_and_exit)
.unwrap();
let state_ = state.clone();
{
let mut resource_table = state_.lock_resource_table();
let (stdin, stdout, stderr) = get_stdio();
resource_table.add("stdin", Box::new(stdin));
resource_table.add("stdout", Box::new(stdout));
resource_table.add("stderr", Box::new(stderr));
}
let worker = Worker::new(
"main".to_string(),
startup_data::deno_isolate_init(),

View file

@ -15,7 +15,6 @@ use deno::PinnedBuf;
use futures::Future;
pub type MinimalOp = dyn Future<Item = i32, Error = ErrBox> + Send;
pub type Dispatcher = fn(i32, Option<PinnedBuf>) -> Box<MinimalOp>;
#[derive(Copy, Clone, Debug, PartialEq)]
// This corresponds to RecordMinimal on the TS side.
@ -112,9 +111,10 @@ fn test_parse_min_record() {
assert_eq!(parse_min_record(&buf), None);
}
pub fn minimal_op(
d: Dispatcher,
) -> impl Fn(&[u8], Option<PinnedBuf>) -> CoreOp {
pub fn minimal_op<D>(d: D) -> impl Fn(&[u8], Option<PinnedBuf>) -> CoreOp
where
D: Fn(i32, Option<PinnedBuf>) -> Box<MinimalOp>,
{
move |control: &[u8], zero_copy: Option<PinnedBuf>| {
let mut record = match parse_min_record(control) {
Some(r) => r,

View file

@ -1,8 +1,9 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
use super::dispatch_json::{Deserialize, JsonOp, Value};
use super::io::StreamResource;
use crate::http_body::HttpBody;
use crate::http_util::get_client;
use crate::ops::json_op;
use crate::resources;
use crate::state::ThreadSafeState;
use deno::*;
use http::header::HeaderName;
@ -54,6 +55,7 @@ pub fn op_fetch(
request = request.header(name, v);
}
debug!("Before fetch {}", url);
let state_ = state.clone();
let future = request.send().map_err(ErrBox::from).and_then(move |res| {
let status = res.status();
let mut res_headers = Vec::new();
@ -61,8 +63,9 @@ pub fn op_fetch(
res_headers.push((key.to_string(), val.to_str().unwrap().to_owned()));
}
let body = res.into_body();
let rid = resources::add_reqwest_body(body);
let body = HttpBody::from(res.into_body());
let mut table = state_.lock_resource_table();
let rid = table.add("httpBody", Box::new(StreamResource::HttpBody(body)));
let json_res = json!({
"bodyRid": rid,

View file

@ -1,12 +1,11 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
use super::dispatch_json::{Deserialize, JsonOp, Value};
use super::io::StreamResource;
use crate::deno_error::bad_resource;
use crate::deno_error::DenoError;
use crate::deno_error::ErrorKind;
use crate::fs as deno_fs;
use crate::ops::json_op;
use crate::resources;
use crate::resources::CliResource;
use crate::state::ThreadSafeState;
use deno::*;
use futures::Future;
@ -38,7 +37,7 @@ fn op_open(
let args: OpenArgs = serde_json::from_value(args)?;
let (filename, filename_) = deno_fs::resolve_from_cwd(&args.filename)?;
let mode = args.mode.as_ref();
let state_ = state.clone();
let mut open_options = tokio::fs::OpenOptions::new();
match mode {
@ -91,7 +90,8 @@ fn op_open(
let is_sync = args.promise_id.is_none();
let op = open_options.open(filename).map_err(ErrBox::from).and_then(
move |fs_file| {
let rid = resources::add_fs_file(fs_file);
let mut table = state_.lock_resource_table();
let rid = table.add("fsFile", Box::new(StreamResource::FsFile(fs_file)));
futures::future::ok(json!(rid))
},
);
@ -110,21 +110,21 @@ struct CloseArgs {
}
fn op_close(
_state: &ThreadSafeState,
state: &ThreadSafeState,
args: Value,
_zero_copy: Option<PinnedBuf>,
) -> Result<JsonOp, ErrBox> {
let args: CloseArgs = serde_json::from_value(args)?;
let mut table = resources::lock_resource_table();
let mut table = state.lock_resource_table();
table.close(args.rid as u32).ok_or_else(bad_resource)?;
Ok(JsonOp::Sync(json!({})))
}
#[derive(Debug)]
pub struct SeekFuture {
seek_from: SeekFrom,
rid: ResourceId,
state: ThreadSafeState,
}
impl Future for SeekFuture {
@ -132,13 +132,13 @@ impl Future for SeekFuture {
type Error = ErrBox;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let mut table = resources::lock_resource_table();
let mut table = self.state.lock_resource_table();
let resource = table
.get_mut::<CliResource>(self.rid)
.get_mut::<StreamResource>(self.rid)
.ok_or_else(bad_resource)?;
let tokio_file = match resource {
CliResource::FsFile(ref mut file) => file,
StreamResource::FsFile(ref mut file) => file,
_ => return Err(bad_resource()),
};
@ -156,7 +156,7 @@ struct SeekArgs {
}
fn op_seek(
_state: &ThreadSafeState,
state: &ThreadSafeState,
args: Value,
_zero_copy: Option<PinnedBuf>,
) -> Result<JsonOp, ErrBox> {
@ -177,7 +177,11 @@ fn op_seek(
}
};
let fut = SeekFuture { seek_from, rid };
let fut = SeekFuture {
state: state.clone(),
seek_from,
rid,
};
let op = fut.and_then(move |_| futures::future::ok(json!({})));
if args.promise_id.is_none() {

View file

@ -1,19 +1,101 @@
use super::dispatch_minimal::MinimalOp;
use crate::deno_error;
use crate::deno_error::bad_resource;
use crate::http_body::HttpBody;
use crate::ops::minimal_op;
use crate::resources;
use crate::resources::CliResource;
use crate::resources::DenoAsyncRead;
use crate::resources::DenoAsyncWrite;
use crate::state::ThreadSafeState;
use deno::ErrBox;
use deno::Resource;
use deno::*;
use futures;
use futures::Future;
use futures::Poll;
use std;
use tokio;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::TcpStream;
use tokio_process;
use tokio_rustls::client::TlsStream as ClientTlsStream;
use tokio_rustls::server::TlsStream as ServerTlsStream;
#[cfg(not(windows))]
use std::os::unix::io::FromRawFd;
#[cfg(windows)]
use std::os::windows::io::FromRawHandle;
#[cfg(windows)]
extern crate winapi;
pub fn init(i: &mut Isolate, s: &ThreadSafeState) {
i.register_op("read", s.core_op(minimal_op(op_read)));
i.register_op("write", s.core_op(minimal_op(op_write)));
i.register_op(
"read",
s.core_op(minimal_op(s.stateful_minimal_op(op_read))),
);
i.register_op(
"write",
s.core_op(minimal_op(s.stateful_minimal_op(op_write))),
);
}
pub fn get_stdio() -> (StreamResource, StreamResource, StreamResource) {
let stdin = StreamResource::Stdin(tokio::io::stdin());
let stdout = StreamResource::Stdout({
#[cfg(not(windows))]
let stdout = unsafe { std::fs::File::from_raw_fd(1) };
#[cfg(windows)]
let stdout = unsafe {
std::fs::File::from_raw_handle(winapi::um::processenv::GetStdHandle(
winapi::um::winbase::STD_OUTPUT_HANDLE,
))
};
tokio::fs::File::from_std(stdout)
});
let stderr = StreamResource::Stderr(tokio::io::stderr());
(stdin, stdout, stderr)
}
pub enum StreamResource {
Stdin(tokio::io::Stdin),
Stdout(tokio::fs::File),
Stderr(tokio::io::Stderr),
FsFile(tokio::fs::File),
TcpStream(tokio::net::TcpStream),
ServerTlsStream(Box<ServerTlsStream<TcpStream>>),
ClientTlsStream(Box<ClientTlsStream<TcpStream>>),
HttpBody(HttpBody),
ChildStdin(tokio_process::ChildStdin),
ChildStdout(tokio_process::ChildStdout),
ChildStderr(tokio_process::ChildStderr),
}
impl Resource for StreamResource {}
/// `DenoAsyncRead` is the same as the `tokio_io::AsyncRead` trait
/// but uses an `ErrBox` error instead of `std::io:Error`
pub trait DenoAsyncRead {
fn poll_read(&mut self, buf: &mut [u8]) -> Poll<usize, ErrBox>;
}
impl DenoAsyncRead for StreamResource {
fn poll_read(&mut self, buf: &mut [u8]) -> Poll<usize, ErrBox> {
let r = match self {
StreamResource::FsFile(ref mut f) => f.poll_read(buf),
StreamResource::Stdin(ref mut f) => f.poll_read(buf),
StreamResource::TcpStream(ref mut f) => f.poll_read(buf),
StreamResource::ClientTlsStream(ref mut f) => f.poll_read(buf),
StreamResource::ServerTlsStream(ref mut f) => f.poll_read(buf),
StreamResource::HttpBody(ref mut f) => f.poll_read(buf),
StreamResource::ChildStdout(ref mut f) => f.poll_read(buf),
StreamResource::ChildStderr(ref mut f) => f.poll_read(buf),
_ => {
return Err(bad_resource());
}
};
r.map_err(ErrBox::from)
}
}
#[derive(Debug, PartialEq)]
@ -27,14 +109,15 @@ enum IoState {
///
/// The returned future will resolve to both the I/O stream and the buffer
/// as well as the number of bytes read once the read operation is completed.
pub fn read<T>(rid: ResourceId, buf: T) -> Read<T>
pub fn read<T>(state: &ThreadSafeState, rid: ResourceId, buf: T) -> Read<T>
where
T: AsMut<[u8]>,
{
Read {
rid,
buf,
state: IoState::Pending,
io_state: IoState::Pending,
state: state.clone(),
}
}
@ -42,11 +125,11 @@ where
/// a buffer.
///
/// Created by the [`read`] function.
#[derive(Debug)]
pub struct Read<T> {
rid: ResourceId,
buf: T,
state: IoState,
io_state: IoState,
state: ThreadSafeState,
}
impl<T> Future for Read<T>
@ -57,21 +140,25 @@ where
type Error = ErrBox;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if self.state == IoState::Done {
if self.io_state == IoState::Done {
panic!("poll a Read after it's done");
}
let mut table = resources::lock_resource_table();
let mut table = self.state.lock_resource_table();
let resource = table
.get_mut::<CliResource>(self.rid)
.get_mut::<StreamResource>(self.rid)
.ok_or_else(bad_resource)?;
let nread = try_ready!(resource.poll_read(&mut self.buf.as_mut()[..]));
self.state = IoState::Done;
self.io_state = IoState::Done;
Ok(nread.into())
}
}
pub fn op_read(rid: i32, zero_copy: Option<PinnedBuf>) -> Box<MinimalOp> {
pub fn op_read(
state: &ThreadSafeState,
rid: i32,
zero_copy: Option<PinnedBuf>,
) -> Box<MinimalOp> {
debug!("read rid={}", rid);
let zero_copy = match zero_copy {
None => {
@ -80,19 +167,50 @@ pub fn op_read(rid: i32, zero_copy: Option<PinnedBuf>) -> Box<MinimalOp> {
Some(buf) => buf,
};
let fut = read(rid as u32, zero_copy)
let fut = read(state, rid as u32, zero_copy)
.map_err(ErrBox::from)
.and_then(move |nread| Ok(nread as i32));
Box::new(fut)
}
/// `DenoAsyncWrite` is the same as the `tokio_io::AsyncWrite` trait
/// but uses an `ErrBox` error instead of `std::io:Error`
pub trait DenoAsyncWrite {
fn poll_write(&mut self, buf: &[u8]) -> Poll<usize, ErrBox>;
fn shutdown(&mut self) -> Poll<(), ErrBox>;
}
impl DenoAsyncWrite for StreamResource {
fn poll_write(&mut self, buf: &[u8]) -> Poll<usize, ErrBox> {
let r = match self {
StreamResource::FsFile(ref mut f) => f.poll_write(buf),
StreamResource::Stdout(ref mut f) => f.poll_write(buf),
StreamResource::Stderr(ref mut f) => f.poll_write(buf),
StreamResource::TcpStream(ref mut f) => f.poll_write(buf),
StreamResource::ClientTlsStream(ref mut f) => f.poll_write(buf),
StreamResource::ServerTlsStream(ref mut f) => f.poll_write(buf),
StreamResource::ChildStdin(ref mut f) => f.poll_write(buf),
_ => {
return Err(bad_resource());
}
};
r.map_err(ErrBox::from)
}
fn shutdown(&mut self) -> futures::Poll<(), ErrBox> {
unimplemented!()
}
}
/// A future used to write some data to a stream.
#[derive(Debug)]
pub struct Write<T> {
rid: ResourceId,
buf: T,
state: IoState,
io_state: IoState,
state: ThreadSafeState,
}
/// Creates a future that will write some of the buffer `buf` to
@ -100,14 +218,15 @@ pub struct Write<T> {
///
/// Any error which happens during writing will cause both the stream and the
/// buffer to get destroyed.
pub fn write<T>(rid: ResourceId, buf: T) -> Write<T>
pub fn write<T>(state: &ThreadSafeState, rid: ResourceId, buf: T) -> Write<T>
where
T: AsRef<[u8]>,
{
Write {
rid,
buf,
state: IoState::Pending,
io_state: IoState::Pending,
state: state.clone(),
}
}
@ -121,21 +240,25 @@ where
type Error = ErrBox;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if self.state == IoState::Done {
if self.io_state == IoState::Done {
panic!("poll a Read after it's done");
}
let mut table = resources::lock_resource_table();
let mut table = self.state.lock_resource_table();
let resource = table
.get_mut::<CliResource>(self.rid)
.get_mut::<StreamResource>(self.rid)
.ok_or_else(bad_resource)?;
let nwritten = try_ready!(resource.poll_write(self.buf.as_ref()));
self.state = IoState::Done;
self.io_state = IoState::Done;
Ok(nwritten.into())
}
}
pub fn op_write(rid: i32, zero_copy: Option<PinnedBuf>) -> Box<MinimalOp> {
pub fn op_write(
state: &ThreadSafeState,
rid: i32,
zero_copy: Option<PinnedBuf>,
) -> Box<MinimalOp> {
debug!("write rid={}", rid);
let zero_copy = match zero_copy {
None => {
@ -144,7 +267,7 @@ pub fn op_write(rid: i32, zero_copy: Option<PinnedBuf>) -> Box<MinimalOp> {
Some(buf) => buf,
};
let fut = write(rid as u32, zero_copy)
let fut = write(state, rid as u32, zero_copy)
.map_err(ErrBox::from)
.and_then(move |nwritten| Ok(nwritten as i32));

View file

@ -5,6 +5,7 @@ mod dispatch_minimal;
pub use dispatch_json::json_op;
pub use dispatch_json::JsonOp;
pub use dispatch_minimal::minimal_op;
pub use dispatch_minimal::MinimalOp;
pub mod compiler;
pub mod errors;

View file

@ -1,12 +1,11 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
use super::dispatch_json::{Deserialize, JsonOp, Value};
use super::io::StreamResource;
use crate::deno_error::bad_resource;
use crate::ops::json_op;
use crate::resolve_addr::resolve_addr;
use crate::resources;
use crate::resources::CliResource;
use crate::resources::Resource;
use crate::state::ThreadSafeState;
use deno::Resource;
use deno::*;
use futures::Async;
use futures::Future;
@ -34,18 +33,19 @@ enum AcceptState {
}
/// Simply accepts a connection.
pub fn accept(rid: ResourceId) -> Accept {
pub fn accept(state: &ThreadSafeState, rid: ResourceId) -> Accept {
Accept {
state: AcceptState::Eager,
accept_state: AcceptState::Eager,
rid,
state: state.clone(),
}
}
/// A future representing state of accepting a TCP connection.
#[derive(Debug)]
pub struct Accept {
state: AcceptState,
accept_state: AcceptState,
rid: ResourceId,
state: ThreadSafeState,
}
impl Future for Accept {
@ -53,11 +53,11 @@ impl Future for Accept {
type Error = ErrBox;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if self.state == AcceptState::Done {
if self.accept_state == AcceptState::Done {
panic!("poll Accept after it's done");
}
let mut table = resources::lock_resource_table();
let mut table = self.state.lock_resource_table();
let listener_resource = table
.get_mut::<TcpListenerResource>(self.rid)
.ok_or_else(|| {
@ -70,22 +70,22 @@ impl Future for Accept {
let listener = &mut listener_resource.listener;
if self.state == AcceptState::Eager {
if self.accept_state == AcceptState::Eager {
// Similar to try_ready!, but also track/untrack accept task
// in TcpListener resource.
// In this way, when the listener is closed, the task can be
// notified to error out (instead of stuck forever).
match listener.poll_accept().map_err(ErrBox::from) {
Ok(Async::Ready((stream, addr))) => {
self.state = AcceptState::Done;
self.accept_state = AcceptState::Done;
return Ok((stream, addr).into());
}
Ok(Async::NotReady) => {
self.state = AcceptState::Pending;
self.accept_state = AcceptState::Pending;
return Ok(Async::NotReady);
}
Err(e) => {
self.state = AcceptState::Done;
self.accept_state = AcceptState::Done;
return Err(e);
}
}
@ -94,7 +94,7 @@ impl Future for Accept {
match listener.poll_accept().map_err(ErrBox::from) {
Ok(Async::Ready((stream, addr))) => {
listener_resource.untrack_task();
self.state = AcceptState::Done;
self.accept_state = AcceptState::Done;
Ok((stream, addr).into())
}
Ok(Async::NotReady) => {
@ -103,7 +103,7 @@ impl Future for Accept {
}
Err(e) => {
listener_resource.untrack_task();
self.state = AcceptState::Done;
self.accept_state = AcceptState::Done;
Err(e)
}
}
@ -116,23 +116,25 @@ struct AcceptArgs {
}
fn op_accept(
_state: &ThreadSafeState,
state: &ThreadSafeState,
args: Value,
_zero_copy: Option<PinnedBuf>,
) -> Result<JsonOp, ErrBox> {
let args: AcceptArgs = serde_json::from_value(args)?;
let rid = args.rid as u32;
let table = resources::lock_resource_table();
let state_ = state.clone();
let table = state.lock_resource_table();
table
.get::<TcpListenerResource>(rid)
.ok_or_else(bad_resource)?;
let op = accept(rid)
let op = accept(state, rid)
.and_then(move |(tcp_stream, _socket_addr)| {
let local_addr = tcp_stream.local_addr()?;
let remote_addr = tcp_stream.peer_addr()?;
let rid = resources::add_tcp_stream(tcp_stream);
let mut table = state_.lock_resource_table();
let rid =
table.add("tcpStream", Box::new(StreamResource::TcpStream(tcp_stream)));
Ok((rid, local_addr, remote_addr))
})
.map_err(ErrBox::from)
@ -161,7 +163,7 @@ fn op_dial(
) -> Result<JsonOp, ErrBox> {
let args: DialArgs = serde_json::from_value(args)?;
assert_eq!(args.transport, "tcp"); // TODO Support others.
let state_ = state.clone();
state.check_net(&args.hostname, args.port)?;
let op = resolve_addr(&args.hostname, args.port).and_then(move |addr| {
@ -170,7 +172,9 @@ fn op_dial(
.and_then(move |tcp_stream| {
let local_addr = tcp_stream.local_addr()?;
let remote_addr = tcp_stream.peer_addr()?;
let rid = resources::add_tcp_stream(tcp_stream);
let mut table = state_.lock_resource_table();
let rid = table
.add("tcpStream", Box::new(StreamResource::TcpStream(tcp_stream)));
Ok((rid, local_addr, remote_addr))
})
.map_err(ErrBox::from)
@ -193,7 +197,7 @@ struct ShutdownArgs {
}
fn op_shutdown(
_state: &ThreadSafeState,
state: &ThreadSafeState,
args: Value,
_zero_copy: Option<PinnedBuf>,
) -> Result<JsonOp, ErrBox> {
@ -208,10 +212,12 @@ fn op_shutdown(
_ => unimplemented!(),
};
let mut table = resources::lock_resource_table();
let resource = table.get_mut::<CliResource>(rid).ok_or_else(bad_resource)?;
let mut table = state.lock_resource_table();
let resource = table
.get_mut::<StreamResource>(rid)
.ok_or_else(bad_resource)?;
match resource {
CliResource::TcpStream(ref mut stream) => {
StreamResource::TcpStream(ref mut stream) => {
TcpStream::shutdown(stream, shutdown_mode).map_err(ErrBox::from)?;
}
_ => return Err(bad_resource()),
@ -299,7 +305,7 @@ fn op_listen(
task: None,
local_addr,
};
let mut table = resources::lock_resource_table();
let mut table = state.lock_resource_table();
let rid = table.add("tcpListener", Box::new(listener_resource));
Ok(JsonOp::Sync(json!({

View file

@ -1,9 +1,8 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
use super::dispatch_json::{Deserialize, JsonOp, Value};
use super::io::StreamResource;
use crate::deno_error::bad_resource;
use crate::ops::json_op;
use crate::resources;
use crate::resources::CloneFileFuture;
use crate::signal::kill;
use crate::state::ThreadSafeState;
use deno::*;
@ -28,6 +27,41 @@ pub fn init(i: &mut Isolate, s: &ThreadSafeState) {
i.register_op("kill", s.core_op(json_op(s.stateful_op(op_kill))));
}
struct CloneFileFuture {
rid: ResourceId,
state: ThreadSafeState,
}
impl Future for CloneFileFuture {
type Item = tokio::fs::File;
type Error = ErrBox;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let mut table = self.state.lock_resource_table();
let repr = table
.get_mut::<StreamResource>(self.rid)
.ok_or_else(bad_resource)?;
match repr {
StreamResource::FsFile(ref mut file) => {
file.poll_try_clone().map_err(ErrBox::from)
}
_ => Err(bad_resource()),
}
}
}
fn clone_file(
rid: u32,
state: &ThreadSafeState,
) -> Result<std::fs::File, ErrBox> {
(CloneFileFuture {
rid,
state: state.clone(),
})
.wait()
.map(|f| f.into_std())
}
fn subprocess_stdio_map(s: &str) -> std::process::Stdio {
match s {
"inherit" => std::process::Stdio::inherit(),
@ -65,6 +99,7 @@ fn op_run(
let run_args: RunArgs = serde_json::from_value(args)?;
state.check_run()?;
let state_ = state.clone();
let args = run_args.args;
let env = run_args.env;
@ -83,7 +118,7 @@ fn op_run(
// TODO: make this work with other resources, eg. sockets
let stdin_rid = run_args.stdin_rid;
if stdin_rid > 0 {
let file = (CloneFileFuture { rid: stdin_rid }).wait()?.into_std();
let file = clone_file(stdin_rid, &state_)?;
c.stdin(file);
} else {
c.stdin(subprocess_stdio_map(run_args.stdin.as_ref()));
@ -91,7 +126,7 @@ fn op_run(
let stdout_rid = run_args.stdout_rid;
if stdout_rid > 0 {
let file = (CloneFileFuture { rid: stdout_rid }).wait()?.into_std();
let file = clone_file(stdout_rid, &state_)?;
c.stdout(file);
} else {
c.stdout(subprocess_stdio_map(run_args.stdout.as_ref()));
@ -99,7 +134,7 @@ fn op_run(
let stderr_rid = run_args.stderr_rid;
if stderr_rid > 0 {
let file = (CloneFileFuture { rid: stderr_rid }).wait()?.into_std();
let file = clone_file(stderr_rid, &state_)?;
c.stderr(file);
} else {
c.stderr(subprocess_stdio_map(run_args.stderr.as_ref()));
@ -109,29 +144,42 @@ fn op_run(
let mut child = c.spawn_async().map_err(ErrBox::from)?;
let pid = child.id();
let stdin_rid = if child.stdin().is_some() {
let rid = resources::add_child_stdin(child.stdin().take().unwrap());
Some(rid)
} else {
None
let mut table = state_.lock_resource_table();
let stdin_rid = match child.stdin().take() {
Some(child_stdin) => {
let rid = table.add(
"childStdin",
Box::new(StreamResource::ChildStdin(child_stdin)),
);
Some(rid)
}
None => None,
};
let stdout_rid = if child.stdout().is_some() {
let rid = resources::add_child_stdout(child.stdout().take().unwrap());
Some(rid)
} else {
None
let stdout_rid = match child.stdout().take() {
Some(child_stdout) => {
let rid = table.add(
"childStdout",
Box::new(StreamResource::ChildStdout(child_stdout)),
);
Some(rid)
}
None => None,
};
let stderr_rid = if child.stderr().is_some() {
let rid = resources::add_child_stderr(child.stderr().take().unwrap());
Some(rid)
} else {
None
let stderr_rid = match child.stderr().take() {
Some(child_stderr) => {
let rid = table.add(
"childStderr",
Box::new(StreamResource::ChildStderr(child_stderr)),
);
Some(rid)
}
None => None,
};
let child_resource = ChildResource { child };
let mut table = resources::lock_resource_table();
let child_rid = table.add("child", Box::new(child_resource));
Ok(JsonOp::Sync(json!({
@ -145,6 +193,7 @@ fn op_run(
pub struct ChildStatus {
rid: ResourceId,
state: ThreadSafeState,
}
impl Future for ChildStatus {
@ -152,7 +201,7 @@ impl Future for ChildStatus {
type Error = ErrBox;
fn poll(&mut self) -> Poll<ExitStatus, ErrBox> {
let mut table = resources::lock_resource_table();
let mut table = self.state.lock_resource_table();
let child_resource = table
.get_mut::<ChildResource>(self.rid)
.ok_or_else(bad_resource)?;
@ -177,7 +226,10 @@ fn op_run_status(
state.check_run()?;
let future = ChildStatus { rid };
let future = ChildStatus {
rid,
state: state.clone(),
};
let future = future.and_then(move |run_status| {
let code = run_status.code();

View file

@ -4,9 +4,8 @@ use crate::deno_error::bad_resource;
use crate::ops::json_op;
use crate::repl;
use crate::repl::Repl;
use crate::resources;
use crate::resources::Resource;
use crate::state::ThreadSafeState;
use deno::Resource;
use deno::*;
use std::sync::Arc;
use std::sync::Mutex;
@ -44,7 +43,7 @@ fn op_repl_start(
repl::history_path(&state.global_state.dir, &args.history_file);
let repl = repl::Repl::new(history_path);
let resource = ReplResource(Arc::new(Mutex::new(repl)));
let mut table = resources::lock_resource_table();
let mut table = state.lock_resource_table();
let rid = table.add("repl", Box::new(resource));
Ok(JsonOp::Sync(json!(rid)))
}
@ -56,7 +55,7 @@ struct ReplReadlineArgs {
}
fn op_repl_readline(
_state: &ThreadSafeState,
state: &ThreadSafeState,
args: Value,
_zero_copy: Option<PinnedBuf>,
) -> Result<JsonOp, ErrBox> {
@ -64,9 +63,10 @@ fn op_repl_readline(
let rid = args.rid as u32;
let prompt = args.prompt;
debug!("op_repl_readline {} {}", rid, prompt);
let state = state.clone();
blocking_json(false, move || {
let table = resources::lock_resource_table();
let table = state.lock_resource_table();
let resource = table.get::<ReplResource>(rid).ok_or_else(bad_resource)?;
let repl = resource.0.clone();
let line = repl.lock().unwrap().readline(&prompt)?;

View file

@ -1,7 +1,6 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
use super::dispatch_json::{JsonOp, Value};
use crate::ops::json_op;
use crate::resources::lock_resource_table;
use crate::state::ThreadSafeState;
use deno::*;
@ -10,11 +9,11 @@ pub fn init(i: &mut Isolate, s: &ThreadSafeState) {
}
fn op_resources(
_state: &ThreadSafeState,
state: &ThreadSafeState,
_args: Value,
_zero_copy: Option<PinnedBuf>,
) -> Result<JsonOp, ErrBox> {
let resource_table = lock_resource_table();
let resource_table = state.lock_resource_table();
let serialized_resources = resource_table.entries();
Ok(JsonOp::Sync(json!(serialized_resources)))
}

View file

@ -1,13 +1,13 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
use super::dispatch_json::{Deserialize, JsonOp, Value};
use super::io::StreamResource;
use crate::deno_error::bad_resource;
use crate::deno_error::DenoError;
use crate::deno_error::ErrorKind;
use crate::ops::json_op;
use crate::resolve_addr::resolve_addr;
use crate::resources;
use crate::resources::Resource;
use crate::state::ThreadSafeState;
use deno::Resource;
use deno::*;
use futures::Async;
use futures::Future;
@ -60,7 +60,7 @@ pub fn op_dial_tls(
) -> Result<JsonOp, ErrBox> {
let args: DialTLSArgs = serde_json::from_value(args)?;
let cert_file = args.cert_file;
let state_ = state.clone();
state.check_net(&args.hostname, args.port)?;
if let Some(path) = cert_file.clone() {
state.check_read(&path)?;
@ -99,7 +99,11 @@ pub fn op_dial_tls(
.connect(dnsname, tcp_stream)
.map_err(ErrBox::from)
.and_then(move |tls_stream| {
let rid = resources::add_tls_stream(tls_stream);
let mut table = state_.lock_resource_table();
let rid = table.add(
"clientTlsStream",
Box::new(StreamResource::ClientTlsStream(Box::new(tls_stream))),
);
futures::future::ok(json!({
"rid": rid,
"localAddr": local_addr.to_string(),
@ -265,7 +269,7 @@ fn op_listen_tls(
task: None,
local_addr,
};
let mut table = resources::lock_resource_table();
let mut table = state.lock_resource_table();
let rid = table.add("tlsListener", Box::new(tls_listener_resource));
Ok(JsonOp::Sync(json!({
@ -282,18 +286,19 @@ enum AcceptTlsState {
}
/// Simply accepts a TLS connection.
pub fn accept_tls(rid: ResourceId) -> AcceptTls {
pub fn accept_tls(state: &ThreadSafeState, rid: ResourceId) -> AcceptTls {
AcceptTls {
state: AcceptTlsState::Eager,
accept_state: AcceptTlsState::Eager,
rid,
state: state.clone(),
}
}
/// A future representing state of accepting a TLS connection.
#[derive(Debug)]
pub struct AcceptTls {
state: AcceptTlsState,
accept_state: AcceptTlsState,
rid: ResourceId,
state: ThreadSafeState,
}
impl Future for AcceptTls {
@ -301,11 +306,11 @@ impl Future for AcceptTls {
type Error = ErrBox;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if self.state == AcceptTlsState::Done {
if self.accept_state == AcceptTlsState::Done {
panic!("poll AcceptTls after it's done");
}
let mut table = resources::lock_resource_table();
let mut table = self.state.lock_resource_table();
let listener_resource = table
.get_mut::<TlsListenerResource>(self.rid)
.ok_or_else(|| {
@ -318,22 +323,22 @@ impl Future for AcceptTls {
let listener = &mut listener_resource.listener;
if self.state == AcceptTlsState::Eager {
if self.accept_state == AcceptTlsState::Eager {
// Similar to try_ready!, but also track/untrack accept task
// in TcpListener resource.
// In this way, when the listener is closed, the task can be
// notified to error out (instead of stuck forever).
match listener.poll_accept().map_err(ErrBox::from) {
Ok(Async::Ready((stream, addr))) => {
self.state = AcceptTlsState::Done;
self.accept_state = AcceptTlsState::Done;
return Ok((stream, addr).into());
}
Ok(Async::NotReady) => {
self.state = AcceptTlsState::Pending;
self.accept_state = AcceptTlsState::Pending;
return Ok(Async::NotReady);
}
Err(e) => {
self.state = AcceptTlsState::Done;
self.accept_state = AcceptTlsState::Done;
return Err(e);
}
}
@ -342,7 +347,7 @@ impl Future for AcceptTls {
match listener.poll_accept().map_err(ErrBox::from) {
Ok(Async::Ready((stream, addr))) => {
listener_resource.untrack_task();
self.state = AcceptTlsState::Done;
self.accept_state = AcceptTlsState::Done;
Ok((stream, addr).into())
}
Ok(Async::NotReady) => {
@ -351,7 +356,7 @@ impl Future for AcceptTls {
}
Err(e) => {
listener_resource.untrack_task();
self.state = AcceptTlsState::Done;
self.accept_state = AcceptTlsState::Done;
Err(e)
}
}
@ -364,21 +369,22 @@ struct AcceptTlsArgs {
}
fn op_accept_tls(
_state: &ThreadSafeState,
state: &ThreadSafeState,
args: Value,
_zero_copy: Option<PinnedBuf>,
) -> Result<JsonOp, ErrBox> {
let args: AcceptTlsArgs = serde_json::from_value(args)?;
let rid = args.rid as u32;
let op = accept_tls(rid)
let state1 = state.clone();
let state2 = state.clone();
let op = accept_tls(state, rid)
.and_then(move |(tcp_stream, _socket_addr)| {
let local_addr = tcp_stream.local_addr()?;
let remote_addr = tcp_stream.peer_addr()?;
Ok((tcp_stream, local_addr, remote_addr))
})
.and_then(move |(tcp_stream, local_addr, remote_addr)| {
let table = resources::lock_resource_table();
let table = state1.lock_resource_table();
let resource = table
.get::<TlsListenerResource>(rid)
.ok_or_else(bad_resource)
@ -389,7 +395,11 @@ fn op_accept_tls(
.accept(tcp_stream)
.map_err(ErrBox::from)
.and_then(move |tls_stream| {
let rid = resources::add_server_tls_stream(tls_stream);
let mut table = state2.lock_resource_table();
let rid = table.add(
"serverTlsStream",
Box::new(StreamResource::ServerTlsStream(Box::new(tls_stream))),
);
Ok((rid, local_addr, remote_addr))
})
})

View file

@ -1,209 +0,0 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
// Think of Resources as File Descriptors. They are integers that are allocated
// by the privileged side of Deno to refer to various resources. The simplest
// example are standard file system files and stdio - but there will be other
// resources added in the future that might not correspond to operating system
// level File Descriptors. To avoid confusion we call them "resources" not "file
// descriptors". This module implements a global resource table. Ops (AKA
// handlers) look up resources by their integer id here.
use crate::deno_error::bad_resource;
use crate::http_body::HttpBody;
use deno::ErrBox;
pub use deno::Resource;
pub use deno::ResourceId;
use deno::ResourceTable;
use futures;
use futures::Future;
use futures::Poll;
use reqwest::r#async::Decoder as ReqwestDecoder;
use std;
use std::sync::Mutex;
use std::sync::MutexGuard;
use tokio;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::TcpStream;
use tokio_process;
use tokio_rustls::client::TlsStream as ClientTlsStream;
use tokio_rustls::server::TlsStream as ServerTlsStream;
#[cfg(not(windows))]
use std::os::unix::io::FromRawFd;
#[cfg(windows)]
use std::os::windows::io::FromRawHandle;
#[cfg(windows)]
extern crate winapi;
lazy_static! {
static ref RESOURCE_TABLE: Mutex<ResourceTable> = Mutex::new({
let mut table = ResourceTable::default();
// TODO Load these lazily during lookup?
table.add("stdin", Box::new(CliResource::Stdin(tokio::io::stdin())));
table.add("stdout", Box::new(CliResource::Stdout({
#[cfg(not(windows))]
let stdout = unsafe { std::fs::File::from_raw_fd(1) };
#[cfg(windows)]
let stdout = unsafe {
std::fs::File::from_raw_handle(winapi::um::processenv::GetStdHandle(
winapi::um::winbase::STD_OUTPUT_HANDLE))
};
tokio::fs::File::from_std(stdout)
})));
table.add("stderr", Box::new(CliResource::Stderr(tokio::io::stderr())));
table
});
}
// TODO: rename to `StreamResource`
pub enum CliResource {
Stdin(tokio::io::Stdin),
Stdout(tokio::fs::File),
Stderr(tokio::io::Stderr),
FsFile(tokio::fs::File),
TcpStream(tokio::net::TcpStream),
ServerTlsStream(Box<ServerTlsStream<TcpStream>>),
ClientTlsStream(Box<ClientTlsStream<TcpStream>>),
HttpBody(HttpBody),
ChildStdin(tokio_process::ChildStdin),
ChildStdout(tokio_process::ChildStdout),
ChildStderr(tokio_process::ChildStderr),
}
impl Resource for CliResource {}
pub fn lock_resource_table<'a>() -> MutexGuard<'a, ResourceTable> {
RESOURCE_TABLE.lock().unwrap()
}
/// `DenoAsyncRead` is the same as the `tokio_io::AsyncRead` trait
/// but uses an `ErrBox` error instead of `std::io:Error`
pub trait DenoAsyncRead {
fn poll_read(&mut self, buf: &mut [u8]) -> Poll<usize, ErrBox>;
}
impl DenoAsyncRead for CliResource {
fn poll_read(&mut self, buf: &mut [u8]) -> Poll<usize, ErrBox> {
let r = match self {
CliResource::FsFile(ref mut f) => f.poll_read(buf),
CliResource::Stdin(ref mut f) => f.poll_read(buf),
CliResource::TcpStream(ref mut f) => f.poll_read(buf),
CliResource::ClientTlsStream(ref mut f) => f.poll_read(buf),
CliResource::ServerTlsStream(ref mut f) => f.poll_read(buf),
CliResource::HttpBody(ref mut f) => f.poll_read(buf),
CliResource::ChildStdout(ref mut f) => f.poll_read(buf),
CliResource::ChildStderr(ref mut f) => f.poll_read(buf),
_ => {
return Err(bad_resource());
}
};
r.map_err(ErrBox::from)
}
}
/// `DenoAsyncWrite` is the same as the `tokio_io::AsyncWrite` trait
/// but uses an `ErrBox` error instead of `std::io:Error`
pub trait DenoAsyncWrite {
fn poll_write(&mut self, buf: &[u8]) -> Poll<usize, ErrBox>;
fn shutdown(&mut self) -> Poll<(), ErrBox>;
}
impl DenoAsyncWrite for CliResource {
fn poll_write(&mut self, buf: &[u8]) -> Poll<usize, ErrBox> {
let r = match self {
CliResource::FsFile(ref mut f) => f.poll_write(buf),
CliResource::Stdout(ref mut f) => f.poll_write(buf),
CliResource::Stderr(ref mut f) => f.poll_write(buf),
CliResource::TcpStream(ref mut f) => f.poll_write(buf),
CliResource::ClientTlsStream(ref mut f) => f.poll_write(buf),
CliResource::ServerTlsStream(ref mut f) => f.poll_write(buf),
CliResource::ChildStdin(ref mut f) => f.poll_write(buf),
_ => {
return Err(bad_resource());
}
};
r.map_err(ErrBox::from)
}
fn shutdown(&mut self) -> futures::Poll<(), ErrBox> {
unimplemented!()
}
}
pub fn add_fs_file(fs_file: tokio::fs::File) -> ResourceId {
let mut table = lock_resource_table();
table.add("fsFile", Box::new(CliResource::FsFile(fs_file)))
}
pub fn add_tcp_stream(stream: tokio::net::TcpStream) -> ResourceId {
let mut table = lock_resource_table();
table.add("tcpStream", Box::new(CliResource::TcpStream(stream)))
}
pub fn add_tls_stream(stream: ClientTlsStream<TcpStream>) -> ResourceId {
let mut table = lock_resource_table();
table.add(
"clientTlsStream",
Box::new(CliResource::ClientTlsStream(Box::new(stream))),
)
}
pub fn add_server_tls_stream(stream: ServerTlsStream<TcpStream>) -> ResourceId {
let mut table = lock_resource_table();
table.add(
"serverTlsStream",
Box::new(CliResource::ServerTlsStream(Box::new(stream))),
)
}
pub fn add_reqwest_body(body: ReqwestDecoder) -> ResourceId {
let body = HttpBody::from(body);
let mut table = lock_resource_table();
table.add("httpBody", Box::new(CliResource::HttpBody(body)))
}
pub fn add_child_stdin(stdin: tokio_process::ChildStdin) -> ResourceId {
let mut table = lock_resource_table();
table.add("childStdin", Box::new(CliResource::ChildStdin(stdin)))
}
pub fn add_child_stdout(stdout: tokio_process::ChildStdout) -> ResourceId {
let mut table = lock_resource_table();
table.add("childStdout", Box::new(CliResource::ChildStdout(stdout)))
}
pub fn add_child_stderr(stderr: tokio_process::ChildStderr) -> ResourceId {
let mut table = lock_resource_table();
table.add("childStderr", Box::new(CliResource::ChildStderr(stderr)))
}
pub struct CloneFileFuture {
pub rid: ResourceId,
}
impl Future for CloneFileFuture {
type Item = tokio::fs::File;
type Error = ErrBox;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let mut table = lock_resource_table();
let repr = table
.get_mut::<CliResource>(self.rid)
.ok_or_else(bad_resource)?;
match repr {
CliResource::FsFile(ref mut file) => {
file.poll_try_clone().map_err(ErrBox::from)
}
_ => Err(bad_resource()),
}
}
}

View file

@ -5,6 +5,7 @@ use crate::global_timer::GlobalTimer;
use crate::import_map::ImportMap;
use crate::metrics::Metrics;
use crate::ops::JsonOp;
use crate::ops::MinimalOp;
use crate::permissions::DenoPermissions;
use crate::worker::Worker;
use crate::worker::WorkerChannels;
@ -15,6 +16,7 @@ use deno::Loader;
use deno::ModuleSpecifier;
use deno::Op;
use deno::PinnedBuf;
use deno::ResourceTable;
use futures::Future;
use rand::rngs::StdRng;
use rand::SeedableRng;
@ -27,6 +29,7 @@ use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::MutexGuard;
use std::time::Instant;
use tokio::sync::mpsc;
@ -52,6 +55,7 @@ pub struct State {
pub start_time: Instant,
pub seeded_rng: Option<Mutex<StdRng>>,
pub include_deno_namespace: bool,
pub resource_table: Mutex<ResourceTable>,
}
impl Clone for ThreadSafeState {
@ -68,6 +72,10 @@ impl Deref for ThreadSafeState {
}
impl ThreadSafeState {
pub fn lock_resource_table(&self) -> MutexGuard<ResourceTable> {
self.resource_table.lock().unwrap()
}
/// Wrap core `OpDispatcher` to collect metrics.
pub fn core_op<D>(
&self,
@ -103,6 +111,21 @@ impl ThreadSafeState {
}
}
/// This is a special function that provides `state` argument to dispatcher.
pub fn stateful_minimal_op<D>(
&self,
dispatcher: D,
) -> impl Fn(i32, Option<PinnedBuf>) -> Box<MinimalOp>
where
D: Fn(&ThreadSafeState, i32, Option<PinnedBuf>) -> Box<MinimalOp>,
{
let state = self.clone();
move |rid: i32, zero_copy: Option<PinnedBuf>| -> Box<MinimalOp> {
dispatcher(&state, rid, zero_copy)
}
}
/// This is a special function that provides `state` argument to dispatcher.
///
/// NOTE: This only works with JSON dispatcher.
@ -220,6 +243,7 @@ impl ThreadSafeState {
start_time: Instant::now(),
seeded_rng,
include_deno_namespace,
resource_table: Mutex::new(ResourceTable::default()),
};
Ok(ThreadSafeState(Arc::new(state)))

View file

@ -65,7 +65,7 @@ impl ResourceTable {
}
// close(2) is done by dropping the value. Therefore we just need to remove
// the resource from the RESOURCE_TABLE.
// the resource from the resource table.
pub fn close(&mut self, rid: ResourceId) -> Option<()> {
self.map.remove(&rid).map(|(_name, _resource)| ())
}