Throw when adding something to a closed sink and improve documentation.

As port of this CL all changes of https://codereview.chromium.org/20036002 are reverted.

Also fixes issue 29122.

BUG= http://dartbug.com/29122
R=lrn@google.com, whesse@google.com

Review-Url: https://codereview.chromium.org/2822173002 .
This commit is contained in:
Florian Loitsch 2017-05-04 11:42:18 +02:00
parent f85091f5d7
commit 8e19729b89
12 changed files with 113 additions and 35 deletions

View file

@ -17,6 +17,8 @@
* JSON maps are now typed as `Map<String, dynamic>` instead of
`Map<dynamic, dynamic>`. A JSON-map is not a `HashMap` or `LinkedHashMap`
anymore (but just a `Map`).
* `dart:async`, `dart:io`, `dart:core`
* Adding to a closed sink, including `IOSink`, now throws.
### Dart VM

View file

@ -1462,16 +1462,37 @@ abstract class StreamSubscription<T> {
}
/**
* An interface that abstracts creation or handling of [Stream] events.
* A [Sink] that supports adding errors.
*
* This makes it suitable for capturing the results of asynchronous
* computations, which can complete with a value or an error.
*
* The [EventSink] has been designed to handle asynchronous events from
* [Stream]s. See, for example, [Stream.eventTransformed] which uses
* `EventSink`s to transform events.
*/
abstract class EventSink<T> implements Sink<T> {
/** Send a data event to a stream. */
/**
* Adds a data [event] to the sink.
*
* Must not be called on a closed sink.
*/
void add(T event);
/** Send an async error to a stream. */
void addError(Object errorEvent, [StackTrace stackTrace]);
/**
* Adds an [error] to the sink.
*
* Must not be called on a closed sink.
*/
void addError(Object error, [StackTrace stackTrace]);
/** Close the sink. No further events can be added after closing. */
/**
* Closes the sink.
*
* Calling this method more than once is allowed, but does nothing.
*
* Neither [add] nor [addError] must be called after this method.
*/
void close();
}
@ -1504,10 +1525,6 @@ class StreamView<T> extends Stream<T> {
* and when no further data need to be added, the [close] method tells the
* consumer to complete its work and shut down.
*
* This class is not just a [Sink<Stream>] because it is also combined with
* other [Sink] classes, like it's combined with [EventSink] in the
* [StreamSink] class.
*
* The [Stream.pipe] accepts a `StreamConsumer` and will pass the stream
* to the consumer's [addStream] method. When that completes, it will
* call [close] and then complete its own returned future.
@ -1549,8 +1566,7 @@ abstract class StreamConsumer<S> {
/**
* A object that accepts stream events both synchronously and asynchronously.
*
* A [StreamSink] unifies the asynchronous methods from [StreamConsumer] and
* the synchronous methods from [EventSink].
* A [StreamSink] combines the methods from [StreamConsumer] and [EventSink].
*
* The [EventSink] methods can't be used while the [addStream] is called.
* As soon as the [addStream]'s [Future] completes with a value, the

View file

@ -245,12 +245,41 @@ abstract class StreamController<T> implements StreamSink<T> {
bool get hasListener;
/**
* Send or enqueue an error event.
* Sends a data [event].
*
* Listeners receive this event in a later microtask.
*
* Note that a synchronous controller (created by passing true to the `sync`
* parameter of the `StreamController` constructor) delivers events
* immediately. Since this behavior violates the contract mentioned here,
* synchronous controllers should only be used as described in the
* documentation to ensure that the delivered events always *appear* as if
* they were delivered in a separate microtask.
*/
void add(T event);
/**
* Sends or enqueues an error event.
*
* If [error] is `null`, it is replaced by a [NullThrownError].
*
* Listeners receive this event at a later microtask. This behavior can be
* overridden by using `sync` controllers. Note, however, that sync
* controllers have to satisfy the preconditions mentioned in the
* documentation of the constructors.
*/
void addError(Object error, [StackTrace stackTrace]);
/**
* Closes the stream.
*
* Listeners receive the done event at a later microtask. This behavior can be
* overridden by using `sync` controllers. Note, however, that sync
* controllers have to satisfy the preconditions mentioned in the
* documentation of the constructors.
*/
Future close();
/**
* Receives events from [source] and puts them into this controller's stream.
*

View file

@ -209,12 +209,19 @@ class _HandlerEventSink<S, T> implements EventSink<S> {
final _TransformDoneHandler<T> _handleDone;
/// The output sink where the handlers should send their data into.
final EventSink<T> _sink;
EventSink<T> _sink;
_HandlerEventSink(
this._handleData, this._handleError, this._handleDone, this._sink);
this._handleData, this._handleError, this._handleDone, this._sink) {
if (_sink == null) {
throw new ArgumentError("The provided sink must not be null.");
}
}
bool get _isClosed => _sink == null;
void add(S data) {
if (_isClosed) throw new StateError("Sink is closed");
if (_handleData != null) {
_handleData(data, _sink);
} else {
@ -223,6 +230,7 @@ class _HandlerEventSink<S, T> implements EventSink<S> {
}
void addError(Object error, [StackTrace stackTrace]) {
if (_isClosed) throw new StateError("Sink is closed");
if (_handleError != null) {
_handleError(error, stackTrace, _sink);
} else {
@ -231,10 +239,13 @@ class _HandlerEventSink<S, T> implements EventSink<S> {
}
void close() {
if (_isClosed) return;
var sink = _sink;
_sink = null;
if (_handleDone != null) {
_handleDone(_sink);
_handleDone(sink);
} else {
_sink.close();
sink.close();
}
}
}

View file

@ -14,18 +14,18 @@ part of dart.core;
*/
abstract class Sink<T> {
/**
* Put the data into the sink.
* Adds [data] to the sink.
*
* Must not be called after a call to [close].
*/
void add(T data);
/**
* Tell the sink that no further data will be added.
*
* Calling this method more than once is allowed, but does nothing.
* Closes the sink.
*
* The [add] method must not be called after this method.
*
* Calling this method more than once is allowed, but does nothing.
*/
void close();
}

View file

@ -15,8 +15,7 @@ part of dart.io;
* While a stream is being added using [addStream], any further attempts
* to add or write to the [IOSink] will fail until the [addStream] completes.
*
* If data is added to the [IOSink] after the sink is closed, the data will be
* ignored. Use the [done] future to be notified when the [IOSink] is closed.
* It is an error to add data to the [IOSink] after the sink is closed.
*/
abstract class IOSink implements StreamSink<List<int>>, StringSink {
/**
@ -148,11 +147,12 @@ class _StreamSinkImpl<T> implements StreamSink<T> {
_StreamSinkImpl(this._target);
void add(T data) {
if (_isClosed) return;
if (_isClosed) throw new StateError("StreamSink is closed");
_controller.add(data);
}
void addError(error, [StackTrace stackTrace]) {
if (_isClosed) throw new StateError("StreamSink is closed");
_controller.addError(error, stackTrace);
}

View file

@ -58,6 +58,8 @@ abstract class SecureSocket implements Socket {
* `pause` on this subscription before starting TLS handshake is
* the right thing to do.
*
* The given [socket] is closed and may not be used anymore.
*
* If the [host] argument is passed it will be used as the host name
* for the TLS handshake. If [host] is not passed the host name from
* the [socket] will be used. The [host] can be either a [String] or

View file

@ -988,7 +988,9 @@ class FileTest {
file.createSync();
var output = file.openWrite();
output.close();
output.add(buffer); // Ignored.
Expect.throws(() {
output.add(buffer);
});
output.done.then((_) {
file.deleteSync();
asyncTestDone("testCloseExceptionStream");

View file

@ -29,7 +29,9 @@ void testHttp10NoKeepAlive() {
response.write("Z");
response.write("Z");
response.close();
response.write("x");
Expect.throws(() {
response.write("x");
}, (e) => e is StateError);
}, onError: (e, trace) {
String msg = "Unexpected error $e";
if (trace != null) msg += "\nStackTrace: $trace";

View file

@ -37,7 +37,9 @@ void testNoBody(int totalConnections, bool explicitContentLength) {
// After an explicit close, write becomes a state error
// because we have said we will not add more.
response.close();
response.write("x");
Expect.throws(() {
response.write("x");
}, (e) => e is StateError);
}, onError: (e, trace) {
String msg = "Unexpected server error $e";
if (trace != null) msg += "\nStackTrace: $trace";
@ -89,7 +91,9 @@ void testBody(int totalConnections, bool useHeader) {
}
});
response.close();
response.write("x");
Expect.throws(() {
response.write("x");
}, (e) => e is StateError);
});
}, onError: (e, trace) {
String msg = "Unexpected error $e";
@ -149,7 +153,9 @@ void testBodyChunked(int totalConnections, bool useHeader) {
response.write("x");
response.write("x");
response.close();
response.write("x");
Expect.throws(() {
response.write("x");
}, (e) => e is StateError);
});
}, onError: (e, trace) {
String msg = "Unexpected error $e";

View file

@ -90,6 +90,8 @@ class SocketExceptionTest {
}
Expect.isFalse(exceptionCaught);
Expect.isFalse(wrongExceptionCaught);
// From here exceptions are expected.
try {
List<int> buffer = new List<int>(10);
client.add(buffer);
@ -98,10 +100,8 @@ class SocketExceptionTest {
} catch (ex) {
wrongExceptionCaught = true;
}
Expect.isFalse(exceptionCaught);
Expect.isTrue(exceptionCaught);
Expect.isFalse(wrongExceptionCaught);
// From here exceptions are expected.
exceptionCaught = false;
try {
client.port;

View file

@ -164,7 +164,9 @@ void test(bool hostnameInConnect, bool handshakeBeforeSecure,
SecureSocket.secure(socket, host: HOST, context: clientContext);
}
return future.then((secureSocket) {
socket.add([0]);
Expect.throws(() {
socket.add([0]);
});
return secureSocket;
});
});
@ -179,7 +181,9 @@ void test(bool hostnameInConnect, bool handshakeBeforeSecure,
SecureSocket.secure(socket, host: HOST, context: clientContext);
}
return future.then((secureSocket) {
socket.add([0]);
Expect.throws(() {
socket.add([0]);
});
return secureSocket;
});
});
@ -191,7 +195,9 @@ void test(bool hostnameInConnect, bool handshakeBeforeSecure,
server.listen((client) {
if (!handshakeBeforeSecure) {
SecureSocket.secureServer(client, serverContext).then((secureClient) {
client.add([0]);
Expect.throws(() {
client.add([0]);
});
runServer(secureClient).then((_) => server.close());
});
} else {
@ -199,7 +205,9 @@ void test(bool hostnameInConnect, bool handshakeBeforeSecure,
SecureSocket
.secureServer(client, serverContext, bufferedData: carryOverData)
.then((secureClient) {
client.add([0]);
Expect.throws(() {
client.add([0]);
});
runServer(secureClient).then((_) => server.close());
});
});