From 9dcd7267bacd0924e2bd17a8519e74d0ea480e94 Mon Sep 17 00:00:00 2001 From: "Lasse R.H. Nielsen" Date: Tue, 13 Aug 2019 10:49:50 +0000 Subject: [PATCH] Remove unnecessary completers from async_patch code. Use _Future directly. Add ability to get trace of awaited continuations. Change-Id: I6c3aba0bdc2e54afe1d84fdd802fb5210d7598ac Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/112721 Reviewed-by: Martin Kustermann Commit-Queue: Lasse R.H. Nielsen --- runtime/lib/async_patch.dart | 40 +++++++-------- .../js_dev_runtime/patch/async_patch.dart | 4 +- .../_internal/js_runtime/lib/async_patch.dart | 45 +++++++--------- sdk/lib/async/future_impl.dart | 51 ++++++++++++++++--- 4 files changed, 83 insertions(+), 57 deletions(-) diff --git a/runtime/lib/async_patch.dart b/runtime/lib/async_patch.dart index 8ee544e91af..c1143fb7535 100644 --- a/runtime/lib/async_patch.dart +++ b/runtime/lib/async_patch.dart @@ -18,30 +18,24 @@ import "dart:_internal" show VMLibraryHooks, patch; _fatal(msg) native "DartAsync_fatal"; class _AsyncAwaitCompleter implements Completer { - final _completer = new Completer.sync(); + final _future = new _Future(); bool isSync; _AsyncAwaitCompleter() : isSync = false; void complete([FutureOr value]) { - if (isSync) { - _completer.complete(value); - } else if (value is Future) { - value.then(_completer.complete, onError: _completer.completeError); + if (!isSync || value is Future) { + _future._asyncComplete(value); } else { - scheduleMicrotask(() { - _completer.complete(value); - }); + _future._completeWithValue(value); } } void completeError(e, [st]) { if (isSync) { - _completer.completeError(e, st); + _future._completeError(e, st); } else { - scheduleMicrotask(() { - _completer.completeError(e, st); - }); + _future._asyncCompleteError(e, st); } } @@ -50,8 +44,8 @@ class _AsyncAwaitCompleter implements Completer { isSync = true; } - Future get future => _completer.future; - bool get isCompleted => _completer.isCompleted; + Future get future => _future; + bool get isCompleted => !_future._mayComplete; } // We need to pass the value as first argument and leave the second and third @@ -107,7 +101,7 @@ Future _awaitHelper( // We can only do this for our internal futures (the default implementation of // all futures that are constructed by the `dart:async` library). object._awaiter = awaiter; - return object._thenNoZoneRegistration(thenCallback, errorCallback); + return object._thenAwait(thenCallback, errorCallback); } // Called as part of the 'await for (...)' construct. Registers the @@ -143,7 +137,7 @@ class _AsyncStarStreamController { bool onListenReceived = false; bool isScheduled = false; bool isSuspendedAtYield = false; - Completer cancellationCompleter = null; + _Future cancellationFuture = null; Stream get stream { final Stream local = controller.stream; @@ -210,10 +204,10 @@ class _AsyncStarStreamController { } void addError(Object error, StackTrace stackTrace) { - if ((cancellationCompleter != null) && !cancellationCompleter.isCompleted) { + if ((cancellationFuture != null) && cancellationFuture._mayComplete) { // If the stream has been cancelled, complete the cancellation future // with the error. - cancellationCompleter.completeError(error, stackTrace); + cancellationFuture._completeError(error, stackTrace); return; } // If stream is cancelled, tell caller to exit the async generator. @@ -226,10 +220,10 @@ class _AsyncStarStreamController { } close() { - if ((cancellationCompleter != null) && !cancellationCompleter.isCompleted) { + if ((cancellationFuture != null) && cancellationFuture._mayComplete) { // If the stream has been cancelled, complete the cancellation future // with the error. - cancellationCompleter.complete(); + cancellationFuture._completeWithValue(null); } controller.close(); } @@ -257,8 +251,8 @@ class _AsyncStarStreamController { if (controller.isClosed) { return null; } - if (cancellationCompleter == null) { - cancellationCompleter = new Completer(); + if (cancellationFuture == null) { + cancellationFuture = new _Future(); // Only resume the generator if it is suspended at a yield. // Cancellation does not affect an async generator that is // suspended at an await. @@ -266,7 +260,7 @@ class _AsyncStarStreamController { scheduleGenerator(); } } - return cancellationCompleter.future; + return cancellationFuture; } } diff --git a/sdk/lib/_internal/js_dev_runtime/patch/async_patch.dart b/sdk/lib/_internal/js_dev_runtime/patch/async_patch.dart index dd9e0ec585d..894aef4d62e 100644 --- a/sdk/lib/_internal/js_dev_runtime/patch/async_patch.dart +++ b/sdk/lib/_internal/js_dev_runtime/patch/async_patch.dart @@ -37,7 +37,7 @@ _async(Function() initGenerator) { } else { f = _Future.value(value); } - f = JS('', '#', f._thenNoZoneRegistration(onValue, onError)); + f = JS('', '#', f._thenAwait(onValue, onError)); return f; } @@ -363,7 +363,7 @@ class _AsyncStarImpl { } else { f = _Future.value(value); } - f._thenNoZoneRegistration(_runBodyCallback, handleError); + f._thenAwait(_runBodyCallback, handleError); } /// Adds element to [stream] and returns true if the caller should terminate diff --git a/sdk/lib/_internal/js_runtime/lib/async_patch.dart b/sdk/lib/_internal/js_runtime/lib/async_patch.dart index 4b63f9ed40c..68bcbeb1e7e 100644 --- a/sdk/lib/_internal/js_runtime/lib/async_patch.dart +++ b/sdk/lib/_internal/js_runtime/lib/async_patch.dart @@ -190,35 +190,29 @@ bool _hasTimer() { } class _AsyncAwaitCompleter implements Completer { - final _completer = new Completer.sync(); + final _future = new _Future(); bool isSync; _AsyncAwaitCompleter() : isSync = false; void complete([FutureOr value]) { - if (isSync) { - _completer.complete(value); - } else if (value is Future) { - value.then(_completer.complete, onError: _completer.completeError); + if (!isSync || value is Future) { + _future._asyncComplete(value); } else { - scheduleMicrotask(() { - _completer.complete(value); - }); + _future._completeWithValue(value); } } void completeError(e, [st]) { if (isSync) { - _completer.completeError(e, st); + _future._completeError(e, st); } else { - scheduleMicrotask(() { - _completer.completeError(e, st); - }); + _future._asyncCompleteError(e, st); } } - Future get future => _completer.future; - bool get isCompleted => _completer.isCompleted; + Future get future => _future; + bool get isCompleted => !_future._mayComplete; } /// Creates a Completer for an `async` function. @@ -295,15 +289,14 @@ void _awaitOnObject(object, _WrappedAsyncBody bodyFunction) { if (object is _Future) { // We can skip the zone registration, since the bodyFunction is already // registered (see [_wrapJsFunctionForAsync]). - object._thenNoZoneRegistration(thenCallback, errorCallback); + object._thenAwait(thenCallback, errorCallback); } else if (object is Future) { object.then(thenCallback, onError: errorCallback); } else { - _Future future = new _Future(); - future._setValue(object); + _Future future = new _Future().._setValue(object); // We can skip the zone registration, since the bodyFunction is already // registered (see [_wrapJsFunctionForAsync]). - future._thenNoZoneRegistration(thenCallback, null); + future._thenAwait(thenCallback, null); } } @@ -381,7 +374,7 @@ void _asyncStarHelper( if (identical(bodyFunctionOrErrorCode, async_error_codes.SUCCESS)) { // This happens on return from the async* function. if (controller.isCanceled) { - controller.cancelationCompleter.complete(); + controller.cancelationFuture._completeWithValue(null); } else { controller.close(); } @@ -389,7 +382,7 @@ void _asyncStarHelper( } else if (identical(bodyFunctionOrErrorCode, async_error_codes.ERROR)) { // The error is a js-error. if (controller.isCanceled) { - controller.cancelationCompleter.completeError( + controller.cancelationFuture._completeError( unwrapException(object), getTraceFromException(object)); } else { controller.addError( @@ -465,13 +458,13 @@ class _AsyncStarStreamController { bool get isPaused => controller.isPaused; - Completer cancelationCompleter = null; + _Future cancelationFuture = null; /// True after the StreamSubscription has been cancelled. /// When this is true, errors thrown from the async* body should go to the - /// [cancelationCompleter] instead of adding them to [controller], and - /// returning from the async function should complete [cancelationCompleter]. - bool get isCanceled => cancelationCompleter != null; + /// [cancelationFuture] instead of adding them to [controller], and + /// returning from the async function should complete [cancelationFuture]. + bool get isCanceled => cancelationFuture != null; add(event) => controller.add(event); @@ -503,7 +496,7 @@ class _AsyncStarStreamController { }, onCancel: () { // If the async* is finished we ignore cancel events. if (!controller.isClosed) { - cancelationCompleter = new Completer(); + cancelationFuture = new _Future(); if (isSuspended) { // Resume the suspended async* function to run finalizers. isSuspended = false; @@ -511,7 +504,7 @@ class _AsyncStarStreamController { body(async_error_codes.STREAM_WAS_CANCELED, null); }); } - return cancelationCompleter.future; + return cancelationFuture; } }); } diff --git a/sdk/lib/async/future_impl.dart b/sdk/lib/async/future_impl.dart index 3fd6b596e82..1ec07910212 100644 --- a/sdk/lib/async/future_impl.dart +++ b/sdk/lib/async/future_impl.dart @@ -67,6 +67,9 @@ class _FutureListener { static const int stateCatcherror = maskError; static const int stateCatcherrorTest = maskError | maskTestError; static const int stateWhencomplete = maskWhencomplete; + static const int maskType = + maskValue | maskError | maskTestError | maskWhencomplete; + static const int stateIsAwait = 16; // Listeners on the same future are linked through this link. _FutureListener _nextListener; // The future to complete when this listener is activated. @@ -84,6 +87,13 @@ class _FutureListener { errorCallback = errorCallback, state = (errorCallback == null) ? stateThen : stateThenOnerror; + _FutureListener.thenAwait( + this.result, _FutureOnValue onValue, Function errorCallback) + : callback = onValue, + errorCallback = errorCallback, + state = ((errorCallback == null) ? stateThen : stateThenOnerror) + | stateIsAwait ; + _FutureListener.catchError(this.result, this.errorCallback, this.callback) : state = (callback == null) ? stateCatcherror : stateCatcherrorTest; @@ -95,8 +105,9 @@ class _FutureListener { bool get handlesValue => (state & maskValue != 0); bool get handlesError => (state & maskError != 0); - bool get hasErrorTest => (state == stateCatcherrorTest); - bool get handlesComplete => (state == stateWhencomplete); + bool get hasErrorTest => (state & maskType == stateCatcherrorTest); + bool get handlesComplete => (state & maskType == stateWhencomplete); + bool get isAwait => (state & stateIsAwait != 0); _FutureOnValue get _onValue { assert(handlesValue); @@ -229,6 +240,27 @@ class _Future implements Future { bool get _isComplete => _state >= _stateValue; bool get _hasError => _state == _stateError; + static List _continuationFunctions(_Future future) { + List result = null; + while (true) { + if (future._mayAddListener) return result; + assert(!future._isComplete); + assert(!future._isChained); + // So _resultOrListeners contains listeners. + _FutureListener listener = future._resultOrListeners; + if (listener != null && + listener._nextListener == null && + listener.isAwait) { + (result ??= []).add(listener.handleValue); + future = listener.result; + assert(!future._isComplete); + } else { + break; + } + } + return result; + } + void _setChained(_Future source) { assert(_mayAddListener); _state = _stateChained; @@ -246,14 +278,21 @@ class _Future implements Future { onError = _registerErrorHandler(onError, currentZone); } } - return _thenNoZoneRegistration(f, onError); + _Future result = new _Future(); + _addListener(new _FutureListener.then(result, f, onError)); + return result; } - // This method is used by async/await. - Future _thenNoZoneRegistration( + /// Registers a system created result and error continuation. + /// + /// Used by the implementation of `await` to listen to a future. + /// The system created liseners are not registered in the zone, + /// and the listener is marked as being from an `await`. + /// This marker is used in [_continuationFunctions]. + Future _thenAwait( FutureOr f(T value), Function onError) { _Future result = new _Future(); - _addListener(new _FutureListener.then(result, f, onError)); + _addListener(new _FutureListener.thenAwait(result, f, onError)); return result; }