diff --git a/Cargo.lock b/Cargo.lock index 21bf5f1469..3d885d3615 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -522,11 +522,12 @@ dependencies = [ "lazy_static", "libc", "log", + "pin-project 1.0.2", "rusty_v8", "serde", "serde_json", "smallvec", - "tokio 0.3.4", + "tokio 0.3.5", "url", ] @@ -2944,9 +2945,9 @@ dependencies = [ [[package]] name = "tokio" -version = "0.3.4" +version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9dfe2523e6fa84ddf5e688151d4e5fddc51678de9752c6512a24714c23818d61" +checksum = "a12a3eb39ee2c231be64487f1fcbe726c8f2514876a55480a5ab8559fc374252" dependencies = [ "autocfg 1.0.1", "bytes 0.6.0", diff --git a/core/Cargo.toml b/core/Cargo.toml index 367def1f2f..21cc5f1f7b 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -24,6 +24,7 @@ serde_json = { version = "1.0", features = ["preserve_order"] } serde = { version = "1.0", features = ["derive"] } smallvec = "1.4.2" url = { version = "2.1.1", features = ["serde"] } +pin-project = "1.0.2" [[example]] name = "http_bench_bin_ops" @@ -35,4 +36,4 @@ path = "examples/http_bench_json_ops.rs" # These dependendencies are only used for the 'http_bench_*_ops' examples. [dev-dependencies] -tokio = { version = "0.3.4", features = ["full"] } +tokio = { version = "0.3.5", features = ["full"] } diff --git a/core/async_cancel.rs b/core/async_cancel.rs new file mode 100644 index 0000000000..90cb0c41ff --- /dev/null +++ b/core/async_cancel.rs @@ -0,0 +1,710 @@ +// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. + +use crate::RcLike; +use futures::future::FusedFuture; +use futures::future::Future; +use futures::future::TryFuture; +use futures::task::Context; +use futures::task::Poll; +use pin_project::pin_project; +use std::any::type_name; +use std::error::Error; +use std::fmt; +use std::fmt::Display; +use std::fmt::Formatter; +use std::io; +use std::pin::Pin; +use std::rc::Rc; + +use self::internal as i; + +#[derive(Debug, Default)] +pub struct CancelHandle { + node: i::Node, +} + +impl CancelHandle { + pub fn new() -> Self { + Default::default() + } + + pub fn new_rc() -> Rc { + Rc::new(Self::new()) + } + + /// Cancel all cancelable futures that are bound to this handle. Note that + /// this method does not require a mutable reference to the `CancelHandle`. + pub fn cancel(&self) { + self.node.cancel(); + } + + pub fn is_canceled(&self) -> bool { + self.node.is_canceled() + } +} + +#[pin_project(project = CancelableProjection)] +#[derive(Debug)] +pub enum Cancelable { + Pending { + #[pin] + future: F, + #[pin] + registration: i::Registration, + }, + Terminated, +} + +impl Future for Cancelable { + type Output = Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { + let poll_result = match self.as_mut().project() { + CancelableProjection::Pending { + future, + registration, + } => Self::poll_pending(future, registration, cx), + CancelableProjection::Terminated => { + panic!("{}::poll() called after completion", type_name::()) + } + }; + // Fuse: if this Future is completed or canceled, make sure the inner + // `future` and `registration` fields are dropped in order to unlink it from + // its cancel handle. + if matches!(poll_result, Poll::Ready(_)) { + self.set(Cancelable::Terminated) + } + poll_result + } +} + +impl FusedFuture for Cancelable { + fn is_terminated(&self) -> bool { + matches!(self, Self::Terminated) + } +} + +#[pin_project(project = TryCancelableProjection)] +#[derive(Debug)] +pub struct TryCancelable { + #[pin] + inner: Cancelable, +} + +impl Future for TryCancelable +where + F: Future>, + Canceled: Into, +{ + type Output = F::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + let TryCancelableProjection { inner } = self.project(); + match inner.poll(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(Ok(result)) => Poll::Ready(result), + Poll::Ready(Err(err)) => Poll::Ready(Err(err.into())), + } + } +} + +impl FusedFuture for TryCancelable +where + F: Future>, + Canceled: Into, +{ + fn is_terminated(&self) -> bool { + self.inner.is_terminated() + } +} + +pub trait CancelFuture +where + Self: Future + Sized, +{ + fn or_cancel>( + self, + cancel_handle: H, + ) -> Cancelable { + Cancelable::new(self, cancel_handle.into()) + } +} + +impl CancelFuture for F where F: Future {} + +pub trait CancelTryFuture +where + Self: TryFuture + Sized, + Canceled: Into, +{ + fn try_or_cancel>( + self, + cancel_handle: H, + ) -> TryCancelable { + TryCancelable::new(self, cancel_handle.into()) + } +} + +impl CancelTryFuture for F +where + F: TryFuture, + Canceled: Into, +{ +} + +#[derive(Copy, Clone, Default, Debug, Eq, Hash, PartialEq)] +pub struct Canceled; + +impl Display for Canceled { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + write!(f, "operation canceled") + } +} + +impl Error for Canceled {} + +impl From for io::Error { + fn from(_: Canceled) -> Self { + io::Error::new(io::ErrorKind::Interrupted, Canceled) + } +} + +mod internal { + use super::CancelHandle; + use super::Cancelable; + use super::Canceled; + use super::TryCancelable; + use crate::RcRef; + use futures::future::Future; + use futures::task::Context; + use futures::task::Poll; + use futures::task::Waker; + use pin_project::pin_project; + use std::any::Any; + use std::cell::UnsafeCell; + use std::marker::PhantomPinned; + use std::mem::replace; + use std::pin::Pin; + use std::ptr::NonNull; + use std::rc::Rc; + use std::rc::Weak; + + impl Cancelable { + pub(super) fn new(future: F, cancel_handle: RcRef) -> Self { + let head_node = RcRef::map(cancel_handle, |r| &r.node); + let registration = Registration::WillRegister { head_node }; + Self::Pending { + future, + registration, + } + } + + pub(super) fn poll_pending( + future: Pin<&mut F>, + mut registration: Pin<&mut Registration>, + cx: &mut Context, + ) -> Poll> { + // If this future is being polled for the first time, perform an extra + // cancellation check _before_ polling the inner future. The reason to do + // this is that polling the inner future for the first time might start + // some activity that cannot actually be canceled (e.g. running a compute + // job in a thread pool), so we should try to never start it at all. + match &*registration { + Registration::WillRegister { head_node } if head_node.is_canceled() => { + return Poll::Ready(Err(Canceled)); + } + _ => {} + } + + match future.poll(cx) { + Poll::Ready(res) => return Poll::Ready(Ok(res)), + Poll::Pending => {} + } + + // Register this future with its `CancelHandle`, saving the `Waker` that + // can be used to make the runtime poll this future when it is canceled. + // When already registered, update the stored `Waker` if necessary. + let head_node = match &*registration { + Registration::WillRegister { .. } => { + match registration.as_mut().project_replace(Default::default()) { + RegistrationProjectionOwned::WillRegister { head_node } => { + Some(head_node) + } + _ => unreachable!(), + } + } + _ => None, + }; + let node = match registration.project() { + RegistrationProjection::Registered { node } => node, + _ => unreachable!(), + }; + node.register(cx.waker(), head_node)?; + + Poll::Pending + } + } + + impl TryCancelable { + pub(super) fn new(future: F, cancel_handle: RcRef) -> Self { + Self { + inner: Cancelable::new(future, cancel_handle), + } + } + } + + #[pin_project(project = RegistrationProjection, + project_replace = RegistrationProjectionOwned)] + #[derive(Debug)] + pub enum Registration { + WillRegister { + head_node: RcRef, + }, + Registered { + #[pin] + node: Node, + }, + } + + impl Default for Registration { + fn default() -> Self { + Self::Registered { + node: Default::default(), + } + } + } + + #[derive(Debug)] + pub struct Node { + inner: UnsafeCell, + _pin: PhantomPinned, + } + + impl Node { + /// If necessary, register a `Cancelable` node with a `CancelHandle`, and + /// save or update the `Waker` that can wake with this cancelable future. + pub fn register( + &self, + waker: &Waker, + head_rc: Option>, + ) -> Result<(), Canceled> { + match head_rc.as_ref().map(RcRef::split) { + Some((head, rc)) => { + // Register this `Cancelable` node with a `CancelHandle` head node. + assert_ne!(self, head); + let self_inner = unsafe { &mut *self.inner.get() }; + let head_inner = unsafe { &mut *head.inner.get() }; + self_inner.link(waker, head_inner, rc) + } + None => { + // This `Cancelable` has already been linked to a `CancelHandle` head + // node; just update our stored `Waker` if necessary. + let inner = unsafe { &mut *self.inner.get() }; + inner.update_waker(waker) + } + } + } + + pub fn cancel(&self) { + let inner = unsafe { &mut *self.inner.get() }; + inner.cancel(); + } + + pub fn is_canceled(&self) -> bool { + let inner = unsafe { &mut *self.inner.get() }; + inner.is_canceled() + } + } + + impl Default for Node { + fn default() -> Self { + Self { + inner: UnsafeCell::new(NodeInner::Unlinked), + _pin: PhantomPinned, + } + } + } + + impl Drop for Node { + fn drop(&mut self) { + let inner = unsafe { &mut *self.inner.get() }; + inner.unlink(); + } + } + + impl PartialEq for Node { + fn eq(&self, other: &Self) -> bool { + self as *const _ == other as *const _ + } + } + + #[derive(Debug)] + enum NodeInner { + Unlinked, + Linked { + kind: NodeKind, + prev: NonNull, + next: NonNull, + }, + Canceled, + } + + impl NodeInner { + fn as_non_null(&mut self) -> NonNull { + NonNull::from(self) + } + + fn link( + &mut self, + waker: &Waker, + head: &mut Self, + rc_pin: &Rc, + ) -> Result<(), Canceled> { + // The future should not have been linked to a cancel handle before. + assert!(matches!(self, NodeInner::Unlinked)); + + match head { + NodeInner::Unlinked => { + *head = NodeInner::Linked { + kind: NodeKind::head(rc_pin), + prev: self.as_non_null(), + next: self.as_non_null(), + }; + *self = NodeInner::Linked { + kind: NodeKind::item(waker), + prev: head.as_non_null(), + next: head.as_non_null(), + }; + Ok(()) + } + NodeInner::Linked { + kind: NodeKind::Head { .. }, + prev: next_prev_nn, + .. + } => { + let prev = unsafe { &mut *next_prev_nn.as_ptr() }; + match prev { + NodeInner::Linked { + kind: NodeKind::Item { .. }, + next: prev_next_nn, + .. + } => { + *self = NodeInner::Linked { + kind: NodeKind::item(waker), + prev: replace(next_prev_nn, self.as_non_null()), + next: replace(prev_next_nn, self.as_non_null()), + }; + Ok(()) + } + _ => unreachable!(), + } + } + NodeInner::Canceled => Err(Canceled), + _ => unreachable!(), + } + } + + fn update_waker(&mut self, new_waker: &Waker) -> Result<(), Canceled> { + match self { + NodeInner::Unlinked => Ok(()), + NodeInner::Linked { + kind: NodeKind::Item { waker }, + .. + } => { + if !waker.will_wake(new_waker) { + *waker = new_waker.clone(); + } + Ok(()) + } + NodeInner::Canceled => Err(Canceled), + _ => unreachable!(), + } + } + + /// If this node is linked to other nodes, remove it from the chain. This + /// method is called (only) by the drop handler for `Node`. It is suitable + /// for both 'head' and 'item' nodes. + fn unlink(&mut self) { + if let NodeInner::Linked { + prev: mut prev_nn, + next: mut next_nn, + .. + } = replace(self, NodeInner::Unlinked) + { + if prev_nn == next_nn { + // There were only two nodes in this chain; after unlinking ourselves + // the other node is no longer linked. + let other = unsafe { prev_nn.as_mut() }; + *other = NodeInner::Unlinked; + } else { + // The chain had more than two nodes. + match unsafe { prev_nn.as_mut() } { + NodeInner::Linked { + next: prev_next_nn, .. + } => { + *prev_next_nn = next_nn; + } + _ => unreachable!(), + } + match unsafe { next_nn.as_mut() } { + NodeInner::Linked { + prev: next_prev_nn, .. + } => { + *next_prev_nn = prev_nn; + } + _ => unreachable!(), + } + } + } + } + + /// Mark this node and all linked nodes for cancellation. Note that `self` + /// must refer to a head (`CancelHandle`) node. + fn cancel(&mut self) { + let mut head_nn = NonNull::from(self); + let mut item_nn; + + // Mark the head node as canceled. + match replace(unsafe { head_nn.as_mut() }, NodeInner::Canceled) { + NodeInner::Linked { + kind: NodeKind::Head { .. }, + next: next_nn, + .. + } => item_nn = next_nn, + NodeInner::Unlinked | NodeInner::Canceled => return, + _ => unreachable!(), + }; + + // Cancel all item nodes in the chain, waking each stored `Waker`. + while item_nn != head_nn { + match replace(unsafe { item_nn.as_mut() }, NodeInner::Canceled) { + NodeInner::Linked { + kind: NodeKind::Item { waker }, + next: next_nn, + .. + } => { + waker.wake(); + item_nn = next_nn; + } + _ => unreachable!(), + } + } + } + + /// Returns true if this node has been marked for cancellation. Note that + /// `self` must refer to a head (`CancelHandle`) node. + fn is_canceled(&self) -> bool { + match self { + NodeInner::Unlinked => false, + NodeInner::Linked { + kind: NodeKind::Head { .. }, + .. + } => false, + NodeInner::Canceled => true, + _ => unreachable!(), + } + } + } + + #[derive(Debug)] + enum NodeKind { + /// In a chain of linked nodes, the "head" node is owned by the + /// `CancelHandle`. A chain usually contains at most one head node; however + /// when a `CancelHandle` is dropped before the futures associated with it + /// are dropped, a chain may temporarily contain no head node at all. + Head { + /// The `weak_pin` field adds adds a weak reference to the `Rc` guarding + /// the heap allocation that contains the `CancelHandle`. Without this + /// extra weak reference, `Rc::get_mut()` might succeed and allow the + /// `CancelHandle` to be moved when it isn't safe to do so. + weak_pin: Weak, + }, + /// All item nodes in a chain are associated with a `Cancelable` head node. + Item { + /// If this future indeed does get canceled, the waker is needed to make + /// sure that the canceled future gets polled as soon as possible. + waker: Waker, + }, + } + + impl NodeKind { + fn head(rc_pin: &Rc) -> Self { + let weak_pin = Rc::downgrade(rc_pin); + Self::Head { weak_pin } + } + + fn item(waker: &Waker) -> Self { + let waker = waker.clone(); + Self::Item { waker } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::error::AnyError; + use futures::future::pending; + use futures::future::poll_fn; + use futures::future::ready; + use futures::future::FutureExt; + use futures::future::TryFutureExt; + use futures::select; + use futures::task::noop_waker_ref; + use futures::task::Context; + use futures::task::Poll; + use std::convert::Infallible as Never; + use std::io; + use tokio::net::TcpStream; + use tokio::spawn; + + fn box_fused<'a, F: FusedFuture + 'a>( + future: F, + ) -> Pin + 'a>> { + Box::pin(future) + } + + async fn ready_in_n(name: &str, count: usize) -> &str { + let mut remaining = count as isize; + poll_fn(|_| { + assert!(remaining >= 0); + if remaining == 0 { + Poll::Ready(name) + } else { + remaining -= 1; + Poll::Pending + } + }) + .await + } + + #[test] + fn cancel_future() { + let cancel_now = CancelHandle::new_rc(); + let cancel_at_0 = CancelHandle::new_rc(); + let cancel_at_1 = CancelHandle::new_rc(); + let cancel_at_4 = CancelHandle::new_rc(); + let cancel_never = CancelHandle::new_rc(); + + cancel_now.cancel(); + + let mut futures = vec![ + box_fused(ready("A").or_cancel(&cancel_now)), + box_fused(ready("B").or_cancel(&cancel_at_0)), + box_fused(ready("C").or_cancel(&cancel_at_1)), + box_fused( + ready_in_n("D", 0) + .or_cancel(&cancel_never) + .try_or_cancel(&cancel_now), + ), + box_fused( + ready_in_n("E", 1) + .or_cancel(&cancel_at_1) + .try_or_cancel(&cancel_at_1), + ), + box_fused(ready_in_n("F", 2).or_cancel(&cancel_at_1)), + box_fused(ready_in_n("G", 3).or_cancel(&cancel_at_4)), + box_fused(ready_in_n("H", 4).or_cancel(&cancel_at_4)), + box_fused(ready_in_n("I", 5).or_cancel(&cancel_at_4)), + box_fused(ready_in_n("J", 5).map(Ok)), + box_fused(ready_in_n("K", 5).or_cancel(cancel_never)), + ]; + + let mut cx = Context::from_waker(noop_waker_ref()); + + for i in 0..=5 { + match i { + 0 => cancel_at_0.cancel(), + 1 => cancel_at_1.cancel(), + 4 => cancel_at_4.cancel(), + 2 | 3 | 5 => {} + _ => unreachable!(), + } + + let results = futures + .iter_mut() + .filter(|fut| !fut.is_terminated()) + .filter_map(|fut| match fut.poll_unpin(&mut cx) { + Poll::Pending => None, + Poll::Ready(res) => Some(res), + }) + .collect::>(); + + match i { + 0 => assert_eq!( + results, + [Err(Canceled), Err(Canceled), Ok("C"), Err(Canceled)] + ), + 1 => assert_eq!(results, [Ok("E"), Err(Canceled)]), + 2 => assert_eq!(results, []), + 3 => assert_eq!(results, [Ok("G")]), + 4 => assert_eq!(results, [Ok("H"), Err(Canceled)]), + 5 => assert_eq!(results, [Ok("J"), Ok("K")]), + _ => unreachable!(), + } + } + + assert_eq!(futures.into_iter().any(|fut| !fut.is_terminated()), false); + + let cancel_handles = [cancel_now, cancel_at_0, cancel_at_1, cancel_at_4]; + assert_eq!(cancel_handles.iter().any(|c| !c.is_canceled()), false); + } + + #[tokio::test] + async fn cancel_try_future() { + { + // Cancel a spawned task before it actually runs. + let cancel_handle = Rc::new(CancelHandle::new()); + let future = spawn(async { panic!("the task should not be spawned") }) + .map_err(AnyError::from) + .try_or_cancel(&cancel_handle); + cancel_handle.cancel(); + let error = future.await.unwrap_err(); + assert!(error.downcast_ref::().is_some()); + assert_eq!(error.to_string().as_str(), "operation canceled"); + } + + { + // Cancel a network I/O future right after polling it. + let cancel_handle = Rc::new(CancelHandle::new()); + let result = loop { + select! { + r = TcpStream::connect("1.2.3.4:12345") + .try_or_cancel(&cancel_handle) => break r, + default => cancel_handle.cancel(), + }; + }; + let error = result.unwrap_err(); + assert_eq!(error.kind(), io::ErrorKind::Interrupted); + assert_eq!(error.to_string().as_str(), "operation canceled"); + } + } + + #[test] + fn cancel_handle_pinning() { + let mut cancel_handle = CancelHandle::new_rc(); + + // There is only one reference to `cancel_handle`, so `Rc::get_mut()` should + // succeed. + assert!(Rc::get_mut(&mut cancel_handle).is_some()); + + let mut future = pending::().or_cancel(&cancel_handle); + let future = unsafe { Pin::new_unchecked(&mut future) }; + + // There are two `Rc` references now, so this fails. + assert!(Rc::get_mut(&mut cancel_handle).is_none()); + + let mut cx = Context::from_waker(noop_waker_ref()); + assert!(future.poll(&mut cx).is_pending()); + + // Polling `future` has established a link between the future and + // `cancel_handle`, so both values should be pinned at this point. + assert!(Rc::get_mut(&mut cancel_handle).is_none()); + + cancel_handle.cancel(); + + // Canceling or dropping the associated future(s) unlinks them from the + // cancel handle, therefore `cancel_handle` can now safely be moved again. + assert!(Rc::get_mut(&mut cancel_handle).is_some()); + } +} diff --git a/core/async_cell.rs b/core/async_cell.rs index a140dceb19..bf62692ed8 100644 --- a/core/async_cell.rs +++ b/core/async_cell.rs @@ -126,6 +126,7 @@ impl RcRef> { /// let foo_rc: RcRef = RcRef::map(stuff_rc.clone(), |v| &v.foo); /// let bar_rc: RcRef = RcRef::map(stuff_rc, |v| &v.bar); /// ``` +#[derive(Debug)] pub struct RcRef { rc: Rc, value: *const T, @@ -136,7 +137,7 @@ impl RcRef { Self::from(Rc::new(value)) } - pub fn map, F: FnOnce(&S) -> &T>( + pub fn map, F: FnOnce(&S) -> &T>( source: R, map_fn: F, ) -> RcRef { @@ -144,6 +145,11 @@ impl RcRef { let value = map_fn(unsafe { &*value }); RcRef { rc, value } } + + pub(crate) fn split(rc_ref: &Self) -> (&T, &Rc) { + let &Self { ref rc, value } = rc_ref; + (unsafe { &*value }, rc) + } } impl Default for RcRef { @@ -152,6 +158,21 @@ impl Default for RcRef { } } +impl Clone for RcRef { + fn clone(&self) -> Self { + Self { + rc: self.rc.clone(), + value: self.value, + } + } +} + +impl From<&RcRef> for RcRef { + fn from(rc_ref: &RcRef) -> Self { + rc_ref.clone() + } +} + impl From> for RcRef { fn from(rc: Rc) -> Self { Self { @@ -161,12 +182,9 @@ impl From> for RcRef { } } -impl Clone for RcRef { - fn clone(&self) -> Self { - Self { - rc: self.rc.clone(), - value: self.value, - } +impl From<&Rc> for RcRef { + fn from(rc: &Rc) -> Self { + rc.clone().into() } } @@ -189,8 +207,18 @@ impl AsRef for RcRef { } } +/// The `RcLike` trait provides an abstraction over `std::rc::Rc` and `RcRef`, +/// so that applicable methods can operate on either type. +pub trait RcLike: AsRef + Into> {} + +impl RcLike for Rc {} +impl RcLike for RcRef {} +impl RcLike for &Rc {} +impl RcLike for &RcRef {} + mod internal { use super::AsyncRefCell; + use super::RcLike; use super::RcRef; use futures::future::Future; use futures::ready; @@ -204,32 +232,29 @@ mod internal { use std::ops::Deref; use std::ops::DerefMut; use std::pin::Pin; - use std::rc::Rc; impl AsyncRefCell { /// Borrow the cell's contents synchronouslym without creating an /// intermediate future. If the cell has already been borrowed and either /// the existing or the requested borrow is exclusive, this function returns - /// `None`. - pub(super) fn borrow_sync< - M: BorrowModeTrait, - R: RcLike>, - >( - cell: &R, + /// `None`. + pub fn borrow_sync>>( + cell: R, ) -> Option> { + let cell_ref = cell.as_ref(); // Don't allow synchronous borrows to cut in line; if there are any // enqueued waiters, return `None`, even if the current borrow is a shared // one and the requested borrow is too. - let waiters = unsafe { &mut *cell.waiters.as_ptr() }; + let waiters = unsafe { &mut *cell_ref.waiters.as_ptr() }; if waiters.is_empty() { // There are no enqueued waiters, but it is still possible that the cell // is currently borrowed. If there are no current borrows, or both the // existing and requested ones are shared, `try_add()` returns the // adjusted borrow count. let new_borrow_count = - cell.borrow_count.get().try_add(M::borrow_mode())?; - cell.borrow_count.set(new_borrow_count); - Some(AsyncBorrowImpl::::new(cell.clone().into())) + cell_ref.borrow_count.get().try_add(M::borrow_mode())?; + cell_ref.borrow_count.set(new_borrow_count); + Some(AsyncBorrowImpl::::new(cell.into())) } else { None } @@ -359,10 +384,10 @@ mod internal { } impl AsyncBorrowFutureImpl { - pub fn new>>(cell: &R) -> Self { + pub fn new>>(cell: R) -> Self { Self { - cell: Some(cell.clone().into()), - id: cell.create_waiter::(), + id: cell.as_ref().create_waiter::(), + cell: Some(cell.into()), _phantom: PhantomData, } } @@ -561,13 +586,6 @@ mod internal { self.waker.take() } } - - /// The `RcLike` trait provides an abstraction over `std::rc::Rc` and `RcRef`, - /// so that applicable methods can operate on either type. - pub trait RcLike: Clone + Deref + Into> {} - - impl RcLike for Rc {} - impl RcLike for RcRef {} } #[cfg(test)] diff --git a/core/examples/http_bench_bin_ops.rs b/core/examples/http_bench_bin_ops.rs index 9af74d9800..1d7a76c3d0 100644 --- a/core/examples/http_bench_bin_ops.rs +++ b/core/examples/http_bench_bin_ops.rs @@ -3,10 +3,10 @@ #[macro_use] extern crate log; -use deno_core::AsyncMutFuture; use deno_core::AsyncRefCell; -use deno_core::AsyncRefFuture; use deno_core::BufVec; +use deno_core::CancelHandle; +use deno_core::CancelTryFuture; use deno_core::JsRuntime; use deno_core::Op; use deno_core::OpState; @@ -46,51 +46,65 @@ impl log::Log for Logger { fn flush(&self) {} } -// Note: it isn't actually necessary to wrap the `tokio::net::TcpListener` in -// a cell, because it only supports one op (`accept`) which does not require -// a mutable reference to the listener. -struct TcpListener(AsyncRefCell); - -impl Resource for TcpListener {} +// Note: a `tokio::net::TcpListener` doesn't need to be wrapped in a cell, +// because it only supports one op (`accept`) which does not require a mutable +// reference to the listener. +struct TcpListener { + inner: tokio::net::TcpListener, + cancel: CancelHandle, +} impl TcpListener { - /// Returns a future that yields a shared borrow of the TCP listener. - fn borrow(self: Rc) -> AsyncRefFuture { - RcRef::map(self, |r| &r.0).borrow() + async fn accept(self: Rc) -> Result { + let cancel = RcRef::map(&self, |r| &r.cancel); + let stream = self.inner.accept().try_or_cancel(cancel).await?.0.into(); + Ok(stream) + } +} + +impl Resource for TcpListener { + fn close(self: Rc) { + self.cancel.cancel(); } } impl TryFrom for TcpListener { type Error = Error; - fn try_from(l: std::net::TcpListener) -> Result { - tokio::net::TcpListener::try_from(l) - .map(AsyncRefCell::new) - .map(Self) + fn try_from( + std_listener: std::net::TcpListener, + ) -> Result { + tokio::net::TcpListener::try_from(std_listener).map(|tokio_listener| Self { + inner: tokio_listener, + cancel: Default::default(), + }) } } struct TcpStream { rd: AsyncRefCell, wr: AsyncRefCell, + // When a `TcpStream` resource is closed, all pending 'read' ops are + // canceled, while 'write' ops are allowed to complete. Therefore only + // 'read' futures are attached to this cancel handle. + cancel: CancelHandle, } -impl Resource for TcpStream {} - impl TcpStream { - /// Returns a future that yields an exclusive borrow of the read end of the - /// tcp stream. - fn rd_borrow_mut( - self: Rc, - ) -> AsyncMutFuture { - RcRef::map(self, |r| &r.rd).borrow_mut() + async fn read(self: Rc, buf: &mut [u8]) -> Result { + let mut rd = RcRef::map(&self, |r| &r.rd).borrow_mut().await; + let cancel = RcRef::map(self, |r| &r.cancel); + rd.read(buf).try_or_cancel(cancel).await } - /// Returns a future that yields an exclusive borrow of the write end of the - /// tcp stream. - fn wr_borrow_mut( - self: Rc, - ) -> AsyncMutFuture { - RcRef::map(self, |r| &r.wr).borrow_mut() + async fn write(self: Rc, buf: &[u8]) -> Result { + let mut wr = RcRef::map(self, |r| &r.wr).borrow_mut().await; + wr.write(buf).await + } +} + +impl Resource for TcpStream { + fn close(self: Rc) { + self.cancel.cancel() } } @@ -100,6 +114,7 @@ impl From for TcpStream { Self { rd: rd.into(), wr: wr.into(), + cancel: Default::default(), } } } @@ -179,14 +194,12 @@ async fn op_accept( ) -> Result { debug!("accept rid={}", rid); - let listener_rc = state + let listener = state .borrow() .resource_table_2 .get::(rid) .ok_or_else(bad_resource_id)?; - let listener_ref = listener_rc.borrow().await; - - let stream: TcpStream = listener_ref.accept().await?.0.into(); + let stream = listener.accept().await?; let rid = state.borrow_mut().resource_table_2.add(stream); Ok(rid) } @@ -199,14 +212,12 @@ async fn op_read( assert_eq!(bufs.len(), 1, "Invalid number of arguments"); debug!("read rid={}", rid); - let stream_rc = state + let stream = state .borrow() .resource_table_2 .get::(rid) .ok_or_else(bad_resource_id)?; - let mut rd_stream_mut = stream_rc.rd_borrow_mut().await; - - rd_stream_mut.read(&mut bufs[0]).await + stream.read(&mut bufs[0]).await } async fn op_write( @@ -217,14 +228,12 @@ async fn op_write( assert_eq!(bufs.len(), 1, "Invalid number of arguments"); debug!("write rid={}", rid); - let stream_rc = state + let stream = state .borrow() .resource_table_2 .get::(rid) .ok_or_else(bad_resource_id)?; - let mut wr_stream_mut = stream_rc.wr_borrow_mut().await; - - wr_stream_mut.write(&bufs[0]).await + stream.write(&bufs[0]).await } fn register_op_bin_sync( diff --git a/core/examples/http_bench_json_ops.rs b/core/examples/http_bench_json_ops.rs index 77f5b9dbe6..c4fcd63636 100644 --- a/core/examples/http_bench_json_ops.rs +++ b/core/examples/http_bench_json_ops.rs @@ -5,10 +5,10 @@ extern crate log; use deno_core::error::bad_resource_id; use deno_core::error::AnyError; -use deno_core::AsyncMutFuture; use deno_core::AsyncRefCell; -use deno_core::AsyncRefFuture; use deno_core::BufVec; +use deno_core::CancelHandle; +use deno_core::CancelTryFuture; use deno_core::JsRuntime; use deno_core::OpState; use deno_core::RcRef; @@ -41,51 +41,65 @@ impl log::Log for Logger { fn flush(&self) {} } -// Note: it isn't actually necessary to wrap the `tokio::net::TcpListener` in -// a cell, because it only supports one op (`accept`) which does not require -// a mutable reference to the listener. -struct TcpListener(AsyncRefCell); - -impl Resource for TcpListener {} +// Note: a `tokio::net::TcpListener` doesn't need to be wrapped in a cell, +// because it only supports one op (`accept`) which does not require a mutable +// reference to the listener. +struct TcpListener { + inner: tokio::net::TcpListener, + cancel: CancelHandle, +} impl TcpListener { - /// Returns a future that yields a shared borrow of the TCP listener. - fn borrow(self: Rc) -> AsyncRefFuture { - RcRef::map(self, |r| &r.0).borrow() + async fn accept(self: Rc) -> Result { + let cancel = RcRef::map(&self, |r| &r.cancel); + let stream = self.inner.accept().try_or_cancel(cancel).await?.0.into(); + Ok(stream) + } +} + +impl Resource for TcpListener { + fn close(self: Rc) { + self.cancel.cancel(); } } impl TryFrom for TcpListener { type Error = Error; - fn try_from(l: std::net::TcpListener) -> Result { - tokio::net::TcpListener::try_from(l) - .map(AsyncRefCell::new) - .map(Self) + fn try_from( + std_listener: std::net::TcpListener, + ) -> Result { + tokio::net::TcpListener::try_from(std_listener).map(|tokio_listener| Self { + inner: tokio_listener, + cancel: Default::default(), + }) } } struct TcpStream { rd: AsyncRefCell, wr: AsyncRefCell, + // When a `TcpStream` resource is closed, all pending 'read' ops are + // canceled, while 'write' ops are allowed to complete. Therefore only + // 'read' futures are attached to this cancel handle. + cancel: CancelHandle, } -impl Resource for TcpStream {} - impl TcpStream { - /// Returns a future that yields an exclusive borrow of the read end of the - /// tcp stream. - fn rd_borrow_mut( - self: Rc, - ) -> AsyncMutFuture { - RcRef::map(self, |r| &r.rd).borrow_mut() + async fn read(self: Rc, buf: &mut [u8]) -> Result { + let mut rd = RcRef::map(&self, |r| &r.rd).borrow_mut().await; + let cancel = RcRef::map(self, |r| &r.cancel); + rd.read(buf).try_or_cancel(cancel).await } - /// Returns a future that yields an exclusive borrow of the write end of the - /// tcp stream. - fn wr_borrow_mut( - self: Rc, - ) -> AsyncMutFuture { - RcRef::map(self, |r| &r.wr).borrow_mut() + async fn write(self: Rc, buf: &[u8]) -> Result { + let mut wr = RcRef::map(self, |r| &r.wr).borrow_mut().await; + wr.write(buf).await + } +} + +impl Resource for TcpStream { + fn close(self: Rc) { + self.cancel.cancel() } } @@ -95,6 +109,7 @@ impl From for TcpStream { Self { rd: rd.into(), wr: wr.into(), + cancel: Default::default(), } } } @@ -157,14 +172,12 @@ async fn op_accept( .unwrap(); debug!("accept rid={}", rid); - let listener_rc = state + let listener = state .borrow() .resource_table_2 .get::(rid) .ok_or_else(bad_resource_id)?; - let listener_ref = listener_rc.borrow().await; - - let stream: TcpStream = listener_ref.accept().await?.0.into(); + let stream = listener.accept().await?; let rid = state.borrow_mut().resource_table_2.add(stream); Ok(serde_json::json!({ "rid": rid })) } @@ -184,14 +197,12 @@ async fn op_read( .unwrap(); debug!("read rid={}", rid); - let stream_rc = state + let stream = state .borrow() .resource_table_2 .get::(rid) .ok_or_else(bad_resource_id)?; - let mut rd_stream_mut = stream_rc.rd_borrow_mut().await; - - let nread = rd_stream_mut.read(&mut bufs[0]).await?; + let nread = stream.read(&mut bufs[0]).await?; Ok(serde_json::json!({ "nread": nread })) } @@ -210,14 +221,12 @@ async fn op_write( .unwrap(); debug!("write rid={}", rid); - let stream_rc = state + let stream = state .borrow() .resource_table_2 .get::(rid) .ok_or_else(bad_resource_id)?; - let mut wr_stream_mut = stream_rc.wr_borrow_mut().await; - - let nwritten = wr_stream_mut.write(&bufs[0]).await?; + let nwritten = stream.write(&bufs[0]).await?; Ok(serde_json::json!({ "nwritten": nwritten })) } diff --git a/core/lib.rs b/core/lib.rs index 372cd558ec..20ee5a3d5d 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -5,6 +5,7 @@ extern crate lazy_static; #[macro_use] extern crate log; +mod async_cancel; mod async_cell; mod bindings; pub mod error; @@ -28,11 +29,18 @@ pub use serde; pub use serde_json; pub use url; +pub use crate::async_cancel::CancelFuture; +pub use crate::async_cancel::CancelHandle; +pub use crate::async_cancel::CancelTryFuture; +pub use crate::async_cancel::Cancelable; +pub use crate::async_cancel::Canceled; +pub use crate::async_cancel::TryCancelable; pub use crate::async_cell::AsyncMut; pub use crate::async_cell::AsyncMutFuture; pub use crate::async_cell::AsyncRef; pub use crate::async_cell::AsyncRefCell; pub use crate::async_cell::AsyncRefFuture; +pub use crate::async_cell::RcLike; pub use crate::async_cell::RcRef; pub use crate::flags::v8_set_flags; pub use crate::module_specifier::ModuleResolutionError; diff --git a/core/resources2.rs b/core/resources2.rs index 62cb3f0565..92548a5565 100644 --- a/core/resources2.rs +++ b/core/resources2.rs @@ -24,6 +24,11 @@ pub trait Resource: Any + 'static { fn name(&self) -> Cow { type_name::().into() } + + /// Resources may implement the `close()` trait method if they need to do + /// resource specific clean-ups, such as cancelling pending futures, after a + /// resource has been removed from the resource table. + fn close(self: Rc) {} } impl dyn Resource { @@ -117,7 +122,7 @@ impl ResourceTable { /// cause the resource to be dropped. However, since resources are reference /// counted, therefore pending ops are not automatically cancelled. pub fn close(&mut self, rid: ResourceId) -> Option<()> { - self.index.remove(&rid).map(|_| ()) + self.index.remove(&rid).map(|resource| resource.close()) } /// Returns an iterator that yields a `(id, name)` pair for every resource