From 8b90b8e88325d28fe41d8312ea91417b6e66a12e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Thu, 14 Nov 2019 18:10:25 +0100 Subject: [PATCH] refactor: per-worker resource table, take 2 (#3342) - removes global `RESOURCE_TABLE` - resource tables are now created per `Worker` in `State` - renames `CliResource` to `StreamResource` and moves all logic related to it to `cli/ops/io.rs` - removes `cli/resources.rs` - adds `state` argument to `op_read` and `op_write` and consequently adds `stateful_minimal_op` to `State` - IMPORTANT NOTE: workers don't have access to process stdio - this is caused by fact that dropping worker would close stdout for process (because it's constructed from raw handle, which closes underlying file descriptor on drop) --- cli/lib.rs | 11 +- cli/ops/dispatch_minimal.rs | 8 +- cli/ops/fetch.rs | 9 +- cli/ops/files.rs | 28 ++--- cli/ops/io.rs | 195 ++++++++++++++++++++++++++++----- cli/ops/mod.rs | 1 + cli/ops/net.rs | 60 ++++++----- cli/ops/process.rs | 98 +++++++++++++---- cli/ops/repl.rs | 10 +- cli/ops/resources.rs | 5 +- cli/ops/tls.rs | 54 ++++++---- cli/resources.rs | 209 ------------------------------------ cli/state.rs | 24 +++++ core/resources.rs | 2 +- 14 files changed, 376 insertions(+), 338 deletions(-) delete mode 100644 cli/resources.rs diff --git a/cli/lib.rs b/cli/lib.rs index 17ca94b555..3d772bb83f 100644 --- a/cli/lib.rs +++ b/cli/lib.rs @@ -43,7 +43,6 @@ pub mod permissions; mod progress; mod repl; pub mod resolve_addr; -pub mod resources; mod shell; mod signal; pub mod source_maps; @@ -57,6 +56,7 @@ pub mod worker; use crate::deno_error::js_check; use crate::deno_error::print_err_and_exit; use crate::global_state::ThreadSafeGlobalState; +use crate::ops::io::get_stdio; use crate::progress::Progress; use crate::state::ThreadSafeState; use crate::worker::Worker; @@ -128,6 +128,15 @@ fn create_worker_and_state( .map_err(deno_error::print_err_and_exit) .unwrap(); + let state_ = state.clone(); + { + let mut resource_table = state_.lock_resource_table(); + let (stdin, stdout, stderr) = get_stdio(); + resource_table.add("stdin", Box::new(stdin)); + resource_table.add("stdout", Box::new(stdout)); + resource_table.add("stderr", Box::new(stderr)); + } + let worker = Worker::new( "main".to_string(), startup_data::deno_isolate_init(), diff --git a/cli/ops/dispatch_minimal.rs b/cli/ops/dispatch_minimal.rs index c19521bf12..355a246340 100644 --- a/cli/ops/dispatch_minimal.rs +++ b/cli/ops/dispatch_minimal.rs @@ -15,7 +15,6 @@ use deno::PinnedBuf; use futures::Future; pub type MinimalOp = dyn Future + Send; -pub type Dispatcher = fn(i32, Option) -> Box; #[derive(Copy, Clone, Debug, PartialEq)] // This corresponds to RecordMinimal on the TS side. @@ -112,9 +111,10 @@ fn test_parse_min_record() { assert_eq!(parse_min_record(&buf), None); } -pub fn minimal_op( - d: Dispatcher, -) -> impl Fn(&[u8], Option) -> CoreOp { +pub fn minimal_op(d: D) -> impl Fn(&[u8], Option) -> CoreOp +where + D: Fn(i32, Option) -> Box, +{ move |control: &[u8], zero_copy: Option| { let mut record = match parse_min_record(control) { Some(r) => r, diff --git a/cli/ops/fetch.rs b/cli/ops/fetch.rs index 1433311710..a1c0fe29cb 100644 --- a/cli/ops/fetch.rs +++ b/cli/ops/fetch.rs @@ -1,8 +1,9 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. use super::dispatch_json::{Deserialize, JsonOp, Value}; +use super::io::StreamResource; +use crate::http_body::HttpBody; use crate::http_util::get_client; use crate::ops::json_op; -use crate::resources; use crate::state::ThreadSafeState; use deno::*; use http::header::HeaderName; @@ -54,6 +55,7 @@ pub fn op_fetch( request = request.header(name, v); } debug!("Before fetch {}", url); + let state_ = state.clone(); let future = request.send().map_err(ErrBox::from).and_then(move |res| { let status = res.status(); let mut res_headers = Vec::new(); @@ -61,8 +63,9 @@ pub fn op_fetch( res_headers.push((key.to_string(), val.to_str().unwrap().to_owned())); } - let body = res.into_body(); - let rid = resources::add_reqwest_body(body); + let body = HttpBody::from(res.into_body()); + let mut table = state_.lock_resource_table(); + let rid = table.add("httpBody", Box::new(StreamResource::HttpBody(body))); let json_res = json!({ "bodyRid": rid, diff --git a/cli/ops/files.rs b/cli/ops/files.rs index 04b5f98bfd..fc1b8e7d8c 100644 --- a/cli/ops/files.rs +++ b/cli/ops/files.rs @@ -1,12 +1,11 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. use super::dispatch_json::{Deserialize, JsonOp, Value}; +use super::io::StreamResource; use crate::deno_error::bad_resource; use crate::deno_error::DenoError; use crate::deno_error::ErrorKind; use crate::fs as deno_fs; use crate::ops::json_op; -use crate::resources; -use crate::resources::CliResource; use crate::state::ThreadSafeState; use deno::*; use futures::Future; @@ -38,7 +37,7 @@ fn op_open( let args: OpenArgs = serde_json::from_value(args)?; let (filename, filename_) = deno_fs::resolve_from_cwd(&args.filename)?; let mode = args.mode.as_ref(); - + let state_ = state.clone(); let mut open_options = tokio::fs::OpenOptions::new(); match mode { @@ -91,7 +90,8 @@ fn op_open( let is_sync = args.promise_id.is_none(); let op = open_options.open(filename).map_err(ErrBox::from).and_then( move |fs_file| { - let rid = resources::add_fs_file(fs_file); + let mut table = state_.lock_resource_table(); + let rid = table.add("fsFile", Box::new(StreamResource::FsFile(fs_file))); futures::future::ok(json!(rid)) }, ); @@ -110,21 +110,21 @@ struct CloseArgs { } fn op_close( - _state: &ThreadSafeState, + state: &ThreadSafeState, args: Value, _zero_copy: Option, ) -> Result { let args: CloseArgs = serde_json::from_value(args)?; - let mut table = resources::lock_resource_table(); + let mut table = state.lock_resource_table(); table.close(args.rid as u32).ok_or_else(bad_resource)?; Ok(JsonOp::Sync(json!({}))) } -#[derive(Debug)] pub struct SeekFuture { seek_from: SeekFrom, rid: ResourceId, + state: ThreadSafeState, } impl Future for SeekFuture { @@ -132,13 +132,13 @@ impl Future for SeekFuture { type Error = ErrBox; fn poll(&mut self) -> Poll { - let mut table = resources::lock_resource_table(); + let mut table = self.state.lock_resource_table(); let resource = table - .get_mut::(self.rid) + .get_mut::(self.rid) .ok_or_else(bad_resource)?; let tokio_file = match resource { - CliResource::FsFile(ref mut file) => file, + StreamResource::FsFile(ref mut file) => file, _ => return Err(bad_resource()), }; @@ -156,7 +156,7 @@ struct SeekArgs { } fn op_seek( - _state: &ThreadSafeState, + state: &ThreadSafeState, args: Value, _zero_copy: Option, ) -> Result { @@ -177,7 +177,11 @@ fn op_seek( } }; - let fut = SeekFuture { seek_from, rid }; + let fut = SeekFuture { + state: state.clone(), + seek_from, + rid, + }; let op = fut.and_then(move |_| futures::future::ok(json!({}))); if args.promise_id.is_none() { diff --git a/cli/ops/io.rs b/cli/ops/io.rs index 3ede4b4112..30c9339997 100644 --- a/cli/ops/io.rs +++ b/cli/ops/io.rs @@ -1,19 +1,117 @@ use super::dispatch_minimal::MinimalOp; use crate::deno_error; use crate::deno_error::bad_resource; +use crate::http_body::HttpBody; use crate::ops::minimal_op; -use crate::resources; -use crate::resources::CliResource; -use crate::resources::DenoAsyncRead; -use crate::resources::DenoAsyncWrite; use crate::state::ThreadSafeState; +use deno::ErrBox; +use deno::Resource; use deno::*; +use futures; use futures::Future; use futures::Poll; +use std; +use tokio; +use tokio::io::{AsyncRead, AsyncWrite}; +use tokio::net::TcpStream; +use tokio_process; +use tokio_rustls::client::TlsStream as ClientTlsStream; +use tokio_rustls::server::TlsStream as ServerTlsStream; + +#[cfg(not(windows))] +use std::os::unix::io::FromRawFd; + +#[cfg(windows)] +use std::os::windows::io::FromRawHandle; + +#[cfg(windows)] +extern crate winapi; + +lazy_static! { + /// Due to portability issues on Windows handle to stdout is created from raw file descriptor. + /// The caveat of that approach is fact that when this handle is dropped underlying + /// file descriptor is closed - that is highly not desirable in case of stdout. + /// That's why we store this global handle that is then cloned when obtaining stdio + /// for process. In turn when resource table is dropped storing reference to that handle, + /// the handle itself won't be closed (so Deno.core.print) will still work. + static ref STDOUT_HANDLE: std::fs::File = { + #[cfg(not(windows))] + let stdout = unsafe { std::fs::File::from_raw_fd(1) }; + #[cfg(windows)] + let stdout = unsafe { + std::fs::File::from_raw_handle(winapi::um::processenv::GetStdHandle( + winapi::um::winbase::STD_OUTPUT_HANDLE, + )) + }; + + stdout + }; +} pub fn init(i: &mut Isolate, s: &ThreadSafeState) { - i.register_op("read", s.core_op(minimal_op(op_read))); - i.register_op("write", s.core_op(minimal_op(op_write))); + i.register_op( + "read", + s.core_op(minimal_op(s.stateful_minimal_op(op_read))), + ); + i.register_op( + "write", + s.core_op(minimal_op(s.stateful_minimal_op(op_write))), + ); +} + +pub fn get_stdio() -> (StreamResource, StreamResource, StreamResource) { + let stdin = StreamResource::Stdin(tokio::io::stdin()); + let stdout = StreamResource::Stdout({ + let stdout = STDOUT_HANDLE + .try_clone() + .expect("Unable to clone stdout handle"); + tokio::fs::File::from_std(stdout) + }); + let stderr = StreamResource::Stderr(tokio::io::stderr()); + + (stdin, stdout, stderr) +} + +pub enum StreamResource { + Stdin(tokio::io::Stdin), + Stdout(tokio::fs::File), + Stderr(tokio::io::Stderr), + FsFile(tokio::fs::File), + TcpStream(tokio::net::TcpStream), + ServerTlsStream(Box>), + ClientTlsStream(Box>), + HttpBody(HttpBody), + ChildStdin(tokio_process::ChildStdin), + ChildStdout(tokio_process::ChildStdout), + ChildStderr(tokio_process::ChildStderr), +} + +impl Resource for StreamResource {} + +/// `DenoAsyncRead` is the same as the `tokio_io::AsyncRead` trait +/// but uses an `ErrBox` error instead of `std::io:Error` +pub trait DenoAsyncRead { + fn poll_read(&mut self, buf: &mut [u8]) -> Poll; +} + +impl DenoAsyncRead for StreamResource { + fn poll_read(&mut self, buf: &mut [u8]) -> Poll { + let r = match self { + StreamResource::FsFile(ref mut f) => f.poll_read(buf), + StreamResource::Stdin(ref mut f) => f.poll_read(buf), + StreamResource::TcpStream(ref mut f) => f.poll_read(buf), + StreamResource::ClientTlsStream(ref mut f) => f.poll_read(buf), + StreamResource::ServerTlsStream(ref mut f) => f.poll_read(buf), + StreamResource::HttpBody(ref mut f) => f.poll_read(buf), + StreamResource::ChildStdout(ref mut f) => f.poll_read(buf), + StreamResource::ChildStderr(ref mut f) => f.poll_read(buf), + _ => { + return Err(bad_resource()); + } + }; + + r.map_err(ErrBox::from) + } } #[derive(Debug, PartialEq)] @@ -27,14 +125,15 @@ enum IoState { /// /// The returned future will resolve to both the I/O stream and the buffer /// as well as the number of bytes read once the read operation is completed. -pub fn read(rid: ResourceId, buf: T) -> Read +pub fn read(state: &ThreadSafeState, rid: ResourceId, buf: T) -> Read where T: AsMut<[u8]>, { Read { rid, buf, - state: IoState::Pending, + io_state: IoState::Pending, + state: state.clone(), } } @@ -42,11 +141,11 @@ where /// a buffer. /// /// Created by the [`read`] function. -#[derive(Debug)] pub struct Read { rid: ResourceId, buf: T, - state: IoState, + io_state: IoState, + state: ThreadSafeState, } impl Future for Read @@ -57,42 +156,77 @@ where type Error = ErrBox; fn poll(&mut self) -> Poll { - if self.state == IoState::Done { + if self.io_state == IoState::Done { panic!("poll a Read after it's done"); } - let mut table = resources::lock_resource_table(); + let mut table = self.state.lock_resource_table(); let resource = table - .get_mut::(self.rid) + .get_mut::(self.rid) .ok_or_else(bad_resource)?; let nread = try_ready!(resource.poll_read(&mut self.buf.as_mut()[..])); - self.state = IoState::Done; + self.io_state = IoState::Done; Ok(nread.into()) } } -pub fn op_read(rid: i32, zero_copy: Option) -> Box { +pub fn op_read( + state: &ThreadSafeState, + rid: i32, + zero_copy: Option, +) -> Box { debug!("read rid={}", rid); let zero_copy = match zero_copy { None => { - return Box::new(futures::future::err(deno_error::no_buffer_specified())) + return Box::new(futures::future::err(deno_error::no_buffer_specified())); } Some(buf) => buf, }; - let fut = read(rid as u32, zero_copy) + let fut = read(state, rid as u32, zero_copy) .map_err(ErrBox::from) .and_then(move |nread| Ok(nread as i32)); Box::new(fut) } +/// `DenoAsyncWrite` is the same as the `tokio_io::AsyncWrite` trait +/// but uses an `ErrBox` error instead of `std::io:Error` +pub trait DenoAsyncWrite { + fn poll_write(&mut self, buf: &[u8]) -> Poll; + + fn shutdown(&mut self) -> Poll<(), ErrBox>; +} + +impl DenoAsyncWrite for StreamResource { + fn poll_write(&mut self, buf: &[u8]) -> Poll { + let r = match self { + StreamResource::FsFile(ref mut f) => f.poll_write(buf), + StreamResource::Stdout(ref mut f) => f.poll_write(buf), + StreamResource::Stderr(ref mut f) => f.poll_write(buf), + StreamResource::TcpStream(ref mut f) => f.poll_write(buf), + StreamResource::ClientTlsStream(ref mut f) => f.poll_write(buf), + StreamResource::ServerTlsStream(ref mut f) => f.poll_write(buf), + StreamResource::ChildStdin(ref mut f) => f.poll_write(buf), + _ => { + return Err(bad_resource()); + } + }; + + r.map_err(ErrBox::from) + } + + fn shutdown(&mut self) -> futures::Poll<(), ErrBox> { + unimplemented!() + } +} + /// A future used to write some data to a stream. -#[derive(Debug)] pub struct Write { rid: ResourceId, buf: T, - state: IoState, + io_state: IoState, + state: ThreadSafeState, } /// Creates a future that will write some of the buffer `buf` to @@ -100,14 +234,15 @@ pub struct Write { /// /// Any error which happens during writing will cause both the stream and the /// buffer to get destroyed. -pub fn write(rid: ResourceId, buf: T) -> Write +pub fn write(state: &ThreadSafeState, rid: ResourceId, buf: T) -> Write where T: AsRef<[u8]>, { Write { rid, buf, - state: IoState::Pending, + io_state: IoState::Pending, + state: state.clone(), } } @@ -121,30 +256,34 @@ where type Error = ErrBox; fn poll(&mut self) -> Poll { - if self.state == IoState::Done { + if self.io_state == IoState::Done { panic!("poll a Read after it's done"); } - let mut table = resources::lock_resource_table(); + let mut table = self.state.lock_resource_table(); let resource = table - .get_mut::(self.rid) + .get_mut::(self.rid) .ok_or_else(bad_resource)?; let nwritten = try_ready!(resource.poll_write(self.buf.as_ref())); - self.state = IoState::Done; + self.io_state = IoState::Done; Ok(nwritten.into()) } } -pub fn op_write(rid: i32, zero_copy: Option) -> Box { +pub fn op_write( + state: &ThreadSafeState, + rid: i32, + zero_copy: Option, +) -> Box { debug!("write rid={}", rid); let zero_copy = match zero_copy { None => { - return Box::new(futures::future::err(deno_error::no_buffer_specified())) + return Box::new(futures::future::err(deno_error::no_buffer_specified())); } Some(buf) => buf, }; - let fut = write(rid as u32, zero_copy) + let fut = write(state, rid as u32, zero_copy) .map_err(ErrBox::from) .and_then(move |nwritten| Ok(nwritten as i32)); diff --git a/cli/ops/mod.rs b/cli/ops/mod.rs index 4e6eb37c81..9b33d59189 100644 --- a/cli/ops/mod.rs +++ b/cli/ops/mod.rs @@ -5,6 +5,7 @@ mod dispatch_minimal; pub use dispatch_json::json_op; pub use dispatch_json::JsonOp; pub use dispatch_minimal::minimal_op; +pub use dispatch_minimal::MinimalOp; pub mod compiler; pub mod errors; diff --git a/cli/ops/net.rs b/cli/ops/net.rs index a4b3bf934e..2fe81e1403 100644 --- a/cli/ops/net.rs +++ b/cli/ops/net.rs @@ -1,12 +1,11 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. use super::dispatch_json::{Deserialize, JsonOp, Value}; +use super::io::StreamResource; use crate::deno_error::bad_resource; use crate::ops::json_op; use crate::resolve_addr::resolve_addr; -use crate::resources; -use crate::resources::CliResource; -use crate::resources::Resource; use crate::state::ThreadSafeState; +use deno::Resource; use deno::*; use futures::Async; use futures::Future; @@ -34,18 +33,19 @@ enum AcceptState { } /// Simply accepts a connection. -pub fn accept(rid: ResourceId) -> Accept { +pub fn accept(state: &ThreadSafeState, rid: ResourceId) -> Accept { Accept { - state: AcceptState::Eager, + accept_state: AcceptState::Eager, rid, + state: state.clone(), } } /// A future representing state of accepting a TCP connection. -#[derive(Debug)] pub struct Accept { - state: AcceptState, + accept_state: AcceptState, rid: ResourceId, + state: ThreadSafeState, } impl Future for Accept { @@ -53,11 +53,11 @@ impl Future for Accept { type Error = ErrBox; fn poll(&mut self) -> Poll { - if self.state == AcceptState::Done { + if self.accept_state == AcceptState::Done { panic!("poll Accept after it's done"); } - let mut table = resources::lock_resource_table(); + let mut table = self.state.lock_resource_table(); let listener_resource = table .get_mut::(self.rid) .ok_or_else(|| { @@ -70,22 +70,22 @@ impl Future for Accept { let listener = &mut listener_resource.listener; - if self.state == AcceptState::Eager { + if self.accept_state == AcceptState::Eager { // Similar to try_ready!, but also track/untrack accept task // in TcpListener resource. // In this way, when the listener is closed, the task can be // notified to error out (instead of stuck forever). match listener.poll_accept().map_err(ErrBox::from) { Ok(Async::Ready((stream, addr))) => { - self.state = AcceptState::Done; + self.accept_state = AcceptState::Done; return Ok((stream, addr).into()); } Ok(Async::NotReady) => { - self.state = AcceptState::Pending; + self.accept_state = AcceptState::Pending; return Ok(Async::NotReady); } Err(e) => { - self.state = AcceptState::Done; + self.accept_state = AcceptState::Done; return Err(e); } } @@ -94,7 +94,7 @@ impl Future for Accept { match listener.poll_accept().map_err(ErrBox::from) { Ok(Async::Ready((stream, addr))) => { listener_resource.untrack_task(); - self.state = AcceptState::Done; + self.accept_state = AcceptState::Done; Ok((stream, addr).into()) } Ok(Async::NotReady) => { @@ -103,7 +103,7 @@ impl Future for Accept { } Err(e) => { listener_resource.untrack_task(); - self.state = AcceptState::Done; + self.accept_state = AcceptState::Done; Err(e) } } @@ -116,23 +116,25 @@ struct AcceptArgs { } fn op_accept( - _state: &ThreadSafeState, + state: &ThreadSafeState, args: Value, _zero_copy: Option, ) -> Result { let args: AcceptArgs = serde_json::from_value(args)?; let rid = args.rid as u32; - - let table = resources::lock_resource_table(); + let state_ = state.clone(); + let table = state.lock_resource_table(); table .get::(rid) .ok_or_else(bad_resource)?; - let op = accept(rid) + let op = accept(state, rid) .and_then(move |(tcp_stream, _socket_addr)| { let local_addr = tcp_stream.local_addr()?; let remote_addr = tcp_stream.peer_addr()?; - let rid = resources::add_tcp_stream(tcp_stream); + let mut table = state_.lock_resource_table(); + let rid = + table.add("tcpStream", Box::new(StreamResource::TcpStream(tcp_stream))); Ok((rid, local_addr, remote_addr)) }) .map_err(ErrBox::from) @@ -161,7 +163,7 @@ fn op_dial( ) -> Result { let args: DialArgs = serde_json::from_value(args)?; assert_eq!(args.transport, "tcp"); // TODO Support others. - + let state_ = state.clone(); state.check_net(&args.hostname, args.port)?; let op = resolve_addr(&args.hostname, args.port).and_then(move |addr| { @@ -170,7 +172,9 @@ fn op_dial( .and_then(move |tcp_stream| { let local_addr = tcp_stream.local_addr()?; let remote_addr = tcp_stream.peer_addr()?; - let rid = resources::add_tcp_stream(tcp_stream); + let mut table = state_.lock_resource_table(); + let rid = table + .add("tcpStream", Box::new(StreamResource::TcpStream(tcp_stream))); Ok((rid, local_addr, remote_addr)) }) .map_err(ErrBox::from) @@ -193,7 +197,7 @@ struct ShutdownArgs { } fn op_shutdown( - _state: &ThreadSafeState, + state: &ThreadSafeState, args: Value, _zero_copy: Option, ) -> Result { @@ -208,10 +212,12 @@ fn op_shutdown( _ => unimplemented!(), }; - let mut table = resources::lock_resource_table(); - let resource = table.get_mut::(rid).ok_or_else(bad_resource)?; + let mut table = state.lock_resource_table(); + let resource = table + .get_mut::(rid) + .ok_or_else(bad_resource)?; match resource { - CliResource::TcpStream(ref mut stream) => { + StreamResource::TcpStream(ref mut stream) => { TcpStream::shutdown(stream, shutdown_mode).map_err(ErrBox::from)?; } _ => return Err(bad_resource()), @@ -299,7 +305,7 @@ fn op_listen( task: None, local_addr, }; - let mut table = resources::lock_resource_table(); + let mut table = state.lock_resource_table(); let rid = table.add("tcpListener", Box::new(listener_resource)); Ok(JsonOp::Sync(json!({ diff --git a/cli/ops/process.rs b/cli/ops/process.rs index f7897ec519..237b02fd05 100644 --- a/cli/ops/process.rs +++ b/cli/ops/process.rs @@ -1,9 +1,8 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. use super::dispatch_json::{Deserialize, JsonOp, Value}; +use super::io::StreamResource; use crate::deno_error::bad_resource; use crate::ops::json_op; -use crate::resources; -use crate::resources::CloneFileFuture; use crate::signal::kill; use crate::state::ThreadSafeState; use deno::*; @@ -28,6 +27,41 @@ pub fn init(i: &mut Isolate, s: &ThreadSafeState) { i.register_op("kill", s.core_op(json_op(s.stateful_op(op_kill)))); } +struct CloneFileFuture { + rid: ResourceId, + state: ThreadSafeState, +} + +impl Future for CloneFileFuture { + type Item = tokio::fs::File; + type Error = ErrBox; + + fn poll(&mut self) -> Poll { + let mut table = self.state.lock_resource_table(); + let repr = table + .get_mut::(self.rid) + .ok_or_else(bad_resource)?; + match repr { + StreamResource::FsFile(ref mut file) => { + file.poll_try_clone().map_err(ErrBox::from) + } + _ => Err(bad_resource()), + } + } +} + +fn clone_file( + rid: u32, + state: &ThreadSafeState, +) -> Result { + (CloneFileFuture { + rid, + state: state.clone(), + }) + .wait() + .map(|f| f.into_std()) +} + fn subprocess_stdio_map(s: &str) -> std::process::Stdio { match s { "inherit" => std::process::Stdio::inherit(), @@ -65,6 +99,7 @@ fn op_run( let run_args: RunArgs = serde_json::from_value(args)?; state.check_run()?; + let state_ = state.clone(); let args = run_args.args; let env = run_args.env; @@ -83,7 +118,7 @@ fn op_run( // TODO: make this work with other resources, eg. sockets let stdin_rid = run_args.stdin_rid; if stdin_rid > 0 { - let file = (CloneFileFuture { rid: stdin_rid }).wait()?.into_std(); + let file = clone_file(stdin_rid, &state_)?; c.stdin(file); } else { c.stdin(subprocess_stdio_map(run_args.stdin.as_ref())); @@ -91,7 +126,7 @@ fn op_run( let stdout_rid = run_args.stdout_rid; if stdout_rid > 0 { - let file = (CloneFileFuture { rid: stdout_rid }).wait()?.into_std(); + let file = clone_file(stdout_rid, &state_)?; c.stdout(file); } else { c.stdout(subprocess_stdio_map(run_args.stdout.as_ref())); @@ -99,7 +134,7 @@ fn op_run( let stderr_rid = run_args.stderr_rid; if stderr_rid > 0 { - let file = (CloneFileFuture { rid: stderr_rid }).wait()?.into_std(); + let file = clone_file(stderr_rid, &state_)?; c.stderr(file); } else { c.stderr(subprocess_stdio_map(run_args.stderr.as_ref())); @@ -109,29 +144,42 @@ fn op_run( let mut child = c.spawn_async().map_err(ErrBox::from)?; let pid = child.id(); - let stdin_rid = if child.stdin().is_some() { - let rid = resources::add_child_stdin(child.stdin().take().unwrap()); - Some(rid) - } else { - None + let mut table = state_.lock_resource_table(); + + let stdin_rid = match child.stdin().take() { + Some(child_stdin) => { + let rid = table.add( + "childStdin", + Box::new(StreamResource::ChildStdin(child_stdin)), + ); + Some(rid) + } + None => None, }; - let stdout_rid = if child.stdout().is_some() { - let rid = resources::add_child_stdout(child.stdout().take().unwrap()); - Some(rid) - } else { - None + let stdout_rid = match child.stdout().take() { + Some(child_stdout) => { + let rid = table.add( + "childStdout", + Box::new(StreamResource::ChildStdout(child_stdout)), + ); + Some(rid) + } + None => None, }; - let stderr_rid = if child.stderr().is_some() { - let rid = resources::add_child_stderr(child.stderr().take().unwrap()); - Some(rid) - } else { - None + let stderr_rid = match child.stderr().take() { + Some(child_stderr) => { + let rid = table.add( + "childStderr", + Box::new(StreamResource::ChildStderr(child_stderr)), + ); + Some(rid) + } + None => None, }; let child_resource = ChildResource { child }; - let mut table = resources::lock_resource_table(); let child_rid = table.add("child", Box::new(child_resource)); Ok(JsonOp::Sync(json!({ @@ -145,6 +193,7 @@ fn op_run( pub struct ChildStatus { rid: ResourceId, + state: ThreadSafeState, } impl Future for ChildStatus { @@ -152,7 +201,7 @@ impl Future for ChildStatus { type Error = ErrBox; fn poll(&mut self) -> Poll { - let mut table = resources::lock_resource_table(); + let mut table = self.state.lock_resource_table(); let child_resource = table .get_mut::(self.rid) .ok_or_else(bad_resource)?; @@ -177,7 +226,10 @@ fn op_run_status( state.check_run()?; - let future = ChildStatus { rid }; + let future = ChildStatus { + rid, + state: state.clone(), + }; let future = future.and_then(move |run_status| { let code = run_status.code(); diff --git a/cli/ops/repl.rs b/cli/ops/repl.rs index 723fb25718..5919ea586b 100644 --- a/cli/ops/repl.rs +++ b/cli/ops/repl.rs @@ -4,9 +4,8 @@ use crate::deno_error::bad_resource; use crate::ops::json_op; use crate::repl; use crate::repl::Repl; -use crate::resources; -use crate::resources::Resource; use crate::state::ThreadSafeState; +use deno::Resource; use deno::*; use std::sync::Arc; use std::sync::Mutex; @@ -44,7 +43,7 @@ fn op_repl_start( repl::history_path(&state.global_state.dir, &args.history_file); let repl = repl::Repl::new(history_path); let resource = ReplResource(Arc::new(Mutex::new(repl))); - let mut table = resources::lock_resource_table(); + let mut table = state.lock_resource_table(); let rid = table.add("repl", Box::new(resource)); Ok(JsonOp::Sync(json!(rid))) } @@ -56,7 +55,7 @@ struct ReplReadlineArgs { } fn op_repl_readline( - _state: &ThreadSafeState, + state: &ThreadSafeState, args: Value, _zero_copy: Option, ) -> Result { @@ -64,9 +63,10 @@ fn op_repl_readline( let rid = args.rid as u32; let prompt = args.prompt; debug!("op_repl_readline {} {}", rid, prompt); + let state = state.clone(); blocking_json(false, move || { - let table = resources::lock_resource_table(); + let table = state.lock_resource_table(); let resource = table.get::(rid).ok_or_else(bad_resource)?; let repl = resource.0.clone(); let line = repl.lock().unwrap().readline(&prompt)?; diff --git a/cli/ops/resources.rs b/cli/ops/resources.rs index d92c6a83c3..c35e9762c8 100644 --- a/cli/ops/resources.rs +++ b/cli/ops/resources.rs @@ -1,7 +1,6 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. use super::dispatch_json::{JsonOp, Value}; use crate::ops::json_op; -use crate::resources::lock_resource_table; use crate::state::ThreadSafeState; use deno::*; @@ -10,11 +9,11 @@ pub fn init(i: &mut Isolate, s: &ThreadSafeState) { } fn op_resources( - _state: &ThreadSafeState, + state: &ThreadSafeState, _args: Value, _zero_copy: Option, ) -> Result { - let resource_table = lock_resource_table(); + let resource_table = state.lock_resource_table(); let serialized_resources = resource_table.entries(); Ok(JsonOp::Sync(json!(serialized_resources))) } diff --git a/cli/ops/tls.rs b/cli/ops/tls.rs index 6e8348c915..48419f76f1 100644 --- a/cli/ops/tls.rs +++ b/cli/ops/tls.rs @@ -1,13 +1,13 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. use super::dispatch_json::{Deserialize, JsonOp, Value}; +use super::io::StreamResource; use crate::deno_error::bad_resource; use crate::deno_error::DenoError; use crate::deno_error::ErrorKind; use crate::ops::json_op; use crate::resolve_addr::resolve_addr; -use crate::resources; -use crate::resources::Resource; use crate::state::ThreadSafeState; +use deno::Resource; use deno::*; use futures::Async; use futures::Future; @@ -60,7 +60,7 @@ pub fn op_dial_tls( ) -> Result { let args: DialTLSArgs = serde_json::from_value(args)?; let cert_file = args.cert_file; - + let state_ = state.clone(); state.check_net(&args.hostname, args.port)?; if let Some(path) = cert_file.clone() { state.check_read(&path)?; @@ -99,7 +99,11 @@ pub fn op_dial_tls( .connect(dnsname, tcp_stream) .map_err(ErrBox::from) .and_then(move |tls_stream| { - let rid = resources::add_tls_stream(tls_stream); + let mut table = state_.lock_resource_table(); + let rid = table.add( + "clientTlsStream", + Box::new(StreamResource::ClientTlsStream(Box::new(tls_stream))), + ); futures::future::ok(json!({ "rid": rid, "localAddr": local_addr.to_string(), @@ -265,7 +269,7 @@ fn op_listen_tls( task: None, local_addr, }; - let mut table = resources::lock_resource_table(); + let mut table = state.lock_resource_table(); let rid = table.add("tlsListener", Box::new(tls_listener_resource)); Ok(JsonOp::Sync(json!({ @@ -282,18 +286,19 @@ enum AcceptTlsState { } /// Simply accepts a TLS connection. -pub fn accept_tls(rid: ResourceId) -> AcceptTls { +pub fn accept_tls(state: &ThreadSafeState, rid: ResourceId) -> AcceptTls { AcceptTls { - state: AcceptTlsState::Eager, + accept_state: AcceptTlsState::Eager, rid, + state: state.clone(), } } /// A future representing state of accepting a TLS connection. -#[derive(Debug)] pub struct AcceptTls { - state: AcceptTlsState, + accept_state: AcceptTlsState, rid: ResourceId, + state: ThreadSafeState, } impl Future for AcceptTls { @@ -301,11 +306,11 @@ impl Future for AcceptTls { type Error = ErrBox; fn poll(&mut self) -> Poll { - if self.state == AcceptTlsState::Done { + if self.accept_state == AcceptTlsState::Done { panic!("poll AcceptTls after it's done"); } - let mut table = resources::lock_resource_table(); + let mut table = self.state.lock_resource_table(); let listener_resource = table .get_mut::(self.rid) .ok_or_else(|| { @@ -318,22 +323,22 @@ impl Future for AcceptTls { let listener = &mut listener_resource.listener; - if self.state == AcceptTlsState::Eager { + if self.accept_state == AcceptTlsState::Eager { // Similar to try_ready!, but also track/untrack accept task // in TcpListener resource. // In this way, when the listener is closed, the task can be // notified to error out (instead of stuck forever). match listener.poll_accept().map_err(ErrBox::from) { Ok(Async::Ready((stream, addr))) => { - self.state = AcceptTlsState::Done; + self.accept_state = AcceptTlsState::Done; return Ok((stream, addr).into()); } Ok(Async::NotReady) => { - self.state = AcceptTlsState::Pending; + self.accept_state = AcceptTlsState::Pending; return Ok(Async::NotReady); } Err(e) => { - self.state = AcceptTlsState::Done; + self.accept_state = AcceptTlsState::Done; return Err(e); } } @@ -342,7 +347,7 @@ impl Future for AcceptTls { match listener.poll_accept().map_err(ErrBox::from) { Ok(Async::Ready((stream, addr))) => { listener_resource.untrack_task(); - self.state = AcceptTlsState::Done; + self.accept_state = AcceptTlsState::Done; Ok((stream, addr).into()) } Ok(Async::NotReady) => { @@ -351,7 +356,7 @@ impl Future for AcceptTls { } Err(e) => { listener_resource.untrack_task(); - self.state = AcceptTlsState::Done; + self.accept_state = AcceptTlsState::Done; Err(e) } } @@ -364,21 +369,22 @@ struct AcceptTlsArgs { } fn op_accept_tls( - _state: &ThreadSafeState, + state: &ThreadSafeState, args: Value, _zero_copy: Option, ) -> Result { let args: AcceptTlsArgs = serde_json::from_value(args)?; let rid = args.rid as u32; - - let op = accept_tls(rid) + let state1 = state.clone(); + let state2 = state.clone(); + let op = accept_tls(state, rid) .and_then(move |(tcp_stream, _socket_addr)| { let local_addr = tcp_stream.local_addr()?; let remote_addr = tcp_stream.peer_addr()?; Ok((tcp_stream, local_addr, remote_addr)) }) .and_then(move |(tcp_stream, local_addr, remote_addr)| { - let table = resources::lock_resource_table(); + let table = state1.lock_resource_table(); let resource = table .get::(rid) .ok_or_else(bad_resource) @@ -389,7 +395,11 @@ fn op_accept_tls( .accept(tcp_stream) .map_err(ErrBox::from) .and_then(move |tls_stream| { - let rid = resources::add_server_tls_stream(tls_stream); + let mut table = state2.lock_resource_table(); + let rid = table.add( + "serverTlsStream", + Box::new(StreamResource::ServerTlsStream(Box::new(tls_stream))), + ); Ok((rid, local_addr, remote_addr)) }) }) diff --git a/cli/resources.rs b/cli/resources.rs deleted file mode 100644 index db9b43eeb1..0000000000 --- a/cli/resources.rs +++ /dev/null @@ -1,209 +0,0 @@ -// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. - -// Think of Resources as File Descriptors. They are integers that are allocated -// by the privileged side of Deno to refer to various resources. The simplest -// example are standard file system files and stdio - but there will be other -// resources added in the future that might not correspond to operating system -// level File Descriptors. To avoid confusion we call them "resources" not "file -// descriptors". This module implements a global resource table. Ops (AKA -// handlers) look up resources by their integer id here. - -use crate::deno_error::bad_resource; -use crate::http_body::HttpBody; -use deno::ErrBox; -pub use deno::Resource; -pub use deno::ResourceId; -use deno::ResourceTable; - -use futures; -use futures::Future; -use futures::Poll; -use reqwest::r#async::Decoder as ReqwestDecoder; -use std; -use std::sync::Mutex; -use std::sync::MutexGuard; -use tokio; -use tokio::io::{AsyncRead, AsyncWrite}; -use tokio::net::TcpStream; -use tokio_process; -use tokio_rustls::client::TlsStream as ClientTlsStream; -use tokio_rustls::server::TlsStream as ServerTlsStream; - -#[cfg(not(windows))] -use std::os::unix::io::FromRawFd; - -#[cfg(windows)] -use std::os::windows::io::FromRawHandle; - -#[cfg(windows)] -extern crate winapi; - -lazy_static! { - static ref RESOURCE_TABLE: Mutex = Mutex::new({ - let mut table = ResourceTable::default(); - - // TODO Load these lazily during lookup? - table.add("stdin", Box::new(CliResource::Stdin(tokio::io::stdin()))); - - table.add("stdout", Box::new(CliResource::Stdout({ - #[cfg(not(windows))] - let stdout = unsafe { std::fs::File::from_raw_fd(1) }; - #[cfg(windows)] - let stdout = unsafe { - std::fs::File::from_raw_handle(winapi::um::processenv::GetStdHandle( - winapi::um::winbase::STD_OUTPUT_HANDLE)) - }; - tokio::fs::File::from_std(stdout) - }))); - - table.add("stderr", Box::new(CliResource::Stderr(tokio::io::stderr()))); - table - }); -} - -// TODO: rename to `StreamResource` -pub enum CliResource { - Stdin(tokio::io::Stdin), - Stdout(tokio::fs::File), - Stderr(tokio::io::Stderr), - FsFile(tokio::fs::File), - TcpStream(tokio::net::TcpStream), - ServerTlsStream(Box>), - ClientTlsStream(Box>), - HttpBody(HttpBody), - ChildStdin(tokio_process::ChildStdin), - ChildStdout(tokio_process::ChildStdout), - ChildStderr(tokio_process::ChildStderr), -} - -impl Resource for CliResource {} - -pub fn lock_resource_table<'a>() -> MutexGuard<'a, ResourceTable> { - RESOURCE_TABLE.lock().unwrap() -} - -/// `DenoAsyncRead` is the same as the `tokio_io::AsyncRead` trait -/// but uses an `ErrBox` error instead of `std::io:Error` -pub trait DenoAsyncRead { - fn poll_read(&mut self, buf: &mut [u8]) -> Poll; -} - -impl DenoAsyncRead for CliResource { - fn poll_read(&mut self, buf: &mut [u8]) -> Poll { - let r = match self { - CliResource::FsFile(ref mut f) => f.poll_read(buf), - CliResource::Stdin(ref mut f) => f.poll_read(buf), - CliResource::TcpStream(ref mut f) => f.poll_read(buf), - CliResource::ClientTlsStream(ref mut f) => f.poll_read(buf), - CliResource::ServerTlsStream(ref mut f) => f.poll_read(buf), - CliResource::HttpBody(ref mut f) => f.poll_read(buf), - CliResource::ChildStdout(ref mut f) => f.poll_read(buf), - CliResource::ChildStderr(ref mut f) => f.poll_read(buf), - _ => { - return Err(bad_resource()); - } - }; - - r.map_err(ErrBox::from) - } -} - -/// `DenoAsyncWrite` is the same as the `tokio_io::AsyncWrite` trait -/// but uses an `ErrBox` error instead of `std::io:Error` -pub trait DenoAsyncWrite { - fn poll_write(&mut self, buf: &[u8]) -> Poll; - - fn shutdown(&mut self) -> Poll<(), ErrBox>; -} - -impl DenoAsyncWrite for CliResource { - fn poll_write(&mut self, buf: &[u8]) -> Poll { - let r = match self { - CliResource::FsFile(ref mut f) => f.poll_write(buf), - CliResource::Stdout(ref mut f) => f.poll_write(buf), - CliResource::Stderr(ref mut f) => f.poll_write(buf), - CliResource::TcpStream(ref mut f) => f.poll_write(buf), - CliResource::ClientTlsStream(ref mut f) => f.poll_write(buf), - CliResource::ServerTlsStream(ref mut f) => f.poll_write(buf), - CliResource::ChildStdin(ref mut f) => f.poll_write(buf), - _ => { - return Err(bad_resource()); - } - }; - - r.map_err(ErrBox::from) - } - - fn shutdown(&mut self) -> futures::Poll<(), ErrBox> { - unimplemented!() - } -} - -pub fn add_fs_file(fs_file: tokio::fs::File) -> ResourceId { - let mut table = lock_resource_table(); - table.add("fsFile", Box::new(CliResource::FsFile(fs_file))) -} - -pub fn add_tcp_stream(stream: tokio::net::TcpStream) -> ResourceId { - let mut table = lock_resource_table(); - table.add("tcpStream", Box::new(CliResource::TcpStream(stream))) -} - -pub fn add_tls_stream(stream: ClientTlsStream) -> ResourceId { - let mut table = lock_resource_table(); - table.add( - "clientTlsStream", - Box::new(CliResource::ClientTlsStream(Box::new(stream))), - ) -} - -pub fn add_server_tls_stream(stream: ServerTlsStream) -> ResourceId { - let mut table = lock_resource_table(); - table.add( - "serverTlsStream", - Box::new(CliResource::ServerTlsStream(Box::new(stream))), - ) -} - -pub fn add_reqwest_body(body: ReqwestDecoder) -> ResourceId { - let body = HttpBody::from(body); - let mut table = lock_resource_table(); - table.add("httpBody", Box::new(CliResource::HttpBody(body))) -} - -pub fn add_child_stdin(stdin: tokio_process::ChildStdin) -> ResourceId { - let mut table = lock_resource_table(); - table.add("childStdin", Box::new(CliResource::ChildStdin(stdin))) -} - -pub fn add_child_stdout(stdout: tokio_process::ChildStdout) -> ResourceId { - let mut table = lock_resource_table(); - table.add("childStdout", Box::new(CliResource::ChildStdout(stdout))) -} - -pub fn add_child_stderr(stderr: tokio_process::ChildStderr) -> ResourceId { - let mut table = lock_resource_table(); - table.add("childStderr", Box::new(CliResource::ChildStderr(stderr))) -} - -pub struct CloneFileFuture { - pub rid: ResourceId, -} - -impl Future for CloneFileFuture { - type Item = tokio::fs::File; - type Error = ErrBox; - - fn poll(&mut self) -> Poll { - let mut table = lock_resource_table(); - let repr = table - .get_mut::(self.rid) - .ok_or_else(bad_resource)?; - match repr { - CliResource::FsFile(ref mut file) => { - file.poll_try_clone().map_err(ErrBox::from) - } - _ => Err(bad_resource()), - } - } -} diff --git a/cli/state.rs b/cli/state.rs index edfac72c02..a5e9546b0d 100644 --- a/cli/state.rs +++ b/cli/state.rs @@ -5,6 +5,7 @@ use crate::global_timer::GlobalTimer; use crate::import_map::ImportMap; use crate::metrics::Metrics; use crate::ops::JsonOp; +use crate::ops::MinimalOp; use crate::permissions::DenoPermissions; use crate::worker::Worker; use crate::worker::WorkerChannels; @@ -15,6 +16,7 @@ use deno::Loader; use deno::ModuleSpecifier; use deno::Op; use deno::PinnedBuf; +use deno::ResourceTable; use futures::Future; use rand::rngs::StdRng; use rand::SeedableRng; @@ -27,6 +29,7 @@ use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering; use std::sync::Arc; use std::sync::Mutex; +use std::sync::MutexGuard; use std::time::Instant; use tokio::sync::mpsc; @@ -52,6 +55,7 @@ pub struct State { pub start_time: Instant, pub seeded_rng: Option>, pub include_deno_namespace: bool, + pub resource_table: Mutex, } impl Clone for ThreadSafeState { @@ -68,6 +72,10 @@ impl Deref for ThreadSafeState { } impl ThreadSafeState { + pub fn lock_resource_table(&self) -> MutexGuard { + self.resource_table.lock().unwrap() + } + /// Wrap core `OpDispatcher` to collect metrics. pub fn core_op( &self, @@ -103,6 +111,21 @@ impl ThreadSafeState { } } + /// This is a special function that provides `state` argument to dispatcher. + pub fn stateful_minimal_op( + &self, + dispatcher: D, + ) -> impl Fn(i32, Option) -> Box + where + D: Fn(&ThreadSafeState, i32, Option) -> Box, + { + let state = self.clone(); + + move |rid: i32, zero_copy: Option| -> Box { + dispatcher(&state, rid, zero_copy) + } + } + /// This is a special function that provides `state` argument to dispatcher. /// /// NOTE: This only works with JSON dispatcher. @@ -220,6 +243,7 @@ impl ThreadSafeState { start_time: Instant::now(), seeded_rng, include_deno_namespace, + resource_table: Mutex::new(ResourceTable::default()), }; Ok(ThreadSafeState(Arc::new(state))) diff --git a/core/resources.rs b/core/resources.rs index da4fb6b078..216f5c8dfb 100644 --- a/core/resources.rs +++ b/core/resources.rs @@ -65,7 +65,7 @@ impl ResourceTable { } // close(2) is done by dropping the value. Therefore we just need to remove - // the resource from the RESOURCE_TABLE. + // the resource from the resource table. pub fn close(&mut self, rid: ResourceId) -> Option<()> { self.map.remove(&rid).map(|(_name, _resource)| ()) }