Make chained futures point to their source instead of opposite.

This means that a chained future that nobody cares about will
not be kept alive by the source.
Any listeners added to the chained future is forwarded to the
source. If the chained source notices that the source has completed,
it copies the values and drops the link completely.

This should allow some unused futures to be GC'ed earlier than
otherwise.

R=floitsch@google.com

Review URL: https://codereview.chromium.org/1516783003.
This commit is contained in:
Lasse R.H. Nielsen 2015-12-11 11:13:39 +01:00
parent ea55dc9bec
commit ed0bc43158

View file

@ -95,11 +95,6 @@ class _FutureListener {
errorCallback = null,
state = STATE_WHENCOMPLETE;
_FutureListener.chain(this.result)
: callback = null,
errorCallback = null,
state = STATE_CHAIN;
Zone get _zone => result._zone;
bool get handlesValue => (state & MASK_VALUE != 0);
@ -133,8 +128,7 @@ class _Future<T> implements Future<T> {
static const int _PENDING_COMPLETE = 1;
/// The future has been chained to another future. The result of that
/// other future becomes the result of this future as well.
// TODO(floitsch): we don't really need a special "_CHAINED" state. We could
// just use the PENDING_COMPLETE state instead.
/// [resultOrListeners] contains the source future.
static const int _CHAINED = 2;
/// The future has been completed with a value result.
static const int _VALUE = 4;
@ -167,11 +161,6 @@ class _Future<T> implements Future<T> {
* is waiting for the other future to complete, and when it does, this future
* will complete with the same result.
* All listeners are forwarded to the other future.
*
* The cases are disjoint - incomplete and unchained ([_INCOMPLETE]),
* incomplete and chained ([_CHAINED]), or completed with value or error
* ([_VALUE] or [_ERROR]) - so the field only needs to hold
* one value at a time.
*/
var _resultOrListeners;
@ -189,14 +178,15 @@ class _Future<T> implements Future<T> {
bool get _mayComplete => _state == _INCOMPLETE;
bool get _isPendingComplete => _state == _PENDING_COMPLETE;
bool get _mayAddListener => _state <= _PENDING_COMPLETE;
bool get _isChained => _state == _CHAINED;
bool get _isComplete => _state >= _VALUE;
bool get _hasValue => _state == _VALUE;
bool get _hasError => _state == _ERROR;
void _setChained() {
assert(!_isComplete);
void _setChained(_Future source) {
assert(_mayAddListener);
_state = _CHAINED;
_resultOrListeners = source;
}
Future then(f(T value), { Function onError }) {
@ -238,18 +228,18 @@ class _Future<T> implements Future<T> {
Stream<T> asStream() => new Stream<T>.fromFuture(this);
void _markPendingCompletion() {
if (!_mayComplete) throw new StateError("Future already completed");
void _setPendingComplete() {
assert(_mayComplete);
_state = _PENDING_COMPLETE;
}
T get _value {
assert(_isComplete && _hasValue);
AsyncError get _error {
assert(_hasError);
return _resultOrListeners;
}
AsyncError get _error {
assert(_isComplete && _hasError);
_Future get _chainSource {
assert(_isChained);
return _resultOrListeners;
}
@ -270,16 +260,70 @@ class _Future<T> implements Future<T> {
_setErrorObject(new AsyncError(error, stackTrace));
}
/// Copy the completion result of [source] into this future.
///
/// Used when a chained future notices that its source is completed.
void _cloneResult(_Future source) {
assert(!_isComplete);
assert(source._isComplete);
_state = source._state;
_resultOrListeners = source._resultOrListeners;
}
void _addListener(_FutureListener listener) {
assert(listener._nextListener == null);
if (_isComplete) {
if (_mayAddListener) {
listener._nextListener = _resultOrListeners;
_resultOrListeners = listener;
} else {
if (_isChained) {
// Delegate listeners to chained source future.
// If the source is complete, instead copy its values and
// drop the chaining.
_Future source = _chainSource;
if (!source._isComplete) {
source._addListener(listener);
return;
}
_cloneResult(source);
}
assert(_isComplete);
// Handle late listeners asynchronously.
_zone.scheduleMicrotask(() {
_propagateToListeners(this, listener);
});
}
}
void _prependListeners(_FutureListener listeners) {
if (listeners == null) return;
if (_mayAddListener) {
_FutureListener existingListeners = _resultOrListeners;
_resultOrListeners = listeners;
if (existingListeners != null) {
_FutureListener cursor = listeners;
while (cursor._nextListener != null) {
cursor = cursor._nextListener;
}
cursor._nextListener = existingListeners;
}
} else {
listener._nextListener = _resultOrListeners;
_resultOrListeners = listener;
if (_isChained) {
// Delegate listeners to chained source future.
// If the source is complete, instead copy its values and
// drop the chaining.
_Future source = _chainSource;
if (!source._isComplete) {
source._prependListeners(listeners);
return;
}
_cloneResult(source);
}
assert(_isComplete);
listeners = _reverseListeners(listeners);
_zone.scheduleMicrotask(() {
_propagateToListeners(this, listeners);
});
}
}
@ -289,7 +333,12 @@ class _Future<T> implements Future<T> {
assert(!_isComplete);
_FutureListener current = _resultOrListeners;
_resultOrListeners = null;
return _reverseListeners(current);
}
_FutureListener _reverseListeners(_FutureListener listeners) {
_FutureListener prev = null;
_FutureListener current = listeners;
while (current != null) {
_FutureListener next = current._nextListener;
current._nextListener = prev;
@ -308,10 +357,10 @@ class _Future<T> implements Future<T> {
assert(source is! _Future);
// Mark the target as chained (and as such half-completed).
target._setChained();
target._setPendingComplete();
try {
source.then((value) {
assert(target._isChained);
assert(target._isPendingComplete);
target._completeWithValue(value);
},
// TODO(floitsch): eventually we would like to make this non-optional
@ -319,7 +368,7 @@ class _Future<T> implements Future<T> {
// the target future's listeners want to have the stack trace we don't
// need a trace.
onError: (error, [stackTrace]) {
assert(target._isChained);
assert(target._isPendingComplete);
target._completeError(error, stackTrace);
});
} catch (e, s) {
@ -336,16 +385,18 @@ class _Future<T> implements Future<T> {
// Take the value (when completed) of source and complete target with that
// value (or error). This function expects that source is a _Future.
static void _chainCoreFuture(_Future source, _Future target) {
assert(!target._isComplete);
assert(source is _Future);
// Mark the target as chained (and as such half-completed).
target._setChained();
_FutureListener listener = new _FutureListener.chain(target);
assert(target._mayAddListener); // Not completed, not already chained.
while (source._isChained) {
source = source._chainSource;
}
if (source._isComplete) {
_propagateToListeners(source, listener);
_FutureListener listeners = target._removeListeners();
target._cloneResult(source);
_propagateToListeners(target, listeners);
} else {
source._addListener(listener);
_FutureListener listeners = target._resultOrListeners;
target._setChained(source);
source._prependListeners(listeners);
}
}
@ -404,7 +455,7 @@ class _Future<T> implements Future<T> {
if (coreFuture._hasError) {
// Case 1 from above. Delay completion to enable the user to register
// callbacks.
_markPendingCompletion();
_setPendingComplete();
_zone.scheduleMicrotask(() {
_chainCoreFuture(coreFuture, this);
});
@ -420,9 +471,10 @@ class _Future<T> implements Future<T> {
return;
} else {
T typedValue = value;
assert(typedValue is T); // Avoid warning that typedValue is unused.
}
_markPendingCompletion();
_setPendingComplete();
_zone.scheduleMicrotask(() {
_completeWithValue(value);
});
@ -431,7 +483,7 @@ class _Future<T> implements Future<T> {
void _asyncCompleteError(error, StackTrace stackTrace) {
assert(!_isComplete);
_markPendingCompletion();
_setPendingComplete();
_zone.scheduleMicrotask(() {
_completeError(error, stackTrace);
});
@ -463,11 +515,15 @@ class _Future<T> implements Future<T> {
_propagateToListeners(source, listener);
}
_FutureListener listener = listeners;
final sourceResult = source._resultOrListeners;
// Do the actual propagation.
// Set initial state of listenerHasError and listenerValueOrError. These
// variables are updated with the outcome of potential callbacks.
// Non-error results, including futures, are stored in
// listenerValueOrError and listenerHasError is set to false. Errors
// are stored in listenerValueOrError as an [AsyncError] and
// listenerHasError is set to true.
bool listenerHasError = hasError;
final sourceResult = source._resultOrListeners;
var listenerValueOrError = sourceResult;
// Only if we either have an error or callbacks, go into this, somewhat
@ -598,10 +654,9 @@ class _Future<T> implements Future<T> {
_Future result = listener.result;
if (chainSource is _Future) {
if (chainSource._isComplete) {
// propagate the value (simulating a tail call).
result._setChained();
listeners = result._removeListeners();
result._cloneResult(chainSource);
source = chainSource;
listeners = new _FutureListener.chain(result);
continue;
} else {
_chainCoreFuture(chainSource, result);