From 7ff3b89abae383cf1de261a5fdd8b178980ed015 Mon Sep 17 00:00:00 2001 From: "nweiz@google.com" Date: Tue, 22 Oct 2013 18:36:57 +0000 Subject: [PATCH] Make sure barback's FilePool doesn't take up *all* the available FDs. BUG=13752 R=alanknight@google.com Review URL: https://codereview.chromium.org//28733009 git-svn-id: https://dart.googlecode.com/svn/branches/bleeding_edge/dart@29021 260f80e4-7a28-3924-810f-c04153c831b5 --- pkg/barback/lib/src/file_pool.dart | 167 ++++++++++++++--------------- pkg/barback/lib/src/utils.dart | 21 ++++ 2 files changed, 104 insertions(+), 84 deletions(-) diff --git a/pkg/barback/lib/src/file_pool.dart b/pkg/barback/lib/src/file_pool.dart index 06192e766ea..64aa404e10b 100644 --- a/pkg/barback/lib/src/file_pool.dart +++ b/pkg/barback/lib/src/file_pool.dart @@ -9,6 +9,10 @@ import 'dart:collection'; import 'dart:convert'; import 'dart:io'; +import 'package:stack_trace/stack_trace.dart'; + +import 'utils.dart'; + /// Manages a pool of files that are opened for reading to cope with maximum /// file descriptor limits. /// @@ -21,12 +25,38 @@ class FilePool { /// another file to close so they can be retried. final _pendingListens = new Queue<_FileReader>(); + /// The timeout timer. + /// + /// This timer is set as soon as the file limit is reached and is reset every + /// time a file finishes being read or a new file is opened. If it fires, that + /// indicates that the caller became deadlocked, likely due to files waiting + /// for additional files to be read before they could be closed. + Timer _timer; + + /// The number of files currently open in the pool. + int _openFiles = 0; + + /// The maximum number of file descriptors that the pool will allocate. + /// + /// Barback may only use half the available file descriptors. + int get _maxOpenFiles => (maxFileDescriptors / 2).floor(); + /// Opens [file] for reading. /// /// When the returned stream is listened to, if there are too many files /// open, this will wait for a previously opened file to be closed and then /// try again. - Stream> openRead(File file) => new _FileReader(this, file).stream; + Stream> openRead(File file) { + var reader = new _FileReader(this, file); + if (_openFiles < _maxOpenFiles) { + _openFiles++; + reader.start(); + } else { + _pendingListens.add(reader); + _heartbeat(); + } + return reader.stream; + } /// Reads [file] as a string using [encoding]. /// @@ -49,12 +79,38 @@ class FilePool { return completer.future; } - /// Tries to re-listen to the next pending file reader if there are any. - void _retryPendingListen() { - if (_pendingListens.isEmpty) return; + /// If there are any file reads that are waiting for available descriptors, + /// this will allow the oldest one to start reading. + void _startPendingListen() { + if (_pendingListens.isEmpty) { + _openFiles--; + if (_timer != null) { + _timer.cancel(); + _timer = null; + } + return; + } + _heartbeat(); var pending = _pendingListens.removeFirst(); - pending._listen(); + pending.start(); + } + + /// Indicates that some external action has occurred and the timer should be + /// restarted. + void _heartbeat() { + if (_timer != null) _timer.cancel(); + _timer = new Timer(new Duration(seconds: 60), _onTimeout); + } + + /// Handles [_timer] timing out by causing all pending file readers to emit + /// exceptions. + void _onTimeout() { + for (var reader in _pendingListens) { + reader.timeout(); + } + _pendingListens.clear(); + _timer = null; } } @@ -67,6 +123,9 @@ class _FileReader { final FilePool _pool; final File _file; + /// Whether the caller has paused this reader's stream. + bool _isPaused = false; + /// The underyling file stream. Stream> _fileStream; @@ -78,29 +137,16 @@ class _FileReader { /// This will only be non-null while the wrapped stream is being listened to. StreamSubscription _subscription; - /// The timeout timer. - /// - /// If this timer fires before the listen is retried, it gives up and throws - /// the original error. - Timer _timer; - - /// When a [listen] call has thrown a "too many files" error, this will be - /// the exception object. - Object _exception; - - /// When a [listen] call has thrown a "too many files" error, this will be - /// the captured stack trace. - Object _stackTrace; - /// The wrapped stream that the file can be read from. Stream> get stream => _controller.stream; _FileReader(this._pool, this._file) { - _controller = new StreamController>(onListen: _listen, - onPause: () { - _subscription.pause(); + _controller = new StreamController>(onPause: () { + _isPaused = true; + if (_subscription != null) _subscription.pause(); }, onResume: () { - _subscription.resume(); + _isPaused = false; + if (_subscription != null) _subscription.resume(); }, onCancel: () { if (_subscription != null) _subscription.cancel(); _subscription = null; @@ -108,78 +154,31 @@ class _FileReader { } /// Starts listening to the underlying file stream. - void _listen() { - if (_timer != null) { - _timer.cancel(); - _timer = null; - } - - _exception = null; - _stackTrace = null; - + void start() { _fileStream = _file.openRead(); _subscription = _fileStream.listen(_controller.add, onError: _onError, onDone: _onDone, cancelOnError: true); + if (_isPaused) _subscription.pause(); } - /// Handles an error from the underlying file stream. - /// - /// "Too many file" errors are caught so that we can retry later. Other - /// errors are passed to the wrapped stream and the underlying stream - /// subscription is canceled. - void _onError(Object exception, Object stackTrace) { - assert(_subscription != null); - assert(_exception == null); + /// Emits a timeout exception. + void timeout() { + assert(_subscription == null); + _controller.addError("FilePool deadlock: all file descriptors have been in " + "use for too long.", new Trace.current().vmTrace); + _controller.close(); + } - // The subscription is canceled after an error. - _subscription = null; - - // We only handle "Too many open files errors". - if (exception is! FileException || exception.osError.errorCode != 24) { - _controller.addError(exception, stackTrace); - return; - } - - _exception = exception; - _stackTrace = stackTrace; - - // We'll try to defer the listen in the hopes that another file will close - // and we can try. If that doesn't happen after a while, give up and just - // throw the original error. - // TODO(rnystrom): The point of this timer is to not get stuck forever in - // a deadlock scenario. But this can also erroneously fire if there is a - // large number of slow reads that do incrementally finish. A file may not - // move to the front of the queue in time even though it is making - // progress. A better solution is to have a single deadlock timer on the - // FilePool itself that starts when a pending listen is enqueued and checks - // to see if progress has been made when it fires. - _timer = new Timer(new Duration(seconds: 60), _onTimeout); - - // Tell the pool that this file is waiting. - _pool._pendingListens.add(this); + /// Forwards an error from the underlying file stream. + void _onError(Object exception, StackTrace stackTrace) { + _controller.addError(exception, stackTrace); + _onDone(); } /// Handles the underlying file stream finishing. void _onDone() { _subscription = null; - _controller.close(); - _pool._retryPendingListen(); - } - - /// If this file failed to be read because there were too many open files and - /// no file was closed in time to retry, this handles giving up. - void _onTimeout() { - assert(_subscription == null); - assert(_exception != null); - - // We failed to open in time, so just fail with the original error. - _pool._pendingListens.remove(this); - _controller.addError(_exception, _stackTrace); - _controller.close(); - - _timer = null; - _exception = null; - _stackTrace = null; + _pool._startPendingListen(); } } diff --git a/pkg/barback/lib/src/utils.dart b/pkg/barback/lib/src/utils.dart index 22203b4c89d..ef6a7830538 100644 --- a/pkg/barback/lib/src/utils.dart +++ b/pkg/barback/lib/src/utils.dart @@ -5,6 +5,7 @@ library barback.utils; import 'dart:async'; +import 'dart:io'; /// A pair of values. class Pair { @@ -195,3 +196,23 @@ Stream futureStream(Future future) { }); return controller.stream; } + +// TODO(nweiz): Use a built-in function when issue 14244 is fixed. +int get maxFileDescriptors { + if (_maxFileDescriptors != null) return _maxFileDescriptors; + + // Running "sh -c ulimit -n" via the command line always reports "unlimited", + // even when that's clearly false. Instead we fall back on OS-based + // heuristics. + if (Platform.isWindows) { + _maxFileDescriptors = 512; + return _maxFileDescriptors; + } else if (Platform.isMacOS) { + _maxFileDescriptors = 16348; + return _maxFileDescriptors; + } else { + _maxFileDescriptors = 256; + return _maxFileDescriptors; + } +} +int _maxFileDescriptors;