mirror of
https://github.com/denoland/deno.git
synced 2025-01-03 04:48:52 -05:00
Kill all pending accepts when TCP listener is closed (#1517)
This commit is contained in:
parent
c870cf4082
commit
431e455642
3 changed files with 106 additions and 7 deletions
|
@ -8,6 +8,42 @@ testPerm({ net: true }, function netListenClose() {
|
||||||
listener.close();
|
listener.close();
|
||||||
});
|
});
|
||||||
|
|
||||||
|
testPerm({ net: true }, async function netCloseWhileAccept() {
|
||||||
|
const listener = deno.listen("tcp", ":4501");
|
||||||
|
const p = listener.accept();
|
||||||
|
listener.close();
|
||||||
|
let err;
|
||||||
|
try {
|
||||||
|
await p;
|
||||||
|
} catch (e) {
|
||||||
|
err = e;
|
||||||
|
}
|
||||||
|
assert(!!err);
|
||||||
|
assertEqual(err.kind, deno.ErrorKind.Other);
|
||||||
|
assertEqual(err.message, "Listener has been closed");
|
||||||
|
});
|
||||||
|
|
||||||
|
testPerm({ net: true }, async function netConcurrentAccept() {
|
||||||
|
const listener = deno.listen("tcp", ":4502");
|
||||||
|
let err;
|
||||||
|
// Consume this accept error
|
||||||
|
// (since it would still be waiting when listener.close is called)
|
||||||
|
listener.accept().catch(e => {
|
||||||
|
assertEqual(e.kind, deno.ErrorKind.Other);
|
||||||
|
assertEqual(e.message, "Listener has been closed");
|
||||||
|
});
|
||||||
|
const p1 = listener.accept();
|
||||||
|
try {
|
||||||
|
await p1;
|
||||||
|
} catch (e) {
|
||||||
|
err = e;
|
||||||
|
}
|
||||||
|
assert(!!err);
|
||||||
|
assertEqual(err.kind, deno.ErrorKind.Other);
|
||||||
|
assertEqual(err.message, "Another accept task is ongoing");
|
||||||
|
listener.close();
|
||||||
|
});
|
||||||
|
|
||||||
testPerm({ net: true }, async function netDialListen() {
|
testPerm({ net: true }, async function netDialListen() {
|
||||||
const listener = deno.listen("tcp", ":4500");
|
const listener = deno.listen("tcp", ":4500");
|
||||||
listener.accept().then(async conn => {
|
listener.accept().then(async conn => {
|
||||||
|
|
|
@ -87,7 +87,12 @@ enum Repr {
|
||||||
Stdout(tokio::fs::File),
|
Stdout(tokio::fs::File),
|
||||||
Stderr(tokio::io::Stderr),
|
Stderr(tokio::io::Stderr),
|
||||||
FsFile(tokio::fs::File),
|
FsFile(tokio::fs::File),
|
||||||
TcpListener(tokio::net::TcpListener),
|
// Since TcpListener might be closed while there is a pending accept task,
|
||||||
|
// we need to track the task so that when the listener is closed,
|
||||||
|
// this pending task could be notified and die.
|
||||||
|
// Currently TcpListener itself does not take care of this issue.
|
||||||
|
// See: https://github.com/tokio-rs/tokio/issues/846
|
||||||
|
TcpListener(tokio::net::TcpListener, Option<futures::task::Task>),
|
||||||
TcpStream(tokio::net::TcpStream),
|
TcpStream(tokio::net::TcpStream),
|
||||||
HttpBody(HttpBody),
|
HttpBody(HttpBody),
|
||||||
Repl(Repl),
|
Repl(Repl),
|
||||||
|
@ -132,7 +137,7 @@ fn inspect_repr(repr: &Repr) -> String {
|
||||||
Repr::Stdout(_) => "stdout",
|
Repr::Stdout(_) => "stdout",
|
||||||
Repr::Stderr(_) => "stderr",
|
Repr::Stderr(_) => "stderr",
|
||||||
Repr::FsFile(_) => "fsFile",
|
Repr::FsFile(_) => "fsFile",
|
||||||
Repr::TcpListener(_) => "tcpListener",
|
Repr::TcpListener(_, _) => "tcpListener",
|
||||||
Repr::TcpStream(_) => "tcpStream",
|
Repr::TcpStream(_) => "tcpStream",
|
||||||
Repr::HttpBody(_) => "httpBody",
|
Repr::HttpBody(_) => "httpBody",
|
||||||
Repr::Repl(_) => "repl",
|
Repr::Repl(_) => "repl",
|
||||||
|
@ -159,20 +164,60 @@ impl Resource {
|
||||||
let mut table = RESOURCE_TABLE.lock().unwrap();
|
let mut table = RESOURCE_TABLE.lock().unwrap();
|
||||||
let maybe_repr = table.get_mut(&self.rid);
|
let maybe_repr = table.get_mut(&self.rid);
|
||||||
match maybe_repr {
|
match maybe_repr {
|
||||||
None => panic!("bad rid"),
|
None => Err(std::io::Error::new(
|
||||||
|
std::io::ErrorKind::Other,
|
||||||
|
"Listener has been closed",
|
||||||
|
)),
|
||||||
Some(repr) => match repr {
|
Some(repr) => match repr {
|
||||||
Repr::TcpListener(ref mut s) => s.poll_accept(),
|
Repr::TcpListener(ref mut s, _) => s.poll_accept(),
|
||||||
_ => panic!("Cannot accept"),
|
_ => panic!("Cannot accept"),
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Track the current task (for TcpListener resource).
|
||||||
|
/// Throws an error if another task is already tracked.
|
||||||
|
pub fn track_task(&mut self) -> Result<(), std::io::Error> {
|
||||||
|
let mut table = RESOURCE_TABLE.lock().unwrap();
|
||||||
|
// Only track if is TcpListener.
|
||||||
|
if let Some(Repr::TcpListener(_, t)) = table.get_mut(&self.rid) {
|
||||||
|
// Currently, we only allow tracking a single accept task for a listener.
|
||||||
|
// This might be changed in the future with multiple workers.
|
||||||
|
// Caveat: TcpListener by itself also only tracks an accept task at a time.
|
||||||
|
// See https://github.com/tokio-rs/tokio/issues/846#issuecomment-454208883
|
||||||
|
if t.is_some() {
|
||||||
|
return Err(std::io::Error::new(
|
||||||
|
std::io::ErrorKind::Other,
|
||||||
|
"Another accept task is ongoing",
|
||||||
|
));
|
||||||
|
}
|
||||||
|
t.replace(futures::task::current());
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Stop tracking a task (for TcpListener resource).
|
||||||
|
/// Happens when the task is done and thus no further tracking is needed.
|
||||||
|
pub fn untrack_task(&mut self) {
|
||||||
|
let mut table = RESOURCE_TABLE.lock().unwrap();
|
||||||
|
// Only untrack if is TcpListener.
|
||||||
|
if let Some(Repr::TcpListener(_, t)) = table.get_mut(&self.rid) {
|
||||||
|
assert!(t.is_some());
|
||||||
|
t.take();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// close(2) is done by dropping the value. Therefore we just need to remove
|
// close(2) is done by dropping the value. Therefore we just need to remove
|
||||||
// the resource from the RESOURCE_TABLE.
|
// the resource from the RESOURCE_TABLE.
|
||||||
pub fn close(&self) {
|
pub fn close(&self) {
|
||||||
let mut table = RESOURCE_TABLE.lock().unwrap();
|
let mut table = RESOURCE_TABLE.lock().unwrap();
|
||||||
let r = table.remove(&self.rid);
|
let r = table.remove(&self.rid);
|
||||||
assert!(r.is_some());
|
assert!(r.is_some());
|
||||||
|
// If TcpListener, we must kill all pending accepts!
|
||||||
|
if let Repr::TcpListener(_, Some(t)) = r.unwrap() {
|
||||||
|
// Call notify on the tracked task, so that they would error out.
|
||||||
|
t.notify();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn shutdown(&mut self, how: Shutdown) -> Result<(), DenoError> {
|
pub fn shutdown(&mut self, how: Shutdown) -> Result<(), DenoError> {
|
||||||
|
@ -264,7 +309,7 @@ pub fn add_fs_file(fs_file: tokio::fs::File) -> Resource {
|
||||||
pub fn add_tcp_listener(listener: tokio::net::TcpListener) -> Resource {
|
pub fn add_tcp_listener(listener: tokio::net::TcpListener) -> Resource {
|
||||||
let rid = new_rid();
|
let rid = new_rid();
|
||||||
let mut tg = RESOURCE_TABLE.lock().unwrap();
|
let mut tg = RESOURCE_TABLE.lock().unwrap();
|
||||||
let r = tg.insert(rid, Repr::TcpListener(listener));
|
let r = tg.insert(rid, Repr::TcpListener(listener, None));
|
||||||
assert!(r.is_none());
|
assert!(r.is_none());
|
||||||
Resource { rid }
|
Resource { rid }
|
||||||
}
|
}
|
||||||
|
@ -515,7 +560,7 @@ pub fn eager_accept(resource: Resource) -> EagerAccept {
|
||||||
match maybe_repr {
|
match maybe_repr {
|
||||||
None => panic!("bad rid"),
|
None => panic!("bad rid"),
|
||||||
Some(repr) => match repr {
|
Some(repr) => match repr {
|
||||||
Repr::TcpListener(ref mut tcp_listener) => {
|
Repr::TcpListener(ref mut tcp_listener, _) => {
|
||||||
eager::tcp_accept(tcp_listener, resource)
|
eager::tcp_accept(tcp_listener, resource)
|
||||||
}
|
}
|
||||||
_ => Either::A(tokio_util::accept(resource)),
|
_ => Either::A(tokio_util::accept(resource)),
|
||||||
|
|
|
@ -63,7 +63,25 @@ impl Future for Accept {
|
||||||
|
|
||||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||||
let (stream, addr) = match self.state {
|
let (stream, addr) = match self.state {
|
||||||
AcceptState::Pending(ref mut r) => try_ready!(r.poll_accept()),
|
// Similar to try_ready!, but also track/untrack accept task
|
||||||
|
// in TcpListener resource.
|
||||||
|
// In this way, when the listener is closed, the task can be
|
||||||
|
// notified to error out (instead of stuck forever).
|
||||||
|
AcceptState::Pending(ref mut r) => match r.poll_accept() {
|
||||||
|
Ok(futures::prelude::Async::Ready(t)) => {
|
||||||
|
r.untrack_task();
|
||||||
|
t
|
||||||
|
}
|
||||||
|
Ok(futures::prelude::Async::NotReady) => {
|
||||||
|
// Would error out if another accept task is being tracked.
|
||||||
|
r.track_task()?;
|
||||||
|
return Ok(futures::prelude::Async::NotReady);
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
r.untrack_task();
|
||||||
|
return Err(From::from(e));
|
||||||
|
}
|
||||||
|
},
|
||||||
AcceptState::Empty => panic!("poll Accept after it's done"),
|
AcceptState::Empty => panic!("poll Accept after it's done"),
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue