// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license. // This file is forked/ported from // Copyright 2020 The Evcxr Authors. MIT license. // NOTE(bartlomieju): unfortunately it appears that clippy is broken // and can't allow a single line ignore for `await_holding_lock`. #![allow(clippy::await_holding_lock)] use std::collections::HashMap; use std::rc::Rc; use std::sync::Arc; use crate::cdp; use crate::tools::repl; use deno_core::anyhow::bail; use deno_core::error::AnyError; use deno_core::futures; use deno_core::parking_lot::Mutex; use deno_core::serde_json; use deno_core::CancelFuture; use deno_core::CancelHandle; use jupyter_runtime::ExecutionCount; use tokio::sync::mpsc; use tokio::sync::oneshot; use jupyter_runtime::messaging; use jupyter_runtime::ConnectionInfo; use jupyter_runtime::JupyterMessage; use jupyter_runtime::JupyterMessageContent; use jupyter_runtime::KernelControlConnection; use jupyter_runtime::KernelIoPubConnection; use jupyter_runtime::KernelShellConnection; use jupyter_runtime::ReplyError; use jupyter_runtime::ReplyStatus; use jupyter_runtime::StreamContent; use uuid::Uuid; use super::JupyterReplProxy; pub struct JupyterServer { execution_count: ExecutionCount, last_execution_request: Arc>>, iopub_connection: Arc>, repl_session_proxy: JupyterReplProxy, } pub struct StdinConnectionProxy { pub tx: mpsc::UnboundedSender, pub rx: mpsc::UnboundedReceiver, } pub struct StartupData { pub iopub_connection: Arc>, pub stdin_connection_proxy: Arc>, pub last_execution_request: Arc>>, } impl JupyterServer { pub async fn start( connection_info: ConnectionInfo, mut stdio_rx: mpsc::UnboundedReceiver, repl_session_proxy: JupyterReplProxy, setup_tx: oneshot::Sender, ) -> Result<(), AnyError> { let session_id = Uuid::new_v4().to_string(); let mut heartbeat = connection_info.create_kernel_heartbeat_connection().await?; let shell_connection = connection_info .create_kernel_shell_connection(&session_id) .await?; let control_connection = connection_info .create_kernel_control_connection(&session_id) .await?; let mut stdin_connection = connection_info .create_kernel_stdin_connection(&session_id) .await?; let iopub_connection = connection_info .create_kernel_iopub_connection(&session_id) .await?; let iopub_connection = Arc::new(Mutex::new(iopub_connection)); let last_execution_request = Arc::new(Mutex::new(None)); let (stdin_tx1, mut stdin_rx1) = mpsc::unbounded_channel::(); let (stdin_tx2, stdin_rx2) = mpsc::unbounded_channel::(); let stdin_connection_proxy = Arc::new(Mutex::new(StdinConnectionProxy { tx: stdin_tx1, rx: stdin_rx2, })); let Ok(()) = setup_tx.send(StartupData { iopub_connection: iopub_connection.clone(), last_execution_request: last_execution_request.clone(), stdin_connection_proxy, }) else { bail!("Failed to send startup data"); }; let cancel_handle = CancelHandle::new_rc(); let mut server = Self { execution_count: ExecutionCount::new(0), iopub_connection: iopub_connection.clone(), last_execution_request: last_execution_request.clone(), repl_session_proxy, }; let stdin_fut = deno_core::unsync::spawn(async move { loop { let Some(msg) = stdin_rx1.recv().await else { return; }; let Ok(()) = stdin_connection.send(msg).await else { return; }; let Ok(msg) = stdin_connection.read().await else { return; }; let Ok(()) = stdin_tx2.send(msg) else { return; }; } }); let hearbeat_fut = deno_core::unsync::spawn(async move { loop { if let Err(err) = heartbeat.single_heartbeat().await { log::error!( "Heartbeat error: {}\nBacktrace:\n{}", err, err.backtrace() ); } } }); let control_fut = deno_core::unsync::spawn({ let cancel_handle = cancel_handle.clone(); async move { if let Err(err) = Self::handle_control(control_connection, cancel_handle).await { log::error!( "Control error: {}\nBacktrace:\n{}", err, err.backtrace() ); } } }); let shell_fut = deno_core::unsync::spawn(async move { if let Err(err) = server.handle_shell(shell_connection).await { log::error!("Shell error: {}\nBacktrace:\n{}", err, err.backtrace()); } }); let stdio_fut = deno_core::unsync::spawn(async move { while let Some(stdio_msg) = stdio_rx.recv().await { Self::handle_stdio_msg( iopub_connection.clone(), last_execution_request.clone(), stdio_msg, ) .await; } }); let repl_session_fut = deno_core::unsync::spawn(async move {}); let join_fut = futures::future::try_join_all(vec![ hearbeat_fut, control_fut, shell_fut, stdio_fut, repl_session_fut, stdin_fut, ]); if let Ok(result) = join_fut.or_cancel(cancel_handle).await { result?; } Ok(()) } async fn handle_stdio_msg( iopub_connection: Arc>, last_execution_request: Arc>>, stdio_msg: StreamContent, ) { let maybe_exec_result = last_execution_request.lock().clone(); let Some(exec_request) = maybe_exec_result else { return; }; let result = iopub_connection .lock() .send(stdio_msg.as_child_of(&exec_request)) .await; if let Err(err) = result { log::error!("Output error: {}", err); } } async fn handle_control( mut connection: KernelControlConnection, cancel_handle: Rc, ) -> Result<(), AnyError> { loop { let msg = connection.read().await?; match msg.content { JupyterMessageContent::KernelInfoRequest(_) => { // normally kernel info is sent from the shell channel // however, some frontends will send it on the control channel // and it's no harm to send a kernel info reply on control connection.send(kernel_info().as_child_of(&msg)).await?; } JupyterMessageContent::ShutdownRequest(_) => { cancel_handle.cancel(); } JupyterMessageContent::InterruptRequest(_) => { log::error!("Interrupt request currently not supported"); } JupyterMessageContent::DebugRequest(_) => { log::error!("Debug request currently not supported"); // See https://jupyter-client.readthedocs.io/en/latest/messaging.html#debug-request // and https://microsoft.github.io/debug-adapter-protocol/ } _ => { log::error!( "Unrecognized control message type: {}", msg.message_type() ); } } } } async fn handle_shell( &mut self, mut connection: KernelShellConnection, ) -> Result<(), AnyError> { loop { let msg = connection.read().await?; self.handle_shell_message(msg, &mut connection).await?; } } async fn handle_shell_message( &mut self, msg: JupyterMessage, connection: &mut KernelShellConnection, ) -> Result<(), AnyError> { let parent = &msg.clone(); self .send_iopub(messaging::Status::busy().as_child_of(parent)) .await?; match msg.content { JupyterMessageContent::ExecuteRequest(execute_request) => { self .handle_execution_request(execute_request, parent, connection) .await?; } JupyterMessageContent::CompleteRequest(req) => { let user_code = req.code; let cursor_pos = req.cursor_pos; let lsp_completions = self .repl_session_proxy .lsp_completions(user_code.clone(), cursor_pos) .await; if !lsp_completions.is_empty() { let matches: Vec = lsp_completions .iter() .map(|item| item.new_text.clone()) .collect(); let cursor_start = lsp_completions .first() .map(|item| item.range.start) .unwrap_or(cursor_pos); let cursor_end = lsp_completions .last() .map(|item| item.range.end) .unwrap_or(cursor_pos); connection .send( messaging::CompleteReply { matches, cursor_start, cursor_end, metadata: Default::default(), status: ReplyStatus::Ok, error: None, } .as_child_of(parent), ) .await?; } else { let expr = get_expr_from_line_at_pos(&user_code, cursor_pos); // check if the expression is in the form `obj.prop` let (completions, cursor_start) = if let Some(index) = expr.rfind('.') { let sub_expr = &expr[..index]; let prop_name = &expr[index + 1..]; let candidates = get_expression_property_names( &mut self.repl_session_proxy, sub_expr, ) .await .into_iter() .filter(|n| { !n.starts_with("Symbol(") && n.starts_with(prop_name) && n != &*repl::REPL_INTERNALS_NAME }) .collect(); (candidates, cursor_pos - prop_name.len()) } else { // combine results of declarations and globalThis properties let mut candidates = get_expression_property_names( &mut self.repl_session_proxy, "globalThis", ) .await .into_iter() .chain( get_global_lexical_scope_names(&mut self.repl_session_proxy) .await, ) .filter(|n| n.starts_with(expr) && n != &*repl::REPL_INTERNALS_NAME) .collect::>(); // sort and remove duplicates candidates.sort(); candidates.dedup(); // make sure to sort first (candidates, cursor_pos - expr.len()) }; connection .send( messaging::CompleteReply { matches: completions, cursor_start, cursor_end: cursor_pos, metadata: Default::default(), status: ReplyStatus::Ok, error: None, } .as_child_of(parent), ) .await?; } } JupyterMessageContent::InspectRequest(_req) => { // TODO(bartlomieju?): implement introspection request // The inspect request is used to get information about an object at cursor position. // There are two detail levels: 0 is typically documentation, 1 is typically source code // The response includes a MimeBundle to render the object: // { // "status": "ok", // "found": true, // "data": { // "text/plain": "Plain documentation here", // "text/html": "
Rich documentation here
", // "application/json": { // "key1": "value1", // "key2": "value2" // } // }, // } connection .send( messaging::InspectReply { status: ReplyStatus::Ok, found: false, data: Default::default(), metadata: Default::default(), error: None, } .as_child_of(parent), ) .await?; } JupyterMessageContent::IsCompleteRequest(_) => { connection .send(messaging::IsCompleteReply::complete().as_child_of(parent)) .await?; } JupyterMessageContent::KernelInfoRequest(_) => { connection.send(kernel_info().as_child_of(parent)).await?; } JupyterMessageContent::CommOpen(comm) => { connection .send( messaging::CommClose { comm_id: comm.comm_id, data: Default::default(), } .as_child_of(parent), ) .await?; } JupyterMessageContent::HistoryRequest(_req) => { connection .send( messaging::HistoryReply { history: vec![], error: None, status: ReplyStatus::Ok, } .as_child_of(parent), ) .await?; } JupyterMessageContent::InputReply(_rep) => { // TODO(@zph): implement input reply from https://github.com/denoland/deno/pull/23592 // NOTE: This will belong on the stdin channel, not the shell channel } JupyterMessageContent::CommInfoRequest(_req) => { connection .send( messaging::CommInfoReply { comms: Default::default(), status: ReplyStatus::Ok, error: None, } .as_child_of(parent), ) .await?; } JupyterMessageContent::CommMsg(_) | JupyterMessageContent::CommClose(_) => { // Do nothing with regular comm messages } // Any unknown message type is ignored _ => { log::error!( "Unrecognized shell message type: {}", msg.content.message_type() ); } } self .send_iopub(messaging::Status::idle().as_child_of(parent)) .await?; Ok(()) } async fn handle_execution_request( &mut self, execute_request: messaging::ExecuteRequest, parent_message: &JupyterMessage, connection: &mut KernelShellConnection, ) -> Result<(), AnyError> { if !execute_request.silent && execute_request.store_history { self.execution_count.increment(); } *self.last_execution_request.lock() = Some(parent_message.clone()); self .send_iopub( messaging::ExecuteInput { execution_count: self.execution_count, code: execute_request.code.clone(), } .as_child_of(parent_message), ) .await?; let result = self .repl_session_proxy .evaluate_line_with_object_wrapping(execute_request.code) .await; let evaluate_response = match result { Ok(eval_response) => eval_response, Err(err) => { self .send_iopub( messaging::ErrorOutput { ename: err.to_string(), evalue: err.to_string(), traceback: vec![], } .as_child_of(parent_message), ) .await?; connection .send( messaging::ExecuteReply { execution_count: self.execution_count, status: ReplyStatus::Error, payload: Default::default(), user_expressions: None, error: None, } .as_child_of(parent_message), ) .await?; return Ok(()); } }; let cdp::EvaluateResponse { result, exception_details, } = evaluate_response.value; if exception_details.is_none() { publish_result( &mut self.repl_session_proxy, &result, self.execution_count, ) .await?; connection .send( messaging::ExecuteReply { execution_count: self.execution_count, status: ReplyStatus::Ok, user_expressions: None, payload: Default::default(), error: None, } .as_child_of(parent_message), ) .await?; // Let's sleep here for a few ms, so we give a chance to the task that is // handling stdout and stderr streams to receive and flush the content. // Otherwise, executing multiple cells one-by-one might lead to output // from various cells be grouped together in another cell result. tokio::time::sleep(std::time::Duration::from_millis(5)).await; } else if let Some(exception_details) = exception_details { // Determine the exception value and name let (name, message, stack) = if let Some(exception) = exception_details.exception { let result = self .repl_session_proxy .call_function_on_args( r#" function(object) { if (object instanceof Error) { const name = "name" in object ? String(object.name) : ""; const message = "message" in object ? String(object.message) : ""; const stack = "stack" in object ? String(object.stack) : ""; return JSON.stringify({ name, message, stack }); } else { const message = String(object); return JSON.stringify({ name: "", message, stack: "" }); } } "# .into(), vec![exception], ) .await?; match result.result.value { Some(serde_json::Value::String(str)) => { if let Ok(object) = serde_json::from_str::>(&str) { let get = |k| object.get(k).cloned().unwrap_or_default(); (get("name"), get("message"), get("stack")) } else { log::error!("Unexpected result while parsing JSON {str}"); ("".into(), "".into(), "".into()) } } _ => { log::error!("Unexpected result while parsing exception {result:?}"); ("".into(), "".into(), "".into()) } } } else { log::error!("Unexpectedly missing exception {exception_details:?}"); ("".into(), "".into(), "".into()) }; let stack = if stack.is_empty() { format!( "{}\n at ", serde_json::to_string(&message).unwrap() ) } else { stack }; let traceback = format!("Stack trace:\n{stack}") .split('\n') .map(|s| s.to_owned()) .collect::>(); let ename = if name.is_empty() { "Unknown error".into() } else { name }; let evalue = if message.is_empty() { "(none)".into() } else { message }; self .send_iopub( messaging::ErrorOutput { ename: ename.clone(), evalue: evalue.clone(), traceback: traceback.clone(), } .as_child_of(parent_message), ) .await?; connection .send( messaging::ExecuteReply { execution_count: self.execution_count, status: ReplyStatus::Error, error: Some(Box::new(ReplyError { ename, evalue, traceback, })), user_expressions: None, payload: Default::default(), } .as_child_of(parent_message), ) .await?; } Ok(()) } async fn send_iopub( &mut self, message: JupyterMessage, ) -> Result<(), AnyError> { self.iopub_connection.lock().send(message.clone()).await } } fn kernel_info() -> messaging::KernelInfoReply { messaging::KernelInfoReply { status: ReplyStatus::Ok, protocol_version: "5.3".to_string(), implementation: "Deno kernel".to_string(), implementation_version: crate::version::DENO_VERSION_INFO.deno.to_string(), language_info: messaging::LanguageInfo { name: "typescript".to_string(), version: crate::version::DENO_VERSION_INFO.typescript.to_string(), mimetype: "text/x.typescript".to_string(), file_extension: ".ts".to_string(), pygments_lexer: "typescript".to_string(), codemirror_mode: messaging::CodeMirrorMode::typescript(), nbconvert_exporter: "script".to_string(), }, banner: "Welcome to Deno kernel".to_string(), help_links: vec![messaging::HelpLink { text: "Visit Deno manual".to_string(), url: "https://deno.land/manual".to_string(), }], debugger: false, error: None, } } async fn publish_result( repl_session_proxy: &mut JupyterReplProxy, evaluate_result: &cdp::RemoteObject, execution_count: ExecutionCount, ) -> Result>, AnyError> { let arg0 = cdp::CallArgument { value: Some(execution_count.into()), unserializable_value: None, object_id: None, }; let arg1 = cdp::CallArgument::from(evaluate_result); let Some(response) = repl_session_proxy.call_function_on(arg0, arg1).await else { return Ok(None); }; if let Some(exception_details) = &response.exception_details { // If the object doesn't have a Jupyter.display method or it throws an // exception, we just ignore it and let the caller handle it. log::error!("Exception encountered: {}", exception_details.text); return Ok(None); } Ok(None) } // TODO(bartlomieju): dedup with repl::editor fn get_expr_from_line_at_pos(line: &str, cursor_pos: usize) -> &str { let start = line[..cursor_pos].rfind(is_word_boundary).unwrap_or(0); let end = line[cursor_pos..] .rfind(is_word_boundary) .map(|i| cursor_pos + i) .unwrap_or(cursor_pos); let word = &line[start..end]; let word = word.strip_prefix(is_word_boundary).unwrap_or(word); let word = word.strip_suffix(is_word_boundary).unwrap_or(word); word } // TODO(bartlomieju): dedup with repl::editor fn is_word_boundary(c: char) -> bool { if matches!(c, '.' | '_' | '$') { false } else { char::is_ascii_whitespace(&c) || char::is_ascii_punctuation(&c) } } // TODO(bartlomieju): dedup with repl::editor async fn get_global_lexical_scope_names( repl_session_proxy: &mut JupyterReplProxy, ) -> Vec { repl_session_proxy.global_lexical_scope_names().await.names } // TODO(bartlomieju): dedup with repl::editor async fn get_expression_property_names( repl_session_proxy: &mut JupyterReplProxy, expr: &str, ) -> Vec { // try to get the properties from the expression if let Some(properties) = get_object_expr_properties(repl_session_proxy, expr).await { return properties; } // otherwise fall back to the prototype let expr_type = get_expression_type(repl_session_proxy, expr).await; let object_expr = match expr_type.as_deref() { // possibilities: https://chromedevtools.github.io/devtools-protocol/v8/Runtime/#type-RemoteObject Some("object") => "Object.prototype", Some("function") => "Function.prototype", Some("string") => "String.prototype", Some("boolean") => "Boolean.prototype", Some("bigint") => "BigInt.prototype", Some("number") => "Number.prototype", _ => return Vec::new(), // undefined, symbol, and unhandled }; get_object_expr_properties(repl_session_proxy, object_expr) .await .unwrap_or_default() } // TODO(bartlomieju): dedup with repl::editor async fn get_expression_type( repl_session_proxy: &mut JupyterReplProxy, expr: &str, ) -> Option { evaluate_expression(repl_session_proxy, expr) .await .map(|res| res.result.kind) } // TODO(bartlomieju): dedup with repl::editor async fn get_object_expr_properties( repl_session_proxy: &mut JupyterReplProxy, object_expr: &str, ) -> Option> { let evaluate_result = evaluate_expression(repl_session_proxy, object_expr).await?; let object_id = evaluate_result.result.object_id?; let get_properties_response = repl_session_proxy.get_properties(object_id.clone()).await?; Some( get_properties_response .result .into_iter() .map(|prop| prop.name) .collect(), ) } // TODO(bartlomieju): dedup with repl::editor async fn evaluate_expression( repl_session_proxy: &mut JupyterReplProxy, expr: &str, ) -> Option { let evaluate_response = repl_session_proxy.evaluate(expr.to_string()).await?; if evaluate_response.exception_details.is_some() { None } else { Some(evaluate_response) } }