1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2024-12-22 15:24:46 -05:00
denoland-deno/cli/tools/jupyter/server.rs
Bartek Iwańczuk 2bb013f9ba
refactor: version module exports a single const struct (#25014)
This commit rewrites the internal `version` module that exported
various information about the current executable. Instead of exporting
several consts, we are now exporting a single const structure that 
contains all the necessary information.

This is the first step towards cleaning up how we use this information
and should allow us to use SUI to be able to patch this information
in already produced binary making it easier to cut new releases.

---------

Co-authored-by: Divy Srivastava <dj.srivastava23@gmail.com>
2024-08-15 23:47:16 +02:00

823 lines
24 KiB
Rust

// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
// This file is forked/ported from <https://github.com/evcxr/evcxr>
// Copyright 2020 The Evcxr Authors. MIT license.
// NOTE(bartlomieju): unfortunately it appears that clippy is broken
// and can't allow a single line ignore for `await_holding_lock`.
#![allow(clippy::await_holding_lock)]
use std::collections::HashMap;
use std::rc::Rc;
use std::sync::Arc;
use crate::cdp;
use crate::tools::repl;
use deno_core::anyhow::bail;
use deno_core::error::AnyError;
use deno_core::futures;
use deno_core::parking_lot::Mutex;
use deno_core::serde_json;
use deno_core::CancelFuture;
use deno_core::CancelHandle;
use jupyter_runtime::ExecutionCount;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use jupyter_runtime::messaging;
use jupyter_runtime::ConnectionInfo;
use jupyter_runtime::JupyterMessage;
use jupyter_runtime::JupyterMessageContent;
use jupyter_runtime::KernelControlConnection;
use jupyter_runtime::KernelIoPubConnection;
use jupyter_runtime::KernelShellConnection;
use jupyter_runtime::ReplyError;
use jupyter_runtime::ReplyStatus;
use jupyter_runtime::StreamContent;
use uuid::Uuid;
use super::JupyterReplProxy;
pub struct JupyterServer {
execution_count: ExecutionCount,
last_execution_request: Arc<Mutex<Option<JupyterMessage>>>,
iopub_connection: Arc<Mutex<KernelIoPubConnection>>,
repl_session_proxy: JupyterReplProxy,
}
pub struct StdinConnectionProxy {
pub tx: mpsc::UnboundedSender<JupyterMessage>,
pub rx: mpsc::UnboundedReceiver<JupyterMessage>,
}
pub struct StartupData {
pub iopub_connection: Arc<Mutex<KernelIoPubConnection>>,
pub stdin_connection_proxy: Arc<Mutex<StdinConnectionProxy>>,
pub last_execution_request: Arc<Mutex<Option<JupyterMessage>>>,
}
impl JupyterServer {
pub async fn start(
connection_info: ConnectionInfo,
mut stdio_rx: mpsc::UnboundedReceiver<StreamContent>,
repl_session_proxy: JupyterReplProxy,
setup_tx: oneshot::Sender<StartupData>,
) -> Result<(), AnyError> {
let session_id = Uuid::new_v4().to_string();
let mut heartbeat =
connection_info.create_kernel_heartbeat_connection().await?;
let shell_connection = connection_info
.create_kernel_shell_connection(&session_id)
.await?;
let control_connection = connection_info
.create_kernel_control_connection(&session_id)
.await?;
let mut stdin_connection = connection_info
.create_kernel_stdin_connection(&session_id)
.await?;
let iopub_connection = connection_info
.create_kernel_iopub_connection(&session_id)
.await?;
let iopub_connection = Arc::new(Mutex::new(iopub_connection));
let last_execution_request = Arc::new(Mutex::new(None));
let (stdin_tx1, mut stdin_rx1) =
mpsc::unbounded_channel::<JupyterMessage>();
let (stdin_tx2, stdin_rx2) = mpsc::unbounded_channel::<JupyterMessage>();
let stdin_connection_proxy = Arc::new(Mutex::new(StdinConnectionProxy {
tx: stdin_tx1,
rx: stdin_rx2,
}));
let Ok(()) = setup_tx.send(StartupData {
iopub_connection: iopub_connection.clone(),
last_execution_request: last_execution_request.clone(),
stdin_connection_proxy,
}) else {
bail!("Failed to send startup data");
};
let cancel_handle = CancelHandle::new_rc();
let mut server = Self {
execution_count: ExecutionCount::new(0),
iopub_connection: iopub_connection.clone(),
last_execution_request: last_execution_request.clone(),
repl_session_proxy,
};
let stdin_fut = deno_core::unsync::spawn(async move {
loop {
let Some(msg) = stdin_rx1.recv().await else {
return;
};
let Ok(()) = stdin_connection.send(msg).await else {
return;
};
let Ok(msg) = stdin_connection.read().await else {
return;
};
let Ok(()) = stdin_tx2.send(msg) else {
return;
};
}
});
let hearbeat_fut = deno_core::unsync::spawn(async move {
loop {
if let Err(err) = heartbeat.single_heartbeat().await {
log::error!(
"Heartbeat error: {}\nBacktrace:\n{}",
err,
err.backtrace()
);
}
}
});
let control_fut = deno_core::unsync::spawn({
let cancel_handle = cancel_handle.clone();
async move {
if let Err(err) =
Self::handle_control(control_connection, cancel_handle).await
{
log::error!(
"Control error: {}\nBacktrace:\n{}",
err,
err.backtrace()
);
}
}
});
let shell_fut = deno_core::unsync::spawn(async move {
if let Err(err) = server.handle_shell(shell_connection).await {
log::error!("Shell error: {}\nBacktrace:\n{}", err, err.backtrace());
}
});
let stdio_fut = deno_core::unsync::spawn(async move {
while let Some(stdio_msg) = stdio_rx.recv().await {
Self::handle_stdio_msg(
iopub_connection.clone(),
last_execution_request.clone(),
stdio_msg,
)
.await;
}
});
let repl_session_fut = deno_core::unsync::spawn(async move {});
let join_fut = futures::future::try_join_all(vec![
hearbeat_fut,
control_fut,
shell_fut,
stdio_fut,
repl_session_fut,
stdin_fut,
]);
if let Ok(result) = join_fut.or_cancel(cancel_handle).await {
result?;
}
Ok(())
}
async fn handle_stdio_msg(
iopub_connection: Arc<Mutex<KernelIoPubConnection>>,
last_execution_request: Arc<Mutex<Option<JupyterMessage>>>,
stdio_msg: StreamContent,
) {
let maybe_exec_result = last_execution_request.lock().clone();
let Some(exec_request) = maybe_exec_result else {
return;
};
let result = iopub_connection
.lock()
.send(stdio_msg.as_child_of(&exec_request))
.await;
if let Err(err) = result {
log::error!("Output error: {}", err);
}
}
async fn handle_control(
mut connection: KernelControlConnection,
cancel_handle: Rc<CancelHandle>,
) -> Result<(), AnyError> {
loop {
let msg = connection.read().await?;
match msg.content {
JupyterMessageContent::KernelInfoRequest(_) => {
// normally kernel info is sent from the shell channel
// however, some frontends will send it on the control channel
// and it's no harm to send a kernel info reply on control
connection.send(kernel_info().as_child_of(&msg)).await?;
}
JupyterMessageContent::ShutdownRequest(_) => {
cancel_handle.cancel();
}
JupyterMessageContent::InterruptRequest(_) => {
log::error!("Interrupt request currently not supported");
}
JupyterMessageContent::DebugRequest(_) => {
log::error!("Debug request currently not supported");
// See https://jupyter-client.readthedocs.io/en/latest/messaging.html#debug-request
// and https://microsoft.github.io/debug-adapter-protocol/
}
_ => {
log::error!(
"Unrecognized control message type: {}",
msg.message_type()
);
}
}
}
}
async fn handle_shell(
&mut self,
mut connection: KernelShellConnection,
) -> Result<(), AnyError> {
loop {
let msg = connection.read().await?;
self.handle_shell_message(msg, &mut connection).await?;
}
}
async fn handle_shell_message(
&mut self,
msg: JupyterMessage,
connection: &mut KernelShellConnection,
) -> Result<(), AnyError> {
let parent = &msg.clone();
self
.send_iopub(messaging::Status::busy().as_child_of(parent))
.await?;
match msg.content {
JupyterMessageContent::ExecuteRequest(execute_request) => {
self
.handle_execution_request(execute_request, parent, connection)
.await?;
}
JupyterMessageContent::CompleteRequest(req) => {
let user_code = req.code;
let cursor_pos = req.cursor_pos;
let lsp_completions = self
.repl_session_proxy
.lsp_completions(user_code.clone(), cursor_pos)
.await;
if !lsp_completions.is_empty() {
let matches: Vec<String> = lsp_completions
.iter()
.map(|item| item.new_text.clone())
.collect();
let cursor_start = lsp_completions
.first()
.map(|item| item.range.start)
.unwrap_or(cursor_pos);
let cursor_end = lsp_completions
.last()
.map(|item| item.range.end)
.unwrap_or(cursor_pos);
connection
.send(
messaging::CompleteReply {
matches,
cursor_start,
cursor_end,
metadata: Default::default(),
status: ReplyStatus::Ok,
error: None,
}
.as_child_of(parent),
)
.await?;
} else {
let expr = get_expr_from_line_at_pos(&user_code, cursor_pos);
// check if the expression is in the form `obj.prop`
let (completions, cursor_start) = if let Some(index) = expr.rfind('.')
{
let sub_expr = &expr[..index];
let prop_name = &expr[index + 1..];
let candidates = get_expression_property_names(
&mut self.repl_session_proxy,
sub_expr,
)
.await
.into_iter()
.filter(|n| {
!n.starts_with("Symbol(")
&& n.starts_with(prop_name)
&& n != &*repl::REPL_INTERNALS_NAME
})
.collect();
(candidates, cursor_pos - prop_name.len())
} else {
// combine results of declarations and globalThis properties
let mut candidates = get_expression_property_names(
&mut self.repl_session_proxy,
"globalThis",
)
.await
.into_iter()
.chain(
get_global_lexical_scope_names(&mut self.repl_session_proxy)
.await,
)
.filter(|n| n.starts_with(expr) && n != &*repl::REPL_INTERNALS_NAME)
.collect::<Vec<_>>();
// sort and remove duplicates
candidates.sort();
candidates.dedup(); // make sure to sort first
(candidates, cursor_pos - expr.len())
};
connection
.send(
messaging::CompleteReply {
matches: completions,
cursor_start,
cursor_end: cursor_pos,
metadata: Default::default(),
status: ReplyStatus::Ok,
error: None,
}
.as_child_of(parent),
)
.await?;
}
}
JupyterMessageContent::InspectRequest(_req) => {
// TODO(bartlomieju?): implement introspection request
// The inspect request is used to get information about an object at cursor position.
// There are two detail levels: 0 is typically documentation, 1 is typically source code
// The response includes a MimeBundle to render the object:
// {
// "status": "ok",
// "found": true,
// "data": {
// "text/plain": "Plain documentation here",
// "text/html": "<div>Rich documentation here</div>",
// "application/json": {
// "key1": "value1",
// "key2": "value2"
// }
// },
// }
connection
.send(
messaging::InspectReply {
status: ReplyStatus::Ok,
found: false,
data: Default::default(),
metadata: Default::default(),
error: None,
}
.as_child_of(parent),
)
.await?;
}
JupyterMessageContent::IsCompleteRequest(_) => {
connection
.send(messaging::IsCompleteReply::complete().as_child_of(parent))
.await?;
}
JupyterMessageContent::KernelInfoRequest(_) => {
connection.send(kernel_info().as_child_of(parent)).await?;
}
JupyterMessageContent::CommOpen(comm) => {
connection
.send(
messaging::CommClose {
comm_id: comm.comm_id,
data: Default::default(),
}
.as_child_of(parent),
)
.await?;
}
JupyterMessageContent::HistoryRequest(_req) => {
connection
.send(
messaging::HistoryReply {
history: vec![],
error: None,
status: ReplyStatus::Ok,
}
.as_child_of(parent),
)
.await?;
}
JupyterMessageContent::InputReply(_rep) => {
// TODO(@zph): implement input reply from https://github.com/denoland/deno/pull/23592
// NOTE: This will belong on the stdin channel, not the shell channel
}
JupyterMessageContent::CommInfoRequest(_req) => {
connection
.send(
messaging::CommInfoReply {
comms: Default::default(),
status: ReplyStatus::Ok,
error: None,
}
.as_child_of(parent),
)
.await?;
}
JupyterMessageContent::CommMsg(_)
| JupyterMessageContent::CommClose(_) => {
// Do nothing with regular comm messages
}
// Any unknown message type is ignored
_ => {
log::error!(
"Unrecognized shell message type: {}",
msg.content.message_type()
);
}
}
self
.send_iopub(messaging::Status::idle().as_child_of(parent))
.await?;
Ok(())
}
async fn handle_execution_request(
&mut self,
execute_request: messaging::ExecuteRequest,
parent_message: &JupyterMessage,
connection: &mut KernelShellConnection,
) -> Result<(), AnyError> {
if !execute_request.silent && execute_request.store_history {
self.execution_count.increment();
}
*self.last_execution_request.lock() = Some(parent_message.clone());
self
.send_iopub(
messaging::ExecuteInput {
execution_count: self.execution_count,
code: execute_request.code.clone(),
}
.as_child_of(parent_message),
)
.await?;
let result = self
.repl_session_proxy
.evaluate_line_with_object_wrapping(execute_request.code)
.await;
let evaluate_response = match result {
Ok(eval_response) => eval_response,
Err(err) => {
self
.send_iopub(
messaging::ErrorOutput {
ename: err.to_string(),
evalue: err.to_string(),
traceback: vec![],
}
.as_child_of(parent_message),
)
.await?;
connection
.send(
messaging::ExecuteReply {
execution_count: self.execution_count,
status: ReplyStatus::Error,
payload: Default::default(),
user_expressions: None,
error: None,
}
.as_child_of(parent_message),
)
.await?;
return Ok(());
}
};
let cdp::EvaluateResponse {
result,
exception_details,
} = evaluate_response.value;
if exception_details.is_none() {
publish_result(
&mut self.repl_session_proxy,
&result,
self.execution_count,
)
.await?;
connection
.send(
messaging::ExecuteReply {
execution_count: self.execution_count,
status: ReplyStatus::Ok,
user_expressions: None,
payload: Default::default(),
error: None,
}
.as_child_of(parent_message),
)
.await?;
// Let's sleep here for a few ms, so we give a chance to the task that is
// handling stdout and stderr streams to receive and flush the content.
// Otherwise, executing multiple cells one-by-one might lead to output
// from various cells be grouped together in another cell result.
tokio::time::sleep(std::time::Duration::from_millis(5)).await;
} else if let Some(exception_details) = exception_details {
// Determine the exception value and name
let (name, message, stack) = if let Some(exception) =
exception_details.exception
{
let result = self
.repl_session_proxy
.call_function_on_args(
r#"
function(object) {
if (object instanceof Error) {
const name = "name" in object ? String(object.name) : "";
const message = "message" in object ? String(object.message) : "";
const stack = "stack" in object ? String(object.stack) : "";
return JSON.stringify({ name, message, stack });
} else {
const message = String(object);
return JSON.stringify({ name: "", message, stack: "" });
}
}
"#
.into(),
vec![exception],
)
.await?;
match result.result.value {
Some(serde_json::Value::String(str)) => {
if let Ok(object) =
serde_json::from_str::<HashMap<String, String>>(&str)
{
let get = |k| object.get(k).cloned().unwrap_or_default();
(get("name"), get("message"), get("stack"))
} else {
log::error!("Unexpected result while parsing JSON {str}");
("".into(), "".into(), "".into())
}
}
_ => {
log::error!("Unexpected result while parsing exception {result:?}");
("".into(), "".into(), "".into())
}
}
} else {
log::error!("Unexpectedly missing exception {exception_details:?}");
("".into(), "".into(), "".into())
};
let stack = if stack.is_empty() {
format!(
"{}\n at <unknown>",
serde_json::to_string(&message).unwrap()
)
} else {
stack
};
let traceback = format!("Stack trace:\n{stack}")
.split('\n')
.map(|s| s.to_owned())
.collect::<Vec<_>>();
let ename = if name.is_empty() {
"Unknown error".into()
} else {
name
};
let evalue = if message.is_empty() {
"(none)".into()
} else {
message
};
self
.send_iopub(
messaging::ErrorOutput {
ename: ename.clone(),
evalue: evalue.clone(),
traceback: traceback.clone(),
}
.as_child_of(parent_message),
)
.await?;
connection
.send(
messaging::ExecuteReply {
execution_count: self.execution_count,
status: ReplyStatus::Error,
error: Some(Box::new(ReplyError {
ename,
evalue,
traceback,
})),
user_expressions: None,
payload: Default::default(),
}
.as_child_of(parent_message),
)
.await?;
}
Ok(())
}
async fn send_iopub(
&mut self,
message: JupyterMessage,
) -> Result<(), AnyError> {
self.iopub_connection.lock().send(message.clone()).await
}
}
fn kernel_info() -> messaging::KernelInfoReply {
messaging::KernelInfoReply {
status: ReplyStatus::Ok,
protocol_version: "5.3".to_string(),
implementation: "Deno kernel".to_string(),
implementation_version: crate::version::DENO_VERSION_INFO.deno.to_string(),
language_info: messaging::LanguageInfo {
name: "typescript".to_string(),
version: crate::version::DENO_VERSION_INFO.typescript.to_string(),
mimetype: "text/x.typescript".to_string(),
file_extension: ".ts".to_string(),
pygments_lexer: "typescript".to_string(),
codemirror_mode: messaging::CodeMirrorMode::typescript(),
nbconvert_exporter: "script".to_string(),
},
banner: "Welcome to Deno kernel".to_string(),
help_links: vec![messaging::HelpLink {
text: "Visit Deno manual".to_string(),
url: "https://deno.land/manual".to_string(),
}],
debugger: false,
error: None,
}
}
async fn publish_result(
repl_session_proxy: &mut JupyterReplProxy,
evaluate_result: &cdp::RemoteObject,
execution_count: ExecutionCount,
) -> Result<Option<HashMap<String, serde_json::Value>>, AnyError> {
let arg0 = cdp::CallArgument {
value: Some(execution_count.into()),
unserializable_value: None,
object_id: None,
};
let arg1 = cdp::CallArgument::from(evaluate_result);
let Some(response) = repl_session_proxy.call_function_on(arg0, arg1).await
else {
return Ok(None);
};
if let Some(exception_details) = &response.exception_details {
// If the object doesn't have a Jupyter.display method or it throws an
// exception, we just ignore it and let the caller handle it.
log::error!("Exception encountered: {}", exception_details.text);
return Ok(None);
}
Ok(None)
}
// TODO(bartlomieju): dedup with repl::editor
fn get_expr_from_line_at_pos(line: &str, cursor_pos: usize) -> &str {
let start = line[..cursor_pos].rfind(is_word_boundary).unwrap_or(0);
let end = line[cursor_pos..]
.rfind(is_word_boundary)
.map(|i| cursor_pos + i)
.unwrap_or(cursor_pos);
let word = &line[start..end];
let word = word.strip_prefix(is_word_boundary).unwrap_or(word);
let word = word.strip_suffix(is_word_boundary).unwrap_or(word);
word
}
// TODO(bartlomieju): dedup with repl::editor
fn is_word_boundary(c: char) -> bool {
if matches!(c, '.' | '_' | '$') {
false
} else {
char::is_ascii_whitespace(&c) || char::is_ascii_punctuation(&c)
}
}
// TODO(bartlomieju): dedup with repl::editor
async fn get_global_lexical_scope_names(
repl_session_proxy: &mut JupyterReplProxy,
) -> Vec<String> {
repl_session_proxy.global_lexical_scope_names().await.names
}
// TODO(bartlomieju): dedup with repl::editor
async fn get_expression_property_names(
repl_session_proxy: &mut JupyterReplProxy,
expr: &str,
) -> Vec<String> {
// try to get the properties from the expression
if let Some(properties) =
get_object_expr_properties(repl_session_proxy, expr).await
{
return properties;
}
// otherwise fall back to the prototype
let expr_type = get_expression_type(repl_session_proxy, expr).await;
let object_expr = match expr_type.as_deref() {
// possibilities: https://chromedevtools.github.io/devtools-protocol/v8/Runtime/#type-RemoteObject
Some("object") => "Object.prototype",
Some("function") => "Function.prototype",
Some("string") => "String.prototype",
Some("boolean") => "Boolean.prototype",
Some("bigint") => "BigInt.prototype",
Some("number") => "Number.prototype",
_ => return Vec::new(), // undefined, symbol, and unhandled
};
get_object_expr_properties(repl_session_proxy, object_expr)
.await
.unwrap_or_default()
}
// TODO(bartlomieju): dedup with repl::editor
async fn get_expression_type(
repl_session_proxy: &mut JupyterReplProxy,
expr: &str,
) -> Option<String> {
evaluate_expression(repl_session_proxy, expr)
.await
.map(|res| res.result.kind)
}
// TODO(bartlomieju): dedup with repl::editor
async fn get_object_expr_properties(
repl_session_proxy: &mut JupyterReplProxy,
object_expr: &str,
) -> Option<Vec<String>> {
let evaluate_result =
evaluate_expression(repl_session_proxy, object_expr).await?;
let object_id = evaluate_result.result.object_id?;
let get_properties_response =
repl_session_proxy.get_properties(object_id.clone()).await?;
Some(
get_properties_response
.result
.into_iter()
.map(|prop| prop.name)
.collect(),
)
}
// TODO(bartlomieju): dedup with repl::editor
async fn evaluate_expression(
repl_session_proxy: &mut JupyterReplProxy,
expr: &str,
) -> Option<cdp::EvaluateResponse> {
let evaluate_response = repl_session_proxy.evaluate(expr.to_string()).await?;
if evaluate_response.exception_details.is_some() {
None
} else {
Some(evaluate_response)
}
}