update to use bazel_worker package

BUG=
R=paulberry@google.com

Review URL: https://codereview.chromium.org/1885073002 .
This commit is contained in:
Jacob MacDonald 2016-04-14 07:46:33 -07:00
parent 09c6fcebc0
commit d2e2241fc1
10 changed files with 51 additions and 537 deletions

3
DEPS
View file

@ -32,6 +32,7 @@ vars = {
"args_tag": "@0.13.4",
"async_tag": "@1.9.0",
"barback_tag" : "@0.15.2+7",
"bazel_worker_tag": "@v0.1.0",
"boolean_selector_tag" : "@1.0.0",
"boringssl_rev" : "@daeafc22c66ad48f6b32fc8d3362eb9ba31b774e",
"charcode_tag": "@1.1.0",
@ -150,6 +151,8 @@ deps = {
(Var("github_mirror") % "async") + Var("async_tag"),
Var("dart_root") + "/third_party/pkg/barback":
(Var("github_mirror") % "barback") + Var("barback_tag"),
Var("dart_root") + "/third_party/pkg/bazel_worker":
(Var("github_dartlang") % "bazel_worker") + Var("bazel_worker_tag"),
Var("dart_root") + "/third_party/pkg/boolean_selector":
(Var("github_dartlang") % "boolean_selector") +
Var("boolean_selector_tag"),

View file

@ -28,10 +28,7 @@ import 'package:analyzer_cli/src/analyzer_impl.dart';
import 'package:analyzer_cli/src/driver.dart';
import 'package:analyzer_cli/src/error_formatter.dart';
import 'package:analyzer_cli/src/options.dart';
import 'package:protobuf/protobuf.dart';
import 'message_grouper.dart';
import 'worker_protocol.pb.dart';
import 'package:bazel_worker/bazel_worker.dart';
/**
* Analyzer used when the "--build-mode" option is supplied.
@ -276,74 +273,23 @@ class BuildMode {
}
}
/**
* Default implementation of [WorkerConnection] that works with stdio.
*/
class StdWorkerConnection implements WorkerConnection {
final MessageGrouper _messageGrouper;
final io.Stdout _stdoutStream;
StdWorkerConnection(io.Stdin stdinStream, this._stdoutStream)
: _messageGrouper = new MessageGrouper(stdinStream);
@override
WorkRequest readRequest() {
var buffer = _messageGrouper.next;
if (buffer == null) return null;
return new WorkRequest.fromBuffer(buffer);
}
@override
void writeResponse(WorkResponse response) {
var responseBuffer = response.writeToBuffer();
var writer = new CodedBufferWriter();
writer.writeInt32NoTag(responseBuffer.length);
writer.writeRawBytes(responseBuffer);
_stdoutStream.add(writer.toBuffer());
}
}
/**
* Connection between a worker and input / output.
*/
abstract class WorkerConnection {
/**
* Read a new [WorkRequest]. Returns [null] when there are no more requests.
*/
WorkRequest readRequest();
/**
* Write the given [response] as bytes to the output.
*/
void writeResponse(WorkResponse response);
}
/**
* Persistent Bazel worker.
*/
class WorkerLoop {
static const int EXIT_CODE_OK = 0;
static const int EXIT_CODE_ERROR = 15;
final WorkerConnection connection;
class AnalyzerWorkerLoop extends SyncWorkerLoop {
final StringBuffer errorBuffer = new StringBuffer();
final StringBuffer outBuffer = new StringBuffer();
final String dartSdkPath;
WorkerLoop(this.connection, {this.dartSdkPath});
AnalyzerWorkerLoop(SyncWorkerConnection connection, {this.dartSdkPath})
: super(connection: connection);
factory WorkerLoop.std(
factory AnalyzerWorkerLoop.std(
{io.Stdin stdinStream, io.Stdout stdoutStream, String dartSdkPath}) {
stdinStream ??= io.stdin;
stdoutStream ??= io.stdout;
WorkerConnection connection =
new StdWorkerConnection(stdinStream, stdoutStream);
return new WorkerLoop(connection, dartSdkPath: dartSdkPath);
SyncWorkerConnection connection = new StdSyncWorkerConnection(
stdinStream: stdinStream, stdoutStream: stdoutStream);
return new AnalyzerWorkerLoop(connection, dartSdkPath: dartSdkPath);
}
/**
@ -354,14 +300,12 @@ class WorkerLoop {
}
/**
* Perform a single loop step. Return `true` if should exit the loop.
* Perform a single loop step.
*/
bool performSingle() {
WorkResponse performRequest(WorkRequest request) {
errorBuffer.clear();
outBuffer.clear();
try {
WorkRequest request = connection.readRequest();
if (request == null) {
return true;
}
// Add in the dart-sdk argument if `dartSdkPath` is not null, otherwise it
// will try to find the currently installed sdk.
var arguments = new List.from(request.arguments);
@ -377,36 +321,29 @@ class WorkerLoop {
// Analyze and respond.
analyze(options);
String msg = _getErrorOutputBuffersText();
connection.writeResponse(new WorkResponse()
return new WorkResponse()
..exitCode = EXIT_CODE_OK
..output = msg);
..output = msg;
} catch (e, st) {
String msg = _getErrorOutputBuffersText();
msg += '$e \n $st';
connection.writeResponse(new WorkResponse()
msg += '$e\n$st';
return new WorkResponse()
..exitCode = EXIT_CODE_ERROR
..output = msg);
..output = msg;
}
return false;
}
/**
* Run the worker loop.
*/
@override
void run() {
errorSink = errorBuffer;
outSink = outBuffer;
exitHandler = (int exitCode) {
return throw new StateError('Exit called: $exitCode');
};
while (true) {
errorBuffer.clear();
outBuffer.clear();
bool shouldExit = performSingle();
if (shouldExit) {
break;
}
}
super.run();
}
String _getErrorOutputBuffersText() {

View file

@ -245,7 +245,7 @@ class Driver implements CommandLineStarter {
ErrorSeverity _buildModeAnalyze(CommandLineOptions options) {
return _analyzeAllTag.makeCurrentWhile(() {
if (options.buildModePersistentWorker) {
new WorkerLoop.std(dartSdkPath: options.dartSdkPath).run();
new AnalyzerWorkerLoop.std(dartSdkPath: options.dartSdkPath).run();
} else {
return new BuildMode(options, stats).analyze();
}

View file

@ -1,112 +0,0 @@
// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:io';
import 'dart:typed_data';
/// Groups stdin input into messages by interpreting it as
/// base-128 encoded lengths interleaved with raw data.
///
/// The base-128 encoding is in little-endian order, with the high bit set on
/// all bytes but the last. This was chosen since it's the same as the
/// base-128 encoding used by protobufs, so it allows a modest amount of code
/// reuse at the other end of the protocol.
///
/// Possible future improvements to consider (should a debugging need arise):
/// - Put a magic number at the beginning of the stream.
/// - Use a guard byte between messages to sanity check that the encoder and
/// decoder agree on the encoding of lengths.
class MessageGrouper {
final _state = new _MessageGrouperState();
final Stdin _stdin;
MessageGrouper(this._stdin);
/// Blocks until the next full message is received, and then returns it.
///
/// Returns null at end of file.
List<int> get next {
var message;
while (message == null) {
var nextByte = _stdin.readByteSync();
if (nextByte == -1) return null;
message = _state.handleInput(nextByte);
}
return message;
}
}
/// State held by the [MessageGrouper] while waiting for additional data to
/// arrive.
class _MessageGrouperState {
/// `true` means we are waiting to receive bytes of base-128 encoded length.
/// Some bytes of length may have been received already.
///
/// `false` means we are waiting to receive more bytes of message data. Some
/// bytes of message data may have been received already.
bool waitingForLength = true;
/// If [waitingForLength] is `true`, the decoded value of the length bytes
/// received so far (if any). If [waitingForLength] is `false`, the decoded
/// length that was most recently received.
int length = 0;
/// If [waitingForLength] is `true`, the amount by which the next received
/// length byte must be left-shifted; otherwise undefined.
int lengthShift = 0;
/// If [waitingForLength] is `false`, a [Uint8List] which is ready to receive
/// message data. Otherwise null.
Uint8List message;
/// If [waitingForLength] is `false`, the number of message bytes that have
/// been received so far. Otherwise zero.
int numMessageBytesReceived;
_MessageGrouperState() {
reset();
}
/// Handle one byte at a time.
///
/// Returns a [List<int>] of message bytes if [byte] was the last byte in a
/// message, otherwise returns [null].
List<int> handleInput(int byte) {
if (waitingForLength) {
length |= (byte & 0x7f) << lengthShift;
if ((byte & 0x80) == 0) {
waitingForLength = false;
message = new Uint8List(length);
if (length == 0) {
// There is no message data to wait for, so just go ahead and deliver the
// empty message.
var messageToReturn = message;
reset();
return messageToReturn;
}
} else {
lengthShift += 7;
}
} else {
message[numMessageBytesReceived] = byte;
numMessageBytesReceived++;
if (numMessageBytesReceived == length) {
var messageToReturn = message;
reset();
return messageToReturn;
}
}
return null;
}
/// Reset the state so that we are ready to receive the next base-128 encoded
/// length.
void reset() {
waitingForLength = true;
length = 0;
lengthShift = 0;
message = null;
numMessageBytesReceived = 0;
}
}

View file

@ -1,133 +0,0 @@
///
// Generated code. Do not modify.
///
library blaze.worker_worker_protocol;
import 'package:protobuf/protobuf.dart';
class Input extends GeneratedMessage {
static final BuilderInfo _i = new BuilderInfo('Input')
..a(1, 'path', PbFieldType.OS)
..a(2, 'digest', PbFieldType.OY)
..hasRequiredFields = false
;
Input() : super();
Input.fromBuffer(List<int> i, [ExtensionRegistry r = ExtensionRegistry.EMPTY]) : super.fromBuffer(i, r);
Input.fromJson(String i, [ExtensionRegistry r = ExtensionRegistry.EMPTY]) : super.fromJson(i, r);
Input clone() => new Input()..mergeFromMessage(this);
BuilderInfo get info_ => _i;
static Input create() => new Input();
static PbList<Input> createRepeated() => new PbList<Input>();
static Input getDefault() {
if (_defaultInstance == null) _defaultInstance = new _ReadonlyInput();
return _defaultInstance;
}
static Input _defaultInstance;
static void $checkItem(Input v) {
if (v is !Input) checkItemFailed(v, 'Input');
}
String get path => $_get(0, 1, '');
void set path(String v) { $_setString(0, 1, v); }
bool hasPath() => $_has(0, 1);
void clearPath() => clearField(1);
List<int> get digest => $_get(1, 2, null);
void set digest(List<int> v) { $_setBytes(1, 2, v); }
bool hasDigest() => $_has(1, 2);
void clearDigest() => clearField(2);
}
class _ReadonlyInput extends Input with ReadonlyMessageMixin {}
class WorkRequest extends GeneratedMessage {
static final BuilderInfo _i = new BuilderInfo('WorkRequest')
..p(1, 'arguments', PbFieldType.PS)
..pp(2, 'inputs', PbFieldType.PM, Input.$checkItem, Input.create)
..hasRequiredFields = false
;
WorkRequest() : super();
WorkRequest.fromBuffer(List<int> i, [ExtensionRegistry r = ExtensionRegistry.EMPTY]) : super.fromBuffer(i, r);
WorkRequest.fromJson(String i, [ExtensionRegistry r = ExtensionRegistry.EMPTY]) : super.fromJson(i, r);
WorkRequest clone() => new WorkRequest()..mergeFromMessage(this);
BuilderInfo get info_ => _i;
static WorkRequest create() => new WorkRequest();
static PbList<WorkRequest> createRepeated() => new PbList<WorkRequest>();
static WorkRequest getDefault() {
if (_defaultInstance == null) _defaultInstance = new _ReadonlyWorkRequest();
return _defaultInstance;
}
static WorkRequest _defaultInstance;
static void $checkItem(WorkRequest v) {
if (v is !WorkRequest) checkItemFailed(v, 'WorkRequest');
}
List<String> get arguments => $_get(0, 1, null);
List<Input> get inputs => $_get(1, 2, null);
}
class _ReadonlyWorkRequest extends WorkRequest with ReadonlyMessageMixin {}
class WorkResponse extends GeneratedMessage {
static final BuilderInfo _i = new BuilderInfo('WorkResponse')
..a(1, 'exitCode', PbFieldType.O3)
..a(2, 'output', PbFieldType.OS)
..hasRequiredFields = false
;
WorkResponse() : super();
WorkResponse.fromBuffer(List<int> i, [ExtensionRegistry r = ExtensionRegistry.EMPTY]) : super.fromBuffer(i, r);
WorkResponse.fromJson(String i, [ExtensionRegistry r = ExtensionRegistry.EMPTY]) : super.fromJson(i, r);
WorkResponse clone() => new WorkResponse()..mergeFromMessage(this);
BuilderInfo get info_ => _i;
static WorkResponse create() => new WorkResponse();
static PbList<WorkResponse> createRepeated() => new PbList<WorkResponse>();
static WorkResponse getDefault() {
if (_defaultInstance == null) _defaultInstance = new _ReadonlyWorkResponse();
return _defaultInstance;
}
static WorkResponse _defaultInstance;
static void $checkItem(WorkResponse v) {
if (v is !WorkResponse) checkItemFailed(v, 'WorkResponse');
}
int get exitCode => $_get(0, 1, 0);
void set exitCode(int v) { $_setUnsignedInt32(0, 1, v); }
bool hasExitCode() => $_has(0, 1);
void clearExitCode() => clearField(1);
String get output => $_get(1, 2, '');
void set output(String v) { $_setString(1, 2, v); }
bool hasOutput() => $_has(1, 2);
void clearOutput() => clearField(2);
}
class _ReadonlyWorkResponse extends WorkResponse with ReadonlyMessageMixin {}
const Input$json = const {
'1': 'Input',
'2': const [
const {'1': 'path', '3': 1, '4': 1, '5': 9},
const {'1': 'digest', '3': 2, '4': 1, '5': 12},
],
};
const WorkRequest$json = const {
'1': 'WorkRequest',
'2': const [
const {'1': 'arguments', '3': 1, '4': 3, '5': 9},
const {'1': 'inputs', '3': 2, '4': 3, '5': 11, '6': '.blaze.worker.Input'},
],
};
const WorkResponse$json = const {
'1': 'WorkResponse',
'2': const [
const {'1': 'exit_code', '3': 1, '4': 1, '5': 5},
const {'1': 'output', '3': 2, '4': 1, '5': 9},
],
};

View file

@ -8,6 +8,7 @@ environment:
dependencies:
analyzer: ^0.27.0
args: ^0.13.0
bazel_worker: ^0.1.0
cli_util: ^0.0.1
linter: ^0.1.10
package_config: ^0.1.1

View file

@ -6,7 +6,6 @@ import 'boot_loader_test.dart' as boot_loader;
import 'build_mode_test.dart' as build_mode_test;
import 'driver_test.dart' as driver;
import 'error_test.dart' as error;
import 'message_grouper_test.dart' as message_grouper;
import 'options_test.dart' as options;
import 'package_prefix_test.dart' as package_prefix;
import 'perf_report_test.dart' as perf;
@ -25,7 +24,6 @@ main() {
//sdk_ext.main();
//strong_mode.main();
error.main();
message_grouper.main();
options.main();
perf.main();
plugin_manager.main();

View file

@ -4,35 +4,28 @@
library analyzer_cli.test.built_mode;
import 'dart:collection';
import 'dart:convert';
import 'dart:io';
import 'package:analyzer_cli/src/build_mode.dart';
import 'package:analyzer_cli/src/driver.dart';
import 'package:analyzer_cli/src/options.dart';
import 'package:analyzer_cli/src/worker_protocol.pb.dart';
import 'package:bazel_worker/bazel_worker.dart';
import 'package:bazel_worker/testing.dart';
import 'package:protobuf/protobuf.dart';
import 'package:test_reflective_loader/test_reflective_loader.dart';
import 'package:typed_mock/typed_mock.dart';
import 'package:unittest/unittest.dart';
import 'utils.dart';
main() {
defineReflectiveTests(WorkerLoopTest);
}
typedef void _TestWorkerLoopAnalyze(CommandLineOptions options);
@reflectiveTest
class WorkerLoopTest {
final TestStdinStream stdinStream = new TestStdinStream();
final TestStdinSync stdinStream = new TestStdinSync();
final TestStdoutStream stdoutStream = new TestStdoutStream();
_TestWorkerConnection connection;
TestSyncWorkerConnection connection;
WorkerLoopTest() {
connection = new _TestWorkerConnection(this.stdinStream, this.stdoutStream);
connection =
new TestSyncWorkerConnection(this.stdinStream, this.stdoutStream);
}
void setUp() {}
@ -57,7 +50,7 @@ class WorkerLoopTest {
]);
stdinStream.addInputBytes(_serializeProto(request));
new _TestWorkerLoop(connection, (CommandLineOptions options) {
new TestAnalyzerWorkerLoop(connection, (CommandLineOptions options) {
expect(options.buildSummaryInputs,
unorderedEquals(['/tmp/1.sum', '/tmp/2.sum']));
expect(
@ -71,10 +64,10 @@ class WorkerLoopTest {
outSink.writeln('outSink b');
errorSink.writeln('errorSink b');
}).run();
expect(connection.outputList, hasLength(1));
expect(connection.responses, hasLength(1));
var response = connection.outputList[0];
expect(response.exitCode, WorkerLoop.EXIT_CODE_OK, reason: response.output);
var response = connection.responses[0];
expect(response.exitCode, EXIT_CODE_OK, reason: response.output);
expect(
response.output,
allOf(contains('errorSink a'), contains('errorSink a'),
@ -89,64 +82,50 @@ class WorkerLoopTest {
var request = new WorkRequest();
request.arguments.addAll(['--unknown-option', '/foo.dart', '/bar.dart']);
stdinStream.addInputBytes(_serializeProto(request));
new _TestWorkerLoop(connection).run();
expect(connection.outputList, hasLength(1));
new TestAnalyzerWorkerLoop(connection).run();
expect(connection.responses, hasLength(1));
var response = connection.outputList[0];
expect(response.exitCode, WorkerLoop.EXIT_CODE_ERROR);
var response = connection.responses[0];
expect(response.exitCode, EXIT_CODE_ERROR);
expect(response.output, anything);
}
test_run_invalidRequest_noArgumentsInputs() {
stdinStream.addInputBytes(_serializeProto(new WorkRequest()));
new _TestWorkerLoop(connection).run();
expect(connection.outputList, hasLength(1));
new TestAnalyzerWorkerLoop(connection).run();
expect(connection.responses, hasLength(1));
var response = connection.outputList[0];
expect(response.exitCode, WorkerLoop.EXIT_CODE_ERROR);
var response = connection.responses[0];
expect(response.exitCode, EXIT_CODE_ERROR);
expect(response.output, anything);
}
test_run_invalidRequest_randomBytes() {
stdinStream.addInputBytes([1, 2, 3]);
new _TestWorkerLoop(connection).run();
expect(connection.outputList, hasLength(1));
new TestAnalyzerWorkerLoop(connection).run();
expect(connection.responses, hasLength(1));
var response = connection.outputList[0];
expect(response.exitCode, WorkerLoop.EXIT_CODE_ERROR);
var response = connection.responses[0];
expect(response.exitCode, EXIT_CODE_ERROR);
expect(response.output, anything);
}
test_run_stopAtEOF() {
stdinStream.addInputBytes([-1]);
new _TestWorkerLoop(connection).run();
new TestAnalyzerWorkerLoop(connection).run();
}
}
/**
* A [StdWorkerConnection] which records its responses.
*/
class _TestWorkerConnection extends StdWorkerConnection {
final outputList = <WorkResponse>[];
_TestWorkerConnection(Stdin stdinStream, Stdout stdoutStream)
: super(stdinStream, stdoutStream);
@override
void writeResponse(WorkResponse response) {
super.writeResponse(response);
outputList.add(response);
}
}
typedef void _TestWorkerLoopAnalyze(CommandLineOptions options);
/**
* [WorkerLoop] for testing.
* [AnalyzerWorkerLoop] for testing.
*/
class _TestWorkerLoop extends WorkerLoop {
class TestAnalyzerWorkerLoop extends AnalyzerWorkerLoop {
final _TestWorkerLoopAnalyze _analyze;
_TestWorkerLoop(WorkerConnection connection, [this._analyze])
TestAnalyzerWorkerLoop(SyncWorkerConnection connection, [this._analyze])
: super(connection);
@override

View file

@ -1,123 +0,0 @@
// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'package:analyzer_cli/src/message_grouper.dart';
import 'package:unittest/unittest.dart';
import 'utils.dart';
main() {
MessageGrouper messageGrouper;
TestStdinStream stdinStream;
setUp(() {
stdinStream = new TestStdinStream();
messageGrouper = new MessageGrouper(stdinStream);
});
group('message_grouper', () {
/// Check that if the message grouper produces the [expectedOutput] in
/// response to the corresponding [input].
void check(List<int> input, List<List<int>> expectedOutput) {
stdinStream.addInputBytes(input);
for (var chunk in expectedOutput) {
expect(messageGrouper.next, equals(chunk));
}
}
/// Make a simple message having the given [length]
List<int> makeMessage(int length) {
var result = <int>[];
for (int i = 0; i < length; i++) {
result.add(i & 0xff);
}
return result;
}
test('Empty message', () {
check([0], [[]]);
});
test('Short message', () {
check([
5,
10,
20,
30,
40,
50
], [
[10, 20, 30, 40, 50]
]);
});
test('Message with 2-byte length', () {
var len = 0x155;
var msg = makeMessage(len);
var encodedLen = [0xd5, 0x02];
check([]..addAll(encodedLen)..addAll(msg), [msg]);
});
test('Message with 3-byte length', () {
var len = 0x4103;
var msg = makeMessage(len);
var encodedLen = [0x83, 0x82, 0x01];
check([]..addAll(encodedLen)..addAll(msg), [msg]);
});
test('Multiple messages', () {
check([
2,
10,
20,
2,
30,
40
], [
[10, 20],
[30, 40]
]);
});
test('Empty message at start', () {
check([
0,
2,
10,
20
], [
[],
[10, 20]
]);
});
test('Empty message at end', () {
check([
2,
10,
20,
0
], [
[10, 20],
[]
]);
});
test('Empty message in the middle', () {
check([
2,
10,
20,
0,
2,
30,
40
], [
[10, 20],
[],
[30, 40]
]);
});
});
}

View file

@ -4,7 +4,6 @@
library analyzer_cli.test.utils;
import 'dart:collection';
import 'dart:io';
import 'dart:mirrors';
@ -12,7 +11,6 @@ import 'package:analyzer/analyzer.dart';
import 'package:analyzer/src/generated/java_io.dart';
import 'package:path/path.dart' as pathos;
import 'package:path/path.dart' as path;
import 'package:typed_mock/typed_mock.dart';
import 'package:unittest/unittest.dart';
/// Gets the test directory in a way that works with
@ -65,37 +63,3 @@ dynamic withTempDir(fn(String path)) {
}
class _TestUtils {}
/**
* A [Stdin] mock.
*/
class TestStdinStream extends TypedMock implements Stdin {
final pendingBytes = new Queue<int>();
// Adds all the input bytes to this stream.
void addInputBytes(List<int> bytes) {
pendingBytes.addAll(bytes);
}
@override
int readByteSync() {
if (pendingBytes.isEmpty) {
return -1;
} else {
return pendingBytes.removeFirst();
}
}
}
/**
* A [Stdout] mock.
*/
class TestStdoutStream extends TypedMock implements Stdout {
final writes = <List<int>>[];
@override
void add(List<int> bytes) {
super.add(bytes);
writes.add(bytes);
}
}