diff --git a/cli/ops/net.rs b/cli/ops/net.rs index 1c5aa6edd4..d603b746bf 100644 --- a/cli/ops/net.rs +++ b/cli/ops/net.rs @@ -1,16 +1,20 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. use super::dispatch_json::{Deserialize, JsonOp, Value}; +use crate::deno_error::bad_resource; use crate::ops::json_op; use crate::resolve_addr::resolve_addr; use crate::resources; +use crate::resources::CoreResource; use crate::resources::Resource; use crate::state::ThreadSafeState; -use crate::tokio_util; use deno::*; +use futures::Async; use futures::Future; +use futures::Poll; use std; use std::convert::From; use std::net::Shutdown; +use std::net::SocketAddr; use tokio; use tokio::net::TcpListener; use tokio::net::TcpStream; @@ -22,6 +26,90 @@ pub fn init(i: &mut Isolate, s: &ThreadSafeState) { i.register_op("listen", s.core_op(json_op(s.stateful_op(op_listen)))); } +#[derive(Debug, PartialEq)] +enum AcceptState { + Eager, + Pending, + Done, +} + +/// Simply accepts a connection. +pub fn accept(rid: ResourceId) -> Accept { + Accept { + state: AcceptState::Eager, + rid, + } +} + +/// A future representing state of accepting a TCP connection. +#[derive(Debug)] +pub struct Accept { + state: AcceptState, + rid: ResourceId, +} + +impl Future for Accept { + type Item = (TcpStream, SocketAddr); + type Error = ErrBox; + + fn poll(&mut self) -> Poll { + if self.state == AcceptState::Done { + panic!("poll Accept after it's done"); + } + + let mut table = resources::lock_resource_table(); + let listener_resource = table + .get_mut::(self.rid) + .ok_or_else(|| { + let e = std::io::Error::new( + std::io::ErrorKind::Other, + "Listener has been closed", + ); + ErrBox::from(e) + })?; + + let listener = &mut listener_resource.listener; + + if self.state == AcceptState::Eager { + // Similar to try_ready!, but also track/untrack accept task + // in TcpListener resource. + // In this way, when the listener is closed, the task can be + // notified to error out (instead of stuck forever). + match listener.poll_accept().map_err(ErrBox::from) { + Ok(Async::Ready((stream, addr))) => { + self.state = AcceptState::Done; + return Ok((stream, addr).into()); + } + Ok(Async::NotReady) => { + self.state = AcceptState::Pending; + return Ok(Async::NotReady); + } + Err(e) => { + self.state = AcceptState::Done; + return Err(e); + } + } + } + + match listener.poll_accept().map_err(ErrBox::from) { + Ok(Async::Ready((stream, addr))) => { + listener_resource.untrack_task(); + self.state = AcceptState::Done; + Ok((stream, addr).into()) + } + Ok(Async::NotReady) => { + listener_resource.track_task()?; + Ok(Async::NotReady) + } + Err(e) => { + listener_resource.untrack_task(); + self.state = AcceptState::Done; + Err(e) + } + } + } +} + #[derive(Deserialize)] struct AcceptArgs { rid: i32, @@ -33,10 +121,14 @@ fn op_accept( _zero_copy: Option, ) -> Result { let args: AcceptArgs = serde_json::from_value(args)?; - let server_rid = args.rid as u32; + let rid = args.rid as u32; - let server_resource = resources::lookup(server_rid)?; - let op = tokio_util::accept(server_resource) + let table = resources::lock_resource_table(); + table + .get::(rid) + .ok_or_else(bad_resource)?; + + let op = accept(rid) .and_then(move |(tcp_stream, _socket_addr)| { let local_addr = tcp_stream.local_addr()?; let remote_addr = tcp_stream.peer_addr()?; @@ -129,6 +221,59 @@ struct ListenArgs { port: u16, } +#[allow(dead_code)] +struct TcpListenerResource { + listener: tokio::net::TcpListener, + task: Option, + local_addr: SocketAddr, +} + +impl CoreResource for TcpListenerResource {} + +impl Drop for TcpListenerResource { + fn drop(&mut self) { + self.notify_task(); + } +} + +impl TcpListenerResource { + /// Track the current task so future awaiting for connection + /// can be notified when listener is closed. + /// + /// Throws an error if another task is already tracked. + pub fn track_task(&mut self) -> Result<(), ErrBox> { + // Currently, we only allow tracking a single accept task for a listener. + // This might be changed in the future with multiple workers. + // Caveat: TcpListener by itself also only tracks an accept task at a time. + // See https://github.com/tokio-rs/tokio/issues/846#issuecomment-454208883 + if self.task.is_some() { + let e = std::io::Error::new( + std::io::ErrorKind::Other, + "Another accept task is ongoing", + ); + return Err(ErrBox::from(e)); + } + + self.task.replace(futures::task::current()); + Ok(()) + } + + /// Notifies a task when listener is closed so accept future can resolve. + pub fn notify_task(&mut self) { + if let Some(task) = self.task.take() { + task.notify(); + } + } + + /// Stop tracking a task. + /// Happens when the task is done and thus no further tracking is needed. + pub fn untrack_task(&mut self) { + if self.task.is_some() { + self.task.take(); + } + } +} + fn op_listen( state: &ThreadSafeState, args: Value, @@ -142,10 +287,17 @@ fn op_listen( let addr = resolve_addr(&args.hostname, args.port).wait()?; let listener = TcpListener::bind(&addr)?; let local_addr = listener.local_addr()?; - let resource = resources::add_tcp_listener(listener); + let local_addr_str = local_addr.to_string(); + let listener_resource = TcpListenerResource { + listener, + task: None, + local_addr, + }; + let mut table = resources::lock_resource_table(); + let rid = table.add("tcpListener", Box::new(listener_resource)); Ok(JsonOp::Sync(json!({ - "rid": resource.rid, - "localAddr": local_addr.to_string() + "rid": rid, + "localAddr": local_addr_str, }))) } diff --git a/cli/ops/tls.rs b/cli/ops/tls.rs index 569b5a1f67..ee08f357a7 100644 --- a/cli/ops/tls.rs +++ b/cli/ops/tls.rs @@ -1,18 +1,22 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. use super::dispatch_json::{Deserialize, JsonOp, Value}; +use crate::deno_error::bad_resource; use crate::deno_error::DenoError; use crate::deno_error::ErrorKind; use crate::ops::json_op; use crate::resolve_addr::resolve_addr; use crate::resources; +use crate::resources::CoreResource; use crate::state::ThreadSafeState; -use crate::tokio_util; use deno::*; +use futures::Async; use futures::Future; +use futures::Poll; use std; use std::convert::From; use std::fs::File; use std::io::BufReader; +use std::net::SocketAddr; use std::sync::Arc; use tokio; use tokio::net::TcpListener; @@ -167,6 +171,60 @@ fn load_keys(path: &str) -> Result, ErrBox> { Ok(keys) } +#[allow(dead_code)] +pub struct TlsListenerResource { + listener: tokio::net::TcpListener, + tls_acceptor: TlsAcceptor, + task: Option, + local_addr: SocketAddr, +} + +impl CoreResource for TlsListenerResource {} + +impl Drop for TlsListenerResource { + fn drop(&mut self) { + self.notify_task(); + } +} + +impl TlsListenerResource { + /// Track the current task so future awaiting for connection + /// can be notified when listener is closed. + /// + /// Throws an error if another task is already tracked. + pub fn track_task(&mut self) -> Result<(), ErrBox> { + // Currently, we only allow tracking a single accept task for a listener. + // This might be changed in the future with multiple workers. + // Caveat: TcpListener by itself also only tracks an accept task at a time. + // See https://github.com/tokio-rs/tokio/issues/846#issuecomment-454208883 + if self.task.is_some() { + let e = std::io::Error::new( + std::io::ErrorKind::Other, + "Another accept task is ongoing", + ); + return Err(ErrBox::from(e)); + } + + self.task.replace(futures::task::current()); + Ok(()) + } + + /// Notifies a task when listener is closed so accept future can resolve. + pub fn notify_task(&mut self) { + if let Some(task) = self.task.take() { + task.notify(); + } + } + + /// Stop tracking a task. + /// Happens when the task is done and thus no further tracking is needed. + pub fn untrack_task(&mut self) { + if self.task.is_some() { + self.task.take(); + } + } +} + #[derive(Deserialize)] #[serde(rename_all = "camelCase")] struct ListenTlsArgs { @@ -196,18 +254,110 @@ fn op_listen_tls( config .set_single_cert(load_certs(&cert_file)?, load_keys(&key_file)?.remove(0)) .expect("invalid key or certificate"); - let acceptor = TlsAcceptor::from(Arc::new(config)); + let tls_acceptor = TlsAcceptor::from(Arc::new(config)); let addr = resolve_addr(&args.hostname, args.port).wait()?; let listener = TcpListener::bind(&addr)?; let local_addr = listener.local_addr()?; - let resource = resources::add_tls_listener(listener, acceptor); + let local_addr_str = local_addr.to_string(); + let tls_listener_resource = TlsListenerResource { + listener, + tls_acceptor, + task: None, + local_addr, + }; + let mut table = resources::lock_resource_table(); + let rid = table.add("tlsListener", Box::new(tls_listener_resource)); Ok(JsonOp::Sync(json!({ - "rid": resource.rid, - "localAddr": local_addr.to_string() + "rid": rid, + "localAddr": local_addr_str }))) } +#[derive(Debug, PartialEq)] +enum AcceptTlsState { + Eager, + Pending, + Done, +} + +/// Simply accepts a TLS connection. +pub fn accept_tls(rid: ResourceId) -> AcceptTls { + AcceptTls { + state: AcceptTlsState::Eager, + rid, + } +} + +/// A future representing state of accepting a TLS connection. +#[derive(Debug)] +pub struct AcceptTls { + state: AcceptTlsState, + rid: ResourceId, +} + +impl Future for AcceptTls { + type Item = (TcpStream, SocketAddr); + type Error = ErrBox; + + fn poll(&mut self) -> Poll { + if self.state == AcceptTlsState::Done { + panic!("poll AcceptTls after it's done"); + } + + let mut table = resources::lock_resource_table(); + let listener_resource = table + .get_mut::(self.rid) + .ok_or_else(|| { + let e = std::io::Error::new( + std::io::ErrorKind::Other, + "Listener has been closed", + ); + ErrBox::from(e) + })?; + + let listener = &mut listener_resource.listener; + + if self.state == AcceptTlsState::Eager { + // Similar to try_ready!, but also track/untrack accept task + // in TcpListener resource. + // In this way, when the listener is closed, the task can be + // notified to error out (instead of stuck forever). + match listener.poll_accept().map_err(ErrBox::from) { + Ok(Async::Ready((stream, addr))) => { + self.state = AcceptTlsState::Done; + return Ok((stream, addr).into()); + } + Ok(Async::NotReady) => { + self.state = AcceptTlsState::Pending; + return Ok(Async::NotReady); + } + Err(e) => { + self.state = AcceptTlsState::Done; + return Err(e); + } + } + } + + match listener.poll_accept().map_err(ErrBox::from) { + Ok(Async::Ready((stream, addr))) => { + listener_resource.untrack_task(); + self.state = AcceptTlsState::Done; + Ok((stream, addr).into()) + } + Ok(Async::NotReady) => { + listener_resource.track_task()?; + Ok(Async::NotReady) + } + Err(e) => { + listener_resource.untrack_task(); + self.state = AcceptTlsState::Done; + Err(e) + } + } + } +} + #[derive(Deserialize)] struct AcceptTlsArgs { rid: i32, @@ -219,26 +369,31 @@ fn op_accept_tls( _zero_copy: Option, ) -> Result { let args: AcceptTlsArgs = serde_json::from_value(args)?; - let server_rid = args.rid as u32; + let rid = args.rid as u32; - let server_resource = resources::lookup(server_rid)?; - let op = tokio_util::accept(server_resource) + let op = accept_tls(rid) .and_then(move |(tcp_stream, _socket_addr)| { let local_addr = tcp_stream.local_addr()?; let remote_addr = tcp_stream.peer_addr()?; Ok((tcp_stream, local_addr, remote_addr)) }) .and_then(move |(tcp_stream, local_addr, remote_addr)| { - let mut server_resource = resources::lookup(server_rid).unwrap(); - server_resource - .poll_accept_tls(tcp_stream) + let table = resources::lock_resource_table(); + let resource = table + .get::(rid) + .ok_or_else(bad_resource) + .expect("Can't find tls listener"); + + resource + .tls_acceptor + .accept(tcp_stream) + .map_err(ErrBox::from) .and_then(move |tls_stream| { let tls_stream_resource = resources::add_server_tls_stream(tls_stream); Ok((tls_stream_resource, local_addr, remote_addr)) }) }) - .map_err(ErrBox::from) .and_then(move |(tls_stream_resource, local_addr, remote_addr)| { futures::future::ok(json!({ "rid": tls_stream_resource.rid, diff --git a/cli/resources.rs b/cli/resources.rs index 2910d16b6c..3b070e06b6 100644 --- a/cli/resources.rs +++ b/cli/resources.rs @@ -21,8 +21,8 @@ use futures::Future; use futures::Poll; use reqwest::r#async::Decoder as ReqwestDecoder; use std; -use std::io::{Error, Read, Seek, SeekFrom, Write}; -use std::net::{Shutdown, SocketAddr}; +use std::io::{Read, Seek, SeekFrom, Write}; +use std::net::Shutdown; use std::process::ExitStatus; use std::sync::Mutex; use std::sync::MutexGuard; @@ -32,12 +32,10 @@ use tokio::net::TcpStream; use tokio_process; use tokio_rustls::client::TlsStream as ClientTlsStream; use tokio_rustls::server::TlsStream as ServerTlsStream; -use tokio_rustls::TlsAcceptor; #[cfg(not(windows))] use std::os::unix::io::FromRawFd; -use futures::future::Either; #[cfg(windows)] use std::os::windows::io::FromRawHandle; @@ -73,17 +71,6 @@ enum CliResource { Stdout(tokio::fs::File), Stderr(tokio::io::Stderr), FsFile(tokio::fs::File), - // Since TcpListener might be closed while there is a pending accept task, - // we need to track the task so that when the listener is closed, - // this pending task could be notified and die. - // Currently TcpListener itself does not take care of this issue. - // See: https://github.com/tokio-rs/tokio/issues/846 - TcpListener(tokio::net::TcpListener, Option), - TlsListener( - tokio::net::TcpListener, - TlsAcceptor, - Option, - ), TcpStream(tokio::net::TcpStream), ServerTlsStream(Box>), ClientTlsStream(Box>), @@ -97,22 +84,7 @@ enum CliResource { ChildStderr(tokio_process::ChildStderr), } -impl CoreResource for CliResource { - // TODO(ry) These task notifications are hacks to workaround various behaviors - // in Tokio. They should not influence the overall design of Deno. The - // CoreResource::close should be removed in favor of the drop trait. - fn close(&self) { - match self { - CliResource::TcpListener(_, Some(t)) => { - t.notify(); - } - CliResource::TlsListener(_, _, Some(t)) => { - t.notify(); - } - _ => {} - } - } -} +impl CoreResource for CliResource {} pub fn lock_resource_table<'a>() -> MutexGuard<'a, ResourceTable> { RESOURCE_TABLE.lock().unwrap() @@ -126,78 +98,6 @@ pub struct Resource { } impl Resource { - // TODO Should it return a Resource instead of net::TcpStream? - pub fn poll_accept(&mut self) -> Poll<(TcpStream, SocketAddr), Error> { - let mut table = lock_resource_table(); - match table.get_mut::(self.rid) { - None => Err(std::io::Error::new( - std::io::ErrorKind::Other, - "Listener has been closed", - )), - Some(repr) => match repr { - CliResource::TcpListener(ref mut s, _) => s.poll_accept(), - CliResource::TlsListener(ref mut s, _, _) => s.poll_accept(), - _ => panic!("Cannot accept"), - }, - } - } - - pub fn poll_accept_tls( - &mut self, - tcp_stream: TcpStream, - ) -> impl Future, Error = Error> { - let mut table = lock_resource_table(); - match table.get_mut::(self.rid) { - None => Either::A(futures::future::err(std::io::Error::new( - std::io::ErrorKind::Other, - "Listener has been closed", - ))), - Some(repr) => match repr { - CliResource::TlsListener(_, ref mut acceptor, _) => { - Either::B(acceptor.accept(tcp_stream)) - } - _ => panic!("Cannot accept"), - }, - } - } - - /// Track the current task (for TcpListener resource). - /// Throws an error if another task is already tracked. - pub fn track_task(&mut self) -> Result<(), std::io::Error> { - let mut table = lock_resource_table(); - // Only track if is TcpListener. - if let Some(CliResource::TcpListener(_, t)) = - table.get_mut::(self.rid) - { - // Currently, we only allow tracking a single accept task for a listener. - // This might be changed in the future with multiple workers. - // Caveat: TcpListener by itself also only tracks an accept task at a time. - // See https://github.com/tokio-rs/tokio/issues/846#issuecomment-454208883 - if t.is_some() { - return Err(std::io::Error::new( - std::io::ErrorKind::Other, - "Another accept task is ongoing", - )); - } - t.replace(futures::task::current()); - } - Ok(()) - } - - /// Stop tracking a task (for TcpListener resource). - /// Happens when the task is done and thus no further tracking is needed. - pub fn untrack_task(&mut self) { - let mut table = lock_resource_table(); - // Only untrack if is TcpListener. - if let Some(CliResource::TcpListener(_, t)) = - table.get_mut::(self.rid) - { - if t.is_some() { - t.take(); - } - } - } - // close(2) is done by dropping the value. Therefore we just need to remove // the resource from the RESOURCE_TABLE. pub fn close(&self) { @@ -307,27 +207,6 @@ pub fn add_fs_file(fs_file: tokio::fs::File) -> Resource { Resource { rid } } -pub fn add_tcp_listener(listener: tokio::net::TcpListener) -> Resource { - let mut table = lock_resource_table(); - let rid = table.add( - "tcpListener", - Box::new(CliResource::TcpListener(listener, None)), - ); - Resource { rid } -} - -pub fn add_tls_listener( - listener: tokio::net::TcpListener, - acceptor: TlsAcceptor, -) -> Resource { - let mut table = lock_resource_table(); - let rid = table.add( - "tlsListener", - Box::new(CliResource::TlsListener(listener, acceptor, None)), - ); - Resource { rid } -} - pub fn add_tcp_stream(stream: tokio::net::TcpStream) -> Resource { let mut table = lock_resource_table(); let rid = table.add("tcpStream", Box::new(CliResource::TcpStream(stream))); diff --git a/cli/tokio_util.rs b/cli/tokio_util.rs index 1341c657a7..c86748b550 100644 --- a/cli/tokio_util.rs +++ b/cli/tokio_util.rs @@ -1,15 +1,10 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. -use crate::resources::Resource; use deno::ErrBox; use futures; use futures::Future; use futures::Poll; -use std::io; -use std::mem; -use std::net::SocketAddr; use std::ops::FnOnce; use tokio; -use tokio::net::TcpStream; use tokio::runtime; pub fn create_threadpool_runtime( @@ -77,74 +72,6 @@ where tokio_executor::with_default(&mut executor, &mut enter, move |_enter| f()); } -#[derive(Debug)] -enum AcceptState { - Eager(Resource), - Pending(Resource), - Empty, -} - -/// Simply accepts a connection. -pub fn accept(r: Resource) -> Accept { - Accept { - state: AcceptState::Eager(r), - } -} - -/// A future which can be used to easily read available number of bytes to fill -/// a buffer. -/// -/// Created by the [`read`] function. -#[derive(Debug)] -pub struct Accept { - state: AcceptState, -} - -impl Future for Accept { - type Item = (TcpStream, SocketAddr); - type Error = io::Error; - - fn poll(&mut self) -> Poll { - let (stream, addr) = match self.state { - // Similar to try_ready!, but also track/untrack accept task - // in TcpListener resource. - // In this way, when the listener is closed, the task can be - // notified to error out (instead of stuck forever). - AcceptState::Eager(ref mut r) => match r.poll_accept() { - Ok(futures::prelude::Async::Ready(t)) => t, - Ok(futures::prelude::Async::NotReady) => { - self.state = AcceptState::Pending(r.to_owned()); - return Ok(futures::prelude::Async::NotReady); - } - Err(e) => { - return Err(e); - } - }, - AcceptState::Pending(ref mut r) => match r.poll_accept() { - Ok(futures::prelude::Async::Ready(t)) => { - r.untrack_task(); - t - } - Ok(futures::prelude::Async::NotReady) => { - // Would error out if another accept task is being tracked. - r.track_task()?; - return Ok(futures::prelude::Async::NotReady); - } - Err(e) => { - r.untrack_task(); - return Err(e); - } - }, - AcceptState::Empty => panic!("poll Accept after it's done"), - }; - - match mem::replace(&mut self.state, AcceptState::Empty) { - AcceptState::Empty => panic!("invalid internal state"), - _ => Ok((stream, addr).into()), - } - } -} - /// `futures::future::poll_fn` only support `F: FnMut()->Poll` /// However, we require that `F: FnOnce()->Poll`. /// Therefore, we created our version of `poll_fn`. diff --git a/core/resources.rs b/core/resources.rs index 1ba061d0b9..a66fb91207 100644 --- a/core/resources.rs +++ b/core/resources.rs @@ -69,18 +69,14 @@ impl ResourceTable { // close(2) is done by dropping the value. Therefore we just need to remove // the resource from the RESOURCE_TABLE. pub fn close(&mut self, rid: ResourceId) -> Option<()> { - if let Some((_name, resource)) = self.map.remove(&rid) { - resource.close(); - return Some(()); - } - None + self.map.remove(&rid).map(|(_name, _resource)| ()) } } /// Abstract type representing resource in Deno. -pub trait Resource: Downcast + Any + Send { - /// Method that allows to cleanup resource. - // TODO(ry) remove this method. Resources should rely on drop trait instead. - fn close(&self) {} -} +/// +/// The only thing it does is implementing `Downcast` trait +/// that allows to cast resource to concrete type in `TableResource::get` +/// and `TableResource::get_mut` methods. +pub trait Resource: Downcast + Any + Send {} impl_downcast!(Resource);