1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2024-12-22 07:14:47 -05:00

Move resource_table from deno::State to deno_core::Isolate (#4834)

This commit is contained in:
Ryan Dahl 2020-04-21 09:48:44 -04:00 committed by GitHub
parent ef6ee25e09
commit cc1720132a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
20 changed files with 298 additions and 263 deletions

View file

@ -69,6 +69,8 @@ target/debug/deno -A cli/js/tests/unit_test_runner.ts --worker --addr=127.0.0.1:
# Run specific tests
target/debug/deno --allow-net cli/js/tests/unit_test_runner.ts -- netTcpListenClose
RUST_BACKTRACE=1 cargo run -- --allow-read --allow-write cli/js/tests/unit_test_runner.ts -- netUnixDialListen
```
### Http server

View file

@ -140,19 +140,20 @@ fn create_main_worker(
) -> Result<MainWorker, ErrBox> {
let state = State::new(global_state, None, main_module, DebugType::Main)?;
{
let mut s = state.borrow_mut();
let (stdin, stdout, stderr) = get_stdio();
s.resource_table.add("stdin", Box::new(stdin));
s.resource_table.add("stdout", Box::new(stdout));
s.resource_table.add("stderr", Box::new(stderr));
}
let mut worker = MainWorker::new(
"main".to_string(),
startup_data::deno_isolate_init(),
state,
);
{
let (stdin, stdout, stderr) = get_stdio();
let mut t = worker.resource_table.borrow_mut();
t.add("stdin", Box::new(stdin));
t.add("stdout", Box::new(stdout));
t.add("stderr", Box::new(stderr));
}
worker.execute("bootstrapMainRuntime()")?;
Ok(worker)
}

View file

@ -117,9 +117,9 @@ pub fn minimal_op<D>(
d: D,
) -> impl Fn(&mut deno_core::Isolate, &[u8], Option<ZeroCopyBuf>) -> Op
where
D: Fn(bool, i32, Option<ZeroCopyBuf>) -> MinimalOp,
D: Fn(&mut deno_core::Isolate, bool, i32, Option<ZeroCopyBuf>) -> MinimalOp,
{
move |_isolate: &mut deno_core::Isolate,
move |isolate: &mut deno_core::Isolate,
control: &[u8],
zero_copy: Option<ZeroCopyBuf>| {
let mut record = match parse_min_record(control) {
@ -137,7 +137,7 @@ where
};
let is_sync = record.promise_id == 0;
let rid = record.arg;
let min_op = d(is_sync, rid, zero_copy);
let min_op = d(isolate, is_sync, rid, zero_copy);
match min_op {
MinimalOp::Sync(sync_result) => Op::Sync(match sync_result {

View file

@ -12,7 +12,7 @@ use http::Method;
use std::convert::From;
pub fn init(i: &mut Isolate, s: &State) {
i.register_op("op_fetch", s.stateful_json_op(op_fetch));
i.register_op("op_fetch", s.stateful_json_op2(op_fetch));
}
#[derive(Deserialize)]
@ -23,6 +23,7 @@ struct FetchArgs {
}
pub fn op_fetch(
isolate: &mut deno_core::Isolate,
state: &State,
args: Value,
data: Option<ZeroCopyBuf>,
@ -64,8 +65,8 @@ pub fn op_fetch(
request = request.header(name, v);
}
debug!("Before fetch {}", url);
let state_ = state.clone();
let resource_table = isolate.resource_table.clone();
let future = async move {
let res = request.send().await?;
debug!("Fetch response {}", url);
@ -76,8 +77,8 @@ pub fn op_fetch(
}
let body = HttpBody::from(res);
let mut state = state_.borrow_mut();
let rid = state.resource_table.add(
let mut resource_table = resource_table.borrow_mut();
let rid = resource_table.add(
"httpBody",
Box::new(StreamResourceHolder::new(StreamResource::HttpBody(
Box::new(body),

View file

@ -18,8 +18,8 @@ use std::time::UNIX_EPOCH;
use rand::{thread_rng, Rng};
pub fn init(i: &mut Isolate, s: &State) {
i.register_op("op_open", s.stateful_json_op(op_open));
i.register_op("op_seek", s.stateful_json_op(op_seek));
i.register_op("op_open", s.stateful_json_op2(op_open));
i.register_op("op_seek", s.stateful_json_op2(op_seek));
i.register_op("op_umask", s.stateful_json_op(op_umask));
i.register_op("op_chdir", s.stateful_json_op(op_chdir));
i.register_op("op_mkdir", s.stateful_json_op(op_mkdir));
@ -68,13 +68,14 @@ struct OpenOptions {
}
fn op_open(
isolate: &mut deno_core::Isolate,
state: &State,
args: Value,
_zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, OpError> {
let args: OpenArgs = serde_json::from_value(args)?;
let path = resolve_from_cwd(Path::new(&args.path))?;
let state_ = state.clone();
let resource_table = isolate.resource_table.clone();
let mut open_options = std::fs::OpenOptions::new();
@ -166,8 +167,8 @@ fn op_open(
if is_sync {
let std_file = open_options.open(path)?;
let tokio_file = tokio::fs::File::from_std(std_file);
let mut state = state_.borrow_mut();
let rid = state.resource_table.add(
let mut resource_table = resource_table.borrow_mut();
let rid = resource_table.add(
"fsFile",
Box::new(StreamResourceHolder::new(StreamResource::FsFile(Some((
tokio_file,
@ -180,8 +181,8 @@ fn op_open(
let tokio_file = tokio::fs::OpenOptions::from(open_options)
.open(path)
.await?;
let mut state = state_.borrow_mut();
let rid = state.resource_table.add(
let mut resource_table = resource_table.borrow_mut();
let rid = resource_table.add(
"fsFile",
Box::new(StreamResourceHolder::new(StreamResource::FsFile(Some((
tokio_file,
@ -204,7 +205,8 @@ struct SeekArgs {
}
fn op_seek(
state: &State,
isolate: &mut deno_core::Isolate,
_state: &State,
args: Value,
_zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, OpError> {
@ -226,12 +228,12 @@ fn op_seek(
}
};
let state = state.clone();
let resource_table = isolate.resource_table.clone();
let is_sync = args.promise_id.is_none();
if is_sync {
let mut s = state.borrow_mut();
let pos = std_file_resource(&mut s.resource_table, rid, |r| match r {
let mut resource_table = resource_table.borrow_mut();
let pos = std_file_resource(&mut resource_table, rid, |r| match r {
Ok(std_file) => std_file.seek(seek_from).map_err(OpError::from),
Err(_) => Err(OpError::type_error(
"cannot seek on this type of resource".to_string(),
@ -242,8 +244,8 @@ fn op_seek(
// TODO(ry) This is a fake async op. We need to use poll_fn,
// tokio::fs::File::start_seek and tokio::fs::File::poll_complete
let fut = async move {
let mut s = state.borrow_mut();
let pos = std_file_resource(&mut s.resource_table, rid, |r| match r {
let mut resource_table = resource_table.borrow_mut();
let pos = std_file_resource(&mut resource_table, rid, |r| match r {
Ok(std_file) => std_file.seek(seek_from).map_err(OpError::from),
Err(_) => Err(OpError::type_error(
"cannot seek on this type of resource".to_string(),

View file

@ -17,8 +17,8 @@ use std::path::PathBuf;
use tokio::sync::mpsc;
pub fn init(i: &mut Isolate, s: &State) {
i.register_op("op_fs_events_open", s.stateful_json_op(op_fs_events_open));
i.register_op("op_fs_events_poll", s.stateful_json_op(op_fs_events_poll));
i.register_op("op_fs_events_open", s.stateful_json_op2(op_fs_events_open));
i.register_op("op_fs_events_poll", s.stateful_json_op2(op_fs_events_poll));
}
struct FsEventsResource {
@ -60,6 +60,7 @@ impl From<NotifyEvent> for FsEvent {
}
pub fn op_fs_events_open(
isolate: &mut deno_core::Isolate,
state: &State,
args: Value,
_zero_copy: Option<ZeroCopyBuf>,
@ -91,13 +92,14 @@ pub fn op_fs_events_open(
watcher.watch(path, recursive_mode).map_err(ErrBox::from)?;
}
let resource = FsEventsResource { watcher, receiver };
let table = &mut state.borrow_mut().resource_table;
let rid = table.add("fsEvents", Box::new(resource));
let mut resource_table = isolate.resource_table.borrow_mut();
let rid = resource_table.add("fsEvents", Box::new(resource));
Ok(JsonOp::Sync(json!(rid)))
}
pub fn op_fs_events_poll(
state: &State,
isolate: &mut deno_core::Isolate,
_state: &State,
args: Value,
_zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, OpError> {
@ -106,9 +108,9 @@ pub fn op_fs_events_poll(
rid: u32,
}
let PollArgs { rid } = serde_json::from_value(args)?;
let state = state.clone();
let resource_table = isolate.resource_table.clone();
let f = poll_fn(move |cx| {
let resource_table = &mut state.borrow_mut().resource_table;
let mut resource_table = resource_table.borrow_mut();
let watcher = resource_table
.get_mut::<FsEventsResource>(rid)
.ok_or_else(OpError::bad_resource_id)?;

View file

@ -1,7 +1,6 @@
use super::dispatch_minimal::MinimalOp;
use crate::http_util::HttpBody;
use crate::op_error::OpError;
use crate::ops::minimal_op;
use crate::state::State;
use deno_core::*;
use futures::future::poll_fn;
@ -60,14 +59,8 @@ lazy_static! {
}
pub fn init(i: &mut Isolate, s: &State) {
i.register_op(
"op_read",
s.core_op(minimal_op(s.stateful_minimal_op(op_read))),
);
i.register_op(
"op_write",
s.core_op(minimal_op(s.stateful_minimal_op(op_write))),
);
i.register_op("op_read", s.stateful_minimal_op2(op_read));
i.register_op("op_write", s.stateful_minimal_op2(op_write));
}
pub fn get_stdio() -> (
@ -211,7 +204,8 @@ impl DenoAsyncRead for StreamResource {
}
pub fn op_read(
state: &State,
isolate: &mut deno_core::Isolate,
_state: &State,
is_sync: bool,
rid: i32,
zero_copy: Option<ZeroCopyBuf>,
@ -220,15 +214,15 @@ pub fn op_read(
if zero_copy.is_none() {
return MinimalOp::Sync(Err(no_buffer_specified()));
}
let resource_table = isolate.resource_table.clone();
let state = state.clone();
let mut buf = zero_copy.unwrap();
if is_sync {
MinimalOp::Sync({
// First we look up the rid in the resource table.
let resource_table = &mut state.borrow_mut().resource_table;
std_file_resource(resource_table, rid as u32, move |r| match r {
let mut resource_table = resource_table.borrow_mut();
std_file_resource(&mut resource_table, rid as u32, move |r| match r {
Ok(std_file) => {
use std::io::Read;
std_file
@ -244,7 +238,7 @@ pub fn op_read(
} else {
MinimalOp::Async(
poll_fn(move |cx| {
let resource_table = &mut state.borrow_mut().resource_table;
let mut resource_table = resource_table.borrow_mut();
let resource_holder = resource_table
.get_mut::<StreamResourceHolder>(rid as u32)
.ok_or_else(OpError::bad_resource_id)?;
@ -334,7 +328,8 @@ impl DenoAsyncWrite for StreamResource {
}
pub fn op_write(
state: &State,
isolate: &mut deno_core::Isolate,
_state: &State,
is_sync: bool,
rid: i32,
zero_copy: Option<ZeroCopyBuf>,
@ -344,14 +339,13 @@ pub fn op_write(
return MinimalOp::Sync(Err(no_buffer_specified()));
}
let state = state.clone();
let buf = zero_copy.unwrap();
if is_sync {
MinimalOp::Sync({
// First we look up the rid in the resource table.
let resource_table = &mut state.borrow_mut().resource_table;
std_file_resource(resource_table, rid as u32, move |r| match r {
let mut resource_table = isolate.resource_table.borrow_mut();
std_file_resource(&mut resource_table, rid as u32, move |r| match r {
Ok(std_file) => {
use std::io::Write;
std_file
@ -365,10 +359,11 @@ pub fn op_write(
})
})
} else {
let resource_table = isolate.resource_table.clone();
MinimalOp::Async(
async move {
let nwritten = poll_fn(|cx| {
let resource_table = &mut state.borrow_mut().resource_table;
let mut resource_table = resource_table.borrow_mut();
let resource_holder = resource_table
.get_mut::<StreamResourceHolder>(rid as u32)
.ok_or_else(OpError::bad_resource_id)?;
@ -381,7 +376,7 @@ pub fn op_write(
// Figure out why it's needed and preferably remove it.
// https://github.com/denoland/deno/issues/3565
poll_fn(|cx| {
let resource_table = &mut state.borrow_mut().resource_table;
let mut resource_table = resource_table.borrow_mut();
let resource_holder = resource_table
.get_mut::<StreamResourceHolder>(rid as u32)
.ok_or_else(OpError::bad_resource_id)?;

View file

@ -20,12 +20,12 @@ use tokio::net::UdpSocket;
use super::net_unix;
pub fn init(i: &mut Isolate, s: &State) {
i.register_op("op_accept", s.stateful_json_op(op_accept));
i.register_op("op_connect", s.stateful_json_op(op_connect));
i.register_op("op_shutdown", s.stateful_json_op(op_shutdown));
i.register_op("op_listen", s.stateful_json_op(op_listen));
i.register_op("op_receive", s.stateful_json_op(op_receive));
i.register_op("op_send", s.stateful_json_op(op_send));
i.register_op("op_accept", s.stateful_json_op2(op_accept));
i.register_op("op_connect", s.stateful_json_op2(op_connect));
i.register_op("op_shutdown", s.stateful_json_op2(op_shutdown));
i.register_op("op_listen", s.stateful_json_op2(op_listen));
i.register_op("op_receive", s.stateful_json_op2(op_receive));
i.register_op("op_send", s.stateful_json_op2(op_send));
}
#[derive(Deserialize)]
@ -35,25 +35,16 @@ struct AcceptArgs {
}
fn accept_tcp(
state: &State,
isolate: &mut deno_core::Isolate,
args: AcceptArgs,
_zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, OpError> {
let rid = args.rid as u32;
let state_ = state.clone();
{
let state = state.borrow();
state
.resource_table
.get::<TcpListenerResource>(rid)
.ok_or_else(OpError::bad_resource_id)?;
}
let state = state.clone();
let resource_table = isolate.resource_table.clone();
let op = async move {
let accept_fut = poll_fn(|cx| {
let resource_table = &mut state.borrow_mut().resource_table;
let mut resource_table = resource_table.borrow_mut();
let listener_resource = resource_table
.get_mut::<TcpListenerResource>(rid)
.ok_or_else(|| {
@ -78,8 +69,8 @@ fn accept_tcp(
let (tcp_stream, _socket_addr) = accept_fut.await?;
let local_addr = tcp_stream.local_addr()?;
let remote_addr = tcp_stream.peer_addr()?;
let mut state = state_.borrow_mut();
let rid = state.resource_table.add(
let mut resource_table = resource_table.borrow_mut();
let rid = resource_table.add(
"tcpStream",
Box::new(StreamResourceHolder::new(StreamResource::TcpStream(Some(
tcp_stream,
@ -104,15 +95,16 @@ fn accept_tcp(
}
fn op_accept(
state: &State,
isolate: &mut deno_core::Isolate,
_state: &State,
args: Value,
zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, OpError> {
let args: AcceptArgs = serde_json::from_value(args)?;
match args.transport.as_str() {
"tcp" => accept_tcp(state, args, zero_copy),
"tcp" => accept_tcp(isolate, args, zero_copy),
#[cfg(unix)]
"unix" => net_unix::accept_unix(state, args.rid as u32, zero_copy),
"unix" => net_unix::accept_unix(isolate, args.rid as u32, zero_copy),
_ => Err(OpError::other(format!(
"Unsupported transport protocol {}",
args.transport
@ -127,7 +119,8 @@ struct ReceiveArgs {
}
fn receive_udp(
state: &State,
isolate: &mut deno_core::Isolate,
_state: &State,
args: ReceiveArgs,
zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, OpError> {
@ -135,11 +128,11 @@ fn receive_udp(
let rid = args.rid as u32;
let state_ = state.clone();
let resource_table = isolate.resource_table.clone();
let op = async move {
let receive_fut = poll_fn(|cx| {
let resource_table = &mut state_.borrow_mut().resource_table;
let mut resource_table = resource_table.borrow_mut();
let resource = resource_table
.get_mut::<UdpSocketResource>(rid)
.ok_or_else(|| {
@ -163,6 +156,7 @@ fn receive_udp(
}
fn op_receive(
isolate: &mut deno_core::Isolate,
state: &State,
args: Value,
zero_copy: Option<ZeroCopyBuf>,
@ -170,10 +164,10 @@ fn op_receive(
assert!(zero_copy.is_some());
let args: ReceiveArgs = serde_json::from_value(args)?;
match args.transport.as_str() {
"udp" => receive_udp(state, args, zero_copy),
"udp" => receive_udp(isolate, state, args, zero_copy),
#[cfg(unix)]
"unixpacket" => {
net_unix::receive_unix_packet(state, args.rid as u32, zero_copy)
net_unix::receive_unix_packet(isolate, args.rid as u32, zero_copy)
}
_ => Err(OpError::other(format!(
"Unsupported transport protocol {}",
@ -191,13 +185,14 @@ struct SendArgs {
}
fn op_send(
isolate: &mut deno_core::Isolate,
state: &State,
args: Value,
zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, OpError> {
assert!(zero_copy.is_some());
let buf = zero_copy.unwrap();
let state_ = state.clone();
let resource_table = isolate.resource_table.clone();
match serde_json::from_value(args)? {
SendArgs {
rid,
@ -207,9 +202,8 @@ fn op_send(
state.check_net(&args.hostname, args.port)?;
let op = async move {
let mut state = state_.borrow_mut();
let resource = state
.resource_table
let mut resource_table = resource_table.borrow_mut();
let resource = resource_table
.get_mut::<UdpSocketResource>(rid as u32)
.ok_or_else(|| {
OpError::bad_resource("Socket has been closed".to_string())
@ -231,9 +225,8 @@ fn op_send(
let address_path = net_unix::Path::new(&args.address);
state.check_read(&address_path)?;
let op = async move {
let mut state = state_.borrow_mut();
let resource = state
.resource_table
let mut resource_table = resource_table.borrow_mut();
let resource = resource_table
.get_mut::<net_unix::UnixDatagramResource>(rid as u32)
.ok_or_else(|| {
OpError::other("Socket has been closed".to_string())
@ -261,24 +254,25 @@ struct ConnectArgs {
}
fn op_connect(
isolate: &mut deno_core::Isolate,
state: &State,
args: Value,
_zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, OpError> {
let resource_table = isolate.resource_table.clone();
match serde_json::from_value(args)? {
ConnectArgs {
transport,
transport_args: ArgsEnum::Ip(args),
} if transport == "tcp" => {
let state_ = state.clone();
state.check_net(&args.hostname, args.port)?;
let op = async move {
let addr = resolve_addr(&args.hostname, args.port)?;
let tcp_stream = TcpStream::connect(&addr).await?;
let local_addr = tcp_stream.local_addr()?;
let remote_addr = tcp_stream.peer_addr()?;
let mut state = state_.borrow_mut();
let rid = state.resource_table.add(
let mut resource_table = resource_table.borrow_mut();
let rid = resource_table.add(
"tcpStream",
Box::new(StreamResourceHolder::new(StreamResource::TcpStream(Some(
tcp_stream,
@ -306,7 +300,6 @@ fn op_connect(
transport_args: ArgsEnum::Unix(args),
} if transport == "unix" => {
let address_path = net_unix::Path::new(&args.address);
let state_ = state.clone();
state.check_read(&address_path)?;
let op = async move {
let address = args.address;
@ -314,8 +307,8 @@ fn op_connect(
net_unix::UnixStream::connect(net_unix::Path::new(&address)).await?;
let local_addr = unix_stream.local_addr()?;
let remote_addr = unix_stream.peer_addr()?;
let mut state = state_.borrow_mut();
let rid = state.resource_table.add(
let mut resource_table = resource_table.borrow_mut();
let rid = resource_table.add(
"unixStream",
Box::new(StreamResourceHolder::new(StreamResource::UnixStream(
unix_stream,
@ -346,7 +339,8 @@ struct ShutdownArgs {
}
fn op_shutdown(
state: &State,
isolate: &mut deno_core::Isolate,
_state: &State,
args: Value,
_zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, OpError> {
@ -361,9 +355,8 @@ fn op_shutdown(
_ => unimplemented!(),
};
let mut state = state.borrow_mut();
let resource_holder = state
.resource_table
let mut resource_table = isolate.resource_table.borrow_mut();
let resource_holder = resource_table
.get_mut::<StreamResourceHolder>(rid)
.ok_or_else(OpError::bad_resource_id)?;
match resource_holder.resource {
@ -456,10 +449,9 @@ struct ListenArgs {
}
fn listen_tcp(
state: &State,
resource_table: &mut ResourceTable,
addr: SocketAddr,
) -> Result<(u32, SocketAddr), OpError> {
let mut state = state.borrow_mut();
let std_listener = std::net::TcpListener::bind(&addr)?;
let listener = TcpListener::from_std(std_listener)?;
let local_addr = listener.local_addr()?;
@ -468,34 +460,31 @@ fn listen_tcp(
waker: None,
local_addr,
};
let rid = state
.resource_table
.add("tcpListener", Box::new(listener_resource));
let rid = resource_table.add("tcpListener", Box::new(listener_resource));
Ok((rid, local_addr))
}
fn listen_udp(
state: &State,
resource_table: &mut ResourceTable,
addr: SocketAddr,
) -> Result<(u32, SocketAddr), OpError> {
let mut state = state.borrow_mut();
let std_socket = std::net::UdpSocket::bind(&addr)?;
let socket = UdpSocket::from_std(std_socket)?;
let local_addr = socket.local_addr()?;
let socket_resource = UdpSocketResource { socket };
let rid = state
.resource_table
.add("udpSocket", Box::new(socket_resource));
let rid = resource_table.add("udpSocket", Box::new(socket_resource));
Ok((rid, local_addr))
}
fn op_listen(
isolate: &mut deno_core::Isolate,
state: &State,
args: Value,
_zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, OpError> {
let mut resource_table = isolate.resource_table.borrow_mut();
match serde_json::from_value(args)? {
ListenArgs {
transport,
@ -504,9 +493,9 @@ fn op_listen(
state.check_net(&args.hostname, args.port)?;
let addr = resolve_addr(&args.hostname, args.port)?;
let (rid, local_addr) = if transport == "tcp" {
listen_tcp(state, addr)?
listen_tcp(&mut resource_table, addr)?
} else {
listen_udp(state, addr)?
listen_udp(&mut resource_table, addr)?
};
debug!(
"New listener {} {}:{}",
@ -531,9 +520,9 @@ fn op_listen(
let address_path = net_unix::Path::new(&args.address);
state.check_read(&address_path)?;
let (rid, local_addr) = if transport == "unix" {
net_unix::listen_unix(state, &address_path)?
net_unix::listen_unix(&mut resource_table, &address_path)?
} else {
net_unix::listen_unix_packet(state, &address_path)?
net_unix::listen_unix_packet(&mut resource_table, &address_path)?
};
debug!(
"New listener {} {}",

View file

@ -1,7 +1,6 @@
use super::dispatch_json::{Deserialize, JsonOp};
use super::io::{StreamResource, StreamResourceHolder};
use crate::op_error::OpError;
use crate::state::State;
use futures::future::FutureExt;
use deno_core::*;
@ -27,31 +26,35 @@ pub struct UnixListenArgs {
}
pub fn accept_unix(
state: &State,
isolate: &mut deno_core::Isolate,
rid: u32,
_zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, OpError> {
let state_ = state.clone();
let resource_table = isolate.resource_table.clone();
{
let state = state.borrow();
state
.resource_table
let _ = resource_table
.borrow()
.get::<UnixListenerResource>(rid)
.ok_or_else(OpError::bad_resource_id)?;
}
let op = async move {
let mut state = state_.borrow_mut();
let listener_resource = state
.resource_table
let mut resource_table_ = resource_table.borrow_mut();
let listener_resource = {
resource_table_
.get_mut::<UnixListenerResource>(rid)
.ok_or_else(|| {
OpError::bad_resource("Listener has been closed".to_string())
})?;
})?
};
let (unix_stream, _socket_addr) =
listener_resource.listener.accept().await?;
drop(resource_table_);
let local_addr = unix_stream.local_addr()?;
let remote_addr = unix_stream.peer_addr()?;
let rid = state.resource_table.add(
let mut resource_table_ = resource_table.borrow_mut();
let rid = resource_table_.add(
"unixStream",
Box::new(StreamResourceHolder::new(StreamResource::UnixStream(
unix_stream,
@ -74,17 +77,16 @@ pub fn accept_unix(
}
pub fn receive_unix_packet(
state: &State,
isolate: &mut deno_core::Isolate,
rid: u32,
zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, OpError> {
let mut buf = zero_copy.unwrap();
let state_ = state.clone();
let resource_table = isolate.resource_table.clone();
let op = async move {
let mut state = state_.borrow_mut();
let resource = state
.resource_table
let mut resource_table_ = resource_table.borrow_mut();
let resource = resource_table_
.get_mut::<UnixDatagramResource>(rid)
.ok_or_else(|| {
OpError::bad_resource("Socket has been closed".to_string())
@ -103,28 +105,24 @@ pub fn receive_unix_packet(
}
pub fn listen_unix(
state: &State,
resource_table: &mut ResourceTable,
addr: &Path,
) -> Result<(u32, unix::net::SocketAddr), OpError> {
let mut state = state.borrow_mut();
if addr.exists() {
remove_file(&addr).unwrap();
}
let listener = UnixListener::bind(&addr)?;
let local_addr = listener.local_addr()?;
let listener_resource = UnixListenerResource { listener };
let rid = state
.resource_table
.add("unixListener", Box::new(listener_resource));
let rid = resource_table.add("unixListener", Box::new(listener_resource));
Ok((rid, local_addr))
}
pub fn listen_unix_packet(
state: &State,
resource_table: &mut ResourceTable,
addr: &Path,
) -> Result<(u32, unix::net::SocketAddr), OpError> {
let mut state = state.borrow_mut();
if addr.exists() {
remove_file(&addr).unwrap();
}
@ -134,9 +132,7 @@ pub fn listen_unix_packet(
socket,
local_addr: local_addr.clone(),
};
let rid = state
.resource_table
.add("unixDatagram", Box::new(datagram_resource));
let rid = resource_table.add("unixDatagram", Box::new(datagram_resource));
Ok((rid, local_addr))
}

View file

@ -46,14 +46,10 @@ pub fn op_open_plugin(
let lib = open_plugin(filename).unwrap();
let plugin_resource = PluginResource { lib };
let mut state_ = state.borrow_mut();
let rid = state_
.resource_table
.add("plugin", Box::new(plugin_resource));
let plugin_resource = state_
.resource_table
.get_mut::<PluginResource>(rid)
.unwrap();
let mut resource_table = isolate.resource_table.borrow_mut();
let rid = resource_table.add("plugin", Box::new(plugin_resource));
let plugin_resource = resource_table.get::<PluginResource>(rid).unwrap();
let deno_plugin_init = *unsafe {
plugin_resource
@ -61,6 +57,8 @@ pub fn op_open_plugin(
.symbol::<PluginInitFn>("deno_plugin_init")
}
.unwrap();
drop(resource_table);
deno_plugin_init(isolate);
Ok(JsonOp::Sync(json!(rid)))

View file

@ -15,15 +15,16 @@ use tokio::process::Command;
use std::os::unix::process::ExitStatusExt;
pub fn init(i: &mut Isolate, s: &State) {
i.register_op("op_run", s.stateful_json_op(op_run));
i.register_op("op_run_status", s.stateful_json_op(op_run_status));
i.register_op("op_run", s.stateful_json_op2(op_run));
i.register_op("op_run_status", s.stateful_json_op2(op_run_status));
i.register_op("op_kill", s.stateful_json_op(op_kill));
}
fn clone_file(rid: u32, state: &State) -> Result<std::fs::File, OpError> {
let mut state = state.borrow_mut();
std_file_resource(&mut state.resource_table, rid, move |r| match r {
fn clone_file(
rid: u32,
resource_table: &mut ResourceTable,
) -> Result<std::fs::File, OpError> {
std_file_resource(resource_table, rid, move |r| match r {
Ok(std_file) => std_file.try_clone().map_err(OpError::from),
Err(_) => Err(OpError::bad_resource_id()),
})
@ -57,6 +58,7 @@ struct ChildResource {
}
fn op_run(
isolate: &mut deno_core::Isolate,
state: &State,
args: Value,
_zero_copy: Option<ZeroCopyBuf>,
@ -64,7 +66,7 @@ fn op_run(
let run_args: RunArgs = serde_json::from_value(args)?;
state.check_run()?;
let state_ = state.clone();
let mut resource_table = isolate.resource_table.borrow_mut();
let args = run_args.cmd;
let env = run_args.env;
@ -83,7 +85,7 @@ fn op_run(
// TODO: make this work with other resources, eg. sockets
let stdin_rid = run_args.stdin_rid;
if stdin_rid > 0 {
let file = clone_file(stdin_rid, &state_)?;
let file = clone_file(stdin_rid, &mut resource_table)?;
c.stdin(file);
} else {
c.stdin(subprocess_stdio_map(run_args.stdin.as_ref()));
@ -91,7 +93,7 @@ fn op_run(
let stdout_rid = run_args.stdout_rid;
if stdout_rid > 0 {
let file = clone_file(stdout_rid, &state_)?;
let file = clone_file(stdout_rid, &mut resource_table)?;
c.stdout(file);
} else {
c.stdout(subprocess_stdio_map(run_args.stdout.as_ref()));
@ -99,7 +101,7 @@ fn op_run(
let stderr_rid = run_args.stderr_rid;
if stderr_rid > 0 {
let file = clone_file(stderr_rid, &state_)?;
let file = clone_file(stderr_rid, &mut resource_table)?;
c.stderr(file);
} else {
c.stderr(subprocess_stdio_map(run_args.stderr.as_ref()));
@ -112,12 +114,9 @@ fn op_run(
let mut child = c.spawn()?;
let pid = child.id();
let mut state = state_.borrow_mut();
let table = &mut state.resource_table;
let stdin_rid = match child.stdin.take() {
Some(child_stdin) => {
let rid = table.add(
let rid = resource_table.add(
"childStdin",
Box::new(StreamResourceHolder::new(StreamResource::ChildStdin(
child_stdin,
@ -130,7 +129,7 @@ fn op_run(
let stdout_rid = match child.stdout.take() {
Some(child_stdout) => {
let rid = table.add(
let rid = resource_table.add(
"childStdout",
Box::new(StreamResourceHolder::new(StreamResource::ChildStdout(
child_stdout,
@ -143,7 +142,7 @@ fn op_run(
let stderr_rid = match child.stderr.take() {
Some(child_stderr) => {
let rid = table.add(
let rid = resource_table.add(
"childStderr",
Box::new(StreamResourceHolder::new(StreamResource::ChildStderr(
child_stderr,
@ -155,7 +154,7 @@ fn op_run(
};
let child_resource = ChildResource { child };
let child_rid = table.add("child", Box::new(child_resource));
let child_rid = resource_table.add("child", Box::new(child_resource));
Ok(JsonOp::Sync(json!({
"rid": child_rid,
@ -173,6 +172,7 @@ struct RunStatusArgs {
}
fn op_run_status(
isolate: &mut deno_core::Isolate,
state: &State,
args: Value,
_zero_copy: Option<ZeroCopyBuf>,
@ -181,11 +181,11 @@ fn op_run_status(
let rid = args.rid as u32;
state.check_run()?;
let state = state.clone();
let resource_table = isolate.resource_table.clone();
let future = async move {
let run_status = poll_fn(|cx| {
let resource_table = &mut state.borrow_mut().resource_table;
let mut resource_table = resource_table.borrow_mut();
let child_resource = resource_table
.get_mut::<ChildResource>(rid)
.ok_or_else(OpError::bad_resource_id)?;

View file

@ -9,8 +9,8 @@ use std::sync::Arc;
use std::sync::Mutex;
pub fn init(i: &mut Isolate, s: &State) {
i.register_op("op_repl_start", s.stateful_json_op(op_repl_start));
i.register_op("op_repl_readline", s.stateful_json_op(op_repl_readline));
i.register_op("op_repl_start", s.stateful_json_op2(op_repl_start));
i.register_op("op_repl_readline", s.stateful_json_op2(op_repl_readline));
}
struct ReplResource(Arc<Mutex<Repl>>);
@ -22,19 +22,19 @@ struct ReplStartArgs {
}
fn op_repl_start(
isolate: &mut deno_core::Isolate,
state: &State,
args: Value,
_zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, OpError> {
let args: ReplStartArgs = serde_json::from_value(args)?;
debug!("op_repl_start {}", args.history_file);
let history_path =
repl::history_path(&state.borrow().global_state.dir, &args.history_file);
let repl = repl::Repl::new(history_path);
let mut state = state.borrow_mut();
let resource = ReplResource(Arc::new(Mutex::new(repl)));
let rid = state.resource_table.add("repl", Box::new(resource));
let mut resource_table = isolate.resource_table.borrow_mut();
let rid = resource_table.add("repl", Box::new(resource));
Ok(JsonOp::Sync(json!(rid)))
}
@ -45,7 +45,8 @@ struct ReplReadlineArgs {
}
fn op_repl_readline(
state: &State,
isolate: &mut deno_core::Isolate,
_state: &State,
args: Value,
_zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, OpError> {
@ -53,9 +54,8 @@ fn op_repl_readline(
let rid = args.rid as u32;
let prompt = args.prompt;
debug!("op_repl_readline {} {}", rid, prompt);
let state = state.borrow();
let resource = state
.resource_table
let resource_table = isolate.resource_table.borrow();
let resource = resource_table
.get::<ReplResource>(rid)
.ok_or_else(OpError::bad_resource_id)?;
let repl = resource.0.clone();

View file

@ -5,23 +5,24 @@ use crate::state::State;
use deno_core::*;
pub fn init(i: &mut Isolate, s: &State) {
i.register_op("op_resources", s.stateful_json_op(op_resources));
i.register_op("op_close", s.stateful_json_op(op_close));
i.register_op("op_resources", s.stateful_json_op2(op_resources));
i.register_op("op_close", s.stateful_json_op2(op_close));
}
fn op_resources(
state: &State,
isolate: &mut deno_core::Isolate,
_state: &State,
_args: Value,
_zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, OpError> {
let state = state.borrow();
let serialized_resources = state.resource_table.entries();
let serialized_resources = isolate.resource_table.borrow().entries();
Ok(JsonOp::Sync(json!(serialized_resources)))
}
/// op_close removes a resource from the resource table.
fn op_close(
state: &State,
isolate: &mut deno_core::Isolate,
_state: &State,
args: Value,
_zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, OpError> {
@ -30,9 +31,8 @@ fn op_close(
rid: i32,
}
let args: CloseArgs = serde_json::from_value(args).unwrap();
let mut state = state.borrow_mut();
state
.resource_table
let mut resource_table = isolate.resource_table.borrow_mut();
resource_table
.close(args.rid as u32)
.ok_or_else(OpError::bad_resource_id)?;
Ok(JsonOp::Sync(json!({})))

View file

@ -14,9 +14,9 @@ use std::task::Waker;
use tokio::signal::unix::{signal, Signal, SignalKind};
pub fn init(i: &mut Isolate, s: &State) {
i.register_op("op_signal_bind", s.stateful_json_op(op_signal_bind));
i.register_op("op_signal_unbind", s.stateful_json_op(op_signal_unbind));
i.register_op("op_signal_poll", s.stateful_json_op(op_signal_poll));
i.register_op("op_signal_bind", s.stateful_json_op2(op_signal_bind));
i.register_op("op_signal_unbind", s.stateful_json_op2(op_signal_unbind));
i.register_op("op_signal_poll", s.stateful_json_op2(op_signal_poll));
}
#[cfg(unix)]
@ -38,13 +38,14 @@ struct SignalArgs {
#[cfg(unix)]
fn op_signal_bind(
state: &State,
isolate: &mut deno_core::Isolate,
_state: &State,
args: Value,
_zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, OpError> {
let args: BindSignalArgs = serde_json::from_value(args)?;
let mut state = state.borrow_mut();
let rid = state.resource_table.add(
let mut resource_table = isolate.resource_table.borrow_mut();
let rid = resource_table.add(
"signal",
Box::new(SignalStreamResource(
signal(SignalKind::from_raw(args.signo)).expect(""),
@ -58,18 +59,19 @@ fn op_signal_bind(
#[cfg(unix)]
fn op_signal_poll(
state: &State,
isolate: &mut deno_core::Isolate,
_state: &State,
args: Value,
_zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, OpError> {
let args: SignalArgs = serde_json::from_value(args)?;
let rid = args.rid as u32;
let state_ = state.clone();
let resource_table = isolate.resource_table.clone();
let future = poll_fn(move |cx| {
let mut state = state_.borrow_mut();
let mut resource_table = resource_table.borrow_mut();
if let Some(mut signal) =
state.resource_table.get_mut::<SignalStreamResource>(rid)
resource_table.get_mut::<SignalStreamResource>(rid)
{
signal.1 = Some(cx.waker().clone());
return signal.0.poll_recv(cx);
@ -83,14 +85,15 @@ fn op_signal_poll(
#[cfg(unix)]
pub fn op_signal_unbind(
state: &State,
isolate: &mut deno_core::Isolate,
_state: &State,
args: Value,
_zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, OpError> {
let args: SignalArgs = serde_json::from_value(args)?;
let rid = args.rid as u32;
let mut state = state.borrow_mut();
let resource = state.resource_table.get::<SignalStreamResource>(rid);
let mut resource_table = isolate.resource_table.borrow_mut();
let resource = resource_table.get::<SignalStreamResource>(rid);
if let Some(signal) = resource {
if let Some(waker) = &signal.1 {
// Wakes up the pending poll if exists.
@ -98,8 +101,7 @@ pub fn op_signal_unbind(
waker.clone().wake();
}
}
state
.resource_table
resource_table
.close(rid)
.ok_or_else(OpError::bad_resource_id)?;
Ok(JsonOp::Sync(json!({})))
@ -107,6 +109,7 @@ pub fn op_signal_unbind(
#[cfg(not(unix))]
pub fn op_signal_bind(
_isolate: &mut deno_core::Isolate,
_state: &State,
_args: Value,
_zero_copy: Option<ZeroCopyBuf>,
@ -116,6 +119,7 @@ pub fn op_signal_bind(
#[cfg(not(unix))]
fn op_signal_unbind(
_isolate: &mut deno_core::Isolate,
_state: &State,
_args: Value,
_zero_copy: Option<ZeroCopyBuf>,
@ -125,6 +129,7 @@ fn op_signal_unbind(
#[cfg(not(unix))]
fn op_signal_poll(
_isolate: &mut deno_core::Isolate,
_state: &State,
_args: Value,
_zero_copy: Option<ZeroCopyBuf>,

View file

@ -28,10 +28,10 @@ use tokio_rustls::{
use webpki::DNSNameRef;
pub fn init(i: &mut Isolate, s: &State) {
i.register_op("op_start_tls", s.stateful_json_op(op_start_tls));
i.register_op("op_connect_tls", s.stateful_json_op(op_connect_tls));
i.register_op("op_listen_tls", s.stateful_json_op(op_listen_tls));
i.register_op("op_accept_tls", s.stateful_json_op(op_accept_tls));
i.register_op("op_start_tls", s.stateful_json_op2(op_start_tls));
i.register_op("op_connect_tls", s.stateful_json_op2(op_connect_tls));
i.register_op("op_listen_tls", s.stateful_json_op2(op_listen_tls));
i.register_op("op_accept_tls", s.stateful_json_op2(op_accept_tls));
}
#[derive(Deserialize)]
@ -52,14 +52,15 @@ struct StartTLSArgs {
}
pub fn op_start_tls(
state: &State,
isolate: &mut deno_core::Isolate,
_state: &State,
args: Value,
_zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, OpError> {
let args: StartTLSArgs = serde_json::from_value(args)?;
let rid = args.rid as u32;
let cert_file = args.cert_file.clone();
let state_ = state.clone();
let resource_table = isolate.resource_table.clone();
let mut domain = args.hostname;
if domain.is_empty() {
@ -67,12 +68,12 @@ pub fn op_start_tls(
}
let op = async move {
let mut state = state_.borrow_mut();
let mut resource_holder =
match state.resource_table.remove::<StreamResourceHolder>(rid) {
let mut resource_holder = {
let mut resource_table_ = resource_table.borrow_mut();
match resource_table_.remove::<StreamResourceHolder>(rid) {
Some(resource) => *resource,
None => return Err(OpError::bad_resource_id()),
}
};
if let StreamResource::TcpStream(ref mut tcp_stream) =
@ -96,7 +97,8 @@ pub fn op_start_tls(
DNSNameRef::try_from_ascii_str(&domain).expect("Invalid DNS lookup");
let tls_stream = tls_connector.connect(dnsname, tcp_stream).await?;
let rid = state.resource_table.add(
let mut resource_table_ = resource_table.borrow_mut();
let rid = resource_table_.add(
"clientTlsStream",
Box::new(StreamResourceHolder::new(StreamResource::ClientTlsStream(
Box::new(tls_stream),
@ -123,13 +125,14 @@ pub fn op_start_tls(
}
pub fn op_connect_tls(
isolate: &mut deno_core::Isolate,
state: &State,
args: Value,
_zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, OpError> {
let args: ConnectTLSArgs = serde_json::from_value(args)?;
let cert_file = args.cert_file.clone();
let state_ = state.clone();
let resource_table = isolate.resource_table.clone();
state.check_net(&args.hostname, args.port)?;
if let Some(path) = cert_file.clone() {
state.check_read(Path::new(&path))?;
@ -158,8 +161,8 @@ pub fn op_connect_tls(
let dnsname =
DNSNameRef::try_from_ascii_str(&domain).expect("Invalid DNS lookup");
let tls_stream = tls_connector.connect(dnsname, tcp_stream).await?;
let mut state = state_.borrow_mut();
let rid = state.resource_table.add(
let mut resource_table_ = resource_table.borrow_mut();
let rid = resource_table_.add(
"clientTlsStream",
Box::new(StreamResourceHolder::new(StreamResource::ClientTlsStream(
Box::new(tls_stream),
@ -298,6 +301,7 @@ struct ListenTlsArgs {
}
fn op_listen_tls(
isolate: &mut deno_core::Isolate,
state: &State,
args: Value,
_zero_copy: Option<ZeroCopyBuf>,
@ -327,10 +331,9 @@ fn op_listen_tls(
waker: None,
local_addr,
};
let mut state = state.borrow_mut();
let rid = state
.resource_table
.add("tlsListener", Box::new(tls_listener_resource));
let mut resource_table = isolate.resource_table.borrow_mut();
let rid = resource_table.add("tlsListener", Box::new(tls_listener_resource));
Ok(JsonOp::Sync(json!({
"rid": rid,
@ -348,16 +351,17 @@ struct AcceptTlsArgs {
}
fn op_accept_tls(
state: &State,
isolate: &mut deno_core::Isolate,
_state: &State,
args: Value,
_zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, OpError> {
let args: AcceptTlsArgs = serde_json::from_value(args)?;
let rid = args.rid as u32;
let state = state.clone();
let resource_table = isolate.resource_table.clone();
let op = async move {
let accept_fut = poll_fn(|cx| {
let resource_table = &mut state.borrow_mut().resource_table;
let mut resource_table = resource_table.borrow_mut();
let listener_resource = resource_table
.get_mut::<TlsListenerResource>(rid)
.ok_or_else(|| {
@ -383,9 +387,8 @@ fn op_accept_tls(
let local_addr = tcp_stream.local_addr()?;
let remote_addr = tcp_stream.peer_addr()?;
let tls_acceptor = {
let state = state.borrow();
let resource = state
.resource_table
let resource_table = resource_table.borrow();
let resource = resource_table
.get::<TlsListenerResource>(rid)
.ok_or_else(OpError::bad_resource_id)
.expect("Can't find tls listener");
@ -393,8 +396,8 @@ fn op_accept_tls(
};
let tls_stream = tls_acceptor.accept(tcp_stream).await?;
let rid = {
let mut state = state.borrow_mut();
state.resource_table.add(
let mut resource_table = resource_table.borrow_mut();
resource_table.add(
"serverTlsStream",
Box::new(StreamResourceHolder::new(StreamResource::ServerTlsStream(
Box::new(tls_stream),

View file

@ -2,7 +2,6 @@ use super::dispatch_json::JsonOp;
use super::io::std_file_resource;
use super::io::{StreamResource, StreamResourceHolder};
use crate::op_error::OpError;
use crate::ops::json_op;
use crate::state::State;
use deno_core::*;
#[cfg(unix)]
@ -35,8 +34,8 @@ fn get_windows_handle(
}
pub fn init(i: &mut Isolate, s: &State) {
i.register_op("op_set_raw", s.core_op(json_op(s.stateful_op(op_set_raw))));
i.register_op("op_isatty", s.core_op(json_op(s.stateful_op(op_isatty))));
i.register_op("op_set_raw", s.stateful_json_op2(op_set_raw));
i.register_op("op_isatty", s.stateful_json_op2(op_isatty));
}
#[derive(Deserialize)]
@ -46,7 +45,8 @@ struct SetRawArgs {
}
pub fn op_set_raw(
state_: &State,
isolate: &mut deno_core::Isolate,
_state: &State,
args: Value,
_zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, OpError> {
@ -65,9 +65,8 @@ pub fn op_set_raw(
use winapi::shared::minwindef::FALSE;
use winapi::um::{consoleapi, handleapi};
let mut state = state_.borrow_mut();
let resource_holder =
state.resource_table.get_mut::<StreamResourceHolder>(rid);
let mut resource_table = isolate.resource_table.borrow_mut();
let resource_holder = resource_table.get_mut::<StreamResourceHolder>(rid);
if resource_holder.is_none() {
return Err(OpError::bad_resource_id());
}
@ -132,9 +131,8 @@ pub fn op_set_raw(
{
use std::os::unix::io::AsRawFd;
let mut state = state_.borrow_mut();
let resource_holder =
state.resource_table.get_mut::<StreamResourceHolder>(rid);
let mut resource_table = isolate.resource_table.borrow_mut();
let resource_holder = resource_table.get_mut::<StreamResourceHolder>(rid);
if resource_holder.is_none() {
return Err(OpError::bad_resource_id());
}
@ -215,16 +213,17 @@ struct IsattyArgs {
}
pub fn op_isatty(
state: &State,
isolate: &mut deno_core::Isolate,
_state: &State,
args: Value,
_zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, OpError> {
let args: IsattyArgs = serde_json::from_value(args)?;
let rid = args.rid;
let resource_table = &mut state.borrow_mut().resource_table;
let mut resource_table = isolate.resource_table.borrow_mut();
let isatty: bool =
std_file_resource(resource_table, rid as u32, move |r| match r {
std_file_resource(&mut resource_table, rid as u32, move |r| match r {
Ok(std_file) => {
#[cfg(windows)]
{

View file

@ -43,14 +43,6 @@ fn create_web_worker(
let state =
State::new_for_worker(global_state, Some(permissions), specifier)?;
if has_deno_namespace {
let mut s = state.borrow_mut();
let (stdin, stdout, stderr) = get_stdio();
s.resource_table.add("stdin", Box::new(stdin));
s.resource_table.add("stdout", Box::new(stdout));
s.resource_table.add("stderr", Box::new(stderr));
}
let mut worker = WebWorker::new(
name.clone(),
startup_data::deno_isolate_init(),
@ -58,6 +50,14 @@ fn create_web_worker(
has_deno_namespace,
);
if has_deno_namespace {
let mut resource_table = worker.resource_table.borrow_mut();
let (stdin, stdout, stderr) = get_stdio();
resource_table.add("stdin", Box::new(stdin));
resource_table.add("stdout", Box::new(stdout));
resource_table.add("stderr", Box::new(stderr));
}
// Instead of using name for log we use `worker-${id}` because
// WebWorkers can have empty string as name.
let script = format!(

View file

@ -14,7 +14,6 @@ use deno_core::ErrBox;
use deno_core::ModuleLoader;
use deno_core::ModuleSpecifier;
use deno_core::Op;
use deno_core::ResourceTable;
use deno_core::ZeroCopyBuf;
use futures::future::FutureExt;
use rand::rngs::StdRng;
@ -64,7 +63,6 @@ pub struct StateInner {
pub next_worker_id: u32,
pub start_time: Instant,
pub seeded_rng: Option<StdRng>,
pub resource_table: ResourceTable,
pub target_lib: TargetLib,
pub debug_type: DebugType,
}
@ -81,7 +79,25 @@ impl State {
self.core_op(json_op(self.stateful_op(dispatcher)))
}
pub fn stateful_json_op2<D>(
&self,
dispatcher: D,
) -> impl Fn(&mut deno_core::Isolate, &[u8], Option<ZeroCopyBuf>) -> Op
where
D: Fn(
&mut deno_core::Isolate,
&State,
Value,
Option<ZeroCopyBuf>,
) -> Result<JsonOp, OpError>,
{
use crate::ops::json_op;
self.core_op(json_op(self.stateful_op2(dispatcher)))
}
/// Wrap core `OpDispatcher` to collect metrics.
// TODO(ry) this should be private. Is called by stateful_json_op or
// stateful_minimal_op
pub fn core_op<D>(
&self,
dispatcher: D,
@ -142,19 +158,29 @@ impl State {
}
}
/// This is a special function that provides `state` argument to dispatcher.
pub fn stateful_minimal_op<D>(
pub fn stateful_minimal_op2<D>(
&self,
dispatcher: D,
) -> impl Fn(bool, i32, Option<ZeroCopyBuf>) -> MinimalOp
) -> impl Fn(&mut deno_core::Isolate, &[u8], Option<ZeroCopyBuf>) -> Op
where
D: Fn(&State, bool, i32, Option<ZeroCopyBuf>) -> MinimalOp,
D: Fn(
&mut deno_core::Isolate,
&State,
bool,
i32,
Option<ZeroCopyBuf>,
) -> MinimalOp,
{
let state = self.clone();
move |is_sync: bool,
self.core_op(crate::ops::minimal_op(
move |isolate: &mut deno_core::Isolate,
is_sync: bool,
rid: i32,
zero_copy: Option<ZeroCopyBuf>|
-> MinimalOp { dispatcher(&state, is_sync, rid, zero_copy) }
-> MinimalOp {
dispatcher(isolate, &state, is_sync, rid, zero_copy)
},
))
}
/// This is a special function that provides `state` argument to dispatcher.
@ -300,7 +326,6 @@ impl State {
next_worker_id: 0,
start_time: Instant::now(),
seeded_rng,
resource_table: ResourceTable::default(),
target_lib: TargetLib::Main,
debug_type,
}));
@ -336,7 +361,6 @@ impl State {
next_worker_id: 0,
start_time: Instant::now(),
seeded_rng,
resource_table: ResourceTable::default(),
target_lib: TargetLib::Worker,
debug_type: DebugType::Dependent,
}));

View file

@ -209,6 +209,19 @@ impl Future for Worker {
}
}
impl Deref for Worker {
type Target = deno_core::EsIsolate;
fn deref(&self) -> &Self::Target {
&self.isolate
}
}
impl DerefMut for Worker {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.isolate
}
}
/// This worker is created and used by Deno executable.
///
/// It provides ops available in the `Deno` namespace.

View file

@ -13,6 +13,7 @@ use crate::js_errors::JSError;
use crate::ops::*;
use crate::shared_queue::SharedQueue;
use crate::shared_queue::RECOMMENDED_SIZE;
use crate::ResourceTable;
use futures::future::FutureExt;
use futures::stream::select;
use futures::stream::FuturesUnordered;
@ -20,6 +21,7 @@ use futures::stream::StreamExt;
use futures::task::AtomicWaker;
use futures::Future;
use libc::c_void;
use std::cell::RefCell;
use std::collections::HashMap;
use std::convert::From;
use std::error::Error;
@ -28,6 +30,7 @@ use std::mem::forget;
use std::ops::{Deref, DerefMut};
use std::option::Option;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::{Arc, Mutex, Once};
use std::task::Context;
use std::task::Poll;
@ -163,6 +166,7 @@ pub struct Isolate {
snapshot_creator: Option<v8::SnapshotCreator>,
has_snapshotted: bool,
snapshot: Option<SnapshotConfig>,
pub resource_table: Rc<RefCell<ResourceTable>>,
pub global_context: v8::Global<v8::Context>,
pub(crate) shared_ab: v8::Global<v8::SharedArrayBuffer>,
pub(crate) js_recv_cb: v8::Global<v8::Function>,
@ -297,6 +301,7 @@ impl Isolate {
let core_isolate = Self {
v8_isolate: None,
global_context,
resource_table: Rc::new(RefCell::new(ResourceTable::default())),
pending_promise_exceptions: HashMap::new(),
shared_ab: v8::Global::<v8::SharedArrayBuffer>::new(),
js_recv_cb: v8::Global::<v8::Function>::new(),