mirror of
https://github.com/denoland/deno.git
synced 2024-12-24 08:09:08 -05:00
refactor: rewrite ops to use ResourceTable2 (#8512)
This commit migrates all ops to use new resource table and "AsyncRefCell". Old implementation of resource table was completely removed and all code referencing it was updated to use new system.
This commit is contained in:
parent
9fe26f8ca1
commit
6984b63f2f
26 changed files with 1237 additions and 1221 deletions
|
@ -21,8 +21,6 @@ unitTest({ perms: { net: true } }, function netTcpListenClose(): void {
|
|||
unitTest(
|
||||
{
|
||||
perms: { net: true },
|
||||
// TODO:
|
||||
ignore: Deno.build.os === "windows",
|
||||
},
|
||||
function netUdpListenClose(): void {
|
||||
const socket = Deno.listenDatagram({
|
||||
|
@ -257,7 +255,7 @@ unitTest(
|
|||
);
|
||||
|
||||
unitTest(
|
||||
{ ignore: Deno.build.os === "windows", perms: { net: true } },
|
||||
{ perms: { net: true } },
|
||||
async function netUdpSendReceive(): Promise<void> {
|
||||
const alice = Deno.listenDatagram({ port: 3500, transport: "udp" });
|
||||
assert(alice.addr.transport === "udp");
|
||||
|
@ -287,7 +285,31 @@ unitTest(
|
|||
);
|
||||
|
||||
unitTest(
|
||||
{ ignore: Deno.build.os === "windows", perms: { net: true } },
|
||||
{ perms: { net: true } },
|
||||
async function netUdpConcurrentSendReceive(): Promise<void> {
|
||||
const socket = Deno.listenDatagram({ port: 3500, transport: "udp" });
|
||||
assert(socket.addr.transport === "udp");
|
||||
assertEquals(socket.addr.port, 3500);
|
||||
assertEquals(socket.addr.hostname, "127.0.0.1");
|
||||
|
||||
const recvPromise = socket.receive();
|
||||
|
||||
const sendBuf = new Uint8Array([1, 2, 3]);
|
||||
const sendLen = await socket.send(sendBuf, socket.addr);
|
||||
assertEquals(sendLen, 3);
|
||||
|
||||
const [recvBuf, recvAddr] = await recvPromise;
|
||||
assertEquals(recvBuf.length, 3);
|
||||
assertEquals(1, recvBuf[0]);
|
||||
assertEquals(2, recvBuf[1]);
|
||||
assertEquals(3, recvBuf[2]);
|
||||
|
||||
socket.close();
|
||||
},
|
||||
);
|
||||
|
||||
unitTest(
|
||||
{ perms: { net: true } },
|
||||
async function netUdpBorrowMutError(): Promise<void> {
|
||||
const socket = Deno.listenDatagram({
|
||||
port: 4501,
|
||||
|
@ -335,6 +357,34 @@ unitTest(
|
|||
},
|
||||
);
|
||||
|
||||
// TODO(piscisaureus): Enable after Tokio v0.3/v1.0 upgrade.
|
||||
unitTest(
|
||||
{ ignore: true, perms: { read: true, write: true } },
|
||||
async function netUnixPacketConcurrentSendReceive(): Promise<void> {
|
||||
const filePath = await Deno.makeTempFile();
|
||||
const socket = Deno.listenDatagram({
|
||||
path: filePath,
|
||||
transport: "unixpacket",
|
||||
});
|
||||
assert(socket.addr.transport === "unixpacket");
|
||||
assertEquals(socket.addr.path, filePath);
|
||||
|
||||
const recvPromise = socket.receive();
|
||||
|
||||
const sendBuf = new Uint8Array([1, 2, 3]);
|
||||
const sendLen = await socket.send(sendBuf, socket.addr);
|
||||
assertEquals(sendLen, 3);
|
||||
|
||||
const [recvBuf, recvAddr] = await recvPromise;
|
||||
assertEquals(recvBuf.length, 3);
|
||||
assertEquals(1, recvBuf[0]);
|
||||
assertEquals(2, recvBuf[1]);
|
||||
assertEquals(3, recvBuf[2]);
|
||||
|
||||
socket.close();
|
||||
},
|
||||
);
|
||||
|
||||
unitTest(
|
||||
{ perms: { net: true } },
|
||||
async function netTcpListenIteratorBreakClosesResource(): Promise<void> {
|
||||
|
@ -385,7 +435,7 @@ unitTest(
|
|||
);
|
||||
|
||||
unitTest(
|
||||
{ ignore: Deno.build.os === "windows", perms: { net: true } },
|
||||
{ perms: { net: true } },
|
||||
async function netUdpListenCloseWhileIterating(): Promise<void> {
|
||||
const socket = Deno.listenDatagram({ port: 8000, transport: "udp" });
|
||||
const nextWhileClosing = socket[Symbol.asyncIterator]().next();
|
||||
|
|
|
@ -1,10 +1,14 @@
|
|||
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
|
||||
|
||||
use std::any::type_name;
|
||||
use std::any::Any;
|
||||
use std::borrow::Borrow;
|
||||
use std::cell::Cell;
|
||||
use std::cell::UnsafeCell;
|
||||
use std::collections::VecDeque;
|
||||
use std::fmt;
|
||||
use std::fmt::Debug;
|
||||
use std::fmt::Formatter;
|
||||
use std::ops::Deref;
|
||||
use std::rc::Rc;
|
||||
|
||||
|
@ -45,6 +49,17 @@ impl<T: 'static> AsyncRefCell<T> {
|
|||
pub fn as_ptr(&self) -> *mut T {
|
||||
self.value.get()
|
||||
}
|
||||
|
||||
pub fn into_inner(self) -> T {
|
||||
assert!(self.borrow_count.get().is_empty());
|
||||
self.value.into_inner()
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Debug for AsyncRefCell<T> {
|
||||
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
|
||||
write!(f, "AsyncRefCell<{}>", type_name::<T>())
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Default + 'static> Default for AsyncRefCell<T> {
|
||||
|
|
|
@ -170,7 +170,7 @@ fn op_listen(
|
|||
let std_listener = std::net::TcpListener::bind(&addr)?;
|
||||
std_listener.set_nonblocking(true)?;
|
||||
let listener = TcpListener::try_from(std_listener)?;
|
||||
let rid = state.resource_table_2.add(listener);
|
||||
let rid = state.resource_table.add(listener);
|
||||
Ok(rid)
|
||||
}
|
||||
|
||||
|
@ -181,7 +181,7 @@ fn op_close(
|
|||
) -> Result<u32, Error> {
|
||||
debug!("close rid={}", rid);
|
||||
state
|
||||
.resource_table_2
|
||||
.resource_table
|
||||
.close(rid)
|
||||
.map(|_| 0)
|
||||
.ok_or_else(bad_resource_id)
|
||||
|
@ -196,11 +196,11 @@ async fn op_accept(
|
|||
|
||||
let listener = state
|
||||
.borrow()
|
||||
.resource_table_2
|
||||
.resource_table
|
||||
.get::<TcpListener>(rid)
|
||||
.ok_or_else(bad_resource_id)?;
|
||||
let stream = listener.accept().await?;
|
||||
let rid = state.borrow_mut().resource_table_2.add(stream);
|
||||
let rid = state.borrow_mut().resource_table.add(stream);
|
||||
Ok(rid)
|
||||
}
|
||||
|
||||
|
@ -214,7 +214,7 @@ async fn op_read(
|
|||
|
||||
let stream = state
|
||||
.borrow()
|
||||
.resource_table_2
|
||||
.resource_table
|
||||
.get::<TcpStream>(rid)
|
||||
.ok_or_else(bad_resource_id)?;
|
||||
stream.read(&mut bufs[0]).await
|
||||
|
@ -230,7 +230,7 @@ async fn op_write(
|
|||
|
||||
let stream = state
|
||||
.borrow()
|
||||
.resource_table_2
|
||||
.resource_table
|
||||
.get::<TcpStream>(rid)
|
||||
.ok_or_else(bad_resource_id)?;
|
||||
stream.write(&bufs[0]).await
|
||||
|
|
|
@ -134,7 +134,7 @@ fn op_listen(
|
|||
let std_listener = std::net::TcpListener::bind(&addr)?;
|
||||
std_listener.set_nonblocking(true)?;
|
||||
let listener = TcpListener::try_from(std_listener)?;
|
||||
let rid = state.resource_table_2.add(listener);
|
||||
let rid = state.resource_table.add(listener);
|
||||
Ok(serde_json::json!({ "rid": rid }))
|
||||
}
|
||||
|
||||
|
@ -152,7 +152,7 @@ fn op_close(
|
|||
.unwrap();
|
||||
debug!("close rid={}", rid);
|
||||
state
|
||||
.resource_table_2
|
||||
.resource_table
|
||||
.close(rid)
|
||||
.map(|_| serde_json::json!(()))
|
||||
.ok_or_else(bad_resource_id)
|
||||
|
@ -174,11 +174,11 @@ async fn op_accept(
|
|||
|
||||
let listener = state
|
||||
.borrow()
|
||||
.resource_table_2
|
||||
.resource_table
|
||||
.get::<TcpListener>(rid)
|
||||
.ok_or_else(bad_resource_id)?;
|
||||
let stream = listener.accept().await?;
|
||||
let rid = state.borrow_mut().resource_table_2.add(stream);
|
||||
let rid = state.borrow_mut().resource_table.add(stream);
|
||||
Ok(serde_json::json!({ "rid": rid }))
|
||||
}
|
||||
|
||||
|
@ -199,7 +199,7 @@ async fn op_read(
|
|||
|
||||
let stream = state
|
||||
.borrow()
|
||||
.resource_table_2
|
||||
.resource_table
|
||||
.get::<TcpStream>(rid)
|
||||
.ok_or_else(bad_resource_id)?;
|
||||
let nread = stream.read(&mut bufs[0]).await?;
|
||||
|
@ -223,7 +223,7 @@ async fn op_write(
|
|||
|
||||
let stream = state
|
||||
.borrow()
|
||||
.resource_table_2
|
||||
.resource_table
|
||||
.get::<TcpStream>(rid)
|
||||
.ok_or_else(bad_resource_id)?;
|
||||
let nwritten = stream.write(&bufs[0]).await?;
|
||||
|
|
|
@ -17,7 +17,6 @@ mod normalize_path;
|
|||
mod ops;
|
||||
pub mod plugin_api;
|
||||
mod resources;
|
||||
mod resources2;
|
||||
mod runtime;
|
||||
mod shared_queue;
|
||||
mod zero_copy_buf;
|
||||
|
@ -64,10 +63,9 @@ pub use crate::ops::OpFn;
|
|||
pub use crate::ops::OpId;
|
||||
pub use crate::ops::OpState;
|
||||
pub use crate::ops::OpTable;
|
||||
pub use crate::resources::Resource;
|
||||
pub use crate::resources::ResourceId;
|
||||
pub use crate::resources::ResourceTable;
|
||||
pub use crate::resources2::Resource;
|
||||
pub use crate::resources2::ResourceId;
|
||||
pub use crate::resources2::ResourceTable2;
|
||||
pub use crate::runtime::GetErrorClassFn;
|
||||
pub use crate::runtime::JsErrorCreateFn;
|
||||
pub use crate::runtime::JsRuntime;
|
||||
|
|
15
core/ops.rs
15
core/ops.rs
|
@ -4,6 +4,8 @@ use crate::error::bad_resource_id;
|
|||
use crate::error::type_error;
|
||||
use crate::error::AnyError;
|
||||
use crate::gotham_state::GothamState;
|
||||
use crate::resources::ResourceTable;
|
||||
use crate::runtime::GetErrorClassFn;
|
||||
use crate::BufVec;
|
||||
use crate::ZeroCopyBuf;
|
||||
use futures::Future;
|
||||
|
@ -33,10 +35,9 @@ pub enum Op {
|
|||
|
||||
/// Maintains the resources and ops inside a JS runtime.
|
||||
pub struct OpState {
|
||||
pub resource_table: crate::ResourceTable,
|
||||
pub resource_table_2: crate::resources2::ResourceTable,
|
||||
pub resource_table: ResourceTable,
|
||||
pub op_table: OpTable,
|
||||
pub get_error_class_fn: crate::runtime::GetErrorClassFn,
|
||||
pub get_error_class_fn: GetErrorClassFn,
|
||||
gotham_state: GothamState,
|
||||
}
|
||||
|
||||
|
@ -47,7 +48,6 @@ impl Default for OpState {
|
|||
fn default() -> OpState {
|
||||
OpState {
|
||||
resource_table: Default::default(),
|
||||
resource_table_2: Default::default(),
|
||||
op_table: OpTable::default(),
|
||||
get_error_class_fn: &|_| "Error",
|
||||
gotham_state: Default::default(),
|
||||
|
@ -279,7 +279,11 @@ pub fn op_resources(
|
|||
_args: Value,
|
||||
_zero_copy: &mut [ZeroCopyBuf],
|
||||
) -> Result<Value, AnyError> {
|
||||
let serialized_resources = state.resource_table.entries();
|
||||
let serialized_resources: HashMap<u32, String> = state
|
||||
.resource_table
|
||||
.names()
|
||||
.map(|(rid, name)| (rid, name.to_string()))
|
||||
.collect();
|
||||
Ok(json!(serialized_resources))
|
||||
}
|
||||
|
||||
|
@ -300,5 +304,6 @@ pub fn op_close(
|
|||
.resource_table
|
||||
.close(rid as u32)
|
||||
.ok_or_else(bad_resource_id)?;
|
||||
|
||||
Ok(json!({}))
|
||||
}
|
||||
|
|
|
@ -1,20 +1,63 @@
|
|||
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
|
||||
|
||||
// Think of Resources as File Descriptors. They are integers that are allocated by
|
||||
// the privileged side of Deno to refer to various rust objects that need to be
|
||||
// referenced between multiple ops. For example, network sockets are resources.
|
||||
// Resources may or may not correspond to a real operating system file
|
||||
// descriptor (hence the different name).
|
||||
// Think of Resources as File Descriptors. They are integers that are allocated
|
||||
// by the privileged side of Deno which refer to various rust objects that need
|
||||
// to be persisted between various ops. For example, network sockets are
|
||||
// resources. Resources may or may not correspond to a real operating system
|
||||
// file descriptor (hence the different name).
|
||||
|
||||
use crate::resources2::ResourceId;
|
||||
use std::any::type_name;
|
||||
use std::any::Any;
|
||||
use std::any::TypeId;
|
||||
use std::borrow::Cow;
|
||||
use std::collections::HashMap;
|
||||
use std::iter::Iterator;
|
||||
use std::rc::Rc;
|
||||
|
||||
/// These store Deno's file descriptors. These are not necessarily the operating
|
||||
/// system ones.
|
||||
type ResourceMap = HashMap<ResourceId, (String, Box<dyn Any>)>;
|
||||
/// All objects that can be store in the resource table should implement the
|
||||
/// `Resource` trait.
|
||||
pub trait Resource: Any + 'static {
|
||||
/// Returns a string representation of the resource which is made available
|
||||
/// to JavaScript code through `op_resources`. The default implementation
|
||||
/// returns the Rust type name, but specific resource types may override this
|
||||
/// trait method.
|
||||
fn name(&self) -> Cow<str> {
|
||||
type_name::<Self>().into()
|
||||
}
|
||||
|
||||
/// Map-like data structure storing Deno's resources (equivalent to file descriptors).
|
||||
/// Resources may implement the `close()` trait method if they need to do
|
||||
/// resource specific clean-ups, such as cancelling pending futures, after a
|
||||
/// resource has been removed from the resource table.
|
||||
fn close(self: Rc<Self>) {}
|
||||
}
|
||||
|
||||
impl dyn Resource {
|
||||
#[inline(always)]
|
||||
fn is<T: Resource>(&self) -> bool {
|
||||
self.type_id() == TypeId::of::<T>()
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
#[allow(clippy::needless_lifetimes)]
|
||||
pub fn downcast_rc<'a, T: Resource>(self: &'a Rc<Self>) -> Option<&'a Rc<T>> {
|
||||
if self.is::<T>() {
|
||||
let ptr = self as *const Rc<_> as *const Rc<T>;
|
||||
Some(unsafe { &*ptr })
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A `ResourceId` is an integer value referencing a resource. It could be
|
||||
/// considered to be the Deno equivalent of a `file descriptor` in POSIX like
|
||||
/// operating systems. Elsewhere in the code base it is commonly abbreviated
|
||||
/// to `rid`.
|
||||
// TODO: use `u64` instead?
|
||||
pub type ResourceId = u32;
|
||||
|
||||
/// Map-like data structure storing Deno's resources (equivalent to file
|
||||
/// descriptors).
|
||||
///
|
||||
/// Provides basic methods for element access. A resource can be of any type.
|
||||
/// Different types of resources can be stored in the same map, and provided
|
||||
|
@ -24,156 +67,98 @@ type ResourceMap = HashMap<ResourceId, (String, Box<dyn Any>)>;
|
|||
/// the key in the map.
|
||||
#[derive(Default)]
|
||||
pub struct ResourceTable {
|
||||
map: ResourceMap,
|
||||
next_id: u32,
|
||||
index: HashMap<ResourceId, Rc<dyn Resource>>,
|
||||
next_rid: ResourceId,
|
||||
}
|
||||
|
||||
impl ResourceTable {
|
||||
/// Checks if the given resource ID is contained.
|
||||
pub fn has(&self, rid: ResourceId) -> bool {
|
||||
self.map.contains_key(&rid)
|
||||
}
|
||||
|
||||
/// Returns a shared reference to a resource.
|
||||
///
|
||||
/// Returns `None`, if `rid` is not stored or has a type different from `T`.
|
||||
pub fn get<T: Any>(&self, rid: ResourceId) -> Option<&T> {
|
||||
let (_, resource) = self.map.get(&rid)?;
|
||||
resource.downcast_ref::<T>()
|
||||
}
|
||||
|
||||
/// Returns a mutable reference to a resource.
|
||||
///
|
||||
/// Returns `None`, if `rid` is not stored or has a type different from `T`.
|
||||
pub fn get_mut<T: Any>(&mut self, rid: ResourceId) -> Option<&mut T> {
|
||||
let (_, resource) = self.map.get_mut(&rid)?;
|
||||
resource.downcast_mut::<T>()
|
||||
}
|
||||
|
||||
// TODO: resource id allocation should probably be randomized for security.
|
||||
fn next_rid(&mut self) -> ResourceId {
|
||||
let next_rid = self.next_id;
|
||||
self.next_id += 1;
|
||||
next_rid as ResourceId
|
||||
}
|
||||
|
||||
/// Inserts a resource, taking ownership of it.
|
||||
/// Inserts resource into the resource table, which takes ownership of it.
|
||||
///
|
||||
/// The resource type is erased at runtime and must be statically known
|
||||
/// when retrieving it through `get()`.
|
||||
///
|
||||
/// Returns a unique resource ID, which acts as a key for this resource.
|
||||
pub fn add(&mut self, name: &str, resource: Box<dyn Any>) -> ResourceId {
|
||||
let rid = self.next_rid();
|
||||
let r = self.map.insert(rid, (name.to_string(), resource));
|
||||
assert!(r.is_none());
|
||||
pub fn add<T: Resource>(&mut self, resource: T) -> ResourceId {
|
||||
self.add_rc(Rc::new(resource))
|
||||
}
|
||||
|
||||
/// Inserts a `Rc`-wrapped resource into the resource table.
|
||||
///
|
||||
/// The resource type is erased at runtime and must be statically known
|
||||
/// when retrieving it through `get()`.
|
||||
///
|
||||
/// Returns a unique resource ID, which acts as a key for this resource.
|
||||
pub fn add_rc<T: Resource>(&mut self, resource: Rc<T>) -> ResourceId {
|
||||
let resource = resource as Rc<dyn Resource>;
|
||||
let rid = self.next_rid;
|
||||
let removed_resource = self.index.insert(rid, resource);
|
||||
assert!(removed_resource.is_none());
|
||||
self.next_rid += 1;
|
||||
rid
|
||||
}
|
||||
|
||||
/// Returns a map of resource IDs to names.
|
||||
///
|
||||
/// The name is the one specified during `add()`. To access resources themselves,
|
||||
/// use the `get()` or `get_mut()` functions.
|
||||
pub fn entries(&self) -> HashMap<ResourceId, String> {
|
||||
/// Returns true if any resource with the given `rid` exists.
|
||||
pub fn has(&self, rid: ResourceId) -> bool {
|
||||
self.index.contains_key(&rid)
|
||||
}
|
||||
|
||||
/// Returns a reference counted pointer to the resource of type `T` with the
|
||||
/// given `rid`. If `rid` is not present or has a type different than `T`,
|
||||
/// this function returns `None`.
|
||||
pub fn get<T: Resource>(&self, rid: ResourceId) -> Option<Rc<T>> {
|
||||
self
|
||||
.map
|
||||
.iter()
|
||||
.map(|(key, (name, _resource))| (*key, name.clone()))
|
||||
.collect()
|
||||
.index
|
||||
.get(&rid)
|
||||
.and_then(|rc| rc.downcast_rc::<T>())
|
||||
.map(Clone::clone)
|
||||
}
|
||||
|
||||
// close(2) is done by dropping the value. Therefore we just need to remove
|
||||
// the resource from the resource table.
|
||||
pub fn get_any(&self, rid: ResourceId) -> Option<Rc<dyn Resource>> {
|
||||
self.index.get(&rid).map(Clone::clone)
|
||||
}
|
||||
|
||||
/// Removes a resource of type `T` from the resource table and returns it.
|
||||
/// If a resource with the given `rid` exists but its type does not match `T`,
|
||||
/// it is not removed from the resource table. Note that the resource's
|
||||
/// `close()` method is *not* called.
|
||||
pub fn take<T: Resource>(&mut self, rid: ResourceId) -> Option<Rc<T>> {
|
||||
let resource = self.get::<T>(rid)?;
|
||||
self.index.remove(&rid);
|
||||
Some(resource)
|
||||
}
|
||||
|
||||
/// Removes a resource from the resource table and returns it. Note that the
|
||||
/// resource's `close()` method is *not* called.
|
||||
pub fn take_any(&mut self, rid: ResourceId) -> Option<Rc<dyn Resource>> {
|
||||
self.index.remove(&rid)
|
||||
}
|
||||
|
||||
/// Removes the resource with the given `rid` from the resource table. If the
|
||||
/// only reference to this resource existed in the resource table, this will
|
||||
/// cause the resource to be dropped. However, since resources are reference
|
||||
/// counted, therefore pending ops are not automatically cancelled. A resource
|
||||
/// may implement the `close()` method to perform clean-ups such as canceling
|
||||
/// ops.
|
||||
pub fn close(&mut self, rid: ResourceId) -> Option<()> {
|
||||
self.map.remove(&rid).map(|(_name, _resource)| ())
|
||||
self.index.remove(&rid).map(|resource| resource.close())
|
||||
}
|
||||
|
||||
/// Removes the resource identified by `rid` and returns it.
|
||||
/// Returns an iterator that yields a `(id, name)` pair for every resource
|
||||
/// that's currently in the resource table. This can be used for debugging
|
||||
/// purposes or to implement the `op_resources` op. Note that the order in
|
||||
/// which items appear is not specified.
|
||||
///
|
||||
/// When the provided `rid` is stored, the associated resource will be removed.
|
||||
/// Otherwise, nothing happens and `None` is returned.
|
||||
/// # Example
|
||||
///
|
||||
/// If the type `T` matches the resource's type, the resource will be returned.
|
||||
/// If the type mismatches, `None` is returned, but the resource is still removed.
|
||||
pub fn remove<T: Any>(&mut self, rid: ResourceId) -> Option<Box<T>> {
|
||||
if let Some((_name, resource)) = self.map.remove(&rid) {
|
||||
let res = match resource.downcast::<T>() {
|
||||
Ok(res) => Some(res),
|
||||
Err(_e) => None,
|
||||
};
|
||||
return res;
|
||||
}
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
struct FakeResource {
|
||||
not_empty: u128,
|
||||
}
|
||||
|
||||
impl FakeResource {
|
||||
fn new(value: u128) -> FakeResource {
|
||||
FakeResource { not_empty: value }
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_create_resource_table_default() {
|
||||
let table = ResourceTable::default();
|
||||
assert_eq!(table.map.len(), 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_add_to_resource_table_not_empty() {
|
||||
let mut table = ResourceTable::default();
|
||||
table.add("fake1", Box::new(FakeResource::new(1)));
|
||||
table.add("fake2", Box::new(FakeResource::new(2)));
|
||||
assert_eq!(table.map.len(), 2);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_add_to_resource_table_are_contiguous() {
|
||||
let mut table = ResourceTable::default();
|
||||
let rid1 = table.add("fake1", Box::new(FakeResource::new(1)));
|
||||
let rid2 = table.add("fake2", Box::new(FakeResource::new(2)));
|
||||
assert_eq!(rid1 + 1, rid2);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_get_from_resource_table_is_what_was_given() {
|
||||
let mut table = ResourceTable::default();
|
||||
let rid = table.add("fake", Box::new(FakeResource::new(7)));
|
||||
let resource = table.get::<FakeResource>(rid);
|
||||
assert_eq!(resource.unwrap().not_empty, 7);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_remove_from_resource_table() {
|
||||
let mut table = ResourceTable::default();
|
||||
let rid1 = table.add("fake1", Box::new(FakeResource::new(1)));
|
||||
let rid2 = table.add("fake2", Box::new(FakeResource::new(2)));
|
||||
assert_eq!(table.map.len(), 2);
|
||||
table.close(rid1);
|
||||
assert_eq!(table.map.len(), 1);
|
||||
table.close(rid2);
|
||||
assert_eq!(table.map.len(), 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_take_from_resource_table() {
|
||||
let mut table = ResourceTable::default();
|
||||
let rid1 = table.add("fake1", Box::new(FakeResource::new(1)));
|
||||
let rid2 = table.add("fake2", Box::new(FakeResource::new(2)));
|
||||
assert_eq!(table.map.len(), 2);
|
||||
let res1 = table.remove::<FakeResource>(rid1);
|
||||
assert_eq!(table.map.len(), 1);
|
||||
assert!(res1.is_some());
|
||||
let res2 = table.remove::<FakeResource>(rid2);
|
||||
assert_eq!(table.map.len(), 0);
|
||||
assert!(res2.is_some());
|
||||
/// ```
|
||||
/// # use deno_core::ResourceTable;
|
||||
/// # let resource_table = ResourceTable::default();
|
||||
/// let resource_names = resource_table.names().collect::<Vec<_>>();
|
||||
/// ```
|
||||
pub fn names(&self) -> impl Iterator<Item = (ResourceId, Cow<str>)> {
|
||||
self
|
||||
.index
|
||||
.iter()
|
||||
.map(|(&id, resource)| (id, resource.name()))
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,146 +0,0 @@
|
|||
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
|
||||
|
||||
// Think of Resources as File Descriptors. They are integers that are allocated
|
||||
// by the privileged side of Deno which refer to various rust objects that need
|
||||
// to be persisted between various ops. For example, network sockets are
|
||||
// resources. Resources may or may not correspond to a real operating system
|
||||
// file descriptor (hence the different name).
|
||||
|
||||
use std::any::type_name;
|
||||
use std::any::Any;
|
||||
use std::any::TypeId;
|
||||
use std::borrow::Cow;
|
||||
use std::collections::HashMap;
|
||||
use std::iter::Iterator;
|
||||
use std::rc::Rc;
|
||||
|
||||
/// All objects that can be store in the resource table should implement the
|
||||
/// `Resource` trait.
|
||||
pub trait Resource: Any + 'static {
|
||||
/// Returns a string representation of the resource which is made available
|
||||
/// to JavaScript code through `op_resources`. The default implementation
|
||||
/// returns the Rust type name, but specific resource types may override this
|
||||
/// trait method.
|
||||
fn name(&self) -> Cow<str> {
|
||||
type_name::<Self>().into()
|
||||
}
|
||||
|
||||
/// Resources may implement the `close()` trait method if they need to do
|
||||
/// resource specific clean-ups, such as cancelling pending futures, after a
|
||||
/// resource has been removed from the resource table.
|
||||
fn close(self: Rc<Self>) {}
|
||||
}
|
||||
|
||||
impl dyn Resource {
|
||||
#[inline(always)]
|
||||
fn is<T: Resource>(&self) -> bool {
|
||||
self.type_id() == TypeId::of::<T>()
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
#[allow(clippy::needless_lifetimes)]
|
||||
fn downcast_rc<'a, T: Resource>(self: &'a Rc<Self>) -> Option<&'a Rc<T>> {
|
||||
if self.is::<T>() {
|
||||
let ptr = self as *const Rc<_> as *const Rc<T>;
|
||||
Some(unsafe { &*ptr })
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A `ResourceId` is an integer value referencing a resource. It could be
|
||||
/// considered to be the Deno equivalent of a `file descriptor` in POSIX like
|
||||
/// operating systems. Elsewhere in the code base it is commonly abbreviated
|
||||
/// to `rid`.
|
||||
// TODO: use `u64` instead?
|
||||
pub type ResourceId = u32;
|
||||
|
||||
/// Temporary alias for `crate::resources2::ResourceTable`.
|
||||
// TODO: remove this when the old `ResourceTable` is obsolete.
|
||||
pub type ResourceTable2 = ResourceTable;
|
||||
|
||||
/// Map-like data structure storing Deno's resources (equivalent to file
|
||||
/// descriptors).
|
||||
///
|
||||
/// Provides basic methods for element access. A resource can be of any type.
|
||||
/// Different types of resources can be stored in the same map, and provided
|
||||
/// with a name for description.
|
||||
///
|
||||
/// Each resource is identified through a _resource ID (rid)_, which acts as
|
||||
/// the key in the map.
|
||||
#[derive(Default)]
|
||||
pub struct ResourceTable {
|
||||
index: HashMap<ResourceId, Rc<dyn Resource>>,
|
||||
next_rid: ResourceId,
|
||||
}
|
||||
|
||||
impl ResourceTable {
|
||||
/// Returns true if any resource with the given `rid` is exists.
|
||||
pub fn has(&self, rid: ResourceId) -> bool {
|
||||
self.index.contains_key(&rid)
|
||||
}
|
||||
|
||||
/// Returns a reference counted pointer to the resource of type `T` with the
|
||||
/// given `rid`. If `rid` is not present or has a type different than `T`,
|
||||
/// this function returns `None`.
|
||||
pub fn get<T: Resource>(&self, rid: ResourceId) -> Option<Rc<T>> {
|
||||
self
|
||||
.index
|
||||
.get(&rid)
|
||||
.and_then(|resource| resource.downcast_rc::<T>())
|
||||
.map(Clone::clone)
|
||||
}
|
||||
|
||||
/// Inserts resource into the resource table, which takes ownership of it.
|
||||
///
|
||||
/// The resource type is erased at runtime and must be statically known
|
||||
/// when retrieving it through `get()`.
|
||||
///
|
||||
/// Returns a unique resource ID, which acts as a key for this resource.
|
||||
pub fn add<T: Resource>(&mut self, resource: T) -> ResourceId {
|
||||
self.add_rc(Rc::new(resource))
|
||||
}
|
||||
|
||||
/// Inserts a `Rc`-wrapped resource into the resource table.
|
||||
///
|
||||
/// The resource type is erased at runtime and must be statically known
|
||||
/// when retrieving it through `get()`.
|
||||
///
|
||||
/// Returns a unique resource ID, which acts as a key for this resource.
|
||||
pub fn add_rc<T: Resource>(&mut self, resource: Rc<T>) -> ResourceId {
|
||||
let resource = resource as Rc<dyn Resource>;
|
||||
let rid = self.next_rid;
|
||||
let removed_resource = self.index.insert(rid, resource);
|
||||
assert!(removed_resource.is_none());
|
||||
self.next_rid += 1;
|
||||
rid
|
||||
}
|
||||
|
||||
/// Removes the resource with the given `rid` from the resource table. If the
|
||||
/// only reference to this resource existed in the resource table, this will
|
||||
/// cause the resource to be dropped. However, since resources are reference
|
||||
/// counted, therefore pending ops are not automatically cancelled.
|
||||
pub fn close(&mut self, rid: ResourceId) -> Option<()> {
|
||||
self.index.remove(&rid).map(|resource| resource.close())
|
||||
}
|
||||
|
||||
/// Returns an iterator that yields a `(id, name)` pair for every resource
|
||||
/// that's currently in the resource table. This can be used for debugging
|
||||
/// purposes or to implement the `op_resources` op. Note that the order in
|
||||
/// which items appear is not specified.
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
/// ```
|
||||
/// # use deno_core::ResourceTable2;
|
||||
/// # let resource_table = ResourceTable2::default();
|
||||
/// let resource_names = resource_table.names().collect::<Vec<_>>();
|
||||
/// ```
|
||||
pub fn names(&self) -> impl Iterator<Item = (ResourceId, Cow<str>)> {
|
||||
self
|
||||
.index
|
||||
.iter()
|
||||
.map(|(&id, resource)| (id, resource.name()))
|
||||
}
|
||||
}
|
|
@ -5,15 +5,19 @@
|
|||
use deno_core::error::bad_resource_id;
|
||||
use deno_core::error::type_error;
|
||||
use deno_core::error::AnyError;
|
||||
use deno_core::futures;
|
||||
use deno_core::serde_json;
|
||||
use deno_core::serde_json::json;
|
||||
use deno_core::serde_json::Value;
|
||||
use deno_core::url;
|
||||
use deno_core::url::Url;
|
||||
use deno_core::AsyncRefCell;
|
||||
use deno_core::BufVec;
|
||||
use deno_core::CancelFuture;
|
||||
use deno_core::CancelHandle;
|
||||
use deno_core::JsRuntime;
|
||||
use deno_core::OpState;
|
||||
use deno_core::RcRef;
|
||||
use deno_core::Resource;
|
||||
use deno_core::ZeroCopyBuf;
|
||||
|
||||
use reqwest::header::HeaderName;
|
||||
|
@ -23,6 +27,7 @@ use reqwest::Client;
|
|||
use reqwest::Method;
|
||||
use reqwest::Response;
|
||||
use serde::Deserialize;
|
||||
use std::borrow::Cow;
|
||||
use std::cell::RefCell;
|
||||
use std::convert::From;
|
||||
use std::fs::File;
|
||||
|
@ -172,10 +177,10 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
let rid = state
|
||||
.borrow_mut()
|
||||
.resource_table
|
||||
.add("httpBody", Box::new(res));
|
||||
let rid = state.borrow_mut().resource_table.add(HttpBodyResource {
|
||||
response: AsyncRefCell::new(res),
|
||||
cancel: Default::default(),
|
||||
});
|
||||
|
||||
Ok(json!({
|
||||
"bodyRid": rid,
|
||||
|
@ -199,32 +204,43 @@ pub async fn op_fetch_read(
|
|||
let args: Args = serde_json::from_value(args)?;
|
||||
let rid = args.rid;
|
||||
|
||||
use futures::future::poll_fn;
|
||||
use futures::ready;
|
||||
use futures::FutureExt;
|
||||
let f = poll_fn(move |cx| {
|
||||
let mut state = state.borrow_mut();
|
||||
let response = state
|
||||
.resource_table
|
||||
.get_mut::<Response>(rid as u32)
|
||||
.ok_or_else(bad_resource_id)?;
|
||||
let resource = state
|
||||
.borrow()
|
||||
.resource_table
|
||||
.get::<HttpBodyResource>(rid as u32)
|
||||
.ok_or_else(bad_resource_id)?;
|
||||
let mut response = RcRef::map(&resource, |r| &r.response).borrow_mut().await;
|
||||
let cancel = RcRef::map(resource, |r| &r.cancel);
|
||||
let maybe_chunk = response.chunk().or_cancel(cancel).await??;
|
||||
if let Some(chunk) = maybe_chunk {
|
||||
// TODO(ry) This is terribly inefficient. Make this zero-copy.
|
||||
Ok(json!({ "chunk": &*chunk }))
|
||||
} else {
|
||||
Ok(json!({ "chunk": null }))
|
||||
}
|
||||
}
|
||||
|
||||
let mut chunk_fut = response.chunk().boxed_local();
|
||||
let r = ready!(chunk_fut.poll_unpin(cx))?;
|
||||
if let Some(chunk) = r {
|
||||
// TODO(ry) This is terribly inefficient. Make this zero-copy.
|
||||
Ok(json!({ "chunk": &*chunk })).into()
|
||||
} else {
|
||||
Ok(json!({ "chunk": null })).into()
|
||||
}
|
||||
});
|
||||
f.await
|
||||
struct HttpBodyResource {
|
||||
response: AsyncRefCell<Response>,
|
||||
cancel: CancelHandle,
|
||||
}
|
||||
|
||||
impl Resource for HttpBodyResource {
|
||||
fn name(&self) -> Cow<str> {
|
||||
"httpBody".into()
|
||||
}
|
||||
}
|
||||
|
||||
struct HttpClientResource {
|
||||
client: Client,
|
||||
}
|
||||
|
||||
impl Resource for HttpClientResource {
|
||||
fn name(&self) -> Cow<str> {
|
||||
"httpClient".into()
|
||||
}
|
||||
}
|
||||
|
||||
impl HttpClientResource {
|
||||
fn new(client: Client) -> Self {
|
||||
Self { client }
|
||||
|
@ -255,9 +271,7 @@ where
|
|||
|
||||
let client = create_http_client(args.ca_file.as_deref()).unwrap();
|
||||
|
||||
let rid = state
|
||||
.resource_table
|
||||
.add("httpClient", Box::new(HttpClientResource::new(client)));
|
||||
let rid = state.resource_table.add(HttpClientResource::new(client));
|
||||
Ok(json!(rid))
|
||||
}
|
||||
|
||||
|
|
|
@ -168,6 +168,12 @@ pub fn get_error_class_name(e: &AnyError) -> Option<&'static str> {
|
|||
e.downcast_ref::<dlopen::Error>()
|
||||
.map(get_dlopen_error_class)
|
||||
})
|
||||
.or_else(|| {
|
||||
e.downcast_ref::<deno_core::Canceled>().map(|e| {
|
||||
let io_err: io::Error = e.to_owned().into();
|
||||
get_io_error_class(&io_err)
|
||||
})
|
||||
})
|
||||
.or_else(|| {
|
||||
e.downcast_ref::<env::VarError>()
|
||||
.map(get_env_var_error_class)
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
|
||||
// Some deserializer fields are only used on Unix and Windows build fails without it
|
||||
use super::io::std_file_resource;
|
||||
use super::io::{FileMetadata, StreamResource, StreamResourceHolder};
|
||||
use super::io::StreamResource;
|
||||
use crate::fs_util::canonicalize_path;
|
||||
use crate::permissions::Permissions;
|
||||
use deno_core::error::custom_error;
|
||||
|
@ -185,13 +185,8 @@ fn op_open_sync(
|
|||
let (path, open_options) = open_helper(state, args)?;
|
||||
let std_file = open_options.open(path)?;
|
||||
let tokio_file = tokio::fs::File::from_std(std_file);
|
||||
let rid = state.resource_table.add(
|
||||
"fsFile",
|
||||
Box::new(StreamResourceHolder::new(StreamResource::FsFile(Some((
|
||||
tokio_file,
|
||||
FileMetadata::default(),
|
||||
))))),
|
||||
);
|
||||
let resource = StreamResource::fs_file(tokio_file);
|
||||
let rid = state.resource_table.add(resource);
|
||||
Ok(json!(rid))
|
||||
}
|
||||
|
||||
|
@ -204,13 +199,8 @@ async fn op_open_async(
|
|||
let tokio_file = tokio::fs::OpenOptions::from(open_options)
|
||||
.open(path)
|
||||
.await?;
|
||||
let rid = state.borrow_mut().resource_table.add(
|
||||
"fsFile",
|
||||
Box::new(StreamResourceHolder::new(StreamResource::FsFile(Some((
|
||||
tokio_file,
|
||||
FileMetadata::default(),
|
||||
))))),
|
||||
);
|
||||
let resource = StreamResource::fs_file(tokio_file);
|
||||
let rid = state.borrow_mut().resource_table.add(resource);
|
||||
Ok(json!(rid))
|
||||
}
|
||||
|
||||
|
|
|
@ -3,12 +3,16 @@
|
|||
use crate::permissions::Permissions;
|
||||
use deno_core::error::bad_resource_id;
|
||||
use deno_core::error::AnyError;
|
||||
use deno_core::futures::future::poll_fn;
|
||||
use deno_core::serde_json;
|
||||
use deno_core::serde_json::json;
|
||||
use deno_core::serde_json::Value;
|
||||
use deno_core::AsyncRefCell;
|
||||
use deno_core::BufVec;
|
||||
use deno_core::CancelFuture;
|
||||
use deno_core::CancelHandle;
|
||||
use deno_core::OpState;
|
||||
use deno_core::RcRef;
|
||||
use deno_core::Resource;
|
||||
use deno_core::ZeroCopyBuf;
|
||||
use notify::event::Event as NotifyEvent;
|
||||
use notify::Error as NotifyError;
|
||||
|
@ -18,6 +22,7 @@ use notify::RecursiveMode;
|
|||
use notify::Watcher;
|
||||
use serde::Deserialize;
|
||||
use serde::Serialize;
|
||||
use std::borrow::Cow;
|
||||
use std::cell::RefCell;
|
||||
use std::convert::From;
|
||||
use std::path::PathBuf;
|
||||
|
@ -32,7 +37,18 @@ pub fn init(rt: &mut deno_core::JsRuntime) {
|
|||
struct FsEventsResource {
|
||||
#[allow(unused)]
|
||||
watcher: RecommendedWatcher,
|
||||
receiver: mpsc::Receiver<Result<FsEvent, AnyError>>,
|
||||
receiver: AsyncRefCell<mpsc::Receiver<Result<FsEvent, AnyError>>>,
|
||||
cancel: CancelHandle,
|
||||
}
|
||||
|
||||
impl Resource for FsEventsResource {
|
||||
fn name(&self) -> Cow<str> {
|
||||
"fsEvents".into()
|
||||
}
|
||||
|
||||
fn close(self: Rc<Self>) {
|
||||
self.cancel.cancel();
|
||||
}
|
||||
}
|
||||
|
||||
/// Represents a file system event.
|
||||
|
@ -99,8 +115,12 @@ fn op_fs_events_open(
|
|||
.check_read(&PathBuf::from(path))?;
|
||||
watcher.watch(path, recursive_mode)?;
|
||||
}
|
||||
let resource = FsEventsResource { watcher, receiver };
|
||||
let rid = state.resource_table.add("fsEvents", Box::new(resource));
|
||||
let resource = FsEventsResource {
|
||||
watcher,
|
||||
receiver: AsyncRefCell::new(receiver),
|
||||
cancel: Default::default(),
|
||||
};
|
||||
let rid = state.resource_table.add(resource);
|
||||
Ok(json!(rid))
|
||||
}
|
||||
|
||||
|
@ -114,20 +134,18 @@ async fn op_fs_events_poll(
|
|||
rid: u32,
|
||||
}
|
||||
let PollArgs { rid } = serde_json::from_value(args)?;
|
||||
poll_fn(move |cx| {
|
||||
let mut state = state.borrow_mut();
|
||||
let watcher = state
|
||||
.resource_table
|
||||
.get_mut::<FsEventsResource>(rid)
|
||||
.ok_or_else(bad_resource_id)?;
|
||||
watcher
|
||||
.receiver
|
||||
.poll_recv(cx)
|
||||
.map(|maybe_result| match maybe_result {
|
||||
Some(Ok(value)) => Ok(json!({ "value": value, "done": false })),
|
||||
Some(Err(err)) => Err(err),
|
||||
None => Ok(json!({ "done": true })),
|
||||
})
|
||||
})
|
||||
.await
|
||||
|
||||
let resource = state
|
||||
.borrow()
|
||||
.resource_table
|
||||
.get::<FsEventsResource>(rid)
|
||||
.ok_or_else(bad_resource_id)?;
|
||||
let mut receiver = RcRef::map(&resource, |r| &r.receiver).borrow_mut().await;
|
||||
let cancel = RcRef::map(resource, |r| &r.cancel);
|
||||
let maybe_result = receiver.recv().or_cancel(cancel).await?;
|
||||
match maybe_result {
|
||||
Some(Ok(value)) => Ok(json!({ "value": value, "done": false })),
|
||||
Some(Err(err)) => Err(err),
|
||||
None => Ok(json!({ "done": true })),
|
||||
}
|
||||
}
|
||||
|
|
|
@ -7,26 +7,29 @@ use deno_core::error::bad_resource_id;
|
|||
use deno_core::error::resource_unavailable;
|
||||
use deno_core::error::type_error;
|
||||
use deno_core::error::AnyError;
|
||||
use deno_core::futures;
|
||||
use deno_core::futures::future::poll_fn;
|
||||
use deno_core::futures::future::FutureExt;
|
||||
use deno_core::futures::ready;
|
||||
use deno_core::AsyncMutFuture;
|
||||
use deno_core::AsyncRefCell;
|
||||
use deno_core::BufVec;
|
||||
use deno_core::CancelHandle;
|
||||
use deno_core::CancelTryFuture;
|
||||
use deno_core::JsRuntime;
|
||||
use deno_core::OpState;
|
||||
use deno_core::RcRef;
|
||||
use deno_core::Resource;
|
||||
use std::borrow::Cow;
|
||||
use std::cell::RefCell;
|
||||
use std::collections::HashMap;
|
||||
use std::pin::Pin;
|
||||
use std::rc::Rc;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use std::task::Context;
|
||||
use std::task::Poll;
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use tokio::io::AsyncRead;
|
||||
use tokio::io::AsyncReadExt;
|
||||
use tokio::io::AsyncWrite;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use tokio::net::tcp;
|
||||
use tokio::net::TcpStream;
|
||||
use tokio_rustls::client::TlsStream as ClientTlsStream;
|
||||
use tokio_rustls::server::TlsStream as ServerTlsStream;
|
||||
|
||||
#[cfg(not(windows))]
|
||||
#[cfg(unix)]
|
||||
use std::os::unix::io::FromRawFd;
|
||||
|
||||
#[cfg(windows)]
|
||||
|
@ -94,26 +97,28 @@ pub fn init(rt: &mut JsRuntime) {
|
|||
}
|
||||
|
||||
pub fn get_stdio() -> (
|
||||
Option<StreamResourceHolder>,
|
||||
Option<StreamResourceHolder>,
|
||||
Option<StreamResourceHolder>,
|
||||
Option<StreamResource>,
|
||||
Option<StreamResource>,
|
||||
Option<StreamResource>,
|
||||
) {
|
||||
let stdin = get_stdio_stream(&STDIN_HANDLE);
|
||||
let stdout = get_stdio_stream(&STDOUT_HANDLE);
|
||||
let stderr = get_stdio_stream(&STDERR_HANDLE);
|
||||
let stdin = get_stdio_stream(&STDIN_HANDLE, "stdin");
|
||||
let stdout = get_stdio_stream(&STDOUT_HANDLE, "stdout");
|
||||
let stderr = get_stdio_stream(&STDERR_HANDLE, "stderr");
|
||||
|
||||
(stdin, stdout, stderr)
|
||||
}
|
||||
|
||||
fn get_stdio_stream(
|
||||
handle: &Option<std::fs::File>,
|
||||
) -> Option<StreamResourceHolder> {
|
||||
name: &str,
|
||||
) -> Option<StreamResource> {
|
||||
match handle {
|
||||
None => None,
|
||||
Some(file_handle) => match file_handle.try_clone() {
|
||||
Ok(clone) => Some(StreamResourceHolder::new(StreamResource::FsFile(
|
||||
Some((tokio::fs::File::from_std(clone), FileMetadata::default())),
|
||||
))),
|
||||
Ok(clone) => {
|
||||
let tokio_file = tokio::fs::File::from_std(clone);
|
||||
Some(StreamResource::stdio(tokio_file, name))
|
||||
}
|
||||
Err(_e) => None,
|
||||
},
|
||||
}
|
||||
|
@ -137,100 +142,317 @@ pub struct FileMetadata {
|
|||
pub tty: TTYMetadata,
|
||||
}
|
||||
|
||||
pub struct StreamResourceHolder {
|
||||
pub resource: StreamResource,
|
||||
waker: HashMap<usize, futures::task::AtomicWaker>,
|
||||
waker_counter: AtomicUsize,
|
||||
#[derive(Debug)]
|
||||
pub struct FullDuplexResource<R, W> {
|
||||
rd: AsyncRefCell<R>,
|
||||
wr: AsyncRefCell<W>,
|
||||
// When a full-duplex resource is closed, all pending 'read' ops are
|
||||
// canceled, while 'write' ops are allowed to complete. Therefore only
|
||||
// 'read' futures should be attached to this cancel handle.
|
||||
cancel_handle: CancelHandle,
|
||||
}
|
||||
|
||||
impl StreamResourceHolder {
|
||||
pub fn new(resource: StreamResource) -> StreamResourceHolder {
|
||||
StreamResourceHolder {
|
||||
resource,
|
||||
// Atleast one task is expecter for the resource
|
||||
waker: HashMap::with_capacity(1),
|
||||
// Tracks wakers Ids
|
||||
waker_counter: AtomicUsize::new(0),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for StreamResourceHolder {
|
||||
fn drop(&mut self) {
|
||||
self.wake_tasks();
|
||||
}
|
||||
}
|
||||
|
||||
impl StreamResourceHolder {
|
||||
pub fn track_task(&mut self, cx: &Context) -> Result<usize, AnyError> {
|
||||
let waker = futures::task::AtomicWaker::new();
|
||||
waker.register(cx.waker());
|
||||
// Its OK if it overflows
|
||||
let task_waker_id = self.waker_counter.fetch_add(1, Ordering::Relaxed);
|
||||
self.waker.insert(task_waker_id, waker);
|
||||
Ok(task_waker_id)
|
||||
}
|
||||
|
||||
pub fn wake_tasks(&mut self) {
|
||||
for waker in self.waker.values() {
|
||||
waker.wake();
|
||||
impl<R: 'static, W: 'static> FullDuplexResource<R, W> {
|
||||
pub fn new((rd, wr): (R, W)) -> Self {
|
||||
Self {
|
||||
rd: rd.into(),
|
||||
wr: wr.into(),
|
||||
cancel_handle: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn untrack_task(&mut self, task_waker_id: usize) {
|
||||
self.waker.remove(&task_waker_id);
|
||||
pub fn into_inner(self) -> (R, W) {
|
||||
(self.rd.into_inner(), self.wr.into_inner())
|
||||
}
|
||||
|
||||
pub fn rd_borrow_mut(self: &Rc<Self>) -> AsyncMutFuture<R> {
|
||||
RcRef::map(self, |r| &r.rd).borrow_mut()
|
||||
}
|
||||
|
||||
pub fn wr_borrow_mut(self: &Rc<Self>) -> AsyncMutFuture<W> {
|
||||
RcRef::map(self, |r| &r.wr).borrow_mut()
|
||||
}
|
||||
|
||||
pub fn cancel_handle(self: &Rc<Self>) -> RcRef<CancelHandle> {
|
||||
RcRef::map(self, |r| &r.cancel_handle)
|
||||
}
|
||||
|
||||
pub fn cancel_read_ops(&self) {
|
||||
self.cancel_handle.cancel()
|
||||
}
|
||||
}
|
||||
|
||||
pub enum StreamResource {
|
||||
FsFile(Option<(tokio::fs::File, FileMetadata)>),
|
||||
TcpStream(Option<tokio::net::TcpStream>),
|
||||
#[cfg(not(windows))]
|
||||
UnixStream(tokio::net::UnixStream),
|
||||
ServerTlsStream(Box<ServerTlsStream<TcpStream>>),
|
||||
ClientTlsStream(Box<ClientTlsStream<TcpStream>>),
|
||||
ChildStdin(tokio::process::ChildStdin),
|
||||
ChildStdout(tokio::process::ChildStdout),
|
||||
ChildStderr(tokio::process::ChildStderr),
|
||||
impl<R, W> FullDuplexResource<R, W>
|
||||
where
|
||||
R: AsyncRead + Unpin + 'static,
|
||||
W: AsyncWrite + Unpin + 'static,
|
||||
{
|
||||
async fn read(self: &Rc<Self>, buf: &mut [u8]) -> Result<usize, AnyError> {
|
||||
let mut rd = self.rd_borrow_mut().await;
|
||||
let nread = rd.read(buf).try_or_cancel(self.cancel_handle()).await?;
|
||||
Ok(nread)
|
||||
}
|
||||
|
||||
async fn write(self: &Rc<Self>, buf: &[u8]) -> Result<usize, AnyError> {
|
||||
let mut wr = self.wr_borrow_mut().await;
|
||||
let nwritten = wr.write(buf).await?;
|
||||
Ok(nwritten)
|
||||
}
|
||||
}
|
||||
|
||||
trait UnpinAsyncRead: AsyncRead + Unpin {}
|
||||
trait UnpinAsyncWrite: AsyncWrite + Unpin {}
|
||||
pub type TcpStreamResource =
|
||||
FullDuplexResource<tcp::OwnedReadHalf, tcp::OwnedWriteHalf>;
|
||||
|
||||
impl<T: AsyncRead + Unpin> UnpinAsyncRead for T {}
|
||||
impl<T: AsyncWrite + Unpin> UnpinAsyncWrite for T {}
|
||||
impl Resource for TcpStreamResource {
|
||||
fn name(&self) -> Cow<str> {
|
||||
"tcpStream".into()
|
||||
}
|
||||
|
||||
/// `DenoAsyncRead` is the same as the `tokio_io::AsyncRead` trait
|
||||
/// but uses an `AnyError` error instead of `std::io:Error`
|
||||
pub trait DenoAsyncRead {
|
||||
fn poll_read(
|
||||
&mut self,
|
||||
cx: &mut Context,
|
||||
buf: &mut [u8],
|
||||
) -> Poll<Result<usize, AnyError>>;
|
||||
fn close(self: Rc<Self>) {
|
||||
self.cancel_read_ops();
|
||||
}
|
||||
}
|
||||
|
||||
impl DenoAsyncRead for StreamResource {
|
||||
fn poll_read(
|
||||
&mut self,
|
||||
cx: &mut Context,
|
||||
buf: &mut [u8],
|
||||
) -> Poll<Result<usize, AnyError>> {
|
||||
use StreamResource::*;
|
||||
let f: &mut dyn UnpinAsyncRead = match self {
|
||||
FsFile(Some((f, _))) => f,
|
||||
FsFile(None) => return Poll::Ready(Err(resource_unavailable())),
|
||||
TcpStream(Some(f)) => f,
|
||||
#[cfg(not(windows))]
|
||||
UnixStream(f) => f,
|
||||
ClientTlsStream(f) => f,
|
||||
ServerTlsStream(f) => f,
|
||||
ChildStdout(f) => f,
|
||||
ChildStderr(f) => f,
|
||||
_ => return Err(bad_resource_id()).into(),
|
||||
};
|
||||
let v = ready!(Pin::new(f).poll_read(cx, buf))?;
|
||||
Ok(v).into()
|
||||
#[derive(Default)]
|
||||
pub struct StreamResource {
|
||||
pub fs_file:
|
||||
Option<AsyncRefCell<(Option<tokio::fs::File>, Option<FileMetadata>)>>,
|
||||
|
||||
#[cfg(unix)]
|
||||
pub unix_stream: Option<AsyncRefCell<tokio::net::UnixStream>>,
|
||||
|
||||
child_stdin: Option<AsyncRefCell<tokio::process::ChildStdin>>,
|
||||
|
||||
child_stdout: Option<AsyncRefCell<tokio::process::ChildStdout>>,
|
||||
|
||||
child_stderr: Option<AsyncRefCell<tokio::process::ChildStderr>>,
|
||||
|
||||
client_tls_stream: Option<AsyncRefCell<ClientTlsStream<TcpStream>>>,
|
||||
|
||||
server_tls_stream: Option<AsyncRefCell<ServerTlsStream<TcpStream>>>,
|
||||
|
||||
cancel: CancelHandle,
|
||||
name: String,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for StreamResource {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||
write!(f, "StreamResource")
|
||||
}
|
||||
}
|
||||
|
||||
impl StreamResource {
|
||||
pub fn stdio(fs_file: tokio::fs::File, name: &str) -> Self {
|
||||
Self {
|
||||
fs_file: Some(AsyncRefCell::new((
|
||||
Some(fs_file),
|
||||
Some(FileMetadata::default()),
|
||||
))),
|
||||
name: name.to_string(),
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
|
||||
pub fn fs_file(fs_file: tokio::fs::File) -> Self {
|
||||
Self {
|
||||
fs_file: Some(AsyncRefCell::new((
|
||||
Some(fs_file),
|
||||
Some(FileMetadata::default()),
|
||||
))),
|
||||
name: "fsFile".to_string(),
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
pub fn unix_stream(unix_stream: tokio::net::UnixStream) -> Self {
|
||||
Self {
|
||||
unix_stream: Some(AsyncRefCell::new(unix_stream)),
|
||||
name: "unixStream".to_string(),
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
|
||||
pub fn child_stdout(child: tokio::process::ChildStdout) -> Self {
|
||||
Self {
|
||||
child_stdout: Some(AsyncRefCell::new(child)),
|
||||
name: "childStdout".to_string(),
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
|
||||
pub fn child_stderr(child: tokio::process::ChildStderr) -> Self {
|
||||
Self {
|
||||
child_stderr: Some(AsyncRefCell::new(child)),
|
||||
name: "childStderr".to_string(),
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
|
||||
pub fn child_stdin(child: tokio::process::ChildStdin) -> Self {
|
||||
Self {
|
||||
child_stdin: Some(AsyncRefCell::new(child)),
|
||||
name: "childStdin".to_string(),
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
|
||||
pub fn client_tls_stream(stream: ClientTlsStream<TcpStream>) -> Self {
|
||||
Self {
|
||||
client_tls_stream: Some(AsyncRefCell::new(stream)),
|
||||
name: "clientTlsStream".to_string(),
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
|
||||
pub fn server_tls_stream(stream: ServerTlsStream<TcpStream>) -> Self {
|
||||
Self {
|
||||
server_tls_stream: Some(AsyncRefCell::new(stream)),
|
||||
name: "serverTlsStream".to_string(),
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
|
||||
async fn read(self: Rc<Self>, buf: &mut [u8]) -> Result<usize, AnyError> {
|
||||
// TODO(bartlomieju): in the future, it would be better for `StreamResource`
|
||||
// to be an enum instead a struct with many `Option` fields, however I
|
||||
// wasn't able to get it to work with `AsyncRefCell`s.
|
||||
if self.fs_file.is_some() {
|
||||
debug_assert!(self.child_stdin.is_none());
|
||||
debug_assert!(self.child_stdout.is_none());
|
||||
debug_assert!(self.child_stderr.is_none());
|
||||
debug_assert!(self.server_tls_stream.is_none());
|
||||
debug_assert!(self.client_tls_stream.is_none());
|
||||
let mut fs_file = RcRef::map(&self, |r| r.fs_file.as_ref().unwrap())
|
||||
.borrow_mut()
|
||||
.await;
|
||||
let nwritten = (*fs_file).0.as_mut().unwrap().read(buf).await?;
|
||||
return Ok(nwritten);
|
||||
} else if self.child_stdout.is_some() {
|
||||
debug_assert!(self.child_stdin.is_none());
|
||||
debug_assert!(self.child_stderr.is_none());
|
||||
debug_assert!(self.server_tls_stream.is_none());
|
||||
debug_assert!(self.client_tls_stream.is_none());
|
||||
let mut child_stdout =
|
||||
RcRef::map(&self, |r| r.child_stdout.as_ref().unwrap())
|
||||
.borrow_mut()
|
||||
.await;
|
||||
let cancel = RcRef::map(self, |r| &r.cancel);
|
||||
let nread = child_stdout.read(buf).try_or_cancel(cancel).await?;
|
||||
return Ok(nread);
|
||||
} else if self.child_stderr.is_some() {
|
||||
debug_assert!(self.child_stdin.is_none());
|
||||
debug_assert!(self.server_tls_stream.is_none());
|
||||
debug_assert!(self.client_tls_stream.is_none());
|
||||
let mut child_stderr =
|
||||
RcRef::map(&self, |r| r.child_stderr.as_ref().unwrap())
|
||||
.borrow_mut()
|
||||
.await;
|
||||
let cancel = RcRef::map(self, |r| &r.cancel);
|
||||
let nread = child_stderr.read(buf).try_or_cancel(cancel).await?;
|
||||
return Ok(nread);
|
||||
} else if self.client_tls_stream.is_some() {
|
||||
debug_assert!(self.server_tls_stream.is_none());
|
||||
let mut client_tls_stream =
|
||||
RcRef::map(&self, |r| r.client_tls_stream.as_ref().unwrap())
|
||||
.borrow_mut()
|
||||
.await;
|
||||
let cancel = RcRef::map(self, |r| &r.cancel);
|
||||
let nread = client_tls_stream.read(buf).try_or_cancel(cancel).await?;
|
||||
return Ok(nread);
|
||||
} else if self.server_tls_stream.is_some() {
|
||||
let mut server_tls_stream =
|
||||
RcRef::map(&self, |r| r.server_tls_stream.as_ref().unwrap())
|
||||
.borrow_mut()
|
||||
.await;
|
||||
let cancel = RcRef::map(self, |r| &r.cancel);
|
||||
let nread = server_tls_stream.read(buf).try_or_cancel(cancel).await?;
|
||||
return Ok(nread);
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
if self.unix_stream.is_some() {
|
||||
let mut unix_stream =
|
||||
RcRef::map(&self, |r| r.unix_stream.as_ref().unwrap())
|
||||
.borrow_mut()
|
||||
.await;
|
||||
let cancel = RcRef::map(self, |r| &r.cancel);
|
||||
let nread = unix_stream.read(buf).try_or_cancel(cancel).await?;
|
||||
return Ok(nread);
|
||||
}
|
||||
|
||||
Err(bad_resource_id())
|
||||
}
|
||||
|
||||
async fn write(self: Rc<Self>, buf: &[u8]) -> Result<usize, AnyError> {
|
||||
// TODO(bartlomieju): in the future, it would be better for `StreamResource`
|
||||
// to be an enum instead a struct with many `Option` fields, however I
|
||||
// wasn't able to get it to work with `AsyncRefCell`s.
|
||||
if self.fs_file.is_some() {
|
||||
debug_assert!(self.child_stdin.is_none());
|
||||
debug_assert!(self.child_stdout.is_none());
|
||||
debug_assert!(self.child_stderr.is_none());
|
||||
debug_assert!(self.server_tls_stream.is_none());
|
||||
debug_assert!(self.client_tls_stream.is_none());
|
||||
let mut fs_file = RcRef::map(&self, |r| r.fs_file.as_ref().unwrap())
|
||||
.borrow_mut()
|
||||
.await;
|
||||
let nwritten = (*fs_file).0.as_mut().unwrap().write(buf).await?;
|
||||
(*fs_file).0.as_mut().unwrap().flush().await?;
|
||||
return Ok(nwritten);
|
||||
} else if self.child_stdin.is_some() {
|
||||
debug_assert!(self.child_stdout.is_none());
|
||||
debug_assert!(self.child_stderr.is_none());
|
||||
debug_assert!(self.server_tls_stream.is_none());
|
||||
debug_assert!(self.client_tls_stream.is_none());
|
||||
let mut child_stdin =
|
||||
RcRef::map(&self, |r| r.child_stdin.as_ref().unwrap())
|
||||
.borrow_mut()
|
||||
.await;
|
||||
let nwritten = child_stdin.write(buf).await?;
|
||||
child_stdin.flush().await?;
|
||||
return Ok(nwritten);
|
||||
} else if self.client_tls_stream.is_some() {
|
||||
debug_assert!(self.server_tls_stream.is_none());
|
||||
let mut client_tls_stream =
|
||||
RcRef::map(&self, |r| r.client_tls_stream.as_ref().unwrap())
|
||||
.borrow_mut()
|
||||
.await;
|
||||
let nwritten = client_tls_stream.write(buf).await?;
|
||||
client_tls_stream.flush().await?;
|
||||
return Ok(nwritten);
|
||||
} else if self.server_tls_stream.is_some() {
|
||||
let mut server_tls_stream =
|
||||
RcRef::map(&self, |r| r.server_tls_stream.as_ref().unwrap())
|
||||
.borrow_mut()
|
||||
.await;
|
||||
let nwritten = server_tls_stream.write(buf).await?;
|
||||
server_tls_stream.flush().await?;
|
||||
return Ok(nwritten);
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
if self.unix_stream.is_some() {
|
||||
let mut unix_stream =
|
||||
RcRef::map(&self, |r| r.unix_stream.as_ref().unwrap())
|
||||
.borrow_mut()
|
||||
.await;
|
||||
let nwritten = unix_stream.write(buf).await?;
|
||||
unix_stream.flush().await?;
|
||||
return Ok(nwritten);
|
||||
}
|
||||
|
||||
Err(bad_resource_id())
|
||||
}
|
||||
}
|
||||
|
||||
impl Resource for StreamResource {
|
||||
fn name(&self) -> Cow<str> {
|
||||
self.name.clone().into()
|
||||
}
|
||||
|
||||
fn close(self: Rc<Self>) {
|
||||
self.cancel.cancel()
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -263,92 +485,26 @@ pub fn op_read(
|
|||
})
|
||||
} else {
|
||||
let mut zero_copy = zero_copy[0].clone();
|
||||
MinimalOp::Async(
|
||||
poll_fn(move |cx| {
|
||||
let mut state = state.borrow_mut();
|
||||
let resource_holder = state
|
||||
MinimalOp::Async({
|
||||
async move {
|
||||
let resource = state
|
||||
.borrow()
|
||||
.resource_table
|
||||
.get_mut::<StreamResourceHolder>(rid as u32)
|
||||
.get_any(rid as u32)
|
||||
.ok_or_else(bad_resource_id)?;
|
||||
|
||||
let mut task_tracker_id: Option<usize> = None;
|
||||
let nread = match resource_holder.resource.poll_read(cx, &mut zero_copy)
|
||||
let nread = if let Some(stream) =
|
||||
resource.downcast_rc::<TcpStreamResource>()
|
||||
{
|
||||
Poll::Ready(t) => {
|
||||
if let Some(id) = task_tracker_id {
|
||||
resource_holder.untrack_task(id);
|
||||
}
|
||||
t
|
||||
}
|
||||
Poll::Pending => {
|
||||
task_tracker_id.replace(resource_holder.track_task(cx)?);
|
||||
return Poll::Pending;
|
||||
}
|
||||
}?;
|
||||
Poll::Ready(Ok(nread as i32))
|
||||
})
|
||||
.boxed_local(),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
/// `DenoAsyncWrite` is the same as the `tokio_io::AsyncWrite` trait
|
||||
/// but uses an `AnyError` error instead of `std::io:Error`
|
||||
pub trait DenoAsyncWrite {
|
||||
fn poll_write(
|
||||
&mut self,
|
||||
cx: &mut Context,
|
||||
buf: &[u8],
|
||||
) -> Poll<Result<usize, AnyError>>;
|
||||
|
||||
fn poll_close(&mut self, cx: &mut Context) -> Poll<Result<(), AnyError>>;
|
||||
|
||||
fn poll_flush(&mut self, cx: &mut Context) -> Poll<Result<(), AnyError>>;
|
||||
}
|
||||
|
||||
impl DenoAsyncWrite for StreamResource {
|
||||
fn poll_write(
|
||||
&mut self,
|
||||
cx: &mut Context,
|
||||
buf: &[u8],
|
||||
) -> Poll<Result<usize, AnyError>> {
|
||||
use StreamResource::*;
|
||||
let f: &mut dyn UnpinAsyncWrite = match self {
|
||||
FsFile(Some((f, _))) => f,
|
||||
FsFile(None) => return Poll::Pending,
|
||||
TcpStream(Some(f)) => f,
|
||||
#[cfg(not(windows))]
|
||||
UnixStream(f) => f,
|
||||
ClientTlsStream(f) => f,
|
||||
ServerTlsStream(f) => f,
|
||||
ChildStdin(f) => f,
|
||||
_ => return Err(bad_resource_id()).into(),
|
||||
};
|
||||
|
||||
let v = ready!(Pin::new(f).poll_write(cx, buf))?;
|
||||
Ok(v).into()
|
||||
}
|
||||
|
||||
fn poll_flush(&mut self, cx: &mut Context) -> Poll<Result<(), AnyError>> {
|
||||
use StreamResource::*;
|
||||
let f: &mut dyn UnpinAsyncWrite = match self {
|
||||
FsFile(Some((f, _))) => f,
|
||||
FsFile(None) => return Poll::Pending,
|
||||
TcpStream(Some(f)) => f,
|
||||
#[cfg(not(windows))]
|
||||
UnixStream(f) => f,
|
||||
ClientTlsStream(f) => f,
|
||||
ServerTlsStream(f) => f,
|
||||
ChildStdin(f) => f,
|
||||
_ => return Err(bad_resource_id()).into(),
|
||||
};
|
||||
|
||||
ready!(Pin::new(f).poll_flush(cx))?;
|
||||
Ok(()).into()
|
||||
}
|
||||
|
||||
fn poll_close(&mut self, _cx: &mut Context) -> Poll<Result<(), AnyError>> {
|
||||
unimplemented!()
|
||||
stream.read(&mut zero_copy).await?
|
||||
} else if let Some(stream) = resource.downcast_rc::<StreamResource>() {
|
||||
stream.clone().read(&mut zero_copy).await?
|
||||
} else {
|
||||
return Err(bad_resource_id());
|
||||
};
|
||||
Ok(nread as i32)
|
||||
}
|
||||
.boxed_local()
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -381,93 +537,76 @@ pub fn op_write(
|
|||
})
|
||||
} else {
|
||||
let zero_copy = zero_copy[0].clone();
|
||||
MinimalOp::Async(
|
||||
MinimalOp::Async({
|
||||
async move {
|
||||
let nwritten = poll_fn(|cx| {
|
||||
let mut state = state.borrow_mut();
|
||||
let resource_holder = state
|
||||
.resource_table
|
||||
.get_mut::<StreamResourceHolder>(rid as u32)
|
||||
.ok_or_else(bad_resource_id)?;
|
||||
resource_holder.resource.poll_write(cx, &zero_copy)
|
||||
})
|
||||
.await?;
|
||||
|
||||
// TODO(bartlomieju): this step was added during upgrade to Tokio 0.2
|
||||
// and the reasons for the need to explicitly flush are not fully known.
|
||||
// Figure out why it's needed and preferably remove it.
|
||||
// https://github.com/denoland/deno/issues/3565
|
||||
poll_fn(|cx| {
|
||||
let mut state = state.borrow_mut();
|
||||
let resource_holder = state
|
||||
.resource_table
|
||||
.get_mut::<StreamResourceHolder>(rid as u32)
|
||||
.ok_or_else(bad_resource_id)?;
|
||||
resource_holder.resource.poll_flush(cx)
|
||||
})
|
||||
.await?;
|
||||
|
||||
let resource = state
|
||||
.borrow()
|
||||
.resource_table
|
||||
.get_any(rid as u32)
|
||||
.ok_or_else(bad_resource_id)?;
|
||||
let nwritten = if let Some(stream) =
|
||||
resource.downcast_rc::<TcpStreamResource>()
|
||||
{
|
||||
stream.write(&zero_copy).await?
|
||||
} else if let Some(stream) = resource.downcast_rc::<StreamResource>() {
|
||||
stream.clone().write(&zero_copy).await?
|
||||
} else {
|
||||
return Err(bad_resource_id());
|
||||
};
|
||||
Ok(nwritten as i32)
|
||||
}
|
||||
.boxed_local(),
|
||||
)
|
||||
.boxed_local()
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Helper function for operating on a std::fs::File stored in the resource table.
|
||||
///
|
||||
/// We store file system file resources as tokio::fs::File, so this is a little
|
||||
/// utility function that gets a std::fs:File when you need to do blocking
|
||||
/// operations.
|
||||
///
|
||||
/// Returns ErrorKind::Busy if the resource is being used by another op.
|
||||
pub fn std_file_resource<F, T>(
|
||||
state: &mut OpState,
|
||||
rid: u32,
|
||||
mut f: F,
|
||||
) -> Result<T, AnyError>
|
||||
where
|
||||
F: FnMut(
|
||||
Result<&mut std::fs::File, &mut StreamResource>,
|
||||
) -> Result<T, AnyError>,
|
||||
F: FnMut(Result<&mut std::fs::File, ()>) -> Result<T, AnyError>,
|
||||
{
|
||||
// First we look up the rid in the resource table.
|
||||
let mut r = state.resource_table.get_mut::<StreamResourceHolder>(rid);
|
||||
if let Some(ref mut resource_holder) = r {
|
||||
// Sync write only works for FsFile. It doesn't make sense to do this
|
||||
// for non-blocking sockets. So we error out if not FsFile.
|
||||
match &mut resource_holder.resource {
|
||||
StreamResource::FsFile(option_file_metadata) => {
|
||||
// The object in the resource table is a tokio::fs::File - but in
|
||||
// order to do a blocking write on it, we must turn it into a
|
||||
// std::fs::File. Hopefully this code compiles down to nothing.
|
||||
if let Some((tokio_file, metadata)) = option_file_metadata.take() {
|
||||
match tokio_file.try_into_std() {
|
||||
Ok(mut std_file) => {
|
||||
let result = f(Ok(&mut std_file));
|
||||
// Turn the std_file handle back into a tokio file, put it back
|
||||
// in the resource table.
|
||||
let tokio_file = tokio::fs::File::from_std(std_file);
|
||||
resource_holder.resource =
|
||||
StreamResource::FsFile(Some((tokio_file, metadata)));
|
||||
// return the result.
|
||||
result
|
||||
}
|
||||
Err(tokio_file) => {
|
||||
// This function will return an error containing the file if
|
||||
// some operation is in-flight.
|
||||
resource_holder.resource =
|
||||
StreamResource::FsFile(Some((tokio_file, metadata)));
|
||||
Err(resource_unavailable())
|
||||
}
|
||||
}
|
||||
} else {
|
||||
Err(resource_unavailable())
|
||||
}
|
||||
let resource = state
|
||||
.resource_table
|
||||
.get::<StreamResource>(rid)
|
||||
.ok_or_else(bad_resource_id)?;
|
||||
|
||||
// Sync write only works for FsFile. It doesn't make sense to do this
|
||||
// for non-blocking sockets. So we error out if not FsFile.
|
||||
if resource.fs_file.is_none() {
|
||||
return f(Err(()));
|
||||
}
|
||||
|
||||
// The object in the resource table is a tokio::fs::File - but in
|
||||
// order to do a blocking write on it, we must turn it into a
|
||||
// std::fs::File. Hopefully this code compiles down to nothing.
|
||||
|
||||
let fs_file_resource =
|
||||
RcRef::map(&resource, |r| r.fs_file.as_ref().unwrap()).try_borrow_mut();
|
||||
|
||||
if let Some(mut fs_file) = fs_file_resource {
|
||||
let tokio_file = fs_file.0.take().unwrap();
|
||||
match tokio_file.try_into_std() {
|
||||
Ok(mut std_file) => {
|
||||
let result = f(Ok(&mut std_file));
|
||||
// Turn the std_file handle back into a tokio file, put it back
|
||||
// in the resource table.
|
||||
let tokio_file = tokio::fs::File::from_std(std_file);
|
||||
fs_file.0 = Some(tokio_file);
|
||||
// return the result.
|
||||
result
|
||||
}
|
||||
Err(tokio_file) => {
|
||||
// This function will return an error containing the file if
|
||||
// some operation is in-flight.
|
||||
fs_file.0 = Some(tokio_file);
|
||||
Err(resource_unavailable())
|
||||
}
|
||||
_ => f(Err(&mut resource_holder.resource)),
|
||||
}
|
||||
} else {
|
||||
Err(bad_resource_id())
|
||||
Err(resource_unavailable())
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
|
||||
|
||||
use crate::ops::io::StreamResource;
|
||||
use crate::ops::io::StreamResourceHolder;
|
||||
use crate::ops::io::FullDuplexResource;
|
||||
use crate::ops::io::TcpStreamResource;
|
||||
use crate::permissions::Permissions;
|
||||
use crate::resolve_addr::resolve_addr;
|
||||
use crate::resolve_addr::resolve_addr_sync;
|
||||
|
@ -11,21 +11,24 @@ use deno_core::error::custom_error;
|
|||
use deno_core::error::generic_error;
|
||||
use deno_core::error::type_error;
|
||||
use deno_core::error::AnyError;
|
||||
use deno_core::futures;
|
||||
use deno_core::futures::future::poll_fn;
|
||||
use deno_core::serde_json;
|
||||
use deno_core::serde_json::json;
|
||||
use deno_core::serde_json::Value;
|
||||
use deno_core::AsyncRefCell;
|
||||
use deno_core::BufVec;
|
||||
use deno_core::CancelHandle;
|
||||
use deno_core::CancelTryFuture;
|
||||
use deno_core::OpState;
|
||||
use deno_core::RcRef;
|
||||
use deno_core::Resource;
|
||||
use deno_core::ZeroCopyBuf;
|
||||
use serde::Deserialize;
|
||||
use std::borrow::Cow;
|
||||
use std::cell::RefCell;
|
||||
use std::net::Shutdown;
|
||||
use std::net::SocketAddr;
|
||||
use std::rc::Rc;
|
||||
use std::task::Context;
|
||||
use std::task::Poll;
|
||||
use tokio::net::udp;
|
||||
use tokio::net::TcpListener;
|
||||
use tokio::net::TcpStream;
|
||||
use tokio::net::UdpSocket;
|
||||
|
@ -33,12 +36,14 @@ use tokio::net::UdpSocket;
|
|||
#[cfg(unix)]
|
||||
use super::net_unix;
|
||||
#[cfg(unix)]
|
||||
use crate::ops::io::StreamResource;
|
||||
#[cfg(unix)]
|
||||
use std::path::Path;
|
||||
|
||||
pub fn init(rt: &mut deno_core::JsRuntime) {
|
||||
super::reg_json_async(rt, "op_accept", op_accept);
|
||||
super::reg_json_async(rt, "op_connect", op_connect);
|
||||
super::reg_json_sync(rt, "op_shutdown", op_shutdown);
|
||||
super::reg_json_async(rt, "op_shutdown", op_shutdown);
|
||||
super::reg_json_sync(rt, "op_listen", op_listen);
|
||||
super::reg_json_async(rt, "op_datagram_receive", op_datagram_receive);
|
||||
super::reg_json_async(rt, "op_datagram_send", op_datagram_send);
|
||||
|
@ -57,39 +62,31 @@ async fn accept_tcp(
|
|||
) -> Result<Value, AnyError> {
|
||||
let rid = args.rid as u32;
|
||||
|
||||
let accept_fut = poll_fn(|cx| {
|
||||
let mut state = state.borrow_mut();
|
||||
let listener_resource = state
|
||||
.resource_table
|
||||
.get_mut::<TcpListenerResource>(rid)
|
||||
.ok_or_else(|| bad_resource("Listener has been closed"))?;
|
||||
let listener = &mut listener_resource.listener;
|
||||
match listener.poll_accept(cx).map_err(AnyError::from) {
|
||||
Poll::Ready(Ok((stream, addr))) => {
|
||||
listener_resource.untrack_task();
|
||||
Poll::Ready(Ok((stream, addr)))
|
||||
let resource = state
|
||||
.borrow()
|
||||
.resource_table
|
||||
.get::<TcpListenerResource>(rid)
|
||||
.ok_or_else(|| bad_resource("Listener has been closed"))?;
|
||||
let mut listener = RcRef::map(&resource, |r| &r.listener)
|
||||
.try_borrow_mut()
|
||||
.ok_or_else(|| custom_error("Busy", "Another accept task is ongoing"))?;
|
||||
let cancel = RcRef::map(resource, |r| &r.cancel);
|
||||
let (tcp_stream, _socket_addr) =
|
||||
listener.accept().try_or_cancel(cancel).await.map_err(|e| {
|
||||
// FIXME(bartlomieju): compatibility with current JS implementation
|
||||
if let std::io::ErrorKind::Interrupted = e.kind() {
|
||||
bad_resource("Listener has been closed")
|
||||
} else {
|
||||
e.into()
|
||||
}
|
||||
Poll::Pending => {
|
||||
listener_resource.track_task(cx)?;
|
||||
Poll::Pending
|
||||
}
|
||||
Poll::Ready(Err(e)) => {
|
||||
listener_resource.untrack_task();
|
||||
Poll::Ready(Err(e))
|
||||
}
|
||||
}
|
||||
});
|
||||
let (tcp_stream, _socket_addr) = accept_fut.await?;
|
||||
})?;
|
||||
let local_addr = tcp_stream.local_addr()?;
|
||||
let remote_addr = tcp_stream.peer_addr()?;
|
||||
|
||||
let mut state = state.borrow_mut();
|
||||
let rid = state.resource_table.add(
|
||||
"tcpStream",
|
||||
Box::new(StreamResourceHolder::new(StreamResource::TcpStream(Some(
|
||||
tcp_stream,
|
||||
)))),
|
||||
);
|
||||
let rid = state
|
||||
.resource_table
|
||||
.add(TcpStreamResource::new(tcp_stream.into_split()));
|
||||
Ok(json!({
|
||||
"rid": rid,
|
||||
"localAddr": {
|
||||
|
@ -138,18 +135,17 @@ async fn receive_udp(
|
|||
|
||||
let rid = args.rid as u32;
|
||||
|
||||
let receive_fut = poll_fn(|cx| {
|
||||
let mut state = state.borrow_mut();
|
||||
let resource = state
|
||||
.resource_table
|
||||
.get_mut::<UdpSocketResource>(rid)
|
||||
.ok_or_else(|| bad_resource("Socket has been closed"))?;
|
||||
let socket = &mut resource.socket;
|
||||
socket
|
||||
.poll_recv_from(cx, &mut zero_copy)
|
||||
.map_err(AnyError::from)
|
||||
});
|
||||
let (size, remote_addr) = receive_fut.await?;
|
||||
let resource = state
|
||||
.borrow_mut()
|
||||
.resource_table
|
||||
.get::<UdpSocketResource>(rid)
|
||||
.ok_or_else(|| bad_resource("Socket has been closed"))?;
|
||||
let (size, remote_addr) = resource
|
||||
.rd_borrow_mut()
|
||||
.await
|
||||
.recv_from(&mut zero_copy)
|
||||
.try_or_cancel(resource.cancel_handle())
|
||||
.await?;
|
||||
Ok(json!({
|
||||
"size": size,
|
||||
"remoteAddr": {
|
||||
|
@ -207,19 +203,18 @@ async fn op_datagram_send(
|
|||
.check_net(&args.hostname, args.port)?;
|
||||
}
|
||||
let addr = resolve_addr(&args.hostname, args.port).await?;
|
||||
poll_fn(move |cx| {
|
||||
let mut state = state.borrow_mut();
|
||||
let resource = state
|
||||
.resource_table
|
||||
.get_mut::<UdpSocketResource>(rid as u32)
|
||||
.ok_or_else(|| bad_resource("Socket has been closed"))?;
|
||||
resource
|
||||
.socket
|
||||
.poll_send_to(cx, &zero_copy, &addr)
|
||||
.map_ok(|byte_length| json!(byte_length))
|
||||
.map_err(AnyError::from)
|
||||
})
|
||||
.await
|
||||
|
||||
let resource = state
|
||||
.borrow_mut()
|
||||
.resource_table
|
||||
.get::<UdpSocketResource>(rid as u32)
|
||||
.ok_or_else(|| bad_resource("Socket has been closed"))?;
|
||||
let byte_length = resource
|
||||
.wr_borrow_mut()
|
||||
.await
|
||||
.send_to(&zero_copy, &addr)
|
||||
.await?;
|
||||
Ok(json!(byte_length))
|
||||
}
|
||||
#[cfg(unix)]
|
||||
SendArgs {
|
||||
|
@ -232,18 +227,17 @@ async fn op_datagram_send(
|
|||
let s = state.borrow();
|
||||
s.borrow::<Permissions>().check_write(&address_path)?;
|
||||
}
|
||||
let mut state = state.borrow_mut();
|
||||
let resource = state
|
||||
.borrow()
|
||||
.resource_table
|
||||
.get_mut::<net_unix::UnixDatagramResource>(rid as u32)
|
||||
.get::<net_unix::UnixDatagramResource>(rid as u32)
|
||||
.ok_or_else(|| {
|
||||
custom_error("NotConnected", "Socket has been closed")
|
||||
})?;
|
||||
let socket = &mut resource.socket;
|
||||
let byte_length = socket
|
||||
.send_to(&zero_copy, &resource.local_addr.as_pathname().unwrap())
|
||||
.await?;
|
||||
|
||||
let mut socket = RcRef::map(&resource, |r| &r.socket)
|
||||
.try_borrow_mut()
|
||||
.ok_or_else(|| custom_error("Busy", "Socket already in use"))?;
|
||||
let byte_length = socket.send_to(&zero_copy, address_path).await?;
|
||||
Ok(json!(byte_length))
|
||||
}
|
||||
_ => Err(type_error("Wrong argument format!")),
|
||||
|
@ -279,12 +273,9 @@ async fn op_connect(
|
|||
let remote_addr = tcp_stream.peer_addr()?;
|
||||
|
||||
let mut state_ = state.borrow_mut();
|
||||
let rid = state_.resource_table.add(
|
||||
"tcpStream",
|
||||
Box::new(StreamResourceHolder::new(StreamResource::TcpStream(Some(
|
||||
tcp_stream,
|
||||
)))),
|
||||
);
|
||||
let rid = state_
|
||||
.resource_table
|
||||
.add(TcpStreamResource::new(tcp_stream.into_split()));
|
||||
Ok(json!({
|
||||
"rid": rid,
|
||||
"localAddr": {
|
||||
|
@ -317,12 +308,8 @@ async fn op_connect(
|
|||
let remote_addr = unix_stream.peer_addr()?;
|
||||
|
||||
let mut state_ = state.borrow_mut();
|
||||
let rid = state_.resource_table.add(
|
||||
"unixStream",
|
||||
Box::new(StreamResourceHolder::new(StreamResource::UnixStream(
|
||||
unix_stream,
|
||||
))),
|
||||
);
|
||||
let resource = StreamResource::unix_stream(unix_stream);
|
||||
let rid = state_.resource_table.add(resource);
|
||||
Ok(json!({
|
||||
"rid": rid,
|
||||
"localAddr": {
|
||||
|
@ -345,12 +332,12 @@ struct ShutdownArgs {
|
|||
how: i32,
|
||||
}
|
||||
|
||||
fn op_shutdown(
|
||||
state: &mut OpState,
|
||||
async fn op_shutdown(
|
||||
state: Rc<RefCell<OpState>>,
|
||||
args: Value,
|
||||
_zero_copy: &mut [ZeroCopyBuf],
|
||||
_zero_copy: BufVec,
|
||||
) -> Result<Value, AnyError> {
|
||||
super::check_unstable(state, "Deno.shutdown");
|
||||
super::check_unstable2(&state, "Deno.shutdown");
|
||||
|
||||
let args: ShutdownArgs = serde_json::from_value(args)?;
|
||||
|
||||
|
@ -358,80 +345,61 @@ fn op_shutdown(
|
|||
let how = args.how;
|
||||
|
||||
let shutdown_mode = match how {
|
||||
0 => Shutdown::Read,
|
||||
0 => Shutdown::Read, // TODO: nonsense, remove me.
|
||||
1 => Shutdown::Write,
|
||||
_ => unimplemented!(),
|
||||
};
|
||||
|
||||
let resource_holder = state
|
||||
let resource = state
|
||||
.borrow()
|
||||
.resource_table
|
||||
.get_mut::<StreamResourceHolder>(rid)
|
||||
.get_any(rid)
|
||||
.ok_or_else(bad_resource_id)?;
|
||||
match resource_holder.resource {
|
||||
StreamResource::TcpStream(Some(ref mut stream)) => {
|
||||
TcpStream::shutdown(stream, shutdown_mode)?;
|
||||
}
|
||||
#[cfg(unix)]
|
||||
StreamResource::UnixStream(ref mut stream) => {
|
||||
net_unix::UnixStream::shutdown(stream, shutdown_mode)?;
|
||||
}
|
||||
_ => return Err(bad_resource_id()),
|
||||
if let Some(stream) = resource.downcast_rc::<TcpStreamResource>() {
|
||||
let wr = stream.wr_borrow_mut().await;
|
||||
TcpStream::shutdown((*wr).as_ref(), shutdown_mode)?;
|
||||
return Ok(json!({}));
|
||||
}
|
||||
|
||||
Ok(json!({}))
|
||||
#[cfg(unix)]
|
||||
if let Some(stream) = resource.downcast_rc::<StreamResource>() {
|
||||
if stream.unix_stream.is_some() {
|
||||
let wr = RcRef::map(stream, |r| r.unix_stream.as_ref().unwrap())
|
||||
.borrow_mut()
|
||||
.await;
|
||||
net_unix::UnixStream::shutdown(&*wr, shutdown_mode)?;
|
||||
return Ok(json!({}));
|
||||
}
|
||||
}
|
||||
|
||||
Err(bad_resource_id())
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
struct TcpListenerResource {
|
||||
listener: TcpListener,
|
||||
waker: Option<futures::task::AtomicWaker>,
|
||||
local_addr: SocketAddr,
|
||||
listener: AsyncRefCell<TcpListener>,
|
||||
cancel: CancelHandle,
|
||||
}
|
||||
|
||||
impl Drop for TcpListenerResource {
|
||||
fn drop(&mut self) {
|
||||
self.wake_task();
|
||||
impl Resource for TcpListenerResource {
|
||||
fn name(&self) -> Cow<str> {
|
||||
"tcpListener".into()
|
||||
}
|
||||
|
||||
fn close(self: Rc<Self>) {
|
||||
self.cancel.cancel();
|
||||
}
|
||||
}
|
||||
|
||||
impl TcpListenerResource {
|
||||
/// Track the current task so future awaiting for connection
|
||||
/// can be notified when listener is closed.
|
||||
///
|
||||
/// Throws an error if another task is already tracked.
|
||||
pub fn track_task(&mut self, cx: &Context) -> Result<(), AnyError> {
|
||||
// Currently, we only allow tracking a single accept task for a listener.
|
||||
// This might be changed in the future with multiple workers.
|
||||
// Caveat: TcpListener by itself also only tracks an accept task at a time.
|
||||
// See https://github.com/tokio-rs/tokio/issues/846#issuecomment-454208883
|
||||
if self.waker.is_some() {
|
||||
return Err(custom_error("Busy", "Another accept task is ongoing"));
|
||||
}
|
||||
type UdpSocketResource = FullDuplexResource<udp::RecvHalf, udp::SendHalf>;
|
||||
|
||||
let waker = futures::task::AtomicWaker::new();
|
||||
waker.register(cx.waker());
|
||||
self.waker.replace(waker);
|
||||
Ok(())
|
||||
impl Resource for UdpSocketResource {
|
||||
fn name(&self) -> Cow<str> {
|
||||
"udpSocket".into()
|
||||
}
|
||||
|
||||
/// Notifies a task when listener is closed so accept future can resolve.
|
||||
pub fn wake_task(&mut self) {
|
||||
if let Some(waker) = self.waker.as_ref() {
|
||||
waker.wake();
|
||||
}
|
||||
fn close(self: Rc<Self>) {
|
||||
self.cancel_read_ops()
|
||||
}
|
||||
|
||||
/// Stop tracking a task.
|
||||
/// Happens when the task is done and thus no further tracking is needed.
|
||||
pub fn untrack_task(&mut self) {
|
||||
if self.waker.is_some() {
|
||||
self.waker.take();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct UdpSocketResource {
|
||||
socket: UdpSocket,
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
|
@ -463,13 +431,10 @@ fn listen_tcp(
|
|||
let listener = TcpListener::from_std(std_listener)?;
|
||||
let local_addr = listener.local_addr()?;
|
||||
let listener_resource = TcpListenerResource {
|
||||
listener,
|
||||
waker: None,
|
||||
local_addr,
|
||||
listener: AsyncRefCell::new(listener),
|
||||
cancel: Default::default(),
|
||||
};
|
||||
let rid = state
|
||||
.resource_table
|
||||
.add("tcpListener", Box::new(listener_resource));
|
||||
let rid = state.resource_table.add(listener_resource);
|
||||
|
||||
Ok((rid, local_addr))
|
||||
}
|
||||
|
@ -481,10 +446,8 @@ fn listen_udp(
|
|||
let std_socket = std::net::UdpSocket::bind(&addr)?;
|
||||
let socket = UdpSocket::from_std(std_socket)?;
|
||||
let local_addr = socket.local_addr()?;
|
||||
let socket_resource = UdpSocketResource { socket };
|
||||
let rid = state
|
||||
.resource_table
|
||||
.add("udpSocket", Box::new(socket_resource));
|
||||
let socket_resource = UdpSocketResource::new(socket.split());
|
||||
let rid = state.resource_table.add(socket_resource);
|
||||
|
||||
Ok((rid, local_addr))
|
||||
}
|
||||
|
|
|
@ -1,34 +1,59 @@
|
|||
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
|
||||
|
||||
use crate::ops::io::StreamResource;
|
||||
use crate::ops::io::StreamResourceHolder;
|
||||
use crate::ops::net::AcceptArgs;
|
||||
use crate::ops::net::ReceiveArgs;
|
||||
use deno_core::error::bad_resource;
|
||||
use deno_core::error::custom_error;
|
||||
use deno_core::error::AnyError;
|
||||
use deno_core::futures::future::poll_fn;
|
||||
use deno_core::serde_json::json;
|
||||
use deno_core::serde_json::Value;
|
||||
use deno_core::AsyncRefCell;
|
||||
use deno_core::BufVec;
|
||||
use deno_core::CancelHandle;
|
||||
use deno_core::CancelTryFuture;
|
||||
use deno_core::OpState;
|
||||
use deno_core::RcRef;
|
||||
use deno_core::Resource;
|
||||
use serde::Deserialize;
|
||||
use std::borrow::Cow;
|
||||
use std::cell::RefCell;
|
||||
use std::fs::remove_file;
|
||||
use std::os::unix;
|
||||
use std::path::Path;
|
||||
use std::rc::Rc;
|
||||
use std::task::Poll;
|
||||
use tokio::net::UnixDatagram;
|
||||
use tokio::net::UnixListener;
|
||||
pub use tokio::net::UnixStream;
|
||||
|
||||
struct UnixListenerResource {
|
||||
listener: UnixListener,
|
||||
listener: AsyncRefCell<UnixListener>,
|
||||
cancel: CancelHandle,
|
||||
}
|
||||
|
||||
impl Resource for UnixListenerResource {
|
||||
fn name(&self) -> Cow<str> {
|
||||
"unixListener".into()
|
||||
}
|
||||
|
||||
fn close(self: Rc<Self>) {
|
||||
self.cancel.cancel();
|
||||
}
|
||||
}
|
||||
|
||||
pub struct UnixDatagramResource {
|
||||
pub socket: UnixDatagram,
|
||||
pub local_addr: unix::net::SocketAddr,
|
||||
pub socket: AsyncRefCell<UnixDatagram>,
|
||||
pub cancel: CancelHandle,
|
||||
}
|
||||
|
||||
impl Resource for UnixDatagramResource {
|
||||
fn name(&self) -> Cow<str> {
|
||||
"unixDatagram".into()
|
||||
}
|
||||
|
||||
fn close(self: Rc<Self>) {
|
||||
self.cancel.cancel();
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
|
@ -43,38 +68,23 @@ pub(crate) async fn accept_unix(
|
|||
) -> Result<Value, AnyError> {
|
||||
let rid = args.rid as u32;
|
||||
|
||||
let accept_fut = poll_fn(|cx| {
|
||||
let mut state = state.borrow_mut();
|
||||
let listener_resource = state
|
||||
.resource_table
|
||||
.get_mut::<UnixListenerResource>(rid)
|
||||
.ok_or_else(|| bad_resource("Listener has been closed"))?;
|
||||
let listener = &mut listener_resource.listener;
|
||||
use deno_core::futures::StreamExt;
|
||||
match listener.poll_next_unpin(cx) {
|
||||
Poll::Ready(Some(stream)) => {
|
||||
//listener_resource.untrack_task();
|
||||
Poll::Ready(stream)
|
||||
}
|
||||
Poll::Ready(None) => todo!(),
|
||||
Poll::Pending => {
|
||||
//listener_resource.track_task(cx)?;
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
.map_err(AnyError::from)
|
||||
});
|
||||
let unix_stream = accept_fut.await?;
|
||||
let resource = state
|
||||
.borrow()
|
||||
.resource_table
|
||||
.get::<UnixListenerResource>(rid)
|
||||
.ok_or_else(|| bad_resource("Listener has been closed"))?;
|
||||
let mut listener = RcRef::map(&resource, |r| &r.listener)
|
||||
.try_borrow_mut()
|
||||
.ok_or_else(|| custom_error("Busy", "Listener already in use"))?;
|
||||
let cancel = RcRef::map(resource, |r| &r.cancel);
|
||||
let (unix_stream, _socket_addr) =
|
||||
listener.accept().try_or_cancel(cancel).await?;
|
||||
|
||||
let local_addr = unix_stream.local_addr()?;
|
||||
let remote_addr = unix_stream.peer_addr()?;
|
||||
let resource = StreamResource::unix_stream(unix_stream);
|
||||
let mut state = state.borrow_mut();
|
||||
let rid = state.resource_table.add(
|
||||
"unixStream",
|
||||
Box::new(StreamResourceHolder::new(StreamResource::UnixStream(
|
||||
unix_stream,
|
||||
))),
|
||||
);
|
||||
let rid = state.resource_table.add(resource);
|
||||
Ok(json!({
|
||||
"rid": rid,
|
||||
"localAddr": {
|
||||
|
@ -98,12 +108,17 @@ pub(crate) async fn receive_unix_packet(
|
|||
let rid = args.rid as u32;
|
||||
let mut buf = bufs.into_iter().next().unwrap();
|
||||
|
||||
let mut state = state.borrow_mut();
|
||||
let resource = state
|
||||
.borrow()
|
||||
.resource_table
|
||||
.get_mut::<UnixDatagramResource>(rid)
|
||||
.get::<UnixDatagramResource>(rid)
|
||||
.ok_or_else(|| bad_resource("Socket has been closed"))?;
|
||||
let (size, remote_addr) = resource.socket.recv_from(&mut buf).await?;
|
||||
let mut socket = RcRef::map(&resource, |r| &r.socket)
|
||||
.try_borrow_mut()
|
||||
.ok_or_else(|| custom_error("Busy", "Socket already in use"))?;
|
||||
let cancel = RcRef::map(resource, |r| &r.cancel);
|
||||
let (size, remote_addr) =
|
||||
socket.recv_from(&mut buf).try_or_cancel(cancel).await?;
|
||||
Ok(json!({
|
||||
"size": size,
|
||||
"remoteAddr": {
|
||||
|
@ -122,10 +137,11 @@ pub fn listen_unix(
|
|||
}
|
||||
let listener = UnixListener::bind(&addr)?;
|
||||
let local_addr = listener.local_addr()?;
|
||||
let listener_resource = UnixListenerResource { listener };
|
||||
let rid = state
|
||||
.resource_table
|
||||
.add("unixListener", Box::new(listener_resource));
|
||||
let listener_resource = UnixListenerResource {
|
||||
listener: AsyncRefCell::new(listener),
|
||||
cancel: Default::default(),
|
||||
};
|
||||
let rid = state.resource_table.add(listener_resource);
|
||||
|
||||
Ok((rid, local_addr))
|
||||
}
|
||||
|
@ -140,12 +156,10 @@ pub fn listen_unix_packet(
|
|||
let socket = UnixDatagram::bind(&addr)?;
|
||||
let local_addr = socket.local_addr()?;
|
||||
let datagram_resource = UnixDatagramResource {
|
||||
socket,
|
||||
local_addr: local_addr.clone(),
|
||||
socket: AsyncRefCell::new(socket),
|
||||
cancel: Default::default(),
|
||||
};
|
||||
let rid = state
|
||||
.resource_table
|
||||
.add("unixDatagram", Box::new(datagram_resource));
|
||||
let rid = state.resource_table.add(datagram_resource);
|
||||
|
||||
Ok((rid, local_addr))
|
||||
}
|
||||
|
|
|
@ -14,9 +14,11 @@ use deno_core::Op;
|
|||
use deno_core::OpAsyncFuture;
|
||||
use deno_core::OpId;
|
||||
use deno_core::OpState;
|
||||
use deno_core::Resource;
|
||||
use deno_core::ZeroCopyBuf;
|
||||
use dlopen::symbor::Library;
|
||||
use serde::Deserialize;
|
||||
use std::borrow::Cow;
|
||||
use std::cell::RefCell;
|
||||
use std::path::PathBuf;
|
||||
use std::pin::Pin;
|
||||
|
@ -53,9 +55,7 @@ pub fn op_open_plugin(
|
|||
let rid;
|
||||
let deno_plugin_init;
|
||||
{
|
||||
rid = state
|
||||
.resource_table
|
||||
.add("plugin", Box::new(plugin_resource));
|
||||
rid = state.resource_table.add(plugin_resource);
|
||||
deno_plugin_init = *unsafe {
|
||||
state
|
||||
.resource_table
|
||||
|
@ -77,6 +77,12 @@ struct PluginResource {
|
|||
lib: Rc<Library>,
|
||||
}
|
||||
|
||||
impl Resource for PluginResource {
|
||||
fn name(&self) -> Cow<str> {
|
||||
"plugin".into()
|
||||
}
|
||||
}
|
||||
|
||||
impl PluginResource {
|
||||
fn new(lib: &Rc<Library>) -> Self {
|
||||
Self { lib: lib.clone() }
|
||||
|
|
|
@ -1,19 +1,22 @@
|
|||
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
|
||||
|
||||
use super::io::{std_file_resource, StreamResource, StreamResourceHolder};
|
||||
use super::io::{std_file_resource, StreamResource};
|
||||
use crate::permissions::Permissions;
|
||||
use deno_core::error::bad_resource_id;
|
||||
use deno_core::error::type_error;
|
||||
use deno_core::error::AnyError;
|
||||
use deno_core::futures::future::poll_fn;
|
||||
use deno_core::futures::future::FutureExt;
|
||||
use deno_core::serde_json;
|
||||
use deno_core::serde_json::json;
|
||||
use deno_core::serde_json::Value;
|
||||
use deno_core::AsyncMutFuture;
|
||||
use deno_core::AsyncRefCell;
|
||||
use deno_core::BufVec;
|
||||
use deno_core::OpState;
|
||||
use deno_core::RcRef;
|
||||
use deno_core::Resource;
|
||||
use deno_core::ZeroCopyBuf;
|
||||
use serde::Deserialize;
|
||||
use std::borrow::Cow;
|
||||
use std::cell::RefCell;
|
||||
use std::rc::Rc;
|
||||
use tokio::process::Command;
|
||||
|
@ -61,7 +64,19 @@ struct RunArgs {
|
|||
}
|
||||
|
||||
struct ChildResource {
|
||||
child: tokio::process::Child,
|
||||
child: AsyncRefCell<tokio::process::Child>,
|
||||
}
|
||||
|
||||
impl Resource for ChildResource {
|
||||
fn name(&self) -> Cow<str> {
|
||||
"child".into()
|
||||
}
|
||||
}
|
||||
|
||||
impl ChildResource {
|
||||
fn borrow_mut(self: Rc<Self>) -> AsyncMutFuture<tokio::process::Child> {
|
||||
RcRef::map(self, |r| &r.child).borrow_mut()
|
||||
}
|
||||
}
|
||||
|
||||
fn op_run(
|
||||
|
@ -117,12 +132,9 @@ fn op_run(
|
|||
|
||||
let stdin_rid = match child.stdin.take() {
|
||||
Some(child_stdin) => {
|
||||
let rid = state.resource_table.add(
|
||||
"childStdin",
|
||||
Box::new(StreamResourceHolder::new(StreamResource::ChildStdin(
|
||||
child_stdin,
|
||||
))),
|
||||
);
|
||||
let rid = state
|
||||
.resource_table
|
||||
.add(StreamResource::child_stdin(child_stdin));
|
||||
Some(rid)
|
||||
}
|
||||
None => None,
|
||||
|
@ -130,12 +142,9 @@ fn op_run(
|
|||
|
||||
let stdout_rid = match child.stdout.take() {
|
||||
Some(child_stdout) => {
|
||||
let rid = state.resource_table.add(
|
||||
"childStdout",
|
||||
Box::new(StreamResourceHolder::new(StreamResource::ChildStdout(
|
||||
child_stdout,
|
||||
))),
|
||||
);
|
||||
let rid = state
|
||||
.resource_table
|
||||
.add(StreamResource::child_stdout(child_stdout));
|
||||
Some(rid)
|
||||
}
|
||||
None => None,
|
||||
|
@ -143,19 +152,18 @@ fn op_run(
|
|||
|
||||
let stderr_rid = match child.stderr.take() {
|
||||
Some(child_stderr) => {
|
||||
let rid = state.resource_table.add(
|
||||
"childStderr",
|
||||
Box::new(StreamResourceHolder::new(StreamResource::ChildStderr(
|
||||
child_stderr,
|
||||
))),
|
||||
);
|
||||
let rid = state
|
||||
.resource_table
|
||||
.add(StreamResource::child_stderr(child_stderr));
|
||||
Some(rid)
|
||||
}
|
||||
None => None,
|
||||
};
|
||||
|
||||
let child_resource = ChildResource { child };
|
||||
let child_rid = state.resource_table.add("child", Box::new(child_resource));
|
||||
let child_resource = ChildResource {
|
||||
child: AsyncRefCell::new(child),
|
||||
};
|
||||
let child_rid = state.resource_table.add(child_resource);
|
||||
|
||||
Ok(json!({
|
||||
"rid": child_rid,
|
||||
|
@ -185,17 +193,13 @@ async fn op_run_status(
|
|||
s.borrow::<Permissions>().check_run()?;
|
||||
}
|
||||
|
||||
let run_status = poll_fn(|cx| {
|
||||
let mut state = state.borrow_mut();
|
||||
let child_resource = state
|
||||
.resource_table
|
||||
.get_mut::<ChildResource>(rid)
|
||||
.ok_or_else(bad_resource_id)?;
|
||||
let child = &mut child_resource.child;
|
||||
child.poll_unpin(cx).map_err(AnyError::from)
|
||||
})
|
||||
.await?;
|
||||
|
||||
let resource = state
|
||||
.borrow_mut()
|
||||
.resource_table
|
||||
.get::<ChildResource>(rid)
|
||||
.ok_or_else(bad_resource_id)?;
|
||||
let mut child = resource.borrow_mut().await;
|
||||
let run_status = (&mut *child).await?;
|
||||
let code = run_status.code();
|
||||
|
||||
#[cfg(unix)]
|
||||
|
|
|
@ -11,15 +11,23 @@ use std::rc::Rc;
|
|||
#[cfg(unix)]
|
||||
use deno_core::error::bad_resource_id;
|
||||
#[cfg(unix)]
|
||||
use deno_core::futures::future::poll_fn;
|
||||
#[cfg(unix)]
|
||||
use deno_core::serde_json;
|
||||
#[cfg(unix)]
|
||||
use deno_core::serde_json::json;
|
||||
#[cfg(unix)]
|
||||
use deno_core::AsyncRefCell;
|
||||
#[cfg(unix)]
|
||||
use deno_core::CancelFuture;
|
||||
#[cfg(unix)]
|
||||
use deno_core::CancelHandle;
|
||||
#[cfg(unix)]
|
||||
use deno_core::RcRef;
|
||||
#[cfg(unix)]
|
||||
use deno_core::Resource;
|
||||
#[cfg(unix)]
|
||||
use serde::Deserialize;
|
||||
#[cfg(unix)]
|
||||
use std::task::Waker;
|
||||
use std::borrow::Cow;
|
||||
#[cfg(unix)]
|
||||
use tokio::signal::unix::{signal, Signal, SignalKind};
|
||||
|
||||
|
@ -32,7 +40,21 @@ pub fn init(rt: &mut deno_core::JsRuntime) {
|
|||
#[cfg(unix)]
|
||||
/// The resource for signal stream.
|
||||
/// The second element is the waker of polling future.
|
||||
pub struct SignalStreamResource(pub Signal, pub Option<Waker>);
|
||||
struct SignalStreamResource {
|
||||
signal: AsyncRefCell<Signal>,
|
||||
cancel: CancelHandle,
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
impl Resource for SignalStreamResource {
|
||||
fn name(&self) -> Cow<str> {
|
||||
"signal".into()
|
||||
}
|
||||
|
||||
fn close(self: Rc<Self>) {
|
||||
self.cancel.cancel();
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
#[derive(Deserialize)]
|
||||
|
@ -54,13 +76,13 @@ fn op_signal_bind(
|
|||
) -> Result<Value, AnyError> {
|
||||
super::check_unstable(state, "Deno.signal");
|
||||
let args: BindSignalArgs = serde_json::from_value(args)?;
|
||||
let rid = state.resource_table.add(
|
||||
"signal",
|
||||
Box::new(SignalStreamResource(
|
||||
let resource = SignalStreamResource {
|
||||
signal: AsyncRefCell::new(
|
||||
signal(SignalKind::from_raw(args.signo)).expect(""),
|
||||
None,
|
||||
)),
|
||||
);
|
||||
),
|
||||
cancel: Default::default(),
|
||||
};
|
||||
let rid = state.resource_table.add(resource);
|
||||
Ok(json!({
|
||||
"rid": rid,
|
||||
}))
|
||||
|
@ -76,18 +98,18 @@ async fn op_signal_poll(
|
|||
let args: SignalArgs = serde_json::from_value(args)?;
|
||||
let rid = args.rid as u32;
|
||||
|
||||
let future = poll_fn(move |cx| {
|
||||
let mut state = state.borrow_mut();
|
||||
if let Some(mut signal) =
|
||||
state.resource_table.get_mut::<SignalStreamResource>(rid)
|
||||
{
|
||||
signal.1 = Some(cx.waker().clone());
|
||||
return signal.0.poll_recv(cx);
|
||||
}
|
||||
std::task::Poll::Ready(None)
|
||||
});
|
||||
let result = future.await;
|
||||
Ok(json!({ "done": result.is_none() }))
|
||||
let resource = state
|
||||
.borrow_mut()
|
||||
.resource_table
|
||||
.get::<SignalStreamResource>(rid)
|
||||
.ok_or_else(bad_resource_id)?;
|
||||
let cancel = RcRef::map(&resource, |r| &r.cancel);
|
||||
let mut signal = RcRef::map(&resource, |r| &r.signal).borrow_mut().await;
|
||||
|
||||
match signal.recv().or_cancel(cancel).await {
|
||||
Ok(result) => Ok(json!({ "done": result.is_none() })),
|
||||
Err(_) => Ok(json!({ "done": true })),
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
|
@ -99,14 +121,6 @@ pub fn op_signal_unbind(
|
|||
super::check_unstable(state, "Deno.signal");
|
||||
let args: SignalArgs = serde_json::from_value(args)?;
|
||||
let rid = args.rid as u32;
|
||||
let resource = state.resource_table.get_mut::<SignalStreamResource>(rid);
|
||||
if let Some(signal) = resource {
|
||||
if let Some(waker) = &signal.1 {
|
||||
// Wakes up the pending poll if exists.
|
||||
// This prevents the poll future from getting stuck forever.
|
||||
waker.clone().wake();
|
||||
}
|
||||
}
|
||||
state
|
||||
.resource_table
|
||||
.close(rid)
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
|
||||
|
||||
use super::io::{StreamResource, StreamResourceHolder};
|
||||
use super::io::StreamResource;
|
||||
use super::io::TcpStreamResource;
|
||||
use crate::permissions::Permissions;
|
||||
use crate::resolve_addr::resolve_addr;
|
||||
use crate::resolve_addr::resolve_addr_sync;
|
||||
|
@ -8,25 +9,26 @@ use deno_core::error::bad_resource;
|
|||
use deno_core::error::bad_resource_id;
|
||||
use deno_core::error::custom_error;
|
||||
use deno_core::error::AnyError;
|
||||
use deno_core::futures;
|
||||
use deno_core::futures::future::poll_fn;
|
||||
use deno_core::serde_json;
|
||||
use deno_core::serde_json::json;
|
||||
use deno_core::serde_json::Value;
|
||||
use deno_core::AsyncRefCell;
|
||||
use deno_core::BufVec;
|
||||
use deno_core::CancelHandle;
|
||||
use deno_core::CancelTryFuture;
|
||||
use deno_core::OpState;
|
||||
use deno_core::RcRef;
|
||||
use deno_core::Resource;
|
||||
use deno_core::ZeroCopyBuf;
|
||||
use serde::Deserialize;
|
||||
use std::borrow::Cow;
|
||||
use std::cell::RefCell;
|
||||
use std::convert::From;
|
||||
use std::fs::File;
|
||||
use std::io::BufReader;
|
||||
use std::net::SocketAddr;
|
||||
use std::path::Path;
|
||||
use std::rc::Rc;
|
||||
use std::sync::Arc;
|
||||
use std::task::Context;
|
||||
use std::task::Poll;
|
||||
use tokio::net::TcpListener;
|
||||
use tokio::net::TcpStream;
|
||||
use tokio_rustls::{rustls::ClientConfig, TlsConnector};
|
||||
|
@ -85,60 +87,53 @@ async fn op_start_tls(
|
|||
permissions.check_read(Path::new(&path))?;
|
||||
}
|
||||
}
|
||||
let mut resource_holder = {
|
||||
let mut state_ = state.borrow_mut();
|
||||
match state_.resource_table.remove::<StreamResourceHolder>(rid) {
|
||||
Some(resource) => *resource,
|
||||
None => return Err(bad_resource_id()),
|
||||
}
|
||||
};
|
||||
|
||||
if let StreamResource::TcpStream(ref mut tcp_stream) =
|
||||
resource_holder.resource
|
||||
{
|
||||
let tcp_stream = tcp_stream.take().unwrap();
|
||||
let local_addr = tcp_stream.local_addr()?;
|
||||
let remote_addr = tcp_stream.peer_addr()?;
|
||||
let mut config = ClientConfig::new();
|
||||
config
|
||||
.root_store
|
||||
.add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS);
|
||||
if let Some(path) = cert_file {
|
||||
let key_file = File::open(path)?;
|
||||
let reader = &mut BufReader::new(key_file);
|
||||
config.root_store.add_pem_file(reader).unwrap();
|
||||
}
|
||||
let resource_rc = state
|
||||
.borrow_mut()
|
||||
.resource_table
|
||||
.take::<TcpStreamResource>(rid)
|
||||
.ok_or_else(bad_resource_id)?;
|
||||
let resource = Rc::try_unwrap(resource_rc)
|
||||
.expect("Only a single use of this resource should happen");
|
||||
let (read_half, write_half) = resource.into_inner();
|
||||
let tcp_stream = read_half.reunite(write_half)?;
|
||||
|
||||
let tls_connector = TlsConnector::from(Arc::new(config));
|
||||
let dnsname =
|
||||
DNSNameRef::try_from_ascii_str(&domain).expect("Invalid DNS lookup");
|
||||
let tls_stream = tls_connector.connect(dnsname, tcp_stream).await?;
|
||||
|
||||
let rid = {
|
||||
let mut state_ = state.borrow_mut();
|
||||
state_.resource_table.add(
|
||||
"clientTlsStream",
|
||||
Box::new(StreamResourceHolder::new(StreamResource::ClientTlsStream(
|
||||
Box::new(tls_stream),
|
||||
))),
|
||||
)
|
||||
};
|
||||
Ok(json!({
|
||||
"rid": rid,
|
||||
"localAddr": {
|
||||
"hostname": local_addr.ip().to_string(),
|
||||
"port": local_addr.port(),
|
||||
"transport": "tcp",
|
||||
},
|
||||
"remoteAddr": {
|
||||
"hostname": remote_addr.ip().to_string(),
|
||||
"port": remote_addr.port(),
|
||||
"transport": "tcp",
|
||||
}
|
||||
}))
|
||||
} else {
|
||||
Err(bad_resource_id())
|
||||
let local_addr = tcp_stream.local_addr()?;
|
||||
let remote_addr = tcp_stream.peer_addr()?;
|
||||
let mut config = ClientConfig::new();
|
||||
config
|
||||
.root_store
|
||||
.add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS);
|
||||
if let Some(path) = cert_file {
|
||||
let key_file = File::open(path)?;
|
||||
let reader = &mut BufReader::new(key_file);
|
||||
config.root_store.add_pem_file(reader).unwrap();
|
||||
}
|
||||
|
||||
let tls_connector = TlsConnector::from(Arc::new(config));
|
||||
let dnsname =
|
||||
DNSNameRef::try_from_ascii_str(&domain).expect("Invalid DNS lookup");
|
||||
let tls_stream = tls_connector.connect(dnsname, tcp_stream).await?;
|
||||
|
||||
let rid = {
|
||||
let mut state_ = state.borrow_mut();
|
||||
state_
|
||||
.resource_table
|
||||
.add(StreamResource::client_tls_stream(tls_stream))
|
||||
};
|
||||
Ok(json!({
|
||||
"rid": rid,
|
||||
"localAddr": {
|
||||
"hostname": local_addr.ip().to_string(),
|
||||
"port": local_addr.port(),
|
||||
"transport": "tcp",
|
||||
},
|
||||
"remoteAddr": {
|
||||
"hostname": remote_addr.ip().to_string(),
|
||||
"port": remote_addr.port(),
|
||||
"transport": "tcp",
|
||||
}
|
||||
}))
|
||||
}
|
||||
|
||||
async fn op_connect_tls(
|
||||
|
@ -180,12 +175,9 @@ async fn op_connect_tls(
|
|||
let tls_stream = tls_connector.connect(dnsname, tcp_stream).await?;
|
||||
let rid = {
|
||||
let mut state_ = state.borrow_mut();
|
||||
state_.resource_table.add(
|
||||
"clientTlsStream",
|
||||
Box::new(StreamResourceHolder::new(StreamResource::ClientTlsStream(
|
||||
Box::new(tls_stream),
|
||||
))),
|
||||
)
|
||||
state_
|
||||
.resource_table
|
||||
.add(StreamResource::client_tls_stream(tls_stream))
|
||||
};
|
||||
Ok(json!({
|
||||
"rid": rid,
|
||||
|
@ -256,51 +248,19 @@ fn load_keys(path: &str) -> Result<Vec<PrivateKey>, AnyError> {
|
|||
Ok(keys)
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub struct TlsListenerResource {
|
||||
listener: TcpListener,
|
||||
listener: AsyncRefCell<TcpListener>,
|
||||
tls_acceptor: TlsAcceptor,
|
||||
waker: Option<futures::task::AtomicWaker>,
|
||||
local_addr: SocketAddr,
|
||||
cancel: CancelHandle,
|
||||
}
|
||||
|
||||
impl Drop for TlsListenerResource {
|
||||
fn drop(&mut self) {
|
||||
self.wake_task();
|
||||
}
|
||||
}
|
||||
|
||||
impl TlsListenerResource {
|
||||
/// Track the current task so future awaiting for connection
|
||||
/// can be notified when listener is closed.
|
||||
///
|
||||
/// Throws an error if another task is already tracked.
|
||||
pub fn track_task(&mut self, cx: &Context) -> Result<(), AnyError> {
|
||||
// Currently, we only allow tracking a single accept task for a listener.
|
||||
// This might be changed in the future with multiple workers.
|
||||
// Caveat: TcpListener by itself also only tracks an accept task at a time.
|
||||
// See https://github.com/tokio-rs/tokio/issues/846#issuecomment-454208883
|
||||
if self.waker.is_some() {
|
||||
return Err(custom_error("Busy", "Another accept task is ongoing"));
|
||||
}
|
||||
|
||||
let waker = futures::task::AtomicWaker::new();
|
||||
waker.register(cx.waker());
|
||||
self.waker.replace(waker);
|
||||
Ok(())
|
||||
impl Resource for TlsListenerResource {
|
||||
fn name(&self) -> Cow<str> {
|
||||
"tlsListener".into()
|
||||
}
|
||||
|
||||
/// Notifies a task when listener is closed so accept future can resolve.
|
||||
pub fn wake_task(&mut self) {
|
||||
if let Some(waker) = self.waker.as_ref() {
|
||||
waker.wake();
|
||||
}
|
||||
}
|
||||
|
||||
/// Stop tracking a task.
|
||||
/// Happens when the task is done and thus no further tracking is needed.
|
||||
pub fn untrack_task(&mut self) {
|
||||
self.waker.take();
|
||||
fn close(self: Rc<Self>) {
|
||||
self.cancel.cancel();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -340,15 +300,12 @@ fn op_listen_tls(
|
|||
let listener = TcpListener::from_std(std_listener)?;
|
||||
let local_addr = listener.local_addr()?;
|
||||
let tls_listener_resource = TlsListenerResource {
|
||||
listener,
|
||||
listener: AsyncRefCell::new(listener),
|
||||
tls_acceptor,
|
||||
waker: None,
|
||||
local_addr,
|
||||
cancel: Default::default(),
|
||||
};
|
||||
|
||||
let rid = state
|
||||
.resource_table
|
||||
.add("tlsListener", Box::new(tls_listener_resource));
|
||||
let rid = state.resource_table.add(tls_listener_resource);
|
||||
|
||||
Ok(json!({
|
||||
"rid": rid,
|
||||
|
@ -372,50 +329,46 @@ async fn op_accept_tls(
|
|||
) -> Result<Value, AnyError> {
|
||||
let args: AcceptTlsArgs = serde_json::from_value(args)?;
|
||||
let rid = args.rid as u32;
|
||||
let accept_fut = poll_fn(|cx| {
|
||||
let mut state = state.borrow_mut();
|
||||
let listener_resource = state
|
||||
.resource_table
|
||||
.get_mut::<TlsListenerResource>(rid)
|
||||
.ok_or_else(|| bad_resource("Listener has been closed"))?;
|
||||
let listener = &mut listener_resource.listener;
|
||||
match listener.poll_accept(cx).map_err(AnyError::from) {
|
||||
Poll::Ready(Ok((stream, addr))) => {
|
||||
listener_resource.untrack_task();
|
||||
Poll::Ready(Ok((stream, addr)))
|
||||
|
||||
let resource = state
|
||||
.borrow()
|
||||
.resource_table
|
||||
.get::<TlsListenerResource>(rid)
|
||||
.ok_or_else(|| bad_resource("Listener has been closed"))?;
|
||||
let mut listener = RcRef::map(&resource, |r| &r.listener)
|
||||
.try_borrow_mut()
|
||||
.ok_or_else(|| custom_error("Busy", "Another accept task is ongoing"))?;
|
||||
let cancel = RcRef::map(resource, |r| &r.cancel);
|
||||
let (tcp_stream, _socket_addr) =
|
||||
listener.accept().try_or_cancel(cancel).await.map_err(|e| {
|
||||
// FIXME(bartlomieju): compatibility with current JS implementation
|
||||
if let std::io::ErrorKind::Interrupted = e.kind() {
|
||||
bad_resource("Listener has been closed")
|
||||
} else {
|
||||
e.into()
|
||||
}
|
||||
Poll::Pending => {
|
||||
listener_resource.track_task(cx)?;
|
||||
Poll::Pending
|
||||
}
|
||||
Poll::Ready(Err(e)) => {
|
||||
listener_resource.untrack_task();
|
||||
Poll::Ready(Err(e))
|
||||
}
|
||||
}
|
||||
});
|
||||
let (tcp_stream, _socket_addr) = accept_fut.await?;
|
||||
})?;
|
||||
let local_addr = tcp_stream.local_addr()?;
|
||||
let remote_addr = tcp_stream.peer_addr()?;
|
||||
let tls_acceptor = {
|
||||
let state_ = state.borrow();
|
||||
let resource = state_
|
||||
.resource_table
|
||||
.get::<TlsListenerResource>(rid)
|
||||
.ok_or_else(bad_resource_id)
|
||||
.expect("Can't find tls listener");
|
||||
resource.tls_acceptor.clone()
|
||||
};
|
||||
let tls_stream = tls_acceptor.accept(tcp_stream).await?;
|
||||
let resource = state
|
||||
.borrow()
|
||||
.resource_table
|
||||
.get::<TlsListenerResource>(rid)
|
||||
.ok_or_else(|| bad_resource("Listener has been closed"))?;
|
||||
let cancel = RcRef::map(&resource, |r| &r.cancel);
|
||||
let tls_acceptor = resource.tls_acceptor.clone();
|
||||
let tls_stream = tls_acceptor
|
||||
.accept(tcp_stream)
|
||||
.try_or_cancel(cancel)
|
||||
.await?;
|
||||
|
||||
let rid = {
|
||||
let mut state_ = state.borrow_mut();
|
||||
state_.resource_table.add(
|
||||
"serverTlsStream",
|
||||
Box::new(StreamResourceHolder::new(StreamResource::ServerTlsStream(
|
||||
Box::new(tls_stream),
|
||||
))),
|
||||
)
|
||||
state_
|
||||
.resource_table
|
||||
.add(StreamResource::server_tls_stream(tls_stream))
|
||||
};
|
||||
|
||||
Ok(json!({
|
||||
"rid": rid,
|
||||
"localAddr": {
|
||||
|
|
|
@ -2,7 +2,6 @@
|
|||
|
||||
use super::io::std_file_resource;
|
||||
use super::io::StreamResource;
|
||||
use super::io::StreamResourceHolder;
|
||||
use deno_core::error::bad_resource_id;
|
||||
use deno_core::error::not_supported;
|
||||
use deno_core::error::resource_unavailable;
|
||||
|
@ -11,6 +10,7 @@ use deno_core::serde_json;
|
|||
use deno_core::serde_json::json;
|
||||
use deno_core::serde_json::Value;
|
||||
use deno_core::OpState;
|
||||
use deno_core::RcRef;
|
||||
use deno_core::ZeroCopyBuf;
|
||||
use serde::Deserialize;
|
||||
use serde::Serialize;
|
||||
|
@ -88,48 +88,47 @@ fn op_set_raw(
|
|||
use winapi::shared::minwindef::FALSE;
|
||||
use winapi::um::{consoleapi, handleapi};
|
||||
|
||||
let resource_holder =
|
||||
state.resource_table.get_mut::<StreamResourceHolder>(rid);
|
||||
if resource_holder.is_none() {
|
||||
return Err(bad_resource_id());
|
||||
}
|
||||
let resource = state
|
||||
.resource_table
|
||||
.get::<StreamResource>(rid)
|
||||
.ok_or_else(bad_resource_id)?;
|
||||
|
||||
if cbreak {
|
||||
return Err(not_supported());
|
||||
}
|
||||
let resource_holder = resource_holder.unwrap();
|
||||
|
||||
// For now, only stdin.
|
||||
let handle = match &mut resource_holder.resource {
|
||||
StreamResource::FsFile(ref mut option_file_metadata) => {
|
||||
if let Some((tokio_file, metadata)) = option_file_metadata.take() {
|
||||
match tokio_file.try_into_std() {
|
||||
Ok(std_file) => {
|
||||
let raw_handle = std_file.as_raw_handle();
|
||||
// Turn the std_file handle back into a tokio file, put it back
|
||||
// in the resource table.
|
||||
let tokio_file = tokio::fs::File::from_std(std_file);
|
||||
resource_holder.resource =
|
||||
StreamResource::FsFile(Some((tokio_file, metadata)));
|
||||
// return the result.
|
||||
raw_handle
|
||||
}
|
||||
Err(tokio_file) => {
|
||||
// This function will return an error containing the file if
|
||||
// some operation is in-flight.
|
||||
resource_holder.resource =
|
||||
StreamResource::FsFile(Some((tokio_file, metadata)));
|
||||
return Err(resource_unavailable());
|
||||
}
|
||||
}
|
||||
} else {
|
||||
return Err(resource_unavailable());
|
||||
if resource.fs_file.is_none() {
|
||||
return Err(bad_resource_id());
|
||||
}
|
||||
|
||||
let fs_file_resource =
|
||||
RcRef::map(&resource, |r| r.fs_file.as_ref().unwrap()).try_borrow_mut();
|
||||
|
||||
let handle_result = if let Some(mut fs_file) = fs_file_resource {
|
||||
let tokio_file = fs_file.0.take().unwrap();
|
||||
match tokio_file.try_into_std() {
|
||||
Ok(std_file) => {
|
||||
let raw_handle = std_file.as_raw_handle();
|
||||
// Turn the std_file handle back into a tokio file, put it back
|
||||
// in the resource table.
|
||||
let tokio_file = tokio::fs::File::from_std(std_file);
|
||||
fs_file.0 = Some(tokio_file);
|
||||
// return the result.
|
||||
Ok(raw_handle)
|
||||
}
|
||||
Err(tokio_file) => {
|
||||
// This function will return an error containing the file if
|
||||
// some operation is in-flight.
|
||||
fs_file.0 = Some(tokio_file);
|
||||
Err(resource_unavailable())
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
return Err(bad_resource_id());
|
||||
}
|
||||
} else {
|
||||
Err(resource_unavailable())
|
||||
};
|
||||
|
||||
let handle = handle_result?;
|
||||
|
||||
if handle == handleapi::INVALID_HANDLE_VALUE {
|
||||
return Err(Error::last_os_error().into());
|
||||
} else if handle.is_null() {
|
||||
|
@ -156,24 +155,31 @@ fn op_set_raw(
|
|||
{
|
||||
use std::os::unix::io::AsRawFd;
|
||||
|
||||
let resource_holder =
|
||||
state.resource_table.get_mut::<StreamResourceHolder>(rid);
|
||||
if resource_holder.is_none() {
|
||||
return Err(bad_resource_id());
|
||||
let resource = state
|
||||
.resource_table
|
||||
.get::<StreamResource>(rid)
|
||||
.ok_or_else(bad_resource_id)?;
|
||||
|
||||
if resource.fs_file.is_none() {
|
||||
return Err(not_supported());
|
||||
}
|
||||
|
||||
if is_raw {
|
||||
let (raw_fd, maybe_tty_mode) =
|
||||
match &mut resource_holder.unwrap().resource {
|
||||
StreamResource::FsFile(Some((f, ref mut metadata))) => {
|
||||
(f.as_raw_fd(), &mut metadata.tty.mode)
|
||||
}
|
||||
StreamResource::FsFile(None) => return Err(resource_unavailable()),
|
||||
_ => {
|
||||
return Err(not_supported());
|
||||
}
|
||||
};
|
||||
let maybe_fs_file_resource =
|
||||
RcRef::map(&resource, |r| r.fs_file.as_ref().unwrap()).try_borrow_mut();
|
||||
|
||||
if maybe_fs_file_resource.is_none() {
|
||||
return Err(resource_unavailable());
|
||||
}
|
||||
|
||||
let mut fs_file_resource = maybe_fs_file_resource.unwrap();
|
||||
if fs_file_resource.0.is_none() {
|
||||
return Err(resource_unavailable());
|
||||
}
|
||||
|
||||
let raw_fd = fs_file_resource.0.as_ref().unwrap().as_raw_fd();
|
||||
let maybe_tty_mode = &mut fs_file_resource.1.as_mut().unwrap().tty.mode;
|
||||
|
||||
if is_raw {
|
||||
if maybe_tty_mode.is_none() {
|
||||
// Save original mode.
|
||||
let original_mode = termios::tcgetattr(raw_fd)?;
|
||||
|
@ -199,28 +205,14 @@ fn op_set_raw(
|
|||
raw.control_chars[termios::SpecialCharacterIndices::VMIN as usize] = 1;
|
||||
raw.control_chars[termios::SpecialCharacterIndices::VTIME as usize] = 0;
|
||||
termios::tcsetattr(raw_fd, termios::SetArg::TCSADRAIN, &raw)?;
|
||||
Ok(json!({}))
|
||||
} else {
|
||||
// Try restore saved mode.
|
||||
let (raw_fd, maybe_tty_mode) =
|
||||
match &mut resource_holder.unwrap().resource {
|
||||
StreamResource::FsFile(Some((f, ref mut metadata))) => {
|
||||
(f.as_raw_fd(), &mut metadata.tty.mode)
|
||||
}
|
||||
StreamResource::FsFile(None) => {
|
||||
return Err(resource_unavailable());
|
||||
}
|
||||
_ => {
|
||||
return Err(bad_resource_id());
|
||||
}
|
||||
};
|
||||
|
||||
if let Some(mode) = maybe_tty_mode.take() {
|
||||
termios::tcsetattr(raw_fd, termios::SetArg::TCSADRAIN, &mode)?;
|
||||
}
|
||||
|
||||
Ok(json!({}))
|
||||
}
|
||||
|
||||
Ok(json!({}))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -255,7 +247,6 @@ fn op_isatty(
|
|||
Ok(unsafe { libc::isatty(raw_fd as libc::c_int) == 1 })
|
||||
}
|
||||
}
|
||||
Err(StreamResource::FsFile(_)) => unreachable!(),
|
||||
_ => Ok(false),
|
||||
})?;
|
||||
Ok(json!(isatty))
|
||||
|
|
|
@ -1,18 +1,23 @@
|
|||
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
|
||||
|
||||
use crate::permissions::Permissions;
|
||||
use core::task::Poll;
|
||||
use deno_core::error::bad_resource_id;
|
||||
use deno_core::error::type_error;
|
||||
use deno_core::error::AnyError;
|
||||
use deno_core::futures::future::poll_fn;
|
||||
use deno_core::futures::stream::SplitSink;
|
||||
use deno_core::futures::stream::SplitStream;
|
||||
use deno_core::futures::SinkExt;
|
||||
use deno_core::futures::StreamExt;
|
||||
use deno_core::futures::{ready, SinkExt};
|
||||
use deno_core::serde_json::json;
|
||||
use deno_core::serde_json::Value;
|
||||
use deno_core::url;
|
||||
use deno_core::AsyncRefCell;
|
||||
use deno_core::BufVec;
|
||||
use deno_core::CancelFuture;
|
||||
use deno_core::CancelHandle;
|
||||
use deno_core::OpState;
|
||||
use deno_core::RcRef;
|
||||
use deno_core::Resource;
|
||||
use deno_core::{serde_json, ZeroCopyBuf};
|
||||
use http::{Method, Request, Uri};
|
||||
use serde::Deserialize;
|
||||
|
@ -62,6 +67,22 @@ type MaybeTlsStream =
|
|||
StreamSwitcher<TcpStream, tokio_rustls::client::TlsStream<TcpStream>>;
|
||||
|
||||
type WsStream = WebSocketStream<MaybeTlsStream>;
|
||||
struct WsStreamResource {
|
||||
tx: AsyncRefCell<SplitSink<WsStream, Message>>,
|
||||
rx: AsyncRefCell<SplitStream<WsStream>>,
|
||||
// When a `WsStreamResource` resource is closed, all pending 'read' ops are
|
||||
// canceled, while 'write' ops are allowed to complete. Therefore only
|
||||
// 'read' futures are attached to this cancel handle.
|
||||
cancel: CancelHandle,
|
||||
}
|
||||
|
||||
impl Resource for WsStreamResource {
|
||||
fn name(&self) -> Cow<str> {
|
||||
"webSocketStream".into()
|
||||
}
|
||||
}
|
||||
|
||||
impl WsStreamResource {}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
|
@ -165,10 +186,14 @@ pub async fn op_ws_create(
|
|||
))
|
||||
})?;
|
||||
|
||||
let (ws_tx, ws_rx) = stream.split();
|
||||
let resource = WsStreamResource {
|
||||
rx: AsyncRefCell::new(ws_rx),
|
||||
tx: AsyncRefCell::new(ws_tx),
|
||||
cancel: Default::default(),
|
||||
};
|
||||
let mut state = state.borrow_mut();
|
||||
let rid = state
|
||||
.resource_table
|
||||
.add("webSocketStream", Box::new(stream));
|
||||
let rid = state.resource_table.add(resource);
|
||||
|
||||
let protocol = match response.headers().get("Sec-WebSocket-Protocol") {
|
||||
Some(header) => header.to_str().unwrap(),
|
||||
|
@ -202,30 +227,21 @@ pub async fn op_ws_send(
|
|||
) -> Result<Value, AnyError> {
|
||||
let args: SendArgs = serde_json::from_value(args)?;
|
||||
|
||||
let mut maybe_msg = Some(match args.text {
|
||||
let msg = match args.text {
|
||||
Some(text) => Message::Text(text),
|
||||
None => Message::Binary(bufs[0].to_vec()),
|
||||
});
|
||||
};
|
||||
let rid = args.rid;
|
||||
|
||||
poll_fn(move |cx| {
|
||||
let mut state = state.borrow_mut();
|
||||
let stream = state
|
||||
.resource_table
|
||||
.get_mut::<WsStream>(rid)
|
||||
.ok_or_else(bad_resource_id)?;
|
||||
|
||||
// TODO(ry) Handle errors below instead of unwrap.
|
||||
// Need to map `TungsteniteError` to `AnyError`.
|
||||
ready!(stream.poll_ready_unpin(cx)).unwrap();
|
||||
if let Some(msg) = maybe_msg.take() {
|
||||
stream.start_send_unpin(msg).unwrap();
|
||||
}
|
||||
ready!(stream.poll_flush_unpin(cx)).unwrap();
|
||||
|
||||
Poll::Ready(Ok(json!({})))
|
||||
})
|
||||
.await
|
||||
let resource = state
|
||||
.borrow_mut()
|
||||
.resource_table
|
||||
.get::<WsStreamResource>(rid)
|
||||
.ok_or_else(bad_resource_id)?;
|
||||
let mut tx = RcRef::map(&resource, |r| &r.tx).borrow_mut().await;
|
||||
tx.send(msg).await?;
|
||||
eprintln!("sent!");
|
||||
Ok(json!({}))
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
|
@ -243,33 +259,22 @@ pub async fn op_ws_close(
|
|||
) -> Result<Value, AnyError> {
|
||||
let args: CloseArgs = serde_json::from_value(args)?;
|
||||
let rid = args.rid;
|
||||
let mut maybe_msg = Some(Message::Close(args.code.map(|c| CloseFrame {
|
||||
let msg = Message::Close(args.code.map(|c| CloseFrame {
|
||||
code: CloseCode::from(c),
|
||||
reason: match args.reason {
|
||||
Some(reason) => Cow::from(reason),
|
||||
None => Default::default(),
|
||||
},
|
||||
})));
|
||||
}));
|
||||
|
||||
poll_fn(move |cx| {
|
||||
let mut state = state.borrow_mut();
|
||||
let stream = state
|
||||
.resource_table
|
||||
.get_mut::<WsStream>(rid)
|
||||
.ok_or_else(bad_resource_id)?;
|
||||
|
||||
// TODO(ry) Handle errors below instead of unwrap.
|
||||
// Need to map `TungsteniteError` to `AnyError`.
|
||||
ready!(stream.poll_ready_unpin(cx)).unwrap();
|
||||
if let Some(msg) = maybe_msg.take() {
|
||||
stream.start_send_unpin(msg).unwrap();
|
||||
}
|
||||
ready!(stream.poll_flush_unpin(cx)).unwrap();
|
||||
ready!(stream.poll_close_unpin(cx)).unwrap();
|
||||
|
||||
Poll::Ready(Ok(json!({})))
|
||||
})
|
||||
.await
|
||||
let resource = state
|
||||
.borrow_mut()
|
||||
.resource_table
|
||||
.get::<WsStreamResource>(rid)
|
||||
.ok_or_else(bad_resource_id)?;
|
||||
let mut tx = RcRef::map(&resource, |r| &r.tx).borrow_mut().await;
|
||||
tx.send(msg).await?;
|
||||
Ok(json!({}))
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
|
@ -284,43 +289,41 @@ pub async fn op_ws_next_event(
|
|||
_bufs: BufVec,
|
||||
) -> Result<Value, AnyError> {
|
||||
let args: NextEventArgs = serde_json::from_value(args)?;
|
||||
poll_fn(move |cx| {
|
||||
let mut state = state.borrow_mut();
|
||||
let stream = state
|
||||
.resource_table
|
||||
.get_mut::<WsStream>(args.rid)
|
||||
.ok_or_else(bad_resource_id)?;
|
||||
stream
|
||||
.poll_next_unpin(cx)
|
||||
.map(|val| {
|
||||
match val {
|
||||
Some(Ok(Message::Text(text))) => json!({
|
||||
"type": "string",
|
||||
"data": text
|
||||
}),
|
||||
Some(Ok(Message::Binary(data))) => {
|
||||
// TODO(ry): don't use json to send binary data.
|
||||
json!({
|
||||
"type": "binary",
|
||||
"data": data
|
||||
})
|
||||
}
|
||||
Some(Ok(Message::Close(Some(frame)))) => json!({
|
||||
"type": "close",
|
||||
"code": u16::from(frame.code),
|
||||
"reason": frame.reason.as_ref()
|
||||
}),
|
||||
Some(Ok(Message::Close(None))) => json!({ "type": "close" }),
|
||||
Some(Ok(Message::Ping(_))) => json!({"type": "ping"}),
|
||||
Some(Ok(Message::Pong(_))) => json!({"type": "pong"}),
|
||||
Some(Err(_)) => json!({"type": "error"}),
|
||||
None => {
|
||||
state.resource_table.close(args.rid).unwrap();
|
||||
json!({"type": "closed"})
|
||||
}
|
||||
}
|
||||
|
||||
let resource = state
|
||||
.borrow_mut()
|
||||
.resource_table
|
||||
.get::<WsStreamResource>(args.rid)
|
||||
.ok_or_else(bad_resource_id)?;
|
||||
|
||||
let mut rx = RcRef::map(&resource, |r| &r.rx).borrow_mut().await;
|
||||
let cancel = RcRef::map(resource, |r| &r.cancel);
|
||||
let val = rx.next().or_cancel(cancel).await?;
|
||||
let res = match val {
|
||||
Some(Ok(Message::Text(text))) => json!({
|
||||
"type": "string",
|
||||
"data": text
|
||||
}),
|
||||
Some(Ok(Message::Binary(data))) => {
|
||||
// TODO(ry): don't use json to send binary data.
|
||||
json!({
|
||||
"type": "binary",
|
||||
"data": data
|
||||
})
|
||||
.map(Ok)
|
||||
})
|
||||
.await
|
||||
}
|
||||
Some(Ok(Message::Close(Some(frame)))) => json!({
|
||||
"type": "close",
|
||||
"code": u16::from(frame.code),
|
||||
"reason": frame.reason.as_ref()
|
||||
}),
|
||||
Some(Ok(Message::Close(None))) => json!({ "type": "close" }),
|
||||
Some(Ok(Message::Ping(_))) => json!({"type": "ping"}),
|
||||
Some(Ok(Message::Pong(_))) => json!({"type": "pong"}),
|
||||
Some(Err(_)) => json!({"type": "error"}),
|
||||
None => {
|
||||
state.borrow_mut().resource_table.close(args.rid).unwrap();
|
||||
json!({"type": "closed"})
|
||||
}
|
||||
};
|
||||
Ok(res)
|
||||
}
|
||||
|
|
|
@ -11,20 +11,16 @@
|
|||
0: "Read",
|
||||
1: "Write",
|
||||
2: "ReadWrite",
|
||||
Read: 0,
|
||||
Read: 0, // TODO: nonsense, remove me.
|
||||
Write: 1,
|
||||
ReadWrite: 2, // unused
|
||||
};
|
||||
|
||||
function shutdown(rid, how) {
|
||||
core.jsonOpSync("op_shutdown", { rid, how });
|
||||
return Promise.resolve();
|
||||
return core.jsonOpAsync("op_shutdown", { rid, how });
|
||||
}
|
||||
|
||||
function opAccept(
|
||||
rid,
|
||||
transport,
|
||||
) {
|
||||
function opAccept(rid, transport) {
|
||||
return core.jsonOpAsync("op_accept", { rid, transport });
|
||||
}
|
||||
|
||||
|
@ -36,11 +32,7 @@
|
|||
return core.jsonOpAsync("op_connect", args);
|
||||
}
|
||||
|
||||
function opReceive(
|
||||
rid,
|
||||
transport,
|
||||
zeroCopy,
|
||||
) {
|
||||
function opReceive(rid, transport, zeroCopy) {
|
||||
return core.jsonOpAsync(
|
||||
"op_datagram_receive",
|
||||
{ rid, transport },
|
||||
|
@ -56,11 +48,7 @@
|
|||
#rid = 0;
|
||||
#remoteAddr = null;
|
||||
#localAddr = null;
|
||||
constructor(
|
||||
rid,
|
||||
remoteAddr,
|
||||
localAddr,
|
||||
) {
|
||||
constructor(rid, remoteAddr, localAddr) {
|
||||
this.#rid = rid;
|
||||
this.#remoteAddr = remoteAddr;
|
||||
this.#localAddr = localAddr;
|
||||
|
@ -149,11 +137,7 @@
|
|||
#rid = 0;
|
||||
#addr = null;
|
||||
|
||||
constructor(
|
||||
rid,
|
||||
addr,
|
||||
bufSize = 1024,
|
||||
) {
|
||||
constructor(rid, addr, bufSize = 1024) {
|
||||
this.#rid = rid;
|
||||
this.#addr = addr;
|
||||
this.bufSize = bufSize;
|
||||
|
@ -213,9 +197,7 @@
|
|||
return new Listener(res.rid, res.localAddr);
|
||||
}
|
||||
|
||||
async function connect(
|
||||
options,
|
||||
) {
|
||||
async function connect(options) {
|
||||
let res;
|
||||
|
||||
if (options.transport === "unix") {
|
||||
|
|
|
@ -24,6 +24,8 @@
|
|||
} catch (error) {
|
||||
if (error instanceof errors.BadResource) {
|
||||
return { value: undefined, done: true };
|
||||
} else if (error instanceof errors.Interrupted) {
|
||||
return { value: undefined, done: true };
|
||||
}
|
||||
throw error;
|
||||
}
|
||||
|
|
|
@ -3,6 +3,7 @@
|
|||
((window) => {
|
||||
const core = window.Deno.core;
|
||||
const { build } = window.__bootstrap.build;
|
||||
const { errors } = window.__bootstrap.errors;
|
||||
|
||||
function bindSignal(signo) {
|
||||
return core.jsonOpSync("op_signal_bind", { signo });
|
||||
|
@ -212,7 +213,15 @@
|
|||
}
|
||||
|
||||
#pollSignal = async () => {
|
||||
const res = await pollSignal(this.#rid);
|
||||
let res;
|
||||
try {
|
||||
res = await pollSignal(this.#rid);
|
||||
} catch (error) {
|
||||
if (error instanceof errors.BadResource) {
|
||||
return true;
|
||||
}
|
||||
throw error;
|
||||
}
|
||||
return res.done;
|
||||
};
|
||||
|
||||
|
|
|
@ -256,15 +256,16 @@ impl WebWorker {
|
|||
|
||||
let op_state = js_runtime.op_state();
|
||||
let mut op_state = op_state.borrow_mut();
|
||||
let t = &mut op_state.resource_table;
|
||||
let (stdin, stdout, stderr) = ops::io::get_stdio();
|
||||
if let Some(stream) = stdin {
|
||||
op_state.resource_table.add("stdin", Box::new(stream));
|
||||
t.add(stream);
|
||||
}
|
||||
if let Some(stream) = stdout {
|
||||
op_state.resource_table.add("stdout", Box::new(stream));
|
||||
t.add(stream);
|
||||
}
|
||||
if let Some(stream) = stderr {
|
||||
op_state.resource_table.add("stderr", Box::new(stream));
|
||||
t.add(stream);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -152,13 +152,13 @@ impl MainWorker {
|
|||
let t = &mut op_state.resource_table;
|
||||
let (stdin, stdout, stderr) = ops::io::get_stdio();
|
||||
if let Some(stream) = stdin {
|
||||
t.add("stdin", Box::new(stream));
|
||||
t.add(stream);
|
||||
}
|
||||
if let Some(stream) = stdout {
|
||||
t.add("stdout", Box::new(stream));
|
||||
t.add(stream);
|
||||
}
|
||||
if let Some(stream) = stderr {
|
||||
t.add("stderr", Box::new(stream));
|
||||
t.add(stream);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue