deno/cli/util/file_watcher.rs
Łukasz Czerniawski 08d5d32dfc
feat: add --watch-exclude flag (#21935)
This PR introduces the ability to exclude certain paths from the file watcher
in Deno. This is particularly useful when running scripts in watch mode,
as it allows developers to prevent unnecessary restarts when changes are
made to files that do not affect the running script, or when executing
scripts that generate new files, which would otherwise result in an
infinite restart loop.

---------

Co-authored-by: David Sherret <dsherret@gmail.com>
2024-03-27 22:47:46 +00:00

427 lines
11 KiB
Rust

// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
use crate::args::Flags;
use crate::colors;
use crate::util::fs::canonicalize_path;
use deno_config::glob::PathOrPatternSet;
use deno_core::error::AnyError;
use deno_core::error::JsError;
use deno_core::futures::Future;
use deno_core::futures::FutureExt;
use deno_core::parking_lot::Mutex;
use deno_runtime::fmt_errors::format_js_error;
use log::info;
use notify::event::Event as NotifyEvent;
use notify::event::EventKind;
use notify::Error as NotifyError;
use notify::RecommendedWatcher;
use notify::RecursiveMode;
use notify::Watcher;
use std::cell::RefCell;
use std::collections::HashSet;
use std::io::IsTerminal;
use std::path::PathBuf;
use std::rc::Rc;
use std::sync::Arc;
use std::time::Duration;
use tokio::select;
use tokio::sync::mpsc;
use tokio::sync::mpsc::UnboundedReceiver;
use tokio::time::sleep;
/// ANSI escape sequence that erases the display (`ESC[2J`) and then moves the
/// cursor to row 1, column 1 (`ESC[1;1H`).
const CLEAR_SCREEN: &str = "\x1B[2J\x1B[1;1H";
/// How long `DebouncedReceiver::recv` waits without receiving a new batch of
/// paths before it hands the accumulated paths to its caller.
const DEBOUNCE_INTERVAL: Duration = Duration::from_millis(200);
/// Receives batches of changed paths from a channel and merges batches that
/// arrive close together into a single de-duplicated list.
struct DebouncedReceiver {
// The `recv()` call could be used in a tokio `select!` macro,
// and so we store this state on the struct to ensure we don't
// lose items if a `recv()` never completes
received_items: HashSet<PathBuf>,
// Receiving half of the channel on which batches of changed paths arrive.
receiver: UnboundedReceiver<Vec<PathBuf>>,
}
impl DebouncedReceiver {
  /// Creates an unbounded channel and returns its sending half (wrapped in an
  /// `Arc` so that it can be shared) together with a `DebouncedReceiver` that
  /// owns the receiving half.
  fn new_with_sender() -> (Arc<mpsc::UnboundedSender<Vec<PathBuf>>>, Self) {
    let (sender, receiver) = mpsc::unbounded_channel();
    (
      Arc::new(sender),
      Self {
        receiver,
        received_items: HashSet::new(),
      },
    )
  }

  /// Waits for changed paths and returns them, de-duplicated, once no further
  /// batch has arrived for `DEBOUNCE_INTERVAL`.
  ///
  /// Returns `None` when the channel is closed and no paths are pending. If
  /// the channel closes while paths are already buffered, those paths are
  /// returned first and `None` is reported by the following call.
  async fn recv(&mut self) -> Option<Vec<PathBuf>> {
    if self.received_items.is_empty() {
      // Nothing buffered from a previous (possibly cancelled) call, so block
      // until the first batch arrives.
      self.received_items.extend(self.receiver.recv().await?);
    }

    loop {
      select! {
        items = self.receiver.recv() => {
          match items {
            // Another batch arrived within the interval: merge it and start
            // a new debounce period on the next loop iteration.
            Some(items) => self.received_items.extend(items),
            None if self.received_items.is_empty() => return None,
            // The channel closed mid-debounce: flush what was collected
            // instead of silently dropping it.
            None => return Some(self.received_items.drain().collect()),
          }
        }
        _ = sleep(DEBOUNCE_INTERVAL) => {
          return Some(self.received_items.drain().collect());
        }
      }
    }
  }
}
async fn error_handler<F>(watch_future: F) -> bool
where
F: Future<Output = Result<(), AnyError>>,
{
let result = watch_future.await;
if let Err(err) = result {
let error_string = match err.downcast_ref::<JsError>() {
Some(e) => format_js_error(e),
None => format!("{err:?}"),
};
eprintln!(
"{}: {}",
colors::red_bold("error"),
error_string.trim_start_matches("error: ")
);
false
} else {
true
}
}
/// Settings that control what the watcher prints to the terminal.
pub struct PrintConfig {
/// Name printed (in color) at the start of the watcher's status messages.
banner: &'static str,
/// Name of the watched job; included in the watcher's status messages.
job_name: &'static str,
/// Determine whether to clear the terminal screen; applicable to TTY environments only.
clear_screen: bool,
}
impl PrintConfig {
/// By default `PrintConfig` uses "Watcher" as a banner name that will
/// be printed in color. If you need to customize it, use
/// `PrintConfig::new_with_banner` instead.
pub fn new(job_name: &'static str, clear_screen: bool) -> Self {
Self {
banner: "Watcher",
job_name,
clear_screen,
}
}
pub fn new_with_banner(
banner: &'static str,
job_name: &'static str,
clear_screen: bool,
) -> Self {
Self {
banner,
job_name,
clear_screen,
}
}
}
fn create_print_after_restart_fn(
banner: &'static str,
clear_screen: bool,
) -> impl Fn() {
move || {
if clear_screen && std::io::stderr().is_terminal() {
eprint!("{CLEAR_SCREEN}");
}
info!(
"{} File change detected! Restarting!",
colors::intense_blue(banner),
);
}
}
/// An interface to interact with Deno's CLI file watcher.
#[derive(Debug)]
pub struct WatcherCommunicator {
/// Send a list of paths that should be watched for changes.
paths_to_watch_tx: tokio::sync::mpsc::UnboundedSender<Vec<PathBuf>>,
/// Listen for a list of paths that were changed.
changed_paths_rx: tokio::sync::broadcast::Receiver<Option<Vec<PathBuf>>>,
/// Send a message to force a restart.
restart_tx: tokio::sync::mpsc::UnboundedSender<()>,
/// Current restart mode; replaced by `change_restart_mode` and reset to
/// `WatcherRestartMode::Automatic` by `force_restart`.
restart_mode: Mutex<WatcherRestartMode>,
/// Prefix written in front of every message logged through `print`.
banner: String,
}
impl WatcherCommunicator {
  /// Asks the watcher to start watching `paths` for changes.
  ///
  /// Returns an error if the request cannot be sent on the internal channel.
  pub fn watch_paths(&self, paths: Vec<PathBuf>) -> Result<(), AnyError> {
    self.paths_to_watch_tx.send(paths)?;
    Ok(())
  }

  /// Switches back to automatic restart mode and requests a restart.
  ///
  /// Returns an error if the request cannot be sent on the internal channel.
  pub fn force_restart(&self) -> Result<(), AnyError> {
    // Change back to automatic mode, so that HMR can set up watching
    // from scratch.
    self.change_restart_mode(WatcherRestartMode::Automatic);
    self.restart_tx.send(())?;
    Ok(())
  }

  /// Waits for the next message on the changed-paths broadcast channel and
  /// returns it.
  ///
  /// A fresh subscription is created on every call. Returns an error if
  /// receiving from the broadcast channel fails.
  pub async fn watch_for_changed_paths(
    &self,
  ) -> Result<Option<Vec<PathBuf>>, AnyError> {
    let mut subscription = self.changed_paths_rx.resubscribe();
    let changed = subscription.recv().await?;
    Ok(changed)
  }

  /// Replaces the current restart mode with `restart_mode`.
  pub fn change_restart_mode(&self, restart_mode: WatcherRestartMode) {
    let mut current_mode = self.restart_mode.lock();
    *current_mode = restart_mode;
  }

  /// Logs `msg` at info level, prefixed with the watcher banner.
  pub fn print(&self, msg: String) {
    let banner = &self.banner;
    log::info!("{} {}", banner, msg);
  }
}
/// Creates a file watcher that restarts automatically on file changes.
///
/// This is `watch_recv` with the restart mode fixed to
/// `WatcherRestartMode::Automatic`.
///
/// - `operation` is the actual operation we want to run every time the
///   watcher detects file changes. For example, in the case where we would
///   like to bundle, then `operation` would have the logic for it like
///   bundling the code.
pub async fn watch_func<O, F>(
  flags: Flags,
  print_config: PrintConfig,
  operation: O,
) -> Result<(), AnyError>
where
  O: FnMut(
    Flags,
    Arc<WatcherCommunicator>,
    Option<Vec<PathBuf>>,
  ) -> Result<F, AnyError>,
  F: Future<Output = Result<(), AnyError>>,
{
  let restart_mode = WatcherRestartMode::Automatic;
  watch_recv(flags, print_config, restart_mode, operation)
    .boxed_local()
    .await
}
/// Decides how the watcher reacts when a watched file path changes.
#[derive(Clone, Copy, Debug)]
pub enum WatcherRestartMode {
/// When a file path changes the process is restarted.
Automatic,
/// When a file path changes the caller will trigger a restart, using
/// `WatcherCommunicator::force_restart`.
Manual,
}
/// Creates a file watcher that runs `operation` and re-runs it whenever a
/// restart is requested.
///
/// - `flags` are cloned and handed to every run of `operation`; `flags.reload`
/// is set to `false` once the first run has been created.
/// - `print_config` supplies the banner, the job name and the screen-clearing
/// setting used for the watcher's status messages.
/// - `restart_mode` is the initial restart mode. In `Automatic` mode a detected
/// file change requests a restart directly; in `Manual` mode the changed paths
/// are broadcast instead and a restart has to be requested through
/// `WatcherCommunicator::force_restart`.
/// - `operation` is the actual operation we want to run every time the watcher detects file
/// changes. For example, in the case where we would like to bundle, then `operation` would
/// have the logic for it like bundling the code.
///
/// The main loop has no exit path of its own: the function only returns with an
/// error, which comes from resolving the exclude set, from creating the watcher,
/// or from `operation` failing to produce a future.
pub async fn watch_recv<O, F>(
mut flags: Flags,
print_config: PrintConfig,
restart_mode: WatcherRestartMode,
mut operation: O,
) -> Result<(), AnyError>
where
O: FnMut(
Flags,
Arc<WatcherCommunicator>,
Option<Vec<PathBuf>>,
) -> Result<F, AnyError>,
F: Future<Output = Result<(), AnyError>>,
{
let exclude_set = flags.resolve_watch_exclude_set()?;
// Channels: paths to start watching, restart requests, changed paths
// broadcast in manual mode, and raw change notifications from the watcher.
let (paths_to_watch_tx, mut paths_to_watch_rx) =
tokio::sync::mpsc::unbounded_channel();
let (restart_tx, mut restart_rx) = tokio::sync::mpsc::unbounded_channel();
let (changed_paths_tx, changed_paths_rx) = tokio::sync::broadcast::channel(4);
let (watcher_sender, mut watcher_receiver) =
DebouncedReceiver::new_with_sender();
let PrintConfig {
banner,
job_name,
clear_screen,
} = print_config;
let print_after_restart = create_print_after_restart_fn(banner, clear_screen);
let watcher_communicator = Arc::new(WatcherCommunicator {
paths_to_watch_tx: paths_to_watch_tx.clone(),
changed_paths_rx: changed_paths_rx.resubscribe(),
restart_tx: restart_tx.clone(),
restart_mode: Mutex::new(restart_mode),
banner: colors::intense_blue(banner).to_string(),
});
info!("{} {} started.", colors::intense_blue(banner), job_name);
// Most recently received batch of changed paths; it is taken (leaving `None`)
// when the next run of `operation` is created.
let changed_paths = Rc::new(RefCell::new(None));
let changed_paths_ = changed_paths.clone();
let watcher_ = watcher_communicator.clone();
// Background task: records each debounced batch of changed paths and then
// either requests a restart (automatic mode) or broadcasts the batch
// (manual mode).
// NOTE(review): a `None` from `recv()` does not end this loop — confirm that
// a sender always outlives this task.
deno_core::unsync::spawn(async move {
loop {
let received_changed_paths = watcher_receiver.recv().await;
*changed_paths_.borrow_mut() = received_changed_paths.clone();
match *watcher_.restart_mode.lock() {
WatcherRestartMode::Automatic => {
let _ = restart_tx.send(());
}
WatcherRestartMode::Manual => {
// TODO(bartlomieju): should we fail on sending changed paths?
let _ = changed_paths_tx.send(received_changed_paths);
}
}
}
});
loop {
// We may need to give the runtime a tick to settle, as cancellations may need to propagate
// to tasks. We choose yielding 10 times to the runtime as a decent heuristic. If watch tests
// start to fail, this may need to be increased.
for _ in 0..10 {
tokio::task::yield_now().await;
}
// A fresh watcher is created on every iteration; paths that are already
// queued are registered right away.
let mut watcher = new_watcher(watcher_sender.clone())?;
consume_paths_to_watch(&mut watcher, &mut paths_to_watch_rx, &exclude_set);
// Registers newly requested paths for as long as it is polled; it contains
// no exit path, so it only ends when the surrounding `select!` drops it.
let receiver_future = async {
loop {
let maybe_paths = paths_to_watch_rx.recv().await;
add_paths_to_watcher(&mut watcher, &maybe_paths.unwrap(), &exclude_set);
}
};
let operation_future = error_handler(operation(
flags.clone(),
watcher_communicator.clone(),
changed_paths.borrow_mut().take(),
)?);
// don't reload dependencies after the first run
flags.reload = false;
select! {
_ = receiver_future => {},
_ = restart_rx.recv() => {
print_after_restart();
continue;
},
success = operation_future => {
// Register any paths that were requested but not yet picked up.
consume_paths_to_watch(&mut watcher, &mut paths_to_watch_rx, &exclude_set);
// TODO(bartlomieju): print exit code here?
info!(
"{} {} {}. Restarting on file change...",
colors::intense_blue(banner),
job_name,
if success {
"finished"
} else {
"failed"
}
);
},
}
let receiver_future = async {
loop {
let maybe_paths = paths_to_watch_rx.recv().await;
add_paths_to_watcher(&mut watcher, &maybe_paths.unwrap(), &exclude_set);
}
};
// If we got this far, it means that the `operation` has finished; let's wait
// and see if there are any new paths to watch received or any of the already
// watched paths has changed.
select! {
_ = receiver_future => {},
_ = restart_rx.recv() => {
print_after_restart();
continue;
},
}
}
}
/// Creates a `RecommendedWatcher` that forwards file system events to `sender`.
///
/// Only create, modify and remove events are forwarded; watcher errors and
/// all other event kinds are ignored. Each forwarded message holds the
/// canonicalized paths of one event; paths that cannot be canonicalized are
/// left out. The callback panics if sending on `sender` fails.
fn new_watcher(
  sender: Arc<mpsc::UnboundedSender<Vec<PathBuf>>>,
) -> Result<RecommendedWatcher, AnyError> {
  let on_event = move |res: Result<NotifyEvent, NotifyError>| {
    let event = match res {
      Ok(event) => event,
      Err(_) => return,
    };

    match event.kind {
      EventKind::Create(_) | EventKind::Modify(_) | EventKind::Remove(_) => {}
      _ => return,
    }

    let mut canonical_paths = Vec::new();
    for raw_path in &event.paths {
      if let Ok(canonical) = canonicalize_path(raw_path) {
        canonical_paths.push(canonical);
      }
    }
    sender.send(canonical_paths).unwrap();
  };

  let watcher: RecommendedWatcher =
    Watcher::new(on_event, Default::default())?;
  Ok(watcher)
}
/// Registers every path in `paths` that is not matched by `paths_to_exclude`
/// with `watcher` in recursive mode, then logs the registered paths at debug
/// level.
fn add_paths_to_watcher(
  watcher: &mut RecommendedWatcher,
  paths: &[PathBuf],
  paths_to_exclude: &PathOrPatternSet,
) {
  let watched_paths: Vec<PathBuf> = paths
    .iter()
    .filter(|candidate| !paths_to_exclude.matches_path(candidate.as_path()))
    .cloned()
    .collect();

  for watched in &watched_paths {
    // Ignore any error e.g. `PathNotFound`
    let _ = watcher.watch(watched, RecursiveMode::Recursive);
  }

  log::debug!("Watching paths: {:?}", watched_paths);
}
fn consume_paths_to_watch(
watcher: &mut RecommendedWatcher,
receiver: &mut UnboundedReceiver<Vec<PathBuf>>,
exclude_set: &PathOrPatternSet,
) {
loop {
match receiver.try_recv() {
Ok(paths) => {
add_paths_to_watcher(watcher, &paths, exclude_set);
}
Err(e) => match e {
mpsc::error::TryRecvError::Empty => {
break;
}
// there must be at least one receiver alive
_ => unreachable!(),
},
}
}
}