mirror of
https://github.com/denoland/deno.git
synced 2024-12-22 15:24:46 -05:00
refactor(core): InspectorSession implements Stream (#13270)
This commit rewrites "InspectorSession" to no longer implement "Future" trait but instead implement "Stream" trait. "Stream" trait is implemented by yielding a raw pointer to the "v8::inspector::V8InspectorSession" and received message. In effect received messages are no longer dispatched from within the future, but are explicitly dispatched by the caller. This change should allow us to dispatch a message to the session when another message is being dispatched, ie. "V8InspectorSesssion::dispatch_protocol_message" is already on the call stack.
This commit is contained in:
parent
9ce28fd3ca
commit
9872fbf189
1 changed files with 47 additions and 33 deletions
|
@ -13,7 +13,7 @@ use crate::futures::future::select;
|
|||
use crate::futures::future::Either;
|
||||
use crate::futures::future::Future;
|
||||
use crate::futures::prelude::*;
|
||||
use crate::futures::stream::FuturesUnordered;
|
||||
use crate::futures::stream::SelectAll;
|
||||
use crate::futures::stream::StreamExt;
|
||||
use crate::futures::task;
|
||||
use crate::futures::task::Context;
|
||||
|
@ -226,9 +226,19 @@ impl JsRuntimeInspector {
|
|||
loop {
|
||||
// Do one "handshake" with a newly connected session at a time.
|
||||
if let Some(mut session) = sessions.handshake.take() {
|
||||
if session.poll_unpin(cx).is_pending() {
|
||||
sessions.established.push(session);
|
||||
continue;
|
||||
let poll_result = session.poll_next_unpin(cx);
|
||||
match poll_result {
|
||||
Poll::Pending => {
|
||||
sessions.established.push(session);
|
||||
continue;
|
||||
}
|
||||
Poll::Ready(Some(session_stream_item)) => {
|
||||
let (v8_session_ptr, msg) = session_stream_item;
|
||||
InspectorSession::dispatch_message(v8_session_ptr, msg);
|
||||
sessions.established.push(session);
|
||||
continue;
|
||||
}
|
||||
Poll::Ready(None) => {}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -243,7 +253,11 @@ impl JsRuntimeInspector {
|
|||
|
||||
// Poll established sessions.
|
||||
match sessions.established.poll_next_unpin(cx) {
|
||||
Poll::Ready(Some(_)) => continue,
|
||||
Poll::Ready(Some(session_stream_item)) => {
|
||||
let (v8_session_ptr, msg) = session_stream_item;
|
||||
InspectorSession::dispatch_message(v8_session_ptr, msg);
|
||||
continue;
|
||||
}
|
||||
Poll::Ready(None) => break,
|
||||
Poll::Pending => break,
|
||||
};
|
||||
|
@ -371,7 +385,7 @@ struct SessionContainer {
|
|||
v8_inspector: Rc<RefCell<v8::UniquePtr<v8::inspector::V8Inspector>>>,
|
||||
session_rx: UnboundedReceiver<InspectorSessionProxy>,
|
||||
handshake: Option<Box<InspectorSession>>,
|
||||
established: FuturesUnordered<Box<InspectorSession>>,
|
||||
established: SelectAll<Box<InspectorSession>>,
|
||||
}
|
||||
|
||||
impl SessionContainer {
|
||||
|
@ -383,7 +397,7 @@ impl SessionContainer {
|
|||
v8_inspector,
|
||||
session_rx: new_session_rx,
|
||||
handshake: None,
|
||||
established: FuturesUnordered::new(),
|
||||
established: SelectAll::new(),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -411,7 +425,7 @@ impl SessionContainer {
|
|||
v8_inspector: Default::default(),
|
||||
session_rx: rx,
|
||||
handshake: None,
|
||||
established: FuturesUnordered::new(),
|
||||
established: SelectAll::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -492,7 +506,7 @@ impl task::ArcWake for InspectorWaker {
|
|||
/// eg. Websocket or another set of channels.
|
||||
struct InspectorSession {
|
||||
v8_channel: v8::inspector::ChannelBase,
|
||||
v8_session: Rc<RefCell<v8::UniqueRef<v8::inspector::V8InspectorSession>>>,
|
||||
v8_session: v8::UniqueRef<v8::inspector::V8InspectorSession>,
|
||||
proxy: InspectorSessionProxy,
|
||||
}
|
||||
|
||||
|
@ -507,13 +521,13 @@ impl InspectorSession {
|
|||
let v8_channel = v8::inspector::ChannelBase::new::<Self>();
|
||||
let mut v8_inspector = v8_inspector_rc.borrow_mut();
|
||||
let v8_inspector_ptr = v8_inspector.as_mut().unwrap();
|
||||
let v8_session = Rc::new(RefCell::new(v8_inspector_ptr.connect(
|
||||
let v8_session = v8_inspector_ptr.connect(
|
||||
Self::CONTEXT_GROUP_ID,
|
||||
// Todo(piscisaureus): V8Inspector::connect() should require that
|
||||
// the 'v8_channel' argument cannot move.
|
||||
unsafe { &mut *self_ptr },
|
||||
v8::inspector::StringView::empty(),
|
||||
)));
|
||||
);
|
||||
|
||||
Self {
|
||||
v8_channel,
|
||||
|
@ -524,12 +538,14 @@ impl InspectorSession {
|
|||
}
|
||||
|
||||
// Dispatch message to V8 session
|
||||
#[allow(unused)]
|
||||
fn dispatch_message(&mut self, msg: Vec<u8>) {
|
||||
let msg = v8::inspector::StringView::from(msg.as_slice());
|
||||
let mut v8_session = self.v8_session.borrow_mut();
|
||||
let v8_session_ptr = v8_session.as_mut();
|
||||
v8_session_ptr.dispatch_protocol_message(msg);
|
||||
fn dispatch_message(
|
||||
v8_session_ptr: *mut v8::inspector::V8InspectorSession,
|
||||
msg: String,
|
||||
) {
|
||||
let msg = v8::inspector::StringView::from(msg.as_bytes());
|
||||
unsafe {
|
||||
(*v8_session_ptr).dispatch_protocol_message(msg);
|
||||
};
|
||||
}
|
||||
|
||||
fn send_message(
|
||||
|
@ -547,11 +563,9 @@ impl InspectorSession {
|
|||
pub fn break_on_next_statement(&mut self) {
|
||||
let reason = v8::inspector::StringView::from(&b"debugCommand"[..]);
|
||||
let detail = v8::inspector::StringView::empty();
|
||||
self
|
||||
.v8_session
|
||||
.borrow_mut()
|
||||
.as_mut()
|
||||
.schedule_pause_on_next_statement(reason, detail);
|
||||
// TODO(bartlomieju): use raw `*mut V8InspectorSession` pointer, as this
|
||||
// reference may become aliased.
|
||||
(*self.v8_session).schedule_pause_on_next_statement(reason, detail);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -582,19 +596,19 @@ impl v8::inspector::ChannelImpl for InspectorSession {
|
|||
fn flush_protocol_notifications(&mut self) {}
|
||||
}
|
||||
|
||||
/// This is a "pump" future takes care of receiving messages and dispatching
|
||||
/// them to the inspector. It resolves when receiver closes.
|
||||
impl Future for InspectorSession {
|
||||
type Output = ();
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
|
||||
while let Poll::Ready(maybe_msg) = self.proxy.rx.poll_next_unpin(cx) {
|
||||
impl Stream for InspectorSession {
|
||||
type Item = (*mut v8::inspector::V8InspectorSession, String);
|
||||
|
||||
fn poll_next(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut Context,
|
||||
) -> Poll<Option<Self::Item>> {
|
||||
let inner = self.get_mut();
|
||||
if let Poll::Ready(maybe_msg) = inner.proxy.rx.poll_next_unpin(cx) {
|
||||
if let Some(msg) = maybe_msg {
|
||||
let msg = v8::inspector::StringView::from(msg.as_bytes());
|
||||
let mut v8_session = self.v8_session.borrow_mut();
|
||||
let v8_session_ptr = v8_session.as_mut();
|
||||
v8_session_ptr.dispatch_protocol_message(msg);
|
||||
return Poll::Ready(Some((&mut *inner.v8_session, msg)));
|
||||
} else {
|
||||
return Poll::Ready(());
|
||||
return Poll::Ready(None);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue