From fb5c31416d4b9e526ca0fcc134dc8f366e367012 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Souto?= Date: Wed, 11 Mar 2020 22:19:24 +0000 Subject: [PATCH] Add waker to StreamResource to fix hang on close bugs (#4293) --- cli/js/tests/net_test.ts | 43 ++++++++++++++ cli/ops/fetch.rs | 6 +- cli/ops/fs.rs | 13 +++-- cli/ops/io.rs | 100 ++++++++++++++++++++++++++++----- cli/ops/net.rs | 26 +++++---- cli/ops/process.rs | 20 ++++--- cli/ops/tls.rs | 10 +++- cli/ops/tty.rs | 73 ++++++++++++------------ cli/tests/integration_tests.rs | 6 +- 9 files changed, 219 insertions(+), 78 deletions(-) diff --git a/cli/js/tests/net_test.ts b/cli/js/tests/net_test.ts index 1a58c35311..fccd62f389 100644 --- a/cli/js/tests/net_test.ts +++ b/cli/js/tests/net_test.ts @@ -336,3 +336,46 @@ unitTest( conn.close(); } ); + +unitTest( + { + perms: { net: true } + }, + async function netHangsOnClose() { + let acceptedConn: Deno.Conn; + const resolvable = createResolvable(); + + async function iteratorReq(listener: Deno.Listener): Promise { + const p = new Uint8Array(10); + const conn = await listener.accept(); + acceptedConn = conn; + + try { + while (true) { + const nread = await conn.read(p); + if (nread === Deno.EOF) { + break; + } + await conn.write(new Uint8Array([1, 2, 3])); + } + } catch (err) { + assert(!!err); + assert(err instanceof Deno.errors.BadResource); + } + + resolvable.resolve(); + } + + const addr = { hostname: "127.0.0.1", port: 4500 }; + const listener = Deno.listen(addr); + iteratorReq(listener); + const conn = await Deno.connect(addr); + await conn.write(new Uint8Array([1, 2, 3, 4])); + const buf = new Uint8Array(10); + await conn.read(buf); + conn!.close(); + acceptedConn!.close(); + listener.close(); + await resolvable; + } +); diff --git a/cli/ops/fetch.rs b/cli/ops/fetch.rs index 9f36ad5fd1..d222787a62 100644 --- a/cli/ops/fetch.rs +++ b/cli/ops/fetch.rs @@ -1,6 +1,6 @@ // Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. use super::dispatch_json::{Deserialize, JsonOp, Value}; -use super::io::StreamResource; +use super::io::{StreamResource, StreamResourceHolder}; use crate::http_util::{create_http_client, HttpBody}; use crate::op_error::OpError; use crate::state::State; @@ -80,7 +80,9 @@ pub fn op_fetch( let mut state = state_.borrow_mut(); let rid = state.resource_table.add( "httpBody", - Box::new(StreamResource::HttpBody(Box::new(body))), + Box::new(StreamResourceHolder::new(StreamResource::HttpBody( + Box::new(body), + ))), ); let json_res = json!({ diff --git a/cli/ops/fs.rs b/cli/ops/fs.rs index 4ef59e8e70..01ce548bab 100644 --- a/cli/ops/fs.rs +++ b/cli/ops/fs.rs @@ -1,7 +1,7 @@ // Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. // Some deserializer fields are only used on Unix and Windows build fails without it use super::dispatch_json::{blocking_json, Deserialize, JsonOp, Value}; -use super::io::{FileMetadata, StreamResource}; +use super::io::{FileMetadata, StreamResource, StreamResourceHolder}; use crate::fs as deno_fs; use crate::op_error::OpError; use crate::ops::dispatch_json::JsonResult; @@ -153,7 +153,10 @@ fn op_open( let mut state = state_.borrow_mut(); let rid = state.resource_table.add( "fsFile", - Box::new(StreamResource::FsFile(fs_file, FileMetadata::default())), + Box::new(StreamResourceHolder::new(StreamResource::FsFile( + fs_file, + FileMetadata::default(), + ))), ); Ok(json!(rid)) }; @@ -198,12 +201,12 @@ fn op_seek( }; let state = state.borrow(); - let resource = state + let resource_holder = state .resource_table - .get::(rid) + .get::(rid) .ok_or_else(OpError::bad_resource_id)?; - let tokio_file = match resource { + let tokio_file = match resource_holder.resource { StreamResource::FsFile(ref file, _) => file, _ => return Err(OpError::bad_resource_id()), }; diff --git a/cli/ops/io.rs b/cli/ops/io.rs index 2562b4c559..b7f67cea42 100644 --- a/cli/ops/io.rs +++ b/cli/ops/io.rs @@ -7,7 +7,9 @@ use deno_core::*; use futures::future::poll_fn; use futures::future::FutureExt; use futures::ready; +use std::collections::HashMap; use std::pin::Pin; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::task::Context; use std::task::Poll; use tokio::io::{AsyncRead, AsyncWrite}; @@ -56,15 +58,23 @@ pub fn init(i: &mut Isolate, s: &State) { ); } -pub fn get_stdio() -> (StreamResource, StreamResource, StreamResource) { - let stdin = StreamResource::Stdin(tokio::io::stdin(), TTYMetadata::default()); - let stdout = StreamResource::Stdout({ +pub fn get_stdio() -> ( + StreamResourceHolder, + StreamResourceHolder, + StreamResourceHolder, +) { + let stdin = StreamResourceHolder::new(StreamResource::Stdin( + tokio::io::stdin(), + TTYMetadata::default(), + )); + let stdout = StreamResourceHolder::new(StreamResource::Stdout({ let stdout = STDOUT_HANDLE .try_clone() .expect("Unable to clone stdout handle"); tokio::fs::File::from_std(stdout) - }); - let stderr = StreamResource::Stderr(tokio::io::stderr()); + })); + let stderr = + StreamResourceHolder::new(StreamResource::Stderr(tokio::io::stderr())); (stdin, stdout, stderr) } @@ -87,6 +97,51 @@ pub struct FileMetadata { pub tty: TTYMetadata, } +pub struct StreamResourceHolder { + pub resource: StreamResource, + waker: HashMap, + waker_counter: AtomicUsize, +} + +impl StreamResourceHolder { + pub fn new(resource: StreamResource) -> StreamResourceHolder { + StreamResourceHolder { + resource, + // Atleast one task is expecter for the resource + waker: HashMap::with_capacity(1), + // Tracks wakers Ids + waker_counter: AtomicUsize::new(0), + } + } +} + +impl Drop for StreamResourceHolder { + fn drop(&mut self) { + self.wake_tasks(); + } +} + +impl StreamResourceHolder { + pub fn track_task(&mut self, cx: &Context) -> Result { + let waker = futures::task::AtomicWaker::new(); + waker.register(cx.waker()); + // Its OK if it overflows + let task_waker_id = self.waker_counter.fetch_add(1, Ordering::Relaxed); + self.waker.insert(task_waker_id, waker); + Ok(task_waker_id) + } + + pub fn wake_tasks(&mut self) { + for waker in self.waker.values() { + waker.wake(); + } + } + + pub fn untrack_task(&mut self, task_waker_id: usize) { + self.waker.remove(&task_waker_id); + } +} + pub enum StreamResource { Stdin(tokio::io::Stdin, TTYMetadata), Stdout(tokio::fs::File), @@ -150,10 +205,27 @@ pub fn op_read( poll_fn(move |cx| { let resource_table = &mut state.borrow_mut().resource_table; - let resource = resource_table - .get_mut::(rid as u32) + let resource_holder = resource_table + .get_mut::(rid as u32) .ok_or_else(OpError::bad_resource_id)?; - let nread = ready!(resource.poll_read(cx, &mut buf.as_mut()[..]))?; + + let mut task_tracker_id: Option = None; + let nread = match resource_holder + .resource + .poll_read(cx, &mut buf.as_mut()[..]) + .map_err(OpError::from) + { + Poll::Ready(t) => { + if let Some(id) = task_tracker_id { + resource_holder.untrack_task(id); + } + t + } + Poll::Pending => { + task_tracker_id.replace(resource_holder.track_task(cx)?); + return Poll::Pending; + } + }?; Poll::Ready(Ok(nread as i32)) }) .boxed_local() @@ -233,10 +305,10 @@ pub fn op_write( async move { let nwritten = poll_fn(|cx| { let resource_table = &mut state.borrow_mut().resource_table; - let resource = resource_table - .get_mut::(rid as u32) + let resource_holder = resource_table + .get_mut::(rid as u32) .ok_or_else(OpError::bad_resource_id)?; - resource.poll_write(cx, &buf.as_ref()[..]) + resource_holder.resource.poll_write(cx, &buf.as_ref()[..]) }) .await?; @@ -246,10 +318,10 @@ pub fn op_write( // https://github.com/denoland/deno/issues/3565 poll_fn(|cx| { let resource_table = &mut state.borrow_mut().resource_table; - let resource = resource_table - .get_mut::(rid as u32) + let resource_holder = resource_table + .get_mut::(rid as u32) .ok_or_else(OpError::bad_resource_id)?; - resource.poll_flush(cx) + resource_holder.resource.poll_flush(cx) }) .await?; diff --git a/cli/ops/net.rs b/cli/ops/net.rs index d8dd9b3c9b..3987e94c1c 100644 --- a/cli/ops/net.rs +++ b/cli/ops/net.rs @@ -1,6 +1,6 @@ // Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. use super::dispatch_json::{Deserialize, JsonOp, Value}; -use super::io::StreamResource; +use super::io::{StreamResource, StreamResourceHolder}; use crate::op_error::OpError; use crate::resolve_addr::resolve_addr; use crate::state::State; @@ -78,9 +78,12 @@ fn op_accept( let local_addr = tcp_stream.local_addr()?; let remote_addr = tcp_stream.peer_addr()?; let mut state = state_.borrow_mut(); - let rid = state - .resource_table - .add("tcpStream", Box::new(StreamResource::TcpStream(tcp_stream))); + let rid = state.resource_table.add( + "tcpStream", + Box::new(StreamResourceHolder::new(StreamResource::TcpStream( + tcp_stream, + ))), + ); Ok(json!({ "rid": rid, "localAddr": { @@ -207,9 +210,12 @@ fn op_connect( let local_addr = tcp_stream.local_addr()?; let remote_addr = tcp_stream.peer_addr()?; let mut state = state_.borrow_mut(); - let rid = state - .resource_table - .add("tcpStream", Box::new(StreamResource::TcpStream(tcp_stream))); + let rid = state.resource_table.add( + "tcpStream", + Box::new(StreamResourceHolder::new(StreamResource::TcpStream( + tcp_stream, + ))), + ); Ok(json!({ "rid": rid, "localAddr": { @@ -251,11 +257,11 @@ fn op_shutdown( }; let mut state = state.borrow_mut(); - let resource = state + let resource_holder = state .resource_table - .get_mut::(rid) + .get_mut::(rid) .ok_or_else(OpError::bad_resource_id)?; - match resource { + match resource_holder.resource { StreamResource::TcpStream(ref mut stream) => { TcpStream::shutdown(stream, shutdown_mode).map_err(OpError::from)?; } diff --git a/cli/ops/process.rs b/cli/ops/process.rs index 743ffa22b4..55080fc2d2 100644 --- a/cli/ops/process.rs +++ b/cli/ops/process.rs @@ -1,6 +1,6 @@ // Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. use super::dispatch_json::{Deserialize, JsonOp, Value}; -use super::io::StreamResource; +use super::io::{StreamResource, StreamResourceHolder}; use crate::op_error::OpError; use crate::signal::kill; use crate::state::State; @@ -24,11 +24,11 @@ pub fn init(i: &mut Isolate, s: &State) { fn clone_file(rid: u32, state: &State) -> Result { let mut state = state.borrow_mut(); - let repr = state + let repr_holder = state .resource_table - .get_mut::(rid) + .get_mut::(rid) .ok_or_else(OpError::bad_resource_id)?; - let file = match repr { + let file = match repr_holder.resource { StreamResource::FsFile(ref mut file, _) => file, _ => return Err(OpError::bad_resource_id()), }; @@ -127,7 +127,9 @@ fn op_run( Some(child_stdin) => { let rid = table.add( "childStdin", - Box::new(StreamResource::ChildStdin(child_stdin)), + Box::new(StreamResourceHolder::new(StreamResource::ChildStdin( + child_stdin, + ))), ); Some(rid) } @@ -138,7 +140,9 @@ fn op_run( Some(child_stdout) => { let rid = table.add( "childStdout", - Box::new(StreamResource::ChildStdout(child_stdout)), + Box::new(StreamResourceHolder::new(StreamResource::ChildStdout( + child_stdout, + ))), ); Some(rid) } @@ -149,7 +153,9 @@ fn op_run( Some(child_stderr) => { let rid = table.add( "childStderr", - Box::new(StreamResource::ChildStderr(child_stderr)), + Box::new(StreamResourceHolder::new(StreamResource::ChildStderr( + child_stderr, + ))), ); Some(rid) } diff --git a/cli/ops/tls.rs b/cli/ops/tls.rs index e64bc47457..642284ea26 100644 --- a/cli/ops/tls.rs +++ b/cli/ops/tls.rs @@ -1,6 +1,6 @@ // Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. use super::dispatch_json::{Deserialize, JsonOp, Value}; -use super::io::StreamResource; +use super::io::{StreamResource, StreamResourceHolder}; use crate::op_error::OpError; use crate::resolve_addr::resolve_addr; use crate::state::State; @@ -85,7 +85,9 @@ pub fn op_connect_tls( let mut state = state_.borrow_mut(); let rid = state.resource_table.add( "clientTlsStream", - Box::new(StreamResource::ClientTlsStream(Box::new(tls_stream))), + Box::new(StreamResourceHolder::new(StreamResource::ClientTlsStream( + Box::new(tls_stream), + ))), ); Ok(json!({ "rid": rid, @@ -318,7 +320,9 @@ fn op_accept_tls( let mut state = state.borrow_mut(); state.resource_table.add( "serverTlsStream", - Box::new(StreamResource::ServerTlsStream(Box::new(tls_stream))), + Box::new(StreamResourceHolder::new(StreamResource::ServerTlsStream( + Box::new(tls_stream), + ))), ) }; Ok(json!({ diff --git a/cli/ops/tty.rs b/cli/ops/tty.rs index 69ac66688c..c44ab946f7 100644 --- a/cli/ops/tty.rs +++ b/cli/ops/tty.rs @@ -1,5 +1,5 @@ use super::dispatch_json::JsonOp; -use super::io::StreamResource; +use super::io::{StreamResource, StreamResourceHolder}; use crate::op_error::OpError; use crate::ops::json_op; use crate::state::State; @@ -66,13 +66,13 @@ pub fn op_set_raw( use winapi::um::{consoleapi, handleapi}; let state = state_.borrow_mut(); - let resource = state.resource_table.get::(rid); - if resource.is_none() { + let resource_holder = state.resource_table.get::(rid); + if resource_holder.is_none() { return Err(OpError::bad_resource_id()); } // For now, only stdin. - let handle = match resource.unwrap() { + let handle = match &resource_holder.unwrap().resource { StreamResource::Stdin(_, _) => std::io::stdin().as_raw_handle(), StreamResource::FsFile(f, _) => { let tokio_file = futures::executor::block_on(f.try_clone())?; @@ -111,25 +111,27 @@ pub fn op_set_raw( use std::os::unix::io::AsRawFd; let mut state = state_.borrow_mut(); - let resource = state.resource_table.get_mut::(rid); - if resource.is_none() { + let resource_holder = + state.resource_table.get_mut::(rid); + if resource_holder.is_none() { return Err(OpError::bad_resource_id()); } if is_raw { - let (raw_fd, maybe_tty_mode) = match resource.unwrap() { - StreamResource::Stdin(_, ref mut metadata) => { - (std::io::stdin().as_raw_fd(), &mut metadata.mode) - } - StreamResource::FsFile(f, ref mut metadata) => { - let tokio_file = futures::executor::block_on(f.try_clone())?; - let std_file = futures::executor::block_on(tokio_file.into_std()); - (std_file.as_raw_fd(), &mut metadata.tty.mode) - } - _ => { - return Err(OpError::other("Not supported".to_owned())); - } - }; + let (raw_fd, maybe_tty_mode) = + match &mut resource_holder.unwrap().resource { + StreamResource::Stdin(_, ref mut metadata) => { + (std::io::stdin().as_raw_fd(), &mut metadata.mode) + } + StreamResource::FsFile(f, ref mut metadata) => { + let tokio_file = futures::executor::block_on(f.try_clone())?; + let std_file = futures::executor::block_on(tokio_file.into_std()); + (std_file.as_raw_fd(), &mut metadata.tty.mode) + } + _ => { + return Err(OpError::other("Not supported".to_owned())); + } + }; if maybe_tty_mode.is_some() { // Already raw. Skip. @@ -159,19 +161,20 @@ pub fn op_set_raw( Ok(JsonOp::Sync(json!({}))) } else { // Try restore saved mode. - let (raw_fd, maybe_tty_mode) = match resource.unwrap() { - StreamResource::Stdin(_, ref mut metadata) => { - (std::io::stdin().as_raw_fd(), &mut metadata.mode) - } - StreamResource::FsFile(f, ref mut metadata) => { - let tokio_file = futures::executor::block_on(f.try_clone())?; - let std_file = futures::executor::block_on(tokio_file.into_std()); - (std_file.as_raw_fd(), &mut metadata.tty.mode) - } - _ => { - return Err(OpError::other("Not supported".to_owned())); - } - }; + let (raw_fd, maybe_tty_mode) = + match &mut resource_holder.unwrap().resource { + StreamResource::Stdin(_, ref mut metadata) => { + (std::io::stdin().as_raw_fd(), &mut metadata.mode) + } + StreamResource::FsFile(f, ref mut metadata) => { + let tokio_file = futures::executor::block_on(f.try_clone())?; + let std_file = futures::executor::block_on(tokio_file.into_std()); + (std_file.as_raw_fd(), &mut metadata.tty.mode) + } + _ => { + return Err(OpError::other("Not supported".to_owned())); + } + }; if let Some(mode) = maybe_tty_mode.take() { termios::tcsetattr(raw_fd, termios::SetArg::TCSADRAIN, &mode)?; @@ -200,12 +203,12 @@ pub fn op_isatty( return Err(OpError::bad_resource_id()); } - let resource = state.resource_table.get::(rid); - if resource.is_none() { + let resource_holder = state.resource_table.get::(rid); + if resource_holder.is_none() { return Ok(JsonOp::Sync(json!(false))); } - match resource.unwrap() { + match &resource_holder.unwrap().resource { StreamResource::Stdin(_, _) => { Ok(JsonOp::Sync(json!(atty::is(atty::Stream::Stdin)))) } diff --git a/cli/tests/integration_tests.rs b/cli/tests/integration_tests.rs index d5a3fb8c82..ce08c6b61a 100644 --- a/cli/tests/integration_tests.rs +++ b/cli/tests/integration_tests.rs @@ -1785,7 +1785,8 @@ fn test_permissions_net_fetch_allow_localhost_4545() { true, "run --allow-net=localhost:4545 complex_permissions_test.ts netFetch http://localhost:4545/", None, - None,true, + None, + true, ); assert!(!err.contains(util::PERMISSION_DENIED_PATTERN)); } @@ -1918,7 +1919,7 @@ fn test_permissions_net_listen_allow_localhost() { "run --allow-net=localhost complex_permissions_test.ts netListen localhost:4600", None, None, - false, + false, ); assert!(!err.contains(util::PERMISSION_DENIED_PATTERN)); } @@ -1932,6 +1933,7 @@ mod util { use std::process::Command; use std::process::Output; use std::process::Stdio; + use tempfile::TempDir; pub const PERMISSION_VARIANTS: [&str; 5] =