From 8d00c32ee2e159ff8b833bf108e6e89c564ef562 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Mon, 5 Oct 2020 11:08:19 +0200 Subject: [PATCH] refactor(core): JsRuntime::poll (#7825) This commit does reorganization of "JsRuntime::poll" to allow fixing of top-level-await bug. --- core/runtime.rs | 449 +++++++++++++++++++++++------------------------- 1 file changed, 216 insertions(+), 233 deletions(-) diff --git a/core/runtime.rs b/core/runtime.rs index ecb9828f46..514703f343 100644 --- a/core/runtime.rs +++ b/core/runtime.rs @@ -310,6 +310,10 @@ impl JsRuntime { state.global_context.clone().unwrap() } + fn v8_isolate(&mut self) -> &mut v8::OwnedIsolate { + self.v8_isolate.as_mut().unwrap() + } + fn setup_isolate(mut isolate: v8::OwnedIsolate) -> v8::OwnedIsolate { isolate.set_capture_stack_trace_for_uncaught_exceptions(true, 10); isolate.set_promise_reject_callback(bindings::promise_reject_callback); @@ -328,7 +332,7 @@ impl JsRuntime { } /// Executes a bit of built-in JavaScript to provide Deno.sharedQueue. - pub(crate) fn shared_init(&mut self) { + fn shared_init(&mut self) { if self.needs_init { self.needs_init = false; self.execute("core.js", include_str!("core.js")).unwrap(); @@ -353,15 +357,9 @@ impl JsRuntime { ) -> Result<(), AnyError> { self.shared_init(); - let state_rc = Self::state(self); - let state = state_rc.borrow(); + let context = self.global_context(); - let scope = &mut v8::HandleScope::with_context( - self.v8_isolate.as_mut().unwrap(), - state.global_context.as_ref().unwrap(), - ); - - drop(state); + let scope = &mut v8::HandleScope::with_context(self.v8_isolate(), context); let source = v8::String::new(scope, js_source).unwrap(); let name = v8::String::new(scope, js_filename).unwrap(); @@ -442,16 +440,12 @@ impl JsRuntime { .replace((boxed_cb, near_heap_limit_callback::)); if let Some((_, prev_cb)) = prev { self - .v8_isolate - .as_mut() - .unwrap() + .v8_isolate() .remove_near_heap_limit_callback(prev_cb, 0); } self - .v8_isolate - .as_mut() - .unwrap() + .v8_isolate() .add_near_heap_limit_callback(near_heap_limit_callback::, data); } @@ -459,9 +453,7 @@ impl JsRuntime { if let Some((_, cb)) = self.allocations.near_heap_limit_callback_data.take() { self - .v8_isolate - .as_mut() - .unwrap() + .v8_isolate() .remove_near_heap_limit_callback(cb, heap_limit); } } @@ -492,109 +484,43 @@ impl Future for JsRuntime { state.waker.register(cx.waker()); } - let has_preparing = { - let state = state_rc.borrow(); - !state.preparing_dyn_imports.is_empty() - }; - if has_preparing { + // Dynamic module loading - ie. modules loaded using "import()" + { let poll_imports = runtime.prepare_dyn_imports(cx)?; assert!(poll_imports.is_ready()); - } - let has_pending = { - let state = state_rc.borrow(); - !state.pending_dyn_imports.is_empty() - }; - if has_pending { let poll_imports = runtime.poll_dyn_imports(cx)?; assert!(poll_imports.is_ready()); + + runtime.check_promise_exceptions()?; } - let scope = &mut v8::HandleScope::with_context( - &mut **runtime, - state_rc.borrow().global_context.as_ref().unwrap(), - ); - - check_promise_exceptions(scope)?; - - let mut overflow_response: Option<(OpId, Box<[u8]>)> = None; - - loop { - let mut state = state_rc.borrow_mut(); - // Now handle actual ops. - state.have_unpolled_ops.set(false); - - let pending_r = state.pending_ops.poll_next_unpin(cx); - match pending_r { - Poll::Ready(None) => break, - Poll::Pending => break, - Poll::Ready(Some((op_id, buf))) => { - let successful_push = state.shared.push(op_id, &buf); - if !successful_push { - // If we couldn't push the response to the shared queue, because - // there wasn't enough size, we will return the buffer via the - // legacy route, using the argument of deno_respond. - overflow_response = Some((op_id, buf)); - break; - } - } - }; - } - - loop { - let mut state = state_rc.borrow_mut(); - let unref_r = state.pending_unref_ops.poll_next_unpin(cx); - #[allow(clippy::match_wild_err_arm)] - match unref_r { - Poll::Ready(None) => break, - Poll::Pending => break, - Poll::Ready(Some((op_id, buf))) => { - let successful_push = state.shared.push(op_id, &buf); - if !successful_push { - // If we couldn't push the response to the shared queue, because - // there wasn't enough size, we will return the buffer via the - // legacy route, using the argument of deno_respond. - overflow_response = Some((op_id, buf)); - break; - } - } - }; - } - + // Ops { - let state = state_rc.borrow(); - if state.shared.size() > 0 { - drop(state); - async_op_response(scope, None)?; - // The other side should have shifted off all the messages. - let state = state_rc.borrow(); - assert_eq!(state.shared.size(), 0); - } - } - - { - if let Some((op_id, buf)) = overflow_response.take() { - async_op_response(scope, Some((op_id, buf)))?; - } - - drain_macrotasks(scope)?; - - check_promise_exceptions(scope)?; + let overflow_response = runtime.poll_pending_ops(cx); + runtime.async_op_response(overflow_response)?; + runtime.drain_macrotasks()?; + runtime.check_promise_exceptions()?; } let state = state_rc.borrow(); - // We're idle if pending_ops is empty. - if state.pending_ops.is_empty() - && state.pending_dyn_imports.is_empty() - && state.preparing_dyn_imports.is_empty() - { - Poll::Ready(Ok(())) - } else { - if state.have_unpolled_ops.get() { - state.waker.wake(); - } - Poll::Pending + let is_idle = { + state.pending_ops.is_empty() + && state.pending_dyn_imports.is_empty() + && state.preparing_dyn_imports.is_empty() + }; + + if is_idle { + return Poll::Ready(Ok(())); } + + // Check if more async ops have been dispatched + // during this turn of event loop. + if state.have_unpolled_ops.get() { + state.waker.wake(); + } + + Poll::Pending } } @@ -635,74 +561,6 @@ impl JsRuntimeState { } } -fn async_op_response<'s>( - scope: &mut v8::HandleScope<'s>, - maybe_buf: Option<(OpId, Box<[u8]>)>, -) -> Result<(), AnyError> { - let context = scope.get_current_context(); - let global: v8::Local = context.global(scope).into(); - let js_recv_cb = JsRuntime::state(scope) - .borrow() - .js_recv_cb - .as_ref() - .map(|cb| v8::Local::new(scope, cb)) - .expect("Deno.core.recv has not been called."); - - let tc_scope = &mut v8::TryCatch::new(scope); - - match maybe_buf { - Some((op_id, buf)) => { - let op_id: v8::Local = - v8::Integer::new(tc_scope, op_id as i32).into(); - let ui8: v8::Local = - boxed_slice_to_uint8array(tc_scope, buf).into(); - js_recv_cb.call(tc_scope, global, &[op_id, ui8]) - } - None => js_recv_cb.call(tc_scope, global, &[]), - }; - - match tc_scope.exception() { - None => Ok(()), - Some(exception) => exception_to_err_result(tc_scope, exception), - } -} - -fn drain_macrotasks<'s>( - scope: &mut v8::HandleScope<'s>, -) -> Result<(), AnyError> { - let context = scope.get_current_context(); - let global: v8::Local = context.global(scope).into(); - - let js_macrotask_cb = match JsRuntime::state(scope) - .borrow_mut() - .js_macrotask_cb - .as_ref() - { - Some(cb) => v8::Local::new(scope, cb), - None => return Ok(()), - }; - - // Repeatedly invoke macrotask callback until it returns true (done), - // such that ready microtasks would be automatically run before - // next macrotask is processed. - let tc_scope = &mut v8::TryCatch::new(scope); - - loop { - let is_done = js_macrotask_cb.call(tc_scope, global, &[]); - - if let Some(exception) = tc_scope.exception() { - return exception_to_err_result(tc_scope, exception); - } - - let is_done = is_done.unwrap(); - if is_done.is_true() { - break; - } - } - - Ok(()) -} - pub(crate) fn exception_to_err_result<'s, T>( scope: &mut v8::HandleScope<'s>, exception: v8::Local, @@ -743,35 +601,6 @@ pub(crate) fn exception_to_err_result<'s, T>( Err(js_error) } -fn check_promise_exceptions<'s>( - scope: &mut v8::HandleScope<'s>, -) -> Result<(), AnyError> { - let state_rc = JsRuntime::state(scope); - let mut state = state_rc.borrow_mut(); - - if let Some(&key) = state.pending_promise_exceptions.keys().next() { - let handle = state.pending_promise_exceptions.remove(&key).unwrap(); - drop(state); - let exception = v8::Local::new(scope, handle); - exception_to_err_result(scope, exception) - } else { - Ok(()) - } -} - -fn boxed_slice_to_uint8array<'sc>( - scope: &mut v8::HandleScope<'sc>, - buf: Box<[u8]>, -) -> v8::Local<'sc, v8::Uint8Array> { - assert!(!buf.is_empty()); - let buf_len = buf.len(); - let backing_store = v8::ArrayBuffer::new_backing_store_from_boxed_slice(buf); - let backing_store_shared = backing_store.make_shared(); - let ab = v8::ArrayBuffer::with_backing_store(scope, &backing_store_shared); - v8::Uint8Array::new(scope, ab, 0, buf_len) - .expect("Failed to create UintArray8") -} - // Related to module loading impl JsRuntime { /// Low-level module creation. @@ -784,10 +613,8 @@ impl JsRuntime { source: &str, ) -> Result { let state_rc = Self::state(self); - let scope = &mut v8::HandleScope::with_context( - &mut **self, - state_rc.borrow().global_context.as_ref().unwrap(), - ); + let context = self.global_context(); + let scope = &mut v8::HandleScope::with_context(self.v8_isolate(), context); let name_str = v8::String::new(scope, name).unwrap(); let source_str = v8::String::new(scope, source).unwrap(); @@ -840,13 +667,12 @@ impl JsRuntime { /// be a different type if `RuntimeOptions::js_error_create_fn` has been set. fn mod_instantiate(&mut self, id: ModuleId) -> Result<(), AnyError> { let state_rc = Self::state(self); - let state = state_rc.borrow(); - let scope = &mut v8::HandleScope::with_context( - &mut **self, - state.global_context.as_ref().unwrap(), - ); + let context = self.global_context(); + + let scope = &mut v8::HandleScope::with_context(self.v8_isolate(), context); let tc_scope = &mut v8::TryCatch::new(scope); + let state = state_rc.borrow(); let module = match state.modules.get_info(id) { Some(info) => v8::Local::new(tc_scope, &info.handle), None if id == 0 => return Ok(()), @@ -878,11 +704,9 @@ impl JsRuntime { self.shared_init(); let state_rc = Self::state(self); + let context = self.global_context(); - let scope = &mut v8::HandleScope::with_context( - &mut **self, - state_rc.borrow().global_context.as_ref().unwrap(), - ); + let scope = &mut v8::HandleScope::with_context(self.v8_isolate(), context); let module = state_rc .borrow() @@ -946,11 +770,9 @@ impl JsRuntime { err: AnyError, ) -> Result<(), AnyError> { let state_rc = Self::state(self); + let context = self.global_context(); - let scope = &mut v8::HandleScope::with_context( - &mut **self, - state_rc.borrow().global_context.as_ref().unwrap(), - ); + let scope = &mut v8::HandleScope::with_context(self.v8_isolate(), context); let resolver_handle = state_rc .borrow_mut() @@ -979,13 +801,11 @@ impl JsRuntime { mod_id: ModuleId, ) -> Result<(), AnyError> { let state_rc = Self::state(self); + let context = self.global_context(); debug!("dyn_import_done {} {:?}", id, mod_id); assert!(mod_id != 0); - let scope = &mut v8::HandleScope::with_context( - &mut **self, - state_rc.borrow().global_context.as_ref().unwrap(), - ); + let scope = &mut v8::HandleScope::with_context(self.v8_isolate(), context); let resolver_handle = state_rc .borrow_mut() @@ -1017,6 +837,10 @@ impl JsRuntime { ) -> Poll> { let state_rc = Self::state(self); + if state_rc.borrow().preparing_dyn_imports.is_empty() { + return Poll::Ready(Ok(())); + } + loop { let r = { let mut state = state_rc.borrow_mut(); @@ -1050,6 +874,11 @@ impl JsRuntime { cx: &mut Context, ) -> Poll> { let state_rc = Self::state(self); + + if state_rc.borrow().pending_dyn_imports.is_empty() { + return Poll::Ready(Ok(())); + } + loop { let poll_result = { let mut state = state_rc.borrow_mut(); @@ -1222,6 +1051,163 @@ impl JsRuntime { let root_id = load.root_module_id.expect("Root module id empty"); self.mod_instantiate(root_id).map(|_| root_id) } + + fn poll_pending_ops( + &mut self, + cx: &mut Context, + ) -> Option<(OpId, Box<[u8]>)> { + let state_rc = Self::state(self); + let mut overflow_response: Option<(OpId, Box<[u8]>)> = None; + + loop { + let mut state = state_rc.borrow_mut(); + // Now handle actual ops. + state.have_unpolled_ops.set(false); + + let pending_r = state.pending_ops.poll_next_unpin(cx); + match pending_r { + Poll::Ready(None) => break, + Poll::Pending => break, + Poll::Ready(Some((op_id, buf))) => { + let successful_push = state.shared.push(op_id, &buf); + if !successful_push { + // If we couldn't push the response to the shared queue, because + // there wasn't enough size, we will return the buffer via the + // legacy route, using the argument of deno_respond. + overflow_response = Some((op_id, buf)); + break; + } + } + }; + } + + loop { + let mut state = state_rc.borrow_mut(); + let unref_r = state.pending_unref_ops.poll_next_unpin(cx); + #[allow(clippy::match_wild_err_arm)] + match unref_r { + Poll::Ready(None) => break, + Poll::Pending => break, + Poll::Ready(Some((op_id, buf))) => { + let successful_push = state.shared.push(op_id, &buf); + if !successful_push { + // If we couldn't push the response to the shared queue, because + // there wasn't enough size, we will return the buffer via the + // legacy route, using the argument of deno_respond. + overflow_response = Some((op_id, buf)); + break; + } + } + }; + } + + overflow_response + } + + fn check_promise_exceptions(&mut self) -> Result<(), AnyError> { + let state_rc = Self::state(self); + let mut state = state_rc.borrow_mut(); + + if state.pending_promise_exceptions.is_empty() { + return Ok(()); + } + + let key = { *state.pending_promise_exceptions.keys().next().unwrap() }; + let handle = state.pending_promise_exceptions.remove(&key).unwrap(); + drop(state); + + let context = self.global_context(); + let scope = &mut v8::HandleScope::with_context(self.v8_isolate(), context); + + let exception = v8::Local::new(scope, handle); + exception_to_err_result(scope, exception) + } + + // Respond using shared queue and optionally overflown response + fn async_op_response( + &mut self, + maybe_overflown_response: Option<(OpId, Box<[u8]>)>, + ) -> Result<(), AnyError> { + let state_rc = Self::state(self); + + let shared_queue_size = state_rc.borrow().shared.size(); + + if shared_queue_size == 0 && maybe_overflown_response.is_none() { + return Ok(()); + } + + // FIXME(bartlomieju): without check above this call would panic + // because of lazy initialization in core.js. It seems this lazy initialization + // hides unnecessary complexity. + let js_recv_cb_handle = state_rc + .borrow() + .js_recv_cb + .clone() + .expect("Deno.core.recv has not been called."); + + let context = self.global_context(); + let scope = &mut v8::HandleScope::with_context(self.v8_isolate(), context); + let context = scope.get_current_context(); + let global: v8::Local = context.global(scope).into(); + let js_recv_cb = js_recv_cb_handle.get(scope); + + let tc_scope = &mut v8::TryCatch::new(scope); + + if shared_queue_size > 0 { + js_recv_cb.call(tc_scope, global, &[]); + // The other side should have shifted off all the messages. + let shared_queue_size = state_rc.borrow().shared.size(); + assert_eq!(shared_queue_size, 0); + } + + if let Some(overflown_response) = maybe_overflown_response { + let (op_id, buf) = overflown_response; + let op_id: v8::Local = + v8::Integer::new(tc_scope, op_id as i32).into(); + let ui8: v8::Local = + bindings::boxed_slice_to_uint8array(tc_scope, buf).into(); + js_recv_cb.call(tc_scope, global, &[op_id, ui8]); + } + + match tc_scope.exception() { + None => Ok(()), + Some(exception) => exception_to_err_result(tc_scope, exception), + } + } + + fn drain_macrotasks(&mut self) -> Result<(), AnyError> { + let js_macrotask_cb_handle = + match &Self::state(self).borrow().js_macrotask_cb { + Some(handle) => handle.clone(), + None => return Ok(()), + }; + + let context = self.global_context(); + let scope = &mut v8::HandleScope::with_context(self.v8_isolate(), context); + let context = scope.get_current_context(); + let global: v8::Local = context.global(scope).into(); + let js_macrotask_cb = js_macrotask_cb_handle.get(scope); + + // Repeatedly invoke macrotask callback until it returns true (done), + // such that ready microtasks would be automatically run before + // next macrotask is processed. + let tc_scope = &mut v8::TryCatch::new(scope); + + loop { + let is_done = js_macrotask_cb.call(tc_scope, global, &[]); + + if let Some(exception) = tc_scope.exception() { + return exception_to_err_result(tc_scope, exception); + } + + let is_done = is_done.unwrap(); + if is_done.is_true() { + break; + } + } + + Ok(()) + } } #[cfg(test)] @@ -1516,8 +1502,7 @@ pub mod tests { let (mut isolate, _dispatch_count) = setup(Mode::Async); // TODO(piscisaureus): in rusty_v8, the `thread_safe_handle()` method // should not require a mutable reference to `struct rusty_v8::Isolate`. - let v8_isolate_handle = - isolate.v8_isolate.as_mut().unwrap().thread_safe_handle(); + let v8_isolate_handle = isolate.v8_isolate().thread_safe_handle(); let terminator_thread = std::thread::spawn(move || { // allow deno to boot and run @@ -1541,9 +1526,7 @@ pub mod tests { // TODO(piscisaureus): in rusty_v8, `cancel_terminate_execution()` should // also be implemented on `struct Isolate`. let ok = isolate - .v8_isolate - .as_mut() - .unwrap() + .v8_isolate() .thread_safe_handle() .cancel_terminate_execution(); assert!(ok); @@ -1563,7 +1546,7 @@ pub mod tests { let (mut runtime, _dispatch_count) = setup(Mode::Async); // TODO(piscisaureus): in rusty_v8, the `thread_safe_handle()` method // should not require a mutable reference to `struct rusty_v8::Isolate`. - runtime.v8_isolate.as_mut().unwrap().thread_safe_handle() + runtime.v8_isolate().thread_safe_handle() }; // this should not SEGFAULT