From 431e455642cee8f7258814d6bb9b426a99d1256b Mon Sep 17 00:00:00 2001 From: "Kevin (Kun) \"Kassimo\" Qian" Date: Tue, 15 Jan 2019 17:36:51 -0800 Subject: [PATCH] Kill all pending accepts when TCP listener is closed (#1517) --- js/net_test.ts | 36 ++++++++++++++++++++++++++++++ src/resources.rs | 57 ++++++++++++++++++++++++++++++++++++++++++----- src/tokio_util.rs | 20 ++++++++++++++++- 3 files changed, 106 insertions(+), 7 deletions(-) diff --git a/js/net_test.ts b/js/net_test.ts index 6ba36547f2..be415f19c7 100644 --- a/js/net_test.ts +++ b/js/net_test.ts @@ -8,6 +8,42 @@ testPerm({ net: true }, function netListenClose() { listener.close(); }); +testPerm({ net: true }, async function netCloseWhileAccept() { + const listener = deno.listen("tcp", ":4501"); + const p = listener.accept(); + listener.close(); + let err; + try { + await p; + } catch (e) { + err = e; + } + assert(!!err); + assertEqual(err.kind, deno.ErrorKind.Other); + assertEqual(err.message, "Listener has been closed"); +}); + +testPerm({ net: true }, async function netConcurrentAccept() { + const listener = deno.listen("tcp", ":4502"); + let err; + // Consume this accept error + // (since it would still be waiting when listener.close is called) + listener.accept().catch(e => { + assertEqual(e.kind, deno.ErrorKind.Other); + assertEqual(e.message, "Listener has been closed"); + }); + const p1 = listener.accept(); + try { + await p1; + } catch (e) { + err = e; + } + assert(!!err); + assertEqual(err.kind, deno.ErrorKind.Other); + assertEqual(err.message, "Another accept task is ongoing"); + listener.close(); +}); + testPerm({ net: true }, async function netDialListen() { const listener = deno.listen("tcp", ":4500"); listener.accept().then(async conn => { diff --git a/src/resources.rs b/src/resources.rs index 55e1a9f641..1f5a121dea 100644 --- a/src/resources.rs +++ b/src/resources.rs @@ -87,7 +87,12 @@ enum Repr { Stdout(tokio::fs::File), Stderr(tokio::io::Stderr), FsFile(tokio::fs::File), - TcpListener(tokio::net::TcpListener), + // Since TcpListener might be closed while there is a pending accept task, + // we need to track the task so that when the listener is closed, + // this pending task could be notified and die. + // Currently TcpListener itself does not take care of this issue. + // See: https://github.com/tokio-rs/tokio/issues/846 + TcpListener(tokio::net::TcpListener, Option), TcpStream(tokio::net::TcpStream), HttpBody(HttpBody), Repl(Repl), @@ -132,7 +137,7 @@ fn inspect_repr(repr: &Repr) -> String { Repr::Stdout(_) => "stdout", Repr::Stderr(_) => "stderr", Repr::FsFile(_) => "fsFile", - Repr::TcpListener(_) => "tcpListener", + Repr::TcpListener(_, _) => "tcpListener", Repr::TcpStream(_) => "tcpStream", Repr::HttpBody(_) => "httpBody", Repr::Repl(_) => "repl", @@ -159,20 +164,60 @@ impl Resource { let mut table = RESOURCE_TABLE.lock().unwrap(); let maybe_repr = table.get_mut(&self.rid); match maybe_repr { - None => panic!("bad rid"), + None => Err(std::io::Error::new( + std::io::ErrorKind::Other, + "Listener has been closed", + )), Some(repr) => match repr { - Repr::TcpListener(ref mut s) => s.poll_accept(), + Repr::TcpListener(ref mut s, _) => s.poll_accept(), _ => panic!("Cannot accept"), }, } } + /// Track the current task (for TcpListener resource). + /// Throws an error if another task is already tracked. + pub fn track_task(&mut self) -> Result<(), std::io::Error> { + let mut table = RESOURCE_TABLE.lock().unwrap(); + // Only track if is TcpListener. + if let Some(Repr::TcpListener(_, t)) = table.get_mut(&self.rid) { + // Currently, we only allow tracking a single accept task for a listener. + // This might be changed in the future with multiple workers. + // Caveat: TcpListener by itself also only tracks an accept task at a time. + // See https://github.com/tokio-rs/tokio/issues/846#issuecomment-454208883 + if t.is_some() { + return Err(std::io::Error::new( + std::io::ErrorKind::Other, + "Another accept task is ongoing", + )); + } + t.replace(futures::task::current()); + } + Ok(()) + } + + /// Stop tracking a task (for TcpListener resource). + /// Happens when the task is done and thus no further tracking is needed. + pub fn untrack_task(&mut self) { + let mut table = RESOURCE_TABLE.lock().unwrap(); + // Only untrack if is TcpListener. + if let Some(Repr::TcpListener(_, t)) = table.get_mut(&self.rid) { + assert!(t.is_some()); + t.take(); + } + } + // close(2) is done by dropping the value. Therefore we just need to remove // the resource from the RESOURCE_TABLE. pub fn close(&self) { let mut table = RESOURCE_TABLE.lock().unwrap(); let r = table.remove(&self.rid); assert!(r.is_some()); + // If TcpListener, we must kill all pending accepts! + if let Repr::TcpListener(_, Some(t)) = r.unwrap() { + // Call notify on the tracked task, so that they would error out. + t.notify(); + } } pub fn shutdown(&mut self, how: Shutdown) -> Result<(), DenoError> { @@ -264,7 +309,7 @@ pub fn add_fs_file(fs_file: tokio::fs::File) -> Resource { pub fn add_tcp_listener(listener: tokio::net::TcpListener) -> Resource { let rid = new_rid(); let mut tg = RESOURCE_TABLE.lock().unwrap(); - let r = tg.insert(rid, Repr::TcpListener(listener)); + let r = tg.insert(rid, Repr::TcpListener(listener, None)); assert!(r.is_none()); Resource { rid } } @@ -515,7 +560,7 @@ pub fn eager_accept(resource: Resource) -> EagerAccept { match maybe_repr { None => panic!("bad rid"), Some(repr) => match repr { - Repr::TcpListener(ref mut tcp_listener) => { + Repr::TcpListener(ref mut tcp_listener, _) => { eager::tcp_accept(tcp_listener, resource) } _ => Either::A(tokio_util::accept(resource)), diff --git a/src/tokio_util.rs b/src/tokio_util.rs index 32542aa43a..2eb0211db9 100644 --- a/src/tokio_util.rs +++ b/src/tokio_util.rs @@ -63,7 +63,25 @@ impl Future for Accept { fn poll(&mut self) -> Poll { let (stream, addr) = match self.state { - AcceptState::Pending(ref mut r) => try_ready!(r.poll_accept()), + // Similar to try_ready!, but also track/untrack accept task + // in TcpListener resource. + // In this way, when the listener is closed, the task can be + // notified to error out (instead of stuck forever). + AcceptState::Pending(ref mut r) => match r.poll_accept() { + Ok(futures::prelude::Async::Ready(t)) => { + r.untrack_task(); + t + } + Ok(futures::prelude::Async::NotReady) => { + // Would error out if another accept task is being tracked. + r.track_task()?; + return Ok(futures::prelude::Async::NotReady); + } + Err(e) => { + r.untrack_task(); + return Err(From::from(e)); + } + }, AcceptState::Empty => panic!("poll Accept after it's done"), };