refactor: move deno_core::TaskQueue to cli::util::sync (#20481)

TaskQueue is being removed from `deno_core` and replaced with an unsync
version in deno_unsyc.

https://github.com/denoland/deno_core/pull/193

This is a change in preparation for that. The remaining
`deno_core::TaskQueue` usage in this repo should be replaced with
`deno_core::unsync::TaskQueue` once upgraded.
This commit is contained in:
David Sherret 2023-09-13 17:53:07 -04:00 committed by GitHub
parent 022664aab4
commit 12a75e3b43
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 263 additions and 7 deletions

View file

@ -13,6 +13,8 @@ use crate::npm::CliNpmResolver;
use crate::resolver::CliGraphResolver;
use crate::tools::check;
use crate::tools::check::TypeChecker;
use crate::util::sync::TaskQueue;
use crate::util::sync::TaskQueuePermit;
use deno_core::anyhow::bail;
use deno_core::error::custom_error;
@ -20,8 +22,6 @@ use deno_core::error::AnyError;
use deno_core::parking_lot::Mutex;
use deno_core::parking_lot::RwLock;
use deno_core::ModuleSpecifier;
use deno_core::TaskQueue;
use deno_core::TaskQueuePermit;
use deno_graph::source::Loader;
use deno_graph::GraphKind;
use deno_graph::Module;

View file

@ -18,7 +18,6 @@ use deno_core::futures::FutureExt;
use deno_core::parking_lot::Mutex;
use deno_core::serde_json;
use deno_core::url::Url;
use deno_core::TaskQueue;
use deno_npm::registry::NpmPackageInfo;
use deno_npm::registry::NpmRegistryApi;
use deno_npm::registry::NpmRegistryPackageInfoLoadError;
@ -30,6 +29,7 @@ use crate::http_util::HttpClient;
use crate::util::fs::atomic_write_file;
use crate::util::progress_bar::ProgressBar;
use crate::util::sync::AtomicFlag;
use crate::util::sync::TaskQueue;
use super::cache::should_sync_download;
use super::cache::NpmCache;
@ -118,7 +118,7 @@ impl NpmRegistryApi for CliNpmRegistryApi {
let result = if should_sync_download() {
let inner = self.inner().clone();
SYNC_DOWNLOAD_TASK_QUEUE
.queue(async move { inner.maybe_package_info(name).await })
.run(async move { inner.maybe_package_info(name).await })
.await
} else {
self.inner().maybe_package_info(name).await

View file

@ -7,7 +7,6 @@ use std::sync::Arc;
use deno_core::error::AnyError;
use deno_core::parking_lot::Mutex;
use deno_core::parking_lot::RwLock;
use deno_core::TaskQueue;
use deno_lockfile::NpmPackageDependencyLockfileInfo;
use deno_lockfile::NpmPackageLockfileInfo;
use deno_npm::registry::NpmPackageInfo;
@ -32,6 +31,7 @@ use deno_semver::package::PackageReq;
use deno_semver::VersionReq;
use crate::args::Lockfile;
use crate::util::sync::TaskQueue;
use super::registry::CliNpmRegistryApi;

View file

@ -7,7 +7,6 @@ use deno_core::futures::future;
use deno_core::futures::future::LocalBoxFuture;
use deno_core::futures::FutureExt;
use deno_core::ModuleSpecifier;
use deno_core::TaskQueue;
use deno_graph::source::NpmPackageReqResolution;
use deno_graph::source::NpmResolver;
use deno_graph::source::Resolver;
@ -27,6 +26,7 @@ use crate::npm::CliNpmRegistryApi;
use crate::npm::NpmResolution;
use crate::npm::PackageJsonDepsInstaller;
use crate::util::sync::AtomicFlag;
use crate::util::sync::TaskQueue;
/// Result of checking if a specifier is mapped via
/// an import map or package.json.

View file

@ -1,7 +1,13 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
use std::collections::LinkedList;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use deno_core::futures::task::AtomicWaker;
use deno_core::futures::Future;
use deno_core::parking_lot::Mutex;
/// Simplifies the use of an atomic boolean as a flag.
#[derive(Debug, Default)]
@ -19,9 +25,147 @@ impl AtomicFlag {
}
}
#[derive(Debug, Default)]
struct TaskQueueTaskItem {
is_ready: AtomicFlag,
is_future_dropped: AtomicFlag,
waker: AtomicWaker,
}
#[derive(Debug, Default)]
struct TaskQueueTasks {
is_running: bool,
items: LinkedList<Arc<TaskQueueTaskItem>>,
}
/// A queue that executes tasks sequentially one after the other
/// ensuring order and that no task runs at the same time as another.
///
/// Note that this differs from tokio's semaphore in that the order
/// is acquired synchronously.
#[derive(Debug, Default)]
pub struct TaskQueue {
tasks: Mutex<TaskQueueTasks>,
}
impl TaskQueue {
/// Acquires a permit where the tasks are executed one at a time
/// and in the order that they were acquired.
pub fn acquire(&self) -> TaskQueuePermitAcquireFuture {
TaskQueuePermitAcquireFuture::new(self)
}
/// Alternate API that acquires a permit internally
/// for the duration of the future.
pub fn run<'a, R>(
&'a self,
future: impl Future<Output = R> + 'a,
) -> impl Future<Output = R> + 'a {
let acquire_future = self.acquire();
async move {
let permit = acquire_future.await;
let result = future.await;
drop(permit); // explicit for clarity
result
}
}
fn raise_next(&self) {
let front_item = {
let mut tasks = self.tasks.lock();
// clear out any wakers for futures that were dropped
while let Some(front_waker) = tasks.items.front() {
if front_waker.is_future_dropped.is_raised() {
tasks.items.pop_front();
} else {
break;
}
}
let front_item = tasks.items.pop_front();
tasks.is_running = front_item.is_some();
front_item
};
// wake up the next waker
if let Some(front_item) = front_item {
front_item.is_ready.raise();
front_item.waker.wake();
}
}
}
/// A permit that when dropped will allow another task to proceed.
pub struct TaskQueuePermit<'a>(&'a TaskQueue);
impl<'a> Drop for TaskQueuePermit<'a> {
fn drop(&mut self) {
self.0.raise_next();
}
}
pub struct TaskQueuePermitAcquireFuture<'a> {
task_queue: Option<&'a TaskQueue>,
item: Arc<TaskQueueTaskItem>,
}
impl<'a> TaskQueuePermitAcquireFuture<'a> {
pub fn new(task_queue: &'a TaskQueue) -> Self {
// acquire the waker position synchronously
let mut tasks = task_queue.tasks.lock();
let item = if !tasks.is_running {
tasks.is_running = true;
let item = Arc::new(TaskQueueTaskItem::default());
item.is_ready.raise();
item
} else {
let item = Arc::new(TaskQueueTaskItem::default());
tasks.items.push_back(item.clone());
item
};
drop(tasks);
Self {
task_queue: Some(task_queue),
item,
}
}
}
impl<'a> Drop for TaskQueuePermitAcquireFuture<'a> {
fn drop(&mut self) {
if let Some(task_queue) = self.task_queue.take() {
if self.item.is_ready.is_raised() {
task_queue.raise_next();
} else {
self.item.is_future_dropped.raise();
}
}
}
}
impl<'a> Future for TaskQueuePermitAcquireFuture<'a> {
type Output = TaskQueuePermit<'a>;
fn poll(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
if self.item.is_ready.is_raised() {
std::task::Poll::Ready(TaskQueuePermit(self.task_queue.take().unwrap()))
} else {
self.item.waker.register(cx.waker());
std::task::Poll::Pending
}
}
}
#[cfg(test)]
mod test {
use super::AtomicFlag;
use deno_core::futures;
use deno_core::parking_lot::Mutex;
use std::sync::Arc;
use super::*;
#[test]
fn atomic_flag_raises() {
@ -32,4 +176,116 @@ mod test {
assert!(!flag.raise());
assert!(flag.is_raised());
}
#[tokio::test]
async fn task_queue_runs_one_after_other() {
let task_queue = TaskQueue::default();
let mut tasks = Vec::new();
let data = Arc::new(Mutex::new(0));
for i in 0..100 {
let data = data.clone();
tasks.push(task_queue.run(async move {
deno_core::unsync::spawn_blocking(move || {
let mut data = data.lock();
assert_eq!(*data, i);
*data = i + 1;
})
.await
.unwrap();
}));
}
futures::future::join_all(tasks).await;
}
#[tokio::test]
async fn task_queue_run_in_sequence() {
let task_queue = TaskQueue::default();
let data = Arc::new(Mutex::new(0));
let first = task_queue.run(async {
*data.lock() = 1;
});
let second = task_queue.run(async {
assert_eq!(*data.lock(), 1);
*data.lock() = 2;
});
let _ = tokio::join!(first, second);
assert_eq!(*data.lock(), 2);
}
#[tokio::test]
async fn task_queue_future_dropped_before_poll() {
let task_queue = Arc::new(TaskQueue::default());
// acquire a future, but do not await it
let future = task_queue.acquire();
// this task tries to acquire another permit, but will be blocked by the first permit.
let enter_flag = Arc::new(AtomicFlag::default());
let delayed_task = deno_core::unsync::spawn({
let enter_flag = enter_flag.clone();
let task_queue = task_queue.clone();
async move {
enter_flag.raise();
task_queue.acquire().await;
true
}
});
// ensure the task gets a chance to be scheduled and blocked
tokio::task::yield_now().await;
assert!(enter_flag.is_raised());
// now, drop the first future
drop(future);
assert!(delayed_task.await.unwrap());
}
#[tokio::test]
async fn task_queue_many_future_dropped_before_poll() {
let task_queue = Arc::new(TaskQueue::default());
// acquire a future, but do not await it
let mut futures = Vec::new();
for _ in 0..=10_000 {
futures.push(task_queue.acquire());
}
// this task tries to acquire another permit, but will be blocked by the first permit.
let enter_flag = Arc::new(AtomicFlag::default());
let delayed_task = deno_core::unsync::spawn({
let task_queue = task_queue.clone();
let enter_flag = enter_flag.clone();
async move {
enter_flag.raise();
task_queue.acquire().await;
true
}
});
// ensure the task gets a chance to be scheduled and blocked
tokio::task::yield_now().await;
assert!(enter_flag.is_raised());
// now, drop the futures
drop(futures);
assert!(delayed_task.await.unwrap());
}
#[tokio::test]
async fn task_queue_middle_future_dropped_while_permit_acquired() {
let task_queue = TaskQueue::default();
let fut1 = task_queue.acquire();
let fut2 = task_queue.acquire();
let fut3 = task_queue.acquire();
// should not hang
drop(fut2);
drop(fut1.await);
drop(fut3.await);
}
}