From 89026abe395c22eb2ace4ea5f948189daa1dadf1 Mon Sep 17 00:00:00 2001 From: David Sherret Date: Thu, 25 May 2023 23:01:33 -0400 Subject: [PATCH] chore(lsp/tests): diagnostic synchronization (#19264) Fixes the flaky lsp test by having better synchronization of diagnostics between the client and server for testing purposes. --- cli/lsp/client.rs | 34 +++++++ cli/lsp/diagnostics.rs | 134 ++++++++++++++++++++++++--- cli/lsp/language_server.rs | 27 ++++-- cli/lsp/lsp_custom.rs | 18 ++++ cli/lsp/mod.rs | 18 +++- cli/tests/integration/lsp_tests.rs | 104 ++++++++++----------- test_util/src/lsp.rs | 139 +++++++++++++++++++---------- 7 files changed, 349 insertions(+), 125 deletions(-) diff --git a/cli/lsp/client.rs b/cli/lsp/client.rs index 4923a4585e..5f1a7fcef2 100644 --- a/cli/lsp/client.rs +++ b/cli/lsp/client.rs @@ -62,6 +62,20 @@ impl Client { }); } + /// This notification is sent to the client during internal testing + /// purposes only in order to let the test client know when the latest + /// diagnostics have been published. + pub fn send_diagnostic_batch_notification( + &self, + params: lsp_custom::DiagnosticBatchNotificationParams, + ) { + // do on a task in case the caller currently is in the lsp lock + let client = self.0.clone(); + spawn(async move { + client.send_diagnostic_batch_notification(params).await; + }); + } + pub fn send_test_notification(&self, params: TestingNotification) { // do on a task in case the caller currently is in the lsp lock let client = self.0.clone(); @@ -160,6 +174,10 @@ trait ClientTrait: Send + Sync { &self, params: lsp_custom::RegistryStateNotificationParams, ); + async fn send_diagnostic_batch_notification( + &self, + params: lsp_custom::DiagnosticBatchNotificationParams, + ); async fn send_test_notification(&self, params: TestingNotification); async fn specifier_configurations( &self, @@ -197,6 +215,16 @@ impl ClientTrait for TowerClient { .await } + async fn send_diagnostic_batch_notification( + &self, + params: lsp_custom::DiagnosticBatchNotificationParams, + ) { + self + .0 + .send_notification::(params) + .await + } + async fn send_test_notification(&self, notification: TestingNotification) { match notification { TestingNotification::Module(params) => { @@ -311,6 +339,12 @@ impl ClientTrait for ReplClient { ) { } + async fn send_diagnostic_batch_notification( + &self, + _params: lsp_custom::DiagnosticBatchNotificationParams, + ) { + } + async fn send_test_notification(&self, _params: TestingNotification) {} async fn specifier_configurations( diff --git a/cli/lsp/diagnostics.rs b/cli/lsp/diagnostics.rs index 1a57ad03b8..a5e9d7bf8e 100644 --- a/cli/lsp/diagnostics.rs +++ b/cli/lsp/diagnostics.rs @@ -16,6 +16,7 @@ use super::tsc::TsServer; use crate::args::LintOptions; use crate::graph_util; use crate::graph_util::enhanced_resolution_error_message; +use crate::lsp::lsp_custom::DiagnosticBatchNotificationParams; use crate::tools::lint::get_configured_rules; use deno_ast::MediaType; @@ -37,6 +38,7 @@ use deno_runtime::tokio_util::create_basic_runtime; use deno_semver::npm::NpmPackageReqReference; use log::error; use std::collections::HashMap; +use std::sync::atomic::AtomicUsize; use std::sync::Arc; use std::thread; use tokio::sync::mpsc; @@ -45,8 +47,13 @@ use tokio::time::Duration; use tokio_util::sync::CancellationToken; use tower_lsp::lsp_types as lsp; -pub type SnapshotForDiagnostics = - (Arc, Arc, LintOptions); +#[derive(Debug)] +pub struct DiagnosticServerUpdateMessage { + pub snapshot: Arc, + pub config: Arc, + pub lint_options: LintOptions, +} + pub type DiagnosticRecord = (ModuleSpecifier, Option, Vec); pub type DiagnosticVec = Vec; @@ -145,13 +152,55 @@ impl TsDiagnosticsStore { } } +pub fn should_send_diagnostic_batch_index_notifications() -> bool { + crate::args::has_flag_env_var( + "DENO_DONT_USE_INTERNAL_LSP_DIAGNOSTIC_SYNC_FLAG", + ) +} + +#[derive(Clone, Debug)] +struct DiagnosticBatchCounter(Option>); + +impl Default for DiagnosticBatchCounter { + fn default() -> Self { + if should_send_diagnostic_batch_index_notifications() { + Self(Some(Default::default())) + } else { + Self(None) + } + } +} + +impl DiagnosticBatchCounter { + pub fn inc(&self) -> Option { + self + .0 + .as_ref() + .map(|value| value.fetch_add(1, std::sync::atomic::Ordering::SeqCst) + 1) + } + + pub fn get(&self) -> Option { + self + .0 + .as_ref() + .map(|value| value.load(std::sync::atomic::Ordering::SeqCst)) + } +} + +#[derive(Debug)] +struct ChannelMessage { + message: DiagnosticServerUpdateMessage, + batch_index: Option, +} + #[derive(Debug)] pub struct DiagnosticsServer { - channel: Option>, + channel: Option>, ts_diagnostics: TsDiagnosticsStore, client: Client, performance: Arc, ts_server: Arc, + batch_counter: DiagnosticBatchCounter, } impl DiagnosticsServer { @@ -166,6 +215,7 @@ impl DiagnosticsServer { client, performance, ts_server, + batch_counter: Default::default(), } } @@ -187,7 +237,7 @@ impl DiagnosticsServer { #[allow(unused_must_use)] pub fn start(&mut self) { - let (tx, mut rx) = mpsc::unbounded_channel::(); + let (tx, mut rx) = mpsc::unbounded_channel::(); self.channel = Some(tx); let client = self.client.clone(); let performance = self.performance.clone(); @@ -208,7 +258,17 @@ impl DiagnosticsServer { match rx.recv().await { // channel has closed None => break, - Some((snapshot, config, lint_options)) => { + Some(message) => { + let ChannelMessage { + message: + DiagnosticServerUpdateMessage { + snapshot, + config, + lint_options, + }, + batch_index, + } = message; + // cancel the previous run token.cancel(); token = CancellationToken::new(); @@ -255,6 +315,7 @@ impl DiagnosticsServer { }) .unwrap_or_default(); + let messages_len = diagnostics.len(); if !token.is_cancelled() { ts_diagnostics_store.update(&diagnostics); diagnostics_publisher.publish(diagnostics, &token).await; @@ -263,6 +324,17 @@ impl DiagnosticsServer { performance.measure(mark); } } + + if let Some(batch_index) = batch_index { + diagnostics_publisher + .client + .send_diagnostic_batch_notification( + DiagnosticBatchNotificationParams { + batch_index, + messages_len, + }, + ); + } } })); @@ -286,10 +358,24 @@ impl DiagnosticsServer { ) .await; - diagnostics_publisher.publish(diagnostics, &token).await; - + let messages_len = diagnostics.len(); if !token.is_cancelled() { - performance.measure(mark); + diagnostics_publisher.publish(diagnostics, &token).await; + + if !token.is_cancelled() { + performance.measure(mark); + } + } + + if let Some(batch_index) = batch_index { + diagnostics_publisher + .client + .send_diagnostic_batch_notification( + DiagnosticBatchNotificationParams { + batch_index, + messages_len, + }, + ); } } })); @@ -315,10 +401,24 @@ impl DiagnosticsServer { ) .await; - diagnostics_publisher.publish(diagnostics, &token).await; - + let messages_len = diagnostics.len(); if !token.is_cancelled() { - performance.measure(mark); + diagnostics_publisher.publish(diagnostics, &token).await; + + if !token.is_cancelled() { + performance.measure(mark); + } + } + + if let Some(batch_index) = batch_index { + diagnostics_publisher + .client + .send_diagnostic_batch_notification( + DiagnosticBatchNotificationParams { + batch_index, + messages_len, + }, + ); } } })); @@ -329,15 +429,23 @@ impl DiagnosticsServer { }); } + pub fn latest_batch_index(&self) -> Option { + self.batch_counter.get() + } + pub fn update( &self, - message: SnapshotForDiagnostics, + message: DiagnosticServerUpdateMessage, ) -> Result<(), AnyError> { // todo(dsherret): instead of queuing up messages, it would be better to // instead only store the latest message (ex. maybe using a // tokio::sync::watch::channel) if let Some(tx) = &self.channel { - tx.send(message).map_err(|err| err.into()) + tx.send(ChannelMessage { + message, + batch_index: self.batch_counter.inc(), + }) + .map_err(|err| err.into()) } else { Err(anyhow!("diagnostics server not started")) } diff --git a/cli/lsp/language_server.rs b/cli/lsp/language_server.rs index 9a2b067c60..7c4191c82d 100644 --- a/cli/lsp/language_server.rs +++ b/cli/lsp/language_server.rs @@ -46,6 +46,7 @@ use super::completions; use super::config::Config; use super::config::SETTINGS_SECTION; use super::diagnostics; +use super::diagnostics::DiagnosticServerUpdateMessage; use super::diagnostics::DiagnosticsServer; use super::documents::to_hover_text; use super::documents::to_lsp_range; @@ -342,6 +343,22 @@ impl LanguageServer { } } + /// This request is only used by the lsp integration tests to + /// coordinate the tests receiving the latest diagnostics. + pub async fn latest_diagnostic_batch_index_request( + &self, + ) -> LspResult> { + Ok( + self + .0 + .read() + .await + .diagnostics_server + .latest_batch_index() + .map(|v| v.into()), + ) + } + pub async fn performance_request(&self) -> LspResult> { Ok(Some(self.0.read().await.get_performance())) } @@ -2932,11 +2949,11 @@ impl Inner { } fn send_diagnostics_update(&self) { - let snapshot = ( - self.snapshot(), - self.config.snapshot(), - self.lint_options.clone(), - ); + let snapshot = DiagnosticServerUpdateMessage { + snapshot: self.snapshot(), + config: self.config.snapshot(), + lint_options: self.lint_options.clone(), + }; if let Err(err) = self.diagnostics_server.update(snapshot) { error!("Cannot update diagnostics: {}", err); } diff --git a/cli/lsp/lsp_custom.rs b/cli/lsp/lsp_custom.rs index 70a245a663..24c4bc131d 100644 --- a/cli/lsp/lsp_custom.rs +++ b/cli/lsp/lsp_custom.rs @@ -10,6 +10,8 @@ pub const TASK_REQUEST: &str = "deno/task"; pub const RELOAD_IMPORT_REGISTRIES_REQUEST: &str = "deno/reloadImportRegistries"; pub const VIRTUAL_TEXT_DOCUMENT: &str = "deno/virtualTextDocument"; +pub const LATEST_DIAGNOSTIC_BATCH_INDEX: &str = + "deno/internalLatestDiagnosticBatchIndex"; // While lsp_types supports inlay hints currently, tower_lsp does not. pub const INLAY_HINT: &str = "textDocument/inlayHint"; @@ -44,3 +46,19 @@ impl lsp::notification::Notification for RegistryStateNotification { pub struct VirtualTextDocumentParams { pub text_document: lsp::TextDocumentIdentifier, } + +#[derive(Debug, Deserialize, Serialize)] +pub struct DiagnosticBatchNotificationParams { + pub batch_index: usize, + pub messages_len: usize, +} + +/// This notification is only sent for testing purposes +/// in order to know what the latest diagnostics are. +pub enum DiagnosticBatchNotification {} + +impl lsp::notification::Notification for DiagnosticBatchNotification { + type Params = DiagnosticBatchNotificationParams; + + const METHOD: &'static str = "deno/internalTestDiagnosticBatch"; +} diff --git a/cli/lsp/mod.rs b/cli/lsp/mod.rs index 0d5552519e..d13c90089f 100644 --- a/cli/lsp/mod.rs +++ b/cli/lsp/mod.rs @@ -8,6 +8,8 @@ use crate::lsp::language_server::LanguageServer; pub use repl::ReplCompletionItem; pub use repl::ReplLanguageServer; +use self::diagnostics::should_send_diagnostic_batch_index_notifications; + mod analysis; mod cache; mod capabilities; @@ -36,7 +38,7 @@ pub async fn start() -> Result<(), AnyError> { let stdin = tokio::io::stdin(); let stdout = tokio::io::stdout(); - let (service, socket) = LspService::build(|client| { + let builder = LspService::build(|client| { language_server::LanguageServer::new(client::Client::from_tower(client)) }) .custom_method(lsp_custom::CACHE_REQUEST, LanguageServer::cache_request) @@ -58,8 +60,18 @@ pub async fn start() -> Result<(), AnyError> { lsp_custom::VIRTUAL_TEXT_DOCUMENT, LanguageServer::virtual_text_document, ) - .custom_method(lsp_custom::INLAY_HINT, LanguageServer::inlay_hint) - .finish(); + .custom_method(lsp_custom::INLAY_HINT, LanguageServer::inlay_hint); + + let builder = if should_send_diagnostic_batch_index_notifications() { + builder.custom_method( + lsp_custom::LATEST_DIAGNOSTIC_BATCH_INDEX, + LanguageServer::latest_diagnostic_batch_index_request, + ) + } else { + builder + }; + + let (service, socket) = builder.finish(); Server::new(stdin, stdout, socket).serve(service).await; diff --git a/cli/tests/integration/lsp_tests.rs b/cli/tests/integration/lsp_tests.rs index 4ddf5d4e35..eee83c4a2d 100644 --- a/cli/tests/integration/lsp_tests.rs +++ b/cli/tests/integration/lsp_tests.rs @@ -53,7 +53,7 @@ fn lsp_init_tsconfig() { } })); - assert_eq!(diagnostics.viewed().len(), 0); + assert_eq!(diagnostics.all().len(), 0); client.shutdown(); } @@ -93,7 +93,7 @@ fn lsp_tsconfig_types() { } })); - assert_eq!(diagnostics.viewed().len(), 0); + assert_eq!(diagnostics.all().len(), 0); client.shutdown(); } @@ -121,7 +121,7 @@ fn lsp_tsconfig_bad_config_path() { "text": "console.log(Deno.args);\n" } })); - assert_eq!(diagnostics.viewed().len(), 0); + assert_eq!(diagnostics.all().len(), 0); } #[test] @@ -142,7 +142,7 @@ fn lsp_triple_slash_types() { } })); - assert_eq!(diagnostics.viewed().len(), 0); + assert_eq!(diagnostics.all().len(), 0); client.shutdown(); } @@ -176,7 +176,7 @@ fn lsp_import_map() { } })); - assert_eq!(diagnostics.viewed().len(), 0); + assert_eq!(diagnostics.all().len(), 0); let res = client.write_request( "textDocument/hover", @@ -223,7 +223,7 @@ fn lsp_import_map_data_url() { })); // This indicates that the import map is applied correctly. - assert!(diagnostics.viewed().iter().any(|diagnostic| diagnostic.code + assert!(diagnostics.all().iter().any(|diagnostic| diagnostic.code == Some(lsp::NumberOrString::String("no-cache".to_string())) && diagnostic .message @@ -268,7 +268,7 @@ fn lsp_import_map_config_file() { } })); - assert_eq!(diagnostics.viewed().len(), 0); + assert_eq!(diagnostics.all().len(), 0); let res = client.write_request( "textDocument/hover", @@ -329,7 +329,7 @@ fn lsp_import_map_embedded_in_config_file() { } })); - assert_eq!(diagnostics.viewed().len(), 0); + assert_eq!(diagnostics.all().len(), 0); let res = client.write_request( "textDocument/hover", @@ -431,7 +431,7 @@ fn lsp_import_assertions() { assert_eq!( json!( diagnostics - .with_file_and_source("file:///a/a.ts", "deno") + .messages_with_file_and_source("file:///a/a.ts", "deno") .diagnostics ), json!([ @@ -3692,7 +3692,7 @@ fn lsp_code_actions_deno_cache() { } })); assert_eq!( - diagnostics.with_source("deno"), + diagnostics.messages_with_source("deno"), serde_json::from_value(json!({ "uri": "file:///a/file.ts", "diagnostics": [{ @@ -3782,7 +3782,7 @@ fn lsp_code_actions_deno_cache_npm() { } })); assert_eq!( - diagnostics.with_source("deno"), + diagnostics.messages_with_source("deno"), serde_json::from_value(json!({ "uri": "file:///a/file.ts", "diagnostics": [{ @@ -5139,7 +5139,7 @@ fn lsp_completions_node_specifier() { })); let non_existent_diagnostics = diagnostics - .with_file_and_source("file:///a/file.ts", "deno") + .messages_with_file_and_source("file:///a/file.ts", "deno") .diagnostics .into_iter() .filter(|d| { @@ -5183,7 +5183,7 @@ fn lsp_completions_node_specifier() { ); let diagnostics = client.read_diagnostics(); let diagnostics = diagnostics - .with_file_and_source("file:///a/file.ts", "deno") + .messages_with_file_and_source("file:///a/file.ts", "deno") .diagnostics .into_iter() .filter(|d| { @@ -5269,7 +5269,7 @@ fn lsp_completions_node_specifier() { let diagnostics = client.read_diagnostics(); let cache_diagnostics = diagnostics - .with_file_and_source("file:///a/file.ts", "deno") + .messages_with_file_and_source("file:///a/file.ts", "deno") .diagnostics .into_iter() .filter(|d| { @@ -5539,7 +5539,7 @@ fn lsp_cache_location() { "text": "import * as a from \"http://127.0.0.1:4545/xTypeScriptTypes.js\";\n// @deno-types=\"http://127.0.0.1:4545/type_definitions/foo.d.ts\"\nimport * as b from \"http://127.0.0.1:4545/type_definitions/foo.js\";\nimport * as c from \"http://127.0.0.1:4545/subdir/type_reference.js\";\nimport * as d from \"http://127.0.0.1:4545/subdir/mod1.ts\";\nimport * as e from \"data:application/typescript;base64,ZXhwb3J0IGNvbnN0IGEgPSAiYSI7CgpleHBvcnQgZW51bSBBIHsKICBBLAogIEIsCiAgQywKfQo=\";\nimport * as f from \"./file_01.ts\";\nimport * as g from \"http://localhost:4545/x/a/mod.ts\";\n\nconsole.log(a, b, c, d, e, f, g);\n" } })); - assert_eq!(diagnostics.viewed().len(), 7); + assert_eq!(diagnostics.all().len(), 7); client.write_request( "deno/cache", json!({ @@ -5634,7 +5634,7 @@ fn lsp_tls_cert() { "text": "import * as a from \"https://localhost:5545/xTypeScriptTypes.js\";\n// @deno-types=\"https://localhost:5545/type_definitions/foo.d.ts\"\nimport * as b from \"https://localhost:5545/type_definitions/foo.js\";\nimport * as c from \"https://localhost:5545/subdir/type_reference.js\";\nimport * as d from \"https://localhost:5545/subdir/mod1.ts\";\nimport * as e from \"data:application/typescript;base64,ZXhwb3J0IGNvbnN0IGEgPSAiYSI7CgpleHBvcnQgZW51bSBBIHsKICBBLAogIEIsCiAgQywKfQo=\";\nimport * as f from \"./file_01.ts\";\nimport * as g from \"http://localhost:4545/x/a/mod.ts\";\n\nconsole.log(a, b, c, d, e, f, g);\n" } })); - let diagnostics = diagnostics.viewed(); + let diagnostics = diagnostics.all(); assert_eq!(diagnostics.len(), 7); client.write_request( "deno/cache", @@ -5725,7 +5725,7 @@ fn lsp_diagnostics_warn_redirect() { ); let diagnostics = client.read_diagnostics(); assert_eq!( - diagnostics.with_source("deno"), + diagnostics.messages_with_source("deno"), lsp::PublishDiagnosticsParams { uri: Url::parse("file:///a/file.ts").unwrap(), diagnostics: vec![ @@ -5802,7 +5802,10 @@ fn lsp_redirect_quick_fix() { ], }), ); - let diagnostics = client.read_diagnostics().with_source("deno").diagnostics; + let diagnostics = client + .read_diagnostics() + .messages_with_source("deno") + .diagnostics; let res = client.write_request( "textDocument/codeAction", json!(json!({ @@ -5872,35 +5875,25 @@ fn lsp_diagnostics_deprecated() { }, })); assert_eq!( - json!(diagnostics.0), - json!([ - { - "uri": "file:///a/file.ts", - "diagnostics": [], - "version": 1 - }, { - "uri": "file:///a/file.ts", - "diagnostics": [], - "version": 1 - }, { - "uri": "file:///a/file.ts", - "diagnostics": [ - { - "range": { - "start": { "line": 3, "character": 0 }, - "end": { "line": 3, "character": 1 } - }, - "severity": 4, - "code": 6385, - "source": "deno-ts", - "message": "'a' is deprecated.", - "relatedInformation": [], - "tags": [2] - } - ], - "version": 1 - } - ]) + json!(diagnostics.all_messages()), + json!([{ + "uri": "file:///a/file.ts", + "diagnostics": [ + { + "range": { + "start": { "line": 3, "character": 0 }, + "end": { "line": 3, "character": 1 } + }, + "severity": 4, + "code": 6385, + "source": "deno-ts", + "message": "'a' is deprecated.", + "relatedInformation": [], + "tags": [2] + } + ], + "version": 1 + }]) ); client.shutdown(); } @@ -5929,7 +5922,7 @@ fn lsp_diagnostics_deno_types() { } }), ); - assert_eq!(diagnostics.viewed().len(), 5); + assert_eq!(diagnostics.all().len(), 5); client.shutdown(); } @@ -5963,7 +5956,8 @@ fn lsp_diagnostics_refresh_dependents() { } })); assert_eq!( - json!(diagnostics.with_file_and_source("file:///a/file_02.ts", "deno-ts")), + json!(diagnostics + .messages_with_file_and_source("file:///a/file_02.ts", "deno-ts")), json!({ "uri": "file:///a/file_02.ts", "diagnostics": [ @@ -6002,7 +5996,7 @@ fn lsp_diagnostics_refresh_dependents() { }), ); let diagnostics = client.read_diagnostics(); - assert_eq!(diagnostics.viewed().len(), 0); // no diagnostics now + assert_eq!(diagnostics.all().len(), 0); // no diagnostics now client.shutdown(); assert_eq!(client.queue_len(), 0); @@ -7056,7 +7050,7 @@ fn lsp_lint_with_config() { "text": "// TODO: fixme\nexport async function non_camel_case() {\nconsole.log(\"finished!\")\n}" } })); - let diagnostics = diagnostics.viewed(); + let diagnostics = diagnostics.all(); assert_eq!(diagnostics.len(), 1); assert_eq!( diagnostics[0].code, @@ -7101,7 +7095,7 @@ fn lsp_lint_exclude_with_config() { } }), ); - let diagnostics = diagnostics.viewed(); + let diagnostics = diagnostics.all(); assert_eq!(diagnostics, Vec::new()); client.shutdown(); } @@ -7484,7 +7478,7 @@ fn lsp_data_urls_with_jsx_compiler_option() { "version": 1, "text": "import a from \"data:application/typescript,export default 5;\";\na;" } - })).viewed(); + })).all(); // there will be a diagnostic about not having cached the data url assert_eq!(diagnostics.len(), 1); @@ -7630,7 +7624,7 @@ fn lsp_node_modules_dir() { refresh_config(&mut client); let diagnostics = client.read_diagnostics(); - assert_eq!(diagnostics.viewed().len(), 2); // not cached + assert_eq!(diagnostics.all().len(), 2, "{:#?}", diagnostics); // not cached cache(&mut client); @@ -7647,7 +7641,7 @@ fn lsp_node_modules_dir() { cache(&mut client); let diagnostics = client.read_diagnostics(); - assert_eq!(diagnostics.viewed().len(), 0, "{:#?}", diagnostics); + assert_eq!(diagnostics.all().len(), 0, "{:#?}", diagnostics); assert!(temp_dir.path().join("deno.lock").exists()); diff --git a/test_util/src/lsp.rs b/test_util/src/lsp.rs index a7061543ff..5fbca7f354 100644 --- a/test_util/src/lsp.rs +++ b/test_util/src/lsp.rs @@ -87,6 +87,12 @@ impl<'a> From<&'a [u8]> for LspMessage { } } +#[derive(Debug, Deserialize)] +struct DiagnosticBatchNotificationParams { + batch_index: usize, + messages_len: usize, +} + fn read_message(reader: &mut R) -> Result>> where R: io::Read + io::BufRead, @@ -174,6 +180,25 @@ impl LspStdoutReader { cvar.wait(&mut msg_queue); } } + + pub fn read_latest_message( + &mut self, + mut get_match: impl FnMut(&LspMessage) -> Option, + ) -> R { + let (msg_queue, cvar) = &*self.pending_messages; + let mut msg_queue = msg_queue.lock(); + loop { + for i in (0..msg_queue.len()).rev() { + let msg = &msg_queue[i]; + if let Some(result) = get_match(msg) { + let msg = msg_queue.remove(i); + self.read_messages.push(msg); + return result; + } + } + cvar.wait(&mut msg_queue); + } + } } pub struct InitializeParamsBuilder { @@ -485,6 +510,8 @@ impl LspClientBuilder { command .env("DENO_DIR", deno_dir.path()) .env("NPM_CONFIG_REGISTRY", npm_registry_url()) + // turn on diagnostic synchronization communication + .env("DENO_DONT_USE_INTERNAL_LSP_DIAGNOSTIC_SYNC_FLAG", "1") .arg("lsp") .stdin(Stdio::piped()) .stdout(Stdio::piped()); @@ -510,7 +537,6 @@ impl LspClientBuilder { .unwrap_or_else(|| TestContextBuilder::new().build()), writer, deno_dir, - diagnosable_open_file_count: 0, }) } } @@ -523,7 +549,6 @@ pub struct LspClient { writer: io::BufWriter, deno_dir: TempDir, context: TestContext, - diagnosable_open_file_count: usize, } impl Drop for LspClient { @@ -609,20 +634,6 @@ impl LspClient { } pub fn did_open_raw(&mut self, params: Value) { - let text_doc = params - .as_object() - .unwrap() - .get("textDocument") - .unwrap() - .as_object() - .unwrap(); - if matches!( - text_doc.get("languageId").unwrap().as_str().unwrap(), - "typescript" | "javascript" - ) { - self.diagnosable_open_file_count += 1; - } - self.write_notification("textDocument/didOpen", params); } @@ -632,11 +643,46 @@ impl LspClient { self.write_response(id, result); } + fn get_latest_diagnostic_batch_index(&mut self) -> usize { + let result = self + .write_request("deno/internalLatestDiagnosticBatchIndex", json!(null)); + result.as_u64().unwrap() as usize + } + + /// Reads the latest diagnostics. It's assumed that pub fn read_diagnostics(&mut self) -> CollectedDiagnostics { - let mut all_diagnostics = Vec::new(); - for _ in 0..self.diagnosable_open_file_count { - all_diagnostics.extend(read_diagnostics(self).0); + // ask the server what the latest diagnostic batch index is + let latest_diagnostic_batch_index = + self.get_latest_diagnostic_batch_index(); + + // now wait for three (deno, lint, and typescript diagnostics) batch + // notification messages for that index + let mut read = 0; + let mut total_messages_len = 0; + while read < 3 { + let (method, response) = + self.read_notification::(); + assert_eq!(method, "deno/internalTestDiagnosticBatch"); + let response = response.unwrap(); + if response.batch_index == latest_diagnostic_batch_index { + read += 1; + total_messages_len += response.messages_len; + } } + + // now read the latest diagnostic messages + let mut all_diagnostics = Vec::with_capacity(total_messages_len); + let mut seen_files = HashSet::new(); + for _ in 0..total_messages_len { + let (method, response) = + self.read_latest_notification::(); + assert_eq!(method, "textDocument/publishDiagnostics"); + let response = response.unwrap(); + if seen_files.insert(response.uri.to_string()) { + all_diagnostics.push(response); + } + } + CollectedDiagnostics(all_diagnostics) } @@ -668,6 +714,19 @@ impl LspClient { }) } + pub fn read_latest_notification(&mut self) -> (String, Option) + where + R: de::DeserializeOwned, + { + self.reader.read_latest_message(|msg| match msg { + LspMessage::Notification(method, maybe_params) => { + let params = serde_json::from_value(maybe_params.clone()?).ok()?; + Some((method.to_string(), params)) + } + _ => None, + }) + } + pub fn read_notification_with_method( &mut self, expected_method: &str, @@ -819,35 +878,29 @@ impl LspClient { } #[derive(Debug, Clone)] -pub struct CollectedDiagnostics(pub Vec); +pub struct CollectedDiagnostics(Vec); impl CollectedDiagnostics { /// Gets the diagnostics that the editor will see after all the publishes. - pub fn viewed(&self) -> Vec { + pub fn all(&self) -> Vec { self - .viewed_messages() + .all_messages() .into_iter() .flat_map(|m| m.diagnostics) .collect() } /// Gets the messages that the editor will see after all the publishes. - pub fn viewed_messages(&self) -> Vec { - // go over the publishes in reverse order in order to get - // the final messages that will be shown in the editor - let mut messages = Vec::new(); - let mut had_specifier = HashSet::new(); - for message in self.0.iter().rev() { - if had_specifier.insert(message.uri.clone()) { - messages.insert(0, message.clone()); - } - } - messages + pub fn all_messages(&self) -> Vec { + self.0.clone() } - pub fn with_source(&self, source: &str) -> lsp::PublishDiagnosticsParams { + pub fn messages_with_source( + &self, + source: &str, + ) -> lsp::PublishDiagnosticsParams { self - .viewed_messages() + .all_messages() .iter() .find(|p| { p.diagnostics @@ -858,14 +911,14 @@ impl CollectedDiagnostics { .unwrap() } - pub fn with_file_and_source( + pub fn messages_with_file_and_source( &self, specifier: &str, source: &str, ) -> lsp::PublishDiagnosticsParams { let specifier = Url::parse(specifier).unwrap(); self - .viewed_messages() + .all_messages() .iter() .find(|p| { p.uri == specifier @@ -879,18 +932,6 @@ impl CollectedDiagnostics { } } -fn read_diagnostics(client: &mut LspClient) -> CollectedDiagnostics { - // diagnostics come in batches of three unless they're cancelled - let mut diagnostics = vec![]; - for _ in 0..3 { - let (method, response) = - client.read_notification::(); - assert_eq!(method, "textDocument/publishDiagnostics"); - diagnostics.push(response.unwrap()); - } - CollectedDiagnostics(diagnostics) -} - #[cfg(test)] mod tests { use super::*;