Warn when adding something to a closed sink and improve documentation.

Adding to a closed sink will throw once https://codereview.chromium.org/2857393003/ is committed.

Also:
Fixes #29122.

BUG= http://dartbug.com/29122
R=lrn@google.com, whesse@google.com

Committed: 8e19729b89
Reverted: 8f3ae7e7e1
Committed: c0090fa01f
Reverted: a4eb8b8fe3
Review-Url: https://codereview.chromium.org/2822173002 .
This commit is contained in:
Florian Loitsch 2017-05-08 15:52:15 +02:00
parent 959b7c14b8
commit 5202df2238
8 changed files with 122 additions and 27 deletions

View file

@ -34,6 +34,11 @@ entirely to allow inference to fill in the type.
* `dart:async`
* Add `groupBy` to `Stream`. Allows splitting a string into separate streams
depending on "key" property computed from the individual events.
* `dart:async`, `dart:io`, `dart:core`
* Adding to a closed sink, including `IOSink`, is not allowed anymore. In
1.24, violations are only reported (on stdout or stderr), but a future
version of the Dart SDK will change this to throwing a `StateError`.
### Dart VM

View file

@ -1532,16 +1532,37 @@ abstract class StreamSubscription<T> {
}
/**
* An interface that abstracts creation or handling of [Stream] events.
* A [Sink] that supports adding errors.
*
* This makes it suitable for capturing the results of asynchronous
* computations, which can complete with a value or an error.
*
* The [EventSink] has been designed to handle asynchronous events from
* [Stream]s. See, for example, [Stream.eventTransformed] which uses
* `EventSink`s to transform events.
*/
abstract class EventSink<T> implements Sink<T> {
/** Send a data event to a stream. */
/**
* Adds a data [event] to the sink.
*
* Must not be called on a closed sink.
*/
void add(T event);
/** Send an async error to a stream. */
void addError(Object errorEvent, [StackTrace stackTrace]);
/**
* Adds an [error] to the sink.
*
* Must not be called on a closed sink.
*/
void addError(Object error, [StackTrace stackTrace]);
/** Close the sink. No further events can be added after closing. */
/**
* Closes the sink.
*
* Calling this method more than once is allowed, but does nothing.
*
* Neither [add] nor [addError] must be called after this method.
*/
void close();
}
@ -1574,10 +1595,6 @@ class StreamView<T> extends Stream<T> {
* and when no further data need to be added, the [close] method tells the
* consumer to complete its work and shut down.
*
* This class is not just a [Sink<Stream>] because it is also combined with
* other [Sink] classes, like it's combined with [EventSink] in the
* [StreamSink] class.
*
* The [Stream.pipe] accepts a `StreamConsumer` and will pass the stream
* to the consumer's [addStream] method. When that completes, it will
* call [close] and then complete its own returned future.
@ -1619,8 +1636,7 @@ abstract class StreamConsumer<S> {
/**
* A object that accepts stream events both synchronously and asynchronously.
*
* A [StreamSink] unifies the asynchronous methods from [StreamConsumer] and
* the synchronous methods from [EventSink].
* A [StreamSink] combines the methods from [StreamConsumer] and [EventSink].
*
* The [EventSink] methods can't be used while the [addStream] is called.
* As soon as the [addStream]'s [Future] completes with a value, the

View file

@ -245,12 +245,41 @@ abstract class StreamController<T> implements StreamSink<T> {
bool get hasListener;
/**
* Send or enqueue an error event.
* Sends a data [event].
*
* Listeners receive this event in a later microtask.
*
* Note that a synchronous controller (created by passing true to the `sync`
* parameter of the `StreamController` constructor) delivers events
* immediately. Since this behavior violates the contract mentioned here,
* synchronous controllers should only be used as described in the
* documentation to ensure that the delivered events always *appear* as if
* they were delivered in a separate microtask.
*/
void add(T event);
/**
* Sends or enqueues an error event.
*
* If [error] is `null`, it is replaced by a [NullThrownError].
*
* Listeners receive this event at a later microtask. This behavior can be
* overridden by using `sync` controllers. Note, however, that sync
* controllers have to satisfy the preconditions mentioned in the
* documentation of the constructors.
*/
void addError(Object error, [StackTrace stackTrace]);
/**
* Closes the stream.
*
* Listeners receive the done event at a later microtask. This behavior can be
* overridden by using `sync` controllers. Note, however, that sync
* controllers have to satisfy the preconditions mentioned in the
* documentation of the constructors.
*/
Future close();
/**
* Receives events from [source] and puts them into this controller's stream.
*

View file

@ -209,12 +209,29 @@ class _HandlerEventSink<S, T> implements EventSink<S> {
final _TransformDoneHandler<T> _handleDone;
/// The output sink where the handlers should send their data into.
final EventSink<T> _sink;
EventSink<T> _sink;
_HandlerEventSink(
this._handleData, this._handleError, this._handleDone, this._sink);
this._handleData, this._handleError, this._handleDone, this._sink) {
if (_sink == null) {
throw new ArgumentError("The provided sink must not be null.");
}
}
bool get _isClosed => _sink == null;
_reportClosedSink() {
// TODO(29554): throw a StateError, and don't just report the problem.
Zone.ROOT
..print("Sink is closed and adding to it is an error.")
..print(" See http://dartbug.com/29554.")
..print(StackTrace.current.toString());
}
void add(S data) {
if (_isClosed) {
_reportClosedSink();
}
if (_handleData != null) {
_handleData(data, _sink);
} else {
@ -223,6 +240,9 @@ class _HandlerEventSink<S, T> implements EventSink<S> {
}
void addError(Object error, [StackTrace stackTrace]) {
if (_isClosed) {
_reportClosedSink();
}
if (_handleError != null) {
_handleError(error, stackTrace, _sink);
} else {
@ -231,10 +251,13 @@ class _HandlerEventSink<S, T> implements EventSink<S> {
}
void close() {
if (_isClosed) return;
var sink = _sink;
_sink = null;
if (_handleDone != null) {
_handleDone(_sink);
_handleDone(sink);
} else {
_sink.close();
sink.close();
}
}
}

View file

@ -14,18 +14,18 @@ part of dart.core;
*/
abstract class Sink<T> {
/**
* Put the data into the sink.
* Adds [data] to the sink.
*
* Must not be called after a call to [close].
*/
void add(T data);
/**
* Tell the sink that no further data will be added.
*
* Calling this method more than once is allowed, but does nothing.
* Closes the sink.
*
* The [add] method must not be called after this method.
*
* Calling this method more than once is allowed, but does nothing.
*/
void close();
}

View file

@ -15,8 +15,7 @@ part of dart.io;
* While a stream is being added using [addStream], any further attempts
* to add or write to the [IOSink] will fail until the [addStream] completes.
*
* If data is added to the [IOSink] after the sink is closed, the data will be
* ignored. Use the [done] future to be notified when the [IOSink] is closed.
* It is an error to add data to the [IOSink] after the sink is closed.
*/
abstract class IOSink implements StreamSink<List<int>>, StringSink {
/**
@ -147,12 +146,33 @@ class _StreamSinkImpl<T> implements StreamSink<T> {
_StreamSinkImpl(this._target);
void _reportClosedSink() {
// TODO(29554): this is very brittle and depends on the layout of the
// stderr class.
if (this == stderr._sink) {
// We can't report on stderr anymore (as we would otherwise
// have an infinite recursion.
throw new StateError("Stderr is closed.");
}
// TODO(29554): throw a StateError, and don't just report the problem.
stderr.writeln("StreamSink is closed and adding to it is an error.");
stderr.writeln(" See http://dartbug.com/29554.");
stderr.writeln(StackTrace.current);
}
void add(T data) {
if (_isClosed) return;
if (_isClosed) {
_reportClosedSink();
return;
}
_controller.add(data);
}
void addError(error, [StackTrace stackTrace]) {
if (_isClosed) {
_reportClosedSink();
return;
}
_controller.addError(error, stackTrace);
}

View file

@ -58,6 +58,8 @@ abstract class SecureSocket implements Socket {
* `pause` on this subscription before starting TLS handshake is
* the right thing to do.
*
* The given [socket] is closed and may not be used anymore.
*
* If the [host] argument is passed it will be used as the host name
* for the TLS handshake. If [host] is not passed the host name from
* the [socket] will be used. The [host] can be either a [String] or

View file

@ -103,10 +103,10 @@ io/file_constructor_test: fail
# This is runtime test.
io/process_exit_negative_test: Skip
io/http_parser_test: Pass, CompileTimeError # Issue 28843
io/http_headers_test: Pass, CompileTimeError # Issue 28843
io/http_cookie_date_test: Pass, CompileTimeError # Issue 28843
io/web_socket_protocol_processor_test: Pass, CompileTimeError # Issue 28843
io/http_parser_test: Pass, StaticWarning, CompileTimeError # Issue 28843
io/http_headers_test: Pass, StaticWarning, CompileTimeError # Issue 28843
io/http_cookie_date_test: Pass, StaticWarning, CompileTimeError # Issue 28843
io/web_socket_protocol_processor_test: Pass, StaticWarning, CompileTimeError # Issue 28843
[ $compiler == dart2analyzer && $builder_tag == strong ]
*: Skip # Issue 28649