diff --git a/cli/graph_util.rs b/cli/graph_util.rs index 31645824d8..ecae9ea4e2 100644 --- a/cli/graph_util.rs +++ b/cli/graph_util.rs @@ -19,6 +19,8 @@ use deno_core::error::custom_error; use deno_core::error::AnyError; use deno_core::parking_lot::RwLock; use deno_core::ModuleSpecifier; +use deno_core::TaskQueue; +use deno_core::TaskQueuePermit; use deno_graph::Module; use deno_graph::ModuleGraph; use deno_graph::ModuleGraphError; @@ -29,8 +31,6 @@ use import_map::ImportMapError; use std::collections::HashMap; use std::collections::HashSet; use std::sync::Arc; -use tokio::sync::Semaphore; -use tokio::sync::SemaphorePermit; #[derive(Clone, Copy)] pub struct GraphValidOptions { @@ -318,27 +318,21 @@ struct GraphData { } /// Holds the `ModuleGraph` and what parts of it are type checked. -#[derive(Clone)] +#[derive(Clone, Default)] pub struct ModuleGraphContainer { - update_semaphore: Arc, + // Allow only one request to update the graph data at a time, + // but allow other requests to read from it at any time even + // while another request is updating the data. + update_queue: Arc, graph_data: Arc>, } -impl Default for ModuleGraphContainer { - fn default() -> Self { - Self { - update_semaphore: Arc::new(Semaphore::new(1)), - graph_data: Default::default(), - } - } -} - impl ModuleGraphContainer { /// Acquires a permit to modify the module graph without other code /// having the chance to modify it. In the meantime, other code may /// still read from the existing module graph. pub async fn acquire_update_permit(&self) -> ModuleGraphUpdatePermit { - let permit = self.update_semaphore.acquire().await.unwrap(); + let permit = self.update_queue.acquire().await; ModuleGraphUpdatePermit { permit, graph_data: self.graph_data.clone(), @@ -395,7 +389,7 @@ impl ModuleGraphContainer { /// everything looks fine, calling `.commit()` will store the /// new graph in the ModuleGraphContainer. pub struct ModuleGraphUpdatePermit<'a> { - permit: SemaphorePermit<'a>, + permit: TaskQueuePermit<'a>, graph_data: Arc>, graph: ModuleGraph, } diff --git a/cli/npm/resolution/mod.rs b/cli/npm/resolution/mod.rs index f43f3c5cb0..e1e3307c30 100644 --- a/cli/npm/resolution/mod.rs +++ b/cli/npm/resolution/mod.rs @@ -10,6 +10,7 @@ use deno_core::anyhow::Context; use deno_core::error::AnyError; use deno_core::parking_lot::Mutex; use deno_core::parking_lot::RwLock; +use deno_core::TaskQueue; use deno_graph::npm::NpmPackageNv; use deno_graph::npm::NpmPackageNvReference; use deno_graph::npm::NpmPackageReq; @@ -241,7 +242,7 @@ pub struct NpmResolution(Arc); struct NpmResolutionInner { api: NpmRegistryApi, snapshot: RwLock, - update_semaphore: tokio::sync::Semaphore, + update_queue: TaskQueue, maybe_lockfile: Option>>, } @@ -263,7 +264,7 @@ impl NpmResolution { Self(Arc::new(NpmResolutionInner { api, snapshot: RwLock::new(initial_snapshot.unwrap_or_default()), - update_semaphore: tokio::sync::Semaphore::new(1), + update_queue: Default::default(), maybe_lockfile, })) } @@ -275,7 +276,7 @@ impl NpmResolution { let inner = &self.0; // only allow one thread in here at a time - let _permit = inner.update_semaphore.acquire().await?; + let _permit = inner.update_queue.acquire().await; let snapshot = inner.snapshot.read().clone(); let snapshot = add_package_reqs_to_snapshot( @@ -296,7 +297,7 @@ impl NpmResolution { ) -> Result<(), AnyError> { let inner = &self.0; // only allow one thread in here at a time - let _permit = inner.update_semaphore.acquire().await?; + let _permit = inner.update_queue.acquire().await; let snapshot = inner.snapshot.read().clone(); let reqs_set = package_reqs.iter().collect::>(); @@ -326,7 +327,7 @@ impl NpmResolution { pub async fn resolve_pending(&self) -> Result<(), AnyError> { let inner = &self.0; // only allow one thread in here at a time - let _permit = inner.update_semaphore.acquire().await?; + let _permit = inner.update_queue.acquire().await; let snapshot = inner.snapshot.read().clone(); let snapshot = add_package_reqs_to_snapshot( diff --git a/cli/resolver.rs b/cli/resolver.rs index 46ae16a67e..b113fc4708 100644 --- a/cli/resolver.rs +++ b/cli/resolver.rs @@ -7,6 +7,7 @@ use deno_core::futures::future; use deno_core::futures::future::LocalBoxFuture; use deno_core::futures::FutureExt; use deno_core::ModuleSpecifier; +use deno_core::TaskQueue; use deno_graph::npm::NpmPackageNv; use deno_graph::npm::NpmPackageReq; use deno_graph::source::NpmResolver; @@ -34,7 +35,7 @@ pub struct CliGraphResolver { npm_registry_api: NpmRegistryApi, npm_resolution: NpmResolution, package_json_deps_installer: PackageJsonDepsInstaller, - sync_download_semaphore: Option>, + sync_download_queue: Option>, } impl Default for CliGraphResolver { @@ -52,7 +53,7 @@ impl Default for CliGraphResolver { npm_registry_api, npm_resolution, package_json_deps_installer: Default::default(), - sync_download_semaphore: Self::create_sync_download_semaphore(), + sync_download_queue: Self::create_sync_download_queue(), } } } @@ -77,13 +78,13 @@ impl CliGraphResolver { npm_registry_api, npm_resolution, package_json_deps_installer, - sync_download_semaphore: Self::create_sync_download_semaphore(), + sync_download_queue: Self::create_sync_download_queue(), } } - fn create_sync_download_semaphore() -> Option> { + fn create_sync_download_queue() -> Option> { if crate::npm::should_sync_download() { - Some(Arc::new(tokio::sync::Semaphore::new(1))) + Some(Default::default()) } else { None } @@ -194,10 +195,10 @@ impl NpmResolver for CliGraphResolver { let package_name = package_name.to_string(); let api = self.npm_registry_api.clone(); let deps_installer = self.package_json_deps_installer.clone(); - let maybe_sync_download_semaphore = self.sync_download_semaphore.clone(); + let maybe_sync_download_queue = self.sync_download_queue.clone(); async move { - let permit = if let Some(semaphore) = &maybe_sync_download_semaphore { - Some(semaphore.acquire().await.unwrap()) + let permit = if let Some(task_queue) = &maybe_sync_download_queue { + Some(task_queue.acquire().await) } else { None }; diff --git a/core/lib.rs b/core/lib.rs index 7ec40e3119..1c60db6944 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -118,6 +118,7 @@ pub use crate::runtime::V8_WRAPPER_OBJECT_INDEX; pub use crate::runtime::V8_WRAPPER_TYPE_INDEX; pub use crate::source_map::SourceMapGetter; pub use crate::task_queue::TaskQueue; +pub use crate::task_queue::TaskQueuePermit; pub fn v8_version() -> &'static str { v8::V8::get_version() diff --git a/core/task_queue.rs b/core/task_queue.rs index 839c47655c..36a169650c 100644 --- a/core/task_queue.rs +++ b/core/task_queue.rs @@ -8,13 +8,13 @@ use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering; use std::sync::Arc; -#[derive(Default)] +#[derive(Debug, Default)] struct TaskQueueTaskWaker { is_ready: AtomicBool, waker: AtomicWaker, } -#[derive(Default)] +#[derive(Debug, Default)] struct TaskQueueTasks { is_running: bool, wakers: LinkedList>, @@ -26,40 +26,35 @@ struct TaskQueueTasks { /// Note that tokio's semaphore doesn't seem to maintain order /// and so we can't use that in the code that uses this or use /// that here. -#[derive(Clone, Default)] +#[derive(Debug, Default)] pub struct TaskQueue { - tasks: Arc>, + tasks: Mutex, } impl TaskQueue { + /// Acquires a permit where the tasks are executed one at a time + /// and in the order that they were acquired. + pub async fn acquire(&self) -> TaskQueuePermit { + let acquire = TaskQueuePermitAcquire::new(self); + acquire.await; + TaskQueuePermit(self) + } + /// Alternate API that acquires a permit internally /// for the duration of the future. - #[cfg(test)] pub async fn queue(&self, future: impl Future) -> R { let _permit = self.acquire().await; future.await } - - /// Acquires a permit where the tasks are executed one at a time - /// and in the order that they were acquired. - pub async fn acquire(&self) -> TaskQueuePermit { - let acquire = TaskQueuePermitAcquire::new(self.tasks.clone()); - acquire.await; - TaskQueuePermit { - tasks: self.tasks.clone(), - } - } } /// A permit that when dropped will allow another task to proceed. -pub struct TaskQueuePermit { - tasks: Arc>, -} +pub struct TaskQueuePermit<'a>(&'a TaskQueue); -impl Drop for TaskQueuePermit { +impl<'a> Drop for TaskQueuePermit<'a> { fn drop(&mut self) { let next_item = { - let mut tasks = self.tasks.lock(); + let mut tasks = self.0.tasks.lock(); let next_item = tasks.wakers.pop_front(); tasks.is_running = next_item.is_some(); next_item @@ -71,23 +66,23 @@ impl Drop for TaskQueuePermit { } } -struct TaskQueuePermitAcquire { - tasks: Arc>, +struct TaskQueuePermitAcquire<'a> { + task_queue: &'a TaskQueue, initialized: AtomicBool, waker: Arc, } -impl TaskQueuePermitAcquire { - pub fn new(tasks: Arc>) -> Self { +impl<'a> TaskQueuePermitAcquire<'a> { + pub fn new(task_queue: &'a TaskQueue) -> Self { Self { - tasks, + task_queue, initialized: Default::default(), waker: Default::default(), } } } -impl Future for TaskQueuePermitAcquire { +impl<'a> Future for TaskQueuePermitAcquire<'a> { type Output = (); fn poll( @@ -99,7 +94,7 @@ impl Future for TaskQueuePermitAcquire { // ensure this is initialized if !self.initialized.swap(true, Ordering::SeqCst) { - let mut tasks = self.tasks.lock(); + let mut tasks = self.task_queue.tasks.lock(); if !tasks.is_running { tasks.is_running = true; return std::task::Poll::Ready(());