refactor: remove Semaphore::new(1) and use TaskQueue (#18014)

This commit is contained in:
David Sherret 2023-03-04 20:07:11 -05:00 committed by GitHub
parent 7afa3aceb0
commit 2f7222da8a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 47 additions and 55 deletions

View file

@ -19,6 +19,8 @@ use deno_core::error::custom_error;
use deno_core::error::AnyError; use deno_core::error::AnyError;
use deno_core::parking_lot::RwLock; use deno_core::parking_lot::RwLock;
use deno_core::ModuleSpecifier; use deno_core::ModuleSpecifier;
use deno_core::TaskQueue;
use deno_core::TaskQueuePermit;
use deno_graph::Module; use deno_graph::Module;
use deno_graph::ModuleGraph; use deno_graph::ModuleGraph;
use deno_graph::ModuleGraphError; use deno_graph::ModuleGraphError;
@ -29,8 +31,6 @@ use import_map::ImportMapError;
use std::collections::HashMap; use std::collections::HashMap;
use std::collections::HashSet; use std::collections::HashSet;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::Semaphore;
use tokio::sync::SemaphorePermit;
#[derive(Clone, Copy)] #[derive(Clone, Copy)]
pub struct GraphValidOptions { pub struct GraphValidOptions {
@ -318,27 +318,21 @@ struct GraphData {
} }
/// Holds the `ModuleGraph` and what parts of it are type checked. /// Holds the `ModuleGraph` and what parts of it are type checked.
#[derive(Clone)] #[derive(Clone, Default)]
pub struct ModuleGraphContainer { pub struct ModuleGraphContainer {
update_semaphore: Arc<Semaphore>, // Allow only one request to update the graph data at a time,
// but allow other requests to read from it at any time even
// while another request is updating the data.
update_queue: Arc<TaskQueue>,
graph_data: Arc<RwLock<GraphData>>, graph_data: Arc<RwLock<GraphData>>,
} }
impl Default for ModuleGraphContainer {
fn default() -> Self {
Self {
update_semaphore: Arc::new(Semaphore::new(1)),
graph_data: Default::default(),
}
}
}
impl ModuleGraphContainer { impl ModuleGraphContainer {
/// Acquires a permit to modify the module graph without other code /// Acquires a permit to modify the module graph without other code
/// having the chance to modify it. In the meantime, other code may /// having the chance to modify it. In the meantime, other code may
/// still read from the existing module graph. /// still read from the existing module graph.
pub async fn acquire_update_permit(&self) -> ModuleGraphUpdatePermit { pub async fn acquire_update_permit(&self) -> ModuleGraphUpdatePermit {
let permit = self.update_semaphore.acquire().await.unwrap(); let permit = self.update_queue.acquire().await;
ModuleGraphUpdatePermit { ModuleGraphUpdatePermit {
permit, permit,
graph_data: self.graph_data.clone(), graph_data: self.graph_data.clone(),
@ -395,7 +389,7 @@ impl ModuleGraphContainer {
/// everything looks fine, calling `.commit()` will store the /// everything looks fine, calling `.commit()` will store the
/// new graph in the ModuleGraphContainer. /// new graph in the ModuleGraphContainer.
pub struct ModuleGraphUpdatePermit<'a> { pub struct ModuleGraphUpdatePermit<'a> {
permit: SemaphorePermit<'a>, permit: TaskQueuePermit<'a>,
graph_data: Arc<RwLock<GraphData>>, graph_data: Arc<RwLock<GraphData>>,
graph: ModuleGraph, graph: ModuleGraph,
} }

View file

@ -10,6 +10,7 @@ use deno_core::anyhow::Context;
use deno_core::error::AnyError; use deno_core::error::AnyError;
use deno_core::parking_lot::Mutex; use deno_core::parking_lot::Mutex;
use deno_core::parking_lot::RwLock; use deno_core::parking_lot::RwLock;
use deno_core::TaskQueue;
use deno_graph::npm::NpmPackageNv; use deno_graph::npm::NpmPackageNv;
use deno_graph::npm::NpmPackageNvReference; use deno_graph::npm::NpmPackageNvReference;
use deno_graph::npm::NpmPackageReq; use deno_graph::npm::NpmPackageReq;
@ -241,7 +242,7 @@ pub struct NpmResolution(Arc<NpmResolutionInner>);
struct NpmResolutionInner { struct NpmResolutionInner {
api: NpmRegistryApi, api: NpmRegistryApi,
snapshot: RwLock<NpmResolutionSnapshot>, snapshot: RwLock<NpmResolutionSnapshot>,
update_semaphore: tokio::sync::Semaphore, update_queue: TaskQueue,
maybe_lockfile: Option<Arc<Mutex<Lockfile>>>, maybe_lockfile: Option<Arc<Mutex<Lockfile>>>,
} }
@ -263,7 +264,7 @@ impl NpmResolution {
Self(Arc::new(NpmResolutionInner { Self(Arc::new(NpmResolutionInner {
api, api,
snapshot: RwLock::new(initial_snapshot.unwrap_or_default()), snapshot: RwLock::new(initial_snapshot.unwrap_or_default()),
update_semaphore: tokio::sync::Semaphore::new(1), update_queue: Default::default(),
maybe_lockfile, maybe_lockfile,
})) }))
} }
@ -275,7 +276,7 @@ impl NpmResolution {
let inner = &self.0; let inner = &self.0;
// only allow one thread in here at a time // only allow one thread in here at a time
let _permit = inner.update_semaphore.acquire().await?; let _permit = inner.update_queue.acquire().await;
let snapshot = inner.snapshot.read().clone(); let snapshot = inner.snapshot.read().clone();
let snapshot = add_package_reqs_to_snapshot( let snapshot = add_package_reqs_to_snapshot(
@ -296,7 +297,7 @@ impl NpmResolution {
) -> Result<(), AnyError> { ) -> Result<(), AnyError> {
let inner = &self.0; let inner = &self.0;
// only allow one thread in here at a time // only allow one thread in here at a time
let _permit = inner.update_semaphore.acquire().await?; let _permit = inner.update_queue.acquire().await;
let snapshot = inner.snapshot.read().clone(); let snapshot = inner.snapshot.read().clone();
let reqs_set = package_reqs.iter().collect::<HashSet<_>>(); let reqs_set = package_reqs.iter().collect::<HashSet<_>>();
@ -326,7 +327,7 @@ impl NpmResolution {
pub async fn resolve_pending(&self) -> Result<(), AnyError> { pub async fn resolve_pending(&self) -> Result<(), AnyError> {
let inner = &self.0; let inner = &self.0;
// only allow one thread in here at a time // only allow one thread in here at a time
let _permit = inner.update_semaphore.acquire().await?; let _permit = inner.update_queue.acquire().await;
let snapshot = inner.snapshot.read().clone(); let snapshot = inner.snapshot.read().clone();
let snapshot = add_package_reqs_to_snapshot( let snapshot = add_package_reqs_to_snapshot(

View file

@ -7,6 +7,7 @@ use deno_core::futures::future;
use deno_core::futures::future::LocalBoxFuture; use deno_core::futures::future::LocalBoxFuture;
use deno_core::futures::FutureExt; use deno_core::futures::FutureExt;
use deno_core::ModuleSpecifier; use deno_core::ModuleSpecifier;
use deno_core::TaskQueue;
use deno_graph::npm::NpmPackageNv; use deno_graph::npm::NpmPackageNv;
use deno_graph::npm::NpmPackageReq; use deno_graph::npm::NpmPackageReq;
use deno_graph::source::NpmResolver; use deno_graph::source::NpmResolver;
@ -34,7 +35,7 @@ pub struct CliGraphResolver {
npm_registry_api: NpmRegistryApi, npm_registry_api: NpmRegistryApi,
npm_resolution: NpmResolution, npm_resolution: NpmResolution,
package_json_deps_installer: PackageJsonDepsInstaller, package_json_deps_installer: PackageJsonDepsInstaller,
sync_download_semaphore: Option<Arc<tokio::sync::Semaphore>>, sync_download_queue: Option<Arc<TaskQueue>>,
} }
impl Default for CliGraphResolver { impl Default for CliGraphResolver {
@ -52,7 +53,7 @@ impl Default for CliGraphResolver {
npm_registry_api, npm_registry_api,
npm_resolution, npm_resolution,
package_json_deps_installer: Default::default(), package_json_deps_installer: Default::default(),
sync_download_semaphore: Self::create_sync_download_semaphore(), sync_download_queue: Self::create_sync_download_queue(),
} }
} }
} }
@ -77,13 +78,13 @@ impl CliGraphResolver {
npm_registry_api, npm_registry_api,
npm_resolution, npm_resolution,
package_json_deps_installer, package_json_deps_installer,
sync_download_semaphore: Self::create_sync_download_semaphore(), sync_download_queue: Self::create_sync_download_queue(),
} }
} }
fn create_sync_download_semaphore() -> Option<Arc<tokio::sync::Semaphore>> { fn create_sync_download_queue() -> Option<Arc<TaskQueue>> {
if crate::npm::should_sync_download() { if crate::npm::should_sync_download() {
Some(Arc::new(tokio::sync::Semaphore::new(1))) Some(Default::default())
} else { } else {
None None
} }
@ -194,10 +195,10 @@ impl NpmResolver for CliGraphResolver {
let package_name = package_name.to_string(); let package_name = package_name.to_string();
let api = self.npm_registry_api.clone(); let api = self.npm_registry_api.clone();
let deps_installer = self.package_json_deps_installer.clone(); let deps_installer = self.package_json_deps_installer.clone();
let maybe_sync_download_semaphore = self.sync_download_semaphore.clone(); let maybe_sync_download_queue = self.sync_download_queue.clone();
async move { async move {
let permit = if let Some(semaphore) = &maybe_sync_download_semaphore { let permit = if let Some(task_queue) = &maybe_sync_download_queue {
Some(semaphore.acquire().await.unwrap()) Some(task_queue.acquire().await)
} else { } else {
None None
}; };

View file

@ -118,6 +118,7 @@ pub use crate::runtime::V8_WRAPPER_OBJECT_INDEX;
pub use crate::runtime::V8_WRAPPER_TYPE_INDEX; pub use crate::runtime::V8_WRAPPER_TYPE_INDEX;
pub use crate::source_map::SourceMapGetter; pub use crate::source_map::SourceMapGetter;
pub use crate::task_queue::TaskQueue; pub use crate::task_queue::TaskQueue;
pub use crate::task_queue::TaskQueuePermit;
pub fn v8_version() -> &'static str { pub fn v8_version() -> &'static str {
v8::V8::get_version() v8::V8::get_version()

View file

@ -8,13 +8,13 @@ use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering; use std::sync::atomic::Ordering;
use std::sync::Arc; use std::sync::Arc;
#[derive(Default)] #[derive(Debug, Default)]
struct TaskQueueTaskWaker { struct TaskQueueTaskWaker {
is_ready: AtomicBool, is_ready: AtomicBool,
waker: AtomicWaker, waker: AtomicWaker,
} }
#[derive(Default)] #[derive(Debug, Default)]
struct TaskQueueTasks { struct TaskQueueTasks {
is_running: bool, is_running: bool,
wakers: LinkedList<Arc<TaskQueueTaskWaker>>, wakers: LinkedList<Arc<TaskQueueTaskWaker>>,
@ -26,40 +26,35 @@ struct TaskQueueTasks {
/// Note that tokio's semaphore doesn't seem to maintain order /// Note that tokio's semaphore doesn't seem to maintain order
/// and so we can't use that in the code that uses this or use /// and so we can't use that in the code that uses this or use
/// that here. /// that here.
#[derive(Clone, Default)] #[derive(Debug, Default)]
pub struct TaskQueue { pub struct TaskQueue {
tasks: Arc<Mutex<TaskQueueTasks>>, tasks: Mutex<TaskQueueTasks>,
} }
impl TaskQueue { impl TaskQueue {
/// Acquires a permit where the tasks are executed one at a time
/// and in the order that they were acquired.
pub async fn acquire(&self) -> TaskQueuePermit {
let acquire = TaskQueuePermitAcquire::new(self);
acquire.await;
TaskQueuePermit(self)
}
/// Alternate API that acquires a permit internally /// Alternate API that acquires a permit internally
/// for the duration of the future. /// for the duration of the future.
#[cfg(test)]
pub async fn queue<R>(&self, future: impl Future<Output = R>) -> R { pub async fn queue<R>(&self, future: impl Future<Output = R>) -> R {
let _permit = self.acquire().await; let _permit = self.acquire().await;
future.await future.await
} }
/// Acquires a permit where the tasks are executed one at a time
/// and in the order that they were acquired.
pub async fn acquire(&self) -> TaskQueuePermit {
let acquire = TaskQueuePermitAcquire::new(self.tasks.clone());
acquire.await;
TaskQueuePermit {
tasks: self.tasks.clone(),
}
}
} }
/// A permit that when dropped will allow another task to proceed. /// A permit that when dropped will allow another task to proceed.
pub struct TaskQueuePermit { pub struct TaskQueuePermit<'a>(&'a TaskQueue);
tasks: Arc<Mutex<TaskQueueTasks>>,
}
impl Drop for TaskQueuePermit { impl<'a> Drop for TaskQueuePermit<'a> {
fn drop(&mut self) { fn drop(&mut self) {
let next_item = { let next_item = {
let mut tasks = self.tasks.lock(); let mut tasks = self.0.tasks.lock();
let next_item = tasks.wakers.pop_front(); let next_item = tasks.wakers.pop_front();
tasks.is_running = next_item.is_some(); tasks.is_running = next_item.is_some();
next_item next_item
@ -71,23 +66,23 @@ impl Drop for TaskQueuePermit {
} }
} }
struct TaskQueuePermitAcquire { struct TaskQueuePermitAcquire<'a> {
tasks: Arc<Mutex<TaskQueueTasks>>, task_queue: &'a TaskQueue,
initialized: AtomicBool, initialized: AtomicBool,
waker: Arc<TaskQueueTaskWaker>, waker: Arc<TaskQueueTaskWaker>,
} }
impl TaskQueuePermitAcquire { impl<'a> TaskQueuePermitAcquire<'a> {
pub fn new(tasks: Arc<Mutex<TaskQueueTasks>>) -> Self { pub fn new(task_queue: &'a TaskQueue) -> Self {
Self { Self {
tasks, task_queue,
initialized: Default::default(), initialized: Default::default(),
waker: Default::default(), waker: Default::default(),
} }
} }
} }
impl Future for TaskQueuePermitAcquire { impl<'a> Future for TaskQueuePermitAcquire<'a> {
type Output = (); type Output = ();
fn poll( fn poll(
@ -99,7 +94,7 @@ impl Future for TaskQueuePermitAcquire {
// ensure this is initialized // ensure this is initialized
if !self.initialized.swap(true, Ordering::SeqCst) { if !self.initialized.swap(true, Ordering::SeqCst) {
let mut tasks = self.tasks.lock(); let mut tasks = self.task_queue.tasks.lock();
if !tasks.is_running { if !tasks.is_running {
tasks.is_running = true; tasks.is_running = true;
return std::task::Poll::Ready(()); return std::task::Poll::Ready(());