diff --git a/cli/graph_util.rs b/cli/graph_util.rs index 6fb4b77902..1a5a293a71 100644 --- a/cli/graph_util.rs +++ b/cli/graph_util.rs @@ -13,6 +13,8 @@ use crate::npm::CliNpmResolver; use crate::resolver::CliGraphResolver; use crate::tools::check; use crate::tools::check::TypeChecker; +use crate::util::sync::TaskQueue; +use crate::util::sync::TaskQueuePermit; use deno_core::anyhow::bail; use deno_core::error::custom_error; @@ -20,8 +22,6 @@ use deno_core::error::AnyError; use deno_core::parking_lot::Mutex; use deno_core::parking_lot::RwLock; use deno_core::ModuleSpecifier; -use deno_core::TaskQueue; -use deno_core::TaskQueuePermit; use deno_graph::source::Loader; use deno_graph::GraphKind; use deno_graph::Module; diff --git a/cli/npm/registry.rs b/cli/npm/registry.rs index 907258d3b7..ec0647023a 100644 --- a/cli/npm/registry.rs +++ b/cli/npm/registry.rs @@ -18,7 +18,6 @@ use deno_core::futures::FutureExt; use deno_core::parking_lot::Mutex; use deno_core::serde_json; use deno_core::url::Url; -use deno_core::TaskQueue; use deno_npm::registry::NpmPackageInfo; use deno_npm::registry::NpmRegistryApi; use deno_npm::registry::NpmRegistryPackageInfoLoadError; @@ -30,6 +29,7 @@ use crate::http_util::HttpClient; use crate::util::fs::atomic_write_file; use crate::util::progress_bar::ProgressBar; use crate::util::sync::AtomicFlag; +use crate::util::sync::TaskQueue; use super::cache::should_sync_download; use super::cache::NpmCache; @@ -118,7 +118,7 @@ impl NpmRegistryApi for CliNpmRegistryApi { let result = if should_sync_download() { let inner = self.inner().clone(); SYNC_DOWNLOAD_TASK_QUEUE - .queue(async move { inner.maybe_package_info(name).await }) + .run(async move { inner.maybe_package_info(name).await }) .await } else { self.inner().maybe_package_info(name).await diff --git a/cli/npm/resolution.rs b/cli/npm/resolution.rs index 73e27f487b..10ff5fd92b 100644 --- a/cli/npm/resolution.rs +++ b/cli/npm/resolution.rs @@ -7,7 +7,6 @@ use std::sync::Arc; use deno_core::error::AnyError; use deno_core::parking_lot::Mutex; use deno_core::parking_lot::RwLock; -use deno_core::TaskQueue; use deno_lockfile::NpmPackageDependencyLockfileInfo; use deno_lockfile::NpmPackageLockfileInfo; use deno_npm::registry::NpmPackageInfo; @@ -32,6 +31,7 @@ use deno_semver::package::PackageReq; use deno_semver::VersionReq; use crate::args::Lockfile; +use crate::util::sync::TaskQueue; use super::registry::CliNpmRegistryApi; diff --git a/cli/resolver.rs b/cli/resolver.rs index 4fb9127313..a7b2cd01ea 100644 --- a/cli/resolver.rs +++ b/cli/resolver.rs @@ -7,7 +7,6 @@ use deno_core::futures::future; use deno_core::futures::future::LocalBoxFuture; use deno_core::futures::FutureExt; use deno_core::ModuleSpecifier; -use deno_core::TaskQueue; use deno_graph::source::NpmPackageReqResolution; use deno_graph::source::NpmResolver; use deno_graph::source::Resolver; @@ -27,6 +26,7 @@ use crate::npm::CliNpmRegistryApi; use crate::npm::NpmResolution; use crate::npm::PackageJsonDepsInstaller; use crate::util::sync::AtomicFlag; +use crate::util::sync::TaskQueue; /// Result of checking if a specifier is mapped via /// an import map or package.json. diff --git a/cli/util/sync.rs b/cli/util/sync.rs index 6d136abc1f..6eff974a7e 100644 --- a/cli/util/sync.rs +++ b/cli/util/sync.rs @@ -1,7 +1,13 @@ // Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. +use std::collections::LinkedList; use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering; +use std::sync::Arc; + +use deno_core::futures::task::AtomicWaker; +use deno_core::futures::Future; +use deno_core::parking_lot::Mutex; /// Simplifies the use of an atomic boolean as a flag. #[derive(Debug, Default)] @@ -19,9 +25,147 @@ impl AtomicFlag { } } +#[derive(Debug, Default)] +struct TaskQueueTaskItem { + is_ready: AtomicFlag, + is_future_dropped: AtomicFlag, + waker: AtomicWaker, +} + +#[derive(Debug, Default)] +struct TaskQueueTasks { + is_running: bool, + items: LinkedList>, +} + +/// A queue that executes tasks sequentially one after the other +/// ensuring order and that no task runs at the same time as another. +/// +/// Note that this differs from tokio's semaphore in that the order +/// is acquired synchronously. +#[derive(Debug, Default)] +pub struct TaskQueue { + tasks: Mutex, +} + +impl TaskQueue { + /// Acquires a permit where the tasks are executed one at a time + /// and in the order that they were acquired. + pub fn acquire(&self) -> TaskQueuePermitAcquireFuture { + TaskQueuePermitAcquireFuture::new(self) + } + + /// Alternate API that acquires a permit internally + /// for the duration of the future. + pub fn run<'a, R>( + &'a self, + future: impl Future + 'a, + ) -> impl Future + 'a { + let acquire_future = self.acquire(); + async move { + let permit = acquire_future.await; + let result = future.await; + drop(permit); // explicit for clarity + result + } + } + + fn raise_next(&self) { + let front_item = { + let mut tasks = self.tasks.lock(); + + // clear out any wakers for futures that were dropped + while let Some(front_waker) = tasks.items.front() { + if front_waker.is_future_dropped.is_raised() { + tasks.items.pop_front(); + } else { + break; + } + } + let front_item = tasks.items.pop_front(); + tasks.is_running = front_item.is_some(); + front_item + }; + + // wake up the next waker + if let Some(front_item) = front_item { + front_item.is_ready.raise(); + front_item.waker.wake(); + } + } +} + +/// A permit that when dropped will allow another task to proceed. +pub struct TaskQueuePermit<'a>(&'a TaskQueue); + +impl<'a> Drop for TaskQueuePermit<'a> { + fn drop(&mut self) { + self.0.raise_next(); + } +} + +pub struct TaskQueuePermitAcquireFuture<'a> { + task_queue: Option<&'a TaskQueue>, + item: Arc, +} + +impl<'a> TaskQueuePermitAcquireFuture<'a> { + pub fn new(task_queue: &'a TaskQueue) -> Self { + // acquire the waker position synchronously + let mut tasks = task_queue.tasks.lock(); + let item = if !tasks.is_running { + tasks.is_running = true; + let item = Arc::new(TaskQueueTaskItem::default()); + item.is_ready.raise(); + item + } else { + let item = Arc::new(TaskQueueTaskItem::default()); + tasks.items.push_back(item.clone()); + item + }; + drop(tasks); + Self { + task_queue: Some(task_queue), + item, + } + } +} + +impl<'a> Drop for TaskQueuePermitAcquireFuture<'a> { + fn drop(&mut self) { + if let Some(task_queue) = self.task_queue.take() { + if self.item.is_ready.is_raised() { + task_queue.raise_next(); + } else { + self.item.is_future_dropped.raise(); + } + } + } +} + +impl<'a> Future for TaskQueuePermitAcquireFuture<'a> { + type Output = TaskQueuePermit<'a>; + + fn poll( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll { + if self.item.is_ready.is_raised() { + std::task::Poll::Ready(TaskQueuePermit(self.task_queue.take().unwrap())) + } else { + self.item.waker.register(cx.waker()); + std::task::Poll::Pending + } + } +} + #[cfg(test)] mod test { - use super::AtomicFlag; + use deno_core::futures; + use deno_core::parking_lot::Mutex; + use std::sync::Arc; + + use super::*; #[test] fn atomic_flag_raises() { @@ -32,4 +176,116 @@ mod test { assert!(!flag.raise()); assert!(flag.is_raised()); } + + #[tokio::test] + async fn task_queue_runs_one_after_other() { + let task_queue = TaskQueue::default(); + let mut tasks = Vec::new(); + let data = Arc::new(Mutex::new(0)); + for i in 0..100 { + let data = data.clone(); + tasks.push(task_queue.run(async move { + deno_core::unsync::spawn_blocking(move || { + let mut data = data.lock(); + assert_eq!(*data, i); + *data = i + 1; + }) + .await + .unwrap(); + })); + } + futures::future::join_all(tasks).await; + } + + #[tokio::test] + async fn task_queue_run_in_sequence() { + let task_queue = TaskQueue::default(); + let data = Arc::new(Mutex::new(0)); + + let first = task_queue.run(async { + *data.lock() = 1; + }); + let second = task_queue.run(async { + assert_eq!(*data.lock(), 1); + *data.lock() = 2; + }); + let _ = tokio::join!(first, second); + + assert_eq!(*data.lock(), 2); + } + + #[tokio::test] + async fn task_queue_future_dropped_before_poll() { + let task_queue = Arc::new(TaskQueue::default()); + + // acquire a future, but do not await it + let future = task_queue.acquire(); + + // this task tries to acquire another permit, but will be blocked by the first permit. + let enter_flag = Arc::new(AtomicFlag::default()); + let delayed_task = deno_core::unsync::spawn({ + let enter_flag = enter_flag.clone(); + let task_queue = task_queue.clone(); + async move { + enter_flag.raise(); + task_queue.acquire().await; + true + } + }); + + // ensure the task gets a chance to be scheduled and blocked + tokio::task::yield_now().await; + assert!(enter_flag.is_raised()); + + // now, drop the first future + drop(future); + + assert!(delayed_task.await.unwrap()); + } + + #[tokio::test] + async fn task_queue_many_future_dropped_before_poll() { + let task_queue = Arc::new(TaskQueue::default()); + + // acquire a future, but do not await it + let mut futures = Vec::new(); + for _ in 0..=10_000 { + futures.push(task_queue.acquire()); + } + + // this task tries to acquire another permit, but will be blocked by the first permit. + let enter_flag = Arc::new(AtomicFlag::default()); + let delayed_task = deno_core::unsync::spawn({ + let task_queue = task_queue.clone(); + let enter_flag = enter_flag.clone(); + async move { + enter_flag.raise(); + task_queue.acquire().await; + true + } + }); + + // ensure the task gets a chance to be scheduled and blocked + tokio::task::yield_now().await; + assert!(enter_flag.is_raised()); + + // now, drop the futures + drop(futures); + + assert!(delayed_task.await.unwrap()); + } + + #[tokio::test] + async fn task_queue_middle_future_dropped_while_permit_acquired() { + let task_queue = TaskQueue::default(); + + let fut1 = task_queue.acquire(); + let fut2 = task_queue.acquire(); + let fut3 = task_queue.acquire(); + + // should not hang + drop(fut2); + drop(fut1.await); + drop(fut3.await); + } }