diff --git a/cli/resources.rs b/cli/resources.rs index 66a2ebdb3a..3a7121d4cc 100644 --- a/cli/resources.rs +++ b/cli/resources.rs @@ -171,49 +171,12 @@ impl Resource { } } - /// Track the current task (for TcpListener resource). - /// Throws an error if another task is already tracked. - pub fn track_task(&mut self) -> Result<(), std::io::Error> { - let mut table = RESOURCE_TABLE.lock().unwrap(); - // Only track if is TcpListener. - if let Some(Repr::TcpListener(_, t)) = table.get_mut(&self.rid) { - // Currently, we only allow tracking a single accept task for a listener. - // This might be changed in the future with multiple workers. - // Caveat: TcpListener by itself also only tracks an accept task at a time. - // See https://github.com/tokio-rs/tokio/issues/846#issuecomment-454208883 - if t.is_some() { - return Err(std::io::Error::new( - std::io::ErrorKind::Other, - "Another accept task is ongoing", - )); - } - t.replace(futures::task::current()); - } - Ok(()) - } - - /// Stop tracking a task (for TcpListener resource). - /// Happens when the task is done and thus no further tracking is needed. - pub fn untrack_task(&mut self) { - let mut table = RESOURCE_TABLE.lock().unwrap(); - // Only untrack if is TcpListener. - if let Some(Repr::TcpListener(_, t)) = table.get_mut(&self.rid) { - assert!(t.is_some()); - t.take(); - } - } - // close(2) is done by dropping the value. Therefore we just need to remove // the resource from the RESOURCE_TABLE. pub fn close(&self) { let mut table = RESOURCE_TABLE.lock().unwrap(); let r = table.remove(&self.rid); assert!(r.is_some()); - // If TcpListener, we must kill all pending accepts! - if let Repr::TcpListener(_, Some(t)) = r.unwrap() { - // Call notify on the tracked task, so that they would error out. - t.notify(); - } } pub fn shutdown(&mut self, how: Shutdown) -> Result<(), DenoError> { diff --git a/cli/tokio_util.rs b/cli/tokio_util.rs index a57cbbd2e1..e1f8587c32 100644 --- a/cli/tokio_util.rs +++ b/cli/tokio_util.rs @@ -78,31 +78,14 @@ pub fn accept(r: Resource) -> Accept { pub struct Accept { state: AcceptState, } + impl Future for Accept { type Item = (TcpStream, SocketAddr); type Error = io::Error; fn poll(&mut self) -> Poll { let (stream, addr) = match self.state { - // Similar to try_ready!, but also track/untrack accept task - // in TcpListener resource. - // In this way, when the listener is closed, the task can be - // notified to error out (instead of stuck forever). - AcceptState::Pending(ref mut r) => match r.poll_accept() { - Ok(futures::prelude::Async::Ready(t)) => { - r.untrack_task(); - t - } - Ok(futures::prelude::Async::NotReady) => { - // Would error out if another accept task is being tracked. - r.track_task()?; - return Ok(futures::prelude::Async::NotReady); - } - Err(e) => { - r.untrack_task(); - return Err(From::from(e)); - } - }, + AcceptState::Pending(ref mut r) => try_ready!(r.poll_accept()), AcceptState::Empty => panic!("poll Accept after it's done"), }; diff --git a/js/net_test.ts b/js/net_test.ts index cfbdac0cb0..f02fa9611e 100644 --- a/js/net_test.ts +++ b/js/net_test.ts @@ -1,22 +1,6 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. import { testPerm, assert, assertEquals } from "./test_util.ts"; -function deferred(): { - resolve: () => void; - reject: () => void; - promise: Promise; -} { - let resolve: () => void; - let reject: () => void; - const promise = new Promise( - (a, b): void => { - resolve = a; - reject = b; - } - ); - return { resolve, reject, promise }; -} - testPerm({ net: true }, function netListenClose(): void { const listener = Deno.listen("tcp", "127.0.0.1:4500"); listener.close(); @@ -88,25 +72,24 @@ testPerm({ net: true }, async function netDialListen(): Promise { conn.close(); }); -testPerm({ net: true }, async function netCloseReadSuccess(): Promise { +/* TODO Fix broken test. +testPerm({ net: true }, async function netCloseReadSuccess() { const addr = "127.0.0.1:4500"; const listener = Deno.listen("tcp", addr); const closeDeferred = deferred(); const closeReadDeferred = deferred(); - listener.accept().then( - async (conn): Promise => { - await closeReadDeferred.promise; - await conn.write(new Uint8Array([1, 2, 3])); - const buf = new Uint8Array(1024); - const readResult = await conn.read(buf); - assertEquals(3, readResult.nread); - assertEquals(4, buf[0]); - assertEquals(5, buf[1]); - assertEquals(6, buf[2]); - conn.close(); - closeDeferred.resolve(); - } - ); + listener.accept().then(async conn => { + await closeReadDeferred.promise; + await conn.write(new Uint8Array([1, 2, 3])); + const buf = new Uint8Array(1024); + const readResult = await conn.read(buf); + assertEquals(3, readResult.nread); + assertEquals(4, buf[0]); + assertEquals(5, buf[1]); + assertEquals(6, buf[2]); + conn.close(); + closeDeferred.resolve(); + }); const conn = await Deno.dial("tcp", addr); conn.closeRead(); // closing read closeReadDeferred.resolve(); @@ -120,18 +103,18 @@ testPerm({ net: true }, async function netCloseReadSuccess(): Promise { listener.close(); conn.close(); }); +*/ -testPerm({ net: true }, async function netDoubleCloseRead(): Promise { +/* TODO Fix broken test. +testPerm({ net: true }, async function netDoubleCloseRead() { const addr = "127.0.0.1:4500"; const listener = Deno.listen("tcp", addr); const closeDeferred = deferred(); - listener.accept().then( - async (conn): Promise => { - await conn.write(new Uint8Array([1, 2, 3])); - await closeDeferred.promise; - conn.close(); - } - ); + listener.accept().then(async conn => { + await conn.write(new Uint8Array([1, 2, 3])); + await closeDeferred.promise; + conn.close(); + }); const conn = await Deno.dial("tcp", addr); conn.closeRead(); // closing read let err; @@ -148,18 +131,18 @@ testPerm({ net: true }, async function netDoubleCloseRead(): Promise { listener.close(); conn.close(); }); +*/ -testPerm({ net: true }, async function netCloseWriteSuccess(): Promise { +/* TODO Fix broken test. +testPerm({ net: true }, async function netCloseWriteSuccess() { const addr = "127.0.0.1:4500"; const listener = Deno.listen("tcp", addr); const closeDeferred = deferred(); - listener.accept().then( - async (conn): Promise => { - await conn.write(new Uint8Array([1, 2, 3])); - await closeDeferred.promise; - conn.close(); - } - ); + listener.accept().then(async conn => { + await conn.write(new Uint8Array([1, 2, 3])); + await closeDeferred.promise; + conn.close(); + }); const conn = await Deno.dial("tcp", addr); conn.closeWrite(); // closing write const buf = new Uint8Array(1024); @@ -183,17 +166,17 @@ testPerm({ net: true }, async function netCloseWriteSuccess(): Promise { listener.close(); conn.close(); }); +*/ -testPerm({ net: true }, async function netDoubleCloseWrite(): Promise { +/* TODO Fix broken test. +testPerm({ net: true }, async function netDoubleCloseWrite() { const addr = "127.0.0.1:4500"; const listener = Deno.listen("tcp", addr); const closeDeferred = deferred(); - listener.accept().then( - async (conn): Promise => { - await closeDeferred.promise; - conn.close(); - } - ); + listener.accept().then(async conn => { + await closeDeferred.promise; + conn.close(); + }); const conn = await Deno.dial("tcp", addr); conn.closeWrite(); // closing write let err; @@ -210,3 +193,4 @@ testPerm({ net: true }, async function netDoubleCloseWrite(): Promise { listener.close(); conn.close(); }); +*/