Make CastStreamSubscription.onData handle a null callback

Fixes #33166

Bug: http://dartbug/com/33166
Change-Id: If2f0f714b96ce592936d3298bb5d84732d34f69b
Reviewed-on: https://dart-review.googlesource.com/56101
Commit-Queue: Lasse R.H. Nielsen <lrn@google.com>
Reviewed-by: Leaf Petersen <leafp@google.com>
This commit is contained in:
Lasse R.H. Nielsen 2018-06-18 13:24:17 +00:00 committed by commit-bot@chromium.org
parent f7289aa219
commit 060ee6769e
3 changed files with 70 additions and 9 deletions

View file

@ -13,9 +13,10 @@ class CastStream<S, T> extends Stream<T> {
StreamSubscription<T> listen(void onData(T data),
{Function onError, void onDone(), bool cancelOnError}) {
return new CastStreamSubscription<S, T>(_source.listen(null,
onError: onError, onDone: onDone, cancelOnError: cancelOnError))
..onData(onData);
return new CastStreamSubscription<S, T>(
_source.listen(null, onDone: onDone, cancelOnError: cancelOnError))
..onData(onData)
..onError(onError);
}
Stream<R> cast<R>() => new CastStream<S, R>(_source);
@ -24,23 +25,62 @@ class CastStream<S, T> extends Stream<T> {
class CastStreamSubscription<S, T> implements StreamSubscription<T> {
final StreamSubscription<S> _source;
CastStreamSubscription(this._source);
/// Zone where listen was called.
final Zone _zone = Zone.current;
/// User's data handler. May be null.
void Function(T) _handleData;
/// Copy of _source's handleError so we can report errors in onData.
/// May be null.
Function _handleError;
CastStreamSubscription(this._source) {
_source.onData(_onData);
}
Future cancel() => _source.cancel();
void onData(void handleData(T data)) {
_source
.onData(handleData == null ? null : (S data) => handleData(data as T));
_handleData = handleData == null
? null
: _zone.registerUnaryCallback<dynamic, T>(handleData);
}
void onError(Function handleError) {
_source.onError(handleError);
if (handleError == null) {
_handleError = null;
} else if (handleError is Function(Null, Null)) {
_handleError = _zone
.registerBinaryCallback<dynamic, Object, StackTrace>(handleError);
} else {
_handleError = _zone.registerUnaryCallback<dynamic, Object>(handleError);
}
}
void onDone(void handleDone()) {
_source.onDone(handleDone);
}
void _onData(S data) {
if (_handleData == null) return;
T targetData;
try {
targetData = data as T;
} catch (error, stack) {
if (_handleError == null) {
_zone.handleUncaughtError(error, stack);
} else if (_handleError is Function(Null, Null)) {
_zone.runBinaryGuarded(_handleError, error, stack);
} else {
_zone.runUnaryGuarded(_handleError, error);
}
return;
}
_zone.runUnaryGuarded(_handleData, targetData);
}
void pause([Future resumeSignal]) {
_source.pause(resumeSignal);
}

View file

@ -12,7 +12,8 @@ import 'dart:async'
Stream,
StreamSubscription,
StreamTransformer,
StreamTransformerBase;
StreamTransformerBase,
Zone;
import 'dart:convert' show Converter;
import 'dart:core' hide Symbol;
import 'dart:core' as core;

View file

@ -7,6 +7,26 @@ import "package:expect/expect.dart";
// Regression test for https://github.com/dart-lang/sdk/issues/33166
void main() async {
var stream = new Stream.fromIterable([1, 2, 3]);
Expect.equals(await stream.cast<int>().drain().then((_) => 'Done'), 'Done');
// Check that a `null` data handler (like the one passe by `drain`)
// doesn't crash.
{
var stream = new Stream<Object>.fromIterable([1, 2, 3]);
Expect.equals(await stream.cast<int>().drain().then((_) => 'Done'), 'Done');
}
// Check that type errors go into stream error channel.
{
var stream = new Stream<Object>.fromIterable([1, 2, 3]);
var errors = [];
var done = new Completer();
var subscription = stream.cast<String>().listen((value) {
Expect.fail("Unexpected value: $value");
}, onError: (e, s) {
errors.add(e);
}, onDone: () {
done.complete(null);
});
await done.future;
Expect.equals(3, errors.length);
}
}