mirror of
https://github.com/dart-lang/sdk
synced 2024-09-15 23:29:47 +00:00
Remove unnecessary completers from async_patch code.
Use _Future directly. Add ability to get trace of awaited continuations. Change-Id: I6c3aba0bdc2e54afe1d84fdd802fb5210d7598ac Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/112721 Reviewed-by: Martin Kustermann <kustermann@google.com> Commit-Queue: Lasse R.H. Nielsen <lrn@google.com>
This commit is contained in:
parent
b9217efc77
commit
9dcd7267ba
|
@ -18,30 +18,24 @@ import "dart:_internal" show VMLibraryHooks, patch;
|
|||
_fatal(msg) native "DartAsync_fatal";
|
||||
|
||||
class _AsyncAwaitCompleter<T> implements Completer<T> {
|
||||
final _completer = new Completer<T>.sync();
|
||||
final _future = new _Future<T>();
|
||||
bool isSync;
|
||||
|
||||
_AsyncAwaitCompleter() : isSync = false;
|
||||
|
||||
void complete([FutureOr<T> value]) {
|
||||
if (isSync) {
|
||||
_completer.complete(value);
|
||||
} else if (value is Future<T>) {
|
||||
value.then(_completer.complete, onError: _completer.completeError);
|
||||
if (!isSync || value is Future<T>) {
|
||||
_future._asyncComplete(value);
|
||||
} else {
|
||||
scheduleMicrotask(() {
|
||||
_completer.complete(value);
|
||||
});
|
||||
_future._completeWithValue(value);
|
||||
}
|
||||
}
|
||||
|
||||
void completeError(e, [st]) {
|
||||
if (isSync) {
|
||||
_completer.completeError(e, st);
|
||||
_future._completeError(e, st);
|
||||
} else {
|
||||
scheduleMicrotask(() {
|
||||
_completer.completeError(e, st);
|
||||
});
|
||||
_future._asyncCompleteError(e, st);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -50,8 +44,8 @@ class _AsyncAwaitCompleter<T> implements Completer<T> {
|
|||
isSync = true;
|
||||
}
|
||||
|
||||
Future<T> get future => _completer.future;
|
||||
bool get isCompleted => _completer.isCompleted;
|
||||
Future<T> get future => _future;
|
||||
bool get isCompleted => !_future._mayComplete;
|
||||
}
|
||||
|
||||
// We need to pass the value as first argument and leave the second and third
|
||||
|
@ -107,7 +101,7 @@ Future _awaitHelper(
|
|||
// We can only do this for our internal futures (the default implementation of
|
||||
// all futures that are constructed by the `dart:async` library).
|
||||
object._awaiter = awaiter;
|
||||
return object._thenNoZoneRegistration(thenCallback, errorCallback);
|
||||
return object._thenAwait(thenCallback, errorCallback);
|
||||
}
|
||||
|
||||
// Called as part of the 'await for (...)' construct. Registers the
|
||||
|
@ -143,7 +137,7 @@ class _AsyncStarStreamController<T> {
|
|||
bool onListenReceived = false;
|
||||
bool isScheduled = false;
|
||||
bool isSuspendedAtYield = false;
|
||||
Completer cancellationCompleter = null;
|
||||
_Future cancellationFuture = null;
|
||||
|
||||
Stream<T> get stream {
|
||||
final Stream<T> local = controller.stream;
|
||||
|
@ -210,10 +204,10 @@ class _AsyncStarStreamController<T> {
|
|||
}
|
||||
|
||||
void addError(Object error, StackTrace stackTrace) {
|
||||
if ((cancellationCompleter != null) && !cancellationCompleter.isCompleted) {
|
||||
if ((cancellationFuture != null) && cancellationFuture._mayComplete) {
|
||||
// If the stream has been cancelled, complete the cancellation future
|
||||
// with the error.
|
||||
cancellationCompleter.completeError(error, stackTrace);
|
||||
cancellationFuture._completeError(error, stackTrace);
|
||||
return;
|
||||
}
|
||||
// If stream is cancelled, tell caller to exit the async generator.
|
||||
|
@ -226,10 +220,10 @@ class _AsyncStarStreamController<T> {
|
|||
}
|
||||
|
||||
close() {
|
||||
if ((cancellationCompleter != null) && !cancellationCompleter.isCompleted) {
|
||||
if ((cancellationFuture != null) && cancellationFuture._mayComplete) {
|
||||
// If the stream has been cancelled, complete the cancellation future
|
||||
// with the error.
|
||||
cancellationCompleter.complete();
|
||||
cancellationFuture._completeWithValue(null);
|
||||
}
|
||||
controller.close();
|
||||
}
|
||||
|
@ -257,8 +251,8 @@ class _AsyncStarStreamController<T> {
|
|||
if (controller.isClosed) {
|
||||
return null;
|
||||
}
|
||||
if (cancellationCompleter == null) {
|
||||
cancellationCompleter = new Completer();
|
||||
if (cancellationFuture == null) {
|
||||
cancellationFuture = new _Future();
|
||||
// Only resume the generator if it is suspended at a yield.
|
||||
// Cancellation does not affect an async generator that is
|
||||
// suspended at an await.
|
||||
|
@ -266,7 +260,7 @@ class _AsyncStarStreamController<T> {
|
|||
scheduleGenerator();
|
||||
}
|
||||
}
|
||||
return cancellationCompleter.future;
|
||||
return cancellationFuture;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -37,7 +37,7 @@ _async<T>(Function() initGenerator) {
|
|||
} else {
|
||||
f = _Future.value(value);
|
||||
}
|
||||
f = JS('', '#', f._thenNoZoneRegistration(onValue, onError));
|
||||
f = JS('', '#', f._thenAwait(onValue, onError));
|
||||
return f;
|
||||
}
|
||||
|
||||
|
@ -363,7 +363,7 @@ class _AsyncStarImpl<T> {
|
|||
} else {
|
||||
f = _Future.value(value);
|
||||
}
|
||||
f._thenNoZoneRegistration(_runBodyCallback, handleError);
|
||||
f._thenAwait(_runBodyCallback, handleError);
|
||||
}
|
||||
|
||||
/// Adds element to [stream] and returns true if the caller should terminate
|
||||
|
|
|
@ -190,35 +190,29 @@ bool _hasTimer() {
|
|||
}
|
||||
|
||||
class _AsyncAwaitCompleter<T> implements Completer<T> {
|
||||
final _completer = new Completer<T>.sync();
|
||||
final _future = new _Future<T>();
|
||||
bool isSync;
|
||||
|
||||
_AsyncAwaitCompleter() : isSync = false;
|
||||
|
||||
void complete([FutureOr<T> value]) {
|
||||
if (isSync) {
|
||||
_completer.complete(value);
|
||||
} else if (value is Future<T>) {
|
||||
value.then(_completer.complete, onError: _completer.completeError);
|
||||
if (!isSync || value is Future<T>) {
|
||||
_future._asyncComplete(value);
|
||||
} else {
|
||||
scheduleMicrotask(() {
|
||||
_completer.complete(value);
|
||||
});
|
||||
_future._completeWithValue(value);
|
||||
}
|
||||
}
|
||||
|
||||
void completeError(e, [st]) {
|
||||
if (isSync) {
|
||||
_completer.completeError(e, st);
|
||||
_future._completeError(e, st);
|
||||
} else {
|
||||
scheduleMicrotask(() {
|
||||
_completer.completeError(e, st);
|
||||
});
|
||||
_future._asyncCompleteError(e, st);
|
||||
}
|
||||
}
|
||||
|
||||
Future<T> get future => _completer.future;
|
||||
bool get isCompleted => _completer.isCompleted;
|
||||
Future<T> get future => _future;
|
||||
bool get isCompleted => !_future._mayComplete;
|
||||
}
|
||||
|
||||
/// Creates a Completer for an `async` function.
|
||||
|
@ -295,15 +289,14 @@ void _awaitOnObject(object, _WrappedAsyncBody bodyFunction) {
|
|||
if (object is _Future) {
|
||||
// We can skip the zone registration, since the bodyFunction is already
|
||||
// registered (see [_wrapJsFunctionForAsync]).
|
||||
object._thenNoZoneRegistration(thenCallback, errorCallback);
|
||||
object._thenAwait(thenCallback, errorCallback);
|
||||
} else if (object is Future) {
|
||||
object.then(thenCallback, onError: errorCallback);
|
||||
} else {
|
||||
_Future future = new _Future();
|
||||
future._setValue(object);
|
||||
_Future future = new _Future().._setValue(object);
|
||||
// We can skip the zone registration, since the bodyFunction is already
|
||||
// registered (see [_wrapJsFunctionForAsync]).
|
||||
future._thenNoZoneRegistration(thenCallback, null);
|
||||
future._thenAwait(thenCallback, null);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -381,7 +374,7 @@ void _asyncStarHelper(
|
|||
if (identical(bodyFunctionOrErrorCode, async_error_codes.SUCCESS)) {
|
||||
// This happens on return from the async* function.
|
||||
if (controller.isCanceled) {
|
||||
controller.cancelationCompleter.complete();
|
||||
controller.cancelationFuture._completeWithValue(null);
|
||||
} else {
|
||||
controller.close();
|
||||
}
|
||||
|
@ -389,7 +382,7 @@ void _asyncStarHelper(
|
|||
} else if (identical(bodyFunctionOrErrorCode, async_error_codes.ERROR)) {
|
||||
// The error is a js-error.
|
||||
if (controller.isCanceled) {
|
||||
controller.cancelationCompleter.completeError(
|
||||
controller.cancelationFuture._completeError(
|
||||
unwrapException(object), getTraceFromException(object));
|
||||
} else {
|
||||
controller.addError(
|
||||
|
@ -465,13 +458,13 @@ class _AsyncStarStreamController<T> {
|
|||
|
||||
bool get isPaused => controller.isPaused;
|
||||
|
||||
Completer cancelationCompleter = null;
|
||||
_Future cancelationFuture = null;
|
||||
|
||||
/// True after the StreamSubscription has been cancelled.
|
||||
/// When this is true, errors thrown from the async* body should go to the
|
||||
/// [cancelationCompleter] instead of adding them to [controller], and
|
||||
/// returning from the async function should complete [cancelationCompleter].
|
||||
bool get isCanceled => cancelationCompleter != null;
|
||||
/// [cancelationFuture] instead of adding them to [controller], and
|
||||
/// returning from the async function should complete [cancelationFuture].
|
||||
bool get isCanceled => cancelationFuture != null;
|
||||
|
||||
add(event) => controller.add(event);
|
||||
|
||||
|
@ -503,7 +496,7 @@ class _AsyncStarStreamController<T> {
|
|||
}, onCancel: () {
|
||||
// If the async* is finished we ignore cancel events.
|
||||
if (!controller.isClosed) {
|
||||
cancelationCompleter = new Completer();
|
||||
cancelationFuture = new _Future();
|
||||
if (isSuspended) {
|
||||
// Resume the suspended async* function to run finalizers.
|
||||
isSuspended = false;
|
||||
|
@ -511,7 +504,7 @@ class _AsyncStarStreamController<T> {
|
|||
body(async_error_codes.STREAM_WAS_CANCELED, null);
|
||||
});
|
||||
}
|
||||
return cancelationCompleter.future;
|
||||
return cancelationFuture;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
|
|
@ -67,6 +67,9 @@ class _FutureListener<S, T> {
|
|||
static const int stateCatcherror = maskError;
|
||||
static const int stateCatcherrorTest = maskError | maskTestError;
|
||||
static const int stateWhencomplete = maskWhencomplete;
|
||||
static const int maskType =
|
||||
maskValue | maskError | maskTestError | maskWhencomplete;
|
||||
static const int stateIsAwait = 16;
|
||||
// Listeners on the same future are linked through this link.
|
||||
_FutureListener _nextListener;
|
||||
// The future to complete when this listener is activated.
|
||||
|
@ -84,6 +87,13 @@ class _FutureListener<S, T> {
|
|||
errorCallback = errorCallback,
|
||||
state = (errorCallback == null) ? stateThen : stateThenOnerror;
|
||||
|
||||
_FutureListener.thenAwait(
|
||||
this.result, _FutureOnValue<S, T> onValue, Function errorCallback)
|
||||
: callback = onValue,
|
||||
errorCallback = errorCallback,
|
||||
state = ((errorCallback == null) ? stateThen : stateThenOnerror)
|
||||
| stateIsAwait ;
|
||||
|
||||
_FutureListener.catchError(this.result, this.errorCallback, this.callback)
|
||||
: state = (callback == null) ? stateCatcherror : stateCatcherrorTest;
|
||||
|
||||
|
@ -95,8 +105,9 @@ class _FutureListener<S, T> {
|
|||
|
||||
bool get handlesValue => (state & maskValue != 0);
|
||||
bool get handlesError => (state & maskError != 0);
|
||||
bool get hasErrorTest => (state == stateCatcherrorTest);
|
||||
bool get handlesComplete => (state == stateWhencomplete);
|
||||
bool get hasErrorTest => (state & maskType == stateCatcherrorTest);
|
||||
bool get handlesComplete => (state & maskType == stateWhencomplete);
|
||||
bool get isAwait => (state & stateIsAwait != 0);
|
||||
|
||||
_FutureOnValue<S, T> get _onValue {
|
||||
assert(handlesValue);
|
||||
|
@ -229,6 +240,27 @@ class _Future<T> implements Future<T> {
|
|||
bool get _isComplete => _state >= _stateValue;
|
||||
bool get _hasError => _state == _stateError;
|
||||
|
||||
static List<Function> _continuationFunctions(_Future<Object> future) {
|
||||
List<Function> result = null;
|
||||
while (true) {
|
||||
if (future._mayAddListener) return result;
|
||||
assert(!future._isComplete);
|
||||
assert(!future._isChained);
|
||||
// So _resultOrListeners contains listeners.
|
||||
_FutureListener<Object, Object> listener = future._resultOrListeners;
|
||||
if (listener != null &&
|
||||
listener._nextListener == null &&
|
||||
listener.isAwait) {
|
||||
(result ??= <Function>[]).add(listener.handleValue);
|
||||
future = listener.result;
|
||||
assert(!future._isComplete);
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
void _setChained(_Future source) {
|
||||
assert(_mayAddListener);
|
||||
_state = _stateChained;
|
||||
|
@ -246,14 +278,21 @@ class _Future<T> implements Future<T> {
|
|||
onError = _registerErrorHandler(onError, currentZone);
|
||||
}
|
||||
}
|
||||
return _thenNoZoneRegistration<R>(f, onError);
|
||||
_Future<R> result = new _Future<R>();
|
||||
_addListener(new _FutureListener<T, R>.then(result, f, onError));
|
||||
return result;
|
||||
}
|
||||
|
||||
// This method is used by async/await.
|
||||
Future<E> _thenNoZoneRegistration<E>(
|
||||
/// Registers a system created result and error continuation.
|
||||
///
|
||||
/// Used by the implementation of `await` to listen to a future.
|
||||
/// The system created liseners are not registered in the zone,
|
||||
/// and the listener is marked as being from an `await`.
|
||||
/// This marker is used in [_continuationFunctions].
|
||||
Future<E> _thenAwait<E>(
|
||||
FutureOr<E> f(T value), Function onError) {
|
||||
_Future<E> result = new _Future<E>();
|
||||
_addListener(new _FutureListener<T, E>.then(result, f, onError));
|
||||
_addListener(new _FutureListener<T, E>.thenAwait(result, f, onError));
|
||||
return result;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue