Get the pub oauth2 test passing locally.

Review URL: https://codereview.chromium.org//12191008

git-svn-id: https://dart.googlecode.com/svn/branches/bleeding_edge/dart@18103 260f80e4-7a28-3924-810f-c04153c831b5
This commit is contained in:
nweiz@google.com 2013-02-05 00:57:25 +00:00
parent 5b8c1da46d
commit dbb72ee35d
6 changed files with 71 additions and 27 deletions

View file

@ -35,5 +35,5 @@ class ByteStream extends StreamView<List<int>> {
toBytes().then((bytes) => decodeString(bytes, encoding));
Stream<String> toStringStream([Encoding encoding=Encoding.UTF_8]) =>
mappedBy((bytes) => decodeString(bytes, encoding));
wrapStream(mappedBy((bytes) => decodeString(bytes, encoding)));
}

View file

@ -309,3 +309,16 @@ Future forEachFuture(Iterable input, Future fn(element)) {
}
return nextElement(null);
}
// TODO(nweiz): remove this when issue 8310 is fixed.
/// Returns a [Stream] identical to [stream], but piped through a new
/// [StreamController]. This exists to work around issue 8310.
Stream wrapStream(Stream stream) {
var controller = stream.isBroadcast
? new StreamController.broadcast()
: new StreamController();
stream.listen(controller.add,
onError: (e) => controller.signalError(e),
onDone: controller.close);
return controller.stream;
}

View file

@ -100,7 +100,7 @@ Future<Client> _getClient(SystemCache cache) {
if (credentials == null) return _authorize();
var client = new Client(_identifier, _secret, credentials,
httpClient: curlClient);
httpClient: httpClient);
_saveCredentials(cache, client.credentials);
return client;
});

View file

@ -226,8 +226,8 @@ final RegExp _lineRegexp = new RegExp(r"\r\n|\r|\n");
/// newline is ignored.
Stream<String> streamToLines(Stream<String> stream) {
var buffer = new StringBuffer();
return stream.transform(new StreamTransformer.from(
onData: (chunk, sink) {
return wrapStream(stream.transform(new StreamTransformer(
handleData: (chunk, sink) {
var lines = chunk.split(_lineRegexp);
var leftover = lines.removeLast();
for (var line in lines) {
@ -240,10 +240,24 @@ Stream<String> streamToLines(Stream<String> stream) {
sink.add(line);
}
buffer.add(leftover);
}, onDone: (sink) {
},
handleDone: (sink) {
if (!buffer.isEmpty) sink.add(buffer.toString());
sink.close();
}));
})));
}
// TODO(nweiz): remove this when issue 8310 is fixed.
/// Returns a [Stream] identical to [stream], but piped through a new
/// [StreamController]. This exists to work around issue 8310.
Stream wrapStream(Stream stream) {
var controller = stream.isBroadcast
? new StreamController.broadcast()
: new StreamController();
stream.listen(controller.add,
onError: (e) => controller.signalError(e),
onDone: controller.close);
return controller.stream;
}
/// Like [Iterable.where], but allows [test] to return [Future]s and uses the

View file

@ -31,7 +31,9 @@ main() {
response.outputStream.close();
});
pub.kill();
// After we give pub an invalid response, it should crash. We wait for it to
// do so rather than killing it so it'll write out the credentials file.
pub.shouldExit(1);
credentialsFile(server, 'access token').scheduleValidate();
});
@ -112,7 +114,9 @@ main() {
response.outputStream.close();
});
pub.kill();
// After we give pub an invalid response, it should crash. We wait for it to
// do so rather than killing it so it'll write out the credentials file.
pub.shouldExit(1);
credentialsFile(server, 'new access token').scheduleValidate();
});
@ -135,7 +139,10 @@ main() {
response.outputStream.close();
});
pub.kill();
// After we give pub an invalid response, it should crash. We wait for it to
// do so rather than killing it so it'll write out the credentials file.
pub.shouldExit(1);
credentialsFile(server, 'new access token').scheduleValidate();
});
@ -190,7 +197,7 @@ void authorizePub(ScheduledProcess pub, ScheduledServer server,
.send();
}).then((response) {
expect(response.headers['location'],
equals(['http://pub.dartlang.org/authorized']));
equals('http://pub.dartlang.org/authorized'));
}), anything);
handleAccessTokenRequest(server, accessToken);

View file

@ -474,8 +474,15 @@ final _TIMEOUT = 30000;
/// Defines an integration test. The [body] should schedule a series of
/// operations which will be run asynchronously.
void integration(String description, void body()) {
test(description, () {
void integration(String description, void body()) =>
_integration(description, body, test);
/// Like [integration], but causes only this test to run.
void solo_integration(String description, void body()) =>
_integration(description, body, solo_test);
void _integration(String description, void body(), [Function testFn]) {
testFn(description, () {
// Ensure the SDK version is always available.
dir(sdkPath, [
file('version', '0.1.2.3')
@ -828,7 +835,7 @@ abstract class Descriptor {
var path = join(dir, name);
return defer(() {
if (!entryExists(path)) {
throw new ExpectException('File $name in $dir not found.');
throw new ExpectException('Entry $path not found.');
}
return validate(path);
});
@ -1286,16 +1293,19 @@ class ScheduledProcess {
var pairFuture = process.then((p) {
_process = p;
var stdoutTee = tee(p.stdout.handleError((e) {
registerException(e.error, e.stackTrace);
}));
byteStreamToLines(stream) {
var handledErrors = wrapStream(stream.handleError((e) {
registerException(e.error, e.stackTrace);
}));
return streamToLines(new ByteStream(handledErrors).toStringStream());
}
var stdoutTee = tee(byteStreamToLines(p.stdout));
var stdoutPair = streamWithSubscription(stdoutTee.last);
_stdout = stdoutPair.first;
_stdoutSubscription = stdoutPair.last;
var stderrTee = tee(p.stderr.handleError((e) {
registerException(e.error, e.stackTrace);
}));
var stderrTee = tee(byteStreamToLines(p.stderr));
var stderrPair = streamWithSubscription(stderrTee.last);
_stderr = stderrPair.first;
_stderrSubscription = stderrPair.last;
@ -1312,7 +1322,7 @@ class ScheduledProcess {
"or kill() called before the test is run.");
}
return process.then((p) => p.exitCode).then((exitCode) {
process.then((p) => p.exitCode).then((exitCode) {
if (_endExpected) {
_exitCode = exitCode;
_exitCodeCompleter.complete(exitCode);
@ -1321,18 +1331,18 @@ class ScheduledProcess {
// Sleep for half a second in case _endExpected is set in the next
// scheduled event.
sleep(500).then((_) {
return sleep(500).then((_) {
if (_endExpected) {
_exitCodeCompleter.complete(exitCode);
return;
}
return _printStreams().then((_) {
registerException(new ExpectException("Process $name ended "
"earlier than scheduled with exit code $exitCode"));
});
return _printStreams();
}).then((_) {
registerException(new ExpectException("Process $name ended "
"earlier than scheduled with exit code $exitCode"));
});
});
}).catchError((e) => registerException(e.error, e.stackTrace));
});
_scheduleOnException((_) {
@ -1409,7 +1419,7 @@ class ScheduledProcess {
/// Writes [line] to the process as stdin.
void writeLine(String line) {
_schedule((_) => _processFuture.then(
(p) => p.stdin.write('$line\n'.charCodes)));
(p) => p.stdin.add('$line\n'.charCodes)));
}
/// Kills the process, and waits until it's dead.