Refactor to implement Stream.fromIterator more directly.

The current implementation uses a specialized version of the
pending-event queue used by stream subscriptions to remember
pending events.
That makes the queue polymorphic, and slightly more complicated than
necessary, and that again makes further refactorings of the
Stream implementation harder.

This change moves the logic from the specialized pending-queue
into a simple function instead, so you only pay for it if you
actually use `Stream.fromIterable`.

Also allows `Stream.fromIterable` to be listened to more than once.
(It uses `Stream.multi` for the general async+sync controller API,
and it would cost extra code to make it only work once.)

Change-Id: I44b2010225cd3d32c2bcdb8a315c94881331bdae
Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/248146
Reviewed-by: Nate Bosch <nbosch@google.com>
Commit-Queue: Lasse Nielsen <lrn@google.com>
This commit is contained in:
Lasse R.H. Nielsen 2022-06-16 11:39:30 +00:00 committed by Commit Bot
parent e4e3e0927c
commit 330759efc0
7 changed files with 126 additions and 134 deletions

View file

@ -18,6 +18,12 @@
This feature has not been supported in most compilation targets for some
time but is now completely removed.
### Libraries
#### `dart:async`
- The `Stream.fromIterable` stream can now be listened to more than once.
## 2.18.0
### Language

View file

@ -452,7 +452,7 @@ class _AsyncBroadcastStreamController<T> extends _BroadcastStreamController<T> {
/// on another stream, and it is fine to forward them synchronously.
class _AsBroadcastStreamController<T> extends _SyncBroadcastStreamController<T>
implements _EventDispatch<T> {
_StreamImplEvents<T>? _pending;
_PendingEvents<T>? _pending;
_AsBroadcastStreamController(void onListen()?, void onCancel()?)
: super(onListen, onCancel);
@ -463,7 +463,7 @@ class _AsBroadcastStreamController<T> extends _SyncBroadcastStreamController<T>
}
void _addPendingEvent(_DelayedEvent event) {
(_pending ??= new _StreamImplEvents<T>()).add(event);
(_pending ??= new _PendingEvents<T>()).add(event);
}
void add(T data) {
@ -489,9 +489,10 @@ class _AsBroadcastStreamController<T> extends _SyncBroadcastStreamController<T>
void _flushPending() {
var pending = _pending;
while (pending != null && !pending.isEmpty) {
pending.handleNext(this);
pending = _pending;
if (pending != null) {
while (!pending.isEmpty) {
pending.handleNext(this);
}
}
}

View file

@ -321,7 +321,7 @@ abstract class Stream<T> {
return controller.stream;
}
/// Creates a single-subscription stream that gets its data from [elements].
/// Creates a stream that gets its data from [elements].
///
/// The iterable is iterated when the stream receives a listener, and stops
/// iterating if the listener cancels the subscription, or if the
@ -333,15 +333,66 @@ abstract class Stream<T> {
/// If reading [Iterator.current] on `elements.iterator` throws,
/// the stream emits that error, but keeps iterating.
///
/// Can be listened to more than once. Each listener iterates [elements]
/// independently.
///
/// Example:
/// ```dart
/// final numbers = [1, 2, 3, 5, 6, 7];
/// final stream = Stream.fromIterable(numbers);
/// ```
factory Stream.fromIterable(Iterable<T> elements) {
return new _GeneratedStreamImpl<T>(
() => new _IterablePendingEvents<T>(elements));
}
factory Stream.fromIterable(Iterable<T> elements) =>
Stream<T>.multi((controller) {
Iterator<T> iterator;
try {
iterator = elements.iterator;
} catch (e, s) {
controller.addError(e, s);
controller.close();
return;
}
var zone = Zone.current;
var isScheduled = true;
void next() {
if (!controller.hasListener || controller.isPaused) {
// Cancelled or paused since scheduled.
isScheduled = false;
return;
}
bool hasNext;
try {
hasNext = iterator.moveNext();
} catch (e, s) {
controller.addErrorSync(e, s);
controller.closeSync();
return;
}
if (hasNext) {
try {
controller.addSync(iterator.current);
} catch (e, s) {
controller.addErrorSync(e, s);
}
if (controller.hasListener && !controller.isPaused) {
zone.scheduleMicrotask(next);
} else {
isScheduled = false;
}
} else {
controller.closeSync();
}
}
controller.onResume = () {
if (!isScheduled) {
isScheduled = true;
zone.scheduleMicrotask(next);
}
};
zone.scheduleMicrotask(next);
});
/// Creates a multi-subscription stream.
///

View file

@ -527,19 +527,19 @@ abstract class _StreamController<T> implements _StreamControllerBase<T> {
}
// Returns the pending events, and creates the object if necessary.
_StreamImplEvents<T> _ensurePendingEvents() {
_PendingEvents<T> _ensurePendingEvents() {
assert(_isInitialState);
if (!_isAddingStream) {
Object? events = _varData;
if (events == null) {
_varData = events = _StreamImplEvents<T>();
_varData = events = _PendingEvents<T>();
}
return events as dynamic;
}
_StreamControllerAddStreamState<T> state = _varData as dynamic;
Object? events = state.varData;
if (events == null) {
state.varData = events = _StreamImplEvents<T>();
state.varData = events = _PendingEvents<T>();
}
return events as dynamic;
}

View file

@ -80,9 +80,9 @@ class _BufferingStreamSubscription<T>
/* Event handlers provided in constructor. */
@pragma("vm:entry-point")
_DataHandler<T> _onData;
void Function(T) _onData;
Function _onError;
_DoneHandler _onDone;
void Function() _onDone;
final Zone _zone;
@ -318,9 +318,7 @@ class _BufferingStreamSubscription<T>
/// If the subscription is not paused, this also schedules a firing
/// of pending events later (if necessary).
void _addPending(_DelayedEvent event) {
_StreamImplEvents<T>? pending = _pending as dynamic;
pending ??= _StreamImplEvents<T>();
_pending = pending;
var pending = _pending ??= _PendingEvents<T>();
pending.add(event);
if (!_hasPending) {
_state |= _STATE_HAS_PENDING;
@ -487,82 +485,8 @@ abstract class _StreamImpl<T> extends Stream<T> {
void _onListen(StreamSubscription subscription) {}
}
typedef _PendingEvents<T> _EventGenerator<T>();
/// Stream that generates its own events.
class _GeneratedStreamImpl<T> extends _StreamImpl<T> {
final _EventGenerator<T> _pending;
bool _isUsed = false;
/// Initializes the stream to have only the events provided by a
/// [_PendingEvents].
///
/// A new [_PendingEvents] must be generated for each listen.
_GeneratedStreamImpl(this._pending);
StreamSubscription<T> _createSubscription(void onData(T data)?,
Function? onError, void onDone()?, bool cancelOnError) {
if (_isUsed) throw new StateError("Stream has already been listened to.");
_isUsed = true;
return new _BufferingStreamSubscription<T>(
onData, onError, onDone, cancelOnError)
.._setPendingEvents(_pending());
}
}
/// Pending events object that gets its events from an [Iterable].
class _IterablePendingEvents<T> extends _PendingEvents<T> {
// The iterator providing data for data events.
// Set to null when iteration has completed.
Iterator<T>? _iterator;
_IterablePendingEvents(Iterable<T> data) : _iterator = data.iterator;
bool get isEmpty => _iterator == null;
void handleNext(_EventDispatch<T> dispatch) {
var iterator = _iterator;
if (iterator == null) {
throw new StateError("No events pending.");
}
// Send one event per call to moveNext.
// If moveNext returns true, send the current element as data.
// If current throws, send that error, but keep iterating.
// If moveNext returns false, send a done event and clear the _iterator.
// If moveNext throws an error, send an error and prepare to send a done
// event afterwards.
bool movedNext = false;
try {
if (iterator.moveNext()) {
movedNext = true;
dispatch._sendData(iterator.current);
} else {
_iterator = null;
dispatch._sendDone();
}
} catch (e, s) {
if (!movedNext) {
// Threw in .moveNext().
// Ensure that we send a done afterwards.
_iterator = const EmptyIterator<Never>();
}
// Else threw in .current.
dispatch._sendError(e, s);
}
}
void clear() {
if (isScheduled) cancelSchedule();
_iterator = null;
}
}
// Internal helpers.
// Types of the different handlers on a stream. Types used to type fields.
typedef void _DataHandler<T>(T value);
typedef void _DoneHandler();
/// Default data handler, does nothing.
void _nullDataHandler(dynamic value) {}
@ -617,32 +541,36 @@ class _DelayedDone implements _DelayedEvent {
}
}
/// Superclass for provider of pending events.
abstract class _PendingEvents<T> {
/// Container and manager of pending events for a stream subscription.
class _PendingEvents<T> {
// No async event has been scheduled.
static const int _STATE_UNSCHEDULED = 0;
static const int stateUnscheduled = 0;
// An async event has been scheduled to run a function.
static const int _STATE_SCHEDULED = 1;
static const int stateScheduled = 1;
// An async event has been scheduled, but it will do nothing when it runs.
// Async events can't be preempted.
static const int _STATE_CANCELED = 3;
static const int stateCanceled = 3;
/// State of being scheduled.
///
/// Set to [_STATE_SCHEDULED] when pending events are scheduled for
/// Set to [stateScheduled] when pending events are scheduled for
/// async dispatch. Since we can't cancel a [scheduleMicrotask] call, if
/// scheduling is "canceled", the _state is simply set to [_STATE_CANCELED]
/// scheduling is "canceled", the _state is simply set to [stateCanceled]
/// which will make the async code do nothing except resetting [_state].
///
/// If events are scheduled while the state is [_STATE_CANCELED], it is
/// merely switched back to [_STATE_SCHEDULED], but no new call to
/// If events are scheduled while the state is [stateCanceled], it is
/// merely switched back to [stateScheduled], but no new call to
/// [scheduleMicrotask] is performed.
int _state = _STATE_UNSCHEDULED;
int _state = stateUnscheduled;
bool get isEmpty;
/// First element in the list of pending events, if any.
_DelayedEvent? firstPendingEvent;
bool get isScheduled => _state == _STATE_SCHEDULED;
bool get _eventScheduled => _state >= _STATE_SCHEDULED;
/// Last element in the list of pending events. New events are added after it.
_DelayedEvent? lastPendingEvent;
bool get isScheduled => _state == stateScheduled;
bool get _eventScheduled => _state >= stateScheduled;
/// Schedule an event to run later.
///
@ -652,37 +580,23 @@ abstract class _PendingEvents<T> {
if (isScheduled) return;
assert(!isEmpty);
if (_eventScheduled) {
assert(_state == _STATE_CANCELED);
_state = _STATE_SCHEDULED;
assert(_state == stateCanceled);
_state = stateScheduled;
return;
}
scheduleMicrotask(() {
int oldState = _state;
_state = _STATE_UNSCHEDULED;
if (oldState == _STATE_CANCELED) return;
_state = stateUnscheduled;
if (oldState == stateCanceled) return;
handleNext(dispatch);
});
_state = _STATE_SCHEDULED;
_state = stateScheduled;
}
void cancelSchedule() {
if (isScheduled) _state = _STATE_CANCELED;
if (isScheduled) _state = stateCanceled;
}
void handleNext(_EventDispatch<T> dispatch);
/// Throw away any pending events and cancel scheduled events.
void clear();
}
/// Class holding pending events for a [_StreamImpl].
class _StreamImplEvents<T> extends _PendingEvents<T> {
/// Single linked list of [_DelayedEvent] objects.
_DelayedEvent? firstPendingEvent;
/// Last element in the list of pending events. New events are added after it.
_DelayedEvent? lastPendingEvent;
bool get isEmpty => lastPendingEvent == null;
void add(_DelayedEvent event) {
@ -722,7 +636,7 @@ class _DoneStreamSubscription<T> implements StreamSubscription<T> {
final Zone _zone;
int _state = 0;
_DoneHandler? _onDone;
void Function()? _onDone;
_DoneStreamSubscription(this._onDone) : _zone = Zone.current {
_schedule();

View file

@ -92,10 +92,16 @@ main() {
}
{
// Test that you can't listen twice..
asyncStart();
// Test that you can listen twice.
Stream stream = new Stream.fromIterable(iter);
stream.listen((x) {}).cancel();
Expect.throws<StateError>(() => stream.listen((x) {}));
stream.listen((x) {}).cancel(); // Doesn't throw.
Future.wait([stream.toList(), stream.toList()]).then((result) {
Expect.listEquals(iter.toList(), result[0]);
Expect.listEquals(iter.toList(), result[1]);
asyncEnd();
});
}
{
@ -216,9 +222,13 @@ Future<List<Object>> collectEvents(Stream<Object> stream) {
var c = new Completer<List<Object>>();
var events = <Object>[];
stream.listen((value) {
events..add("value")..add(value);
events
..add("value")
..add(value);
}, onError: (error) {
events..add("error")..add(error);
events
..add("error")
..add(error);
}, onDone: () {
c.complete(events);
});

View file

@ -94,10 +94,16 @@ main() {
}
{
// Test that you can't listen twice..
asyncStart();
// Test that you can listen twice.
Stream stream = new Stream.fromIterable(iter);
stream.listen((x) {}).cancel();
Expect.throws<StateError>(() => stream.listen((x) {}));
stream.listen((x) {}).cancel(); // Doesn't throw.
Future.wait([stream.toList(), stream.toList()]).then((result) {
Expect.listEquals(iter.toList(), result[0]);
Expect.listEquals(iter.toList(), result[1]);
asyncEnd();
});
}
{
@ -218,9 +224,13 @@ Future<List<Object>> collectEvents(Stream<Object> stream) {
var c = new Completer<List<Object>>();
var events = <Object>[];
stream.listen((value) {
events..add("value")..add(value);
events
..add("value")
..add(value);
}, onError: (error) {
events..add("error")..add(error);
events
..add("error")
..add(error);
}, onDone: () {
c.complete(events);
});