1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2024-11-24 15:19:26 -05:00

refactor(core): InspectorSession implements Stream (#13270)

This commit rewrites "InspectorSession" to no longer implement "Future"
trait but instead implement "Stream" trait. "Stream" trait is implemented
by yielding a raw pointer to the "v8::inspector::V8InspectorSession" and
received message. In effect received messages are no longer dispatched
from within the future, but are explicitly dispatched by the caller.

This change should allow us to dispatch a message to the session when
another message is being dispatched, ie.
"V8InspectorSesssion::dispatch_protocol_message" is already on the
call stack.
This commit is contained in:
Bartek Iwańczuk 2022-01-04 18:26:00 +01:00
parent fbdd6c81e7
commit 8a10f4bfcb
No known key found for this signature in database
GPG key ID: 0C6BCDDC3B3AD750

View file

@ -13,7 +13,7 @@ use crate::futures::future::select;
use crate::futures::future::Either; use crate::futures::future::Either;
use crate::futures::future::Future; use crate::futures::future::Future;
use crate::futures::prelude::*; use crate::futures::prelude::*;
use crate::futures::stream::FuturesUnordered; use crate::futures::stream::SelectAll;
use crate::futures::stream::StreamExt; use crate::futures::stream::StreamExt;
use crate::futures::task; use crate::futures::task;
use crate::futures::task::Context; use crate::futures::task::Context;
@ -226,10 +226,20 @@ impl JsRuntimeInspector {
loop { loop {
// Do one "handshake" with a newly connected session at a time. // Do one "handshake" with a newly connected session at a time.
if let Some(mut session) = sessions.handshake.take() { if let Some(mut session) = sessions.handshake.take() {
if session.poll_unpin(cx).is_pending() { let poll_result = session.poll_next_unpin(cx);
match poll_result {
Poll::Pending => {
sessions.established.push(session); sessions.established.push(session);
continue; continue;
} }
Poll::Ready(Some(session_stream_item)) => {
let (v8_session_ptr, msg) = session_stream_item;
InspectorSession::dispatch_message(v8_session_ptr, msg);
sessions.established.push(session);
continue;
}
Poll::Ready(None) => {}
}
} }
// Accept new connections. // Accept new connections.
@ -243,7 +253,11 @@ impl JsRuntimeInspector {
// Poll established sessions. // Poll established sessions.
match sessions.established.poll_next_unpin(cx) { match sessions.established.poll_next_unpin(cx) {
Poll::Ready(Some(_)) => continue, Poll::Ready(Some(session_stream_item)) => {
let (v8_session_ptr, msg) = session_stream_item;
InspectorSession::dispatch_message(v8_session_ptr, msg);
continue;
}
Poll::Ready(None) => break, Poll::Ready(None) => break,
Poll::Pending => break, Poll::Pending => break,
}; };
@ -371,7 +385,7 @@ struct SessionContainer {
v8_inspector: Rc<RefCell<v8::UniquePtr<v8::inspector::V8Inspector>>>, v8_inspector: Rc<RefCell<v8::UniquePtr<v8::inspector::V8Inspector>>>,
session_rx: UnboundedReceiver<InspectorSessionProxy>, session_rx: UnboundedReceiver<InspectorSessionProxy>,
handshake: Option<Box<InspectorSession>>, handshake: Option<Box<InspectorSession>>,
established: FuturesUnordered<Box<InspectorSession>>, established: SelectAll<Box<InspectorSession>>,
} }
impl SessionContainer { impl SessionContainer {
@ -383,7 +397,7 @@ impl SessionContainer {
v8_inspector, v8_inspector,
session_rx: new_session_rx, session_rx: new_session_rx,
handshake: None, handshake: None,
established: FuturesUnordered::new(), established: SelectAll::new(),
} }
} }
@ -411,7 +425,7 @@ impl SessionContainer {
v8_inspector: Default::default(), v8_inspector: Default::default(),
session_rx: rx, session_rx: rx,
handshake: None, handshake: None,
established: FuturesUnordered::new(), established: SelectAll::new(),
} }
} }
} }
@ -492,7 +506,7 @@ impl task::ArcWake for InspectorWaker {
/// eg. Websocket or another set of channels. /// eg. Websocket or another set of channels.
struct InspectorSession { struct InspectorSession {
v8_channel: v8::inspector::ChannelBase, v8_channel: v8::inspector::ChannelBase,
v8_session: Rc<RefCell<v8::UniqueRef<v8::inspector::V8InspectorSession>>>, v8_session: v8::UniqueRef<v8::inspector::V8InspectorSession>,
proxy: InspectorSessionProxy, proxy: InspectorSessionProxy,
} }
@ -507,13 +521,13 @@ impl InspectorSession {
let v8_channel = v8::inspector::ChannelBase::new::<Self>(); let v8_channel = v8::inspector::ChannelBase::new::<Self>();
let mut v8_inspector = v8_inspector_rc.borrow_mut(); let mut v8_inspector = v8_inspector_rc.borrow_mut();
let v8_inspector_ptr = v8_inspector.as_mut().unwrap(); let v8_inspector_ptr = v8_inspector.as_mut().unwrap();
let v8_session = Rc::new(RefCell::new(v8_inspector_ptr.connect( let v8_session = v8_inspector_ptr.connect(
Self::CONTEXT_GROUP_ID, Self::CONTEXT_GROUP_ID,
// Todo(piscisaureus): V8Inspector::connect() should require that // Todo(piscisaureus): V8Inspector::connect() should require that
// the 'v8_channel' argument cannot move. // the 'v8_channel' argument cannot move.
unsafe { &mut *self_ptr }, unsafe { &mut *self_ptr },
v8::inspector::StringView::empty(), v8::inspector::StringView::empty(),
))); );
Self { Self {
v8_channel, v8_channel,
@ -524,12 +538,14 @@ impl InspectorSession {
} }
// Dispatch message to V8 session // Dispatch message to V8 session
#[allow(unused)] fn dispatch_message(
fn dispatch_message(&mut self, msg: Vec<u8>) { v8_session_ptr: *mut v8::inspector::V8InspectorSession,
let msg = v8::inspector::StringView::from(msg.as_slice()); msg: String,
let mut v8_session = self.v8_session.borrow_mut(); ) {
let v8_session_ptr = v8_session.as_mut(); let msg = v8::inspector::StringView::from(msg.as_bytes());
v8_session_ptr.dispatch_protocol_message(msg); unsafe {
(*v8_session_ptr).dispatch_protocol_message(msg);
};
} }
fn send_message( fn send_message(
@ -547,11 +563,9 @@ impl InspectorSession {
pub fn break_on_next_statement(&mut self) { pub fn break_on_next_statement(&mut self) {
let reason = v8::inspector::StringView::from(&b"debugCommand"[..]); let reason = v8::inspector::StringView::from(&b"debugCommand"[..]);
let detail = v8::inspector::StringView::empty(); let detail = v8::inspector::StringView::empty();
self // TODO(bartlomieju): use raw `*mut V8InspectorSession` pointer, as this
.v8_session // reference may become aliased.
.borrow_mut() (*self.v8_session).schedule_pause_on_next_statement(reason, detail);
.as_mut()
.schedule_pause_on_next_statement(reason, detail);
} }
} }
@ -582,19 +596,19 @@ impl v8::inspector::ChannelImpl for InspectorSession {
fn flush_protocol_notifications(&mut self) {} fn flush_protocol_notifications(&mut self) {}
} }
/// This is a "pump" future takes care of receiving messages and dispatching impl Stream for InspectorSession {
/// them to the inspector. It resolves when receiver closes. type Item = (*mut v8::inspector::V8InspectorSession, String);
impl Future for InspectorSession {
type Output = (); fn poll_next(
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { self: Pin<&mut Self>,
while let Poll::Ready(maybe_msg) = self.proxy.rx.poll_next_unpin(cx) { cx: &mut Context,
) -> Poll<Option<Self::Item>> {
let inner = self.get_mut();
if let Poll::Ready(maybe_msg) = inner.proxy.rx.poll_next_unpin(cx) {
if let Some(msg) = maybe_msg { if let Some(msg) = maybe_msg {
let msg = v8::inspector::StringView::from(msg.as_bytes()); return Poll::Ready(Some((&mut *inner.v8_session, msg)));
let mut v8_session = self.v8_session.borrow_mut();
let v8_session_ptr = v8_session.as_mut();
v8_session_ptr.dispatch_protocol_message(msg);
} else { } else {
return Poll::Ready(()); return Poll::Ready(None);
} }
} }