From 0dfd333649acd85508e62bb60e3be4946e543597 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Wed, 9 Oct 2024 09:04:15 +0100 Subject: [PATCH] fix(jupyter): keep running event loop when waiting for messages (#26049) Closes https://github.com/denoland/deno/issues/24421 --- cli/tools/jupyter/mod.rs | 108 +++++++++++++++++------------ tests/integration/jupyter_tests.rs | 34 +++++++++ 2 files changed, 97 insertions(+), 45 deletions(-) diff --git a/cli/tools/jupyter/mod.rs b/cli/tools/jupyter/mod.rs index 71e947dddb..0ffd0da1ee 100644 --- a/cli/tools/jupyter/mod.rs +++ b/cli/tools/jupyter/mod.rs @@ -357,56 +357,74 @@ pub struct JupyterReplSession { impl JupyterReplSession { pub async fn start(&mut self) { + let mut poll_worker = true; loop { - let Some(msg) = self.rx.recv().await else { - break; - }; - let resp = match msg { - JupyterReplRequest::LspCompletions { - line_text, - position, - } => JupyterReplResponse::LspCompletions( - self.lsp_completions(&line_text, position).await, - ), - JupyterReplRequest::JsGetProperties { object_id } => { - JupyterReplResponse::JsGetProperties( - self.get_properties(object_id).await, - ) - } - JupyterReplRequest::JsEvaluate { expr } => { - JupyterReplResponse::JsEvaluate(self.evaluate(expr).await) - } - JupyterReplRequest::JsGlobalLexicalScopeNames => { - JupyterReplResponse::JsGlobalLexicalScopeNames( - self.global_lexical_scope_names().await, - ) - } - JupyterReplRequest::JsEvaluateLineWithObjectWrapping { line } => { - JupyterReplResponse::JsEvaluateLineWithObjectWrapping( - self.evaluate_line_with_object_wrapping(&line).await, - ) - } - JupyterReplRequest::JsCallFunctionOnArgs { - function_declaration, - args, - } => JupyterReplResponse::JsCallFunctionOnArgs( - self - .call_function_on_args(function_declaration, &args) - .await, - ), - JupyterReplRequest::JsCallFunctionOn { arg0, arg1 } => { - JupyterReplResponse::JsCallFunctionOn( - self.call_function_on(arg0, arg1).await, - ) - } - }; + tokio::select! { + biased; - let Ok(()) = self.tx.send(resp) else { - break; - }; + maybe_message = self.rx.recv() => { + let Some(msg) = maybe_message else { + break; + }; + if self.handle_message(msg).await.is_err() { + break; + } + poll_worker = true; + }, + _ = self.repl_session.run_event_loop(), if poll_worker => { + poll_worker = false; + } + } } } + async fn handle_message( + &mut self, + msg: JupyterReplRequest, + ) -> Result<(), AnyError> { + let resp = match msg { + JupyterReplRequest::LspCompletions { + line_text, + position, + } => JupyterReplResponse::LspCompletions( + self.lsp_completions(&line_text, position).await, + ), + JupyterReplRequest::JsGetProperties { object_id } => { + JupyterReplResponse::JsGetProperties( + self.get_properties(object_id).await, + ) + } + JupyterReplRequest::JsEvaluate { expr } => { + JupyterReplResponse::JsEvaluate(self.evaluate(expr).await) + } + JupyterReplRequest::JsGlobalLexicalScopeNames => { + JupyterReplResponse::JsGlobalLexicalScopeNames( + self.global_lexical_scope_names().await, + ) + } + JupyterReplRequest::JsEvaluateLineWithObjectWrapping { line } => { + JupyterReplResponse::JsEvaluateLineWithObjectWrapping( + self.evaluate_line_with_object_wrapping(&line).await, + ) + } + JupyterReplRequest::JsCallFunctionOnArgs { + function_declaration, + args, + } => JupyterReplResponse::JsCallFunctionOnArgs( + self + .call_function_on_args(function_declaration, &args) + .await, + ), + JupyterReplRequest::JsCallFunctionOn { arg0, arg1 } => { + JupyterReplResponse::JsCallFunctionOn( + self.call_function_on(arg0, arg1).await, + ) + } + }; + + self.tx.send(resp).map_err(|e| e.into()) + } + pub async fn lsp_completions( &mut self, line_text: &str, diff --git a/tests/integration/jupyter_tests.rs b/tests/integration/jupyter_tests.rs index 1b2c213118..e99780a276 100644 --- a/tests/integration/jupyter_tests.rs +++ b/tests/integration/jupyter_tests.rs @@ -628,3 +628,37 @@ async fn jupyter_store_history_false() -> Result<()> { Ok(()) } + +#[tokio::test] +async fn jupyter_http_server() -> Result<()> { + let (_ctx, client, _process) = setup().await; + client + .send( + Shell, + "execute_request", + json!({ + "silent": false, + "store_history": false, + "code": r#"Deno.serve({ port: 10234 }, (req) => Response.json({ hello: "world" }))"#, + }), + ) + .await?; + + let reply = client.recv(Shell).await?; + assert_eq!(reply.header.msg_type, "execute_reply"); + assert_json_subset( + reply.content, + json!({ + "status": "ok", + "execution_count": 0, + }), + ); + + for _ in 0..3 { + let resp = reqwest::get("http://localhost:10234").await.unwrap(); + let text: serde_json::Value = resp.json().await.unwrap(); + assert_eq!(text, json!({ "hello": "world" })); + } + + Ok(()) +}