Change singleSubscription/multiSubscription to normal/broadcast.

Also make StreamController not a Stream.

Review URL: https://codereview.chromium.org//12049013

git-svn-id: https://dart.googlecode.com/svn/branches/bleeding_edge/dart@17563 260f80e4-7a28-3924-810f-c04153c831b5
This commit is contained in:
lrn@google.com 2013-01-24 13:30:27 +00:00
parent ed938af868
commit 1f05296600
13 changed files with 176 additions and 192 deletions

View file

@ -19,7 +19,7 @@ part of dart.async;
* you receive a [StreamSubscription] object that can be used to stop listening,
* or to temporarily pause events from the stream.
*
* When an event is fired, all listeners at that time are informed.
* When an event is fired, the listeners at that time are informed.
* If a listener is added or removed while an event is being fired, the change
* will only take effect after the event is completely fired.
*
@ -27,25 +27,25 @@ part of dart.async;
* their input, but often, and preferably, they can simply request their input
* to pause too.
*
* There are two kinds of streams: Single-subscription streams and
* multi-subscription streams.
* There are two kinds of streams: The normal "single-subscription" streams and
* "broadcast" streams.
*
* A single-subscription stream allows only a single listener in its entire
* life-cycle. It holds back events until it gets a listener, and it exhausts
* A single-subscription stream allows only a single listener at a time.
* It holds back events until it gets a listener, and it may exhaust
* itself when the listener is unsubscribed, even if the stream wasn't done.
*
* Single-subscription streams are generally used for streaming parts of
* contiguous data like file I/O.
*
* A multi-subscription stream allows any number of listeners, and it fires
* A broadcast stream allows any number of listeners, and it fires
* its events when they are ready, whether there are listeners or not.
*
* Multi-subscription streams are used for independent events/observers.
* Braodcast streams are used for independent events/observers.
*
* The default implementation of [isSingleSubscription] and
* [asMultiSubscriptionStream] are assuming this is a single-subscription stream
* and a multi-subscription stream inheriting from [Stream] must override these
* to return [:false:] and [:this:] respectively.
* The default implementation of [isBroadcast] and
* [asBroadcastStream] are assuming this is a single-subscription stream
* and a broadcast stream inheriting from [Stream] must override these
* to return [:true:] and [:this:] respectively.
*/
abstract class Stream<T> {
Stream();
@ -78,9 +78,9 @@ abstract class Stream<T> {
}
/**
* Whether the stream is a single-subscription stream.
* Whether the stream is a broadcast stream.
*/
bool get isSingleSubscription => true;
bool get isBroadcast => false;
/**
* Returns a multi-subscription stream that produces the same events as this.
@ -90,9 +90,9 @@ abstract class Stream<T> {
* subscriber is added, and unsubscribe again when the last subscription is
* cancelled.
*
* If this stream is already multi-subscriber, it is returned unmodified.
* If this stream is already a broadcast stream, it is returned unmodified.
*/
Stream<T> asMultiSubscriberStream() {
Stream<T> asBroadcastStream() {
return new _SingleStreamMultiplexer<T>(this);
}
@ -842,7 +842,9 @@ class StreamView<T> extends Stream<T> {
StreamView(this._stream);
bool get isSingleSubscription => _stream.isSingleSubscription;
bool get isBroadcast => _stream.isBroadcast;
Stream<T> asBroadcastStream() => _stream.asBroadcastStream();
StreamSubscription<T> listen(void onData(T value),
{ void onError(AsyncError error),

View file

@ -5,12 +5,11 @@
part of dart.async;
// -------------------------------------------------------------------
// Default implementation of a stream with a controller for adding
// events to the stream.
// Controller for creating and adding events to a stream.
// -------------------------------------------------------------------
/**
* A controller and the stream it controls.
* A controller with the stream it controls.
*
* This controller allows sending data, error and done events on
* its [stream].
@ -21,12 +20,11 @@ part of dart.async;
* it has subscribers or not, as well as getting a callback when either of
* these change.
*/
class StreamController<T> extends Stream<T> implements StreamSink<T> {
_StreamImpl<T> _stream;
Stream<T> get stream => _stream;
class StreamController<T> implements StreamSink<T> {
final _StreamImpl<T> stream;
/**
* A controller with a [stream] that supports multiple subscribers.
* A controller with a broadcast [stream]..
*
* The [onPauseStateChange] function is called when the stream becomes
* paused or resumes after being paused. The current pause state can
@ -36,13 +34,14 @@ class StreamController<T> extends Stream<T> implements StreamSink<T> {
* receives its first listener or loses its last. The current subscription
* state can be read from [hasSubscribers]. Ignored if [:null:].
*/
StreamController.multiSubscription({void onPauseStateChange(),
void onSubscriptionStateChange()}) {
_stream = new _MultiControllerStream<T>(onSubscriptionStateChange,
onPauseStateChange);
}
StreamController.broadcast({void onPauseStateChange(),
void onSubscriptionStateChange()})
: stream = new _MultiControllerStream<T>(onSubscriptionStateChange,
onPauseStateChange);
/**
* A controller with a [stream] that supports only one single subscriber.
*
* The controller will buffer all incoming events until the subscriber is
* registered.
*
@ -55,24 +54,9 @@ class StreamController<T> extends Stream<T> implements StreamSink<T> {
* state can be read from [hasSubscribers]. Ignored if [:null:].
*/
StreamController({void onPauseStateChange(),
void onSubscriptionStateChange()}) {
_stream = new _SingleControllerStream<T>(onSubscriptionStateChange,
onPauseStateChange);
}
bool get isSingleSubscription => _stream.isSingleSubscription;
Stream<T> asMultiSubscriptionStream() => _stream.asMultiSubscriptionStream();
StreamSubscription listen(void onData(T data),
{ void onError(AsyncError error),
void onDone(),
bool unsubscribeOnError}) {
return _stream.listen(onData,
onError: onError,
onDone: onDone,
unsubscribeOnError: unsubscribeOnError);
}
void onSubscriptionStateChange()})
: stream = new _SingleControllerStream<T>(onSubscriptionStateChange,
onPauseStateChange);
/**
* Returns a view of this object that only exposes the [StreamSink] interface.
@ -80,15 +64,15 @@ class StreamController<T> extends Stream<T> implements StreamSink<T> {
StreamSink<T> get sink => new StreamSinkView<T>(this);
/** Whether one or more active subscribers have requested a pause. */
bool get isPaused => _stream._isPaused;
bool get isPaused => stream._isPaused;
/** Whether there are currently any subscribers on this [Stream]. */
bool get hasSubscribers => _stream._hasSubscribers;
bool get hasSubscribers => stream._hasSubscribers;
/**
* Send or queue a data event.
*/
void add(T value) => _stream._add(value);
void add(T value) => stream._add(value);
/**
* Send or enqueue an error event.
@ -109,7 +93,7 @@ class StreamController<T> extends Stream<T> implements StreamSink<T> {
} else {
asyncError = new AsyncError(error, stackTrace);
}
_stream._signalError(asyncError);
stream._signalError(asyncError);
}
/**
@ -118,19 +102,7 @@ class StreamController<T> extends Stream<T> implements StreamSink<T> {
* The "done" message should be sent at most once by a stream, and it
* should be the last message sent.
*/
void close() { _stream._close(); }
void forEachSubscriber(void action(_StreamSubscriptionImpl<T> subscription)) {
_stream._forEachSubscriber(() {
try {
action();
} on AsyncError catch (e) {
e.throwDelayed();
} catch (e, s) {
new AsyncError(e, s).throwDelayed();
}
});
}
void close() { stream._close(); }
}
typedef void _NotificationHandler();

View file

@ -544,9 +544,9 @@ class _MultiStreamImpl<T> extends _StreamImpl<T>
_nextLink = _previousLink = this;
}
bool get isSingleSubscription => false;
bool get isBroadcast => true;
Stream<T> asMultiSubscriberStream() => this;
Stream<T> asBroadcastStream() => this;
// ------------------------------------------------------------------
// Helper functions that can be overridden in subclasses.

View file

@ -51,7 +51,9 @@ abstract class _ForwardingStream<S, T> extends Stream<T> {
_ForwardingStream(this._source);
bool get isSingleSubscription => _source.isSingleSubscription;
bool get isBroadcast => _source.isBroadcast;
bool asBroadcastStream() => _source.asBroadcastStream;
StreamSubscription listen(void onData(T value),
{ void onError(AsyncError error),

View file

@ -52,7 +52,7 @@ class _CloseToken {
class IsolateStream extends Stream<dynamic> {
bool _isClosed = false;
final ReceivePort _port;
StreamController _controller = new StreamController.multiSubscription();
StreamController _controller = new StreamController.broadcast();
IsolateStream._fromOriginalReceivePort(this._port) {
_port.receive((message, replyTo) {
@ -95,10 +95,10 @@ class IsolateStream extends Stream<dynamic> {
{ void onError(AsyncError error),
void onDone(),
bool unsubscribeOnError}) {
return _controller.listen(onData,
onError: onError,
onDone: onDone,
unsubscribeOnError: unsubscribeOnError);
return _controller.stream.listen(onData,
onError: onError,
onDone: onDone,
unsubscribeOnError: unsubscribeOnError);
}
dynamic _unmangleMessage(var message) {

View file

@ -11,9 +11,9 @@ import 'event_helper.dart';
testSupercedeStream() {
{ // Simple case of superceding lower priority streams.
StreamController s1 = new StreamController.multiSubscription();
StreamController s2 = new StreamController.multiSubscription();
StreamController s3 = new StreamController.multiSubscription();
StreamController s1 = new StreamController.broadcast();
StreamController s2 = new StreamController.broadcast();
StreamController s3 = new StreamController.broadcast();
Stream merge = new Stream.superceding([s1.stream, s2.stream, s3.stream]);
Events expected = new Events()..add(1)..add(2)..add(3)..add(4)..close();
Events actual = new Events.capture(merge);
@ -28,9 +28,9 @@ testSupercedeStream() {
}
{ // Superceding more than one stream at a time.
StreamController s1 = new StreamController.multiSubscription();
StreamController s2 = new StreamController.multiSubscription();
StreamController s3 = new StreamController.multiSubscription();
StreamController s1 = new StreamController.broadcast();
StreamController s2 = new StreamController.broadcast();
StreamController s3 = new StreamController.broadcast();
Stream merge = new Stream.superceding([s1.stream, s2.stream, s3.stream]);
Events expected = new Events()..add(1)..add(2)..close();
Events actual = new Events.capture(merge);
@ -43,9 +43,9 @@ testSupercedeStream() {
}
{ // Closing a stream before superceding it.
StreamController s1 = new StreamController.multiSubscription();
StreamController s2 = new StreamController.multiSubscription();
StreamController s3 = new StreamController.multiSubscription();
StreamController s1 = new StreamController.broadcast();
StreamController s2 = new StreamController.broadcast();
StreamController s3 = new StreamController.broadcast();
Stream merge = new Stream.superceding([s1.stream, s2.stream, s3.stream]);
Events expected = new Events()..add(1)..add(2)..add(3)..close();
Events actual = new Events.capture(merge);
@ -59,9 +59,9 @@ testSupercedeStream() {
}
{ // Errors from all non-superceded streams are forwarded.
StreamController s1 = new StreamController.multiSubscription();
StreamController s2 = new StreamController.multiSubscription();
StreamController s3 = new StreamController.multiSubscription();
StreamController s1 = new StreamController.broadcast();
StreamController s2 = new StreamController.broadcast();
StreamController s3 = new StreamController.broadcast();
Stream merge = new Stream.superceding([s1.stream, s2.stream, s3.stream]);
Events expected =
new Events()..add(1)..error("1")..error("2")..error("3")
@ -83,9 +83,9 @@ testSupercedeStream() {
}
test("Pausing on a superceding stream", () {
StreamController s1 = new StreamController.multiSubscription();
StreamController s2 = new StreamController.multiSubscription();
StreamController s3 = new StreamController.multiSubscription();
StreamController s1 = new StreamController.broadcast();
StreamController s2 = new StreamController.broadcast();
StreamController s3 = new StreamController.broadcast();
Stream merge = new Stream.superceding([s1.stream, s2.stream, s3.stream]);
Events expected = new Events()..add(1)..add(2)..add(3);
Events actual = new Events.capture(merge);
@ -114,9 +114,9 @@ testSupercedeStream() {
void testCyclicStream() {
test("Simple case of superceding lower priority streams", () {
StreamController s1 = new StreamController.multiSubscription();
StreamController s2 = new StreamController.multiSubscription();
StreamController s3 = new StreamController.multiSubscription();
StreamController s1 = new StreamController.broadcast();
StreamController s2 = new StreamController.broadcast();
StreamController s3 = new StreamController.broadcast();
Stream merge = new Stream.cyclic([s1.stream, s2.stream, s3.stream]);
Events expected =
new Events()..add(1)..add(2)..add(3)..add(4)..add(5)..add(6)..close();
@ -139,9 +139,9 @@ void testCyclicStream() {
});
test("Cyclic merge with errors", () {
StreamController s1 = new StreamController.multiSubscription();
StreamController s2 = new StreamController.multiSubscription();
StreamController s3 = new StreamController.multiSubscription();
StreamController s1 = new StreamController.broadcast();
StreamController s2 = new StreamController.broadcast();
StreamController s3 = new StreamController.broadcast();
Stream merge = new Stream.cyclic([s1.stream, s2.stream, s3.stream]);
Events expected =
new Events()..add(1)..error("1")..add(2)..add(3)..error("2")

View file

@ -13,8 +13,9 @@ import 'event_helper.dart';
testController() {
// Test reduce
test("StreamController.reduce", () {
StreamController c = new StreamController.multiSubscription();
c.reduce(0, (a,b) => a + b)
StreamController c = new StreamController.broadcast();
Stream stream = c.stream;
stream.reduce(0, (a,b) => a + b)
.then(expectAsync1((int v) {
Expect.equals(42, v);
}));
@ -24,16 +25,18 @@ testController() {
});
test("StreamController.reduce throws", () {
StreamController c = new StreamController.multiSubscription();
c.reduce(0, (a,b) { throw "Fnyf!"; })
StreamController c = new StreamController.broadcast();
Stream stream = c.stream;
stream.reduce(0, (a,b) { throw "Fnyf!"; })
.catchError(expectAsync1((e) { Expect.equals("Fnyf!", e.error); }));
c.add(42);
});
test("StreamController.pipeInto", () {
StreamController c = new StreamController.multiSubscription();
StreamController c = new StreamController.broadcast();
var list = <int>[];
c.pipeInto(new CollectionSink<int>(list))
Stream stream = c.stream;
stream.pipeInto(new CollectionSink<int>(list))
.whenComplete(expectAsync0(() {
Expect.listEquals(<int>[1,2,9,3,9], list);
}));
@ -49,7 +52,8 @@ testController() {
testSingleController() {
test("Single-subscription StreamController.reduce", () {
StreamController c = new StreamController();
c.reduce(0, (a,b) => a + b)
Stream stream = c.stream;
stream.reduce(0, (a,b) => a + b)
.then(expectAsync1((int v) { Expect.equals(42, v); }));
c.add(10);
c.add(32);
@ -58,7 +62,8 @@ testSingleController() {
test("Single-subscription StreamController.reduce throws", () {
StreamController c = new StreamController();
c.reduce(0, (a,b) { throw "Fnyf!"; })
Stream stream = c.stream;
stream.reduce(0, (a,b) { throw "Fnyf!"; })
.catchError(expectAsync1((e) { Expect.equals("Fnyf!", e.error); }));
c.add(42);
});
@ -66,7 +71,8 @@ testSingleController() {
test("Single-subscription StreamController.pipeInto", () {
StreamController c = new StreamController();
var list = <int>[];
c.pipeInto(new CollectionSink<int>(list))
Stream stream = c.stream;
stream.pipeInto(new CollectionSink<int>(list))
.whenComplete(expectAsync0(() {
Expect.listEquals(<int>[1,2,9,3,9], list);
}));
@ -152,21 +158,21 @@ testExtraMethods() {
test("firstMatching", () {
StreamController c = new StreamController();
Future f = c.firstMatching((x) => (x % 3) == 0);
Future f = c.stream.firstMatching((x) => (x % 3) == 0);
f.then(expectAsync1((v) { Expect.equals(9, v); }));
sentEvents.replay(c);
});
test("firstMatching 2", () {
StreamController c = new StreamController();
Future f = c.firstMatching((x) => (x % 4) == 0);
Future f = c.stream.firstMatching((x) => (x % 4) == 0);
f.catchError(expectAsync1((e) {}));
sentEvents.replay(c);
});
test("firstMatching 3", () {
StreamController c = new StreamController();
Future f = c.firstMatching((x) => (x % 4) == 0, defaultValue: () => 999);
Future f = c.stream.firstMatching((x) => (x % 4) == 0, defaultValue: () => 999);
f.then(expectAsync1((v) { Expect.equals(999, v); }));
sentEvents.replay(c);
});
@ -174,49 +180,49 @@ testExtraMethods() {
test("lastMatching", () {
StreamController c = new StreamController();
Future f = c.lastMatching((x) => (x % 3) == 0);
Future f = c.stream.lastMatching((x) => (x % 3) == 0);
f.then(expectAsync1((v) { Expect.equals(87, v); }));
sentEvents.replay(c);
});
test("lastMatching 2", () {
StreamController c = new StreamController();
Future f = c.lastMatching((x) => (x % 4) == 0);
Future f = c.stream.lastMatching((x) => (x % 4) == 0);
f.catchError(expectAsync1((e) {}));
sentEvents.replay(c);
});
test("lastMatching 3", () {
StreamController c = new StreamController();
Future f = c.lastMatching((x) => (x % 4) == 0, defaultValue: () => 999);
Future f = c.stream.lastMatching((x) => (x % 4) == 0, defaultValue: () => 999);
f.then(expectAsync1((v) { Expect.equals(999, v); }));
sentEvents.replay(c);
});
test("singleMatching", () {
StreamController c = new StreamController();
Future f = c.singleMatching((x) => (x % 9) == 0);
Future f = c.stream.singleMatching((x) => (x % 9) == 0);
f.then(expectAsync1((v) { Expect.equals(9, v); }));
sentEvents.replay(c);
});
test("singleMatching 2", () {
StreamController c = new StreamController();
Future f = c.singleMatching((x) => (x % 3) == 0); // Matches both 9 and 87..
Future f = c.stream.singleMatching((x) => (x % 3) == 0); // Matches both 9 and 87..
f.catchError(expectAsync1((e) { Expect.isTrue(e.error is StateError); }));
sentEvents.replay(c);
});
test("first", () {
StreamController c = new StreamController();
Future f = c.first;
Future f = c.stream.first;
f.then(expectAsync1((v) { Expect.equals(7, v);}));
sentEvents.replay(c);
});
test("first empty", () {
StreamController c = new StreamController();
Future f = c.first;
Future f = c.stream.first;
f.catchError(expectAsync1((e) { Expect.isTrue(e.error is StateError); }));
Events emptyEvents = new Events()..close();
emptyEvents.replay(c);
@ -224,7 +230,7 @@ testExtraMethods() {
test("first error", () {
StreamController c = new StreamController();
Future f = c.first;
Future f = c.stream.first;
f.catchError(expectAsync1((e) { Expect.equals("error", e.error); }));
Events errorEvents = new Events()..error("error")..close();
errorEvents.replay(c);
@ -232,7 +238,7 @@ testExtraMethods() {
test("first error 2", () {
StreamController c = new StreamController();
Future f = c.first;
Future f = c.stream.first;
f.catchError(expectAsync1((e) { Expect.equals("error", e.error); }));
Events errorEvents = new Events()..error("error")..error("error2")..close();
errorEvents.replay(c);
@ -240,14 +246,14 @@ testExtraMethods() {
test("last", () {
StreamController c = new StreamController();
Future f = c.last;
Future f = c.stream.last;
f.then(expectAsync1((v) { Expect.equals(87, v);}));
sentEvents.replay(c);
});
test("last empty", () {
StreamController c = new StreamController();
Future f = c.last;
Future f = c.stream.last;
f.catchError(expectAsync1((e) { Expect.isTrue(e.error is StateError); }));
Events emptyEvents = new Events()..close();
emptyEvents.replay(c);
@ -255,7 +261,7 @@ testExtraMethods() {
test("last error", () {
StreamController c = new StreamController();
Future f = c.last;
Future f = c.stream.last;
f.catchError(expectAsync1((e) { Expect.equals("error", e.error); }));
Events errorEvents = new Events()..error("error")..close();
errorEvents.replay(c);
@ -263,7 +269,7 @@ testExtraMethods() {
test("last error 2", () {
StreamController c = new StreamController();
Future f = c.last;
Future f = c.stream.last;
f.catchError(expectAsync1((e) { Expect.equals("error", e.error); }));
Events errorEvents = new Events()..error("error")..error("error2")..close();
errorEvents.replay(c);
@ -271,14 +277,14 @@ testExtraMethods() {
test("elementAt", () {
StreamController c = new StreamController();
Future f = c.elementAt(2);
Future f = c.stream.elementAt(2);
f.then(expectAsync1((v) { Expect.equals(13, v);}));
sentEvents.replay(c);
});
test("elementAt 2", () {
StreamController c = new StreamController();
Future f = c.elementAt(20);
Future f = c.stream.elementAt(20);
f.catchError(expectAsync1((e) { Expect.isTrue(e.error is StateError); }));
sentEvents.replay(c);
});
@ -287,7 +293,7 @@ testExtraMethods() {
testPause() {
test("pause event-unpause", () {
StreamController c = new StreamController();
Events actualEvents = new Events.capture(c);
Events actualEvents = new Events.capture(c.stream);
Events expectedEvents = new Events();
expectedEvents.add(42);
c.add(42);
@ -305,7 +311,7 @@ testPause() {
test("pause twice event-unpause", () {
StreamController c = new StreamController();
Events actualEvents = new Events.capture(c);
Events actualEvents = new Events.capture(c.stream);
Events expectedEvents = new Events();
expectedEvents.add(42);
c.add(42);
@ -327,7 +333,7 @@ testPause() {
test("pause twice direct-unpause", () {
StreamController c = new StreamController();
Events actualEvents = new Events.capture(c);
Events actualEvents = new Events.capture(c.stream);
Events expectedEvents = new Events();
expectedEvents.add(42);
c.add(42);
@ -349,7 +355,7 @@ testPause() {
test("pause twice direct-event-unpause", () {
StreamController c = new StreamController();
Events actualEvents = new Events.capture(c);
Events actualEvents = new Events.capture(c.stream);
Events expectedEvents = new Events();
expectedEvents.add(42);
c.add(42);
@ -372,7 +378,7 @@ testPause() {
test("pause twice direct-unpause", () {
StreamController c = new StreamController();
Events actualEvents = new Events.capture(c);
Events actualEvents = new Events.capture(c.stream);
Events expectedEvents = new Events();
expectedEvents.add(42);
c.add(42);

View file

@ -10,58 +10,58 @@ import 'event_helper.dart';
testMultiController() {
// Test normal flow.
var c = new StreamController.multiSubscription();
var c = new StreamController.broadcast();
Events expectedEvents = new Events()
..add(42)
..add("dibs")
..error("error!")
..error("error too!")
..close();
Events actualEvents = new Events.capture(c);
Events actualEvents = new Events.capture(c.stream);
expectedEvents.replay(c);
Expect.listEquals(expectedEvents.events, actualEvents.events);
// Test automatic unsubscription on error.
c = new StreamController.multiSubscription();
c = new StreamController.broadcast();
expectedEvents = new Events()..add(42)..error("error");
actualEvents = new Events.capture(c, unsubscribeOnError: true);
actualEvents = new Events.capture(c.stream, unsubscribeOnError: true);
Events sentEvents =
new Events()..add(42)..error("error")..add("Are you there?");
sentEvents.replay(c);
Expect.listEquals(expectedEvents.events, actualEvents.events);
// Test manual unsubscription.
c = new StreamController.multiSubscription();
c = new StreamController.broadcast();
expectedEvents = new Events()..add(42)..error("error")..add(37);
actualEvents = new Events.capture(c, unsubscribeOnError: false);
actualEvents = new Events.capture(c.stream, unsubscribeOnError: false);
expectedEvents.replay(c);
actualEvents.subscription.cancel();
c.add("Are you there"); // Not sent to actualEvents.
Expect.listEquals(expectedEvents.events, actualEvents.events);
// Test filter.
c = new StreamController.multiSubscription();
c = new StreamController.broadcast();
expectedEvents = new Events()
..add("a string")..add("another string")..close();
sentEvents = new Events()
..add("a string")..add(42)..add("another string")..close();
actualEvents = new Events.capture(c.where((v) => v is String));
actualEvents = new Events.capture(c.stream.where((v) => v is String));
sentEvents.replay(c);
Expect.listEquals(expectedEvents.events, actualEvents.events);
// Test map.
c = new StreamController.multiSubscription();
c = new StreamController.broadcast();
expectedEvents = new Events()..add("abab")..error("error")..close();
sentEvents = new Events()..add("ab")..error("error")..close();
actualEvents = new Events.capture(c.mappedBy((v) => "$v$v"));
actualEvents = new Events.capture(c.stream.mappedBy((v) => "$v$v"));
sentEvents.replay(c);
Expect.listEquals(expectedEvents.events, actualEvents.events);
// Test handleError.
c = new StreamController.multiSubscription();
c = new StreamController.broadcast();
expectedEvents = new Events()..add("ab")..error("[foo]");
sentEvents = new Events()..add("ab")..error("foo")..add("ab")..close();
actualEvents = new Events.capture(c.handleError((v) {
actualEvents = new Events.capture(c.stream.handleError((v) {
if (v.error is String) {
throw new AsyncError("[${v.error}]",
"other stack");
@ -73,13 +73,13 @@ testMultiController() {
// reduce is tested asynchronously and therefore not in this file.
// Test expand
c = new StreamController.multiSubscription();
c = new StreamController.broadcast();
sentEvents = new Events()..add(3)..add(2)..add(4)..close();
expectedEvents = new Events()..add(1)..add(2)..add(3)
..add(1)..add(2)
..add(1)..add(2)..add(3)..add(4)
..close();
actualEvents = new Events.capture(c.expand((v) {
actualEvents = new Events.capture(c.stream.expand((v) {
var l = [];
for (int i = 0; i < v; i++) l.add(i + 1);
return l;
@ -88,22 +88,23 @@ testMultiController() {
Expect.listEquals(expectedEvents.events, actualEvents.events);
// Test transform.
c = new StreamController.multiSubscription();
c = new StreamController.broadcast();
sentEvents = new Events()..add("a")..error(42)..add("b")..close();
expectedEvents =
new Events()..error("a")..add(42)..error("b")..add("foo")..close();
actualEvents = new Events.capture(c.transform(new StreamTransformer.from(
onData: (v, s) { s.signalError(new AsyncError(v)); },
onError: (e, s) { s.add(e.error); },
onDone: (s) {
s.add("foo");
s.close();
})));
actualEvents = new Events.capture(c.stream.transform(
new StreamTransformer.from(
onData: (v, s) { s.signalError(new AsyncError(v)); },
onError: (e, s) { s.add(e.error); },
onDone: (s) {
s.add("foo");
s.close();
})));
sentEvents.replay(c);
Expect.listEquals(expectedEvents.events, actualEvents.events);
// Test multiple filters.
c = new StreamController.multiSubscription();
c = new StreamController.broadcast();
sentEvents = new Events()..add(42)
..add("snugglefluffy")
..add(7)
@ -112,7 +113,7 @@ testMultiController() {
..close();
expectedEvents = new Events()..add(42)..error("not FormatException");
actualEvents = new Events.capture(
c.where((v) => v is String)
c.stream.where((v) => v is String)
.mappedBy((v) => int.parse(v))
.handleError((v) {
if (v.error is! FormatException) throw v;
@ -123,7 +124,7 @@ testMultiController() {
Expect.listEquals(expectedEvents.events, actualEvents.events);
// Test subscription changes while firing.
c = new StreamController.multiSubscription();
c = new StreamController.broadcast();
var sink = c.sink;
var stream = c.stream;
var counter = 0;
@ -157,14 +158,14 @@ testSingleController() {
..error("error!")
..error("error too!")
..close();
Events actualEvents = new Events.capture(c);
Events actualEvents = new Events.capture(c.stream);
expectedEvents.replay(c);
Expect.listEquals(expectedEvents.events, actualEvents.events);
// Test automatic unsubscription on error.
c = new StreamController();
expectedEvents = new Events()..add(42)..error("error");
actualEvents = new Events.capture(c, unsubscribeOnError: true);
actualEvents = new Events.capture(c.stream, unsubscribeOnError: true);
Events sentEvents =
new Events()..add(42)..error("error")..add("Are you there?");
sentEvents.replay(c);
@ -173,7 +174,7 @@ testSingleController() {
// Test manual unsubscription.
c = new StreamController();
expectedEvents = new Events()..add(42)..error("error")..add(37);
actualEvents = new Events.capture(c, unsubscribeOnError: false);
actualEvents = new Events.capture(c.stream, unsubscribeOnError: false);
expectedEvents.replay(c);
actualEvents.subscription.cancel();
c.add("Are you there"); // Not sent to actualEvents.
@ -185,7 +186,7 @@ testSingleController() {
..add("a string")..add("another string")..close();
sentEvents = new Events()
..add("a string")..add(42)..add("another string")..close();
actualEvents = new Events.capture(c.where((v) => v is String));
actualEvents = new Events.capture(c.stream.where((v) => v is String));
sentEvents.replay(c);
Expect.listEquals(expectedEvents.events, actualEvents.events);
@ -193,7 +194,7 @@ testSingleController() {
c = new StreamController();
expectedEvents = new Events()..add("abab")..error("error")..close();
sentEvents = new Events()..add("ab")..error("error")..close();
actualEvents = new Events.capture(c.mappedBy((v) => "$v$v"));
actualEvents = new Events.capture(c.stream.mappedBy((v) => "$v$v"));
sentEvents.replay(c);
Expect.listEquals(expectedEvents.events, actualEvents.events);
@ -201,7 +202,7 @@ testSingleController() {
c = new StreamController();
expectedEvents = new Events()..add("ab")..error("[foo]");
sentEvents = new Events()..add("ab")..error("foo")..add("ab")..close();
actualEvents = new Events.capture(c.handleError((v) {
actualEvents = new Events.capture(c.stream.handleError((v) {
if (v.error is String) {
throw new AsyncError("[${v.error}]",
"other stack");
@ -219,7 +220,7 @@ testSingleController() {
..add(1)..add(2)
..add(1)..add(2)..add(3)..add(4)
..close();
actualEvents = new Events.capture(c.expand((v) {
actualEvents = new Events.capture(c.stream.expand((v) {
var l = [];
for (int i = 0; i < v; i++) l.add(i + 1);
return l;
@ -230,7 +231,7 @@ testSingleController() {
// pipe is tested asynchronously and therefore not in this file.
c = new StreamController();
var list = <int>[];
c.pipeInto(new CollectionSink<int>(list))
c.stream.pipeInto(new CollectionSink<int>(list))
.whenComplete(() { Expect.listEquals(<int>[1,2,9,3,9], list); });
c.add(1);
c.add(2);
@ -244,13 +245,14 @@ testSingleController() {
sentEvents = new Events()..add("a")..error(42)..add("b")..close();
expectedEvents =
new Events()..error("a")..add(42)..error("b")..add("foo")..close();
actualEvents = new Events.capture(c.transform(new StreamTransformer.from(
onData: (v, s) { s.signalError(new AsyncError(v)); },
onError: (e, s) { s.add(e.error); },
onDone: (s) {
s.add("foo");
s.close();
})));
actualEvents = new Events.capture(c.stream.transform(
new StreamTransformer.from(
onData: (v, s) { s.signalError(new AsyncError(v)); },
onError: (e, s) { s.add(e.error); },
onDone: (s) {
s.add("foo");
s.close();
})));
sentEvents.replay(c);
Expect.listEquals(expectedEvents.events, actualEvents.events);
@ -264,7 +266,7 @@ testSingleController() {
..close();
expectedEvents = new Events()..add(42)..error("not FormatException");
actualEvents = new Events.capture(
c.where((v) => v is String)
c.stream.where((v) => v is String)
.mappedBy((v) => int.parse(v))
.handleError((v) {
if (v.error is! FormatException) throw v;
@ -291,46 +293,46 @@ testExtraMethods() {
var c = new StreamController();
Events expectedEvents = new Events()..add(3)..close();
Events actualEvents = new Events.capture(c.skip(2));
Events actualEvents = new Events.capture(c.stream.skip(2));
sentEvents.replay(c);
Expect.listEquals(expectedEvents.events, actualEvents.events);
c = new StreamController();
expectedEvents = new Events()..close();
actualEvents = new Events.capture(c.skip(3));
actualEvents = new Events.capture(c.stream.skip(3));
sentEvents.replay(c);
Expect.listEquals(expectedEvents.events, actualEvents.events);
c = new StreamController();
expectedEvents = new Events()..close();
actualEvents = new Events.capture(c.skip(7));
actualEvents = new Events.capture(c.stream.skip(7));
sentEvents.replay(c);
Expect.listEquals(expectedEvents.events, actualEvents.events);
c = new StreamController();
expectedEvents = sentEvents;
actualEvents = new Events.capture(c.skip(0));
actualEvents = new Events.capture(c.stream.skip(0));
sentEvents.replay(c);
Expect.listEquals(expectedEvents.events, actualEvents.events);
c = new StreamController();
expectedEvents = new Events()..add(3)..close();
actualEvents = new Events.capture(c.skipWhile((x) => x <= 2));
actualEvents = new Events.capture(c.stream.skipWhile((x) => x <= 2));
sentEvents.replay(c);
Expect.listEquals(expectedEvents.events, actualEvents.events);
c = new StreamController();
expectedEvents = new Events()..add(1)..add(2)..close();
actualEvents = new Events.capture(c.take(2));
actualEvents = new Events.capture(c.stream.take(2));
sentEvents.replay(c);
Expect.listEquals(expectedEvents.events, actualEvents.events);
c = new StreamController();
expectedEvents = new Events()..add(1)..add(2)..close();
actualEvents = new Events.capture(c.takeWhile((x) => x <= 2));
actualEvents = new Events.capture(c.stream.takeWhile((x) => x <= 2));
sentEvents.replay(c);
Expect.listEquals(expectedEvents.events, actualEvents.events);
@ -339,7 +341,7 @@ testExtraMethods() {
..add(1)..add(1)..add(2)..add(1)..add(2)..add(2)..add(2)..close();
expectedEvents = new Events()
..add(1)..add(2)..add(1)..add(2)..close();
actualEvents = new Events.capture(c.distinct());
actualEvents = new Events.capture(c.stream.distinct());
sentEvents.replay(c);
Expect.listEquals(expectedEvents.events, actualEvents.events);
@ -349,7 +351,7 @@ testExtraMethods() {
expectedEvents = new Events()
..add(5)..add(4)..add(3)..add(1)..close();
// Use 'distinct' as a filter with access to the previously emitted event.
actualEvents = new Events.capture(c.distinct((a, b) => a < b));
actualEvents = new Events.capture(c.stream.distinct((a, b) => a < b));
sentEvents.replay(c);
Expect.listEquals(expectedEvents.events, actualEvents.events);
}

View file

@ -18,13 +18,13 @@ main() {
testMinMax(name, iterable, min, max, [int compare(a, b)]) {
test("$name-min", () {
StreamController c = new StreamController();
Future f = c.min(compare);
Future f = c.stream.min(compare);
f.then(expectAsync1((v) { Expect.equals(min, v);}));
new Events.fromIterable(iterable).replay(c);
});
test("$name-max", () {
StreamController c = new StreamController();
Future f = c.max(compare);
Future f = c.stream.max(compare);
f.then(expectAsync1((v) { Expect.equals(max, v);}));
new Events.fromIterable(iterable).replay(c);
});

View file

@ -13,21 +13,21 @@ import 'event_helper.dart';
main() {
test("single", () {
StreamController c = new StreamController();
Future f = c.single;
Future f = c.stream.single;
f.then(expectAsync1((v) { Expect.equals(42, v);}));
new Events.fromIterable([42]).replay(c);
});
test("single empty", () {
StreamController c = new StreamController();
Future f = c.single;
Future f = c.stream.single;
f.catchError(expectAsync1((e) { Expect.isTrue(e.error is StateError); }));
new Events.fromIterable([]).replay(c);
});
test("single error", () {
StreamController c = new StreamController();
Future f = c.single;
Future f = c.stream.single;
f.catchError(expectAsync1((e) { Expect.equals("error", e.error); }));
Events errorEvents = new Events()..error("error")..close();
errorEvents.replay(c);
@ -35,7 +35,7 @@ main() {
test("single error 2", () {
StreamController c = new StreamController();
Future f = c.single;
Future f = c.stream.single;
f.catchError(expectAsync1((e) { Expect.equals("error", e.error); }));
Events errorEvents = new Events()..error("error")..error("error2")..close();
errorEvents.replay(c);
@ -43,7 +43,7 @@ main() {
test("single error 3", () {
StreamController c = new StreamController();
Future f = c.single;
Future f = c.stream.single;
f.catchError(expectAsync1((e) { Expect.equals("error", e.error); }));
Events errorEvents = new Events()..add(499)..error("error")..close();
errorEvents.replay(c);

View file

@ -13,7 +13,7 @@ import 'event_helper.dart';
main() {
test("tomulti 1", () {
StreamController c = new StreamController<int>();
Stream<int> multi = c.stream.asMultiSubscriberStream();
Stream<int> multi = c.stream.asBroadcastStream();
// Listen twice.
multi.listen(expectAsync1((v) => Expect.equals(42, v)));
multi.listen(expectAsync1((v) => Expect.equals(42, v)));
@ -22,7 +22,7 @@ main() {
test("tomulti 2", () {
StreamController c = new StreamController<int>();
Stream<int> multi = c.stream.asMultiSubscriberStream();
Stream<int> multi = c.stream.asBroadcastStream();
Events expected = new Events.fromIterable([1, 2, 3, 4, 5]);
Events actual1 = new Events.capture(multi);
Events actual2 = new Events.capture(multi);
@ -36,8 +36,8 @@ main() {
});
test("tomulti no-op", () {
StreamController c = new StreamController<int>.multiSubscription();
Stream<int> multi = c.stream.asMultiSubscriberStream();
StreamController c = new StreamController<int>.broadcast();
Stream<int> multi = c.stream.asBroadcastStream();
Events expected = new Events.fromIterable([1, 2, 3, 4, 5]);
Events actual1 = new Events.capture(multi);
Events actual2 = new Events.capture(multi);

View file

@ -233,9 +233,9 @@ class _ErrorGroupStream extends Stream {
/// Creates a new [_ErrorGroupFuture] that's a child of [_group] and wraps
/// [inner].
_ErrorGroupStream(this._group, Stream inner)
: _controller = inner.isSingleSubscription ?
new StreamController() :
new StreamController.multiSubscription() {
: _controller = inner.isBroadcast ?
new StreamController.broadcast() :
new StreamController() {
_subscription = inner.listen(_controller.add,
onError: (e) => _group._signalError(e),
onDone: () {

View file

@ -202,7 +202,7 @@ main() {
setUp(() {
errorGroup = new ErrorGroup();
controller = new StreamController.multiSubscription();
controller = new StreamController.broadcast();
stream = errorGroup.registerStream(controller.stream);
});
@ -341,8 +341,8 @@ main() {
setUp(() {
errorGroup = new ErrorGroup();
controller1 = new StreamController.multiSubscription();
controller2 = new StreamController.multiSubscription();
controller1 = new StreamController.broadcast();
controller2 = new StreamController.broadcast();
stream1 = errorGroup.registerStream(controller1.stream);
stream2 = errorGroup.registerStream(controller2.stream);
});
@ -396,7 +396,7 @@ main() {
setUp(() {
errorGroup = new ErrorGroup();
controller = new StreamController.multiSubscription();
controller = new StreamController.broadcast();
stream = errorGroup.registerStream(controller.stream);
completer = new Completer();
future = errorGroup.registerFuture(completer.future);