Accommodate a race condition in pkg/scheduled_test/scheduled_process_test.

BUG=9022

Review URL: https://codereview.chromium.org//12741004

git-svn-id: https://dart.googlecode.com/svn/branches/bleeding_edge/dart@19754 260f80e4-7a28-3924-810f-c04153c831b5
This commit is contained in:
nweiz@google.com 2013-03-09 02:02:26 +00:00
parent b73ee6b89a
commit afcfccea83
3 changed files with 114 additions and 96 deletions

View file

@ -38,8 +38,8 @@ class ScheduledProcess {
/// A line-by-line view of the standard output stream of the process.
Stream<String> _stdout;
/// A subscription that controls both [_stdout] and [_stdoutLog].
StreamSubscription<String> _stdoutSubscription;
/// A canceller that controls both [_stdout] and [_stdoutLog].
StreamCanceller _stdoutCanceller;
/// A fork of [_stderr] that records the standard error of the process. Used
/// for debugging information.
@ -48,8 +48,8 @@ class ScheduledProcess {
/// A line-by-line view of the standard error stream of the process.
Stream<String> _stderr;
/// A subscription that controls both [_stderr] and [_stderrLog].
StreamSubscription<String> _stderrSubscription;
/// A canceller that controls both [_stderr] and [_stderrLog].
StreamCanceller _stderrCanceller;
/// The exit code of the process that's scheduled to run. This will naturally
/// only complete once the process has terminated.
@ -87,17 +87,17 @@ class ScheduledProcess {
_scheduleExceptionCleanup();
var stdoutWithSubscription = _lineStreamWithSubscription(
var stdoutWithCanceller = _lineStreamWithCanceller(
_process.then((p) => p.stdout));
_stdoutSubscription = stdoutWithSubscription.last;
var stdoutTee = tee(stdoutWithSubscription.first);
_stdoutCanceller = stdoutWithCanceller.last;
var stdoutTee = tee(stdoutWithCanceller.first);
_stdout = stdoutTee.first;
_stdoutLog = stdoutTee.last;
var stderrWithSubscription = _lineStreamWithSubscription(
var stderrWithCanceller = _lineStreamWithCanceller(
_process.then((p) => p.stderr));
_stderrSubscription = stderrWithSubscription.last;
var stderrTee = tee(stderrWithSubscription.first);
_stderrCanceller = stderrWithCanceller.last;
var stderrTee = tee(stderrWithCanceller.first);
_stderr = stderrTee.first;
_stderrLog = stderrTee.last;
}
@ -177,10 +177,10 @@ class ScheduledProcess {
}
/// Converts a stream of bytes to a stream of lines and returns that along
/// with a [StreamSubscription] controlling it.
Pair<Stream<String>, StreamSubscription<String>> _lineStreamWithSubscription(
/// with a [StreamCanceller] controlling it.
Pair<Stream<String>, StreamCanceller> _lineStreamWithCanceller(
Future<Stream<int>> streamFuture) {
return streamWithSubscription(futureStream(streamFuture)
return streamWithCanceller(futureStream(streamFuture)
.handleError((e) => currentSchedule.signalError(e))
.transform(new StringDecoder(_encoding))
.transform(new LineTransformer()));
@ -190,8 +190,8 @@ class ScheduledProcess {
/// debug information if an error occurs.
void _scheduleExceptionCleanup() {
currentSchedule.onException.schedule(() {
_stdoutSubscription.cancel();
_stderrSubscription.cancel();
_stdoutCanceller();
_stderrCanceller();
if (!_process.hasValue) return;

View file

@ -104,16 +104,23 @@ Future streamFirst(Stream stream) {
return completer.future;
}
/// Returns a wrapped version of [stream] along with a [StreamSubscription] that
/// can be used to control the wrapped stream.
Pair<Stream, StreamSubscription> streamWithSubscription(Stream stream) {
/// A function that can be called to cancel a [Stream] and send a done message.
typedef void StreamCanceller();
// TODO(nweiz): use a StreamSubscription when issue 9026 is fixed.
/// Returns a wrapped version of [stream] along with a function that will cancel
/// the wrapped stream. Unlike [StreamSubscription], this canceller will send a
/// "done" message to the wrapped stream.
Pair<Stream, StreamCanceller> streamWithCanceller(Stream stream) {
var controller = stream.isBroadcast ?
new StreamController.broadcast() :
new StreamController();
var subscription = stream.listen(controller.add,
onError: controller.signalError,
onDone: controller.close);
return new Pair<Stream, StreamSubscription>(controller.stream, subscription);
var subscription = stream.listen((value) {
if (!controller.isClosed) controller.add(value);
}, onError: (error) {
if (!controller.isClosed) controller.signalError(error);
}, onDone: controller.close);
return new Pair<Stream, StreamCanceller>(controller.stream, controller.close);
}
// TODO(nweiz): remove this when issue 7787 is fixed.

View file

@ -118,31 +118,35 @@ void main() {
});
});
// TODO(nweiz): re-enable this (issue 9022).
// expectTestsPass("a process that ends while waiting for stdout shouldn't "
// "block the test", () {
// var errors;
// test('test 1', () {
// currentSchedule.onException.schedule(() {
// errors = currentSchedule.errors;
// });
//
// var process = startDartProcess('');
// expect(process.nextLine(), completion(equals('hello')));
// expect(process.nextLine(), completion(equals('world')));
// process.shouldExit(0);
// });
//
// test('test 2', () {
// expect(errors, everyElement(new isInstanceOf<ScheduleError>()));
// expect(errors.length, equals(2));
// expect(errors[0].error, isStateError);
// expect(errors[0].error.message, equals("No elements"));
// expect(errors[1].error, matches(r"^Process "
// r"'[^']+[\\/]dart(\.exe)? [^']+' ended earlier than scheduled with "
// r"exit code 0\."));
// });
// }, passing: ['test 2']);
expectTestsPass("a process that ends while waiting for stdout shouldn't "
"block the test", () {
var errors;
test('test 1', () {
currentSchedule.onException.schedule(() {
errors = currentSchedule.errors;
});
var process = startDartProcess('');
expect(process.nextLine(), completion(equals('hello')));
expect(process.nextLine(), completion(equals('world')));
process.shouldExit(0);
});
test('test 2', () {
expect(errors, everyElement(new isInstanceOf<ScheduleError>()));
expect(errors.length, anyOf(1, 2));
expect(errors[0].error, isStateError);
expect(errors[0].error.message, equals("No elements"));
// Whether or not this error appears depends on how quickly the "no
// elements" error is handled.
if (errors.length == 2) {
expect(errors[1].error, matches(r"^Process "
r"'[^']+[\\/]dart(\.exe)? [^']+' ended earlier than scheduled with "
r"exit code 0\."));
}
});
}, passing: ['test 2']);
expectTestsPass("a process that ends during the task immediately before it's "
"scheduled to end shouldn't cause an error", () {
@ -168,30 +172,34 @@ void main() {
});
});
// TODO(nweiz): re-enable this (issue 9022).
// expectTestsPass("nextLine throws an error if there's no more stdout", () {
// var errors;
// test('test 1', () {
// currentSchedule.onException.schedule(() {
// errors = currentSchedule.errors;
// });
//
// var process = startDartProcess('print("hello");');
// expect(process.nextLine(), completion(equals('hello')));
// expect(process.nextLine(), completion(equals('world')));
// process.shouldExit(0);
// });
//
// test('test 2', () {
// expect(errors, everyElement(new isInstanceOf<ScheduleError>()));
// expect(errors.length, equals(2));
// expect(errors[0].error, isStateError);
// expect(errors[0].error.message, equals("No elements"));
// expect(errors[1].error, matches(r"^Process "
// r"'[^']+[\\/]dart(\.exe)? [^']+' ended earlier than scheduled with "
// r"exit code 0\."));
// });
// }, passing: ['test 2']);
expectTestsPass("nextLine throws an error if there's no more stdout", () {
var errors;
test('test 1', () {
currentSchedule.onException.schedule(() {
errors = currentSchedule.errors;
});
var process = startDartProcess('print("hello");');
expect(process.nextLine(), completion(equals('hello')));
expect(process.nextLine(), completion(equals('world')));
process.shouldExit(0);
});
test('test 2', () {
expect(errors, everyElement(new isInstanceOf<ScheduleError>()));
expect(errors.length, anyOf(1, 2));
expect(errors[0].error, isStateError);
expect(errors[0].error.message, equals("No elements"));
// Whether or not this error appears depends on how quickly the "no
// elements" error is handled.
if (errors.length == 2) {
expect(errors[1].error, matches(r"^Process "
r"'[^']+[\\/]dart(\.exe)? [^']+' ended earlier than scheduled with "
r"exit code 0\."));
}
});
}, passing: ['test 2']);
expectTestsPass("nextErrLine returns the next line of stderr from the "
"process", () {
@ -208,30 +216,34 @@ void main() {
});
});
// TODO(nweiz): re-enable this (issue 9022).
// expectTestsPass("nextErrLine throws an error if there's no more stderr", () {
// var errors;
// test('test 1', () {
// currentSchedule.onException.schedule(() {
// errors = currentSchedule.errors;
// });
//
// var process = startDartProcess(r'stderr.write("hello\n");');
// expect(process.nextErrLine(), completion(equals('hello')));
// expect(process.nextErrLine(), completion(equals('world')));
// process.shouldExit(0);
// });
//
// test('test 2', () {
// expect(errors, everyElement(new isInstanceOf<ScheduleError>()));
// expect(errors.length, equals(2));
// expect(errors[0].error, isStateError);
// expect(errors[0].error.message, equals("No elements"));
// expect(errors[1].error, matches(r"^Process "
// r"'[^']+[\\/]dart(\.exe)? [^']+' ended earlier than scheduled with "
// r"exit code 0\."));
// });
// }, passing: ['test 2']);
expectTestsPass("nextErrLine throws an error if there's no more stderr", () {
var errors;
test('test 1', () {
currentSchedule.onException.schedule(() {
errors = currentSchedule.errors;
});
var process = startDartProcess(r'stderr.write("hello\n");');
expect(process.nextErrLine(), completion(equals('hello')));
expect(process.nextErrLine(), completion(equals('world')));
process.shouldExit(0);
});
test('test 2', () {
expect(errors, everyElement(new isInstanceOf<ScheduleError>()));
expect(errors.length, anyOf(1, 2));
expect(errors[0].error, isStateError);
expect(errors[0].error.message, equals("No elements"));
// Whether or not this error appears depends on how quickly the "no
// elements" error is handled.
if (errors.length == 2) {
expect(errors[1].error, matches(r"^Process "
r"'[^']+[\\/]dart(\.exe)? [^']+' ended earlier than scheduled with "
r"exit code 0\."));
}
});
}, passing: ['test 2']);
expectTestsPass("remainingStdout returns all the stdout if it's not consumed "
"any other way", () {
@ -333,7 +345,6 @@ void main() {
expectTestsPass("closeStdin closes the process's stdin stream", () {
test('test', () {
currentSchedule.timeout = new Duration(seconds: 1);
var process = startDartProcess(r'''
stdin.listen((line) => print("> $line"),
onDone: () => print("stdin closed"));