1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2024-11-21 15:04:11 -05:00

fix: adapt to new jupyter runtime API and include session IDs (#24762)

Closes #24737, #24437.
This commit is contained in:
Kyle Kelley 2024-07-27 01:39:08 -07:00 committed by GitHub
parent 06b6352292
commit 63f8218a7d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 42 additions and 38 deletions

11
Cargo.lock generated
View file

@ -5563,9 +5563,9 @@ dependencies = [
[[package]] [[package]]
name = "runtimelib" name = "runtimelib"
version = "0.11.0" version = "0.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "81f4969d577fe13ef40c8eb6fad2ccc66c26c800410672c847f5397699240b9d" checksum = "0c3d817764e3971867351e6103955b17d808f5330e9ef63aaaaab55bf8c664c1"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"base64 0.22.1", "base64 0.22.1",
@ -5573,6 +5573,7 @@ dependencies = [
"chrono", "chrono",
"data-encoding", "data-encoding",
"dirs", "dirs",
"futures",
"glob", "glob",
"rand", "rand",
"ring", "ring",
@ -6899,7 +6900,7 @@ dependencies = [
"base64 0.21.7", "base64 0.21.7",
"bytes", "bytes",
"console_static_text", "console_static_text",
"deno_unsync 0.3.10", "deno_unsync 0.4.0",
"denokv_proto", "denokv_proto",
"fastwebsockets", "fastwebsockets",
"flate2", "flate2",
@ -8272,9 +8273,9 @@ dependencies = [
[[package]] [[package]]
name = "zeromq" name = "zeromq"
version = "0.3.4" version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2db35fbc7d9082d39a85c9831ec5dc7b7b135038d2f00bb5ff2a4c0275893da1" checksum = "fb0560d00172817b7f7c2265060783519c475702ae290b154115ca75e976d4d0"
dependencies = [ dependencies = [
"async-trait", "async-trait",
"asynchronous-codec", "asynchronous-codec",

View file

@ -191,7 +191,7 @@ url = { version = "< 2.5.0", features = ["serde", "expose_internals"] }
uuid = { version = "1.3.0", features = ["v4"] } uuid = { version = "1.3.0", features = ["v4"] }
webpki-roots = "0.26" webpki-roots = "0.26"
which = "4.2.5" which = "4.2.5"
zeromq = { version = "=0.3.4", default-features = false, features = ["tcp-transport", "tokio-runtime"] } zeromq = { version = "=0.4.0", default-features = false, features = ["tcp-transport", "tokio-runtime"] }
zstd = "=0.12.4" zstd = "=0.12.4"
# crypto # crypto

View file

@ -116,7 +116,7 @@ hyper-util.workspace = true
import_map = { version = "=0.20.0", features = ["ext"] } import_map = { version = "=0.20.0", features = ["ext"] }
indexmap.workspace = true indexmap.workspace = true
jsonc-parser.workspace = true jsonc-parser.workspace = true
jupyter_runtime = { package = "runtimelib", version = "=0.11.0" } jupyter_runtime = { package = "runtimelib", version = "=0.14.0" }
lazy-regex.workspace = true lazy-regex.workspace = true
libc.workspace = true libc.workspace = true
libz-sys.workspace = true libz-sys.workspace = true

View file

@ -65,14 +65,12 @@ pub fn op_jupyter_input(
return Ok(None); return Ok(None);
} }
let msg = JupyterMessage::new( let content = InputRequest {
InputRequest { prompt,
prompt, password: is_password,
password: is_password, };
}
.into(), let msg = JupyterMessage::new(content, Some(&last_request));
Some(&last_request),
);
let Ok(()) = stdin_connection_proxy.lock().tx.send(msg) else { let Ok(()) = stdin_connection_proxy.lock().tx.send(msg) else {
return Ok(None); return Ok(None);
@ -149,13 +147,13 @@ pub fn op_print(
let sender = state.borrow_mut::<mpsc::UnboundedSender<StreamContent>>(); let sender = state.borrow_mut::<mpsc::UnboundedSender<StreamContent>>();
if is_err { if is_err {
if let Err(err) = sender.send(StreamContent::stderr(msg.into())) { if let Err(err) = sender.send(StreamContent::stderr(msg)) {
log::error!("Failed to send stderr message: {}", err); log::error!("Failed to send stderr message: {}", err);
} }
return Ok(()); return Ok(());
} }
if let Err(err) = sender.send(StreamContent::stdout(msg.into())) { if let Err(err) = sender.send(StreamContent::stdout(msg)) {
log::error!("Failed to send stdout message: {}", err); log::error!("Failed to send stdout message: {}", err);
} }
Ok(()) Ok(())

View file

@ -125,9 +125,7 @@ pub async fn kernel(
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> { fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
self self
.0 .0
.send(StreamContent::stdout( .send(StreamContent::stdout(&String::from_utf8_lossy(buf)))
String::from_utf8_lossy(buf).into_owned(),
))
.ok(); .ok();
Ok(buf.len()) Ok(buf.len())
} }

View file

@ -20,11 +20,11 @@ use deno_core::parking_lot::Mutex;
use deno_core::serde_json; use deno_core::serde_json;
use deno_core::CancelFuture; use deno_core::CancelFuture;
use deno_core::CancelHandle; use deno_core::CancelHandle;
use jupyter_runtime::ExecutionCount;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tokio::sync::oneshot; use tokio::sync::oneshot;
use jupyter_runtime::messaging; use jupyter_runtime::messaging;
use jupyter_runtime::AsChildOf;
use jupyter_runtime::ConnectionInfo; use jupyter_runtime::ConnectionInfo;
use jupyter_runtime::JupyterMessage; use jupyter_runtime::JupyterMessage;
use jupyter_runtime::JupyterMessageContent; use jupyter_runtime::JupyterMessageContent;
@ -34,11 +34,12 @@ use jupyter_runtime::KernelShellConnection;
use jupyter_runtime::ReplyError; use jupyter_runtime::ReplyError;
use jupyter_runtime::ReplyStatus; use jupyter_runtime::ReplyStatus;
use jupyter_runtime::StreamContent; use jupyter_runtime::StreamContent;
use uuid::Uuid;
use super::JupyterReplProxy; use super::JupyterReplProxy;
pub struct JupyterServer { pub struct JupyterServer {
execution_count: usize, execution_count: ExecutionCount,
last_execution_request: Arc<Mutex<Option<JupyterMessage>>>, last_execution_request: Arc<Mutex<Option<JupyterMessage>>>,
iopub_connection: Arc<Mutex<KernelIoPubConnection>>, iopub_connection: Arc<Mutex<KernelIoPubConnection>>,
repl_session_proxy: JupyterReplProxy, repl_session_proxy: JupyterReplProxy,
@ -62,16 +63,22 @@ impl JupyterServer {
repl_session_proxy: JupyterReplProxy, repl_session_proxy: JupyterReplProxy,
setup_tx: oneshot::Sender<StartupData>, setup_tx: oneshot::Sender<StartupData>,
) -> Result<(), AnyError> { ) -> Result<(), AnyError> {
let session_id = Uuid::new_v4().to_string();
let mut heartbeat = let mut heartbeat =
connection_info.create_kernel_heartbeat_connection().await?; connection_info.create_kernel_heartbeat_connection().await?;
let shell_connection = let shell_connection = connection_info
connection_info.create_kernel_shell_connection().await?; .create_kernel_shell_connection(&session_id)
let control_connection = .await?;
connection_info.create_kernel_control_connection().await?; let control_connection = connection_info
let mut stdin_connection = .create_kernel_control_connection(&session_id)
connection_info.create_kernel_stdin_connection().await?; .await?;
let iopub_connection = let mut stdin_connection = connection_info
connection_info.create_kernel_iopub_connection().await?; .create_kernel_stdin_connection(&session_id)
.await?;
let iopub_connection = connection_info
.create_kernel_iopub_connection(&session_id)
.await?;
let iopub_connection = Arc::new(Mutex::new(iopub_connection)); let iopub_connection = Arc::new(Mutex::new(iopub_connection));
let last_execution_request = Arc::new(Mutex::new(None)); let last_execution_request = Arc::new(Mutex::new(None));
@ -96,7 +103,7 @@ impl JupyterServer {
let cancel_handle = CancelHandle::new_rc(); let cancel_handle = CancelHandle::new_rc();
let mut server = Self { let mut server = Self {
execution_count: 0, execution_count: ExecutionCount::new(0),
iopub_connection: iopub_connection.clone(), iopub_connection: iopub_connection.clone(),
last_execution_request: last_execution_request.clone(), last_execution_request: last_execution_request.clone(),
repl_session_proxy, repl_session_proxy,
@ -468,7 +475,7 @@ impl JupyterServer {
connection: &mut KernelShellConnection, connection: &mut KernelShellConnection,
) -> Result<(), AnyError> { ) -> Result<(), AnyError> {
if !execute_request.silent && execute_request.store_history { if !execute_request.silent && execute_request.store_history {
self.execution_count += 1; self.execution_count.increment();
} }
*self.last_execution_request.lock() = Some(parent_message.clone()); *self.last_execution_request.lock() = Some(parent_message.clone());
@ -634,11 +641,11 @@ impl JupyterServer {
messaging::ExecuteReply { messaging::ExecuteReply {
execution_count: self.execution_count, execution_count: self.execution_count,
status: ReplyStatus::Error, status: ReplyStatus::Error,
error: Some(ReplyError { error: Some(Box::new(ReplyError {
ename, ename,
evalue, evalue,
traceback, traceback,
}), })),
user_expressions: None, user_expressions: None,
payload: Default::default(), payload: Default::default(),
} }
@ -654,7 +661,7 @@ impl JupyterServer {
&mut self, &mut self,
message: JupyterMessage, message: JupyterMessage,
) -> Result<(), AnyError> { ) -> Result<(), AnyError> {
self.iopub_connection.lock().send(message).await self.iopub_connection.lock().send(message.clone()).await
} }
} }
@ -686,10 +693,10 @@ fn kernel_info() -> messaging::KernelInfoReply {
async fn publish_result( async fn publish_result(
repl_session_proxy: &mut JupyterReplProxy, repl_session_proxy: &mut JupyterReplProxy,
evaluate_result: &cdp::RemoteObject, evaluate_result: &cdp::RemoteObject,
execution_count: usize, execution_count: ExecutionCount,
) -> Result<Option<HashMap<String, serde_json::Value>>, AnyError> { ) -> Result<Option<HashMap<String, serde_json::Value>>, AnyError> {
let arg0 = cdp::CallArgument { let arg0 = cdp::CallArgument {
value: Some(serde_json::Value::Number(execution_count.into())), value: Some(execution_count.into()),
unserializable_value: None, unserializable_value: None,
object_id: None, object_id: None,
}; };

View file

@ -493,7 +493,7 @@ async fn jupyter_heartbeat_echoes() -> Result<()> {
let (_ctx, client, _process) = setup().await; let (_ctx, client, _process) = setup().await;
client.send_heartbeat(b"ping").await?; client.send_heartbeat(b"ping").await?;
let msg = client.recv_heartbeat().await?; let msg = client.recv_heartbeat().await?;
assert_eq!(msg, Bytes::from_static(b"ping")); assert_eq!(msg, Bytes::from_static(b"pong"));
Ok(()) Ok(())
} }