mirror of
https://github.com/denoland/deno.git
synced 2024-11-28 16:20:57 -05:00
core: add plumbing for canceling ops when closing a resource (#8661)
This commit is contained in:
parent
b1379b7de3
commit
b200e6fc3e
8 changed files with 876 additions and 115 deletions
7
Cargo.lock
generated
7
Cargo.lock
generated
|
@ -522,11 +522,12 @@ dependencies = [
|
||||||
"lazy_static",
|
"lazy_static",
|
||||||
"libc",
|
"libc",
|
||||||
"log",
|
"log",
|
||||||
|
"pin-project 1.0.2",
|
||||||
"rusty_v8",
|
"rusty_v8",
|
||||||
"serde",
|
"serde",
|
||||||
"serde_json",
|
"serde_json",
|
||||||
"smallvec",
|
"smallvec",
|
||||||
"tokio 0.3.4",
|
"tokio 0.3.5",
|
||||||
"url",
|
"url",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
@ -2944,9 +2945,9 @@ dependencies = [
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "tokio"
|
name = "tokio"
|
||||||
version = "0.3.4"
|
version = "0.3.5"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "9dfe2523e6fa84ddf5e688151d4e5fddc51678de9752c6512a24714c23818d61"
|
checksum = "a12a3eb39ee2c231be64487f1fcbe726c8f2514876a55480a5ab8559fc374252"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"autocfg 1.0.1",
|
"autocfg 1.0.1",
|
||||||
"bytes 0.6.0",
|
"bytes 0.6.0",
|
||||||
|
|
|
@ -24,6 +24,7 @@ serde_json = { version = "1.0", features = ["preserve_order"] }
|
||||||
serde = { version = "1.0", features = ["derive"] }
|
serde = { version = "1.0", features = ["derive"] }
|
||||||
smallvec = "1.4.2"
|
smallvec = "1.4.2"
|
||||||
url = { version = "2.1.1", features = ["serde"] }
|
url = { version = "2.1.1", features = ["serde"] }
|
||||||
|
pin-project = "1.0.2"
|
||||||
|
|
||||||
[[example]]
|
[[example]]
|
||||||
name = "http_bench_bin_ops"
|
name = "http_bench_bin_ops"
|
||||||
|
@ -35,4 +36,4 @@ path = "examples/http_bench_json_ops.rs"
|
||||||
|
|
||||||
# These dependendencies are only used for the 'http_bench_*_ops' examples.
|
# These dependendencies are only used for the 'http_bench_*_ops' examples.
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
tokio = { version = "0.3.4", features = ["full"] }
|
tokio = { version = "0.3.5", features = ["full"] }
|
||||||
|
|
710
core/async_cancel.rs
Normal file
710
core/async_cancel.rs
Normal file
|
@ -0,0 +1,710 @@
|
||||||
|
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
|
||||||
|
|
||||||
|
use crate::RcLike;
|
||||||
|
use futures::future::FusedFuture;
|
||||||
|
use futures::future::Future;
|
||||||
|
use futures::future::TryFuture;
|
||||||
|
use futures::task::Context;
|
||||||
|
use futures::task::Poll;
|
||||||
|
use pin_project::pin_project;
|
||||||
|
use std::any::type_name;
|
||||||
|
use std::error::Error;
|
||||||
|
use std::fmt;
|
||||||
|
use std::fmt::Display;
|
||||||
|
use std::fmt::Formatter;
|
||||||
|
use std::io;
|
||||||
|
use std::pin::Pin;
|
||||||
|
use std::rc::Rc;
|
||||||
|
|
||||||
|
use self::internal as i;
|
||||||
|
|
||||||
|
#[derive(Debug, Default)]
|
||||||
|
pub struct CancelHandle {
|
||||||
|
node: i::Node,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl CancelHandle {
|
||||||
|
pub fn new() -> Self {
|
||||||
|
Default::default()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn new_rc() -> Rc<Self> {
|
||||||
|
Rc::new(Self::new())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Cancel all cancelable futures that are bound to this handle. Note that
|
||||||
|
/// this method does not require a mutable reference to the `CancelHandle`.
|
||||||
|
pub fn cancel(&self) {
|
||||||
|
self.node.cancel();
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn is_canceled(&self) -> bool {
|
||||||
|
self.node.is_canceled()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[pin_project(project = CancelableProjection)]
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub enum Cancelable<F> {
|
||||||
|
Pending {
|
||||||
|
#[pin]
|
||||||
|
future: F,
|
||||||
|
#[pin]
|
||||||
|
registration: i::Registration,
|
||||||
|
},
|
||||||
|
Terminated,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<F: Future> Future for Cancelable<F> {
|
||||||
|
type Output = Result<F::Output, Canceled>;
|
||||||
|
|
||||||
|
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
|
||||||
|
let poll_result = match self.as_mut().project() {
|
||||||
|
CancelableProjection::Pending {
|
||||||
|
future,
|
||||||
|
registration,
|
||||||
|
} => Self::poll_pending(future, registration, cx),
|
||||||
|
CancelableProjection::Terminated => {
|
||||||
|
panic!("{}::poll() called after completion", type_name::<Self>())
|
||||||
|
}
|
||||||
|
};
|
||||||
|
// Fuse: if this Future is completed or canceled, make sure the inner
|
||||||
|
// `future` and `registration` fields are dropped in order to unlink it from
|
||||||
|
// its cancel handle.
|
||||||
|
if matches!(poll_result, Poll::Ready(_)) {
|
||||||
|
self.set(Cancelable::Terminated)
|
||||||
|
}
|
||||||
|
poll_result
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<F: Future> FusedFuture for Cancelable<F> {
|
||||||
|
fn is_terminated(&self) -> bool {
|
||||||
|
matches!(self, Self::Terminated)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[pin_project(project = TryCancelableProjection)]
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct TryCancelable<F> {
|
||||||
|
#[pin]
|
||||||
|
inner: Cancelable<F>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<F, T, E> Future for TryCancelable<F>
|
||||||
|
where
|
||||||
|
F: Future<Output = Result<T, E>>,
|
||||||
|
Canceled: Into<E>,
|
||||||
|
{
|
||||||
|
type Output = F::Output;
|
||||||
|
|
||||||
|
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
|
||||||
|
let TryCancelableProjection { inner } = self.project();
|
||||||
|
match inner.poll(cx) {
|
||||||
|
Poll::Pending => Poll::Pending,
|
||||||
|
Poll::Ready(Ok(result)) => Poll::Ready(result),
|
||||||
|
Poll::Ready(Err(err)) => Poll::Ready(Err(err.into())),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<F, T, E> FusedFuture for TryCancelable<F>
|
||||||
|
where
|
||||||
|
F: Future<Output = Result<T, E>>,
|
||||||
|
Canceled: Into<E>,
|
||||||
|
{
|
||||||
|
fn is_terminated(&self) -> bool {
|
||||||
|
self.inner.is_terminated()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub trait CancelFuture
|
||||||
|
where
|
||||||
|
Self: Future + Sized,
|
||||||
|
{
|
||||||
|
fn or_cancel<H: RcLike<CancelHandle>>(
|
||||||
|
self,
|
||||||
|
cancel_handle: H,
|
||||||
|
) -> Cancelable<Self> {
|
||||||
|
Cancelable::new(self, cancel_handle.into())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<F> CancelFuture for F where F: Future {}
|
||||||
|
|
||||||
|
pub trait CancelTryFuture
|
||||||
|
where
|
||||||
|
Self: TryFuture + Sized,
|
||||||
|
Canceled: Into<Self::Error>,
|
||||||
|
{
|
||||||
|
fn try_or_cancel<H: RcLike<CancelHandle>>(
|
||||||
|
self,
|
||||||
|
cancel_handle: H,
|
||||||
|
) -> TryCancelable<Self> {
|
||||||
|
TryCancelable::new(self, cancel_handle.into())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<F> CancelTryFuture for F
|
||||||
|
where
|
||||||
|
F: TryFuture,
|
||||||
|
Canceled: Into<F::Error>,
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Copy, Clone, Default, Debug, Eq, Hash, PartialEq)]
|
||||||
|
pub struct Canceled;
|
||||||
|
|
||||||
|
impl Display for Canceled {
|
||||||
|
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
|
||||||
|
write!(f, "operation canceled")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Error for Canceled {}
|
||||||
|
|
||||||
|
impl From<Canceled> for io::Error {
|
||||||
|
fn from(_: Canceled) -> Self {
|
||||||
|
io::Error::new(io::ErrorKind::Interrupted, Canceled)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
mod internal {
|
||||||
|
use super::CancelHandle;
|
||||||
|
use super::Cancelable;
|
||||||
|
use super::Canceled;
|
||||||
|
use super::TryCancelable;
|
||||||
|
use crate::RcRef;
|
||||||
|
use futures::future::Future;
|
||||||
|
use futures::task::Context;
|
||||||
|
use futures::task::Poll;
|
||||||
|
use futures::task::Waker;
|
||||||
|
use pin_project::pin_project;
|
||||||
|
use std::any::Any;
|
||||||
|
use std::cell::UnsafeCell;
|
||||||
|
use std::marker::PhantomPinned;
|
||||||
|
use std::mem::replace;
|
||||||
|
use std::pin::Pin;
|
||||||
|
use std::ptr::NonNull;
|
||||||
|
use std::rc::Rc;
|
||||||
|
use std::rc::Weak;
|
||||||
|
|
||||||
|
impl<F: Future> Cancelable<F> {
|
||||||
|
pub(super) fn new(future: F, cancel_handle: RcRef<CancelHandle>) -> Self {
|
||||||
|
let head_node = RcRef::map(cancel_handle, |r| &r.node);
|
||||||
|
let registration = Registration::WillRegister { head_node };
|
||||||
|
Self::Pending {
|
||||||
|
future,
|
||||||
|
registration,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(super) fn poll_pending(
|
||||||
|
future: Pin<&mut F>,
|
||||||
|
mut registration: Pin<&mut Registration>,
|
||||||
|
cx: &mut Context,
|
||||||
|
) -> Poll<Result<F::Output, Canceled>> {
|
||||||
|
// If this future is being polled for the first time, perform an extra
|
||||||
|
// cancellation check _before_ polling the inner future. The reason to do
|
||||||
|
// this is that polling the inner future for the first time might start
|
||||||
|
// some activity that cannot actually be canceled (e.g. running a compute
|
||||||
|
// job in a thread pool), so we should try to never start it at all.
|
||||||
|
match &*registration {
|
||||||
|
Registration::WillRegister { head_node } if head_node.is_canceled() => {
|
||||||
|
return Poll::Ready(Err(Canceled));
|
||||||
|
}
|
||||||
|
_ => {}
|
||||||
|
}
|
||||||
|
|
||||||
|
match future.poll(cx) {
|
||||||
|
Poll::Ready(res) => return Poll::Ready(Ok(res)),
|
||||||
|
Poll::Pending => {}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Register this future with its `CancelHandle`, saving the `Waker` that
|
||||||
|
// can be used to make the runtime poll this future when it is canceled.
|
||||||
|
// When already registered, update the stored `Waker` if necessary.
|
||||||
|
let head_node = match &*registration {
|
||||||
|
Registration::WillRegister { .. } => {
|
||||||
|
match registration.as_mut().project_replace(Default::default()) {
|
||||||
|
RegistrationProjectionOwned::WillRegister { head_node } => {
|
||||||
|
Some(head_node)
|
||||||
|
}
|
||||||
|
_ => unreachable!(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ => None,
|
||||||
|
};
|
||||||
|
let node = match registration.project() {
|
||||||
|
RegistrationProjection::Registered { node } => node,
|
||||||
|
_ => unreachable!(),
|
||||||
|
};
|
||||||
|
node.register(cx.waker(), head_node)?;
|
||||||
|
|
||||||
|
Poll::Pending
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<F: Future> TryCancelable<F> {
|
||||||
|
pub(super) fn new(future: F, cancel_handle: RcRef<CancelHandle>) -> Self {
|
||||||
|
Self {
|
||||||
|
inner: Cancelable::new(future, cancel_handle),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[pin_project(project = RegistrationProjection,
|
||||||
|
project_replace = RegistrationProjectionOwned)]
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub enum Registration {
|
||||||
|
WillRegister {
|
||||||
|
head_node: RcRef<Node>,
|
||||||
|
},
|
||||||
|
Registered {
|
||||||
|
#[pin]
|
||||||
|
node: Node,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for Registration {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self::Registered {
|
||||||
|
node: Default::default(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct Node {
|
||||||
|
inner: UnsafeCell<NodeInner>,
|
||||||
|
_pin: PhantomPinned,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Node {
|
||||||
|
/// If necessary, register a `Cancelable` node with a `CancelHandle`, and
|
||||||
|
/// save or update the `Waker` that can wake with this cancelable future.
|
||||||
|
pub fn register(
|
||||||
|
&self,
|
||||||
|
waker: &Waker,
|
||||||
|
head_rc: Option<RcRef<Node>>,
|
||||||
|
) -> Result<(), Canceled> {
|
||||||
|
match head_rc.as_ref().map(RcRef::split) {
|
||||||
|
Some((head, rc)) => {
|
||||||
|
// Register this `Cancelable` node with a `CancelHandle` head node.
|
||||||
|
assert_ne!(self, head);
|
||||||
|
let self_inner = unsafe { &mut *self.inner.get() };
|
||||||
|
let head_inner = unsafe { &mut *head.inner.get() };
|
||||||
|
self_inner.link(waker, head_inner, rc)
|
||||||
|
}
|
||||||
|
None => {
|
||||||
|
// This `Cancelable` has already been linked to a `CancelHandle` head
|
||||||
|
// node; just update our stored `Waker` if necessary.
|
||||||
|
let inner = unsafe { &mut *self.inner.get() };
|
||||||
|
inner.update_waker(waker)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn cancel(&self) {
|
||||||
|
let inner = unsafe { &mut *self.inner.get() };
|
||||||
|
inner.cancel();
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn is_canceled(&self) -> bool {
|
||||||
|
let inner = unsafe { &mut *self.inner.get() };
|
||||||
|
inner.is_canceled()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for Node {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self {
|
||||||
|
inner: UnsafeCell::new(NodeInner::Unlinked),
|
||||||
|
_pin: PhantomPinned,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Drop for Node {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
let inner = unsafe { &mut *self.inner.get() };
|
||||||
|
inner.unlink();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl PartialEq for Node {
|
||||||
|
fn eq(&self, other: &Self) -> bool {
|
||||||
|
self as *const _ == other as *const _
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
enum NodeInner {
|
||||||
|
Unlinked,
|
||||||
|
Linked {
|
||||||
|
kind: NodeKind,
|
||||||
|
prev: NonNull<NodeInner>,
|
||||||
|
next: NonNull<NodeInner>,
|
||||||
|
},
|
||||||
|
Canceled,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl NodeInner {
|
||||||
|
fn as_non_null(&mut self) -> NonNull<Self> {
|
||||||
|
NonNull::from(self)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn link(
|
||||||
|
&mut self,
|
||||||
|
waker: &Waker,
|
||||||
|
head: &mut Self,
|
||||||
|
rc_pin: &Rc<dyn Any>,
|
||||||
|
) -> Result<(), Canceled> {
|
||||||
|
// The future should not have been linked to a cancel handle before.
|
||||||
|
assert!(matches!(self, NodeInner::Unlinked));
|
||||||
|
|
||||||
|
match head {
|
||||||
|
NodeInner::Unlinked => {
|
||||||
|
*head = NodeInner::Linked {
|
||||||
|
kind: NodeKind::head(rc_pin),
|
||||||
|
prev: self.as_non_null(),
|
||||||
|
next: self.as_non_null(),
|
||||||
|
};
|
||||||
|
*self = NodeInner::Linked {
|
||||||
|
kind: NodeKind::item(waker),
|
||||||
|
prev: head.as_non_null(),
|
||||||
|
next: head.as_non_null(),
|
||||||
|
};
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
NodeInner::Linked {
|
||||||
|
kind: NodeKind::Head { .. },
|
||||||
|
prev: next_prev_nn,
|
||||||
|
..
|
||||||
|
} => {
|
||||||
|
let prev = unsafe { &mut *next_prev_nn.as_ptr() };
|
||||||
|
match prev {
|
||||||
|
NodeInner::Linked {
|
||||||
|
kind: NodeKind::Item { .. },
|
||||||
|
next: prev_next_nn,
|
||||||
|
..
|
||||||
|
} => {
|
||||||
|
*self = NodeInner::Linked {
|
||||||
|
kind: NodeKind::item(waker),
|
||||||
|
prev: replace(next_prev_nn, self.as_non_null()),
|
||||||
|
next: replace(prev_next_nn, self.as_non_null()),
|
||||||
|
};
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
_ => unreachable!(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
NodeInner::Canceled => Err(Canceled),
|
||||||
|
_ => unreachable!(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn update_waker(&mut self, new_waker: &Waker) -> Result<(), Canceled> {
|
||||||
|
match self {
|
||||||
|
NodeInner::Unlinked => Ok(()),
|
||||||
|
NodeInner::Linked {
|
||||||
|
kind: NodeKind::Item { waker },
|
||||||
|
..
|
||||||
|
} => {
|
||||||
|
if !waker.will_wake(new_waker) {
|
||||||
|
*waker = new_waker.clone();
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
NodeInner::Canceled => Err(Canceled),
|
||||||
|
_ => unreachable!(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// If this node is linked to other nodes, remove it from the chain. This
|
||||||
|
/// method is called (only) by the drop handler for `Node`. It is suitable
|
||||||
|
/// for both 'head' and 'item' nodes.
|
||||||
|
fn unlink(&mut self) {
|
||||||
|
if let NodeInner::Linked {
|
||||||
|
prev: mut prev_nn,
|
||||||
|
next: mut next_nn,
|
||||||
|
..
|
||||||
|
} = replace(self, NodeInner::Unlinked)
|
||||||
|
{
|
||||||
|
if prev_nn == next_nn {
|
||||||
|
// There were only two nodes in this chain; after unlinking ourselves
|
||||||
|
// the other node is no longer linked.
|
||||||
|
let other = unsafe { prev_nn.as_mut() };
|
||||||
|
*other = NodeInner::Unlinked;
|
||||||
|
} else {
|
||||||
|
// The chain had more than two nodes.
|
||||||
|
match unsafe { prev_nn.as_mut() } {
|
||||||
|
NodeInner::Linked {
|
||||||
|
next: prev_next_nn, ..
|
||||||
|
} => {
|
||||||
|
*prev_next_nn = next_nn;
|
||||||
|
}
|
||||||
|
_ => unreachable!(),
|
||||||
|
}
|
||||||
|
match unsafe { next_nn.as_mut() } {
|
||||||
|
NodeInner::Linked {
|
||||||
|
prev: next_prev_nn, ..
|
||||||
|
} => {
|
||||||
|
*next_prev_nn = prev_nn;
|
||||||
|
}
|
||||||
|
_ => unreachable!(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Mark this node and all linked nodes for cancellation. Note that `self`
|
||||||
|
/// must refer to a head (`CancelHandle`) node.
|
||||||
|
fn cancel(&mut self) {
|
||||||
|
let mut head_nn = NonNull::from(self);
|
||||||
|
let mut item_nn;
|
||||||
|
|
||||||
|
// Mark the head node as canceled.
|
||||||
|
match replace(unsafe { head_nn.as_mut() }, NodeInner::Canceled) {
|
||||||
|
NodeInner::Linked {
|
||||||
|
kind: NodeKind::Head { .. },
|
||||||
|
next: next_nn,
|
||||||
|
..
|
||||||
|
} => item_nn = next_nn,
|
||||||
|
NodeInner::Unlinked | NodeInner::Canceled => return,
|
||||||
|
_ => unreachable!(),
|
||||||
|
};
|
||||||
|
|
||||||
|
// Cancel all item nodes in the chain, waking each stored `Waker`.
|
||||||
|
while item_nn != head_nn {
|
||||||
|
match replace(unsafe { item_nn.as_mut() }, NodeInner::Canceled) {
|
||||||
|
NodeInner::Linked {
|
||||||
|
kind: NodeKind::Item { waker },
|
||||||
|
next: next_nn,
|
||||||
|
..
|
||||||
|
} => {
|
||||||
|
waker.wake();
|
||||||
|
item_nn = next_nn;
|
||||||
|
}
|
||||||
|
_ => unreachable!(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns true if this node has been marked for cancellation. Note that
|
||||||
|
/// `self` must refer to a head (`CancelHandle`) node.
|
||||||
|
fn is_canceled(&self) -> bool {
|
||||||
|
match self {
|
||||||
|
NodeInner::Unlinked => false,
|
||||||
|
NodeInner::Linked {
|
||||||
|
kind: NodeKind::Head { .. },
|
||||||
|
..
|
||||||
|
} => false,
|
||||||
|
NodeInner::Canceled => true,
|
||||||
|
_ => unreachable!(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
enum NodeKind {
|
||||||
|
/// In a chain of linked nodes, the "head" node is owned by the
|
||||||
|
/// `CancelHandle`. A chain usually contains at most one head node; however
|
||||||
|
/// when a `CancelHandle` is dropped before the futures associated with it
|
||||||
|
/// are dropped, a chain may temporarily contain no head node at all.
|
||||||
|
Head {
|
||||||
|
/// The `weak_pin` field adds adds a weak reference to the `Rc` guarding
|
||||||
|
/// the heap allocation that contains the `CancelHandle`. Without this
|
||||||
|
/// extra weak reference, `Rc::get_mut()` might succeed and allow the
|
||||||
|
/// `CancelHandle` to be moved when it isn't safe to do so.
|
||||||
|
weak_pin: Weak<dyn Any>,
|
||||||
|
},
|
||||||
|
/// All item nodes in a chain are associated with a `Cancelable` head node.
|
||||||
|
Item {
|
||||||
|
/// If this future indeed does get canceled, the waker is needed to make
|
||||||
|
/// sure that the canceled future gets polled as soon as possible.
|
||||||
|
waker: Waker,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
impl NodeKind {
|
||||||
|
fn head(rc_pin: &Rc<dyn Any>) -> Self {
|
||||||
|
let weak_pin = Rc::downgrade(rc_pin);
|
||||||
|
Self::Head { weak_pin }
|
||||||
|
}
|
||||||
|
|
||||||
|
fn item(waker: &Waker) -> Self {
|
||||||
|
let waker = waker.clone();
|
||||||
|
Self::Item { waker }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
use crate::error::AnyError;
|
||||||
|
use futures::future::pending;
|
||||||
|
use futures::future::poll_fn;
|
||||||
|
use futures::future::ready;
|
||||||
|
use futures::future::FutureExt;
|
||||||
|
use futures::future::TryFutureExt;
|
||||||
|
use futures::select;
|
||||||
|
use futures::task::noop_waker_ref;
|
||||||
|
use futures::task::Context;
|
||||||
|
use futures::task::Poll;
|
||||||
|
use std::convert::Infallible as Never;
|
||||||
|
use std::io;
|
||||||
|
use tokio::net::TcpStream;
|
||||||
|
use tokio::spawn;
|
||||||
|
|
||||||
|
fn box_fused<'a, F: FusedFuture + 'a>(
|
||||||
|
future: F,
|
||||||
|
) -> Pin<Box<dyn FusedFuture<Output = F::Output> + 'a>> {
|
||||||
|
Box::pin(future)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn ready_in_n(name: &str, count: usize) -> &str {
|
||||||
|
let mut remaining = count as isize;
|
||||||
|
poll_fn(|_| {
|
||||||
|
assert!(remaining >= 0);
|
||||||
|
if remaining == 0 {
|
||||||
|
Poll::Ready(name)
|
||||||
|
} else {
|
||||||
|
remaining -= 1;
|
||||||
|
Poll::Pending
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn cancel_future() {
|
||||||
|
let cancel_now = CancelHandle::new_rc();
|
||||||
|
let cancel_at_0 = CancelHandle::new_rc();
|
||||||
|
let cancel_at_1 = CancelHandle::new_rc();
|
||||||
|
let cancel_at_4 = CancelHandle::new_rc();
|
||||||
|
let cancel_never = CancelHandle::new_rc();
|
||||||
|
|
||||||
|
cancel_now.cancel();
|
||||||
|
|
||||||
|
let mut futures = vec![
|
||||||
|
box_fused(ready("A").or_cancel(&cancel_now)),
|
||||||
|
box_fused(ready("B").or_cancel(&cancel_at_0)),
|
||||||
|
box_fused(ready("C").or_cancel(&cancel_at_1)),
|
||||||
|
box_fused(
|
||||||
|
ready_in_n("D", 0)
|
||||||
|
.or_cancel(&cancel_never)
|
||||||
|
.try_or_cancel(&cancel_now),
|
||||||
|
),
|
||||||
|
box_fused(
|
||||||
|
ready_in_n("E", 1)
|
||||||
|
.or_cancel(&cancel_at_1)
|
||||||
|
.try_or_cancel(&cancel_at_1),
|
||||||
|
),
|
||||||
|
box_fused(ready_in_n("F", 2).or_cancel(&cancel_at_1)),
|
||||||
|
box_fused(ready_in_n("G", 3).or_cancel(&cancel_at_4)),
|
||||||
|
box_fused(ready_in_n("H", 4).or_cancel(&cancel_at_4)),
|
||||||
|
box_fused(ready_in_n("I", 5).or_cancel(&cancel_at_4)),
|
||||||
|
box_fused(ready_in_n("J", 5).map(Ok)),
|
||||||
|
box_fused(ready_in_n("K", 5).or_cancel(cancel_never)),
|
||||||
|
];
|
||||||
|
|
||||||
|
let mut cx = Context::from_waker(noop_waker_ref());
|
||||||
|
|
||||||
|
for i in 0..=5 {
|
||||||
|
match i {
|
||||||
|
0 => cancel_at_0.cancel(),
|
||||||
|
1 => cancel_at_1.cancel(),
|
||||||
|
4 => cancel_at_4.cancel(),
|
||||||
|
2 | 3 | 5 => {}
|
||||||
|
_ => unreachable!(),
|
||||||
|
}
|
||||||
|
|
||||||
|
let results = futures
|
||||||
|
.iter_mut()
|
||||||
|
.filter(|fut| !fut.is_terminated())
|
||||||
|
.filter_map(|fut| match fut.poll_unpin(&mut cx) {
|
||||||
|
Poll::Pending => None,
|
||||||
|
Poll::Ready(res) => Some(res),
|
||||||
|
})
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
|
match i {
|
||||||
|
0 => assert_eq!(
|
||||||
|
results,
|
||||||
|
[Err(Canceled), Err(Canceled), Ok("C"), Err(Canceled)]
|
||||||
|
),
|
||||||
|
1 => assert_eq!(results, [Ok("E"), Err(Canceled)]),
|
||||||
|
2 => assert_eq!(results, []),
|
||||||
|
3 => assert_eq!(results, [Ok("G")]),
|
||||||
|
4 => assert_eq!(results, [Ok("H"), Err(Canceled)]),
|
||||||
|
5 => assert_eq!(results, [Ok("J"), Ok("K")]),
|
||||||
|
_ => unreachable!(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
assert_eq!(futures.into_iter().any(|fut| !fut.is_terminated()), false);
|
||||||
|
|
||||||
|
let cancel_handles = [cancel_now, cancel_at_0, cancel_at_1, cancel_at_4];
|
||||||
|
assert_eq!(cancel_handles.iter().any(|c| !c.is_canceled()), false);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn cancel_try_future() {
|
||||||
|
{
|
||||||
|
// Cancel a spawned task before it actually runs.
|
||||||
|
let cancel_handle = Rc::new(CancelHandle::new());
|
||||||
|
let future = spawn(async { panic!("the task should not be spawned") })
|
||||||
|
.map_err(AnyError::from)
|
||||||
|
.try_or_cancel(&cancel_handle);
|
||||||
|
cancel_handle.cancel();
|
||||||
|
let error = future.await.unwrap_err();
|
||||||
|
assert!(error.downcast_ref::<Canceled>().is_some());
|
||||||
|
assert_eq!(error.to_string().as_str(), "operation canceled");
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
// Cancel a network I/O future right after polling it.
|
||||||
|
let cancel_handle = Rc::new(CancelHandle::new());
|
||||||
|
let result = loop {
|
||||||
|
select! {
|
||||||
|
r = TcpStream::connect("1.2.3.4:12345")
|
||||||
|
.try_or_cancel(&cancel_handle) => break r,
|
||||||
|
default => cancel_handle.cancel(),
|
||||||
|
};
|
||||||
|
};
|
||||||
|
let error = result.unwrap_err();
|
||||||
|
assert_eq!(error.kind(), io::ErrorKind::Interrupted);
|
||||||
|
assert_eq!(error.to_string().as_str(), "operation canceled");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn cancel_handle_pinning() {
|
||||||
|
let mut cancel_handle = CancelHandle::new_rc();
|
||||||
|
|
||||||
|
// There is only one reference to `cancel_handle`, so `Rc::get_mut()` should
|
||||||
|
// succeed.
|
||||||
|
assert!(Rc::get_mut(&mut cancel_handle).is_some());
|
||||||
|
|
||||||
|
let mut future = pending::<Never>().or_cancel(&cancel_handle);
|
||||||
|
let future = unsafe { Pin::new_unchecked(&mut future) };
|
||||||
|
|
||||||
|
// There are two `Rc<CancelHandle>` references now, so this fails.
|
||||||
|
assert!(Rc::get_mut(&mut cancel_handle).is_none());
|
||||||
|
|
||||||
|
let mut cx = Context::from_waker(noop_waker_ref());
|
||||||
|
assert!(future.poll(&mut cx).is_pending());
|
||||||
|
|
||||||
|
// Polling `future` has established a link between the future and
|
||||||
|
// `cancel_handle`, so both values should be pinned at this point.
|
||||||
|
assert!(Rc::get_mut(&mut cancel_handle).is_none());
|
||||||
|
|
||||||
|
cancel_handle.cancel();
|
||||||
|
|
||||||
|
// Canceling or dropping the associated future(s) unlinks them from the
|
||||||
|
// cancel handle, therefore `cancel_handle` can now safely be moved again.
|
||||||
|
assert!(Rc::get_mut(&mut cancel_handle).is_some());
|
||||||
|
}
|
||||||
|
}
|
|
@ -126,6 +126,7 @@ impl<T> RcRef<AsyncRefCell<T>> {
|
||||||
/// let foo_rc: RcRef<u32> = RcRef::map(stuff_rc.clone(), |v| &v.foo);
|
/// let foo_rc: RcRef<u32> = RcRef::map(stuff_rc.clone(), |v| &v.foo);
|
||||||
/// let bar_rc: RcRef<String> = RcRef::map(stuff_rc, |v| &v.bar);
|
/// let bar_rc: RcRef<String> = RcRef::map(stuff_rc, |v| &v.bar);
|
||||||
/// ```
|
/// ```
|
||||||
|
#[derive(Debug)]
|
||||||
pub struct RcRef<T> {
|
pub struct RcRef<T> {
|
||||||
rc: Rc<dyn Any>,
|
rc: Rc<dyn Any>,
|
||||||
value: *const T,
|
value: *const T,
|
||||||
|
@ -136,7 +137,7 @@ impl<T: 'static> RcRef<T> {
|
||||||
Self::from(Rc::new(value))
|
Self::from(Rc::new(value))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn map<S: 'static, R: i::RcLike<S>, F: FnOnce(&S) -> &T>(
|
pub fn map<S: 'static, R: RcLike<S>, F: FnOnce(&S) -> &T>(
|
||||||
source: R,
|
source: R,
|
||||||
map_fn: F,
|
map_fn: F,
|
||||||
) -> RcRef<T> {
|
) -> RcRef<T> {
|
||||||
|
@ -144,6 +145,11 @@ impl<T: 'static> RcRef<T> {
|
||||||
let value = map_fn(unsafe { &*value });
|
let value = map_fn(unsafe { &*value });
|
||||||
RcRef { rc, value }
|
RcRef { rc, value }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(crate) fn split(rc_ref: &Self) -> (&T, &Rc<dyn Any>) {
|
||||||
|
let &Self { ref rc, value } = rc_ref;
|
||||||
|
(unsafe { &*value }, rc)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T: Default + 'static> Default for RcRef<T> {
|
impl<T: Default + 'static> Default for RcRef<T> {
|
||||||
|
@ -152,6 +158,21 @@ impl<T: Default + 'static> Default for RcRef<T> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl<T> Clone for RcRef<T> {
|
||||||
|
fn clone(&self) -> Self {
|
||||||
|
Self {
|
||||||
|
rc: self.rc.clone(),
|
||||||
|
value: self.value,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: 'static> From<&RcRef<T>> for RcRef<T> {
|
||||||
|
fn from(rc_ref: &RcRef<T>) -> Self {
|
||||||
|
rc_ref.clone()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl<T: 'static> From<Rc<T>> for RcRef<T> {
|
impl<T: 'static> From<Rc<T>> for RcRef<T> {
|
||||||
fn from(rc: Rc<T>) -> Self {
|
fn from(rc: Rc<T>) -> Self {
|
||||||
Self {
|
Self {
|
||||||
|
@ -161,12 +182,9 @@ impl<T: 'static> From<Rc<T>> for RcRef<T> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T> Clone for RcRef<T> {
|
impl<T: 'static> From<&Rc<T>> for RcRef<T> {
|
||||||
fn clone(&self) -> Self {
|
fn from(rc: &Rc<T>) -> Self {
|
||||||
Self {
|
rc.clone().into()
|
||||||
rc: self.rc.clone(),
|
|
||||||
value: self.value,
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -189,8 +207,18 @@ impl<T> AsRef<T> for RcRef<T> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// The `RcLike` trait provides an abstraction over `std::rc::Rc` and `RcRef`,
|
||||||
|
/// so that applicable methods can operate on either type.
|
||||||
|
pub trait RcLike<T>: AsRef<T> + Into<RcRef<T>> {}
|
||||||
|
|
||||||
|
impl<T: 'static> RcLike<T> for Rc<T> {}
|
||||||
|
impl<T: 'static> RcLike<T> for RcRef<T> {}
|
||||||
|
impl<T: 'static> RcLike<T> for &Rc<T> {}
|
||||||
|
impl<T: 'static> RcLike<T> for &RcRef<T> {}
|
||||||
|
|
||||||
mod internal {
|
mod internal {
|
||||||
use super::AsyncRefCell;
|
use super::AsyncRefCell;
|
||||||
|
use super::RcLike;
|
||||||
use super::RcRef;
|
use super::RcRef;
|
||||||
use futures::future::Future;
|
use futures::future::Future;
|
||||||
use futures::ready;
|
use futures::ready;
|
||||||
|
@ -204,32 +232,29 @@ mod internal {
|
||||||
use std::ops::Deref;
|
use std::ops::Deref;
|
||||||
use std::ops::DerefMut;
|
use std::ops::DerefMut;
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
use std::rc::Rc;
|
|
||||||
|
|
||||||
impl<T> AsyncRefCell<T> {
|
impl<T> AsyncRefCell<T> {
|
||||||
/// Borrow the cell's contents synchronouslym without creating an
|
/// Borrow the cell's contents synchronouslym without creating an
|
||||||
/// intermediate future. If the cell has already been borrowed and either
|
/// intermediate future. If the cell has already been borrowed and either
|
||||||
/// the existing or the requested borrow is exclusive, this function returns
|
/// the existing or the requested borrow is exclusive, this function returns
|
||||||
/// `None`.
|
/// `None`.
|
||||||
pub(super) fn borrow_sync<
|
pub fn borrow_sync<M: BorrowModeTrait, R: RcLike<AsyncRefCell<T>>>(
|
||||||
M: BorrowModeTrait,
|
cell: R,
|
||||||
R: RcLike<AsyncRefCell<T>>,
|
|
||||||
>(
|
|
||||||
cell: &R,
|
|
||||||
) -> Option<AsyncBorrowImpl<T, M>> {
|
) -> Option<AsyncBorrowImpl<T, M>> {
|
||||||
|
let cell_ref = cell.as_ref();
|
||||||
// Don't allow synchronous borrows to cut in line; if there are any
|
// Don't allow synchronous borrows to cut in line; if there are any
|
||||||
// enqueued waiters, return `None`, even if the current borrow is a shared
|
// enqueued waiters, return `None`, even if the current borrow is a shared
|
||||||
// one and the requested borrow is too.
|
// one and the requested borrow is too.
|
||||||
let waiters = unsafe { &mut *cell.waiters.as_ptr() };
|
let waiters = unsafe { &mut *cell_ref.waiters.as_ptr() };
|
||||||
if waiters.is_empty() {
|
if waiters.is_empty() {
|
||||||
// There are no enqueued waiters, but it is still possible that the cell
|
// There are no enqueued waiters, but it is still possible that the cell
|
||||||
// is currently borrowed. If there are no current borrows, or both the
|
// is currently borrowed. If there are no current borrows, or both the
|
||||||
// existing and requested ones are shared, `try_add()` returns the
|
// existing and requested ones are shared, `try_add()` returns the
|
||||||
// adjusted borrow count.
|
// adjusted borrow count.
|
||||||
let new_borrow_count =
|
let new_borrow_count =
|
||||||
cell.borrow_count.get().try_add(M::borrow_mode())?;
|
cell_ref.borrow_count.get().try_add(M::borrow_mode())?;
|
||||||
cell.borrow_count.set(new_borrow_count);
|
cell_ref.borrow_count.set(new_borrow_count);
|
||||||
Some(AsyncBorrowImpl::<T, M>::new(cell.clone().into()))
|
Some(AsyncBorrowImpl::<T, M>::new(cell.into()))
|
||||||
} else {
|
} else {
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
|
@ -359,10 +384,10 @@ mod internal {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T, M: BorrowModeTrait> AsyncBorrowFutureImpl<T, M> {
|
impl<T, M: BorrowModeTrait> AsyncBorrowFutureImpl<T, M> {
|
||||||
pub fn new<R: RcLike<AsyncRefCell<T>>>(cell: &R) -> Self {
|
pub fn new<R: RcLike<AsyncRefCell<T>>>(cell: R) -> Self {
|
||||||
Self {
|
Self {
|
||||||
cell: Some(cell.clone().into()),
|
id: cell.as_ref().create_waiter::<M>(),
|
||||||
id: cell.create_waiter::<M>(),
|
cell: Some(cell.into()),
|
||||||
_phantom: PhantomData,
|
_phantom: PhantomData,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -561,13 +586,6 @@ mod internal {
|
||||||
self.waker.take()
|
self.waker.take()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// The `RcLike` trait provides an abstraction over `std::rc::Rc` and `RcRef`,
|
|
||||||
/// so that applicable methods can operate on either type.
|
|
||||||
pub trait RcLike<T>: Clone + Deref<Target = T> + Into<RcRef<T>> {}
|
|
||||||
|
|
||||||
impl<T: 'static> RcLike<T> for Rc<T> {}
|
|
||||||
impl<T: 'static> RcLike<T> for RcRef<T> {}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
|
|
@ -3,10 +3,10 @@
|
||||||
#[macro_use]
|
#[macro_use]
|
||||||
extern crate log;
|
extern crate log;
|
||||||
|
|
||||||
use deno_core::AsyncMutFuture;
|
|
||||||
use deno_core::AsyncRefCell;
|
use deno_core::AsyncRefCell;
|
||||||
use deno_core::AsyncRefFuture;
|
|
||||||
use deno_core::BufVec;
|
use deno_core::BufVec;
|
||||||
|
use deno_core::CancelHandle;
|
||||||
|
use deno_core::CancelTryFuture;
|
||||||
use deno_core::JsRuntime;
|
use deno_core::JsRuntime;
|
||||||
use deno_core::Op;
|
use deno_core::Op;
|
||||||
use deno_core::OpState;
|
use deno_core::OpState;
|
||||||
|
@ -46,51 +46,65 @@ impl log::Log for Logger {
|
||||||
fn flush(&self) {}
|
fn flush(&self) {}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Note: it isn't actually necessary to wrap the `tokio::net::TcpListener` in
|
// Note: a `tokio::net::TcpListener` doesn't need to be wrapped in a cell,
|
||||||
// a cell, because it only supports one op (`accept`) which does not require
|
// because it only supports one op (`accept`) which does not require a mutable
|
||||||
// a mutable reference to the listener.
|
// reference to the listener.
|
||||||
struct TcpListener(AsyncRefCell<tokio::net::TcpListener>);
|
struct TcpListener {
|
||||||
|
inner: tokio::net::TcpListener,
|
||||||
impl Resource for TcpListener {}
|
cancel: CancelHandle,
|
||||||
|
}
|
||||||
|
|
||||||
impl TcpListener {
|
impl TcpListener {
|
||||||
/// Returns a future that yields a shared borrow of the TCP listener.
|
async fn accept(self: Rc<Self>) -> Result<TcpStream, Error> {
|
||||||
fn borrow(self: Rc<Self>) -> AsyncRefFuture<tokio::net::TcpListener> {
|
let cancel = RcRef::map(&self, |r| &r.cancel);
|
||||||
RcRef::map(self, |r| &r.0).borrow()
|
let stream = self.inner.accept().try_or_cancel(cancel).await?.0.into();
|
||||||
|
Ok(stream)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Resource for TcpListener {
|
||||||
|
fn close(self: Rc<Self>) {
|
||||||
|
self.cancel.cancel();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl TryFrom<std::net::TcpListener> for TcpListener {
|
impl TryFrom<std::net::TcpListener> for TcpListener {
|
||||||
type Error = Error;
|
type Error = Error;
|
||||||
fn try_from(l: std::net::TcpListener) -> Result<Self, Self::Error> {
|
fn try_from(
|
||||||
tokio::net::TcpListener::try_from(l)
|
std_listener: std::net::TcpListener,
|
||||||
.map(AsyncRefCell::new)
|
) -> Result<Self, Self::Error> {
|
||||||
.map(Self)
|
tokio::net::TcpListener::try_from(std_listener).map(|tokio_listener| Self {
|
||||||
|
inner: tokio_listener,
|
||||||
|
cancel: Default::default(),
|
||||||
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
struct TcpStream {
|
struct TcpStream {
|
||||||
rd: AsyncRefCell<tokio::net::tcp::OwnedReadHalf>,
|
rd: AsyncRefCell<tokio::net::tcp::OwnedReadHalf>,
|
||||||
wr: AsyncRefCell<tokio::net::tcp::OwnedWriteHalf>,
|
wr: AsyncRefCell<tokio::net::tcp::OwnedWriteHalf>,
|
||||||
|
// When a `TcpStream` resource is closed, all pending 'read' ops are
|
||||||
|
// canceled, while 'write' ops are allowed to complete. Therefore only
|
||||||
|
// 'read' futures are attached to this cancel handle.
|
||||||
|
cancel: CancelHandle,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Resource for TcpStream {}
|
|
||||||
|
|
||||||
impl TcpStream {
|
impl TcpStream {
|
||||||
/// Returns a future that yields an exclusive borrow of the read end of the
|
async fn read(self: Rc<Self>, buf: &mut [u8]) -> Result<usize, Error> {
|
||||||
/// tcp stream.
|
let mut rd = RcRef::map(&self, |r| &r.rd).borrow_mut().await;
|
||||||
fn rd_borrow_mut(
|
let cancel = RcRef::map(self, |r| &r.cancel);
|
||||||
self: Rc<Self>,
|
rd.read(buf).try_or_cancel(cancel).await
|
||||||
) -> AsyncMutFuture<tokio::net::tcp::OwnedReadHalf> {
|
|
||||||
RcRef::map(self, |r| &r.rd).borrow_mut()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns a future that yields an exclusive borrow of the write end of the
|
async fn write(self: Rc<Self>, buf: &[u8]) -> Result<usize, Error> {
|
||||||
/// tcp stream.
|
let mut wr = RcRef::map(self, |r| &r.wr).borrow_mut().await;
|
||||||
fn wr_borrow_mut(
|
wr.write(buf).await
|
||||||
self: Rc<Self>,
|
}
|
||||||
) -> AsyncMutFuture<tokio::net::tcp::OwnedWriteHalf> {
|
}
|
||||||
RcRef::map(self, |r| &r.wr).borrow_mut()
|
|
||||||
|
impl Resource for TcpStream {
|
||||||
|
fn close(self: Rc<Self>) {
|
||||||
|
self.cancel.cancel()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -100,6 +114,7 @@ impl From<tokio::net::TcpStream> for TcpStream {
|
||||||
Self {
|
Self {
|
||||||
rd: rd.into(),
|
rd: rd.into(),
|
||||||
wr: wr.into(),
|
wr: wr.into(),
|
||||||
|
cancel: Default::default(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -179,14 +194,12 @@ async fn op_accept(
|
||||||
) -> Result<u32, Error> {
|
) -> Result<u32, Error> {
|
||||||
debug!("accept rid={}", rid);
|
debug!("accept rid={}", rid);
|
||||||
|
|
||||||
let listener_rc = state
|
let listener = state
|
||||||
.borrow()
|
.borrow()
|
||||||
.resource_table_2
|
.resource_table_2
|
||||||
.get::<TcpListener>(rid)
|
.get::<TcpListener>(rid)
|
||||||
.ok_or_else(bad_resource_id)?;
|
.ok_or_else(bad_resource_id)?;
|
||||||
let listener_ref = listener_rc.borrow().await;
|
let stream = listener.accept().await?;
|
||||||
|
|
||||||
let stream: TcpStream = listener_ref.accept().await?.0.into();
|
|
||||||
let rid = state.borrow_mut().resource_table_2.add(stream);
|
let rid = state.borrow_mut().resource_table_2.add(stream);
|
||||||
Ok(rid)
|
Ok(rid)
|
||||||
}
|
}
|
||||||
|
@ -199,14 +212,12 @@ async fn op_read(
|
||||||
assert_eq!(bufs.len(), 1, "Invalid number of arguments");
|
assert_eq!(bufs.len(), 1, "Invalid number of arguments");
|
||||||
debug!("read rid={}", rid);
|
debug!("read rid={}", rid);
|
||||||
|
|
||||||
let stream_rc = state
|
let stream = state
|
||||||
.borrow()
|
.borrow()
|
||||||
.resource_table_2
|
.resource_table_2
|
||||||
.get::<TcpStream>(rid)
|
.get::<TcpStream>(rid)
|
||||||
.ok_or_else(bad_resource_id)?;
|
.ok_or_else(bad_resource_id)?;
|
||||||
let mut rd_stream_mut = stream_rc.rd_borrow_mut().await;
|
stream.read(&mut bufs[0]).await
|
||||||
|
|
||||||
rd_stream_mut.read(&mut bufs[0]).await
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn op_write(
|
async fn op_write(
|
||||||
|
@ -217,14 +228,12 @@ async fn op_write(
|
||||||
assert_eq!(bufs.len(), 1, "Invalid number of arguments");
|
assert_eq!(bufs.len(), 1, "Invalid number of arguments");
|
||||||
debug!("write rid={}", rid);
|
debug!("write rid={}", rid);
|
||||||
|
|
||||||
let stream_rc = state
|
let stream = state
|
||||||
.borrow()
|
.borrow()
|
||||||
.resource_table_2
|
.resource_table_2
|
||||||
.get::<TcpStream>(rid)
|
.get::<TcpStream>(rid)
|
||||||
.ok_or_else(bad_resource_id)?;
|
.ok_or_else(bad_resource_id)?;
|
||||||
let mut wr_stream_mut = stream_rc.wr_borrow_mut().await;
|
stream.write(&bufs[0]).await
|
||||||
|
|
||||||
wr_stream_mut.write(&bufs[0]).await
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn register_op_bin_sync<F>(
|
fn register_op_bin_sync<F>(
|
||||||
|
|
|
@ -5,10 +5,10 @@ extern crate log;
|
||||||
|
|
||||||
use deno_core::error::bad_resource_id;
|
use deno_core::error::bad_resource_id;
|
||||||
use deno_core::error::AnyError;
|
use deno_core::error::AnyError;
|
||||||
use deno_core::AsyncMutFuture;
|
|
||||||
use deno_core::AsyncRefCell;
|
use deno_core::AsyncRefCell;
|
||||||
use deno_core::AsyncRefFuture;
|
|
||||||
use deno_core::BufVec;
|
use deno_core::BufVec;
|
||||||
|
use deno_core::CancelHandle;
|
||||||
|
use deno_core::CancelTryFuture;
|
||||||
use deno_core::JsRuntime;
|
use deno_core::JsRuntime;
|
||||||
use deno_core::OpState;
|
use deno_core::OpState;
|
||||||
use deno_core::RcRef;
|
use deno_core::RcRef;
|
||||||
|
@ -41,51 +41,65 @@ impl log::Log for Logger {
|
||||||
fn flush(&self) {}
|
fn flush(&self) {}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Note: it isn't actually necessary to wrap the `tokio::net::TcpListener` in
|
// Note: a `tokio::net::TcpListener` doesn't need to be wrapped in a cell,
|
||||||
// a cell, because it only supports one op (`accept`) which does not require
|
// because it only supports one op (`accept`) which does not require a mutable
|
||||||
// a mutable reference to the listener.
|
// reference to the listener.
|
||||||
struct TcpListener(AsyncRefCell<tokio::net::TcpListener>);
|
struct TcpListener {
|
||||||
|
inner: tokio::net::TcpListener,
|
||||||
impl Resource for TcpListener {}
|
cancel: CancelHandle,
|
||||||
|
}
|
||||||
|
|
||||||
impl TcpListener {
|
impl TcpListener {
|
||||||
/// Returns a future that yields a shared borrow of the TCP listener.
|
async fn accept(self: Rc<Self>) -> Result<TcpStream, Error> {
|
||||||
fn borrow(self: Rc<Self>) -> AsyncRefFuture<tokio::net::TcpListener> {
|
let cancel = RcRef::map(&self, |r| &r.cancel);
|
||||||
RcRef::map(self, |r| &r.0).borrow()
|
let stream = self.inner.accept().try_or_cancel(cancel).await?.0.into();
|
||||||
|
Ok(stream)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Resource for TcpListener {
|
||||||
|
fn close(self: Rc<Self>) {
|
||||||
|
self.cancel.cancel();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl TryFrom<std::net::TcpListener> for TcpListener {
|
impl TryFrom<std::net::TcpListener> for TcpListener {
|
||||||
type Error = Error;
|
type Error = Error;
|
||||||
fn try_from(l: std::net::TcpListener) -> Result<Self, Self::Error> {
|
fn try_from(
|
||||||
tokio::net::TcpListener::try_from(l)
|
std_listener: std::net::TcpListener,
|
||||||
.map(AsyncRefCell::new)
|
) -> Result<Self, Self::Error> {
|
||||||
.map(Self)
|
tokio::net::TcpListener::try_from(std_listener).map(|tokio_listener| Self {
|
||||||
|
inner: tokio_listener,
|
||||||
|
cancel: Default::default(),
|
||||||
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
struct TcpStream {
|
struct TcpStream {
|
||||||
rd: AsyncRefCell<tokio::net::tcp::OwnedReadHalf>,
|
rd: AsyncRefCell<tokio::net::tcp::OwnedReadHalf>,
|
||||||
wr: AsyncRefCell<tokio::net::tcp::OwnedWriteHalf>,
|
wr: AsyncRefCell<tokio::net::tcp::OwnedWriteHalf>,
|
||||||
|
// When a `TcpStream` resource is closed, all pending 'read' ops are
|
||||||
|
// canceled, while 'write' ops are allowed to complete. Therefore only
|
||||||
|
// 'read' futures are attached to this cancel handle.
|
||||||
|
cancel: CancelHandle,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Resource for TcpStream {}
|
|
||||||
|
|
||||||
impl TcpStream {
|
impl TcpStream {
|
||||||
/// Returns a future that yields an exclusive borrow of the read end of the
|
async fn read(self: Rc<Self>, buf: &mut [u8]) -> Result<usize, Error> {
|
||||||
/// tcp stream.
|
let mut rd = RcRef::map(&self, |r| &r.rd).borrow_mut().await;
|
||||||
fn rd_borrow_mut(
|
let cancel = RcRef::map(self, |r| &r.cancel);
|
||||||
self: Rc<Self>,
|
rd.read(buf).try_or_cancel(cancel).await
|
||||||
) -> AsyncMutFuture<tokio::net::tcp::OwnedReadHalf> {
|
|
||||||
RcRef::map(self, |r| &r.rd).borrow_mut()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns a future that yields an exclusive borrow of the write end of the
|
async fn write(self: Rc<Self>, buf: &[u8]) -> Result<usize, Error> {
|
||||||
/// tcp stream.
|
let mut wr = RcRef::map(self, |r| &r.wr).borrow_mut().await;
|
||||||
fn wr_borrow_mut(
|
wr.write(buf).await
|
||||||
self: Rc<Self>,
|
}
|
||||||
) -> AsyncMutFuture<tokio::net::tcp::OwnedWriteHalf> {
|
}
|
||||||
RcRef::map(self, |r| &r.wr).borrow_mut()
|
|
||||||
|
impl Resource for TcpStream {
|
||||||
|
fn close(self: Rc<Self>) {
|
||||||
|
self.cancel.cancel()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -95,6 +109,7 @@ impl From<tokio::net::TcpStream> for TcpStream {
|
||||||
Self {
|
Self {
|
||||||
rd: rd.into(),
|
rd: rd.into(),
|
||||||
wr: wr.into(),
|
wr: wr.into(),
|
||||||
|
cancel: Default::default(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -157,14 +172,12 @@ async fn op_accept(
|
||||||
.unwrap();
|
.unwrap();
|
||||||
debug!("accept rid={}", rid);
|
debug!("accept rid={}", rid);
|
||||||
|
|
||||||
let listener_rc = state
|
let listener = state
|
||||||
.borrow()
|
.borrow()
|
||||||
.resource_table_2
|
.resource_table_2
|
||||||
.get::<TcpListener>(rid)
|
.get::<TcpListener>(rid)
|
||||||
.ok_or_else(bad_resource_id)?;
|
.ok_or_else(bad_resource_id)?;
|
||||||
let listener_ref = listener_rc.borrow().await;
|
let stream = listener.accept().await?;
|
||||||
|
|
||||||
let stream: TcpStream = listener_ref.accept().await?.0.into();
|
|
||||||
let rid = state.borrow_mut().resource_table_2.add(stream);
|
let rid = state.borrow_mut().resource_table_2.add(stream);
|
||||||
Ok(serde_json::json!({ "rid": rid }))
|
Ok(serde_json::json!({ "rid": rid }))
|
||||||
}
|
}
|
||||||
|
@ -184,14 +197,12 @@ async fn op_read(
|
||||||
.unwrap();
|
.unwrap();
|
||||||
debug!("read rid={}", rid);
|
debug!("read rid={}", rid);
|
||||||
|
|
||||||
let stream_rc = state
|
let stream = state
|
||||||
.borrow()
|
.borrow()
|
||||||
.resource_table_2
|
.resource_table_2
|
||||||
.get::<TcpStream>(rid)
|
.get::<TcpStream>(rid)
|
||||||
.ok_or_else(bad_resource_id)?;
|
.ok_or_else(bad_resource_id)?;
|
||||||
let mut rd_stream_mut = stream_rc.rd_borrow_mut().await;
|
let nread = stream.read(&mut bufs[0]).await?;
|
||||||
|
|
||||||
let nread = rd_stream_mut.read(&mut bufs[0]).await?;
|
|
||||||
Ok(serde_json::json!({ "nread": nread }))
|
Ok(serde_json::json!({ "nread": nread }))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -210,14 +221,12 @@ async fn op_write(
|
||||||
.unwrap();
|
.unwrap();
|
||||||
debug!("write rid={}", rid);
|
debug!("write rid={}", rid);
|
||||||
|
|
||||||
let stream_rc = state
|
let stream = state
|
||||||
.borrow()
|
.borrow()
|
||||||
.resource_table_2
|
.resource_table_2
|
||||||
.get::<TcpStream>(rid)
|
.get::<TcpStream>(rid)
|
||||||
.ok_or_else(bad_resource_id)?;
|
.ok_or_else(bad_resource_id)?;
|
||||||
let mut wr_stream_mut = stream_rc.wr_borrow_mut().await;
|
let nwritten = stream.write(&bufs[0]).await?;
|
||||||
|
|
||||||
let nwritten = wr_stream_mut.write(&bufs[0]).await?;
|
|
||||||
Ok(serde_json::json!({ "nwritten": nwritten }))
|
Ok(serde_json::json!({ "nwritten": nwritten }))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -5,6 +5,7 @@ extern crate lazy_static;
|
||||||
#[macro_use]
|
#[macro_use]
|
||||||
extern crate log;
|
extern crate log;
|
||||||
|
|
||||||
|
mod async_cancel;
|
||||||
mod async_cell;
|
mod async_cell;
|
||||||
mod bindings;
|
mod bindings;
|
||||||
pub mod error;
|
pub mod error;
|
||||||
|
@ -28,11 +29,18 @@ pub use serde;
|
||||||
pub use serde_json;
|
pub use serde_json;
|
||||||
pub use url;
|
pub use url;
|
||||||
|
|
||||||
|
pub use crate::async_cancel::CancelFuture;
|
||||||
|
pub use crate::async_cancel::CancelHandle;
|
||||||
|
pub use crate::async_cancel::CancelTryFuture;
|
||||||
|
pub use crate::async_cancel::Cancelable;
|
||||||
|
pub use crate::async_cancel::Canceled;
|
||||||
|
pub use crate::async_cancel::TryCancelable;
|
||||||
pub use crate::async_cell::AsyncMut;
|
pub use crate::async_cell::AsyncMut;
|
||||||
pub use crate::async_cell::AsyncMutFuture;
|
pub use crate::async_cell::AsyncMutFuture;
|
||||||
pub use crate::async_cell::AsyncRef;
|
pub use crate::async_cell::AsyncRef;
|
||||||
pub use crate::async_cell::AsyncRefCell;
|
pub use crate::async_cell::AsyncRefCell;
|
||||||
pub use crate::async_cell::AsyncRefFuture;
|
pub use crate::async_cell::AsyncRefFuture;
|
||||||
|
pub use crate::async_cell::RcLike;
|
||||||
pub use crate::async_cell::RcRef;
|
pub use crate::async_cell::RcRef;
|
||||||
pub use crate::flags::v8_set_flags;
|
pub use crate::flags::v8_set_flags;
|
||||||
pub use crate::module_specifier::ModuleResolutionError;
|
pub use crate::module_specifier::ModuleResolutionError;
|
||||||
|
|
|
@ -24,6 +24,11 @@ pub trait Resource: Any + 'static {
|
||||||
fn name(&self) -> Cow<str> {
|
fn name(&self) -> Cow<str> {
|
||||||
type_name::<Self>().into()
|
type_name::<Self>().into()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Resources may implement the `close()` trait method if they need to do
|
||||||
|
/// resource specific clean-ups, such as cancelling pending futures, after a
|
||||||
|
/// resource has been removed from the resource table.
|
||||||
|
fn close(self: Rc<Self>) {}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl dyn Resource {
|
impl dyn Resource {
|
||||||
|
@ -117,7 +122,7 @@ impl ResourceTable {
|
||||||
/// cause the resource to be dropped. However, since resources are reference
|
/// cause the resource to be dropped. However, since resources are reference
|
||||||
/// counted, therefore pending ops are not automatically cancelled.
|
/// counted, therefore pending ops are not automatically cancelled.
|
||||||
pub fn close(&mut self, rid: ResourceId) -> Option<()> {
|
pub fn close(&mut self, rid: ResourceId) -> Option<()> {
|
||||||
self.index.remove(&rid).map(|_| ())
|
self.index.remove(&rid).map(|resource| resource.close())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns an iterator that yields a `(id, name)` pair for every resource
|
/// Returns an iterator that yields a `(id, name)` pair for every resource
|
||||||
|
|
Loading…
Reference in a new issue