diff --git a/core/inspector.rs b/core/inspector.rs index 0339248854..1ce0b77ad8 100644 --- a/core/inspector.rs +++ b/core/inspector.rs @@ -13,7 +13,7 @@ use crate::futures::future::select; use crate::futures::future::Either; use crate::futures::future::Future; use crate::futures::prelude::*; -use crate::futures::stream::FuturesUnordered; +use crate::futures::stream::SelectAll; use crate::futures::stream::StreamExt; use crate::futures::task; use crate::futures::task::Context; @@ -226,9 +226,19 @@ impl JsRuntimeInspector { loop { // Do one "handshake" with a newly connected session at a time. if let Some(mut session) = sessions.handshake.take() { - if session.poll_unpin(cx).is_pending() { - sessions.established.push(session); - continue; + let poll_result = session.poll_next_unpin(cx); + match poll_result { + Poll::Pending => { + sessions.established.push(session); + continue; + } + Poll::Ready(Some(session_stream_item)) => { + let (v8_session_ptr, msg) = session_stream_item; + InspectorSession::dispatch_message(v8_session_ptr, msg); + sessions.established.push(session); + continue; + } + Poll::Ready(None) => {} } } @@ -243,7 +253,11 @@ impl JsRuntimeInspector { // Poll established sessions. match sessions.established.poll_next_unpin(cx) { - Poll::Ready(Some(_)) => continue, + Poll::Ready(Some(session_stream_item)) => { + let (v8_session_ptr, msg) = session_stream_item; + InspectorSession::dispatch_message(v8_session_ptr, msg); + continue; + } Poll::Ready(None) => break, Poll::Pending => break, }; @@ -371,7 +385,7 @@ struct SessionContainer { v8_inspector: Rc>>, session_rx: UnboundedReceiver, handshake: Option>, - established: FuturesUnordered>, + established: SelectAll>, } impl SessionContainer { @@ -383,7 +397,7 @@ impl SessionContainer { v8_inspector, session_rx: new_session_rx, handshake: None, - established: FuturesUnordered::new(), + established: SelectAll::new(), } } @@ -411,7 +425,7 @@ impl SessionContainer { v8_inspector: Default::default(), session_rx: rx, handshake: None, - established: FuturesUnordered::new(), + established: SelectAll::new(), } } } @@ -492,7 +506,7 @@ impl task::ArcWake for InspectorWaker { /// eg. Websocket or another set of channels. struct InspectorSession { v8_channel: v8::inspector::ChannelBase, - v8_session: Rc>>, + v8_session: v8::UniqueRef, proxy: InspectorSessionProxy, } @@ -507,13 +521,13 @@ impl InspectorSession { let v8_channel = v8::inspector::ChannelBase::new::(); let mut v8_inspector = v8_inspector_rc.borrow_mut(); let v8_inspector_ptr = v8_inspector.as_mut().unwrap(); - let v8_session = Rc::new(RefCell::new(v8_inspector_ptr.connect( + let v8_session = v8_inspector_ptr.connect( Self::CONTEXT_GROUP_ID, // Todo(piscisaureus): V8Inspector::connect() should require that // the 'v8_channel' argument cannot move. unsafe { &mut *self_ptr }, v8::inspector::StringView::empty(), - ))); + ); Self { v8_channel, @@ -524,12 +538,14 @@ impl InspectorSession { } // Dispatch message to V8 session - #[allow(unused)] - fn dispatch_message(&mut self, msg: Vec) { - let msg = v8::inspector::StringView::from(msg.as_slice()); - let mut v8_session = self.v8_session.borrow_mut(); - let v8_session_ptr = v8_session.as_mut(); - v8_session_ptr.dispatch_protocol_message(msg); + fn dispatch_message( + v8_session_ptr: *mut v8::inspector::V8InspectorSession, + msg: String, + ) { + let msg = v8::inspector::StringView::from(msg.as_bytes()); + unsafe { + (*v8_session_ptr).dispatch_protocol_message(msg); + }; } fn send_message( @@ -547,11 +563,9 @@ impl InspectorSession { pub fn break_on_next_statement(&mut self) { let reason = v8::inspector::StringView::from(&b"debugCommand"[..]); let detail = v8::inspector::StringView::empty(); - self - .v8_session - .borrow_mut() - .as_mut() - .schedule_pause_on_next_statement(reason, detail); + // TODO(bartlomieju): use raw `*mut V8InspectorSession` pointer, as this + // reference may become aliased. + (*self.v8_session).schedule_pause_on_next_statement(reason, detail); } } @@ -582,19 +596,19 @@ impl v8::inspector::ChannelImpl for InspectorSession { fn flush_protocol_notifications(&mut self) {} } -/// This is a "pump" future takes care of receiving messages and dispatching -/// them to the inspector. It resolves when receiver closes. -impl Future for InspectorSession { - type Output = (); - fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { - while let Poll::Ready(maybe_msg) = self.proxy.rx.poll_next_unpin(cx) { +impl Stream for InspectorSession { + type Item = (*mut v8::inspector::V8InspectorSession, String); + + fn poll_next( + self: Pin<&mut Self>, + cx: &mut Context, + ) -> Poll> { + let inner = self.get_mut(); + if let Poll::Ready(maybe_msg) = inner.proxy.rx.poll_next_unpin(cx) { if let Some(msg) = maybe_msg { - let msg = v8::inspector::StringView::from(msg.as_bytes()); - let mut v8_session = self.v8_session.borrow_mut(); - let v8_session_ptr = v8_session.as_mut(); - v8_session_ptr.dispatch_protocol_message(msg); + return Poll::Ready(Some((&mut *inner.v8_session, msg))); } else { - return Poll::Ready(()); + return Poll::Ready(None); } }