
refactor(core): JsRuntime::poll (#7825)

This commit reorganizes "JsRuntime::poll" to allow fixing a top-level-await bug.
Bartek Iwańczuk 2020-10-05 11:08:19 +02:00 committed by GitHub
parent f377b611ba
commit 8d00c32ee2
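
For orientation before the diff: the refactor moves each concern of the event loop into its own helper on JsRuntime (poll_pending_ops, async_op_response, drain_macrotasks, check_promise_exceptions, plus early-returning prepare_dyn_imports / poll_dyn_imports), so poll itself reads as a short sequence of calls followed by an idle check. The sketch below is a minimal, self-contained illustration of that control flow only; the EventLoop type, its fields, and the String error type are hypothetical stand-ins, not Deno's actual types.

// Minimal sketch of the control flow the refactored JsRuntime::poll follows,
// per the diff below. All names here are illustrative, not the committed code.
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

type OpId = u32;

struct EventLoop {
    pending_ops: usize,      // stand-in for pending_ops / pending_dyn_imports
    have_unpolled_ops: bool, // ops dispatched during this poll turn
}

impl EventLoop {
    // Drain completed ops; a response that does not fit the shared queue is
    // handed back as the "overflow" response (the legacy route in the real code).
    fn poll_pending_ops(&mut self, _cx: &mut Context<'_>) -> Option<(OpId, Box<[u8]>)> {
        self.pending_ops = self.pending_ops.saturating_sub(1);
        None
    }
    fn async_op_response(&mut self, _overflow: Option<(OpId, Box<[u8]>)>) -> Result<(), String> {
        Ok(())
    }
    fn drain_macrotasks(&mut self) -> Result<(), String> {
        Ok(())
    }
    fn check_promise_exceptions(&mut self) -> Result<(), String> {
        Ok(())
    }
}

impl Future for EventLoop {
    type Output = Result<(), String>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let runtime = self.get_mut();

        // Same ordering as the refactored JsRuntime::poll: ops first, then the
        // JS-side response callback, macrotasks, and promise exception checks.
        let overflow_response = runtime.poll_pending_ops(cx);
        runtime.async_op_response(overflow_response)?;
        runtime.drain_macrotasks()?;
        runtime.check_promise_exceptions()?;

        // Idle check comes last: ready when nothing is pending.
        if runtime.pending_ops == 0 {
            return Poll::Ready(Ok(()));
        }
        // If more async ops were dispatched during this turn, ask to be polled again.
        if runtime.have_unpolled_ops {
            cx.waker().wake_by_ref();
        }
        Poll::Pending
    }
}

The real poll additionally handles dynamic module imports and the shared-queue/overflow split, as shown in the hunks below.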


@@ -310,6 +310,10 @@ impl JsRuntime {
state.global_context.clone().unwrap()
}
fn v8_isolate(&mut self) -> &mut v8::OwnedIsolate {
self.v8_isolate.as_mut().unwrap()
}
fn setup_isolate(mut isolate: v8::OwnedIsolate) -> v8::OwnedIsolate {
isolate.set_capture_stack_trace_for_uncaught_exceptions(true, 10);
isolate.set_promise_reject_callback(bindings::promise_reject_callback);
@@ -328,7 +332,7 @@ impl JsRuntime {
}
/// Executes a bit of built-in JavaScript to provide Deno.sharedQueue.
pub(crate) fn shared_init(&mut self) {
fn shared_init(&mut self) {
if self.needs_init {
self.needs_init = false;
self.execute("core.js", include_str!("core.js")).unwrap();
@@ -353,15 +357,9 @@ impl JsRuntime {
) -> Result<(), AnyError> {
self.shared_init();
let state_rc = Self::state(self);
let state = state_rc.borrow();
let context = self.global_context();
let scope = &mut v8::HandleScope::with_context(
self.v8_isolate.as_mut().unwrap(),
state.global_context.as_ref().unwrap(),
);
drop(state);
let scope = &mut v8::HandleScope::with_context(self.v8_isolate(), context);
let source = v8::String::new(scope, js_source).unwrap();
let name = v8::String::new(scope, js_filename).unwrap();
@@ -442,16 +440,12 @@ impl JsRuntime {
.replace((boxed_cb, near_heap_limit_callback::<C>));
if let Some((_, prev_cb)) = prev {
self
.v8_isolate
.as_mut()
.unwrap()
.v8_isolate()
.remove_near_heap_limit_callback(prev_cb, 0);
}
self
.v8_isolate
.as_mut()
.unwrap()
.v8_isolate()
.add_near_heap_limit_callback(near_heap_limit_callback::<C>, data);
}
@@ -459,9 +453,7 @@ impl JsRuntime {
if let Some((_, cb)) = self.allocations.near_heap_limit_callback_data.take()
{
self
.v8_isolate
.as_mut()
.unwrap()
.v8_isolate()
.remove_near_heap_limit_callback(cb, heap_limit);
}
}
@@ -492,109 +484,43 @@ impl Future for JsRuntime {
state.waker.register(cx.waker());
}
let has_preparing = {
let state = state_rc.borrow();
!state.preparing_dyn_imports.is_empty()
};
if has_preparing {
// Dynamic module loading - ie. modules loaded using "import()"
{
let poll_imports = runtime.prepare_dyn_imports(cx)?;
assert!(poll_imports.is_ready());
}
let has_pending = {
let state = state_rc.borrow();
!state.pending_dyn_imports.is_empty()
};
if has_pending {
let poll_imports = runtime.poll_dyn_imports(cx)?;
assert!(poll_imports.is_ready());
runtime.check_promise_exceptions()?;
}
let scope = &mut v8::HandleScope::with_context(
&mut **runtime,
state_rc.borrow().global_context.as_ref().unwrap(),
);
check_promise_exceptions(scope)?;
let mut overflow_response: Option<(OpId, Box<[u8]>)> = None;
loop {
let mut state = state_rc.borrow_mut();
// Now handle actual ops.
state.have_unpolled_ops.set(false);
let pending_r = state.pending_ops.poll_next_unpin(cx);
match pending_r {
Poll::Ready(None) => break,
Poll::Pending => break,
Poll::Ready(Some((op_id, buf))) => {
let successful_push = state.shared.push(op_id, &buf);
if !successful_push {
// If we couldn't push the response to the shared queue, because
// there wasn't enough size, we will return the buffer via the
// legacy route, using the argument of deno_respond.
overflow_response = Some((op_id, buf));
break;
}
}
};
}
loop {
let mut state = state_rc.borrow_mut();
let unref_r = state.pending_unref_ops.poll_next_unpin(cx);
#[allow(clippy::match_wild_err_arm)]
match unref_r {
Poll::Ready(None) => break,
Poll::Pending => break,
Poll::Ready(Some((op_id, buf))) => {
let successful_push = state.shared.push(op_id, &buf);
if !successful_push {
// If we couldn't push the response to the shared queue, because
// there wasn't enough size, we will return the buffer via the
// legacy route, using the argument of deno_respond.
overflow_response = Some((op_id, buf));
break;
}
}
};
}
// Ops
{
let state = state_rc.borrow();
if state.shared.size() > 0 {
drop(state);
async_op_response(scope, None)?;
// The other side should have shifted off all the messages.
let state = state_rc.borrow();
assert_eq!(state.shared.size(), 0);
}
}
{
if let Some((op_id, buf)) = overflow_response.take() {
async_op_response(scope, Some((op_id, buf)))?;
}
drain_macrotasks(scope)?;
check_promise_exceptions(scope)?;
let overflow_response = runtime.poll_pending_ops(cx);
runtime.async_op_response(overflow_response)?;
runtime.drain_macrotasks()?;
runtime.check_promise_exceptions()?;
}
let state = state_rc.borrow();
// We're idle if pending_ops is empty.
if state.pending_ops.is_empty()
&& state.pending_dyn_imports.is_empty()
&& state.preparing_dyn_imports.is_empty()
{
Poll::Ready(Ok(()))
} else {
if state.have_unpolled_ops.get() {
state.waker.wake();
}
Poll::Pending
let is_idle = {
state.pending_ops.is_empty()
&& state.pending_dyn_imports.is_empty()
&& state.preparing_dyn_imports.is_empty()
};
if is_idle {
return Poll::Ready(Ok(()));
}
// Check if more async ops have been dispatched
// during this turn of event loop.
if state.have_unpolled_ops.get() {
state.waker.wake();
}
Poll::Pending
}
}
@@ -635,74 +561,6 @@ impl JsRuntimeState {
}
}
fn async_op_response<'s>(
scope: &mut v8::HandleScope<'s>,
maybe_buf: Option<(OpId, Box<[u8]>)>,
) -> Result<(), AnyError> {
let context = scope.get_current_context();
let global: v8::Local<v8::Value> = context.global(scope).into();
let js_recv_cb = JsRuntime::state(scope)
.borrow()
.js_recv_cb
.as_ref()
.map(|cb| v8::Local::new(scope, cb))
.expect("Deno.core.recv has not been called.");
let tc_scope = &mut v8::TryCatch::new(scope);
match maybe_buf {
Some((op_id, buf)) => {
let op_id: v8::Local<v8::Value> =
v8::Integer::new(tc_scope, op_id as i32).into();
let ui8: v8::Local<v8::Value> =
boxed_slice_to_uint8array(tc_scope, buf).into();
js_recv_cb.call(tc_scope, global, &[op_id, ui8])
}
None => js_recv_cb.call(tc_scope, global, &[]),
};
match tc_scope.exception() {
None => Ok(()),
Some(exception) => exception_to_err_result(tc_scope, exception),
}
}
fn drain_macrotasks<'s>(
scope: &mut v8::HandleScope<'s>,
) -> Result<(), AnyError> {
let context = scope.get_current_context();
let global: v8::Local<v8::Value> = context.global(scope).into();
let js_macrotask_cb = match JsRuntime::state(scope)
.borrow_mut()
.js_macrotask_cb
.as_ref()
{
Some(cb) => v8::Local::new(scope, cb),
None => return Ok(()),
};
// Repeatedly invoke macrotask callback until it returns true (done),
// such that ready microtasks would be automatically run before
// next macrotask is processed.
let tc_scope = &mut v8::TryCatch::new(scope);
loop {
let is_done = js_macrotask_cb.call(tc_scope, global, &[]);
if let Some(exception) = tc_scope.exception() {
return exception_to_err_result(tc_scope, exception);
}
let is_done = is_done.unwrap();
if is_done.is_true() {
break;
}
}
Ok(())
}
pub(crate) fn exception_to_err_result<'s, T>(
scope: &mut v8::HandleScope<'s>,
exception: v8::Local<v8::Value>,
@@ -743,35 +601,6 @@ pub(crate) fn exception_to_err_result<'s, T>(
Err(js_error)
}
fn check_promise_exceptions<'s>(
scope: &mut v8::HandleScope<'s>,
) -> Result<(), AnyError> {
let state_rc = JsRuntime::state(scope);
let mut state = state_rc.borrow_mut();
if let Some(&key) = state.pending_promise_exceptions.keys().next() {
let handle = state.pending_promise_exceptions.remove(&key).unwrap();
drop(state);
let exception = v8::Local::new(scope, handle);
exception_to_err_result(scope, exception)
} else {
Ok(())
}
}
fn boxed_slice_to_uint8array<'sc>(
scope: &mut v8::HandleScope<'sc>,
buf: Box<[u8]>,
) -> v8::Local<'sc, v8::Uint8Array> {
assert!(!buf.is_empty());
let buf_len = buf.len();
let backing_store = v8::ArrayBuffer::new_backing_store_from_boxed_slice(buf);
let backing_store_shared = backing_store.make_shared();
let ab = v8::ArrayBuffer::with_backing_store(scope, &backing_store_shared);
v8::Uint8Array::new(scope, ab, 0, buf_len)
.expect("Failed to create UintArray8")
}
// Related to module loading
impl JsRuntime {
/// Low-level module creation.
@@ -784,10 +613,8 @@ impl JsRuntime {
source: &str,
) -> Result<ModuleId, AnyError> {
let state_rc = Self::state(self);
let scope = &mut v8::HandleScope::with_context(
&mut **self,
state_rc.borrow().global_context.as_ref().unwrap(),
);
let context = self.global_context();
let scope = &mut v8::HandleScope::with_context(self.v8_isolate(), context);
let name_str = v8::String::new(scope, name).unwrap();
let source_str = v8::String::new(scope, source).unwrap();
@@ -840,13 +667,12 @@ impl JsRuntime {
/// be a different type if `RuntimeOptions::js_error_create_fn` has been set.
fn mod_instantiate(&mut self, id: ModuleId) -> Result<(), AnyError> {
let state_rc = Self::state(self);
let state = state_rc.borrow();
let scope = &mut v8::HandleScope::with_context(
&mut **self,
state.global_context.as_ref().unwrap(),
);
let context = self.global_context();
let scope = &mut v8::HandleScope::with_context(self.v8_isolate(), context);
let tc_scope = &mut v8::TryCatch::new(scope);
let state = state_rc.borrow();
let module = match state.modules.get_info(id) {
Some(info) => v8::Local::new(tc_scope, &info.handle),
None if id == 0 => return Ok(()),
@@ -878,11 +704,9 @@ impl JsRuntime {
self.shared_init();
let state_rc = Self::state(self);
let context = self.global_context();
let scope = &mut v8::HandleScope::with_context(
&mut **self,
state_rc.borrow().global_context.as_ref().unwrap(),
);
let scope = &mut v8::HandleScope::with_context(self.v8_isolate(), context);
let module = state_rc
.borrow()
@@ -946,11 +770,9 @@ impl JsRuntime {
err: AnyError,
) -> Result<(), AnyError> {
let state_rc = Self::state(self);
let context = self.global_context();
let scope = &mut v8::HandleScope::with_context(
&mut **self,
state_rc.borrow().global_context.as_ref().unwrap(),
);
let scope = &mut v8::HandleScope::with_context(self.v8_isolate(), context);
let resolver_handle = state_rc
.borrow_mut()
@@ -979,13 +801,11 @@ impl JsRuntime {
mod_id: ModuleId,
) -> Result<(), AnyError> {
let state_rc = Self::state(self);
let context = self.global_context();
debug!("dyn_import_done {} {:?}", id, mod_id);
assert!(mod_id != 0);
let scope = &mut v8::HandleScope::with_context(
&mut **self,
state_rc.borrow().global_context.as_ref().unwrap(),
);
let scope = &mut v8::HandleScope::with_context(self.v8_isolate(), context);
let resolver_handle = state_rc
.borrow_mut()
@@ -1017,6 +837,10 @@ impl JsRuntime {
) -> Poll<Result<(), AnyError>> {
let state_rc = Self::state(self);
if state_rc.borrow().preparing_dyn_imports.is_empty() {
return Poll::Ready(Ok(()));
}
loop {
let r = {
let mut state = state_rc.borrow_mut();
@@ -1050,6 +874,11 @@ impl JsRuntime {
cx: &mut Context,
) -> Poll<Result<(), AnyError>> {
let state_rc = Self::state(self);
if state_rc.borrow().pending_dyn_imports.is_empty() {
return Poll::Ready(Ok(()));
}
loop {
let poll_result = {
let mut state = state_rc.borrow_mut();
@@ -1222,6 +1051,163 @@ impl JsRuntime {
let root_id = load.root_module_id.expect("Root module id empty");
self.mod_instantiate(root_id).map(|_| root_id)
}
fn poll_pending_ops(
&mut self,
cx: &mut Context,
) -> Option<(OpId, Box<[u8]>)> {
let state_rc = Self::state(self);
let mut overflow_response: Option<(OpId, Box<[u8]>)> = None;
loop {
let mut state = state_rc.borrow_mut();
// Now handle actual ops.
state.have_unpolled_ops.set(false);
let pending_r = state.pending_ops.poll_next_unpin(cx);
match pending_r {
Poll::Ready(None) => break,
Poll::Pending => break,
Poll::Ready(Some((op_id, buf))) => {
let successful_push = state.shared.push(op_id, &buf);
if !successful_push {
// If we couldn't push the response to the shared queue, because
// there wasn't enough size, we will return the buffer via the
// legacy route, using the argument of deno_respond.
overflow_response = Some((op_id, buf));
break;
}
}
};
}
loop {
let mut state = state_rc.borrow_mut();
let unref_r = state.pending_unref_ops.poll_next_unpin(cx);
#[allow(clippy::match_wild_err_arm)]
match unref_r {
Poll::Ready(None) => break,
Poll::Pending => break,
Poll::Ready(Some((op_id, buf))) => {
let successful_push = state.shared.push(op_id, &buf);
if !successful_push {
// If we couldn't push the response to the shared queue, because
// there wasn't enough size, we will return the buffer via the
// legacy route, using the argument of deno_respond.
overflow_response = Some((op_id, buf));
break;
}
}
};
}
overflow_response
}
fn check_promise_exceptions(&mut self) -> Result<(), AnyError> {
let state_rc = Self::state(self);
let mut state = state_rc.borrow_mut();
if state.pending_promise_exceptions.is_empty() {
return Ok(());
}
let key = { *state.pending_promise_exceptions.keys().next().unwrap() };
let handle = state.pending_promise_exceptions.remove(&key).unwrap();
drop(state);
let context = self.global_context();
let scope = &mut v8::HandleScope::with_context(self.v8_isolate(), context);
let exception = v8::Local::new(scope, handle);
exception_to_err_result(scope, exception)
}
// Respond using shared queue and optionally overflown response
fn async_op_response(
&mut self,
maybe_overflown_response: Option<(OpId, Box<[u8]>)>,
) -> Result<(), AnyError> {
let state_rc = Self::state(self);
let shared_queue_size = state_rc.borrow().shared.size();
if shared_queue_size == 0 && maybe_overflown_response.is_none() {
return Ok(());
}
// FIXME(bartlomieju): without check above this call would panic
// because of lazy initialization in core.js. It seems this lazy initialization
// hides unnecessary complexity.
let js_recv_cb_handle = state_rc
.borrow()
.js_recv_cb
.clone()
.expect("Deno.core.recv has not been called.");
let context = self.global_context();
let scope = &mut v8::HandleScope::with_context(self.v8_isolate(), context);
let context = scope.get_current_context();
let global: v8::Local<v8::Value> = context.global(scope).into();
let js_recv_cb = js_recv_cb_handle.get(scope);
let tc_scope = &mut v8::TryCatch::new(scope);
if shared_queue_size > 0 {
js_recv_cb.call(tc_scope, global, &[]);
// The other side should have shifted off all the messages.
let shared_queue_size = state_rc.borrow().shared.size();
assert_eq!(shared_queue_size, 0);
}
if let Some(overflown_response) = maybe_overflown_response {
let (op_id, buf) = overflown_response;
let op_id: v8::Local<v8::Value> =
v8::Integer::new(tc_scope, op_id as i32).into();
let ui8: v8::Local<v8::Value> =
bindings::boxed_slice_to_uint8array(tc_scope, buf).into();
js_recv_cb.call(tc_scope, global, &[op_id, ui8]);
}
match tc_scope.exception() {
None => Ok(()),
Some(exception) => exception_to_err_result(tc_scope, exception),
}
}
fn drain_macrotasks(&mut self) -> Result<(), AnyError> {
let js_macrotask_cb_handle =
match &Self::state(self).borrow().js_macrotask_cb {
Some(handle) => handle.clone(),
None => return Ok(()),
};
let context = self.global_context();
let scope = &mut v8::HandleScope::with_context(self.v8_isolate(), context);
let context = scope.get_current_context();
let global: v8::Local<v8::Value> = context.global(scope).into();
let js_macrotask_cb = js_macrotask_cb_handle.get(scope);
// Repeatedly invoke macrotask callback until it returns true (done),
// such that ready microtasks would be automatically run before
// next macrotask is processed.
let tc_scope = &mut v8::TryCatch::new(scope);
loop {
let is_done = js_macrotask_cb.call(tc_scope, global, &[]);
if let Some(exception) = tc_scope.exception() {
return exception_to_err_result(tc_scope, exception);
}
let is_done = is_done.unwrap();
if is_done.is_true() {
break;
}
}
Ok(())
}
}
#[cfg(test)]
@@ -1516,8 +1502,7 @@ pub mod tests {
let (mut isolate, _dispatch_count) = setup(Mode::Async);
// TODO(piscisaureus): in rusty_v8, the `thread_safe_handle()` method
// should not require a mutable reference to `struct rusty_v8::Isolate`.
let v8_isolate_handle =
isolate.v8_isolate.as_mut().unwrap().thread_safe_handle();
let v8_isolate_handle = isolate.v8_isolate().thread_safe_handle();
let terminator_thread = std::thread::spawn(move || {
// allow deno to boot and run
@@ -1541,9 +1526,7 @@ pub mod tests {
// TODO(piscisaureus): in rusty_v8, `cancel_terminate_execution()` should
// also be implemented on `struct Isolate`.
let ok = isolate
.v8_isolate
.as_mut()
.unwrap()
.v8_isolate()
.thread_safe_handle()
.cancel_terminate_execution();
assert!(ok);
@@ -1563,7 +1546,7 @@ pub mod tests {
let (mut runtime, _dispatch_count) = setup(Mode::Async);
// TODO(piscisaureus): in rusty_v8, the `thread_safe_handle()` method
// should not require a mutable reference to `struct rusty_v8::Isolate`.
runtime.v8_isolate.as_mut().unwrap().thread_safe_handle()
runtime.v8_isolate().thread_safe_handle()
};
// this should not SEGFAULT