StreamConsumer has an addStream and a close functions.

The 'consume' function will be removed.

Review URL: https://codereview.chromium.org//13680002

git-svn-id: https://dart.googlecode.com/svn/branches/bleeding_edge/dart@20979 260f80e4-7a28-3924-810f-c04153c831b5
This commit is contained in:
floitsch@google.com 2013-04-05 16:22:11 +00:00
parent bef8ebb5f6
commit e72250e49b
10 changed files with 143 additions and 20 deletions

View file

@ -31,7 +31,7 @@ class SafeHttpServer extends StreamView<HttpRequest> implements HttpServer {
: super(server),
_inner = server;
void close() => _inner.close();
Future close() => _inner.close();
int get port => _inner.port;
@ -136,7 +136,7 @@ class _HttpResponseWrapper implements HttpResponse {
_inner.consume(stream);
Future<HttpResponse> writeStream(Stream<List<int>> stream) =>
_inner.writeStream(stream);
void close() => _inner.close();
Future close() => _inner.close();
void write(Object obj) => _inner.write(obj);
void writeAll(Iterable objects, [String separator = ""]) =>
_inner.writeAll(objects, separator);

View file

@ -689,6 +689,34 @@ class _SocketStreamConsumer extends StreamConsumer<List<int>, Socket> {
return socket._doneFuture;
}
Future<Socket> addStream(Stream<List<int>> stream) {
Completer completer = new Completer<Socket>();
if (socket._raw != null) {
subscription = stream.listen(
(data) {
assert(!paused);
assert(buffer == null);
buffer = data;
offset = 0;
write();
},
onError: (error) {
socket._consumerDone(error);
completer.completeError(error.error, error.stackTrace);
},
onDone: () {
completer.complete(socket);
},
unsubscribeOnError: true);
}
return completer.future;
}
Future<Socket> close() {
socket._consumerDone();
return completer.future;
}
void write() {
try {
if (subscription == null) return;

View file

@ -237,6 +237,8 @@ abstract class Stream<T> {
* Binds this stream as the input of the provided [StreamConsumer].
*/
Future pipe(StreamConsumer<T, dynamic> streamConsumer) {
// TODO(floitsch): switch to:
// streamConsumer.addStream(this).then((_) => streamConsumer.close());
return streamConsumer.consume(this);
}
@ -983,6 +985,16 @@ class EventSinkView<T> extends StreamSink<T> {
* done.
*/
abstract class StreamConsumer<S, T> {
// TODO(floitsch): generic types.
// Currently not possible to add generic types, since they clash with other
// types that have already been used.
Future addStream(Stream<S> stream);
Future close();
/**
* Consume is deprecated. Use [addStream] followed by [close] instead.
*/
Future<T> consume(Stream<S> stream);
}

View file

@ -151,6 +151,7 @@ class _FileStreamConsumer extends StreamConsumer<List<int>, File> {
File _file;
Future<RandomAccessFile> _openFuture;
StreamSubscription _subscription;
_FileStreamConsumer(File this._file, FileMode mode) {
_openFuture = _file.open(mode: mode);
@ -162,6 +163,10 @@ class _FileStreamConsumer extends StreamConsumer<List<int>, File> {
}
Future<File> consume(Stream<List<int>> stream) {
return addStream(stream).then((_) => close());
}
Future<File> addStream(Stream<List<int>> stream) {
Completer<File> completer = new Completer<File>();
_openFuture
.then((openedFile) {
@ -176,15 +181,7 @@ class _FileStreamConsumer extends StreamConsumer<List<int>, File> {
});
},
onDone: () {
// Wait for the file to close (and therefore flush) before
// completing the future.
openedFile.close()
.then((_) {
completer.complete(_file);
})
.catchError((e) {
completer.completeError(e);
});
completer.complete(_file);
},
onError: (e) {
openedFile.close();
@ -197,6 +194,10 @@ class _FileStreamConsumer extends StreamConsumer<List<int>, File> {
});
return completer.future;
}
Future<File> close() {
return _openFuture.then((openedFile) => openedFile.close());
}
}

View file

@ -407,12 +407,16 @@ abstract class _HttpOutboundMessage<T> implements IOSink {
return _ioSink.consume(stream);
}
Future<T> writeStream(Stream<List<int>> stream) {
Future<T> addStream(Stream<List<int>> stream) {
_writeHeaders();
return _ioSink.writeStream(stream).then((_) => this);
}
void close() {
Future<T> writeStream(Stream<List<int>> stream) {
return addStream(stream);
}
Future close() {
// TODO(ajohnsen): Currently, contentLength, chunkedTransferEncoding and
// persistentConnection is not guaranteed to be in sync.
if (!_headersWritten && !_ignoreBody && headers.contentLength == -1) {
@ -422,7 +426,7 @@ abstract class _HttpOutboundMessage<T> implements IOSink {
headers.contentLength = 0;
}
_writeHeaders();
_ioSink.close();
return _ioSink.close();
}
Future<T> get done {
@ -487,6 +491,14 @@ class _HttpOutboundConsumer implements StreamConsumer {
bool this._asGZip);
Future consume(var stream) => _consume(_ioSink, stream, _asGZip);
Future addStream(var stream) {
throw new UnimplementedError("_HttpOutboundConsumer.addStream");
}
Future close() {
throw new UnimplementedError("_HttpOutboundConsumer.close");
}
}
@ -894,6 +906,14 @@ class _HttpOutgoing implements StreamConsumer<List<int>, dynamic> {
// Use .then to ensure a Future branch.
return _consumeCompleter.future.then((_) => this);
}
Future addStream(Stream<List<int>> stream) {
throw new UnimplementedError("_HttpOutgoing.addStream");
}
Future close() {
throw new UnimplementedError("_HttpOutgoing.close");
}
}
@ -1653,13 +1673,17 @@ class _DetachedSocket extends Stream<List<int>> implements Socket {
return _socket.consume(stream);
}
Future<Socket> addStream(Stream<List<int>> stream) {
return _socket.addStream(stream);
}
Future<Socket> writeStream(Stream<List<int>> stream) {
return _socket.writeStream(stream);
}
void destroy() => _socket.destroy();
void close() => _socket.close();
Future close() => _socket.close();
Future<Socket> get done => _socket.done;

View file

@ -242,6 +242,14 @@ class _HttpParser
return completer.future;
}
Future<_HttpParser> addStream(Stream<List<int>> stream) {
throw new UnimplementedError("_HttpParser.addStream");
}
Future<_HttpParser> close() {
throw new UnimplementedError("_HttpParser.close");
}
// From RFC 2616.
// generic-message = start-line
// *(message-header CRLF)

View file

@ -36,15 +36,24 @@ abstract class IOSink<T> implements StreamConsumer<List<int>, T>, StringSink {
*/
Future<T> consume(Stream<List<int>> stream);
/**
* Adds all elements of the given [stream] to `this`.
*/
Future<T> addStream(Stream<List<int>> stream);
/**
* Like [consume], but will not close the target when done.
*
* *Deprecated*: use [addStream] instead.
*/
Future<T> writeStream(Stream<List<int>> stream);
/**
* Close the target.
*/
void close();
// TODO(floitsch): Currently the future cannot be typed because it has
// hardcoded type Future<HttpClientResponse> in subclass HttpClientRequest.
Future close();
/**
* Get future that will complete when all data has been written to
@ -134,17 +143,22 @@ class _IOSinkImpl<T> implements IOSink<T> {
}
Future<T> writeStream(Stream<List<int>> stream) {
return addStream(stream);
}
Future<T> addStream(Stream<List<int>> stream) {
if (_isBound) {
throw new StateError("IOSink is already bound to a stream");
}
return _fillFromStream(stream, unbind: true);
}
void close() {
Future close() {
if (_isBound) {
throw new StateError("IOSink is already bound to a stream");
}
_controller.close();
return _pipeFuture;
}
Future<T> get done {

View file

@ -19,10 +19,15 @@ class SlowConsumer extends StreamConsumer {
final int bufferSize;
final List bufferedData = [];
int usedBufferSize = 0;
int finalCount;
SlowConsumer(int this.bytesPerSecond, int this.bufferSize);
Future consume(Stream stream) {
return addStream(stream).then((_) => close());
}
Future addStream(Stream stream) {
Completer result = new Completer();
var subscription;
subscription = stream.listen(
@ -44,9 +49,16 @@ class SlowConsumer extends StreamConsumer {
});
}
},
onDone: () { result.complete(receivedCount); });
onDone: () {
finalCount = receivedCount;
result.complete(receivedCount);
});
return result.future;
}
Future close() {
return new Future.immediate(finalCount);
}
}
class DataProvider {

View file

@ -19,10 +19,15 @@ class SlowConsumer extends StreamConsumer {
final int bufferSize;
final List bufferedData = [];
int usedBufferSize = 0;
int finalCount;
SlowConsumer(int this.bytesPerSecond, int this.bufferSize);
Future consume(Stream stream) {
return addStream(stream).then((_) => close());
}
Future addStream(Stream stream) {
Completer result = new Completer();
var subscription;
subscription = stream.listen(
@ -44,9 +49,16 @@ class SlowConsumer extends StreamConsumer {
});
}
},
onDone: () { result.complete(receivedCount); });
onDone: () {
finalCount = receivedCount;
result.complete(receivedCount);
});
return result.future;
}
Future close() {
return new Future.immediate(finalCount);
}
}
Stream<List> dataGenerator(int bytesTotal, int chunkSize) {

View file

@ -16,10 +16,15 @@ const int GB = KB * KB * KB;
class SlowConsumer extends StreamConsumer {
var current = new Future.immediate(0);
final int bytesPerSecond;
int finalCount;
SlowConsumer(int this.bytesPerSecond);
Future consume(Stream stream) {
return addStream(stream).then((_) => close());
}
Future addStream(Stream stream) {
bool done = false;
Completer completer = new Completer();
var subscription;
@ -40,10 +45,17 @@ class SlowConsumer extends StreamConsumer {
},
onDone: () {
done = true;
current.then((count) { completer.complete(count); });
current.then((count) {
finalCount = count;
completer.complete(count);
});
});
return completer.future;
}
Future close() {
return new Future.immediate(finalCount);
}
}
class DataProvider {