Add setters for callbacks on StreamController.

This allows you to create a stream controller at one point and
add or change the callbacks later. This can be useful if you
want to store a stream controller in a final instance field and
also want the controller to call instance methods as callbacks.

R=floitsch@google.com

Review URL: https://codereview.chromium.org//1242023007.
This commit is contained in:
Lasse R.H. Nielsen 2015-07-17 12:03:00 +02:00
parent 3ab5fa1aaa
commit 4a8a844c34
4 changed files with 218 additions and 51 deletions

View file

@ -2,6 +2,10 @@
### Core library changes
* `dart:async`
* `StreamController` added setters for the `onListen`, `onPause`, `onResume`
and `onCancel` callbacks.
* `dart:convert`
* `LineSplitter` added a `split` static method returning an `Iterable`.

View file

@ -67,7 +67,6 @@ class _BroadcastSubscription<T> extends _ControllerSubscription<T>
// _onCancel is inherited.
}
abstract class _BroadcastStreamController<T>
implements StreamController<T>,
_StreamControllerLifecycle<T>,
@ -80,8 +79,8 @@ abstract class _BroadcastStreamController<T>
static const int _STATE_CLOSED = 4;
static const int _STATE_ADDSTREAM = 8;
final _NotificationHandler _onListen;
final _NotificationHandler _onCancel;
_NotificationHandler _onListen;
_NotificationHandler _onCancel;
// State of the controller.
int _state;
@ -113,6 +112,20 @@ abstract class _BroadcastStreamController<T>
_next = _previous = this;
}
void set onListen(void onListenHandler()) { _onListen = onListenHandler; }
void set onPause(void onPauseHandler()) {
throw new UnsupportedError(
"Broadcast stream controllers do not support pause callbacks");
}
void set onResume(void onResumeHandler()) {
throw new UnsupportedError(
"Broadcast stream controllers do not support pause callbacks");
}
void set onCancel(onCancelHandler()) { _onCancel = onCancelHandler; }
// StreamController interface.
Stream<T> get stream => new _BroadcastStream<T>(this);

View file

@ -77,12 +77,6 @@ abstract class StreamController<T> implements StreamSink<T> {
void onResume(),
onCancel(),
bool sync: false}) {
if (onListen == null && onPause == null &&
onResume == null && onCancel == null) {
return sync
? new _NoCallbackSyncStreamController<T>()
: new _NoCallbackAsyncStreamController<T>();
}
return sync
? new _SyncStreamController<T>(onListen, onPause, onResume, onCancel)
: new _AsyncStreamController<T>(onListen, onPause, onResume, onCancel);
@ -138,6 +132,42 @@ abstract class StreamController<T> implements StreamSink<T> {
: new _AsyncBroadcastStreamController<T>(onListen, onCancel);
}
/**
* Sets the callback which is called when the stream is listened to.
*
* This overrides the previous callback, or clears it if the [onListenHandler]
* is `null`.
*/
void set onListen(void onListenHandler());
/**
* Sets the callback which is called when the stream is paused.
*
* This overrides the previous callback, or clears it if the [onPauseHandler]
* is `null`.
*
* Pause related callbacks are not supported on broadcast stream controllers.
*/
void set onPause(void onPauseHandler());
/**
* Sets the callback which is called when the stream is resumed.
*
* This overrides the previous callback, or clears it if the [onResumeHandler]
* is `null`.
*
* Pause related callbacks are not supported on broadcast stream controllers.
*/
void set onResume(void onResumeHandler());
/**
* Sets the callback which is called when the stream is canceled.
*
* This overrides the previous callback, or clears it if the [onCancelHandler]
* is `null`.
*/
void set onCancel(onCancelHandler());
/**
* Returns a view of this object that only exposes the [StreamSink] interface.
*/
@ -383,12 +413,23 @@ abstract class _StreamController<T> implements StreamController<T>,
// accessed earlier, or if close is called before subscribing.
_Future _doneFuture;
_StreamController();
_NotificationHandler _onListen;
_NotificationHandler _onPause;
_NotificationHandler _onResume;
_NotificationHandler _onCancel;
_NotificationHandler get _onListen;
_NotificationHandler get _onPause;
_NotificationHandler get _onResume;
_NotificationHandler get _onCancel;
_StreamController(void this._onListen(),
void this._onPause(),
void this._onResume(),
this._onCancel());
void set onListen(void onListenHandler()) { _onListen = onListenHandler; }
void set onPause(void onPauseHandler()) { _onPause = onPauseHandler; }
void set onResume(void onResumeHandler()) { _onResume = onResumeHandler; }
void set onCancel(onCancelHandler()) { _onCancel = onCancelHandler; }
// Return a new stream every time. The streams are equal, but not identical.
Stream<T> get stream => new _ControllerStream<T>(this);
@ -722,44 +763,11 @@ abstract class _AsyncStreamControllerDispatch<T>
// TODO(lrn): Use common superclass for callback-controllers when VM supports
// constructors in mixin superclasses.
class _AsyncStreamController<T> extends _StreamController<T>
with _AsyncStreamControllerDispatch<T> {
final _NotificationHandler _onListen;
final _NotificationHandler _onPause;
final _NotificationHandler _onResume;
final _NotificationHandler _onCancel;
class _AsyncStreamController<T> = _StreamController<T>
with _AsyncStreamControllerDispatch<T>;
_AsyncStreamController(void this._onListen(),
void this._onPause(),
void this._onResume(),
this._onCancel());
}
class _SyncStreamController<T> extends _StreamController<T>
with _SyncStreamControllerDispatch<T> {
final _NotificationHandler _onListen;
final _NotificationHandler _onPause;
final _NotificationHandler _onResume;
final _NotificationHandler _onCancel;
_SyncStreamController(void this._onListen(),
void this._onPause(),
void this._onResume(),
this._onCancel());
}
abstract class _NoCallbacks {
_NotificationHandler get _onListen => null;
_NotificationHandler get _onPause => null;
_NotificationHandler get _onResume => null;
_NotificationHandler get _onCancel => null;
}
class _NoCallbackAsyncStreamController<T> = _StreamController<T>
with _AsyncStreamControllerDispatch<T>, _NoCallbacks;
class _NoCallbackSyncStreamController<T> = _StreamController<T>
with _SyncStreamControllerDispatch<T>, _NoCallbacks;
class _SyncStreamController<T> = _StreamController<T>
with _SyncStreamControllerDispatch<T>;
typedef _NotificationHandler();

View file

@ -719,6 +719,144 @@ void testSyncControllerNotReentrant() {
asyncEnd();
}
void testSettingCallbacks() {
const int initial = 0;
const int running = 1;
const int paused = 2;
const int canceled = 3;
var controller = new StreamController();
var stream = controller.stream;
var state = initial;
controller..onListen = () { state = running; }
..onPause = () { state = paused; }
..onResume = () { state = running; }
..onCancel = () { state = canceled; };
Expect.equals(initial, state);
var sub = stream.listen(null);
Expect.equals(running, state);
sub.pause();
Expect.equals(paused, state);
Expect.isTrue(controller.isPaused);
sub.resume();
Expect.equals(running, state);
Expect.isFalse(controller.isPaused);
// Changing them later does make a difference.
controller..onListen = () { throw "Second listen?"; }
..onPause = () { state = -paused; }
..onResume = () { state = -running; }
..onCancel = () { state = -canceled; };
Expect.equals(running, state);
sub.pause();
Expect.equals(-paused, state);
Expect.isTrue(controller.isPaused);
sub.resume();
Expect.equals(-running, state);
Expect.isFalse(controller.isPaused);
sub.cancel();
Expect.equals(-canceled, state);
}
void testSettingNullCallbacks() {
failCallback() => fail("Callback should not be called");
var controller = new StreamController(onListen: failCallback,
onPause : failCallback,
onResume: failCallback,
onCancel: failCallback);
var stream = controller.stream;
Expect.isFalse(controller.hasListener);
Expect.isTrue(controller.isPaused);
controller.onListen = null;
var sub = stream.listen(null);
Expect.isTrue(controller.hasListener);
Expect.isFalse(controller.isPaused);
controller.onPause = null;
sub.pause();
Expect.isTrue(controller.hasListener);
Expect.isTrue(controller.isPaused);
controller.onResume = null;
sub.resume();
Expect.isTrue(controller.hasListener);
Expect.isFalse(controller.isPaused);
controller.onCancel = null;
sub.cancel();
Expect.isFalse(controller.hasListener);
Expect.isFalse(controller.isPaused);
}
void testBroadcastSettingCallbacks() {
const int initial = 0;
const int running = 1;
const int canceled = 2;
var controller = new StreamController.broadcast();
var stream = controller.stream;
var state = initial;
Expect.throws(() { controller.onPause = (){}; },
(e) => e is UnsupportedError);
Expect.throws(() { controller.onResume = (){}; },
(e) => e is UnsupportedError);
controller..onListen = () { state = running; }
..onCancel = () { state = canceled; };
Expect.equals(initial, state);
var sub = stream.listen(null);
Expect.equals(running, state);
sub.cancel();
Expect.equals(canceled, state);
// Changing them later does make a difference.
controller..onListen = () { state = -running; }
..onCancel = () { state = -canceled; };
var sub2 = stream.listen(null);
Expect.equals(-running, state);
sub2.cancel();
Expect.equals(-canceled, state);
}
void testBroadcastSettingNullCallbacks() {
failCallback() => fail("Callback should not be called");
var controller = new StreamController.broadcast(onListen: failCallback,
onCancel: failCallback);
var stream = controller.stream;
Expect.isFalse(controller.hasListener);
controller.onListen = null;
var sub = stream.listen(null);
Expect.isTrue(controller.hasListener);
controller.onCancel = null;
sub.cancel();
Expect.isFalse(controller.hasListener);
}
main() {
asyncStart();
testMultiController();
@ -738,5 +876,9 @@ main() {
testAsBroadcastListenAfterClosePaused();
testEventInListen();
testSyncControllerNotReentrant();
testSettingCallbacks();
testSettingNullCallbacks();
testBroadcastSettingCallbacks();
testBroadcastSettingNullCallbacks();
asyncEnd();
}