Reland "[ Service / package:dds ] Add stream support to package:dds and enable DDS for VM service tests"

Fixes buildbot failures by running VM service and DDS service tests in
separate test processes.

This reverts commit 1f0c90868c.

Change-Id: I0ad9b2e6ccea4d5468dd4dcdde0e286bcbf820ff
Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/142902
Reviewed-by: Ryan Macnak <rmacnak@google.com>
Commit-Queue: Ben Konyi <bkonyi@google.com>
This commit is contained in:
Ben Konyi 2020-04-16 17:59:25 +00:00 committed by commit-bot@chromium.org
parent 4f2e21b351
commit e5b85792da
40 changed files with 672 additions and 112 deletions

2
DEPS
View file

@ -146,7 +146,7 @@ vars = {
"usage_tag": "3.4.0",
"watcher_rev": "0.9.7+14",
"web_components_rev": "8f57dac273412a7172c8ade6f361b407e2e4ed02",
"web_socket_channel_tag": "1.0.15",
"web_socket_channel_tag": "1.1.0",
"WebCore_rev": "fb11e887f77919450e497344da570d780e078bc8",
"yaml_tag": "2.2.0",
"zlib_rev": "c44fb7248079cc3d5563b14b3f758aee60d6b415",

View file

@ -7,15 +7,24 @@
library dds;
import 'dart:async';
import 'dart:convert';
import 'dart:io';
import 'dart:typed_data';
import 'package:async/async.dart';
import 'package:json_rpc_2/json_rpc_2.dart' as json_rpc;
import 'package:pedantic/pedantic.dart';
import 'package:shelf/shelf.dart';
import 'package:shelf/shelf_io.dart' as io;
import 'package:shelf_proxy/shelf_proxy.dart';
import 'package:shelf_web_socket/shelf_web_socket.dart';
import 'package:stream_channel/stream_channel.dart';
import 'package:web_socket_channel/web_socket_channel.dart';
part 'src/binary_compatible_peer.dart';
part 'src/client.dart';
part 'src/dds_impl.dart';
part 'src/stream_manager.dart';
/// An intermediary between a Dart VM service and its clients that offers
/// additional functionality on top of the standard VM service protocol.

View file

@ -0,0 +1,60 @@
// Copyright (c) 2020, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
part of dds;
/// Adds support for binary events send from the VM service, which are not part
/// of the official JSON RPC 2.0 specification.
///
/// A binary event from the VM service has the form:
/// ```
/// type BinaryEvent {
/// dataOffset : uint32,
/// metadata : uint8[dataOffset-4],
/// data : uint8[],
/// }
/// ```
/// where `metadata` is the JSON body of the event.
///
/// [_BinaryCompatiblePeer] assumes that only stream events can contain a
/// binary payload (e.g., clients cannot send a `BinaryEvent` to the VM service).
class _BinaryCompatiblePeer extends json_rpc.Peer {
_BinaryCompatiblePeer(WebSocketChannel ws, _StreamManager streamManager)
: super(
ws.transform<String>(
StreamChannelTransformer(
StreamTransformer<dynamic, String>.fromHandlers(
handleData: (data, EventSink<String> sink) =>
_transformStream(streamManager, data, sink)),
StreamSinkTransformer<String, dynamic>.fromHandlers(
handleData: (String data, EventSink<dynamic> sink) {
sink.add(data);
},
),
),
),
);
static void _transformStream(
_StreamManager streamManager, dynamic data, EventSink<String> sink) {
if (data is String) {
// Non-binary messages come in as Strings. Simply forward to the sink.
sink.add(data);
} else if (data is Uint8List) {
// Only binary events will result in `data` being of type Uint8List. We
// need to manually forward them here.
final bytesView =
ByteData.view(data.buffer, data.offsetInBytes, data.lengthInBytes);
const metadataOffset = 4;
final dataOffset = bytesView.getUint32(0, Endian.little);
final metadataLength = dataOffset - metadataOffset;
final metadata = Utf8Decoder().convert(new Uint8List.view(
bytesView.buffer,
bytesView.offsetInBytes + metadataOffset,
metadataLength));
final decodedMetadata = json.decode(metadata);
streamManager.streamNotify(decodedMetadata['params']['streamId'], data);
}
}
}

View file

@ -0,0 +1,68 @@
// Copyright (c) 2020, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
part of dds;
/// Representation of a single DDS client which manages the connection and
/// DDS request intercepting / forwarding.
class _DartDevelopmentServiceClient {
_DartDevelopmentServiceClient(
this.dds,
this.ws,
json_rpc.Peer vmServicePeer,
) : _vmServicePeer = vmServicePeer {
_clientPeer = json_rpc.Peer(ws.cast<String>());
_registerJsonRpcMethods();
}
/// Start receiving JSON RPC requests from the client.
///
/// Returned future completes when the peer is closed.
Future<void> listen() => _clientPeer.listen().then(
(_) => dds.streamManager.clientDisconnect(this),
);
/// Close the connection to the client.
Future<void> close() async {
// Cleanup the JSON RPC server for this connection if DDS has shutdown.
await _clientPeer.close();
}
/// Send a JSON RPC notification to the client.
void sendNotification(String method, [dynamic parameters]) async {
if (_clientPeer.isClosed) {
return;
}
_clientPeer.sendNotification(method, parameters);
}
/// Registers handlers for JSON RPC methods which need to be intercepted by
/// DDS as well as fallback request forwarder.
void _registerJsonRpcMethods() {
_clientPeer.registerMethod('streamListen', (parameters) async {
final streamId = parameters['streamId'].asString;
await dds.streamManager.streamListen(this, streamId);
return _success;
});
_clientPeer.registerMethod('streamCancel', (parameters) async {
final streamId = parameters['streamId'].asString;
await dds.streamManager.streamCancel(this, streamId);
return _success;
});
// Unless otherwise specified, the request is forwarded to the VM service.
_clientPeer.registerFallback((parameters) async =>
await _vmServicePeer.sendRequest(parameters.method, parameters.asMap));
}
static const _success = <String, dynamic>{
'type': 'Success',
};
final _DartDevelopmentService dds;
final json_rpc.Peer _vmServicePeer;
final WebSocketChannel ws;
json_rpc.Peer _clientPeer;
}

View file

@ -5,12 +5,20 @@
part of dds;
class _DartDevelopmentService implements DartDevelopmentService {
_DartDevelopmentService(this._remoteVmServiceUri, this._uri);
_DartDevelopmentService(this._remoteVmServiceUri, this._uri) {
_streamManager = _StreamManager(this);
}
Future<void> startService() async {
// Establish the connection to the VM service.
_vmServiceSocket = await WebSocket.connect(remoteVmServiceWsUri.toString());
_vmServiceStream = _vmServiceSocket.asBroadcastStream();
_vmServiceSocket = WebSocketChannel.connect(remoteVmServiceWsUri);
_vmServiceClient = _BinaryCompatiblePeer(_vmServiceSocket, _streamManager);
// Setup the JSON RPC client with the VM service.
unawaited(_vmServiceClient.listen());
// Setup stream event handling.
streamManager.listen();
// Once we have a connection to the VM service, we're ready to spawn the intermediary.
await _startDDSServer();
}
@ -28,8 +36,20 @@ class _DartDevelopmentService implements DartDevelopmentService {
/// Stop accepting requests after gracefully handling existing requests.
Future<void> shutdown() async {
// Don't accept anymore HTTP requests.
await _server.close();
await _vmServiceSocket.close();
// Close all incoming websocket connections.
final futures = <Future>[];
for (final client in _clients) {
futures.add(client.close());
}
await Future.wait(futures);
// Close connection to VM service.
await _vmServiceSocket.sink.close();
_done.complete();
}
// Attempt to upgrade HTTP requests to a websocket before processing them as
@ -38,16 +58,18 @@ class _DartDevelopmentService implements DartDevelopmentService {
Cascade _handlers() => Cascade().add(_webSocketHandler()).add(_httpHandler());
Handler _webSocketHandler() => webSocketHandler((WebSocketChannel ws) {
// TODO(bkonyi): actually process requests instead of blindly forwarding them.
_vmServiceStream.listen(
(event) => ws.sink.add(event),
onDone: () => ws.sink.close(),
final client = _DartDevelopmentServiceClient(
this,
ws,
_vmServiceClient,
);
ws.stream.listen((event) => _vmServiceSocket.add(event));
_clients.add(client);
client.listen().then((_) => _clients.remove(client));
});
Handler _httpHandler() {
// TODO(bkonyi): actually process requests instead of blindly forwarding them.
// DDS doesn't support any HTTP requests itself, so we just forward all of
// them to the VM service.
final cascade = Cascade().add(proxyHandler(remoteVmServiceUri));
return cascade.handler;
}
@ -79,7 +101,15 @@ class _DartDevelopmentService implements DartDevelopmentService {
bool get isRunning => _uri != null;
WebSocket _vmServiceSocket;
Stream _vmServiceStream;
Future<void> get done => _done.future;
Completer _done = Completer<void>();
_StreamManager get streamManager => _streamManager;
_StreamManager _streamManager;
final List<_DartDevelopmentServiceClient> _clients = [];
json_rpc.Peer _vmServiceClient;
WebSocketChannel _vmServiceSocket;
HttpServer _server;
}

View file

@ -0,0 +1,113 @@
// Copyright (c) 2020, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
part of dds;
class _StreamManager {
_StreamManager(this.dds);
/// Send `streamNotify` notifications to clients subscribed to `streamId`.
///
/// If `data` is of type `Uint8List`, the notification is assumed to be a
/// binary event and is forwarded directly over the subscriber's websocket.
/// Otherwise, the event is sent via the JSON RPC client.
void streamNotify(String streamId, data) {
if (streamListeners.containsKey(streamId)) {
final listeners = streamListeners[streamId];
final isBinaryData = data is Uint8List;
for (final listener in listeners) {
if (isBinaryData) {
listener.ws.sink.add(data);
} else {
listener.sendNotification('streamNotify', data);
}
}
}
}
/// Start listening for `streamNotify` events from the VM service and forward
/// them to the clients which have subscribed to the stream.
void listen() => dds._vmServiceClient.registerMethod(
'streamNotify',
(parameters) {
final streamId = parameters['streamId'].asString;
streamNotify(streamId, parameters.value);
},
);
/// Subscribes `client` to a stream.
///
/// If `client` is the first client to listen to `stream`, DDS will send a
/// `streamListen` request for `stream` to the VM service.
Future<void> streamListen(
_DartDevelopmentServiceClient client,
String stream,
) async {
assert(stream != null && stream.isNotEmpty);
if (!streamListeners.containsKey(stream)) {
// This will return an RPC exception if the stream doesn't exist. This
// will throw and the exception will be forwarded to the client.
final result = await dds._vmServiceClient.sendRequest('streamListen', {
'streamId': stream,
});
assert(result['type'] == 'Success');
streamListeners[stream] = <_DartDevelopmentServiceClient>[];
}
if (streamListeners[stream].contains(client)) {
throw kStreamAlreadySubscribedException;
}
streamListeners[stream].add(client);
}
/// Unsubscribes `client` from a stream.
///
/// If `client` is the last client to unsubscribe from `stream`, DDS will
/// send a `streamCancel` request for `stream` to the VM service.
Future<void> streamCancel(
_DartDevelopmentServiceClient client,
String stream,
) async {
assert(stream != null && stream.isNotEmpty);
final listeners = streamListeners[stream];
if (listeners == null || !listeners.contains(client)) {
throw kStreamNotSubscribedException;
}
listeners.remove(client);
if (listeners.isEmpty) {
streamListeners.remove(stream);
final result = await dds._vmServiceClient.sendRequest('streamCancel', {
'streamId': stream,
});
assert(result['type'] == 'Success');
} else {
streamListeners[stream] = listeners;
}
}
/// Cleanup stream subscriptions for `client` when it has disconnected.
void clientDisconnect(_DartDevelopmentServiceClient client) {
for (final streamId in streamListeners.keys.toList()) {
streamCancel(client, streamId);
}
}
// These error codes must be kept in sync with those in vm/json_stream.h and
// vmservice.dart.
static const kStreamAlreadySubscribed = 103;
static const kStreamNotSubscribed = 104;
// Keep these messages in sync with the VM service.
static final kStreamAlreadySubscribedException = json_rpc.RpcException(
kStreamAlreadySubscribed,
'Stream already subscribed',
);
static final kStreamNotSubscribedException = json_rpc.RpcException(
kStreamNotSubscribed,
'Stream not subscribed',
);
final _DartDevelopmentService dds;
final streamListeners = <String, List<_DartDevelopmentServiceClient>>{};
}

View file

@ -11,13 +11,15 @@ environment:
sdk: '>=2.6.0 <3.0.0'
dependencies:
async: ^2.4.1
json_rpc_2: ^2.1.0
pedantic: ^1.7.0
shelf: ^0.7.5
shelf_proxy: ^0.1.0+7
shelf_web_socket: ^0.2.3
stream_channel: ^2.0.0
web_socket_channel: ^1.1.0
dev_dependencies:
pedantic: ^1.7.0
test: ^1.0.0
vm_service: ^4.0.0

View file

@ -583,6 +583,8 @@ class StandardTestSuite extends TestSuite {
_enqueueStandardTest(testFile, expectationSet, onTest);
} else if (configuration.runtime.isBrowser) {
_enqueueBrowserTest(testFile, expectationSet, onTest);
} else if (suiteName == 'service') {
_enqueueServiceTest(testFile, expectationSet, onTest);
} else {
_enqueueStandardTest(testFile, expectationSet, onTest);
}
@ -616,6 +618,45 @@ class StandardTestSuite extends TestSuite {
}
}
void _enqueueServiceTest(
TestFile testFile, Set<Expectation> expectations, TestCaseEvent onTest) {
var commonArguments = _commonArgumentsFromFile(testFile);
var vmOptionsList = getVmOptions(testFile);
assert(!vmOptionsList.isEmpty);
bool emitDdsTest = false;
for (int i = 0; i < 2; ++i) {
for (var vmOptionsVariant = 0;
vmOptionsVariant < vmOptionsList.length;
vmOptionsVariant++) {
var vmOptions = vmOptionsList[vmOptionsVariant];
var allVmOptions = vmOptions;
if (!extraVmOptions.isEmpty) {
allVmOptions = vmOptions.toList()..addAll(extraVmOptions);
}
if (emitDdsTest) {
allVmOptions.add('-DUSE_DDS=true');
}
var isCrashExpected = expectations.contains(Expectation.crash);
var commands = _makeCommands(
testFile,
vmOptionsVariant + (vmOptionsList.length * i),
allVmOptions,
commonArguments,
isCrashExpected);
var variantTestName =
testFile.name + '/${emitDdsTest ? 'dds' : 'service'}';
if (vmOptionsList.length > 1) {
variantTestName = "${testFile.name}_$vmOptionsVariant";
}
_addTestCase(testFile, variantTestName, commands, expectations, onTest);
}
emitDdsTest = true;
}
}
List<Command> _makeCommands(TestFile testFile, int vmOptionsVariant,
List<String> vmOptions, List<String> args, bool isCrashExpected) {
var commands = <Command>[];

View file

@ -1,5 +1,9 @@
# Changelog
## Unreleased
- Fixed issue where RPC format did not conform to the JSON-RPC 2.0
specification.
## 4.0.1
- Improved documentation.
- Fixed analysis issues.

View file

@ -1955,13 +1955,17 @@ class VmService implements VmServiceInterface {
Future get onDone => _onDoneCompleter.future;
Future<T> _call<T>(String method, [Map args]) {
Future<T> _call<T>(String method, [Map args = const {}]) {
String id = '${++_id}';
Completer<T> completer = Completer<T>();
_completers[id] = completer;
_methodCalls[id] = method;
Map m = {'id': id, 'method': method};
if (args != null) m['params'] = args;
Map m = {
'jsonrpc': '2.0',
'id': id,
'method': method,
'params': args,
};
String message = jsonEncode(m);
_onSend.add(message);
_writeMessage(message);

View file

@ -118,13 +118,12 @@ final String _implCode = r'''
Future get onDone => _onDoneCompleter.future;
Future<T> _call<T>(String method, [Map args]) {
Future<T> _call<T>(String method, [Map args = const {}]) {
String id = '${++_id}';
Completer<T> completer = Completer<T>();
_completers[id] = completer;
_methodCalls[id] = method;
Map m = {'id': id, 'method': method};
if (args != null) m['params'] = args;
Map m = {'jsonrpc': '2.0', 'id': id, 'method': method, 'params': args,};
String message = jsonEncode(m);
_onSend.add(message);
_writeMessage(message);

View file

@ -318,8 +318,12 @@ abstract class CommonWebSocketVM extends VM {
'params': {'id': serial, 'query': request.method}
});
} else {
message = json.encode(
{'id': serial, 'method': request.method, 'params': request.params});
message = json.encode({
'jsonrpc': '2.0',
'id': serial,
'method': request.method,
'params': request.params
});
}
if (request.method != 'getTagProfile' &&
request.method != 'getIsolateMetric' &&

View file

@ -66,7 +66,7 @@ var tests = <IsolateTest>[
fail("Expected to find function");
}
isolate.resume();
await isolate.resume();
}
];

View file

@ -63,4 +63,9 @@ var tests = <VMTest>[
},
];
main(args) async => runVMTests(args, tests);
main(args) async => runVMTests(
args,
tests,
// TODO(bkonyi): client names are not yet supported in DDS.
enableDds: false,
);

View file

@ -60,5 +60,12 @@ final test = <IsolateTest>[
hasStoppedAtExit,
];
Future<void> main(args) => runIsolateTests(args, test,
testeeConcurrent: fooBar, pause_on_start: true, pause_on_exit: true);
Future<void> main(args) => runIsolateTests(
args,
test,
testeeConcurrent: fooBar,
pause_on_start: true,
pause_on_exit: true,
// TODO(bkonyi): client names are not yet supported in DDS.
enableDds: false,
);

View file

@ -58,5 +58,12 @@ final test = <IsolateTest>[
hasStoppedAtExit,
];
Future<void> main(args) => runIsolateTests(args, test,
testeeConcurrent: fooBar, pause_on_start: true, pause_on_exit: true);
Future<void> main(args) => runIsolateTests(
args,
test,
testeeConcurrent: fooBar,
pause_on_start: true,
pause_on_exit: true,
// TODO(bkonyi): client names are not yet supported in DDS.
enableDds: false,
);

View file

@ -40,5 +40,12 @@ final sameClientNamesTest = <IsolateTest>[
hasStoppedAtExit,
];
Future<void> main(args) => runIsolateTests(args, sameClientNamesTest,
testeeConcurrent: fooBar, pause_on_start: true, pause_on_exit: true);
Future<void> main(args) => runIsolateTests(
args,
sameClientNamesTest,
testeeConcurrent: fooBar,
pause_on_start: true,
pause_on_exit: true,
// TODO(bkonyi): client names are not yet supported in DDS.
enableDds: false,
);

View file

@ -58,5 +58,12 @@ final multipleClientNamesTest = <IsolateTest>[
},
];
Future<void> main(args) => runIsolateTests(args, multipleClientNamesTest,
testeeConcurrent: fooBar, pause_on_start: true, pause_on_exit: true);
Future<void> main(args) => runIsolateTests(
args,
multipleClientNamesTest,
testeeConcurrent: fooBar,
pause_on_start: true,
pause_on_exit: true,
// TODO(bkonyi): client names are not yet supported in DDS.
enableDds: false,
);

View file

@ -51,5 +51,12 @@ final nameChangeTest = <IsolateTest>[
hasStoppedAtExit,
];
Future<void> main(args) => runIsolateTests(args, nameChangeTest,
testeeConcurrent: fooBar, pause_on_start: true, pause_on_exit: true);
Future<void> main(args) => runIsolateTests(
args,
nameChangeTest,
testeeConcurrent: fooBar,
pause_on_start: true,
pause_on_exit: true,
// TODO(bkonyi): client names are not yet supported in DDS.
enableDds: false,
);

View file

@ -60,5 +60,11 @@ final hotReloadTest = <IsolateTest>[
},
];
Future<void> main(args) => runIsolateTests(args, hotReloadTest,
testeeConcurrent: fooBar, pause_on_start: true);
Future<void> main(args) => runIsolateTests(
args,
hotReloadTest,
testeeConcurrent: fooBar,
pause_on_start: true,
// TODO(bkonyi): client names are not yet supported in DDS.
enableDds: false,
);

View file

@ -21,11 +21,15 @@ int selectedPort = 0;
main(args) async {
selectedPort = await _getUnusedPort();
await runVMTests(args, tests,
enable_service_port_fallback: true,
// Choose a port number that should always be open.
port: selectedPort,
extraArgs: []);
await runVMTests(
args, tests,
enable_service_port_fallback: true,
// Choose a port number that should always be open.
port: selectedPort,
extraArgs: [],
// TODO(bkonyi): investigate failure.
enableDds: false,
);
}
Future<int> _getUnusedPort() async {

View file

@ -164,4 +164,8 @@ var tests = <IsolateTest>[
},
];
main(args) => runIsolateTests(args, tests);
main(args) => runIsolateTests(
args, tests,
// TODO(bkonyi): service extensions are not yet supported in DDS.
enableDds: false,
);

View file

@ -89,4 +89,8 @@ var tests = <IsolateTest>[
},
];
main(args) => runIsolateTests(args, tests);
main(args) => runIsolateTests(
args, tests,
// TODO(bkonyi): service extensions are not yet supported in DDS.
enableDds: false,
);

View file

@ -83,4 +83,9 @@ var tests = <IsolateTest>[
},
];
main(args) => runIsolateTests(args, tests);
main(args) => runIsolateTests(
args,
tests,
// TODO(bkonyi): service extensions are not yet supported in DDS.
enableDds: false,
);

View file

@ -123,4 +123,9 @@ var tests = <IsolateTest>[
},
];
main(args) => runIsolateTests(args, tests);
main(args) => runIsolateTests(
args,
tests,
// TODO(bkonyi): service extensions are not yet supported in DDS.
enableDds: false,
);

View file

@ -103,4 +103,8 @@ var tests = <IsolateTest>[
},
];
main(args) => runIsolateTests(args, tests);
main(args) => runIsolateTests(
args, tests,
// TODO(bkonyi): service extensions are not yet supported in DDS.
enableDds: false,
);

View file

@ -126,4 +126,8 @@ var tests = <IsolateTest>[
},
];
main(args) => runIsolateTests(args, tests);
main(args) => runIsolateTests(
args, tests,
// TODO(bkonyi): service extensions are not yet supported in DDS.
enableDds: false,
);

View file

@ -37,4 +37,10 @@ final test = <IsolateTest>[
},
];
Future<void> main(args) => runIsolateTests(args, test, testeeBefore: fooBar);
Future<void> main(args) => runIsolateTests(
args,
test,
testeeBefore: fooBar,
// TODO(bkonyi): client names are not yet supported in DDS.
enableDds: false,
);

View file

@ -37,4 +37,11 @@ var tests = <IsolateTest>[
},
];
main(args) => runIsolateTests(args, tests);
main(args) => runIsolateTests(
args,
tests,
// This test hangs with DDS as package:json_rpc_2 can't parse the JSON
// response and is unable to determine the request ID, so the malformed
// JSON request will never complete.
enableDds: false,
);

View file

@ -22,4 +22,8 @@ var tests = <VMTest>[
}
];
main(args) async => runVMTests(args, tests);
main(args) async => runVMTests(
args, tests,
// TODO(bkonyi): DDS doesn't forward Observatory assets properly yet.
enableDds: false,
);

View file

@ -101,12 +101,16 @@ var tests = <IsolateTest>[
},
];
main(args) => runIsolateTests(args, tests,
testeeConcurrent: testMain,
pause_on_start: true,
pause_on_exit: true,
verbose_vm: true,
extraArgs: [
'--trace-service',
'--trace-service-verbose',
]);
main(args) => runIsolateTests(
args, tests,
testeeConcurrent: testMain,
pause_on_start: true,
pause_on_exit: true,
verbose_vm: true,
extraArgs: [
'--trace-service',
'--trace-service-verbose',
],
// TODO(bkonyi): investigate failure.
enableDds: false,
);

View file

@ -61,7 +61,13 @@ var tests = <IsolateTest>[
}
];
main(args) => runIsolateTests(args, tests,
pause_on_unhandled_exceptions: true,
testeeConcurrent: testeeMain,
extraArgs: extraDebuggingArgs);
main(args) => runIsolateTests(
args,
tests,
pause_on_unhandled_exceptions: true,
testeeConcurrent: testeeMain,
extraArgs: extraDebuggingArgs,
// TODO(bkonyi): causes ASSERT in debug mode, unrelated to DDS.
// See https://github.com/dart-lang/sdk/issues/41379.
enableDds: false,
);

View file

@ -29,6 +29,7 @@ get_isolate_after_language_error_test: SkipByDesign
*: SkipByDesign
[ $system == windows ]
*: Slow
async_generator_breakpoint_test: Skip # Issue 29145
dev_fs_http_put_weird_char_test: Skip # Windows disallows carriage returns in paths
dev_fs_weird_char_test: Skip # Windows disallows question mark in paths

View file

@ -0,0 +1,58 @@
// Copyright (c) 2020, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'package:observatory/service_io.dart';
import 'package:test/test.dart';
import 'test_helper.dart';
Future streamListen(VM vm, String streamId) async =>
await vm.invokeRpcNoUpgrade(
'streamListen',
{
'streamId': streamId,
},
);
Future streamCancel(VM vm, String streamId) async =>
await vm.invokeRpcNoUpgrade(
'streamCancel',
{
'streamId': streamId,
},
);
var tests = <VMTest>[
// Check double subscription fails.
(VM vm) async {
await streamListen(vm, '_Echo');
try {
await streamListen(vm, '_Echo');
fail('Subscribed to stream twice');
} on ServerRpcException catch (e) {
expect(e.message, 'Stream already subscribed');
}
},
// Check double cancellation fails.
(VM vm) async {
await streamCancel(vm, '_Echo');
try {
await streamCancel(vm, '_Echo');
fail('Double cancellation of stream successful');
} on ServerRpcException catch (e) {
expect(e.message, 'Stream not subscribed');
}
},
// Check subscription to invalid stream fails.
(VM vm) async {
try {
await streamListen(vm, 'Foo');
fail('Subscribed to invalid stream');
} on ServerRpcException catch (e) {
expect(e.message, "streamListen: invalid 'streamId' parameter: Foo");
}
}
];
main(args) => runVMTests(args, tests);

View file

@ -7,6 +7,7 @@ library test_helper;
import 'dart:async';
import 'dart:convert';
import 'dart:io';
import 'package:dds/dds.dart';
import 'package:observatory/service_io.dart';
import 'package:test/test.dart';
import 'service_test_common.dart';
@ -16,6 +17,9 @@ export 'service_test_common.dart' show IsolateTest, VMTest;
const bool useCausalAsyncStacks =
const bool.fromEnvironment('dart.developer.causal_async_stacks');
/// Determines whether DDS is enabled for this test run.
const bool useDds = const bool.fromEnvironment('USE_DDS');
/// The extra arguments to use
const List<String> extraDebuggingArgs = useCausalAsyncStacks
? const ['--causal-async-stacks', '--no-lazy-async-stacks']
@ -97,6 +101,8 @@ class _ServiceTesteeRunner {
class _ServiceTesteeLauncher {
Process process;
final List<String> args;
Future<void> get exited => _processCompleter.future;
final _processCompleter = Completer<void>();
bool killedByTester = false;
_ServiceTesteeLauncher() : args = [Platform.script.toFilePath()] {}
@ -286,6 +292,7 @@ class _ServiceTesteeLauncher {
throw "Testee exited with $exitCode";
}
print("** Process exited");
_processCompleter.complete();
});
// Wait for the blank line which signals that we're ready to run.
@ -295,9 +302,16 @@ class _ServiceTesteeLauncher {
}
final content = await serviceInfoFile.readAsString();
final infoJson = json.decode(content);
uri = Uri.parse(infoJson['uri']);
String rawUri = infoJson['uri'];
// If rawUri ends with a /, Uri.parse will include an empty string as the
// last path segment. Make sure it's not there to ensure we have a
// consistent Uri.
if (rawUri.endsWith('/')) {
rawUri = rawUri.substring(0, rawUri.length - 1);
}
uri = Uri.parse(rawUri);
completer.complete(uri);
print('** Signaled to run test queries on $uri');
});
return completer.future;
}
@ -314,7 +328,7 @@ class _ServiceTesteeLauncher {
void setupAddresses(Uri serverAddress) {
serviceWebsocketAddress =
'ws://${serverAddress.authority}${serverAddress.path}ws';
'ws://${serverAddress.authority}${serverAddress.path}/ws';
serviceHttpAddress = 'http://${serverAddress.authority}${serverAddress.path}';
}
@ -331,16 +345,18 @@ class _ServiceTesterRunner {
bool pause_on_unhandled_exceptions: false,
bool enable_service_port_fallback: false,
bool testeeControlsServer: false,
bool enableDds: true,
int port = 0,
}) {
if (executableArgs == null) {
executableArgs = Platform.executableArguments;
}
final process = new _ServiceTesteeLauncher();
final name = Platform.script.pathSegments.last;
DartDevelopmentService dds;
WebSocketVM vm;
_ServiceTesteeLauncher process;
setUp(() async {
process = _ServiceTesteeLauncher();
await process
.launch(
pause_on_start,
@ -358,7 +374,15 @@ class _ServiceTesterRunner {
print("Testee has pid $pid, waiting $wait before continuing");
sleep(wait);
}
setupAddresses(serverAddress);
if (useDds) {
dds = await DartDevelopmentService.startDartDevelopmentService(
serverAddress);
setupAddresses(dds.uri);
} else {
setupAddresses(serverAddress);
}
print('** Signaled to run test queries on $serviceHttpAddress'
' (${useDds ? "DDS" : "VM Service"})');
vm = new WebSocketVM(new WebSocketVMTarget(serviceWebsocketAddress));
print('Loading VM...');
await vm.load();
@ -366,43 +390,66 @@ class _ServiceTesterRunner {
});
});
test(
name,
() async {
// Run vm tests.
if (vmTests != null) {
int testIndex = 1;
final totalTests = vmTests.length;
for (var test in vmTests) {
vm.verbose = verbose_vm;
print('Running $name [$testIndex/$totalTests]');
testIndex++;
await test(vm);
}
}
// Run isolate tests.
if (isolateTests != null) {
final isolate = await getFirstIsolate(vm);
int testIndex = 1;
final totalTests = isolateTests.length;
for (var test in isolateTests) {
vm.verbose = verbose_vm;
print('Running $name [$testIndex/$totalTests]');
testIndex++;
await test(isolate);
}
}
},
retry: 0,
// Some service tests run fairly long (e.g., valid_source_locations_test).
timeout: Timeout.none,
);
tearDown(() {
print('All service tests completed successfully.');
tearDown(() async {
if (useDds) {
await dds.shutdown();
}
process.requestExit();
});
final name = Platform.script.pathSegments.last;
runTest(String name) {
test(
'$name (${useDds ? 'DDS' : 'VM Service'})',
() async {
bool testsDone = false;
try {
// Run vm tests.
if (vmTests != null) {
int testIndex = 1;
final totalTests = vmTests.length;
for (var test in vmTests) {
vm.verbose = verbose_vm;
print('Running $name [$testIndex/$totalTests]');
testIndex++;
await test(vm);
}
}
// Run isolate tests.
if (isolateTests != null) {
final isolate = await getFirstIsolate(vm);
int testIndex = 1;
final totalTests = isolateTests.length;
for (var test in isolateTests) {
vm.verbose = verbose_vm;
print('Running $name [$testIndex/$totalTests]');
testIndex++;
await test(isolate);
}
}
print('All service tests completed successfully.');
testsDone = true;
} catch (error, stackTrace) {
if (testsDone) {
print('Ignoring late exception during process exit:\n'
'$error\n$stackTrace');
} else {
rethrow;
}
}
},
// Some service tests run fairly long (e.g., valid_source_locations_test).
timeout: Timeout.none,
);
}
if (useDds && !enableDds) {
print('Skipping DDS run for $name');
} else {
runTest(name);
}
}
Future<Isolate> getFirstIsolate(WebSocketVM vm) async {
@ -460,6 +507,7 @@ Future runIsolateTests(List<String> mainArgs, List<IsolateTest> tests,
bool verbose_vm: false,
bool pause_on_unhandled_exceptions: false,
bool testeeControlsServer: false,
bool enableDds: true,
List<String> extraArgs}) async {
assert(!pause_on_start || testeeBefore == null);
if (_isTestee()) {
@ -477,7 +525,8 @@ Future runIsolateTests(List<String> mainArgs, List<IsolateTest> tests,
pause_on_exit: pause_on_exit,
verbose_vm: verbose_vm,
pause_on_unhandled_exceptions: pause_on_unhandled_exceptions,
testeeControlsServer: testeeControlsServer);
testeeControlsServer: testeeControlsServer,
enableDds: enableDds);
}
}
@ -529,6 +578,7 @@ Future runVMTests(List<String> mainArgs, List<VMTest> tests,
bool verbose_vm: false,
bool pause_on_unhandled_exceptions: false,
bool enable_service_port_fallback: false,
bool enableDds: true,
int port = 0,
List<String> extraArgs,
List<String> executableArgs}) async {
@ -549,6 +599,7 @@ Future runVMTests(List<String> mainArgs, List<VMTest> tests,
verbose_vm: verbose_vm,
pause_on_unhandled_exceptions: pause_on_unhandled_exceptions,
enable_service_port_fallback: enable_service_port_fallback,
enableDds: enableDds,
port: port,
);
}

View file

@ -62,7 +62,7 @@ Uri randomlyAddRequestParams(Uri uri) {
}
Future<HttpServer> startServer() async {
final server = await HttpServer.bind(InternetAddress.loopbackIPv4, 8011);
final server = await HttpServer.bind(InternetAddress.loopbackIPv4, 0);
server.listen((request) async {
final response = request.response;
randomlyAddCookie(response);

View file

@ -9,10 +9,14 @@ import 'package:test/test.dart';
import 'service_test_common.dart';
import 'test_helper.dart';
primeDartTimeline() {
primeDartTimeline() async {
while (true) {
Timeline.startSync('apple');
Timeline.finishSync();
// Give the VM a chance to send the timeline events. This test is
// significantly slower if we loop without yielding control after each
// iteration.
await Future.delayed(const Duration(milliseconds: 1));
}
}
@ -22,8 +26,8 @@ List filterEvents(List events, filter) {
return events.where(filter).toList();
}
Completer completer = new Completer();
int eventCount = 0;
Completer completer;
int eventCount;
onTimelineEvent(ServiceEvent event) {
eventCount++;
@ -34,6 +38,11 @@ onTimelineEvent(ServiceEvent event) {
}
var tests = <IsolateTest>[
(Isolate isolate) async {
// Clear global state.
eventCount = 0;
completer = Completer<void>();
},
(Isolate isolate) async {
// Subscribe to the Timeline stream.
await subscribeToStream(isolate.vm, VM.kTimelineStream, onTimelineEvent);

View file

@ -49,7 +49,8 @@ final String serviceAuthToken = _makeAuthToken();
final Map<int, IsolateEmbedderData> isolateEmbedderData =
new Map<int, IsolateEmbedderData>();
// These must be kept in sync with the declarations in vm/json_stream.h.
// These must be kept in sync with the declarations in vm/json_stream.h and
// pkg/dds/lib/src/stream_manager.dart.
const kParseError = -32700;
const kInvalidRequest = -32600;
const kMethodNotFound = -32601;

View file

@ -46,7 +46,8 @@ final serviceAuthToken = _makeAuthToken();
// the cleanup method will be invoked after being removed from the map.
final isolateEmbedderData = <int, IsolateEmbedderData>{};
// These must be kept in sync with the declarations in vm/json_stream.h.
// These must be kept in sync with the declarations in vm/json_stream.h and
// pkg/dds/lib/src/stream_manager.dart.
const kParseError = -32700;
const kInvalidRequest = -32600;
const kMethodNotFound = -32601;

View file

@ -327,6 +327,7 @@
"pkg/dart_internal/",
"pkg/dart2native/",
"pkg/dart2js_tools/",
"pkg/dds/",
"pkg/expect/",
"pkg/front_end/",
"pkg/js/",
@ -412,6 +413,7 @@
"pkg/dart_internal/",
"pkg/dart2native/",
"pkg/dart2js_tools/",
"pkg/dds/",
"pkg/expect/",
"pkg/front_end/",
"pkg/js/",