Re-implement directory polling.

BUG=https://code.google.com/p/dart/issues/detail?id=12107
R=nweiz@google.com

Review URL: https://codereview.chromium.org//21628002

git-svn-id: https://dart.googlecode.com/svn/branches/bleeding_edge/dart@25746 260f80e4-7a28-3924-810f-c04153c831b5
This commit is contained in:
rnystrom@google.com 2013-08-02 22:01:39 +00:00
parent 02ea589b94
commit b44ec0e706
5 changed files with 235 additions and 134 deletions

View file

@ -15,10 +15,6 @@ scheduled_test/lib/*: Skip
scheduled_test/test/scheduled_server_test: Pass, Fail, Slow, Crash # Issue 9231, 9582
scheduled_test/test/scheduled_process_test: Pass, Slow # Issue 9231
[ $runtime == vm ]
watcher/test/no_subscription_test: Pass, Fail # Issue 12107
watcher/test/directory_watcher_test: Pass, Fail # Issue 12107
[ $runtime == d8 || $runtime == jsshell ]
unittest/test/unittest_nested_groups_setup_teardown_test: Pass, Fail # http://dartbug.com/10109
stack_trace/test/vm_test: Fail, OK # VM-specific traces

View file

@ -0,0 +1,74 @@
// Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
library watcher.async_queue;
import 'dart:async';
import 'dart:collection';
typedef Future ItemProcessor<T>(T item);
typedef void ErrorHandler(error);
/// A queue of items that are sequentially, asynchronously processed.
///
/// Unlike [Stream.map] or [Stream.forEach], the callback used to process each
/// item returns a [Future], and it will not advance to the next item until the
/// current item is finished processing.
///
/// Items can be added at any point in time and processing will be started as
/// needed. When all items are processed, it stops processing until more items
/// are added.
class AsyncQueue<T> {
final _items = new Queue<T>();
/// Whether or not the queue is currently waiting on a processing future to
/// complete.
bool _isProcessing = false;
/// The callback to invoke on each queued item.
///
/// The next item in the queue will not be processed until the [Future]
/// returned by this completes.
final ItemProcessor<T> _processor;
/// The handler for errors thrown during processing.
///
/// Used to avoid top-leveling asynchronous errors.
final ErrorHandler _errorHandler;
AsyncQueue(this._processor, {ErrorHandler onError})
: _errorHandler = onError;
/// Enqueues [item] to be processed and starts asynchronously processing it
/// if a process isn't already running.
void add(T item) {
_items.add(item);
// Start up the asynchronous processing if not already running.
if (_isProcessing) return;
_isProcessing = true;
_processNextItem().catchError(_errorHandler);
}
/// Removes all remaining items to be processed.
void clear() {
_items.clear();
}
/// Pulls the next item off [_items] and processes it.
///
/// When complete, recursively calls itself to continue processing unless
/// the process was cancelled.
Future _processNextItem() {
var item = _items.removeFirst();
return _processor(item).then((_) {
if (_items.isNotEmpty) return _processNextItem();
// We have drained the queue, stop processing and wait until something
// has been enqueued.
_isProcessing = false;
});
}
}

View file

@ -5,10 +5,12 @@
library watcher.directory_watcher;
import 'dart:async';
import 'dart:collection';
import 'dart:io';
import 'package:crypto/crypto.dart';
import 'async_queue.dart';
import 'stat.dart';
import 'watch_event.dart';
@ -27,7 +29,7 @@ class DirectoryWatcher {
Stream<WatchEvent> get events => _events.stream;
StreamController<WatchEvent> _events;
_WatchState _state = _WatchState.notWatching;
_WatchState _state = _WatchState.UNSUBSCRIBED;
/// A [Future] that completes when the watcher is initialized and watching
/// for file changes.
@ -51,6 +53,26 @@ class DirectoryWatcher {
/// Used to tell which files have been modified.
final _statuses = new Map<String, _FileStatus>();
/// The subscription used while [directory] is being listed.
///
/// Will be `null` if a list is not currently happening.
StreamSubscription<FileSystemEntity> _listSubscription;
/// The queue of files waiting to be processed to see if they have been
/// modified.
///
/// Processing a file is asynchronous, as is listing the directory, so the
/// queue exists to let each of those proceed at their own rate. The lister
/// will enqueue files as quickly as it can. Meanwhile, files are dequeued
/// and processed sequentially.
AsyncQueue<String> _filesToProcess;
/// The set of files that have been seen in the current directory listing.
///
/// Used to tell which files have been removed: files that are in [_statuses]
/// but not in here when a poll completes have been removed.
final _polledFiles = new Set<String>();
/// Creates a new [DirectoryWatcher] monitoring [directory].
///
/// If [pollingDelay] is passed, it specifies the amount of time the watcher
@ -60,82 +82,133 @@ class DirectoryWatcher {
DirectoryWatcher(this.directory, {Duration pollingDelay})
: pollingDelay = pollingDelay != null ? pollingDelay :
new Duration(seconds: 1) {
_events = new StreamController<WatchEvent>.broadcast(onListen: () {
_state = _state.listen(this);
}, onCancel: () {
_state = _state.cancel(this);
});
_events = new StreamController<WatchEvent>.broadcast(
onListen: _watch, onCancel: _cancel);
_filesToProcess = new AsyncQueue<String>(_processFile,
onError: _events.addError);
}
/// Starts the asynchronous polling process.
///
/// Scans the contents of the directory and compares the results to the
/// previous scan. Loops to continue monitoring as long as there are
/// subscribers to the [events] stream.
Future _watch() {
var files = new Set<String>();
/// Scans to see which files were already present before the watcher was
/// subscribed to, and then starts watching the directory for changes.
void _watch() {
assert(_state == _WatchState.UNSUBSCRIBED);
_state = _WatchState.SCANNING;
_poll();
}
/// Stops watching the directory when there are no more subscribers.
void _cancel() {
assert(_state != _WatchState.UNSUBSCRIBED);
_state = _WatchState.UNSUBSCRIBED;
// If we're in the middle of listing the directory, stop.
if (_listSubscription != null) _listSubscription.cancel();
// Don't process any remaining files.
_filesToProcess.clear();
_polledFiles.clear();
_statuses.clear();
_ready = new Completer();
}
/// Scans the contents of the directory once to see which files have been
/// added, removed, and modified.
void _poll() {
_filesToProcess.clear();
_polledFiles.clear();
var stream = new Directory(directory).list(recursive: true);
_listSubscription = stream.listen((entity) {
assert(_state != _WatchState.UNSUBSCRIBED);
return stream.map((entity) {
if (entity is! File) return new Future.value();
files.add(entity.path);
// TODO(rnystrom): These all run as fast as possible and read the
// contents of the files. That means there's a pretty big IO hit all at
// once. Maybe these should be queued up and rate limited?
return _refreshFile(entity.path);
}).toList().then((futures) {
// Once the listing is done, make sure to wait until each file is also
// done.
return Future.wait(futures);
}).then((_) {
var removedFiles = _statuses.keys.toSet().difference(files);
for (var removed in removedFiles) {
if (_state.shouldNotify) {
_events.add(new WatchEvent(ChangeType.REMOVE, removed));
}
_statuses.remove(removed);
}
if (entity is! File) return;
_filesToProcess.add(entity.path);
}, onDone: () {
assert(_state != _WatchState.UNSUBSCRIBED);
_listSubscription = null;
var previousState = _state;
_state = _state.finish(this);
// If we were already sending notifications, add a bit of delay before
// restarting just so that we don't whale on the file system.
// TODO(rnystrom): Tune this and/or make it tunable?
if (_state.shouldNotify) {
return new Future.delayed(pollingDelay);
}
}).then((_) {
// Make sure we haven't transitioned to a non-watching state during the
// delay.
if (_state.shouldWatch) _watch();
// Null tells the queue consumer that we're done listing.
_filesToProcess.add(null);
});
}
/// Compares the current state of the file at [path] to the state it was in
/// the last time it was scanned.
Future _refreshFile(String path) {
return getModificationTime(path).then((modified) {
var lastStatus = _statuses[path];
/// Processes [file] to determine if it has been modified since the last
/// time it was scanned.
Future _processFile(String file) {
assert(_state != _WatchState.UNSUBSCRIBED);
// If it's modification time hasn't changed, assume the file is unchanged.
if (lastStatus != null && lastStatus.modified == modified) return;
// `null` is the sentinel which means the directory listing is complete.
if (file == null) return _completePoll();
return getModificationTime(file).then((modified) {
if (_checkForCancel()) return;
var lastStatus = _statuses[file];
// If its modification time hasn't changed, assume the file is unchanged.
if (lastStatus != null && lastStatus.modified == modified) {
// The file is still here.
_polledFiles.add(file);
return;
}
return _hashFile(file).then((hash) {
if (_checkForCancel()) return;
return _hashFile(path).then((hash) {
var status = new _FileStatus(modified, hash);
_statuses[path] = status;
_statuses[file] = status;
_polledFiles.add(file);
// Only notify if the file contents changed.
if (_state.shouldNotify &&
(lastStatus == null || !_sameHash(lastStatus.hash, hash))) {
var change = lastStatus == null ? ChangeType.ADD : ChangeType.MODIFY;
_events.add(new WatchEvent(change, path));
}
// Only notify while in the watching state.
if (_state != _WatchState.WATCHING) return;
// And the file is different.
var changed = lastStatus == null || !_sameHash(lastStatus.hash, hash);
if (!changed) return;
var type = lastStatus == null ? ChangeType.ADD : ChangeType.MODIFY;
_events.add(new WatchEvent(type, file));
});
});
}
/// After the directory listing is complete, this determines which files were
/// removed and then restarts the next poll.
Future _completePoll() {
// Any files that were not seen in the last poll but that we have a
// status for must have been removed.
var removedFiles = _statuses.keys.toSet().difference(_polledFiles);
for (var removed in removedFiles) {
if (_state == _WatchState.WATCHING) {
_events.add(new WatchEvent(ChangeType.REMOVE, removed));
}
_statuses.remove(removed);
}
if (_state == _WatchState.SCANNING) {
_state = _WatchState.WATCHING;
_ready.complete();
}
// Wait and then poll again.
return new Future.delayed(pollingDelay).then((_) {
if (_checkForCancel()) return;
_poll();
});
}
/// Returns `true` and clears the processing queue if the watcher has been
/// unsubscribed.
bool _checkForCancel() {
if (_state != _WatchState.UNSUBSCRIBED) return false;
// Don't process any more files.
_filesToProcess.clear();
return true;
}
/// Calculates the SHA-1 hash of the file at [path].
Future<List<int>> _hashFile(String path) {
return new File(path).readAsBytes().then((bytes) {
@ -159,71 +232,29 @@ class DirectoryWatcher {
}
}
/// An "event" that is sent to the [_WatchState] FSM to trigger state
/// transitions.
typedef _WatchState _WatchStateEvent(DirectoryWatcher watcher);
/// The different states that the watcher can be in and the transitions between
/// them.
///
/// This class defines a finite state machine for keeping track of what the
/// asynchronous file polling is doing. Each instance of this is a state in the
/// machine and its [listen], [cancel], and [finish] fields define the state
/// transitions when those events occur.
/// Enum class for the states that the [DirectoryWatcher] can be in.
class _WatchState {
/// The watcher has no subscribers.
static final notWatching = new _WatchState(
listen: (watcher) {
watcher._watch();
return _WatchState.scanning;
});
/// There are no subscribers to the watcher's event stream and no watching
/// is going on.
static const UNSUBSCRIBED = const _WatchState("unsubscribed");
/// The watcher has subscribers and is scanning for pre-existing files.
static final scanning = new _WatchState(
cancel: (watcher) {
// No longer watching, so create a new incomplete ready future.
watcher._ready = new Completer();
return _WatchState.cancelling;
}, finish: (watcher) {
watcher._ready.complete();
return _WatchState.watching;
}, shouldWatch: true);
/// There are subscribers and the watcher is doing an initial scan of the
/// directory to see which files were already present before watching started.
///
/// The watcher does not send notifications for changes that occurred while
/// there were no subscribers, or for files already present before watching.
/// The initial scan is used to determine what "before watching" state of
/// the file system was.
static const SCANNING = const _WatchState("scanning");
/// The watcher was unsubscribed while polling and we're waiting for the poll
/// to finish.
static final cancelling = new _WatchState(
listen: (_) => _WatchState.scanning,
finish: (_) => _WatchState.notWatching);
/// There are subscribers and the watcher is polling the directory to look
/// for changes.
static const WATCHING = const _WatchState("watching");
/// The watcher has subscribers, we have scanned for pre-existing files and
/// now we're polling for changes.
static final watching = new _WatchState(
cancel: (watcher) {
// No longer watching, so create a new incomplete ready future.
watcher._ready = new Completer();
return _WatchState.cancelling;
}, finish: (_) => _WatchState.watching,
shouldWatch: true, shouldNotify: true);
/// The name of the state.
final String name;
/// Called when the first subscriber to the watcher has been added.
final _WatchStateEvent listen;
/// Called when all subscriptions on the watcher have been cancelled.
final _WatchStateEvent cancel;
/// Called when a poll loop has finished.
final _WatchStateEvent finish;
/// If the directory watcher should be watching the file system while in
/// this state.
final bool shouldWatch;
/// If a change event should be sent for a file modification while in this
/// state.
final bool shouldNotify;
_WatchState({this.listen, this.cancel, this.finish,
this.shouldWatch: false, this.shouldNotify: false});
const _WatchState(this.name);
}
class _FileStatus {

View file

@ -23,11 +23,11 @@ main() {
// Subscribe to the events.
var completer = new Completer();
var subscription = watcher.events.listen((event) {
var subscription = watcher.events.listen(wrapAsync((event) {
expect(event.type, equals(ChangeType.ADD));
expect(event.path, endsWith("file.txt"));
completer.complete();
});
}));
writeFile("file.txt");
@ -45,20 +45,20 @@ main() {
// Then start listening again.
schedule(() {
completer = new Completer();
subscription = watcher.events.listen((event) {
subscription = watcher.events.listen(wrapAsync((event) {
// We should get an event for the third file, not the one added while
// we weren't subscribed.
expect(event.type, equals(ChangeType.ADD));
expect(event.path, endsWith("added.txt"));
completer.complete();
});
}));
});
// The watcher will have been cancelled and then resumed in the middle of
// its pause between polling loops. That means the second scan to skip
// what changed while we were unsubscribed won't happen until after that
// delay is done. Wait long enough for that to happen.
schedule(() => new Future.delayed(new Duration(seconds: 1)));
schedule(() => new Future.delayed(watcher.pollingDelay * 2));
// And add a third file.
writeFile("added.txt");

View file

@ -84,7 +84,7 @@ DirectoryWatcher createWatcher({bool waitForReady}) {
// Wait until the scan is finished so that we don't miss changes to files
// that could occur before the scan completes.
if (waitForReady != false) {
schedule(() => _watcher.ready);
schedule(() => _watcher.ready, "wait for watcher to be ready");
}
currentSchedule.onComplete.schedule(() {
@ -107,7 +107,7 @@ void expectEvent(ChangeType type, String path) {
// Schedule it so that later file modifications don't occur until after this
// event is received.
schedule(() => future);
schedule(() => future, "wait for $type event");
}
void expectAddEvent(String path) {
@ -149,14 +149,14 @@ void writeFile(String path, {String contents, bool updateModified}) {
var milliseconds = _mockFileModificationTimes.putIfAbsent(path, () => 0);
_mockFileModificationTimes[path]++;
}
});
}, "write file $path");
}
/// Schedules deleting a file in the sandbox at [path].
void deleteFile(String path) {
schedule(() {
new File(p.join(_sandboxDir, path)).deleteSync();
});
}, "delete file $path");
}
/// Schedules renaming a file in the sandbox from [from] to [to].
@ -172,7 +172,7 @@ void renameFile(String from, String to) {
// Manually update the mock modification time for the file.
var milliseconds = _mockFileModificationTimes.putIfAbsent(to, () => 0);
_mockFileModificationTimes[to]++;
});
}, "rename file $from to $to");
}
/// A [Matcher] for [WatchEvent]s.