mirror of
https://github.com/denoland/deno.git
synced 2024-11-21 15:04:11 -05:00
refactor(jupyter): move ZeroMQ server to a separate thread (#24373)
Moves the ZeroMQ messaging server to a separate thread. This will allow to run blocking JS code and maintain communication with the notebook frontend. Towards https://github.com/denoland/deno/pull/23592 Towards https://github.com/denoland/deno/pull/24250 Closes https://github.com/denoland/deno/issues/23617
This commit is contained in:
parent
c13b6d1413
commit
7d919f6fd9
4 changed files with 483 additions and 150 deletions
|
@ -45,11 +45,11 @@ pub async fn op_jupyter_broadcast(
|
|||
|
||||
(
|
||||
s.borrow::<Arc<Mutex<KernelIoPubConnection>>>().clone(),
|
||||
s.borrow::<Rc<RefCell<Option<JupyterMessage>>>>().clone(),
|
||||
s.borrow::<Arc<Mutex<Option<JupyterMessage>>>>().clone(),
|
||||
)
|
||||
};
|
||||
|
||||
let maybe_last_request = last_execution_request.borrow().clone();
|
||||
let maybe_last_request = last_execution_request.lock().await.clone();
|
||||
if let Some(last_request) = maybe_last_request {
|
||||
let content = JupyterMessageContent::from_type_and_content(
|
||||
&message_type,
|
||||
|
|
|
@ -2,18 +2,23 @@
|
|||
|
||||
use crate::args::Flags;
|
||||
use crate::args::JupyterFlags;
|
||||
use crate::cdp;
|
||||
use crate::lsp::ReplCompletionItem;
|
||||
use crate::ops;
|
||||
use crate::tools::repl;
|
||||
use crate::tools::test::create_single_test_event_channel;
|
||||
use crate::tools::test::reporters::PrettyTestReporter;
|
||||
use crate::tools::test::TestEventWorkerSender;
|
||||
use crate::CliFactory;
|
||||
use deno_core::anyhow::bail;
|
||||
use deno_core::anyhow::Context;
|
||||
use deno_core::error::generic_error;
|
||||
use deno_core::error::AnyError;
|
||||
use deno_core::futures::FutureExt;
|
||||
use deno_core::located_script_name;
|
||||
use deno_core::resolve_url_or_path;
|
||||
use deno_core::serde_json;
|
||||
use deno_core::serde_json::json;
|
||||
use deno_core::url::Url;
|
||||
use deno_runtime::deno_io::Stdio;
|
||||
use deno_runtime::deno_io::StdioPipe;
|
||||
|
@ -21,11 +26,11 @@ use deno_runtime::deno_permissions::Permissions;
|
|||
use deno_runtime::deno_permissions::PermissionsContainer;
|
||||
use deno_runtime::WorkerExecutionMode;
|
||||
use deno_terminal::colors;
|
||||
|
||||
use jupyter_runtime::jupyter::ConnectionInfo;
|
||||
use jupyter_runtime::messaging::StreamContent;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::sync::mpsc::UnboundedSender;
|
||||
use tokio::sync::oneshot;
|
||||
|
||||
mod install;
|
||||
pub mod server;
|
||||
|
@ -142,7 +147,377 @@ pub async fn kernel(
|
|||
)
|
||||
}));
|
||||
|
||||
server::JupyterServer::start(spec, stdio_rx, repl_session).await?;
|
||||
let (tx1, rx1) = mpsc::unbounded_channel();
|
||||
let (tx2, rx2) = mpsc::unbounded_channel();
|
||||
let (startup_data_tx, startup_data_rx) =
|
||||
oneshot::channel::<server::StartupData>();
|
||||
|
||||
let mut repl_session_proxy = JupyterReplSession {
|
||||
repl_session,
|
||||
rx: rx1,
|
||||
tx: tx2,
|
||||
};
|
||||
let repl_session_proxy_channels = JupyterReplProxy { tx: tx1, rx: rx2 };
|
||||
|
||||
let join_handle = std::thread::spawn(move || {
|
||||
let fut = server::JupyterServer::start(
|
||||
spec,
|
||||
stdio_rx,
|
||||
repl_session_proxy_channels,
|
||||
startup_data_tx,
|
||||
)
|
||||
.boxed_local();
|
||||
deno_runtime::tokio_util::create_and_run_current_thread(fut)
|
||||
});
|
||||
|
||||
let Ok(startup_data) = startup_data_rx.await else {
|
||||
bail!("Failed to acquire startup data");
|
||||
};
|
||||
{
|
||||
let op_state_rc =
|
||||
repl_session_proxy.repl_session.worker.js_runtime.op_state();
|
||||
let mut op_state = op_state_rc.borrow_mut();
|
||||
op_state.put(startup_data.iopub_connection.clone());
|
||||
op_state.put(startup_data.last_execution_request.clone());
|
||||
}
|
||||
|
||||
repl_session_proxy.start().await;
|
||||
let server_result = join_handle.join();
|
||||
match server_result {
|
||||
Ok(result) => {
|
||||
result?;
|
||||
}
|
||||
Err(e) => {
|
||||
bail!("Jupyter kernel error: {:?}", e);
|
||||
}
|
||||
};
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub enum JupyterReplRequest {
|
||||
LspCompletions {
|
||||
line_text: String,
|
||||
position: usize,
|
||||
},
|
||||
JsGetProperties {
|
||||
object_id: String,
|
||||
},
|
||||
JsEvaluate {
|
||||
expr: String,
|
||||
},
|
||||
JsGlobalLexicalScopeNames,
|
||||
JsEvaluateLineWithObjectWrapping {
|
||||
line: String,
|
||||
},
|
||||
JsCallFunctionOnArgs {
|
||||
function_declaration: String,
|
||||
args: Vec<cdp::RemoteObject>,
|
||||
},
|
||||
JsCallFunctionOn {
|
||||
arg0: cdp::CallArgument,
|
||||
arg1: cdp::CallArgument,
|
||||
},
|
||||
}
|
||||
|
||||
pub enum JupyterReplResponse {
|
||||
LspCompletions(Vec<ReplCompletionItem>),
|
||||
JsGetProperties(Option<cdp::GetPropertiesResponse>),
|
||||
JsEvaluate(Option<cdp::EvaluateResponse>),
|
||||
JsGlobalLexicalScopeNames(cdp::GlobalLexicalScopeNamesResponse),
|
||||
JsEvaluateLineWithObjectWrapping(Result<repl::TsEvaluateResponse, AnyError>),
|
||||
JsCallFunctionOnArgs(Result<cdp::CallFunctionOnResponse, AnyError>),
|
||||
JsCallFunctionOn(Option<cdp::CallFunctionOnResponse>),
|
||||
}
|
||||
|
||||
pub struct JupyterReplProxy {
|
||||
tx: mpsc::UnboundedSender<JupyterReplRequest>,
|
||||
rx: mpsc::UnboundedReceiver<JupyterReplResponse>,
|
||||
}
|
||||
|
||||
impl JupyterReplProxy {
|
||||
pub async fn lsp_completions(
|
||||
&mut self,
|
||||
line_text: String,
|
||||
position: usize,
|
||||
) -> Vec<ReplCompletionItem> {
|
||||
let _ = self.tx.send(JupyterReplRequest::LspCompletions {
|
||||
line_text,
|
||||
position,
|
||||
});
|
||||
let Some(JupyterReplResponse::LspCompletions(resp)) = self.rx.recv().await
|
||||
else {
|
||||
unreachable!()
|
||||
};
|
||||
resp
|
||||
}
|
||||
|
||||
pub async fn get_properties(
|
||||
&mut self,
|
||||
object_id: String,
|
||||
) -> Option<cdp::GetPropertiesResponse> {
|
||||
let _ = self
|
||||
.tx
|
||||
.send(JupyterReplRequest::JsGetProperties { object_id });
|
||||
let Some(JupyterReplResponse::JsGetProperties(resp)) = self.rx.recv().await
|
||||
else {
|
||||
unreachable!()
|
||||
};
|
||||
resp
|
||||
}
|
||||
|
||||
pub async fn evaluate(
|
||||
&mut self,
|
||||
expr: String,
|
||||
) -> Option<cdp::EvaluateResponse> {
|
||||
let _ = self.tx.send(JupyterReplRequest::JsEvaluate { expr });
|
||||
let Some(JupyterReplResponse::JsEvaluate(resp)) = self.rx.recv().await
|
||||
else {
|
||||
unreachable!()
|
||||
};
|
||||
resp
|
||||
}
|
||||
|
||||
pub async fn global_lexical_scope_names(
|
||||
&mut self,
|
||||
) -> cdp::GlobalLexicalScopeNamesResponse {
|
||||
let _ = self.tx.send(JupyterReplRequest::JsGlobalLexicalScopeNames);
|
||||
let Some(JupyterReplResponse::JsGlobalLexicalScopeNames(resp)) =
|
||||
self.rx.recv().await
|
||||
else {
|
||||
unreachable!()
|
||||
};
|
||||
resp
|
||||
}
|
||||
|
||||
pub async fn evaluate_line_with_object_wrapping(
|
||||
&mut self,
|
||||
line: String,
|
||||
) -> Result<repl::TsEvaluateResponse, AnyError> {
|
||||
let _ = self
|
||||
.tx
|
||||
.send(JupyterReplRequest::JsEvaluateLineWithObjectWrapping { line });
|
||||
let Some(JupyterReplResponse::JsEvaluateLineWithObjectWrapping(resp)) =
|
||||
self.rx.recv().await
|
||||
else {
|
||||
unreachable!()
|
||||
};
|
||||
resp
|
||||
}
|
||||
|
||||
pub async fn call_function_on_args(
|
||||
&mut self,
|
||||
function_declaration: String,
|
||||
args: Vec<cdp::RemoteObject>,
|
||||
) -> Result<cdp::CallFunctionOnResponse, AnyError> {
|
||||
let _ = self.tx.send(JupyterReplRequest::JsCallFunctionOnArgs {
|
||||
function_declaration,
|
||||
args,
|
||||
});
|
||||
let Some(JupyterReplResponse::JsCallFunctionOnArgs(resp)) =
|
||||
self.rx.recv().await
|
||||
else {
|
||||
unreachable!()
|
||||
};
|
||||
resp
|
||||
}
|
||||
|
||||
// TODO(bartlomieju): rename to "broadcast_result"?
|
||||
pub async fn call_function_on(
|
||||
&mut self,
|
||||
arg0: cdp::CallArgument,
|
||||
arg1: cdp::CallArgument,
|
||||
) -> Option<cdp::CallFunctionOnResponse> {
|
||||
let _ = self
|
||||
.tx
|
||||
.send(JupyterReplRequest::JsCallFunctionOn { arg0, arg1 });
|
||||
let Some(JupyterReplResponse::JsCallFunctionOn(resp)) =
|
||||
self.rx.recv().await
|
||||
else {
|
||||
unreachable!()
|
||||
};
|
||||
resp
|
||||
}
|
||||
}
|
||||
|
||||
pub struct JupyterReplSession {
|
||||
repl_session: repl::ReplSession,
|
||||
rx: mpsc::UnboundedReceiver<JupyterReplRequest>,
|
||||
tx: mpsc::UnboundedSender<JupyterReplResponse>,
|
||||
}
|
||||
|
||||
impl JupyterReplSession {
|
||||
pub async fn start(&mut self) {
|
||||
loop {
|
||||
let Some(msg) = self.rx.recv().await else {
|
||||
break;
|
||||
};
|
||||
let resp = match msg {
|
||||
JupyterReplRequest::LspCompletions {
|
||||
line_text,
|
||||
position,
|
||||
} => JupyterReplResponse::LspCompletions(
|
||||
self.lsp_completions(&line_text, position).await,
|
||||
),
|
||||
JupyterReplRequest::JsGetProperties { object_id } => {
|
||||
JupyterReplResponse::JsGetProperties(
|
||||
self.get_properties(object_id).await,
|
||||
)
|
||||
}
|
||||
JupyterReplRequest::JsEvaluate { expr } => {
|
||||
JupyterReplResponse::JsEvaluate(self.evaluate(expr).await)
|
||||
}
|
||||
JupyterReplRequest::JsGlobalLexicalScopeNames => {
|
||||
JupyterReplResponse::JsGlobalLexicalScopeNames(
|
||||
self.global_lexical_scope_names().await,
|
||||
)
|
||||
}
|
||||
JupyterReplRequest::JsEvaluateLineWithObjectWrapping { line } => {
|
||||
JupyterReplResponse::JsEvaluateLineWithObjectWrapping(
|
||||
self.evaluate_line_with_object_wrapping(&line).await,
|
||||
)
|
||||
}
|
||||
JupyterReplRequest::JsCallFunctionOnArgs {
|
||||
function_declaration,
|
||||
args,
|
||||
} => JupyterReplResponse::JsCallFunctionOnArgs(
|
||||
self
|
||||
.call_function_on_args(function_declaration, &args)
|
||||
.await,
|
||||
),
|
||||
JupyterReplRequest::JsCallFunctionOn { arg0, arg1 } => {
|
||||
JupyterReplResponse::JsCallFunctionOn(
|
||||
self.call_function_on(arg0, arg1).await,
|
||||
)
|
||||
}
|
||||
};
|
||||
|
||||
let Ok(()) = self.tx.send(resp) else {
|
||||
break;
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn lsp_completions(
|
||||
&mut self,
|
||||
line_text: &str,
|
||||
position: usize,
|
||||
) -> Vec<ReplCompletionItem> {
|
||||
self
|
||||
.repl_session
|
||||
.language_server
|
||||
.completions(line_text, position)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn get_properties(
|
||||
&mut self,
|
||||
object_id: String,
|
||||
) -> Option<cdp::GetPropertiesResponse> {
|
||||
let get_properties_response = self
|
||||
.repl_session
|
||||
.post_message_with_event_loop(
|
||||
"Runtime.getProperties",
|
||||
Some(cdp::GetPropertiesArgs {
|
||||
object_id,
|
||||
own_properties: None,
|
||||
accessor_properties_only: None,
|
||||
generate_preview: None,
|
||||
non_indexed_properties_only: Some(true),
|
||||
}),
|
||||
)
|
||||
.await
|
||||
.ok()?;
|
||||
serde_json::from_value(get_properties_response).ok()
|
||||
}
|
||||
|
||||
pub async fn evaluate(
|
||||
&mut self,
|
||||
expr: String,
|
||||
) -> Option<cdp::EvaluateResponse> {
|
||||
let evaluate_response: serde_json::Value = self
|
||||
.repl_session
|
||||
.post_message_with_event_loop(
|
||||
"Runtime.evaluate",
|
||||
Some(cdp::EvaluateArgs {
|
||||
expression: expr,
|
||||
object_group: None,
|
||||
include_command_line_api: None,
|
||||
silent: None,
|
||||
context_id: Some(self.repl_session.context_id),
|
||||
return_by_value: None,
|
||||
generate_preview: None,
|
||||
user_gesture: None,
|
||||
await_promise: None,
|
||||
throw_on_side_effect: Some(true),
|
||||
timeout: Some(200),
|
||||
disable_breaks: None,
|
||||
repl_mode: None,
|
||||
allow_unsafe_eval_blocked_by_csp: None,
|
||||
unique_context_id: None,
|
||||
}),
|
||||
)
|
||||
.await
|
||||
.ok()?;
|
||||
serde_json::from_value(evaluate_response).ok()
|
||||
}
|
||||
|
||||
pub async fn global_lexical_scope_names(
|
||||
&mut self,
|
||||
) -> cdp::GlobalLexicalScopeNamesResponse {
|
||||
let evaluate_response = self
|
||||
.repl_session
|
||||
.post_message_with_event_loop(
|
||||
"Runtime.globalLexicalScopeNames",
|
||||
Some(cdp::GlobalLexicalScopeNamesArgs {
|
||||
execution_context_id: Some(self.repl_session.context_id),
|
||||
}),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
serde_json::from_value(evaluate_response).unwrap()
|
||||
}
|
||||
|
||||
pub async fn evaluate_line_with_object_wrapping(
|
||||
&mut self,
|
||||
line: &str,
|
||||
) -> Result<repl::TsEvaluateResponse, AnyError> {
|
||||
self
|
||||
.repl_session
|
||||
.evaluate_line_with_object_wrapping(line)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn call_function_on_args(
|
||||
&mut self,
|
||||
function_declaration: String,
|
||||
args: &[cdp::RemoteObject],
|
||||
) -> Result<cdp::CallFunctionOnResponse, AnyError> {
|
||||
self
|
||||
.repl_session
|
||||
.call_function_on_args(function_declaration, args)
|
||||
.await
|
||||
}
|
||||
|
||||
// TODO(bartlomieju): rename to "broadcast_result"?
|
||||
pub async fn call_function_on(
|
||||
&mut self,
|
||||
arg0: cdp::CallArgument,
|
||||
arg1: cdp::CallArgument,
|
||||
) -> Option<cdp::CallFunctionOnResponse> {
|
||||
let response = self.repl_session
|
||||
.post_message_with_event_loop(
|
||||
"Runtime.callFunctionOn",
|
||||
Some(json!({
|
||||
"functionDeclaration": r#"async function (execution_count, result) {
|
||||
await Deno[Deno.internal].jupyter.broadcastResult(execution_count, result);
|
||||
}"#,
|
||||
"arguments": [arg0, arg1],
|
||||
"executionContextId": self.repl_session.context_id,
|
||||
"awaitPromise": true,
|
||||
})),
|
||||
)
|
||||
.await.ok()?;
|
||||
serde_json::from_value(response).ok()
|
||||
}
|
||||
}
|
||||
|
|
|
@ -3,20 +3,20 @@
|
|||
// This file is forked/ported from <https://github.com/evcxr/evcxr>
|
||||
// Copyright 2020 The Evcxr Authors. MIT license.
|
||||
|
||||
use std::cell::RefCell;
|
||||
use std::collections::HashMap;
|
||||
use std::rc::Rc;
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::cdp;
|
||||
use crate::tools::repl;
|
||||
use deno_core::anyhow::bail;
|
||||
use deno_core::error::AnyError;
|
||||
use deno_core::futures;
|
||||
use deno_core::serde_json;
|
||||
use deno_core::serde_json::json;
|
||||
use deno_core::CancelFuture;
|
||||
use deno_core::CancelHandle;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::sync::oneshot;
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
use jupyter_runtime::messaging;
|
||||
|
@ -25,27 +25,32 @@ use jupyter_runtime::ConnectionInfo;
|
|||
use jupyter_runtime::JupyterMessage;
|
||||
use jupyter_runtime::JupyterMessageContent;
|
||||
use jupyter_runtime::KernelControlConnection;
|
||||
use jupyter_runtime::KernelHeartbeatConnection;
|
||||
use jupyter_runtime::KernelIoPubConnection;
|
||||
use jupyter_runtime::KernelShellConnection;
|
||||
use jupyter_runtime::ReplyError;
|
||||
use jupyter_runtime::ReplyStatus;
|
||||
use jupyter_runtime::StreamContent;
|
||||
|
||||
use super::JupyterReplProxy;
|
||||
|
||||
pub struct JupyterServer {
|
||||
execution_count: usize,
|
||||
last_execution_request: Rc<RefCell<Option<JupyterMessage>>>,
|
||||
// This is Arc<Mutex<>>, so we don't hold RefCell borrows across await
|
||||
// points.
|
||||
last_execution_request: Arc<Mutex<Option<JupyterMessage>>>,
|
||||
iopub_connection: Arc<Mutex<KernelIoPubConnection>>,
|
||||
repl_session: repl::ReplSession,
|
||||
repl_session_proxy: JupyterReplProxy,
|
||||
}
|
||||
|
||||
pub struct StartupData {
|
||||
pub iopub_connection: Arc<Mutex<KernelIoPubConnection>>,
|
||||
pub last_execution_request: Arc<Mutex<Option<JupyterMessage>>>,
|
||||
}
|
||||
|
||||
impl JupyterServer {
|
||||
pub async fn start(
|
||||
connection_info: ConnectionInfo,
|
||||
mut stdio_rx: mpsc::UnboundedReceiver<StreamContent>,
|
||||
mut repl_session: repl::ReplSession,
|
||||
repl_session_proxy: JupyterReplProxy,
|
||||
setup_tx: oneshot::Sender<StartupData>,
|
||||
) -> Result<(), AnyError> {
|
||||
let mut heartbeat =
|
||||
connection_info.create_kernel_heartbeat_connection().await?;
|
||||
|
@ -59,15 +64,14 @@ impl JupyterServer {
|
|||
connection_info.create_kernel_iopub_connection().await?;
|
||||
|
||||
let iopub_connection = Arc::new(Mutex::new(iopub_connection));
|
||||
let last_execution_request = Rc::new(RefCell::new(None));
|
||||
let last_execution_request = Arc::new(Mutex::new(None));
|
||||
|
||||
// Store `iopub_connection` in the op state so it's accessible to the runtime API.
|
||||
{
|
||||
let op_state_rc = repl_session.worker.js_runtime.op_state();
|
||||
let mut op_state = op_state_rc.borrow_mut();
|
||||
op_state.put(iopub_connection.clone());
|
||||
op_state.put(last_execution_request.clone());
|
||||
}
|
||||
let Ok(()) = setup_tx.send(StartupData {
|
||||
iopub_connection: iopub_connection.clone(),
|
||||
last_execution_request: last_execution_request.clone(),
|
||||
}) else {
|
||||
bail!("Failed to send startup data");
|
||||
};
|
||||
|
||||
let cancel_handle = CancelHandle::new_rc();
|
||||
|
||||
|
@ -75,20 +79,22 @@ impl JupyterServer {
|
|||
execution_count: 0,
|
||||
iopub_connection: iopub_connection.clone(),
|
||||
last_execution_request: last_execution_request.clone(),
|
||||
repl_session,
|
||||
repl_session_proxy,
|
||||
};
|
||||
|
||||
let handle1 = deno_core::unsync::spawn(async move {
|
||||
if let Err(err) = Self::handle_heartbeat(&mut heartbeat).await {
|
||||
let hearbeat_fut = deno_core::unsync::spawn(async move {
|
||||
loop {
|
||||
if let Err(err) = heartbeat.single_heartbeat().await {
|
||||
log::error!(
|
||||
"Heartbeat error: {}\nBacktrace:\n{}",
|
||||
err,
|
||||
err.backtrace()
|
||||
);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
let handle2 = deno_core::unsync::spawn({
|
||||
let control_fut = deno_core::unsync::spawn({
|
||||
let cancel_handle = cancel_handle.clone();
|
||||
async move {
|
||||
if let Err(err) =
|
||||
|
@ -103,13 +109,13 @@ impl JupyterServer {
|
|||
}
|
||||
});
|
||||
|
||||
let handle3 = deno_core::unsync::spawn(async move {
|
||||
let shell_fut = deno_core::unsync::spawn(async move {
|
||||
if let Err(err) = server.handle_shell(shell_connection).await {
|
||||
log::error!("Shell error: {}\nBacktrace:\n{}", err, err.backtrace());
|
||||
}
|
||||
});
|
||||
|
||||
let handle4 = deno_core::unsync::spawn(async move {
|
||||
let stdio_fut = deno_core::unsync::spawn(async move {
|
||||
while let Some(stdio_msg) = stdio_rx.recv().await {
|
||||
Self::handle_stdio_msg(
|
||||
iopub_connection.clone(),
|
||||
|
@ -120,8 +126,15 @@ impl JupyterServer {
|
|||
}
|
||||
});
|
||||
|
||||
let join_fut =
|
||||
futures::future::try_join_all(vec![handle1, handle2, handle3, handle4]);
|
||||
let repl_session_fut = deno_core::unsync::spawn(async move {});
|
||||
|
||||
let join_fut = futures::future::try_join_all(vec![
|
||||
hearbeat_fut,
|
||||
control_fut,
|
||||
shell_fut,
|
||||
stdio_fut,
|
||||
repl_session_fut,
|
||||
]);
|
||||
|
||||
if let Ok(result) = join_fut.or_cancel(cancel_handle).await {
|
||||
result?;
|
||||
|
@ -132,28 +145,21 @@ impl JupyterServer {
|
|||
|
||||
async fn handle_stdio_msg(
|
||||
iopub_connection: Arc<Mutex<KernelIoPubConnection>>,
|
||||
last_execution_request: Rc<RefCell<Option<JupyterMessage>>>,
|
||||
last_execution_request: Arc<Mutex<Option<JupyterMessage>>>,
|
||||
stdio_msg: StreamContent,
|
||||
) {
|
||||
let maybe_exec_result = last_execution_request.borrow().clone();
|
||||
if let Some(exec_request) = maybe_exec_result {
|
||||
let result = (iopub_connection.lock().await)
|
||||
.send(stdio_msg.as_child_of(&exec_request))
|
||||
.await;
|
||||
let maybe_exec_result = last_execution_request.lock().await.clone();
|
||||
let Some(exec_request) = maybe_exec_result else {
|
||||
return;
|
||||
};
|
||||
|
||||
let mut iopub_conn = iopub_connection.lock().await;
|
||||
let result = iopub_conn.send(stdio_msg.as_child_of(&exec_request)).await;
|
||||
|
||||
if let Err(err) = result {
|
||||
log::error!("Output error: {}", err);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_heartbeat(
|
||||
connection: &mut KernelHeartbeatConnection,
|
||||
) -> Result<(), AnyError> {
|
||||
loop {
|
||||
connection.single_heartbeat().await?;
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_control(
|
||||
mut connection: KernelControlConnection,
|
||||
|
@ -222,9 +228,8 @@ impl JupyterServer {
|
|||
let cursor_pos = req.cursor_pos;
|
||||
|
||||
let lsp_completions = self
|
||||
.repl_session
|
||||
.language_server
|
||||
.completions(&user_code, cursor_pos)
|
||||
.repl_session_proxy
|
||||
.lsp_completions(user_code.clone(), cursor_pos)
|
||||
.await;
|
||||
|
||||
if !lsp_completions.is_empty() {
|
||||
|
@ -263,8 +268,10 @@ impl JupyterServer {
|
|||
{
|
||||
let sub_expr = &expr[..index];
|
||||
let prop_name = &expr[index + 1..];
|
||||
let candidates =
|
||||
get_expression_property_names(&mut self.repl_session, sub_expr)
|
||||
let candidates = get_expression_property_names(
|
||||
&mut self.repl_session_proxy,
|
||||
sub_expr,
|
||||
)
|
||||
.await
|
||||
.into_iter()
|
||||
.filter(|n| {
|
||||
|
@ -278,12 +285,15 @@ impl JupyterServer {
|
|||
} else {
|
||||
// combine results of declarations and globalThis properties
|
||||
let mut candidates = get_expression_property_names(
|
||||
&mut self.repl_session,
|
||||
&mut self.repl_session_proxy,
|
||||
"globalThis",
|
||||
)
|
||||
.await
|
||||
.into_iter()
|
||||
.chain(get_global_lexical_scope_names(&mut self.repl_session).await)
|
||||
.chain(
|
||||
get_global_lexical_scope_names(&mut self.repl_session_proxy)
|
||||
.await,
|
||||
)
|
||||
.filter(|n| n.starts_with(expr) && n != &*repl::REPL_INTERNALS_NAME)
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
|
@ -419,7 +429,7 @@ impl JupyterServer {
|
|||
if !execute_request.silent && execute_request.store_history {
|
||||
self.execution_count += 1;
|
||||
}
|
||||
*self.last_execution_request.borrow_mut() = Some(parent_message.clone());
|
||||
*self.last_execution_request.lock().await = Some(parent_message.clone());
|
||||
|
||||
self
|
||||
.send_iopub(
|
||||
|
@ -432,8 +442,8 @@ impl JupyterServer {
|
|||
.await?;
|
||||
|
||||
let result = self
|
||||
.repl_session
|
||||
.evaluate_line_with_object_wrapping(&execute_request.code)
|
||||
.repl_session_proxy
|
||||
.evaluate_line_with_object_wrapping(execute_request.code)
|
||||
.await;
|
||||
|
||||
let evaluate_response = match result {
|
||||
|
@ -471,7 +481,11 @@ impl JupyterServer {
|
|||
} = evaluate_response.value;
|
||||
|
||||
if exception_details.is_none() {
|
||||
publish_result(&mut self.repl_session, &result, self.execution_count)
|
||||
publish_result(
|
||||
&mut self.repl_session_proxy,
|
||||
&result,
|
||||
self.execution_count,
|
||||
)
|
||||
.await?;
|
||||
|
||||
connection
|
||||
|
@ -497,7 +511,7 @@ impl JupyterServer {
|
|||
exception_details.exception
|
||||
{
|
||||
let result = self
|
||||
.repl_session
|
||||
.repl_session_proxy
|
||||
.call_function_on_args(
|
||||
r#"
|
||||
function(object) {
|
||||
|
@ -513,7 +527,7 @@ impl JupyterServer {
|
|||
}
|
||||
"#
|
||||
.into(),
|
||||
&[exception],
|
||||
vec![exception],
|
||||
)
|
||||
.await?;
|
||||
|
||||
|
@ -629,7 +643,7 @@ fn kernel_info() -> messaging::KernelInfoReply {
|
|||
}
|
||||
|
||||
async fn publish_result(
|
||||
session: &mut repl::ReplSession,
|
||||
repl_session_proxy: &mut JupyterReplProxy,
|
||||
evaluate_result: &cdp::RemoteObject,
|
||||
execution_count: usize,
|
||||
) -> Result<Option<HashMap<String, serde_json::Value>>, AnyError> {
|
||||
|
@ -641,21 +655,10 @@ async fn publish_result(
|
|||
|
||||
let arg1 = cdp::CallArgument::from(evaluate_result);
|
||||
|
||||
let response = session
|
||||
.post_message_with_event_loop(
|
||||
"Runtime.callFunctionOn",
|
||||
Some(json!({
|
||||
"functionDeclaration": r#"async function (execution_count, result) {
|
||||
await Deno[Deno.internal].jupyter.broadcastResult(execution_count, result);
|
||||
}"#,
|
||||
"arguments": [arg0, arg1],
|
||||
"executionContextId": session.context_id,
|
||||
"awaitPromise": true,
|
||||
})),
|
||||
)
|
||||
.await?;
|
||||
|
||||
let response: cdp::CallFunctionOnResponse = serde_json::from_value(response)?;
|
||||
let Some(response) = repl_session_proxy.call_function_on(arg0, arg1).await
|
||||
else {
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
if let Some(exception_details) = &response.exception_details {
|
||||
// If the object doesn't have a Jupyter.display method or it throws an
|
||||
|
@ -693,34 +696,25 @@ fn is_word_boundary(c: char) -> bool {
|
|||
|
||||
// TODO(bartlomieju): dedup with repl::editor
|
||||
async fn get_global_lexical_scope_names(
|
||||
session: &mut repl::ReplSession,
|
||||
repl_session_proxy: &mut JupyterReplProxy,
|
||||
) -> Vec<String> {
|
||||
let evaluate_response = session
|
||||
.post_message_with_event_loop(
|
||||
"Runtime.globalLexicalScopeNames",
|
||||
Some(cdp::GlobalLexicalScopeNamesArgs {
|
||||
execution_context_id: Some(session.context_id),
|
||||
}),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
let evaluate_response: cdp::GlobalLexicalScopeNamesResponse =
|
||||
serde_json::from_value(evaluate_response).unwrap();
|
||||
evaluate_response.names
|
||||
repl_session_proxy.global_lexical_scope_names().await.names
|
||||
}
|
||||
|
||||
// TODO(bartlomieju): dedup with repl::editor
|
||||
async fn get_expression_property_names(
|
||||
session: &mut repl::ReplSession,
|
||||
repl_session_proxy: &mut JupyterReplProxy,
|
||||
expr: &str,
|
||||
) -> Vec<String> {
|
||||
// try to get the properties from the expression
|
||||
if let Some(properties) = get_object_expr_properties(session, expr).await {
|
||||
if let Some(properties) =
|
||||
get_object_expr_properties(repl_session_proxy, expr).await
|
||||
{
|
||||
return properties;
|
||||
}
|
||||
|
||||
// otherwise fall back to the prototype
|
||||
let expr_type = get_expression_type(session, expr).await;
|
||||
let expr_type = get_expression_type(repl_session_proxy, expr).await;
|
||||
let object_expr = match expr_type.as_deref() {
|
||||
// possibilities: https://chromedevtools.github.io/devtools-protocol/v8/Runtime/#type-RemoteObject
|
||||
Some("object") => "Object.prototype",
|
||||
|
@ -732,44 +726,32 @@ async fn get_expression_property_names(
|
|||
_ => return Vec::new(), // undefined, symbol, and unhandled
|
||||
};
|
||||
|
||||
get_object_expr_properties(session, object_expr)
|
||||
get_object_expr_properties(repl_session_proxy, object_expr)
|
||||
.await
|
||||
.unwrap_or_default()
|
||||
}
|
||||
|
||||
// TODO(bartlomieju): dedup with repl::editor
|
||||
async fn get_expression_type(
|
||||
session: &mut repl::ReplSession,
|
||||
repl_session_proxy: &mut JupyterReplProxy,
|
||||
expr: &str,
|
||||
) -> Option<String> {
|
||||
evaluate_expression(session, expr)
|
||||
evaluate_expression(repl_session_proxy, expr)
|
||||
.await
|
||||
.map(|res| res.result.kind)
|
||||
}
|
||||
|
||||
// TODO(bartlomieju): dedup with repl::editor
|
||||
async fn get_object_expr_properties(
|
||||
session: &mut repl::ReplSession,
|
||||
repl_session_proxy: &mut JupyterReplProxy,
|
||||
object_expr: &str,
|
||||
) -> Option<Vec<String>> {
|
||||
let evaluate_result = evaluate_expression(session, object_expr).await?;
|
||||
let evaluate_result =
|
||||
evaluate_expression(repl_session_proxy, object_expr).await?;
|
||||
let object_id = evaluate_result.result.object_id?;
|
||||
|
||||
let get_properties_response = session
|
||||
.post_message_with_event_loop(
|
||||
"Runtime.getProperties",
|
||||
Some(cdp::GetPropertiesArgs {
|
||||
object_id,
|
||||
own_properties: None,
|
||||
accessor_properties_only: None,
|
||||
generate_preview: None,
|
||||
non_indexed_properties_only: Some(true),
|
||||
}),
|
||||
)
|
||||
.await
|
||||
.ok()?;
|
||||
let get_properties_response: cdp::GetPropertiesResponse =
|
||||
serde_json::from_value(get_properties_response).ok()?;
|
||||
let get_properties_response =
|
||||
repl_session_proxy.get_properties(object_id.clone()).await?;
|
||||
Some(
|
||||
get_properties_response
|
||||
.result
|
||||
|
@ -781,35 +763,10 @@ async fn get_object_expr_properties(
|
|||
|
||||
// TODO(bartlomieju): dedup with repl::editor
|
||||
async fn evaluate_expression(
|
||||
session: &mut repl::ReplSession,
|
||||
repl_session_proxy: &mut JupyterReplProxy,
|
||||
expr: &str,
|
||||
) -> Option<cdp::EvaluateResponse> {
|
||||
let evaluate_response = session
|
||||
.post_message_with_event_loop(
|
||||
"Runtime.evaluate",
|
||||
Some(cdp::EvaluateArgs {
|
||||
expression: expr.to_string(),
|
||||
object_group: None,
|
||||
include_command_line_api: None,
|
||||
silent: None,
|
||||
context_id: Some(session.context_id),
|
||||
return_by_value: None,
|
||||
generate_preview: None,
|
||||
user_gesture: None,
|
||||
await_promise: None,
|
||||
throw_on_side_effect: Some(true),
|
||||
timeout: Some(200),
|
||||
disable_breaks: None,
|
||||
repl_mode: None,
|
||||
allow_unsafe_eval_blocked_by_csp: None,
|
||||
unique_context_id: None,
|
||||
}),
|
||||
)
|
||||
.await
|
||||
.ok()?;
|
||||
let evaluate_response: cdp::EvaluateResponse =
|
||||
serde_json::from_value(evaluate_response).ok()?;
|
||||
|
||||
let evaluate_response = repl_session_proxy.evaluate(expr.to_string()).await?;
|
||||
if evaluate_response.exception_details.is_some() {
|
||||
None
|
||||
} else {
|
||||
|
|
|
@ -30,6 +30,7 @@ use editor::EditorHelper;
|
|||
use editor::ReplEditor;
|
||||
pub use session::EvaluationOutput;
|
||||
pub use session::ReplSession;
|
||||
pub use session::TsEvaluateResponse;
|
||||
pub use session::REPL_INTERNALS_NAME;
|
||||
|
||||
use super::test::create_single_test_event_channel;
|
||||
|
|
Loading…
Reference in a new issue