mirror of
https://github.com/dart-lang/sdk
synced 2024-10-04 16:04:53 +00:00
Revert "Return futures on Stream.cancel when possible."
This reverts commit 1905ddafaa
.
Review URL: https://codereview.chromium.org/2223133002 .
This commit is contained in:
parent
53d968bd02
commit
46a8579c1e
|
@ -16,12 +16,6 @@
|
|||
* Report a better error when a bind fails because of a bad source address.
|
||||
* Handle HTTP header `charset` parameter with empty value.
|
||||
|
||||
* `dart:async`
|
||||
* More aggressively returns a Future on Stream.cancel operations.
|
||||
Discourages to return `null` from `cancel`.
|
||||
* Fixes a few bugs where the cancel future wasn't passed through
|
||||
transformations.
|
||||
|
||||
### Strong Mode
|
||||
|
||||
* New feature - an option to disable implicit casts
|
||||
|
|
|
@ -232,7 +232,6 @@ abstract class Stream<T> {
|
|||
onCancel: () {
|
||||
if (timer != null) timer.cancel();
|
||||
timer = null;
|
||||
return Future._nullFuture;
|
||||
});
|
||||
return controller.stream;
|
||||
}
|
||||
|
@ -442,7 +441,7 @@ abstract class Stream<T> {
|
|||
onListen: onListen,
|
||||
onPause: () { subscription.pause(); },
|
||||
onResume: () { subscription.resume(); },
|
||||
onCancel: () => subscription.cancel(),
|
||||
onCancel: () { subscription.cancel(); },
|
||||
sync: true
|
||||
);
|
||||
}
|
||||
|
@ -500,7 +499,7 @@ abstract class Stream<T> {
|
|||
onListen: onListen,
|
||||
onPause: () { subscription.pause(); },
|
||||
onResume: () { subscription.resume(); },
|
||||
onCancel: () => subscription.cancel(),
|
||||
onCancel: () { subscription.cancel(); },
|
||||
sync: true
|
||||
);
|
||||
}
|
||||
|
@ -1408,10 +1407,7 @@ abstract class StreamSubscription<T> {
|
|||
* the subscription is canceled.
|
||||
*
|
||||
* Returns a future that is completed once the stream has finished
|
||||
* its cleanup.
|
||||
*
|
||||
* For historical reasons, may also return `null` if no cleanup was necessary.
|
||||
* Returning `null` is deprecated and should be avoided.
|
||||
* its cleanup. May also return `null` if no cleanup was necessary.
|
||||
*
|
||||
* Typically, futures are returned when the stream needs to release resources.
|
||||
* For example, a stream might need to close an open file (as an asynchronous
|
||||
|
@ -1715,7 +1711,7 @@ abstract class StreamTransformer<S, T> {
|
|||
* },
|
||||
* onPause: () { subscription.pause(); },
|
||||
* onResume: () { subscription.resume(); },
|
||||
* onCancel: () => subscription.cancel(),
|
||||
* onCancel: () { subscription.cancel(); },
|
||||
* sync: true);
|
||||
* return controller.stream.listen(null);
|
||||
* });
|
||||
|
|
|
@ -188,10 +188,9 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
|
|||
// error or done event pending (waiting for the cancel to be done) discard
|
||||
// that event.
|
||||
_state &= ~_STATE_WAIT_FOR_CANCEL;
|
||||
if (!_isCanceled) {
|
||||
_cancel();
|
||||
}
|
||||
return _cancelFuture ?? Future._nullFuture;
|
||||
if (_isCanceled) return _cancelFuture;
|
||||
_cancel();
|
||||
return _cancelFuture;
|
||||
}
|
||||
|
||||
Future/*<E>*/ asFuture/*<E>*/([var/*=E*/ futureValue]) {
|
||||
|
@ -200,14 +199,8 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
|
|||
// Overwrite the onDone and onError handlers.
|
||||
_onDone = () { result._complete(futureValue); };
|
||||
_onError = (error, stackTrace) {
|
||||
Future cancelFuture = cancel();
|
||||
if (!identical(cancelFuture, Future._nullFuture)) {
|
||||
cancelFuture.whenComplete(() {
|
||||
result._completeError(error, stackTrace);
|
||||
});
|
||||
} else {
|
||||
result._completeError(error, stackTrace);
|
||||
}
|
||||
cancel();
|
||||
result._completeError(error, stackTrace);
|
||||
};
|
||||
|
||||
return result;
|
||||
|
@ -368,8 +361,7 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
|
|||
if (_cancelOnError) {
|
||||
_state |= _STATE_WAIT_FOR_CANCEL;
|
||||
_cancel();
|
||||
if (_cancelFuture is Future &&
|
||||
!identical(_cancelFuture, Future._nullFuture)) {
|
||||
if (_cancelFuture is Future) {
|
||||
_cancelFuture.whenComplete(sendError);
|
||||
} else {
|
||||
sendError();
|
||||
|
@ -397,8 +389,7 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
|
|||
|
||||
_cancel();
|
||||
_state |= _STATE_WAIT_FOR_CANCEL;
|
||||
if (_cancelFuture is Future &&
|
||||
!identical(_cancelFuture, Future._nullFuture)) {
|
||||
if (_cancelFuture is Future) {
|
||||
_cancelFuture.whenComplete(sendDone);
|
||||
} else {
|
||||
sendDone();
|
||||
|
@ -787,7 +778,7 @@ class _DoneStreamSubscription<T> implements StreamSubscription<T> {
|
|||
}
|
||||
}
|
||||
|
||||
Future cancel() => Future._nullFuture;
|
||||
Future cancel() => null;
|
||||
|
||||
Future/*<E>*/ asFuture/*<E>*/([var/*=E*/ futureValue]) {
|
||||
_Future/*<E>*/ result = new _Future/*<E>*/();
|
||||
|
@ -925,7 +916,7 @@ class _BroadcastSubscriptionWrapper<T> implements StreamSubscription<T> {
|
|||
|
||||
Future cancel() {
|
||||
_stream._cancelSubscription();
|
||||
return Future._nullFuture;
|
||||
return null;
|
||||
}
|
||||
|
||||
bool get isPaused {
|
||||
|
@ -1041,7 +1032,7 @@ class _StreamIteratorImpl<T> implements StreamIterator<T> {
|
|||
|
||||
Future cancel() {
|
||||
StreamSubscription subscription = _subscription;
|
||||
if (subscription == null) return Future._nullFuture;
|
||||
if (subscription == null) return null;
|
||||
if (_state == _STATE_MOVING) {
|
||||
_Future<bool> hasNext = _futureOrPrefetch as Object /*=_Future<bool>*/;
|
||||
_clear();
|
||||
|
|
|
@ -109,7 +109,7 @@ class _SinkTransformerStreamSubscription<S, T>
|
|||
if (_isSubscribed) {
|
||||
StreamSubscription subscription = _subscription;
|
||||
_subscription = null;
|
||||
return subscription.cancel();
|
||||
subscription.cancel();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
|
|
@ -16,274 +16,237 @@ fail(e) { Expect.fail("Unexepected error: $e"); }
|
|||
|
||||
void testMultiController() {
|
||||
// Test normal flow.
|
||||
{
|
||||
var c = new StreamController(sync: true);
|
||||
Events expectedEvents = new Events()
|
||||
..add(42)
|
||||
..add("dibs")
|
||||
..error("error!")
|
||||
..error("error too!")
|
||||
..close();
|
||||
CaptureEvents actualEvents =
|
||||
new Events.capture(c.stream.asBroadcastStream());
|
||||
expectedEvents.replay(c);
|
||||
Expect.listEquals(expectedEvents.events, actualEvents.events);
|
||||
}
|
||||
var c = new StreamController(sync: true);
|
||||
Events expectedEvents = new Events()
|
||||
..add(42)
|
||||
..add("dibs")
|
||||
..error("error!")
|
||||
..error("error too!")
|
||||
..close();
|
||||
CaptureEvents actualEvents = new Events.capture(c.stream.asBroadcastStream());
|
||||
expectedEvents.replay(c);
|
||||
Expect.listEquals(expectedEvents.events, actualEvents.events);
|
||||
|
||||
// Test automatic unsubscription on error.
|
||||
{
|
||||
var c = new StreamController(sync: true);
|
||||
var expectedEvents = new Events()..add(42)..error("error");
|
||||
var actualEvents = new Events.capture(c.stream.asBroadcastStream(),
|
||||
cancelOnError: true);
|
||||
Events sentEvents =
|
||||
new Events()..add(42)..error("error")..add("Are you there?");
|
||||
sentEvents.replay(c);
|
||||
Expect.listEquals(expectedEvents.events, actualEvents.events);
|
||||
}
|
||||
c = new StreamController(sync: true);
|
||||
expectedEvents = new Events()..add(42)..error("error");
|
||||
actualEvents = new Events.capture(c.stream.asBroadcastStream(),
|
||||
cancelOnError: true);
|
||||
Events sentEvents =
|
||||
new Events()..add(42)..error("error")..add("Are you there?");
|
||||
sentEvents.replay(c);
|
||||
Expect.listEquals(expectedEvents.events, actualEvents.events);
|
||||
|
||||
// Test manual unsubscription.
|
||||
{
|
||||
var c = new StreamController(sync: true);
|
||||
var expectedEvents = new Events()..add(42)..error("error")..add(37);
|
||||
var actualEvents = new Events.capture(c.stream.asBroadcastStream(),
|
||||
cancelOnError: false);
|
||||
expectedEvents.replay(c);
|
||||
actualEvents.subscription.cancel();
|
||||
c.add("Are you there"); // Not sent to actualEvents.
|
||||
Expect.listEquals(expectedEvents.events, actualEvents.events);
|
||||
}
|
||||
c = new StreamController(sync: true);
|
||||
expectedEvents = new Events()..add(42)..error("error")..add(37);
|
||||
actualEvents = new Events.capture(c.stream.asBroadcastStream(),
|
||||
cancelOnError: false);
|
||||
expectedEvents.replay(c);
|
||||
actualEvents.subscription.cancel();
|
||||
c.add("Are you there"); // Not sent to actualEvents.
|
||||
Expect.listEquals(expectedEvents.events, actualEvents.events);
|
||||
|
||||
// Test filter.
|
||||
{
|
||||
var c = new StreamController(sync: true);
|
||||
var expectedEvents = new Events()
|
||||
..add("a string")..add("another string")..close();
|
||||
var sentEvents = new Events()
|
||||
..add("a string")..add(42)..add("another string")..close();
|
||||
var actualEvents = new Events.capture(c.stream
|
||||
.asBroadcastStream()
|
||||
.where((v) => v is String));
|
||||
sentEvents.replay(c);
|
||||
Expect.listEquals(expectedEvents.events, actualEvents.events);
|
||||
}
|
||||
c = new StreamController(sync: true);
|
||||
expectedEvents = new Events()
|
||||
..add("a string")..add("another string")..close();
|
||||
sentEvents = new Events()
|
||||
..add("a string")..add(42)..add("another string")..close();
|
||||
actualEvents = new Events.capture(c.stream
|
||||
.asBroadcastStream()
|
||||
.where((v) => v is String));
|
||||
sentEvents.replay(c);
|
||||
Expect.listEquals(expectedEvents.events, actualEvents.events);
|
||||
|
||||
// Test map.
|
||||
{
|
||||
var c = new StreamController(sync: true);
|
||||
var expectedEvents = new Events()..add("abab")..error("error")..close();
|
||||
var sentEvents = new Events()..add("ab")..error("error")..close();
|
||||
var actualEvents = new Events.capture(c.stream
|
||||
.asBroadcastStream()
|
||||
.map((v) => "$v$v"));
|
||||
sentEvents.replay(c);
|
||||
Expect.listEquals(expectedEvents.events, actualEvents.events);
|
||||
}
|
||||
c = new StreamController(sync: true);
|
||||
expectedEvents = new Events()..add("abab")..error("error")..close();
|
||||
sentEvents = new Events()..add("ab")..error("error")..close();
|
||||
actualEvents = new Events.capture(c.stream
|
||||
.asBroadcastStream()
|
||||
.map((v) => "$v$v"));
|
||||
sentEvents.replay(c);
|
||||
Expect.listEquals(expectedEvents.events, actualEvents.events);
|
||||
|
||||
// Test handleError.
|
||||
{
|
||||
var c = new StreamController(sync: true);
|
||||
var expectedEvents = new Events()..add("ab")..error("[foo]");
|
||||
var sentEvents = new Events()..add("ab")..error("foo")..add("ab")..close();
|
||||
var actualEvents = new Events.capture(c.stream
|
||||
.asBroadcastStream()
|
||||
.handleError((error) {
|
||||
if (error is String) {
|
||||
// TODO(floitsch): this test originally changed the stacktrace.
|
||||
throw "[${error}]";
|
||||
}
|
||||
}), cancelOnError: true);
|
||||
sentEvents.replay(c);
|
||||
Expect.listEquals(expectedEvents.events, actualEvents.events);
|
||||
}
|
||||
c = new StreamController(sync: true);
|
||||
expectedEvents = new Events()..add("ab")..error("[foo]");
|
||||
sentEvents = new Events()..add("ab")..error("foo")..add("ab")..close();
|
||||
actualEvents = new Events.capture(c.stream
|
||||
.asBroadcastStream()
|
||||
.handleError((error) {
|
||||
if (error is String) {
|
||||
// TODO(floitsch): this test originally changed the stacktrace.
|
||||
throw "[${error}]";
|
||||
}
|
||||
}), cancelOnError: true);
|
||||
sentEvents.replay(c);
|
||||
Expect.listEquals(expectedEvents.events, actualEvents.events);
|
||||
|
||||
// reduce is tested asynchronously and therefore not in this file.
|
||||
|
||||
// Test expand
|
||||
{
|
||||
var c = new StreamController(sync: true);
|
||||
var sentEvents = new Events()..add(3)..add(2)..add(4)..close();
|
||||
var expectedEvents = new Events()..add(1)..add(2)..add(3)
|
||||
..add(1)..add(2)
|
||||
..add(1)..add(2)..add(3)..add(4)
|
||||
..close();
|
||||
var actualEvents =
|
||||
new Events.capture(c.stream.asBroadcastStream().expand((v) {
|
||||
var l = [];
|
||||
for (int i = 0; i < v; i++) l.add(i + 1);
|
||||
return l;
|
||||
}));
|
||||
sentEvents.replay(c);
|
||||
Expect.listEquals(expectedEvents.events, actualEvents.events);
|
||||
}
|
||||
c = new StreamController(sync: true);
|
||||
sentEvents = new Events()..add(3)..add(2)..add(4)..close();
|
||||
expectedEvents = new Events()..add(1)..add(2)..add(3)
|
||||
..add(1)..add(2)
|
||||
..add(1)..add(2)..add(3)..add(4)
|
||||
..close();
|
||||
actualEvents = new Events.capture(c.stream.asBroadcastStream().expand((v) {
|
||||
var l = [];
|
||||
for (int i = 0; i < v; i++) l.add(i + 1);
|
||||
return l;
|
||||
}));
|
||||
sentEvents.replay(c);
|
||||
Expect.listEquals(expectedEvents.events, actualEvents.events);
|
||||
|
||||
// Test transform.
|
||||
{
|
||||
var c = new StreamController(sync: true);
|
||||
var sentEvents = new Events()..add("a")..error(42)..add("b")..close();
|
||||
var expectedEvents =
|
||||
new Events()..error("a")..add(42)..error("b")..add("foo")..close();
|
||||
var actualEvents =
|
||||
new Events.capture(c.stream.asBroadcastStream().transform(
|
||||
new StreamTransformer.fromHandlers(
|
||||
handleData: (v, s) { s.addError(v); },
|
||||
handleError: (e, st, s) { s.add(e); },
|
||||
handleDone: (s) {
|
||||
s.add("foo");
|
||||
s.close();
|
||||
})));
|
||||
sentEvents.replay(c);
|
||||
Expect.listEquals(expectedEvents.events, actualEvents.events);
|
||||
}
|
||||
c = new StreamController(sync: true);
|
||||
sentEvents = new Events()..add("a")..error(42)..add("b")..close();
|
||||
expectedEvents =
|
||||
new Events()..error("a")..add(42)..error("b")..add("foo")..close();
|
||||
actualEvents = new Events.capture(c.stream.asBroadcastStream().transform(
|
||||
new StreamTransformer.fromHandlers(
|
||||
handleData: (v, s) { s.addError(v); },
|
||||
handleError: (e, st, s) { s.add(e); },
|
||||
handleDone: (s) {
|
||||
s.add("foo");
|
||||
s.close();
|
||||
})));
|
||||
sentEvents.replay(c);
|
||||
Expect.listEquals(expectedEvents.events, actualEvents.events);
|
||||
|
||||
// Test multiple filters.
|
||||
{
|
||||
var c = new StreamController(sync: true);
|
||||
var sentEvents = new Events()..add(42)
|
||||
..add("snugglefluffy")
|
||||
..add(7)
|
||||
..add("42")
|
||||
..error("not FormatException") // Unsubscribes.
|
||||
..close();
|
||||
var expectedEvents = new Events()..add(42)..error("not FormatException");
|
||||
var actualEvents = new Events.capture(
|
||||
c.stream.asBroadcastStream().where((v) => v is String)
|
||||
.map((v) => int.parse(v))
|
||||
.handleError((error) {
|
||||
if (error is! FormatException) throw error;
|
||||
})
|
||||
.where((v) => v > 10),
|
||||
cancelOnError: true);
|
||||
sentEvents.replay(c);
|
||||
Expect.listEquals(expectedEvents.events, actualEvents.events);
|
||||
}
|
||||
c = new StreamController(sync: true);
|
||||
sentEvents = new Events()..add(42)
|
||||
..add("snugglefluffy")
|
||||
..add(7)
|
||||
..add("42")
|
||||
..error("not FormatException") // Unsubscribes.
|
||||
..close();
|
||||
expectedEvents = new Events()..add(42)..error("not FormatException");
|
||||
actualEvents = new Events.capture(
|
||||
c.stream.asBroadcastStream().where((v) => v is String)
|
||||
.map((v) => int.parse(v))
|
||||
.handleError((error) {
|
||||
if (error is! FormatException) throw error;
|
||||
})
|
||||
.where((v) => v > 10),
|
||||
cancelOnError: true);
|
||||
sentEvents.replay(c);
|
||||
Expect.listEquals(expectedEvents.events, actualEvents.events);
|
||||
|
||||
// Test subscription changes while firing.
|
||||
{
|
||||
var c = new StreamController(sync: true);
|
||||
var sink = c.sink;
|
||||
var stream = c.stream.asBroadcastStream();
|
||||
var counter = 0;
|
||||
var subscription = stream.listen(null);
|
||||
subscription.onData((data) {
|
||||
counter += data;
|
||||
subscription.cancel();
|
||||
stream.listen((data) {
|
||||
counter += 10 * data;
|
||||
});
|
||||
var subscription2 = stream.listen(null);
|
||||
subscription2.onData((data) {
|
||||
counter += 100 * data;
|
||||
if (data == 4) subscription2.cancel();
|
||||
});
|
||||
c = new StreamController(sync: true);
|
||||
var sink = c.sink;
|
||||
var stream = c.stream.asBroadcastStream();
|
||||
var counter = 0;
|
||||
var subscription = stream.listen(null);
|
||||
subscription.onData((data) {
|
||||
counter += data;
|
||||
subscription.cancel();
|
||||
stream.listen((data) {
|
||||
counter += 10 * data;
|
||||
});
|
||||
sink.add(1); // seen by stream 1
|
||||
sink.add(2); // seen by stream 10 and 100
|
||||
sink.add(3); // -"-
|
||||
sink.add(4); // -"-
|
||||
sink.add(5); // seen by stream 10
|
||||
Expect.equals(1 + 20 + 200 + 30 + 300 + 40 + 400 + 50, counter);
|
||||
}
|
||||
var subscription2 = stream.listen(null);
|
||||
subscription2.onData((data) {
|
||||
counter += 100 * data;
|
||||
if (data == 4) subscription2.cancel();
|
||||
});
|
||||
});
|
||||
sink.add(1); // seen by stream 1
|
||||
sink.add(2); // seen by stream 10 and 100
|
||||
sink.add(3); // -"-
|
||||
sink.add(4); // -"-
|
||||
sink.add(5); // seen by stream 10
|
||||
Expect.equals(1 + 20 + 200 + 30 + 300 + 40 + 400 + 50, counter);
|
||||
}
|
||||
|
||||
testSingleController() {
|
||||
// Test normal flow.
|
||||
{
|
||||
var c = new StreamController(sync: true);
|
||||
Events expectedEvents = new Events()
|
||||
..add(42)
|
||||
..add("dibs")
|
||||
..error("error!")
|
||||
..error("error too!")
|
||||
..close();
|
||||
CaptureEvents actualEvents = new Events.capture(c.stream);
|
||||
expectedEvents.replay(c);
|
||||
Expect.listEquals(expectedEvents.events, actualEvents.events);
|
||||
}
|
||||
var c = new StreamController(sync: true);
|
||||
Events expectedEvents = new Events()
|
||||
..add(42)
|
||||
..add("dibs")
|
||||
..error("error!")
|
||||
..error("error too!")
|
||||
..close();
|
||||
CaptureEvents actualEvents = new Events.capture(c.stream);
|
||||
expectedEvents.replay(c);
|
||||
Expect.listEquals(expectedEvents.events, actualEvents.events);
|
||||
|
||||
// Test automatic unsubscription on error.
|
||||
{
|
||||
var c = new StreamController(sync: true);
|
||||
var expectedEvents = new Events()..add(42)..error("error");
|
||||
var actualEvents = new Events.capture(c.stream, cancelOnError: true);
|
||||
Events sentEvents =
|
||||
new Events()..add(42)..error("error")..add("Are you there?");
|
||||
sentEvents.replay(c);
|
||||
Expect.listEquals(expectedEvents.events, actualEvents.events);
|
||||
}
|
||||
c = new StreamController(sync: true);
|
||||
expectedEvents = new Events()..add(42)..error("error");
|
||||
actualEvents = new Events.capture(c.stream, cancelOnError: true);
|
||||
Events sentEvents =
|
||||
new Events()..add(42)..error("error")..add("Are you there?");
|
||||
sentEvents.replay(c);
|
||||
Expect.listEquals(expectedEvents.events, actualEvents.events);
|
||||
|
||||
// Test manual unsubscription.
|
||||
{
|
||||
var c = new StreamController(sync: true);
|
||||
var expectedEvents = new Events()..add(42)..error("error")..add(37);
|
||||
var actualEvents = new Events.capture(c.stream, cancelOnError: false);
|
||||
expectedEvents.replay(c);
|
||||
actualEvents.subscription.cancel();
|
||||
c.add("Are you there"); // Not sent to actualEvents.
|
||||
Expect.listEquals(expectedEvents.events, actualEvents.events);
|
||||
}
|
||||
c = new StreamController(sync: true);
|
||||
expectedEvents = new Events()..add(42)..error("error")..add(37);
|
||||
actualEvents = new Events.capture(c.stream, cancelOnError: false);
|
||||
expectedEvents.replay(c);
|
||||
actualEvents.subscription.cancel();
|
||||
c.add("Are you there"); // Not sent to actualEvents.
|
||||
Expect.listEquals(expectedEvents.events, actualEvents.events);
|
||||
|
||||
// Test filter.
|
||||
{
|
||||
var c = new StreamController(sync: true);
|
||||
var expectedEvents = new Events()
|
||||
..add("a string")..add("another string")..close();
|
||||
var sentEvents = new Events()
|
||||
..add("a string")..add(42)..add("another string")..close();
|
||||
var actualEvents = new Events.capture(c.stream.where((v) => v is String));
|
||||
sentEvents.replay(c);
|
||||
Expect.listEquals(expectedEvents.events, actualEvents.events);
|
||||
}
|
||||
c = new StreamController(sync: true);
|
||||
expectedEvents = new Events()
|
||||
..add("a string")..add("another string")..close();
|
||||
sentEvents = new Events()
|
||||
..add("a string")..add(42)..add("another string")..close();
|
||||
actualEvents = new Events.capture(c.stream.where((v) => v is String));
|
||||
sentEvents.replay(c);
|
||||
Expect.listEquals(expectedEvents.events, actualEvents.events);
|
||||
|
||||
// Test map.
|
||||
{
|
||||
var c = new StreamController(sync: true);
|
||||
var expectedEvents = new Events()..add("abab")..error("error")..close();
|
||||
var sentEvents = new Events()..add("ab")..error("error")..close();
|
||||
var actualEvents = new Events.capture(c.stream.map((v) => "$v$v"));
|
||||
sentEvents.replay(c);
|
||||
Expect.listEquals(expectedEvents.events, actualEvents.events);
|
||||
}
|
||||
c = new StreamController(sync: true);
|
||||
expectedEvents = new Events()..add("abab")..error("error")..close();
|
||||
sentEvents = new Events()..add("ab")..error("error")..close();
|
||||
actualEvents = new Events.capture(c.stream.map((v) => "$v$v"));
|
||||
sentEvents.replay(c);
|
||||
Expect.listEquals(expectedEvents.events, actualEvents.events);
|
||||
|
||||
// Test handleError.
|
||||
{
|
||||
var c = new StreamController(sync: true);
|
||||
var expectedEvents = new Events()..add("ab")..error("[foo]");
|
||||
var sentEvents = new Events()..add("ab")..error("foo")..add("ab")..close();
|
||||
var actualEvents = new Events.capture(c.stream.handleError((error) {
|
||||
if (error is String) {
|
||||
// TODO(floitsch): this error originally changed the stack trace.
|
||||
throw "[${error}]";
|
||||
}
|
||||
}), cancelOnError: true);
|
||||
sentEvents.replay(c);
|
||||
Expect.listEquals(expectedEvents.events, actualEvents.events);
|
||||
}
|
||||
c = new StreamController(sync: true);
|
||||
expectedEvents = new Events()..add("ab")..error("[foo]");
|
||||
sentEvents = new Events()..add("ab")..error("foo")..add("ab")..close();
|
||||
actualEvents = new Events.capture(c.stream.handleError((error) {
|
||||
if (error is String) {
|
||||
// TODO(floitsch): this error originally changed the stack trace.
|
||||
throw "[${error}]";
|
||||
}
|
||||
}), cancelOnError: true);
|
||||
sentEvents.replay(c);
|
||||
Expect.listEquals(expectedEvents.events, actualEvents.events);
|
||||
|
||||
// reduce is tested asynchronously and therefore not in this file.
|
||||
|
||||
// Test expand
|
||||
{
|
||||
var c = new StreamController(sync: true);
|
||||
var sentEvents = new Events()..add(3)..add(2)..add(4)..close();
|
||||
var expectedEvents = new Events()..add(1)..add(2)..add(3)
|
||||
..add(1)..add(2)
|
||||
..add(1)..add(2)..add(3)..add(4)
|
||||
..close();
|
||||
var actualEvents = new Events.capture(c.stream.expand((v) {
|
||||
var l = [];
|
||||
for (int i = 0; i < v; i++) l.add(i + 1);
|
||||
return l;
|
||||
}));
|
||||
sentEvents.replay(c);
|
||||
Expect.listEquals(expectedEvents.events, actualEvents.events);
|
||||
}
|
||||
c = new StreamController(sync: true);
|
||||
sentEvents = new Events()..add(3)..add(2)..add(4)..close();
|
||||
expectedEvents = new Events()..add(1)..add(2)..add(3)
|
||||
..add(1)..add(2)
|
||||
..add(1)..add(2)..add(3)..add(4)
|
||||
..close();
|
||||
actualEvents = new Events.capture(c.stream.expand((v) {
|
||||
var l = [];
|
||||
for (int i = 0; i < v; i++) l.add(i + 1);
|
||||
return l;
|
||||
}));
|
||||
sentEvents.replay(c);
|
||||
Expect.listEquals(expectedEvents.events, actualEvents.events);
|
||||
|
||||
// test contains.
|
||||
{
|
||||
var c = new StreamController(sync: true);
|
||||
c = new StreamController(sync: true);
|
||||
// Error after match is not important.
|
||||
var sentEvents = new Events()..add("a")..add("x")..error("FAIL")..close();
|
||||
sentEvents = new Events()..add("a")..add("x")..error("FAIL")..close();
|
||||
Future<bool> contains = c.stream.contains("x");
|
||||
contains.then((var c) {
|
||||
Expect.isTrue(c);
|
||||
|
@ -292,9 +255,9 @@ testSingleController() {
|
|||
}
|
||||
|
||||
{
|
||||
var c = new StreamController(sync: true);
|
||||
c = new StreamController(sync: true);
|
||||
// Not matching is ok.
|
||||
var sentEvents = new Events()..add("a")..add("x")..add("b")..close();
|
||||
sentEvents = new Events()..add("a")..add("x")..add("b")..close();
|
||||
Future<bool> contains = c.stream.contains("y");
|
||||
contains.then((var c) {
|
||||
Expect.isFalse(c);
|
||||
|
@ -303,9 +266,9 @@ testSingleController() {
|
|||
}
|
||||
|
||||
{
|
||||
var c = new StreamController(sync: true);
|
||||
c = new StreamController(sync: true);
|
||||
// Error before match makes future err.
|
||||
var sentEvents = new Events()..add("a")..error("FAIL")..add("b")..close();
|
||||
sentEvents = new Events()..add("a")..error("FAIL")..add("b")..close();
|
||||
Future<bool> contains = c.stream.contains("b");
|
||||
contains.then((var c) {
|
||||
Expect.fail("no value expected");
|
||||
|
@ -316,57 +279,51 @@ testSingleController() {
|
|||
}
|
||||
|
||||
// Test transform.
|
||||
{
|
||||
var c = new StreamController(sync: true);
|
||||
var sentEvents = new Events()..add("a")..error(42)..add("b")..close();
|
||||
var expectedEvents =
|
||||
new Events()..error("a")..add(42)..error("b")..add("foo")..close();
|
||||
var actualEvents = new Events.capture(c.stream.transform(
|
||||
new StreamTransformer.fromHandlers(
|
||||
handleData: (v, s) { s.addError(v); },
|
||||
handleError: (e, st, s) { s.add(e); },
|
||||
handleDone: (s) {
|
||||
s.add("foo");
|
||||
s.close();
|
||||
})));
|
||||
sentEvents.replay(c);
|
||||
Expect.listEquals(expectedEvents.events, actualEvents.events);
|
||||
}
|
||||
c = new StreamController(sync: true);
|
||||
sentEvents = new Events()..add("a")..error(42)..add("b")..close();
|
||||
expectedEvents =
|
||||
new Events()..error("a")..add(42)..error("b")..add("foo")..close();
|
||||
actualEvents = new Events.capture(c.stream.transform(
|
||||
new StreamTransformer.fromHandlers(
|
||||
handleData: (v, s) { s.addError(v); },
|
||||
handleError: (e, st, s) { s.add(e); },
|
||||
handleDone: (s) {
|
||||
s.add("foo");
|
||||
s.close();
|
||||
})));
|
||||
sentEvents.replay(c);
|
||||
Expect.listEquals(expectedEvents.events, actualEvents.events);
|
||||
|
||||
// Test multiple filters.
|
||||
{
|
||||
var c = new StreamController(sync: true);
|
||||
var sentEvents = new Events()..add(42)
|
||||
..add("snugglefluffy")
|
||||
..add(7)
|
||||
..add("42")
|
||||
..error("not FormatException") // Unsubscribes.
|
||||
..close();
|
||||
var expectedEvents = new Events()..add(42)..error("not FormatException");
|
||||
var actualEvents = new Events.capture(
|
||||
c.stream.where((v) => v is String)
|
||||
.map((v) => int.parse(v))
|
||||
.handleError((error) {
|
||||
if (error is! FormatException) throw error;
|
||||
})
|
||||
.where((v) => v > 10),
|
||||
cancelOnError: true);
|
||||
sentEvents.replay(c);
|
||||
Expect.listEquals(expectedEvents.events, actualEvents.events);
|
||||
}
|
||||
c = new StreamController(sync: true);
|
||||
sentEvents = new Events()..add(42)
|
||||
..add("snugglefluffy")
|
||||
..add(7)
|
||||
..add("42")
|
||||
..error("not FormatException") // Unsubscribes.
|
||||
..close();
|
||||
expectedEvents = new Events()..add(42)..error("not FormatException");
|
||||
actualEvents = new Events.capture(
|
||||
c.stream.where((v) => v is String)
|
||||
.map((v) => int.parse(v))
|
||||
.handleError((error) {
|
||||
if (error is! FormatException) throw error;
|
||||
})
|
||||
.where((v) => v > 10),
|
||||
cancelOnError: true);
|
||||
sentEvents.replay(c);
|
||||
Expect.listEquals(expectedEvents.events, actualEvents.events);
|
||||
|
||||
// Test that only one subscription is allowed.
|
||||
{
|
||||
var c = new StreamController(sync: true);
|
||||
var sink = c.sink;
|
||||
var stream = c.stream;
|
||||
var counter = 0;
|
||||
var subscription = stream.listen((data) { counter += data; });
|
||||
Expect.throws(() => stream.listen(null), (e) => e is StateError);
|
||||
sink.add(1);
|
||||
Expect.equals(1, counter);
|
||||
c.close();
|
||||
}
|
||||
c = new StreamController(sync: true);
|
||||
var sink = c.sink;
|
||||
var stream = c.stream;
|
||||
var counter = 0;
|
||||
var subscription = stream.listen((data) { counter += data; });
|
||||
Expect.throws(() => stream.listen(null), (e) => e is StateError);
|
||||
sink.add(1);
|
||||
Expect.equals(1, counter);
|
||||
c.close();
|
||||
}
|
||||
|
||||
testExtraMethods() {
|
||||
|
|
|
@ -16,10 +16,7 @@ main() {
|
|||
subscription = stream.listen(expectAsync((data) {
|
||||
expect(data, isNull);
|
||||
receivedCount++;
|
||||
if (receivedCount == 5) {
|
||||
var future = subscription.cancel();
|
||||
expect(future, completes);
|
||||
}
|
||||
if (receivedCount == 5) subscription.cancel();
|
||||
}, count: 5));
|
||||
});
|
||||
}
|
||||
|
|
|
@ -10,7 +10,7 @@ import 'dart:async';
|
|||
import 'package:unittest/unittest.dart';
|
||||
|
||||
main() {
|
||||
test("subscription.asFuture success", () {
|
||||
test("subscription.asStream success", () {
|
||||
Stream stream = new Stream.fromIterable([1, 2, 3]);
|
||||
var output = [];
|
||||
var subscription = stream.listen((x) { output.add(x); });
|
||||
|
@ -19,7 +19,7 @@ main() {
|
|||
}));
|
||||
});
|
||||
|
||||
test("subscription.asFuture success2", () {
|
||||
test("subscription.asStream success2", () {
|
||||
StreamController controller = new StreamController(sync: true);
|
||||
[1, 2, 3].forEach(controller.add);
|
||||
controller.close();
|
||||
|
@ -31,7 +31,7 @@ main() {
|
|||
}));
|
||||
});
|
||||
|
||||
test("subscription.asFuture success 3", () {
|
||||
test("subscription.asStream success 3", () {
|
||||
Stream stream = new Stream.fromIterable([1, 2, 3]).map((x) => x);
|
||||
var output = [];
|
||||
var subscription = stream.listen((x) { output.add(x); });
|
||||
|
@ -40,7 +40,7 @@ main() {
|
|||
}));
|
||||
});
|
||||
|
||||
test("subscription.asFuture different type", () {
|
||||
test("subscription.asStream different type", () {
|
||||
Stream stream = new Stream<int>.fromIterable([1, 2, 3]);
|
||||
var asyncCallback = expectAsync(() => {});
|
||||
var output = [];
|
||||
|
@ -52,7 +52,7 @@ main() {
|
|||
});
|
||||
});
|
||||
|
||||
test("subscription.asFuture failure", () {
|
||||
test("subscription.asStream failure", () {
|
||||
StreamController controller = new StreamController(sync: true);
|
||||
[1, 2, 3].forEach(controller.add);
|
||||
controller.addError("foo");
|
||||
|
@ -65,7 +65,7 @@ main() {
|
|||
}));
|
||||
});
|
||||
|
||||
test("subscription.asFuture failure2", () {
|
||||
test("subscription.asStream failure2", () {
|
||||
Stream stream = new Stream.fromIterable([1, 2, 3, 4])
|
||||
.map((x) {
|
||||
if (x == 4) throw "foo";
|
||||
|
@ -77,50 +77,4 @@ main() {
|
|||
Expect.equals(error, "foo");
|
||||
}));
|
||||
});
|
||||
|
||||
test("subscription.asFuture delayed cancel", () {
|
||||
var completer = new Completer();
|
||||
var controller =
|
||||
new StreamController(onCancel: () => completer.future, sync: true);
|
||||
[1, 2, 3].forEach(controller.add);
|
||||
controller.addError("foo");
|
||||
controller.close();
|
||||
Stream stream = controller.stream;
|
||||
var output = [];
|
||||
var subscription = stream.listen((x) { output.add(x); });
|
||||
bool catchErrorHasRun = false;
|
||||
subscription.asFuture(output).catchError(expectAsync((error) {
|
||||
Expect.equals(error, "foo");
|
||||
catchErrorHasRun = true;
|
||||
}));
|
||||
Timer.run(expectAsync(() {
|
||||
Expect.isFalse(catchErrorHasRun);
|
||||
completer.complete();
|
||||
}));
|
||||
});
|
||||
|
||||
test("subscription.asFuture failure in cancel", () {
|
||||
runZoned(() {
|
||||
var completer = new Completer();
|
||||
var controller =
|
||||
new StreamController(onCancel: () => completer.future, sync: true);
|
||||
[1, 2, 3].forEach(controller.add);
|
||||
controller.addError("foo");
|
||||
controller.close();
|
||||
Stream stream = controller.stream;
|
||||
var output = [];
|
||||
var subscription = stream.listen((x) { output.add(x); });
|
||||
bool catchErrorHasRun = false;
|
||||
subscription.asFuture(output).catchError(expectAsync((error) {
|
||||
Expect.equals(error, "foo");
|
||||
catchErrorHasRun = true;
|
||||
}));
|
||||
Timer.run(expectAsync(() {
|
||||
Expect.isFalse(catchErrorHasRun);
|
||||
completer.completeError(499);
|
||||
}));
|
||||
}, onError: expectAsync((e) {
|
||||
Expect.equals(499, e);
|
||||
}));
|
||||
});
|
||||
}
|
||||
|
|
|
@ -27,22 +27,17 @@ void main() {
|
|||
test('subscription.cancel after close', () {
|
||||
var completer = new Completer();
|
||||
StreamController controller = new StreamController(
|
||||
onCancel: () {
|
||||
completer.complete();
|
||||
return completer.future;
|
||||
});
|
||||
onCancel: completer.complete);
|
||||
|
||||
controller.close();
|
||||
|
||||
var completer2 = new Completer();
|
||||
var sub;
|
||||
void onDone() {
|
||||
sub.cancel().then(completer2.complete);
|
||||
expect(sub.cancel(), isNull);
|
||||
}
|
||||
|
||||
sub = controller.stream.listen(null, onDone: onDone);
|
||||
expect(completer.future, completes);
|
||||
expect(completer2.future, completes);
|
||||
});
|
||||
|
||||
test('subscription.cancel after error', () {
|
||||
|
@ -140,183 +135,4 @@ void main() {
|
|||
.cancel();
|
||||
expect(doneCompleter.future, completion(equals(true)));
|
||||
});
|
||||
|
||||
test('subscription.cancel through map', () {
|
||||
var completer = new Completer();
|
||||
StreamController controller = new StreamController(
|
||||
onCancel: () => completer.future);
|
||||
|
||||
bool done = false;
|
||||
var future = controller.stream.map((x) => x).listen(null).cancel();
|
||||
|
||||
expect(future.then((_) => done = true), completion(equals(true)));
|
||||
|
||||
Timer.run(() {
|
||||
expect(done, isFalse);
|
||||
completer.complete();
|
||||
});
|
||||
});
|
||||
|
||||
test('subscription.cancel through asyncMap', () {
|
||||
var completer = new Completer();
|
||||
StreamController controller = new StreamController(
|
||||
onCancel: () => completer.future);
|
||||
|
||||
bool done = false;
|
||||
var future = controller.stream.asyncMap((x) => x).listen(null).cancel();
|
||||
|
||||
expect(future.then((_) => done = true), completion(equals(true)));
|
||||
|
||||
Timer.run(() {
|
||||
expect(done, isFalse);
|
||||
completer.complete();
|
||||
});
|
||||
});
|
||||
|
||||
test('subscription.cancel through asyncExpand', () {
|
||||
var completer = new Completer();
|
||||
StreamController controller = new StreamController(
|
||||
onCancel: () => completer.future);
|
||||
|
||||
bool done = false;
|
||||
var future = controller.stream.asyncExpand((x) => x).listen(null).cancel();
|
||||
|
||||
expect(future.then((_) => done = true), completion(equals(true)));
|
||||
|
||||
Timer.run(() {
|
||||
expect(done, isFalse);
|
||||
completer.complete();
|
||||
});
|
||||
});
|
||||
|
||||
test('subscription.cancel through handleError', () {
|
||||
var completer = new Completer();
|
||||
StreamController controller = new StreamController(
|
||||
onCancel: () => completer.future);
|
||||
|
||||
bool done = false;
|
||||
var future = controller.stream.handleError((x) => x).listen(null).cancel();
|
||||
|
||||
expect(future.then((_) => done = true), completion(equals(true)));
|
||||
|
||||
Timer.run(() {
|
||||
expect(done, isFalse);
|
||||
completer.complete();
|
||||
});
|
||||
});
|
||||
|
||||
test('subscription.cancel through skip', () {
|
||||
var completer = new Completer();
|
||||
StreamController controller = new StreamController(
|
||||
onCancel: () => completer.future);
|
||||
|
||||
bool done = false;
|
||||
var future = controller.stream.skip(1).listen(null).cancel();
|
||||
|
||||
expect(future.then((_) => done = true), completion(equals(true)));
|
||||
|
||||
Timer.run(() {
|
||||
expect(done, isFalse);
|
||||
completer.complete();
|
||||
});
|
||||
});
|
||||
|
||||
test('subscription.cancel through take', () {
|
||||
var completer = new Completer();
|
||||
StreamController controller = new StreamController(
|
||||
onCancel: () => completer.future);
|
||||
|
||||
bool done = false;
|
||||
var future = controller.stream.take(1).listen(null).cancel();
|
||||
|
||||
expect(future.then((_) => done = true), completion(equals(true)));
|
||||
|
||||
Timer.run(() {
|
||||
expect(done, isFalse);
|
||||
completer.complete();
|
||||
});
|
||||
});
|
||||
|
||||
test('subscription.cancel through skipWhile', () {
|
||||
var completer = new Completer();
|
||||
StreamController controller = new StreamController(
|
||||
onCancel: () => completer.future);
|
||||
|
||||
bool done = false;
|
||||
var future = controller.stream.skipWhile((x) => true).listen(null).cancel();
|
||||
|
||||
expect(future.then((_) => done = true), completion(equals(true)));
|
||||
|
||||
Timer.run(() {
|
||||
expect(done, isFalse);
|
||||
completer.complete();
|
||||
});
|
||||
});
|
||||
|
||||
test('subscription.cancel through takeWhile', () {
|
||||
var completer = new Completer();
|
||||
StreamController controller = new StreamController(
|
||||
onCancel: () => completer.future);
|
||||
|
||||
bool done = false;
|
||||
var future = controller.stream.takeWhile((x) => true).listen(null).cancel();
|
||||
|
||||
expect(future.then((_) => done = true), completion(equals(true)));
|
||||
|
||||
Timer.run(() {
|
||||
expect(done, isFalse);
|
||||
completer.complete();
|
||||
});
|
||||
});
|
||||
|
||||
test('subscription.cancel through timeOut', () {
|
||||
var completer = new Completer();
|
||||
StreamController controller = new StreamController(
|
||||
onCancel: () => completer.future);
|
||||
|
||||
bool done = false;
|
||||
var duration = const Duration(hours: 5);
|
||||
var future = controller.stream.timeout(duration).listen(null).cancel();
|
||||
|
||||
expect(future.then((_) => done = true), completion(equals(true)));
|
||||
|
||||
Timer.run(() {
|
||||
expect(done, isFalse);
|
||||
completer.complete();
|
||||
});
|
||||
});
|
||||
|
||||
test('subscription.cancel through transform', () {
|
||||
var completer = new Completer();
|
||||
StreamController controller = new StreamController(
|
||||
onCancel: () => completer.future);
|
||||
|
||||
bool done = false;
|
||||
var transformer =
|
||||
new StreamTransformer.fromHandlers(handleData: (x, y) {});
|
||||
var future = controller.stream.transform(transformer).listen(null).cancel();
|
||||
|
||||
expect(future.then((_) => done = true), completion(equals(true)));
|
||||
|
||||
Timer.run(() {
|
||||
expect(done, isFalse);
|
||||
completer.complete();
|
||||
});
|
||||
});
|
||||
|
||||
test('subscription.cancel through where', () {
|
||||
var completer = new Completer();
|
||||
StreamController controller = new StreamController(
|
||||
onCancel: () => completer.future);
|
||||
|
||||
bool done = false;
|
||||
var future = controller.stream.where((x) => true).listen(null).cancel();
|
||||
|
||||
expect(future.then((_) => done = true), completion(equals(true)));
|
||||
|
||||
Timer.run(() {
|
||||
expect(done, isFalse);
|
||||
completer.complete();
|
||||
});
|
||||
});
|
||||
}
|
||||
|
|
|
@ -166,7 +166,6 @@ async/stream_asyncexpand_test: RuntimeError # Timer interface not supported: Iss
|
|||
async/stream_asyncmap_test: RuntimeError # Timer interface not supported: Issue 7728.
|
||||
async/stream_transformation_broadcast_test: RuntimeError # Timer interface not supported: Issue 7728.
|
||||
async/stream_controller_test: Fail # Timer interface not supported: Issue 7728.
|
||||
async/stream_subscription_cancel_test: Fail # Timer interface not supported: Issue 7728.
|
||||
async/future_constructor2_test: Fail # Timer interface not supported: Issue 7728.
|
||||
mirrors/mirrors_reader_test: Skip # Running in v8 suffices. Issue 16589 - RuntimeError. Issue 22130 - Crash (out of memory).
|
||||
|
||||
|
|
Loading…
Reference in a new issue