1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2024-12-22 07:14:47 -05:00

Fix 100% CPU idling problem by reverting #7672 (#7911)

* Revert "refactor: Worker is not a Future (#7895)"

This reverts commit f4357f0ff9.

* Revert "refactor(core): JsRuntime is not a Future (#7855)"

This reverts commit d8879feb8c.

* Revert "fix(core): module execution with top level await (#7672)"

This reverts commit c7c7677825.
This commit is contained in:
Ryan Dahl 2020-10-10 05:41:11 -04:00 committed by GitHub
parent 782e6a2ed5
commit 08bb8b3d53
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
17 changed files with 225 additions and 495 deletions

View file

@ -1,6 +1,7 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
use crate::colors;
use crate::inspector::DenoInspector;
use crate::inspector::InspectorSession;
use deno_core::error::AnyError;
use deno_core::serde_json;
@ -13,7 +14,8 @@ pub struct CoverageCollector {
}
impl CoverageCollector {
pub fn new(session: Box<InspectorSession>) -> Self {
pub fn new(inspector_ptr: *mut DenoInspector) -> Self {
let session = InspectorSession::new(inspector_ptr);
Self { session }
}

View file

@ -275,7 +275,7 @@ async fn eval_command(
debug!("main_module {}", &main_module);
worker.execute_module(&main_module).await?;
worker.execute("window.dispatchEvent(new Event('load'))")?;
worker.run_event_loop().await?;
(&mut *worker).await?;
worker.execute("window.dispatchEvent(new Event('unload'))")?;
Ok(())
}
@ -423,7 +423,7 @@ async fn run_repl(flags: Flags) -> Result<(), AnyError> {
ModuleSpecifier::resolve_url_or_path("./$deno$repl.ts").unwrap();
let global_state = GlobalState::new(flags)?;
let mut worker = MainWorker::new(&global_state, main_module.clone());
worker.run_event_loop().await?;
(&mut *worker).await?;
repl::run(&global_state, worker).await
}
@ -454,7 +454,7 @@ async fn run_from_stdin(flags: Flags) -> Result<(), AnyError> {
debug!("main_module {}", main_module);
worker.execute_module(&main_module).await?;
worker.execute("window.dispatchEvent(new Event('load'))")?;
worker.run_event_loop().await?;
(&mut *worker).await?;
worker.execute("window.dispatchEvent(new Event('unload'))")?;
Ok(())
}
@ -500,7 +500,7 @@ async fn run_with_watch(flags: Flags, script: String) -> Result<(), AnyError> {
debug!("main_module {}", main_module);
worker.execute_module(&main_module).await?;
worker.execute("window.dispatchEvent(new Event('load'))")?;
worker.run_event_loop().await?;
(&mut *worker).await?;
worker.execute("window.dispatchEvent(new Event('unload'))")?;
Ok(())
}
@ -525,7 +525,7 @@ async fn run_command(flags: Flags, script: String) -> Result<(), AnyError> {
debug!("main_module {}", main_module);
worker.execute_module(&main_module).await?;
worker.execute("window.dispatchEvent(new Event('load'))")?;
worker.run_event_loop().await?;
(&mut *worker).await?;
worker.execute("window.dispatchEvent(new Event('unload'))")?;
Ok(())
}
@ -578,8 +578,12 @@ async fn test_command(
.save_source_file_in_cache(&main_module, source_file);
let mut maybe_coverage_collector = if flags.coverage {
let session = worker.create_inspector_session();
let mut coverage_collector = CoverageCollector::new(session);
let inspector = worker
.inspector
.as_mut()
.expect("Inspector is not created.");
let mut coverage_collector = CoverageCollector::new(&mut **inspector);
coverage_collector.start_collecting().await?;
Some(coverage_collector)
@ -590,9 +594,9 @@ async fn test_command(
let execute_result = worker.execute_module(&main_module).await;
execute_result?;
worker.execute("window.dispatchEvent(new Event('load'))")?;
worker.run_event_loop().await?;
(&mut *worker).await?;
worker.execute("window.dispatchEvent(new Event('unload'))")?;
worker.run_event_loop().await?;
(&mut *worker).await?;
if let Some(coverage_collector) = maybe_coverage_collector.as_mut() {
let coverages = coverage_collector.collect().await?;

View file

@ -155,13 +155,6 @@ fn run_worker_thread(
if let Err(e) = result {
let mut sender = worker.internal_channels.sender.clone();
// If sender is closed it means that worker has already been closed from
// within using "globalThis.close()"
if sender.is_closed() {
return;
}
sender
.try_send(WorkerEvent::TerminalError(e))
.expect("Failed to post message to host");
@ -173,8 +166,7 @@ fn run_worker_thread(
// TODO(bartlomieju): this thread should return result of event loop
// that means that we should store JoinHandle to thread to ensure
// that it actually terminates.
rt.block_on(worker.run_event_loop())
.expect("Panic in event loop");
rt.block_on(worker).expect("Panic in event loop");
debug!("Worker thread shuts down {}", &name);
})?;

View file

@ -47,7 +47,7 @@ async fn post_message_and_poll(
return result
}
_ = worker.run_event_loop() => {
_ = &mut *worker => {
// A zero delay is long enough to yield the thread in order to prevent the loop from
// running hot for messages that are taking longer to resolve like for example an
// evaluation of top level await.
@ -75,7 +75,7 @@ async fn read_line_and_poll(
result = &mut line => {
return result.unwrap();
}
_ = worker.run_event_loop(), if poll_worker => {
_ = &mut *worker, if poll_worker => {
poll_worker = false;
}
_ = &mut timeout => {
@ -92,7 +92,12 @@ pub async fn run(
// Our inspector is unable to default to the default context id so we have to specify it here.
let context_id: u32 = 1;
let mut session = worker.create_inspector_session();
let inspector = worker
.inspector
.as_mut()
.expect("Inspector is not created.");
let mut session = InspectorSession::new(&mut **inspector);
let history_file = global_state.dir.root.join("deno_history.txt");

View file

@ -2662,16 +2662,6 @@ itest!(ignore_require {
exit_code: 0,
});
itest!(top_level_await_bug {
args: "run --allow-read top_level_await_bug.js",
output: "top_level_await_bug.out",
});
itest!(top_level_await_bug2 {
args: "run --allow-read top_level_await_bug2.js",
output: "top_level_await_bug2.out",
});
#[test]
fn cafile_env_fetch() {
use deno_core::url::Url;

View file

@ -1,2 +0,0 @@
const mod = await import("./top_level_await_bug_nested.js");
console.log(mod);

View file

@ -1 +0,0 @@
Module { default: 1, [Symbol(Symbol.toStringTag)]: "Module" }

View file

@ -1,15 +0,0 @@
const mod = await import("./top_level_await_bug_nested.js");
console.log(mod);
const sleep = (n) => new Promise((r) => setTimeout(r, n));
await sleep(100);
console.log("slept");
window.addEventListener("load", () => {
console.log("load event");
});
setTimeout(() => {
console.log("timeout");
}, 1000);

View file

@ -1,4 +0,0 @@
Module { default: 1, [Symbol(Symbol.toStringTag)]: "Module" }
slept
load event
timeout

View file

@ -1,5 +0,0 @@
const sleep = (n) => new Promise((r) => setTimeout(r, n));
await sleep(100);
export default 1;

View file

@ -3,7 +3,6 @@
use crate::fmt_errors::JsError;
use crate::global_state::GlobalState;
use crate::inspector::DenoInspector;
use crate::inspector::InspectorSession;
use crate::js;
use crate::metrics::Metrics;
use crate::ops;
@ -12,7 +11,6 @@ use crate::permissions::Permissions;
use crate::state::CliModuleLoader;
use deno_core::error::AnyError;
use deno_core::futures::channel::mpsc;
use deno_core::futures::future::poll_fn;
use deno_core::futures::future::FutureExt;
use deno_core::futures::stream::StreamExt;
use deno_core::futures::task::AtomicWaker;
@ -24,8 +22,10 @@ use deno_core::ModuleSpecifier;
use deno_core::RuntimeOptions;
use deno_core::Snapshot;
use std::env;
use std::future::Future;
use std::ops::Deref;
use std::ops::DerefMut;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
@ -95,15 +95,13 @@ fn create_channels() -> (WorkerChannelsInternal, WorkerHandle) {
/// - `MainWorker`
/// - `WebWorker`
pub struct Worker {
external_channels: WorkerHandle,
inspector: Option<Box<DenoInspector>>,
// Following fields are pub because they are accessed
// when creating a new WebWorker instance.
pub name: String,
pub js_runtime: JsRuntime,
pub inspector: Option<Box<DenoInspector>>,
pub waker: AtomicWaker,
pub(crate) internal_channels: WorkerChannelsInternal,
pub(crate) js_runtime: JsRuntime,
pub(crate) name: String,
external_channels: WorkerHandle,
should_break_on_first_statement: bool,
waker: AtomicWaker,
}
impl Worker {
@ -149,13 +147,13 @@ impl Worker {
let (internal_channels, external_channels) = create_channels();
Self {
external_channels,
inspector,
internal_channels,
js_runtime,
name,
should_break_on_first_statement,
js_runtime,
inspector,
waker: AtomicWaker::new(),
internal_channels,
external_channels,
should_break_on_first_statement,
}
}
@ -191,7 +189,7 @@ impl Worker {
) -> Result<(), AnyError> {
let id = self.preload_module(module_specifier).await?;
self.wait_for_inspector_session();
self.js_runtime.mod_evaluate(id).await
self.js_runtime.mod_evaluate(id)
}
/// Loads, instantiates and executes provided source code
@ -206,7 +204,7 @@ impl Worker {
.load_module(module_specifier, Some(code))
.await?;
self.wait_for_inspector_session();
self.js_runtime.mod_evaluate(id).await
self.js_runtime.mod_evaluate(id)
}
/// Returns a way to communicate with the Worker from other threads.
@ -223,28 +221,6 @@ impl Worker {
.wait_for_session_and_break_on_next_statement()
}
}
/// Create new inspector session. This function panics if Worker
/// was not configured to create inspector.
pub fn create_inspector_session(&mut self) -> Box<InspectorSession> {
let inspector = self.inspector.as_mut().unwrap();
InspectorSession::new(&mut **inspector)
}
pub fn poll_event_loop(
&mut self,
cx: &mut Context,
) -> Poll<Result<(), AnyError>> {
// We always poll the inspector if it exists.
let _ = self.inspector.as_mut().map(|i| i.poll_unpin(cx));
self.waker.register(cx.waker());
self.js_runtime.poll_event_loop(cx)
}
pub async fn run_event_loop(&mut self) -> Result<(), AnyError> {
poll_fn(|cx| self.poll_event_loop(cx)).await
}
}
impl Drop for Worker {
@ -255,6 +231,32 @@ impl Drop for Worker {
}
}
impl Future for Worker {
type Output = Result<(), AnyError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let inner = self.get_mut();
// We always poll the inspector if it exists.
let _ = inner.inspector.as_mut().map(|i| i.poll_unpin(cx));
inner.waker.register(cx.waker());
inner.js_runtime.poll_unpin(cx)
}
}
impl Deref for Worker {
type Target = JsRuntime;
fn deref(&self) -> &Self::Target {
&self.js_runtime
}
}
impl DerefMut for Worker {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.js_runtime
}
}
/// This worker is created and used by Deno executable.
///
/// It provides ops available in the `Deno` namespace.
@ -276,46 +278,45 @@ impl MainWorker {
loader,
true,
);
let js_runtime = &mut worker.js_runtime;
{
// All ops registered in this function depend on these
{
let op_state = js_runtime.op_state();
let op_state = worker.op_state();
let mut op_state = op_state.borrow_mut();
op_state.put::<Metrics>(Default::default());
op_state.put::<Arc<GlobalState>>(global_state.clone());
op_state.put::<Permissions>(global_state.permissions.clone());
}
ops::runtime::init(js_runtime, main_module);
ops::fetch::init(js_runtime, global_state.flags.ca_file.as_deref());
ops::timers::init(js_runtime);
ops::worker_host::init(js_runtime);
ops::random::init(js_runtime, global_state.flags.seed);
ops::reg_json_sync(js_runtime, "op_close", deno_core::op_close);
ops::reg_json_sync(js_runtime, "op_resources", deno_core::op_resources);
ops::runtime::init(&mut worker, main_module);
ops::fetch::init(&mut worker, global_state.flags.ca_file.as_deref());
ops::timers::init(&mut worker);
ops::worker_host::init(&mut worker);
ops::random::init(&mut worker, global_state.flags.seed);
ops::reg_json_sync(&mut worker, "op_close", deno_core::op_close);
ops::reg_json_sync(&mut worker, "op_resources", deno_core::op_resources);
ops::reg_json_sync(
js_runtime,
&mut worker,
"op_domain_to_ascii",
deno_web::op_domain_to_ascii,
);
ops::errors::init(js_runtime);
ops::fs_events::init(js_runtime);
ops::fs::init(js_runtime);
ops::io::init(js_runtime);
ops::net::init(js_runtime);
ops::os::init(js_runtime);
ops::permissions::init(js_runtime);
ops::plugin::init(js_runtime);
ops::process::init(js_runtime);
ops::runtime_compiler::init(js_runtime);
ops::signal::init(js_runtime);
ops::tls::init(js_runtime);
ops::tty::init(js_runtime);
ops::websocket::init(js_runtime);
ops::errors::init(&mut worker);
ops::fs_events::init(&mut worker);
ops::fs::init(&mut worker);
ops::io::init(&mut worker);
ops::net::init(&mut worker);
ops::os::init(&mut worker);
ops::permissions::init(&mut worker);
ops::plugin::init(&mut worker);
ops::process::init(&mut worker);
ops::runtime_compiler::init(&mut worker);
ops::signal::init(&mut worker);
ops::tls::init(&mut worker);
ops::tty::init(&mut worker);
ops::websocket::init(&mut worker);
}
{
let op_state = js_runtime.op_state();
let op_state = worker.op_state();
let mut op_state = op_state.borrow_mut();
let t = &mut op_state.resource_table;
let (stdin, stdout, stderr) = get_stdio();
@ -448,45 +449,49 @@ impl WebWorker {
{
let handle = web_worker.thread_safe_handle();
let sender = web_worker.worker.internal_channels.sender.clone();
let js_runtime = &mut web_worker.js_runtime;
// All ops registered in this function depend on these
{
let op_state = js_runtime.op_state();
let op_state = web_worker.op_state();
let mut op_state = op_state.borrow_mut();
op_state.put::<Metrics>(Default::default());
op_state.put::<Arc<GlobalState>>(global_state.clone());
op_state.put::<Permissions>(permissions);
}
ops::web_worker::init(js_runtime, sender, handle);
ops::runtime::init(js_runtime, main_module);
ops::fetch::init(js_runtime, global_state.flags.ca_file.as_deref());
ops::timers::init(js_runtime);
ops::worker_host::init(js_runtime);
ops::reg_json_sync(js_runtime, "op_close", deno_core::op_close);
ops::reg_json_sync(js_runtime, "op_resources", deno_core::op_resources);
ops::web_worker::init(&mut web_worker, sender, handle);
ops::runtime::init(&mut web_worker, main_module);
ops::fetch::init(&mut web_worker, global_state.flags.ca_file.as_deref());
ops::timers::init(&mut web_worker);
ops::worker_host::init(&mut web_worker);
ops::reg_json_sync(&mut web_worker, "op_close", deno_core::op_close);
ops::reg_json_sync(
js_runtime,
&mut web_worker,
"op_resources",
deno_core::op_resources,
);
ops::reg_json_sync(
&mut web_worker,
"op_domain_to_ascii",
deno_web::op_domain_to_ascii,
);
ops::errors::init(js_runtime);
ops::io::init(js_runtime);
ops::websocket::init(js_runtime);
ops::errors::init(&mut web_worker);
ops::io::init(&mut web_worker);
ops::websocket::init(&mut web_worker);
if has_deno_namespace {
ops::fs_events::init(js_runtime);
ops::fs::init(js_runtime);
ops::net::init(js_runtime);
ops::os::init(js_runtime);
ops::permissions::init(js_runtime);
ops::plugin::init(js_runtime);
ops::process::init(js_runtime);
ops::random::init(js_runtime, global_state.flags.seed);
ops::runtime_compiler::init(js_runtime);
ops::signal::init(js_runtime);
ops::tls::init(js_runtime);
ops::tty::init(js_runtime);
ops::fs_events::init(&mut web_worker);
ops::fs::init(&mut web_worker);
ops::net::init(&mut web_worker);
ops::os::init(&mut web_worker);
ops::permissions::init(&mut web_worker);
ops::plugin::init(&mut web_worker);
ops::process::init(&mut web_worker);
ops::random::init(&mut web_worker, global_state.flags.seed);
ops::runtime_compiler::init(&mut web_worker);
ops::signal::init(&mut web_worker);
ops::tls::init(&mut web_worker);
ops::tty::init(&mut web_worker);
}
}
@ -499,27 +504,38 @@ impl WebWorker {
pub fn thread_safe_handle(&self) -> WebWorkerHandle {
self.handle.clone()
}
}
pub async fn run_event_loop(&mut self) -> Result<(), AnyError> {
poll_fn(|cx| self.poll_event_loop(cx)).await
impl Deref for WebWorker {
type Target = Worker;
fn deref(&self) -> &Self::Target {
&self.worker
}
}
pub fn poll_event_loop(
&mut self,
cx: &mut Context,
) -> Poll<Result<(), AnyError>> {
let worker = &mut self.worker;
impl DerefMut for WebWorker {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.worker
}
}
let terminated = self.handle.terminated.load(Ordering::Relaxed);
impl Future for WebWorker {
type Output = Result<(), AnyError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let inner = self.get_mut();
let worker = &mut inner.worker;
let terminated = inner.handle.terminated.load(Ordering::Relaxed);
if terminated {
return Poll::Ready(Ok(()));
}
if !self.event_loop_idle {
match worker.poll_event_loop(cx) {
if !inner.event_loop_idle {
match worker.poll_unpin(cx) {
Poll::Ready(r) => {
let terminated = self.handle.terminated.load(Ordering::Relaxed);
let terminated = inner.handle.terminated.load(Ordering::Relaxed);
if terminated {
return Poll::Ready(Ok(()));
}
@ -530,13 +546,13 @@ impl WebWorker {
.try_send(WorkerEvent::Error(e))
.expect("Failed to post message to host");
}
self.event_loop_idle = true;
inner.event_loop_idle = true;
}
Poll::Pending => {}
}
}
if let Poll::Ready(r) = self.terminate_rx.poll_next_unpin(cx) {
if let Poll::Ready(r) = inner.terminate_rx.poll_next_unpin(cx) {
// terminate_rx should never be closed
assert!(r.is_some());
return Poll::Ready(Ok(()));
@ -553,7 +569,7 @@ impl WebWorker {
if let Err(e) = worker.execute(&script) {
// If execution was terminated during message callback then
// just ignore it
if self.handle.terminated.load(Ordering::Relaxed) {
if inner.handle.terminated.load(Ordering::Relaxed) {
return Poll::Ready(Ok(()));
}
@ -565,7 +581,7 @@ impl WebWorker {
}
// Let event loop be polled again
self.event_loop_idle = false;
inner.event_loop_idle = false;
worker.waker.wake();
}
None => unreachable!(),
@ -576,19 +592,6 @@ impl WebWorker {
}
}
impl Deref for WebWorker {
type Target = Worker;
fn deref(&self) -> &Self::Target {
&self.worker
}
}
impl DerefMut for WebWorker {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.worker
}
}
#[cfg(test)]
mod tests {
use super::*;
@ -625,7 +628,7 @@ mod tests {
if let Err(err) = result {
eprintln!("execute_mod err {:?}", err);
}
if let Err(e) = worker.run_event_loop().await {
if let Err(e) = (&mut *worker).await {
panic!("Future got unexpected error: {:?}", e);
}
}
@ -643,7 +646,7 @@ mod tests {
if let Err(err) = result {
eprintln!("execute_mod err {:?}", err);
}
if let Err(e) = worker.run_event_loop().await {
if let Err(e) = (&mut *worker).await {
panic!("Future got unexpected error: {:?}", e);
}
}
@ -662,7 +665,7 @@ mod tests {
if let Err(err) = result {
eprintln!("execute_mod err {:?}", err);
}
if let Err(e) = worker.run_event_loop().await {
if let Err(e) = (&mut *worker).await {
panic!("Future got unexpected error: {:?}", e);
}
}
@ -730,7 +733,7 @@ mod tests {
worker.execute(source).unwrap();
let handle = worker.thread_safe_handle();
handle_sender.send(handle).unwrap();
let r = tokio_util::run_basic(worker.run_event_loop());
let r = tokio_util::run_basic(worker);
assert!(r.is_ok())
});
@ -777,7 +780,7 @@ mod tests {
worker.execute("onmessage = () => { close(); }").unwrap();
let handle = worker.thread_safe_handle();
handle_sender.send(handle).unwrap();
let r = tokio_util::run_basic(worker.run_event_loop());
let r = tokio_util::run_basic(worker);
assert!(r.is_ok())
});

View file

@ -9,12 +9,9 @@ bindings.
This Rust crate contains the essential V8 bindings for Deno's command-line
interface (Deno CLI). The main abstraction here is the JsRuntime which provides
a way to execute JavaScript.
The JsRuntime implements an event loop abstraction for the executed code that
keeps track of all pending tasks (async ops, dynamic module loads). It is user's
responsibility to drive that loop by using `JsRuntime::run_event_loop` method -
it must be executed in the context of Rust's future executor (eg. tokio, smol).
a way to execute JavaScript. The JsRuntime is modeled as a
`Future<Item=(), Error=JsError>` which completes once all of its ops have
completed.
In order to bind Rust functions into JavaScript, use the `Deno.core.dispatch()`
function to trigger the "dispatch" callback in Rust. The user is responsible for

View file

@ -260,7 +260,7 @@ fn main() {
include_str!("http_bench_bin_ops.js"),
)
.unwrap();
js_runtime.run_event_loop().await
js_runtime.await
};
runtime.block_on(future).unwrap();
}

View file

@ -193,7 +193,7 @@ fn main() {
include_str!("http_bench_json_ops.js"),
)
.unwrap();
js_runtime.run_event_loop().await
js_runtime.await
};
runtime.block_on(future).unwrap();
}

View file

@ -341,13 +341,6 @@ pub struct ModuleInfo {
pub name: String,
pub handle: v8::Global<v8::Module>,
pub import_specifiers: Vec<ModuleSpecifier>,
// TODO(bartlomieju): there should be "state"
// field that describes if module is already being loaded,
// so concurent dynamic imports don't introduce dead lock
// pub state: LoadState {
// Loading(shared_future),
// Loaded,
// },
}
/// A symbolic module entity.
@ -674,7 +667,7 @@ mod tests {
let a_id_fut = runtime.load_module(&spec, None);
let a_id = futures::executor::block_on(a_id_fut).expect("Failed to load");
futures::executor::block_on(runtime.mod_evaluate(a_id)).unwrap();
runtime.mod_evaluate(a_id).unwrap();
let l = loads.lock().unwrap();
assert_eq!(
l.to_vec(),
@ -741,7 +734,7 @@ mod tests {
let result = runtime.load_module(&spec, None).await;
assert!(result.is_ok());
let circular1_id = result.unwrap();
runtime.mod_evaluate(circular1_id).await.unwrap();
runtime.mod_evaluate(circular1_id).unwrap();
let l = loads.lock().unwrap();
assert_eq!(
@ -818,7 +811,7 @@ mod tests {
println!(">> result {:?}", result);
assert!(result.is_ok());
let redirect1_id = result.unwrap();
runtime.mod_evaluate(redirect1_id).await.unwrap();
runtime.mod_evaluate(redirect1_id).unwrap();
let l = loads.lock().unwrap();
assert_eq!(
l.to_vec(),
@ -968,7 +961,7 @@ mod tests {
let main_id =
futures::executor::block_on(main_id_fut).expect("Failed to load");
futures::executor::block_on(runtime.mod_evaluate(main_id)).unwrap();
runtime.mod_evaluate(main_id).unwrap();
let l = loads.lock().unwrap();
assert_eq!(

View file

@ -23,8 +23,6 @@ use crate::shared_queue::SharedQueue;
use crate::shared_queue::RECOMMENDED_SIZE;
use crate::BufVec;
use crate::OpState;
use futures::channel::mpsc;
use futures::future::poll_fn;
use futures::stream::FuturesUnordered;
use futures::stream::StreamExt;
use futures::stream::StreamFuture;
@ -84,11 +82,6 @@ pub struct JsRuntime {
allocations: IsolateAllocations,
}
type DynImportModEvaluate =
(ModuleId, v8::Global<v8::Promise>, v8::Global<v8::Module>);
type ModEvaluate =
(v8::Global<v8::Promise>, mpsc::Sender<Result<(), AnyError>>);
/// Internal state for JsRuntime which is stored in one of v8::Isolate's
/// embedder slots.
pub(crate) struct JsRuntimeState {
@ -97,8 +90,6 @@ pub(crate) struct JsRuntimeState {
pub(crate) js_recv_cb: Option<v8::Global<v8::Function>>,
pub(crate) js_macrotask_cb: Option<v8::Global<v8::Function>>,
pub(crate) pending_promise_exceptions: HashMap<i32, v8::Global<v8::Value>>,
pub(crate) pending_dyn_mod_evaluate: HashMap<i32, DynImportModEvaluate>,
pub(crate) pending_mod_evaluate: HashMap<ModuleId, ModEvaluate>,
pub(crate) js_error_create_fn: Box<JsErrorCreateFn>,
pub(crate) shared: SharedQueue,
pub(crate) pending_ops: FuturesUnordered<PendingOpFuture>,
@ -272,8 +263,6 @@ impl JsRuntime {
isolate.set_slot(Rc::new(RefCell::new(JsRuntimeState {
global_context: Some(global_context),
pending_promise_exceptions: HashMap::new(),
pending_dyn_mod_evaluate: HashMap::new(),
pending_mod_evaluate: HashMap::new(),
shared_ab: None,
js_recv_cb: None,
js_macrotask_cb: None,
@ -453,51 +442,50 @@ impl JsRuntime {
.remove_near_heap_limit_callback(cb, heap_limit);
}
}
}
/// Runs event loop to completion
///
/// This future resolves when:
/// - there are no more pending dynamic imports
/// - there are no more pending ops
pub async fn run_event_loop(&mut self) -> Result<(), AnyError> {
poll_fn(|cx| self.poll_event_loop(cx)).await
}
extern "C" fn near_heap_limit_callback<F>(
data: *mut c_void,
current_heap_limit: usize,
initial_heap_limit: usize,
) -> usize
where
F: FnMut(usize, usize) -> usize,
{
let callback = unsafe { &mut *(data as *mut F) };
callback(current_heap_limit, initial_heap_limit)
}
/// Runs a single tick of event loop
pub fn poll_event_loop(
&mut self,
cx: &mut Context,
) -> Poll<Result<(), AnyError>> {
self.shared_init();
impl Future for JsRuntime {
type Output = Result<(), AnyError>;
let state_rc = Self::state(self.v8_isolate());
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let runtime = self.get_mut();
runtime.shared_init();
let state_rc = Self::state(runtime.v8_isolate());
{
let state = state_rc.borrow();
state.waker.register(cx.waker());
}
// Top level modules
self.evaluate_pending_modules()?;
// Dynamic module loading - ie. modules loaded using "import()"
{
let poll_imports = self.prepare_dyn_imports(cx)?;
let poll_imports = runtime.prepare_dyn_imports(cx)?;
assert!(poll_imports.is_ready());
let poll_imports = self.poll_dyn_imports(cx)?;
let poll_imports = runtime.poll_dyn_imports(cx)?;
assert!(poll_imports.is_ready());
self.evaluate_dyn_imports()?;
self.check_promise_exceptions()?;
runtime.check_promise_exceptions()?;
}
// Ops
{
let overflow_response = self.poll_pending_ops(cx);
self.async_op_response(overflow_response)?;
self.drain_macrotasks()?;
self.check_promise_exceptions()?;
let overflow_response = runtime.poll_pending_ops(cx);
runtime.async_op_response(overflow_response)?;
runtime.drain_macrotasks()?;
runtime.check_promise_exceptions()?;
}
let state = state_rc.borrow();
@ -505,8 +493,6 @@ impl JsRuntime {
state.pending_ops.is_empty()
&& state.pending_dyn_imports.is_empty()
&& state.preparing_dyn_imports.is_empty()
&& state.pending_dyn_mod_evaluate.is_empty()
&& state.pending_mod_evaluate.is_empty()
};
if is_idle {
@ -523,18 +509,6 @@ impl JsRuntime {
}
}
extern "C" fn near_heap_limit_callback<F>(
data: *mut c_void,
current_heap_limit: usize,
initial_heap_limit: usize,
) -> usize
where
F: FnMut(usize, usize) -> usize,
{
let callback = unsafe { &mut *(data as *mut F) };
callback(current_heap_limit, initial_heap_limit)
}
impl JsRuntimeState {
// Called by V8 during `Isolate::mod_instantiate`.
pub fn module_resolve_cb(
@ -711,93 +685,7 @@ impl JsRuntime {
/// `AnyError` can be downcast to a type that exposes additional information
/// about the V8 exception. By default this type is `JsError`, however it may
/// be a different type if `RuntimeOptions::js_error_create_fn` has been set.
pub fn dyn_mod_evaluate(
&mut self,
load_id: ModuleLoadId,
id: ModuleId,
) -> Result<(), AnyError> {
self.shared_init();
let state_rc = Self::state(self.v8_isolate());
let context = self.global_context();
let context1 = self.global_context();
let module_handle = state_rc
.borrow()
.modules
.get_info(id)
.expect("ModuleInfo not found")
.handle
.clone();
let status = {
let scope =
&mut v8::HandleScope::with_context(self.v8_isolate(), context);
let module = module_handle.get(scope);
module.get_status()
};
if status == v8::ModuleStatus::Instantiated {
// IMPORTANT: Top-level-await is enabled, which means that return value
// of module evaluation is a promise.
//
// Because that promise is created internally by V8, when error occurs during
// module evaluation the promise is rejected, and since the promise has no rejection
// handler it will result in call to `bindings::promise_reject_callback` adding
// the promise to pending promise rejection table - meaning JsRuntime will return
// error on next poll().
//
// This situation is not desirable as we want to manually return error at the
// end of this function to handle it further. It means we need to manually
// remove this promise from pending promise rejection table.
//
// For more details see:
// https://github.com/denoland/deno/issues/4908
// https://v8.dev/features/top-level-await#module-execution-order
let scope =
&mut v8::HandleScope::with_context(self.v8_isolate(), context1);
let module = v8::Local::new(scope, &module_handle);
let maybe_value = module.evaluate(scope);
// Update status after evaluating.
let status = module.get_status();
if let Some(value) = maybe_value {
assert!(
status == v8::ModuleStatus::Evaluated
|| status == v8::ModuleStatus::Errored
);
let promise = v8::Local::<v8::Promise>::try_from(value)
.expect("Expected to get promise as module evaluation result");
let promise_id = promise.get_identity_hash();
let mut state = state_rc.borrow_mut();
state.pending_promise_exceptions.remove(&promise_id);
let promise_global = v8::Global::new(scope, promise);
let module_global = v8::Global::new(scope, module);
state
.pending_dyn_mod_evaluate
.insert(load_id, (id, promise_global, module_global));
} else {
assert!(status == v8::ModuleStatus::Errored);
}
}
if status == v8::ModuleStatus::Evaluated {
self.dyn_import_done(load_id, id)?;
}
Ok(())
}
/// Evaluates an already instantiated ES module.
///
/// `AnyError` can be downcast to a type that exposes additional information
/// about the V8 exception. By default this type is `JsError`, however it may
/// be a different type if `RuntimeOptions::js_error_create_fn` has been set.
fn mod_evaluate_inner(
&mut self,
id: ModuleId,
) -> Result<mpsc::Receiver<Result<(), AnyError>>, AnyError> {
pub fn mod_evaluate(&mut self, id: ModuleId) -> Result<(), AnyError> {
self.shared_init();
let state_rc = Self::state(self.v8_isolate());
@ -813,8 +701,6 @@ impl JsRuntime {
.expect("ModuleInfo not found");
let mut status = module.get_status();
let (sender, receiver) = mpsc::channel(1);
if status == v8::ModuleStatus::Instantiated {
// IMPORTANT: Top-level-await is enabled, which means that return value
// of module evaluation is a promise.
@ -847,30 +733,20 @@ impl JsRuntime {
let promise_id = promise.get_identity_hash();
let mut state = state_rc.borrow_mut();
state.pending_promise_exceptions.remove(&promise_id);
let promise_global = v8::Global::new(scope, promise);
state
.pending_mod_evaluate
.insert(id, (promise_global, sender));
} else {
assert!(status == v8::ModuleStatus::Errored);
}
}
Ok(receiver)
}
pub async fn mod_evaluate(&mut self, id: ModuleId) -> Result<(), AnyError> {
let mut receiver = self.mod_evaluate_inner(id)?;
poll_fn(|cx| {
if let Poll::Ready(result) = receiver.poll_next_unpin(cx) {
debug!("received module evaluate");
return Poll::Ready(result.unwrap());
match status {
v8::ModuleStatus::Evaluated => Ok(()),
v8::ModuleStatus::Errored => {
let exception = module.get_exception();
exception_to_err_result(scope, exception)
.map_err(|err| attach_handle_to_error(scope, err, exception))
}
let _r = self.poll_event_loop(cx)?;
Poll::Pending
})
.await
other => panic!("Unexpected module status {:?}", other),
}
}
fn dyn_import_error(
@ -1031,122 +907,16 @@ impl JsRuntime {
// Load is done.
let module_id = load.root_module_id.unwrap();
self.mod_instantiate(module_id)?;
self.dyn_mod_evaluate(dyn_import_id, module_id)?;
match self.mod_evaluate(module_id) {
Ok(()) => self.dyn_import_done(dyn_import_id, module_id)?,
Err(err) => self.dyn_import_error(dyn_import_id, err)?,
};
}
}
}
}
}
fn evaluate_pending_modules(&mut self) -> Result<(), AnyError> {
let state_rc = Self::state(self.v8_isolate());
let context = self.global_context();
{
let scope =
&mut v8::HandleScope::with_context(self.v8_isolate(), context);
let mut state = state_rc.borrow_mut();
if let Some(&module_id) = state.pending_mod_evaluate.keys().next() {
let handle = state.pending_mod_evaluate.remove(&module_id).unwrap();
drop(state);
let promise = handle.0.get(scope);
let mut sender = handle.1.clone();
let promise_state = promise.state();
match promise_state {
v8::PromiseState::Pending => {
state_rc
.borrow_mut()
.pending_mod_evaluate
.insert(module_id, handle);
state_rc.borrow().waker.wake();
}
v8::PromiseState::Fulfilled => {
sender.try_send(Ok(())).unwrap();
}
v8::PromiseState::Rejected => {
let exception = promise.result(scope);
let err1 = exception_to_err_result::<()>(scope, exception)
.map_err(|err| attach_handle_to_error(scope, err, exception))
.unwrap_err();
sender.try_send(Err(err1)).unwrap();
}
}
}
};
Ok(())
}
fn evaluate_dyn_imports(&mut self) -> Result<(), AnyError> {
let state_rc = Self::state(self.v8_isolate());
loop {
let context = self.global_context();
let maybe_result = {
let scope =
&mut v8::HandleScope::with_context(self.v8_isolate(), context);
let mut state = state_rc.borrow_mut();
if let Some(&dyn_import_id) =
state.pending_dyn_mod_evaluate.keys().next()
{
let handle = state
.pending_dyn_mod_evaluate
.remove(&dyn_import_id)
.unwrap();
drop(state);
let module_id = handle.0;
let promise = handle.1.get(scope);
let _module = handle.2.get(scope);
let promise_state = promise.state();
match promise_state {
v8::PromiseState::Pending => {
state_rc
.borrow_mut()
.pending_dyn_mod_evaluate
.insert(dyn_import_id, handle);
state_rc.borrow().waker.wake();
None
}
v8::PromiseState::Fulfilled => Some(Ok((dyn_import_id, module_id))),
v8::PromiseState::Rejected => {
let exception = promise.result(scope);
let err1 = exception_to_err_result::<()>(scope, exception)
.map_err(|err| attach_handle_to_error(scope, err, exception))
.unwrap_err();
Some(Err((dyn_import_id, err1)))
}
}
} else {
None
}
};
if let Some(result) = maybe_result {
match result {
Ok((dyn_import_id, module_id)) => {
self.dyn_import_done(dyn_import_id, module_id)?;
}
Err((dyn_import_id, err1)) => {
self.dyn_import_error(dyn_import_id, err1)?;
}
}
} else {
break;
}
}
Ok(())
}
fn register_during_load(
&mut self,
info: ModuleSource,
@ -1445,13 +1215,13 @@ pub mod tests {
futures::executor::block_on(lazy(move |cx| f(cx)));
}
fn poll_until_ready(
runtime: &mut JsRuntime,
max_poll_count: usize,
) -> Result<(), AnyError> {
fn poll_until_ready<F>(future: &mut F, max_poll_count: usize) -> F::Output
where
F: Future + Unpin,
{
let mut cx = Context::from_waker(futures::task::noop_waker_ref());
for _ in 0..max_poll_count {
match runtime.poll_event_loop(&mut cx) {
match future.poll_unpin(&mut cx) {
Poll::Pending => continue,
Poll::Ready(val) => return val,
}
@ -1667,7 +1437,7 @@ pub mod tests {
)
.unwrap();
assert_eq!(dispatch_count.load(Ordering::Relaxed), 1);
assert!(matches!(runtime.poll_event_loop(cx), Poll::Ready(Ok(_))));
assert!(matches!(runtime.poll_unpin(cx), Poll::Ready(Ok(_))));
assert_eq!(dispatch_count.load(Ordering::Relaxed), 1);
runtime
.execute(
@ -1680,11 +1450,11 @@ pub mod tests {
)
.unwrap();
assert_eq!(dispatch_count.load(Ordering::Relaxed), 2);
assert!(matches!(runtime.poll_event_loop(cx), Poll::Ready(Ok(_))));
assert!(matches!(runtime.poll_unpin(cx), Poll::Ready(Ok(_))));
runtime.execute("check3.js", "assert(nrecv == 2)").unwrap();
assert_eq!(dispatch_count.load(Ordering::Relaxed), 2);
// We are idle, so the next poll should be the last.
assert!(matches!(runtime.poll_event_loop(cx), Poll::Ready(Ok(_))));
assert!(matches!(runtime.poll_unpin(cx), Poll::Ready(Ok(_))));
});
}
@ -1708,7 +1478,7 @@ pub mod tests {
assert_eq!(dispatch_count.load(Ordering::Relaxed), 1);
// The above op never finish, but runtime can finish
// because the op is an unreffed async op.
assert!(matches!(runtime.poll_event_loop(cx), Poll::Ready(Ok(_))));
assert!(matches!(runtime.poll_unpin(cx), Poll::Ready(Ok(_))));
})
}
@ -1838,7 +1608,7 @@ pub mod tests {
)
.unwrap();
assert_eq!(dispatch_count.load(Ordering::Relaxed), 1);
assert!(matches!(runtime.poll_event_loop(cx), Poll::Ready(Ok(_))));
assert!(matches!(runtime.poll_unpin(cx), Poll::Ready(Ok(_))));
runtime
.execute("check.js", "assert(asyncRecv == 1);")
.unwrap();
@ -1930,7 +1700,7 @@ pub mod tests {
"#,
)
.unwrap();
if let Poll::Ready(Err(_)) = runtime.poll_event_loop(&mut cx) {
if let Poll::Ready(Err(_)) = runtime.poll_unpin(&mut cx) {
unreachable!();
}
});
@ -1943,7 +1713,7 @@ pub mod tests {
runtime
.execute("core_test.js", include_str!("core_test.js"))
.unwrap();
if let Poll::Ready(Err(_)) = runtime.poll_event_loop(&mut cx) {
if let Poll::Ready(Err(_)) = runtime.poll_unpin(&mut cx) {
unreachable!();
}
});
@ -1969,7 +1739,7 @@ pub mod tests {
include_str!("encode_decode_test.js"),
)
.unwrap();
if let Poll::Ready(Err(_)) = runtime.poll_event_loop(&mut cx) {
if let Poll::Ready(Err(_)) = runtime.poll_unpin(&mut cx) {
unreachable!();
}
});
@ -2218,7 +1988,7 @@ pub mod tests {
runtime.mod_instantiate(mod_a).unwrap();
assert_eq!(dispatch_count.load(Ordering::Relaxed), 0);
runtime.mod_evaluate_inner(mod_a).unwrap();
runtime.mod_evaluate(mod_a).unwrap();
assert_eq!(dispatch_count.load(Ordering::Relaxed), 1);
}
@ -2277,7 +2047,7 @@ pub mod tests {
assert_eq!(count.load(Ordering::Relaxed), 0);
// We should get an error here.
let result = runtime.poll_event_loop(cx);
let result = runtime.poll_unpin(cx);
if let Poll::Ready(Ok(_)) = result {
unreachable!();
}
@ -2370,14 +2140,14 @@ pub mod tests {
.unwrap();
// First poll runs `prepare_load` hook.
assert!(matches!(runtime.poll_event_loop(cx), Poll::Pending));
assert!(matches!(runtime.poll_unpin(cx), Poll::Pending));
assert_eq!(prepare_load_count.load(Ordering::Relaxed), 1);
// Second poll actually loads modules into the isolate.
assert!(matches!(runtime.poll_event_loop(cx), Poll::Ready(Ok(_))));
assert!(matches!(runtime.poll_unpin(cx), Poll::Ready(Ok(_))));
assert_eq!(resolve_count.load(Ordering::Relaxed), 4);
assert_eq!(load_count.load(Ordering::Relaxed), 2);
assert!(matches!(runtime.poll_event_loop(cx), Poll::Ready(Ok(_))));
assert!(matches!(runtime.poll_unpin(cx), Poll::Ready(Ok(_))));
assert_eq!(resolve_count.load(Ordering::Relaxed), 4);
assert_eq!(load_count.load(Ordering::Relaxed), 2);
})
@ -2409,10 +2179,10 @@ pub mod tests {
)
.unwrap();
// First poll runs `prepare_load` hook.
let _ = runtime.poll_event_loop(cx);
let _ = runtime.poll_unpin(cx);
assert_eq!(prepare_load_count.load(Ordering::Relaxed), 1);
// Second poll triggers error
let _ = runtime.poll_event_loop(cx);
let _ = runtime.poll_unpin(cx);
})
}
@ -2461,7 +2231,7 @@ pub mod tests {
)
.unwrap();
futures::executor::block_on(runtime.mod_evaluate(module_id)).unwrap();
runtime.mod_evaluate(module_id).unwrap();
let _snapshot = runtime.snapshot();
}
@ -2543,7 +2313,7 @@ main();
at async error_async_stack.js:10:5
"#;
match runtime.poll_event_loop(cx) {
match runtime.poll_unpin(cx) {
Poll::Ready(Err(e)) => {
assert_eq!(e.to_string(), expected_error);
}

View file

@ -75,6 +75,7 @@ pub fn get_declaration() -> PathBuf {
mod tests {
use deno_core::JsRuntime;
use futures::future::lazy;
use futures::future::FutureExt;
use futures::task::Context;
use futures::task::Poll;
@ -101,7 +102,7 @@ mod tests {
include_str!("abort_controller_test.js"),
)
.unwrap();
if let Poll::Ready(Err(_)) = isolate.poll_event_loop(&mut cx) {
if let Poll::Ready(Err(_)) = isolate.poll_unpin(&mut cx) {
unreachable!();
}
});
@ -114,7 +115,7 @@ mod tests {
isolate
.execute("event_test.js", include_str!("event_test.js"))
.unwrap();
if let Poll::Ready(Err(_)) = isolate.poll_event_loop(&mut cx) {
if let Poll::Ready(Err(_)) = isolate.poll_unpin(&mut cx) {
unreachable!();
}
});
@ -133,7 +134,7 @@ mod tests {
} else {
unreachable!();
}
if let Poll::Ready(Err(_)) = isolate.poll_event_loop(&mut cx) {
if let Poll::Ready(Err(_)) = isolate.poll_unpin(&mut cx) {
unreachable!();
}
});
@ -146,7 +147,7 @@ mod tests {
isolate
.execute("event_target_test.js", include_str!("event_target_test.js"))
.unwrap();
if let Poll::Ready(Err(_)) = isolate.poll_event_loop(&mut cx) {
if let Poll::Ready(Err(_)) = isolate.poll_unpin(&mut cx) {
unreachable!();
}
});
@ -162,7 +163,7 @@ mod tests {
include_str!("text_encoding_test.js"),
)
.unwrap();
if let Poll::Ready(Err(_)) = isolate.poll_event_loop(&mut cx) {
if let Poll::Ready(Err(_)) = isolate.poll_unpin(&mut cx) {
unreachable!();
}
});