Fix Stream.distinct.

Due to incorrectly shared state, a broadcast distinct stream listened to
more than once would give incorrect results.
Also update docs.

Fixes #29638, #29627.
BUG= http://dartbug.com/29638, http://dartbug.com/29627
R=floitsch@google.com

Review-Url: https://codereview.chromium.org/2885993005 .
This commit is contained in:
Lasse Reichstein Holst Nielsen 2017-05-18 16:53:06 +02:00
parent 091752f158
commit 25a770fc97
3 changed files with 157 additions and 10 deletions

View file

@ -1022,10 +1022,17 @@ abstract class Stream<T> {
*
* The returned stream provides the same events as this stream, except
* that it never provides two consecutive data events that are equal.
* That is, errors are passed through to the returned stream, and
* data events are passed through if they are distinct from the most
* recently emitted data event.
*
* Equality is determined by the provided [equals] method. If that is
* omitted, the '==' operator on the last provided data element is used.
*
* If [equals] throws, the data event is replaced by an error event
* containing the thrown error. The behavior is equivalent to the
* original stream emitting the error event.
*
* The returned stream is a broadcast stream if this stream is.
* If a broadcast stream is listened to more than once, each subscription
* will individually perform the `equals` test.

View file

@ -338,7 +338,7 @@ class _TakeStream<T> extends _ForwardingStream<T, T> {
/**
* A [_ForwardingStreamSubscription] with one extra state field.
*
* Use by several different classes, some storing an integer, others a bool.
* Use by several different classes, storing an integer, bool or general.
*/
class _StateStreamSubscription<T> extends _ForwardingStreamSubscription<T, T> {
// Raw state field. Typed access provided by getters and setters below.
@ -357,6 +357,11 @@ class _StateStreamSubscription<T> extends _ForwardingStreamSubscription<T, T> {
void set _count(int count) {
_sharedState = count;
}
Object get _value => _sharedState;
void set _value(Object value) {
_sharedState = value;
}
}
class _TakeWhileStream<T> extends _ForwardingStream<T, T> {
@ -453,32 +458,41 @@ typedef bool _Equality<T>(T a, T b);
class _DistinctStream<T> extends _ForwardingStream<T, T> {
static var _SENTINEL = new Object();
_Equality<T> _equals;
var _previous = _SENTINEL;
final _Equality<T> _equals;
_DistinctStream(Stream<T> source, bool equals(T a, T b))
: _equals = equals,
super(source);
StreamSubscription<T> _createSubscription(void onData(T data),
Function onError, void onDone(), bool cancelOnError) {
return new _StateStreamSubscription<T>(
this, onData, onError, onDone, cancelOnError, _SENTINEL);
}
void _handleData(T inputEvent, _EventSink<T> sink) {
if (identical(_previous, _SENTINEL)) {
_previous = inputEvent;
return sink._add(inputEvent);
_StateStreamSubscription<T> subscription = sink;
var previous = subscription._value;
if (identical(previous, _SENTINEL)) {
// First event.
subscription._value = inputEvent;
sink._add(inputEvent);
} else {
T previousEvent = previous;
bool isEqual;
try {
if (_equals == null) {
isEqual = (_previous == inputEvent);
isEqual = (previousEvent == inputEvent);
} else {
isEqual = _equals(_previous as Object/*=T*/, inputEvent);
isEqual = _equals(previousEvent, inputEvent);
}
} catch (e, s) {
_addErrorWithReplacement(sink, e, s);
return null;
return;
}
if (!isEqual) {
sink._add(inputEvent);
_previous = inputEvent;
subscription._value = inputEvent;
}
}
}

View file

@ -0,0 +1,126 @@
// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
import "package:expect/expect.dart";
import "package:async_helper/async_helper.dart";
class A {
const A();
}
class B extends A {
const B();
}
main() {
asyncStart();
// Correct behavior.
for (var eq in [null, (a, b) => a == b]) {
checkStream(mkSingleStream, eq, "single");
checkBroadcastStream(mkBroadcastStream, eq, "broadcast");
checkBroadcastStream(
() => mkSingleStream().asBroadcastStream(), eq, "asBroadcast");
}
// Regression test. Multiple listens on the same broadcast distinct stream.
var stream = mkBroadcastStream().distinct();
expectStream(stream, [1, 2, 3, 2], "broadcast.distinct#1");
expectStream(stream, [1, 2, 3, 2], "broadcast.distinct#2");
// Doesn't ignore equality.
expectStream(
new Stream.fromIterable([1, 2, 1, 3, 3]).distinct((a, b) => false),
[1, 2, 1, 3, 3],
"kFalse");
expectStream(
new Stream.fromIterable([1, 2, 1, 3, 3]).distinct((a, b) => true),
[1],
"kTrue");
expectStream(
new Stream.fromIterable([1, 2, 1, 3, 3]).distinct((a, b) => a != b),
[1, 1],
"neq");
expectStream(
new Stream.fromIterable([1, 2, 1, 3, 3]).distinct((a, b) => 2 == b),
[1, 1, 3, 3],
"is2");
// Forwards errors as errors.
expectStream(
new Stream.fromIterable([1, "E1", 2, "E2", 2, 3])
.map((v) => (v is String) ? (throw v) : v) // Make strings errors.
.distinct()
.transform(reifyErrors),
[1, "[E1]", 2, "[E2]", 3],
"errors");
// Equality throwing acts like error.
expectStream(
new Stream.fromIterable([1, "E1", 1, 2, "E2", 3])
.distinct((a, b) => (b is String) ? (throw b) : (a == b))
.transform(reifyErrors),
[1, "[E1]", 2, "[E2]", 3],
"eq-throws");
// Operator== throwing acts like error.
expectStream(
new Stream.fromIterable([1, 1, 2, 2, 1, 3])
.map((v) => new T(v))
.distinct()
.transform(reifyErrors)
.map((v) => v is T ? v.value : "$v"),
[1, "[2]", "[2]", 3],
"==-throws");
asyncEnd();
}
checkStream(mkStream, eq, name) {
expectStream(mkStream().distinct(eq), [1, 2, 3, 2], "$name.distinct");
expectStream(mkStream().expand((e) => [e, e]).distinct(eq), [1, 2, 3, 2],
"$name.expand.distinct");
expectStream(mkStream().where((x) => x != 3).distinct(eq), [1, 2],
"$name.where.distinct");
}
checkBroadcastStream(mkStream, eq, name) {
var stream = mkStream();
// Run all the tests, multiple times each.
checkStream(() => stream, eq, "$name#1");
checkStream(() => stream, eq, "$name#2");
}
mkSingleStream() async* {
yield 1;
yield 2;
yield 3;
yield 2;
}
mkBroadcastStream() {
var c = new StreamController.broadcast();
c.onListen = () {
c.addStream(mkSingleStream()).whenComplete(c.close);
};
return c.stream;
}
expectStream(stream, list, [name]) {
asyncStart();
return stream.toList().then((events) {
Expect.listEquals(list, events, name);
asyncEnd();
});
}
// Class where operator== throws.
class T {
final int value;
T(this.value);
int get hashCode => value.hashCode;
bool operator ==(Object other) =>
other is T && ((other.value == 2) ? throw 2 : (value == other.value));
}
final reifyErrors =
new StreamTransformer.fromHandlers(handleError: (e, s, sink) {
sink.add("[$e]");
});