Make sure barback's FilePool doesn't take up *all* the available FDs.

BUG=13752
R=alanknight@google.com

Review URL: https://codereview.chromium.org//28733009

git-svn-id: https://dart.googlecode.com/svn/branches/bleeding_edge/dart@29021 260f80e4-7a28-3924-810f-c04153c831b5
Author: nweiz@google.com
Date:   2013-10-22 18:36:57 +00:00
Commit: 7ff3b89aba
Parent: ccb7d5304f
2 changed files with 104 additions and 84 deletions

@@ -9,6 +9,10 @@ import 'dart:collection';
import 'dart:convert';
import 'dart:io';
import 'package:stack_trace/stack_trace.dart';
import 'utils.dart';
/// Manages a pool of files that are opened for reading to cope with maximum
/// file descriptor limits.
///
@@ -21,12 +25,38 @@ class FilePool {
/// another file to close so they can be retried.
final _pendingListens = new Queue<_FileReader>();
/// The timeout timer.
///
/// This timer is set as soon as the file limit is reached and is reset every
/// time a file finishes being read or a new file is opened. If it fires, that
/// indicates that the caller became deadlocked, likely due to files waiting
/// for additional files to be read before they could be closed.
Timer _timer;
/// The number of files currently open in the pool.
int _openFiles = 0;
/// The maximum number of file descriptors that the pool will allocate.
///
/// Barback may only use half the available file descriptors.
int get _maxOpenFiles => (maxFileDescriptors / 2).floor();
/// Opens [file] for reading.
///
/// When the returned stream is listened to, if there are too many files
/// open, this will wait for a previously opened file to be closed and then
/// try again.
Stream<List<int>> openRead(File file) => new _FileReader(this, file).stream;
Stream<List<int>> openRead(File file) {
var reader = new _FileReader(this, file);
if (_openFiles < _maxOpenFiles) {
_openFiles++;
reader.start();
} else {
_pendingListens.add(reader);
_heartbeat();
}
return reader.stream;
}
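For reference, a minimal usage sketch (the file paths and the surrounding main() are illustrative, not part of this change, and it assumes FilePool plus dart:io and dart:convert are importable): reads that would exceed the descriptor cap are simply queued until an earlier read finishes.
import 'dart:convert';
import 'dart:io';
main() {
  var pool = new FilePool();
  // Streams the file's bytes, waiting for a free descriptor if necessary.
  pool.openRead(new File("assets/logo.png")).listen(
      (chunk) => print("got ${chunk.length} bytes"),
      onError: (error) => print(error));
  // Decodes the whole file as a string using the given encoding.
  pool.readAsString(new File("pubspec.yaml"), UTF8)
      .then((contents) => print(contents.length));
}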
/// Reads [file] as a string using [encoding].
///
@@ -49,12 +79,38 @@ class FilePool {
return completer.future;
}
/// Tries to re-listen to the next pending file reader if there are any.
void _retryPendingListen() {
if (_pendingListens.isEmpty) return;
/// If there are any file reads that are waiting for available descriptors,
/// this will allow the oldest one to start reading.
void _startPendingListen() {
if (_pendingListens.isEmpty) {
_openFiles--;
if (_timer != null) {
_timer.cancel();
_timer = null;
}
return;
}
_heartbeat();
var pending = _pendingListens.removeFirst();
pending._listen();
pending.start();
}
/// Indicates that some external action has occurred and the timer should be
/// restarted.
void _heartbeat() {
if (_timer != null) _timer.cancel();
_timer = new Timer(new Duration(seconds: 60), _onTimeout);
}
/// Handles [_timer] timing out by causing all pending file readers to emit
/// exceptions.
void _onTimeout() {
for (var reader in _pendingListens) {
reader.timeout();
}
_pendingListens.clear();
_timer = null;
}
}
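The timer handling above is a simple watchdog: every bit of progress (a file opened or closed) pushes the deadline back via _heartbeat, and _onTimeout only runs if the pool sits saturated with no progress for a full minute. The same pattern in isolation, with illustrative names that are not barback APIs:
import 'dart:async';
/// Runs [onDeadlock] if [beat] is not called within [limit].
class Watchdog {
  final Duration limit;
  final Function onDeadlock;
  Timer _timer;
  Watchdog(this.limit, this.onDeadlock);
  /// Records progress, pushing the deadline back by another [limit].
  void beat() {
    if (_timer != null) _timer.cancel();
    _timer = new Timer(limit, onDeadlock);
  }
  /// Stops watching entirely, used once nothing is pending.
  void stop() {
    if (_timer != null) _timer.cancel();
    _timer = null;
  }
}
Here _heartbeat plays the role of beat(), and the empty-queue branch of _startPendingListen plays the role of stop().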
@@ -67,6 +123,9 @@ class _FileReader {
final FilePool _pool;
final File _file;
/// Whether the caller has paused this reader's stream.
bool _isPaused = false;
/// The underlying file stream.
Stream<List<int>> _fileStream;
@@ -78,29 +137,16 @@ class _FileReader {
/// This will only be non-null while the wrapped stream is being listened to.
StreamSubscription _subscription;
/// The timeout timer.
///
/// If this timer fires before the listen is retried, it gives up and throws
/// the original error.
Timer _timer;
/// When a [listen] call has thrown a "too many files" error, this will be
/// the exception object.
Object _exception;
/// When a [listen] call has thrown a "too many files" error, this will be
/// the captured stack trace.
Object _stackTrace;
/// The wrapped stream that the file can be read from.
Stream<List<int>> get stream => _controller.stream;
_FileReader(this._pool, this._file) {
_controller = new StreamController<List<int>>(onListen: _listen,
onPause: () {
_subscription.pause();
_controller = new StreamController<List<int>>(onPause: () {
_isPaused = true;
if (_subscription != null) _subscription.pause();
}, onResume: () {
_subscription.resume();
_isPaused = false;
if (_subscription != null) _subscription.resume();
}, onCancel: () {
if (_subscription != null) _subscription.cancel();
_subscription = null;
@@ -108,78 +154,31 @@ }
}
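The new _isPaused flag covers the window between a listener pausing the wrapped stream and start() actually creating a subscription: the pause is remembered and applied once the underlying file is opened. A stripped-down sketch of that pattern (illustrative class, not a barback API, and a real version would also handle onCancel):
import 'dart:async';
/// Exposes [stream] immediately but only forwards [inner] once [start] is
/// called, honoring a pause requested before the subscription exists.
class DeferredForwarder<T> {
  StreamController<T> _controller;
  StreamSubscription<T> _subscription;
  bool _isPaused = false;
  Stream<T> get stream => _controller.stream;
  DeferredForwarder() {
    _controller = new StreamController<T>(onPause: () {
      _isPaused = true;
      if (_subscription != null) _subscription.pause();
    }, onResume: () {
      _isPaused = false;
      if (_subscription != null) _subscription.resume();
    });
  }
  void start(Stream<T> inner) {
    _subscription = inner.listen(_controller.add,
        onError: _controller.addError, onDone: _controller.close);
    if (_isPaused) _subscription.pause();
  }
}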
/// Starts listening to the underlying file stream.
void _listen() {
if (_timer != null) {
_timer.cancel();
_timer = null;
}
_exception = null;
_stackTrace = null;
void start() {
_fileStream = _file.openRead();
_subscription = _fileStream.listen(_controller.add,
onError: _onError, onDone: _onDone, cancelOnError: true);
if (_isPaused) _subscription.pause();
}
/// Handles an error from the underlying file stream.
///
/// "Too many file" errors are caught so that we can retry later. Other
/// errors are passed to the wrapped stream and the underlying stream
/// subscription is canceled.
void _onError(Object exception, Object stackTrace) {
assert(_subscription != null);
assert(_exception == null);
/// Emits a timeout exception.
void timeout() {
assert(_subscription == null);
_controller.addError("FilePool deadlock: all file descriptors have been in "
"use for too long.", new Trace.current().vmTrace);
_controller.close();
}
// The subscription is canceled after an error.
_subscription = null;
// We only handle "Too many open files" errors.
if (exception is! FileException || exception.osError.errorCode != 24) {
_controller.addError(exception, stackTrace);
return;
}
_exception = exception;
_stackTrace = stackTrace;
// We'll try to defer the listen in the hopes that another file will close
// and we can try again. If that doesn't happen after a while, give up and just
// throw the original error.
// TODO(rnystrom): The point of this timer is to not get stuck forever in
// a deadlock scenario. But this can also erroneously fire if there is a
// large number of slow reads that do incrementally finish. A file may not
// move to the front of the queue in time even though it is making
// progress. A better solution is to have a single deadlock timer on the
// FilePool itself that starts when a pending listen is enqueued and checks
// to see if progress has been made when it fires.
_timer = new Timer(new Duration(seconds: 60), _onTimeout);
// Tell the pool that this file is waiting.
_pool._pendingListens.add(this);
/// Forwards an error from the underlying file stream.
void _onError(Object exception, StackTrace stackTrace) {
_controller.addError(exception, stackTrace);
_onDone();
}
/// Handles the underlying file stream finishing.
void _onDone() {
_subscription = null;
_controller.close();
_pool._retryPendingListen();
}
/// If this file failed to be read because there were too many open files and
/// no file was closed in time to retry, this handles giving up.
void _onTimeout() {
assert(_subscription == null);
assert(_exception != null);
// We failed to open in time, so just fail with the original error.
_pool._pendingListens.remove(this);
_controller.addError(_exception, _stackTrace);
_controller.close();
_timer = null;
_exception = null;
_stackTrace = null;
_pool._startPendingListen();
}
}

@@ -5,6 +5,7 @@
library barback.utils;
import 'dart:async';
import 'dart:io';
/// A pair of values.
class Pair<E, F> {
@@ -195,3 +196,23 @@ Stream futureStream(Future<Stream> future) {
});
return controller.stream;
}
// TODO(nweiz): Use a built-in function when issue 14244 is fixed.
int get maxFileDescriptors {
if (_maxFileDescriptors != null) return _maxFileDescriptors;
// Running "sh -c ulimit -n" via the command line always reports "unlimited",
// even when that's clearly false. Instead we fall back on OS-based
// heuristics.
if (Platform.isWindows) {
_maxFileDescriptors = 512;
return _maxFileDescriptors;
} else if (Platform.isMacOS) {
_maxFileDescriptors = 16348;
return _maxFileDescriptors;
} else {
_maxFileDescriptors = 256;
return _maxFileDescriptors;
}
}
int _maxFileDescriptors;
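For a rough sense of the resulting cap (FilePool's _maxOpenFiles above takes half of this value), a tiny check, assuming the maxFileDescriptors getter above is importable:
main() {
  // With the constants above this prints 256 on Windows, 8174 on Mac OS, and
  // 128 everywhere else.
  print((maxFileDescriptors / 2).floor());
}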