Add capture/release for results in package:async

R=floitsch@google.com

Review URL: https://codereview.chromium.org//255123002

git-svn-id: https://dart.googlecode.com/svn/branches/bleeding_edge/dart@36548 260f80e4-7a28-3924-810f-c04153c831b5
This commit is contained in:
lrn@google.com 2014-05-23 08:41:10 +00:00
parent 7d040cee6b
commit 1f3c087bc1
5 changed files with 555 additions and 3 deletions

View file

@ -4,8 +4,16 @@ The package contains sub-libraries with different utilities.
### Zipping streams ### Zipping streams
The "stream_zip.dart" sub-library contains functionality to combine several streams The "stream_zip.dart" sub-library contains functionality
of events into a single stream of tuples of events. to combine several streams of events into a single stream of tuples of events.
### Results
The "result.dart" sub-library introduces a `Result` class that can hold either
a value or an error.
It allows capturing an asynchronous computation which can give either a value
or an error, into an asynchronous computation that always gives a `Result`
value, where errors can be treated as data.
It also allows releasing the `Result` back into an asynchronous computation.
### History. ### History.
This package is unrelated to the discontinued `async` package with version 0.1.7. This package is unrelated to the discontinued `async` package with version 0.1.7.

View file

@ -5,3 +5,4 @@
library dart.pkg.async; library dart.pkg.async;
export "stream_zip.dart"; export "stream_zip.dart";
export "result.dart";

278
pkg/async/lib/result.dart Normal file
View file

@ -0,0 +1,278 @@
// Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
/**
* Capture asynchronous results into synchronous values, and release them again.
*
* Capturing a result (either a returned value or a thrown error)
* means converting it into a [Result] -
* either a [ValueResult] or an [ErrorResult].
*
* This value can release itself by writing itself either to a
* [EventSink] or a [Completer], or by becoming a [Future].
*/
library dart.pkg.async.results;
import "dart:async";
/**
* The result of a computation.
*/
abstract class Result<T> {
/**
* Create a `Result` with the result of calling [computation].
*
* This generates either a [ValueResult] with the value returned by
* calling `computation`, or an [ErrorResult] with an error thrown by
* the call.
*/
Result(T computation()) {
try {
return new ValueResult(computation());
} catch (e, s) {
return new ErrorResult(e, s);
}
}
/**
* Create a `Result` holding a value.
*
* Alias for [ValueResult.ValueResult].
*/
factory Result.value(T value) = ValueResult<T>;
/**
* Create a `Result` holding an error.
*
* Alias for [ErrorResult.ErrorResult].
*/
factory Result.error(Object error, [StackTrace stackTrace]) =>
new ErrorResult(error, stackTrace);
// Helper functions.
static _captureValue(value) => new ValueResult(value);
static _captureError(error, stack) => new ErrorResult(error, stack);
static _release(Result v) {
if (v.isValue) return v.asValue.value; // Avoid wrapping in future.
return v.asFuture;
}
/**
* Capture the result of a future into a `Result` future.
*
* The resulting future will never have an error.
* Errors have been converted to an [ErrorResult] value.
*/
static Future<Result> capture(Future future) {
return future.then(_captureValue, onError: _captureError);
}
/**
* Release the result of a captured future.
*
* Converts the [Result] value of the given [future] to a result or error
* completion of the returned future.
*
* If [future] completes with an error, the returned future completes with
* the same error.
*/
static Future release(Future<Result> future) {
return future.then(_release);
}
/**
* Capture the results of a stream into a stream of [Result] values.
*
* The returned stream will not have any error events.
* Errors from the source stream have been converted to [ErrorResult]s.
*
* Shorthand for transforming the stream using [CaptureStreamTransformer].
*/
static Stream<Result> captureStream(Stream source) {
return source.transform(const CaptureStreamTransformer());
}
/**
* Release a stream of [result] values into a stream of the results.
*
* `Result` values of the source stream become value or error events in
* the retuned stream as appropriate.
* Errors from the source stream become errors in the returned stream.
*
* Shorthand for transforming the stream using [ReleaseStreamTransformer].
*/
static Stream releaseStream(Stream<Result> source) {
return source.transform(const ReleaseStreamTransformer());
}
/**
* Whether this result is a value result.
*
* Always the opposite of [isError].
*/
bool get isValue;
/**
* Whether this result is an error result.
*
* Always the opposite of [isValue].
*/
bool get isError;
/**
* If this is a value result, return itself.
*
* Otherwise return `null`.
*/
ValueResult<T> get asValue;
/**
* If this is an error result, return itself.
*
* Otherwise return `null`.
*/
ErrorResult get asError;
/**
* Complete a completer with this result.
*/
void complete(Completer<T> completer);
/**
* Add this result to a [StreamSink].
*/
void addTo(EventSink<T> sink);
/**
* Creates a future completed with this result as a value or an error.
*/
Future<T> get asFuture;
}
/**
* A result representing a returned value.
*/
class ValueResult<T> implements Result<T> {
/** The returned value that this result represents. */
final T value;
/** Create a value result with the given [value]. */
ValueResult(this.value);
bool get isValue => true;
bool get isError => false;
ValueResult<T> get asValue => this;
ErrorResult get asError => null;
void complete(Completer<T> completer) {
completer.complete(value);
}
void addTo(EventSink<T> sink) {
sink.add(value);
}
Future<T> get asFuture => new Future.value(value);
}
/**
* A result representing a thrown error.
*/
class ErrorResult implements Result {
/** The thrown object that this result represents. */
final error;
/** The stack trace, if any, associated with the throw. */
final StackTrace stackTrace;
/** Create an error result with the given [error] and [stackTrace]. */
ErrorResult(this.error, this.stackTrace);
bool get isValue => false;
bool get isError => true;
ValueResult get asValue => null;
ErrorResult get asError => this;
void complete(Completer<T> completer) {
completer.completeError(error, stackTrace);
}
void addTo(EventSink<T> sink) {
sink.addError(error, stackTrace);
}
Future get asFuture => new Future.error(error, stackTrace);
}
/**
* A stream transformer that captures a stream of events into [Result]s.
*
* The result of the transformation is a stream of [Result] values and
* no error events.
*/
class CaptureStreamTransformer<T> implements StreamTransformer<T, Result<T>> {
const CaptureStreamTransformer();
Stream<Result<T>> bind(Stream<T> source) {
return new Stream<Result<T>>.eventTransformed(source, _createSink);
}
static EventSink _createSink(EventSink<Result> sink) {
return new CaptureSink(sink);
}
}
/**
* A stream transformer that releases a stream of result events.
*
* The result of the transformation is a stream of values and
* error events.
*/
class ReleaseStreamTransformer<T> implements StreamTransformer<Result<T>, T> {
const ReleaseStreamTransformer();
Stream<T> bind(Stream<Result<T>> source) {
return new Stream<T>.eventTransformed(source, _createSink);
}
static EventSink _createSink(EventSink<Result> sink) {
return new ReleaseSink(sink);
}
}
/**
* An event sink wrapper that captures the incoming events.
*
* Wraps an [EventSink] that expects [Result] values.
* Accepts any value and error result,
* and passes them to the wrapped sink as [Result] values.
*
* The wrapped sink will never receive an error event.
*/
class CaptureSink<T> implements EventSink<T> {
final EventSink _sink;
CaptureSink(EventSink<Result<T>> sink) : _sink = sink;
void add(T value) { _sink.add(new ValueResult(value)); }
void addError(Object error, StackTrace stackTrace) {
_sink.add(new ErrorResult(error, stackTrace));
}
void close() { _sink.close(); }
}
/**
* An event sink wrapper that releases the incoming result events.
*
* Wraps an output [EventSink] that expects any result.
* Accepts [Result] values, and puts the result value or error into the
* corresponding output sink add method.
*/
class ReleaseSink<T> implements EventSink<Result<T>> {
final EventSink _sink;
ReleaseSink(EventSink<T> sink) : _sink = sink;
void add(Result<T> result) {
if (result.isValue) {
_sink.add(result.asValue.value);
} else {
ErrorResult error = result.asError;
_sink.addError(error.error, error.stackTrace);
}
}
void addError(Object error, StackTrace stackTrace) {
// Errors may be added by intermediate processing, even if it is never
// added by CaptureSink.
_sink.addError(error, stackTrace);
}
void close() { _sink.close(); }
}

View file

@ -1,5 +1,5 @@
name: async name: async
version: 0.9.1+1 version: 0.9.2
author: Dart Team <misc@dartlang.org> author: Dart Team <misc@dartlang.org>
description: Utility functions and classes related to the 'dart:async' library. description: Utility functions and classes related to the 'dart:async' library.
homepage: http://www.dartlang.org homepage: http://www.dartlang.org

View file

@ -0,0 +1,265 @@
// Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import "dart:async";
import "dart:collection";
import "package:async/result.dart";
import "package:unittest/unittest.dart";
void main() {
StackTrace stack;
try { throw 0; } catch (e, s) { stack = s; }
test("create result value", () {
Result<int> result = new Result<int>.value(42);
expect(result.isValue, isTrue);
expect(result.isError, isFalse);
ValueResult value = result.asValue;
expect(value.value, equals(42));
});
test("create result value 2", () {
Result<int> result = new ValueResult<int>(42);
expect(result.isValue, isTrue);
expect(result.isError, isFalse);
ValueResult<int> value = result.asValue;
expect(value.value, equals(42));
});
test("create result error", () {
Result<bool> result = new Result<bool>.error("BAD", stack);
expect(result.isValue, isFalse);
expect(result.isError, isTrue);
ErrorResult error = result.asError;
expect(error.error, equals("BAD"));
expect(error.stackTrace, same(stack));
});
test("create result error 2", () {
Result<bool> result = new ErrorResult<bool>("BAD", stack);
expect(result.isValue, isFalse);
expect(result.isError, isTrue);
ErrorResult error = result.asError;
expect(error.error, equals("BAD"));
expect(error.stackTrace, same(stack));
});
test("create result error no stack", () {
Result<bool> result = new Result<bool>.error("BAD");
expect(result.isValue, isFalse);
expect(result.isError, isTrue);
ErrorResult error = result.asError;
expect(error.error, equals("BAD"));
expect(error.stackTrace, isNull);
});
test("complete with value", () {
Result<int> result = new ValueResult<int>(42);
Completer c = new Completer<int>();
c.future.then(expectAsync((int v) { expect(v, equals(42)); }),
onError: (e, s) { fail("Unexpected error"); });
result.complete(c);
});
test("complete with error", () {
Result<bool> result = new ErrorResult("BAD", stack);
Completer c = new Completer<bool>();
c.future.then((bool v) { Expect.fail("Unexpected value $v"); },
onError: expectAsync((e, s) {
expect(e, equals("BAD"));
expect(s, same(stack));
}));
result.complete(c);
});
test("add sink value", () {
Result<int> result = new ValueResult<int>(42);
EventSink<int> sink = new TestSink(
onData: expectAsync((v) { expect(v, equals(42)); })
);
result.addTo(sink);
});
test("add sink error", () {
Result<bool> result = new ErrorResult("BAD", stack);
EventSink<bool> sink = new TestSink(
onError: expectAsync((e, s) {
expect(e, equals("BAD"));
expect(s, same(stack));
})
);
result.addTo(sink);
});
test("value as future", () {
Result<int> result = new ValueResult<int>(42);
result.asFuture.then(expectAsync((int v) { expect(v, equals(42)); }),
onError: (e, s) { fail("Unexpected error"); });
});
test("error as future", () {
Result<bool> result = new ErrorResult("BAD", stack);
result.asFuture.then((bool v) { Expect.fail("Unexpected value $v"); },
onError: expectAsync((e, s) {
expect(e, equals("BAD"));
expect(s, same(stack));
}));
});
test("capture future value", () {
Future<int> value = new Future<int>.value(42);
Result.capture(value).then(expectAsync((Result result) {
expect(result.isValue, isTrue);
expect(result.isError, isFalse);
ValueResult value = result.asValue;
expect(value.value, equals(42));
}), onError: (e, s) {
Expect.fail("Unexpected error: $e");
});
});
test("capture future error", () {
Future<bool> value = new Future<bool>.error("BAD", stack);
Result.capture(value).then(expectAsync((Result result) {
expect(result.isValue, isFalse);
expect(result.isError, isTrue);
ErrorResult error = result.asError;
expect(error.error, equals("BAD"));
expect(error.stackTrace, same(stack));
}), onError: (e, s) {
Expect.fail("Unexpected error: $e");
});
});
test("release future value", () {
Future<Result<int>> future =
new Future<Result<int>>.value(new Result<int>.value(42));
Result.release(future).then(expectAsync((v) {
expect(v, equals(42));
}), onError: (e, s) {
Expect.fail("Unexpected error: $e");
});
});
test("release future error", () {
// An error in the result is unwrapped and reified by release.
Future<Result<bool>> future =
new Future<Result<bool>>.value(new Result<bool>.error("BAD", stack));
Result.release(future).then((v) {
Expect.fail("Unexpected value: $v");
}, onError: expectAsync((e, s) {
expect(e, equals("BAD"));
expect(s, same(stack));
}));
});
test("release future real error", () {
// An error in the error lane is passed through by release.
Future<Result<bool>> future = new Future<Result<bool>>.error("BAD", stack);
Result.release(future).then((v) {
Expect.fail("Unexpected value: $v");
}, onError: expectAsync((e, s) {
expect(e, equals("BAD"));
expect(s, same(stack));
}));
});
test("capture stream", () {
StreamController<int> c = new StreamController<int>();
Stream<Result> stream = Result.captureStream(c.stream);
var expectedList = new Queue.from([new Result.value(42),
new Result.error("BAD", stack),
new Result.value(37)]);
void listener(Result actual) {
expect(expectedList.isEmpty, isFalse);
expectResult(actual, expectedList.removeFirst());
}
stream.listen(expectAsync(listener, count: 3),
onError: (e, s) { fail("Unexpected error: $e"); },
onDone: expectAsync((){}),
cancelOnError: true);
c.add(42);
c.addError("BAD", stack);
c.add(37);
c.close();
});
test("release stream", () {
StreamController<Result<int>> c = new StreamController<Result<int>>();
Stream<int> stream = Result.releaseStream(c.stream);
List events = [new Result<int>.value(42),
new Result<int>.error("BAD", stack),
new Result<int>.value(37)];
// Expect the data events, and an extra error event.
var expectedList = new Queue.from(events)..add(new Result.error("BAD2"));
void dataListener(int v) {
expect(expectedList.isEmpty, isFalse);
Result expected = expectedList.removeFirst();
expect(expected.isValue, isTrue);
expect(v, equals(expected.asValue.value));
}
void errorListener(error, StackTrace stackTrace) {
expect(expectedList.isEmpty, isFalse);
Result expected = expectedList.removeFirst();
expect(expected.isError, isTrue);
expect(error, equals(expected.asError.error));
expect(stackTrace, same(expected.asError.stackTrace));
}
stream.listen(expectAsync(dataListener, count: 2),
onError: expectAsync(errorListener, count: 2),
onDone: expectAsync((){}));
for (Result<int> result in events) {
c.add(result); // Result value or error in data line.
}
c.addError("BAD2"); // Error in error line.
c.close();
});
test("release stream cancel on error", () {
StreamController<Result<int>> c = new StreamController<Result<int>>();
Stream<int> stream = Result.releaseStream(c.stream);
stream.listen(expectAsync((v) { expect(v, equals(42)); }),
onError: expectAsync((e, s) {
expect(e, equals("BAD"));
expect(s, same(stack));
}),
onDone: () { fail("Unexpected done event"); },
cancelOnError: true);
c.add(new Result.value(42));
c.add(new Result.error("BAD", stack));
c.add(new Result.value(37));
c.close();
});
}
void expectResult(Result actual, Result expected) {
expect(actual.isValue, equals(expected.isValue));
expect(actual.isError, equals(expected.isError));
if (actual.isValue) {
expect(actual.asValue.value, equals(expected.asValue.value));
} else {
expect(actual.asError.error, equals(expected.asError.error));
expect(actual.asError.stackTrace, same(expected.asError.stackTrace));
}
}
class TestSink<T> implements EventSink<T> {
final Function onData;
final Function onError;
final Function onDone;
TestSink({void this.onData(T data) : _nullData,
void this.onError(e, StackTrace s) : _nullError,
void this.onDone() : _nullDone });
void add(T value) { onData(value); }
void addError(error, [StackTrace stack]) { onError(error, stack); }
void close() { onDone(); }
static void _nullData(value) { Expect.fail("Unexpected sink add: $value"); }
static void _nullError(e, StackTrace s) {
Expect.fail("Unexpected sink addError: $e");
}
static void _nullDone() { Expect.fail("Unepxected sink close"); }
}