From 6b2419ddafa09a0654cd3f39f5d40ef6e2d17d0d Mon Sep 17 00:00:00 2001 From: Ben Konyi Date: Tue, 7 Apr 2020 23:18:45 +0000 Subject: [PATCH] [ Service / package:dds ] Add stream support to package:dds and enable DDS for VM service tests This change adds stream forwarding to the Dart Development Service, allowing for clients to subscribe to service protocol streams with DDS instead of the VM service directly. DDS will maintain a single subscription for each stream as long as at least one client is listening to that stream. A DDS stream subscription will be closed when the last client listening to that stream either disconnects or calls streamCancel. This change also enables DDS for most of the Observatory services tests, excluding thoses which utilize: - Service extensions - Client naming - Client isolate resume synchronization Change-Id: I5641e879a7626fcd5e4d28434ed480dd72fc7659 Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/142083 Reviewed-by: Ryan Macnak Commit-Queue: Ben Konyi --- DEPS | 2 +- pkg/dds/lib/dds.dart | 9 + pkg/dds/lib/src/binary_compatible_peer.dart | 60 ++++++ pkg/dds/lib/src/client.dart | 68 +++++++ pkg/dds/lib/src/dds_impl.dart | 54 ++++-- pkg/dds/lib/src/stream_manager.dart | 113 +++++++++++ pkg/dds/pubspec.yaml | 4 +- pkg/vm_service/CHANGELOG.md | 4 + pkg/vm_service/lib/src/vm_service.dart | 10 +- pkg/vm_service/tool/dart/generate_dart.dart | 5 +- runtime/observatory/.packages | 1 + runtime/observatory/lib/service_common.dart | 8 +- .../break_on_default_constructor_test.dart | 2 +- .../tests/service/client_name_rpc_test.dart | 7 +- ...pprovals_approve_then_disconnect_test.dart | 11 +- ...ient_resume_approvals_disconnect_test.dart | 11 +- ...resume_approvals_identical_names_test.dart | 11 +- ..._resume_approvals_multiple_names_test.dart | 11 +- ...ent_resume_approvals_name_change_test.dart | 11 +- .../client_resume_approvals_reload_test.dart | 10 +- ...e_service_port_fallback_positive_test.dart | 14 +- ..._service_asynchronous_invocation_test.dart | 6 +- .../external_service_disappear_test.dart | 6 +- ..._service_notification_invocation_test.dart | 7 +- .../external_service_registration_test.dart | 7 +- ...ce_registration_via_notification_test.dart | 6 +- ...l_service_synchronous_invocation_test.dart | 6 +- .../service/get_client_name_rpc_test.dart | 8 +- .../tests/service/malformed_test.dart | 9 +- .../service/observatory_assets_test.dart | 6 +- ...use_on_start_and_exit_with_child_test.dart | 22 ++- ...e_on_unhandled_async_exceptions2_test.dart | 14 +- .../service/stream_subscription_test.dart | 58 ++++++ .../tests/service/test_helper.dart | 179 +++++++++++------- .../service/vm_timeline_events_test.dart | 15 +- sdk/lib/vmservice/vmservice.dart | 3 +- sdk_nnbd/lib/vmservice/vmservice.dart | 3 +- tools/bots/test_matrix.json | 2 + 38 files changed, 649 insertions(+), 134 deletions(-) create mode 100644 pkg/dds/lib/src/binary_compatible_peer.dart create mode 100644 pkg/dds/lib/src/client.dart create mode 100644 pkg/dds/lib/src/stream_manager.dart create mode 100644 runtime/observatory/tests/service/stream_subscription_test.dart diff --git a/DEPS b/DEPS index 59060c96413..ff53d1748ef 100644 --- a/DEPS +++ b/DEPS @@ -147,7 +147,7 @@ vars = { "usage_tag": "3.4.0", "watcher_rev": "0.9.7+14", "web_components_rev": "8f57dac273412a7172c8ade6f361b407e2e4ed02", - "web_socket_channel_tag": "1.0.15", + "web_socket_channel_tag": "1.1.0", "WebCore_rev": "fb11e887f77919450e497344da570d780e078bc8", "yaml_tag": "2.2.0", "zlib_rev": "c44fb7248079cc3d5563b14b3f758aee60d6b415", diff --git a/pkg/dds/lib/dds.dart b/pkg/dds/lib/dds.dart index d2bb5eae518..03ced9ff837 100644 --- a/pkg/dds/lib/dds.dart +++ b/pkg/dds/lib/dds.dart @@ -7,15 +7,24 @@ library dds; import 'dart:async'; +import 'dart:convert'; import 'dart:io'; +import 'dart:typed_data'; +import 'package:async/async.dart'; +import 'package:json_rpc_2/json_rpc_2.dart' as json_rpc; +import 'package:pedantic/pedantic.dart'; import 'package:shelf/shelf.dart'; import 'package:shelf/shelf_io.dart' as io; import 'package:shelf_proxy/shelf_proxy.dart'; import 'package:shelf_web_socket/shelf_web_socket.dart'; +import 'package:stream_channel/stream_channel.dart'; import 'package:web_socket_channel/web_socket_channel.dart'; +part 'src/binary_compatible_peer.dart'; +part 'src/client.dart'; part 'src/dds_impl.dart'; +part 'src/stream_manager.dart'; /// An intermediary between a Dart VM service and its clients that offers /// additional functionality on top of the standard VM service protocol. diff --git a/pkg/dds/lib/src/binary_compatible_peer.dart b/pkg/dds/lib/src/binary_compatible_peer.dart new file mode 100644 index 00000000000..7c5b83b69fb --- /dev/null +++ b/pkg/dds/lib/src/binary_compatible_peer.dart @@ -0,0 +1,60 @@ +// Copyright (c) 2020, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +part of dds; + +/// Adds support for binary events send from the VM service, which are not part +/// of the official JSON RPC 2.0 specification. +/// +/// A binary event from the VM service has the form: +/// ``` +/// type BinaryEvent { +/// dataOffset : uint32, +/// metadata : uint8[dataOffset-4], +/// data : uint8[], +/// } +/// ``` +/// where `metadata` is the JSON body of the event. +/// +/// [_BinaryCompatiblePeer] assumes that only stream events can contain a +/// binary payload (e.g., clients cannot send a `BinaryEvent` to the VM service). +class _BinaryCompatiblePeer extends json_rpc.Peer { + _BinaryCompatiblePeer(WebSocketChannel ws, _StreamManager streamManager) + : super( + ws.transform( + StreamChannelTransformer( + StreamTransformer.fromHandlers( + handleData: (data, EventSink sink) => + _transformStream(streamManager, data, sink)), + StreamSinkTransformer.fromHandlers( + handleData: (String data, EventSink sink) { + sink.add(data); + }, + ), + ), + ), + ); + + static void _transformStream( + _StreamManager streamManager, dynamic data, EventSink sink) { + if (data is String) { + // Non-binary messages come in as Strings. Simply forward to the sink. + sink.add(data); + } else if (data is Uint8List) { + // Only binary events will result in `data` being of type Uint8List. We + // need to manually forward them here. + final bytesView = + ByteData.view(data.buffer, data.offsetInBytes, data.lengthInBytes); + const metadataOffset = 4; + final dataOffset = bytesView.getUint32(0, Endian.little); + final metadataLength = dataOffset - metadataOffset; + final metadata = Utf8Decoder().convert(new Uint8List.view( + bytesView.buffer, + bytesView.offsetInBytes + metadataOffset, + metadataLength)); + final decodedMetadata = json.decode(metadata); + streamManager.streamNotify(decodedMetadata['params']['streamId'], data); + } + } +} diff --git a/pkg/dds/lib/src/client.dart b/pkg/dds/lib/src/client.dart new file mode 100644 index 00000000000..11dac829dc4 --- /dev/null +++ b/pkg/dds/lib/src/client.dart @@ -0,0 +1,68 @@ +// Copyright (c) 2020, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +part of dds; + +/// Representation of a single DDS client which manages the connection and +/// DDS request intercepting / forwarding. +class _DartDevelopmentServiceClient { + _DartDevelopmentServiceClient( + this.dds, + this.ws, + json_rpc.Peer vmServicePeer, + ) : _vmServicePeer = vmServicePeer { + _clientPeer = json_rpc.Peer(ws.cast()); + _registerJsonRpcMethods(); + } + + /// Start receiving JSON RPC requests from the client. + /// + /// Returned future completes when the peer is closed. + Future listen() => _clientPeer.listen().then( + (_) => dds.streamManager.clientDisconnect(this), + ); + + /// Close the connection to the client. + Future close() async { + // Cleanup the JSON RPC server for this connection if DDS has shutdown. + await _clientPeer.close(); + } + + /// Send a JSON RPC notification to the client. + void sendNotification(String method, [dynamic parameters]) async { + if (_clientPeer.isClosed) { + return; + } + _clientPeer.sendNotification(method, parameters); + } + + /// Registers handlers for JSON RPC methods which need to be intercepted by + /// DDS as well as fallback request forwarder. + void _registerJsonRpcMethods() { + _clientPeer.registerMethod('streamListen', (parameters) async { + final streamId = parameters['streamId'].asString; + await dds.streamManager.streamListen(this, streamId); + return _success; + }); + + _clientPeer.registerMethod('streamCancel', (parameters) async { + final streamId = parameters['streamId'].asString; + await dds.streamManager.streamCancel(this, streamId); + return _success; + }); + + // Unless otherwise specified, the request is forwarded to the VM service. + _clientPeer.registerFallback((parameters) async => + await _vmServicePeer.sendRequest(parameters.method, parameters.asMap)); + } + + static const _success = { + 'type': 'Success', + }; + + final _DartDevelopmentService dds; + final json_rpc.Peer _vmServicePeer; + final WebSocketChannel ws; + json_rpc.Peer _clientPeer; +} diff --git a/pkg/dds/lib/src/dds_impl.dart b/pkg/dds/lib/src/dds_impl.dart index 37f019adc6f..fb2bf749950 100644 --- a/pkg/dds/lib/src/dds_impl.dart +++ b/pkg/dds/lib/src/dds_impl.dart @@ -5,12 +5,20 @@ part of dds; class _DartDevelopmentService implements DartDevelopmentService { - _DartDevelopmentService(this._remoteVmServiceUri, this._uri); + _DartDevelopmentService(this._remoteVmServiceUri, this._uri) { + _streamManager = _StreamManager(this); + } Future startService() async { // Establish the connection to the VM service. - _vmServiceSocket = await WebSocket.connect(remoteVmServiceWsUri.toString()); - _vmServiceStream = _vmServiceSocket.asBroadcastStream(); + _vmServiceSocket = WebSocketChannel.connect(remoteVmServiceWsUri); + _vmServiceClient = _BinaryCompatiblePeer(_vmServiceSocket, _streamManager); + // Setup the JSON RPC client with the VM service. + unawaited(_vmServiceClient.listen()); + + // Setup stream event handling. + streamManager.listen(); + // Once we have a connection to the VM service, we're ready to spawn the intermediary. await _startDDSServer(); } @@ -28,8 +36,20 @@ class _DartDevelopmentService implements DartDevelopmentService { /// Stop accepting requests after gracefully handling existing requests. Future shutdown() async { + // Don't accept anymore HTTP requests. await _server.close(); - await _vmServiceSocket.close(); + + // Close all incoming websocket connections. + final futures = []; + for (final client in _clients) { + futures.add(client.close()); + } + await Future.wait(futures); + + // Close connection to VM service. + await _vmServiceSocket.sink.close(); + + _done.complete(); } // Attempt to upgrade HTTP requests to a websocket before processing them as @@ -38,16 +58,18 @@ class _DartDevelopmentService implements DartDevelopmentService { Cascade _handlers() => Cascade().add(_webSocketHandler()).add(_httpHandler()); Handler _webSocketHandler() => webSocketHandler((WebSocketChannel ws) { - // TODO(bkonyi): actually process requests instead of blindly forwarding them. - _vmServiceStream.listen( - (event) => ws.sink.add(event), - onDone: () => ws.sink.close(), + final client = _DartDevelopmentServiceClient( + this, + ws, + _vmServiceClient, ); - ws.stream.listen((event) => _vmServiceSocket.add(event)); + _clients.add(client); + client.listen().then((_) => _clients.remove(client)); }); Handler _httpHandler() { - // TODO(bkonyi): actually process requests instead of blindly forwarding them. + // DDS doesn't support any HTTP requests itself, so we just forward all of + // them to the VM service. final cascade = Cascade().add(proxyHandler(remoteVmServiceUri)); return cascade.handler; } @@ -79,7 +101,15 @@ class _DartDevelopmentService implements DartDevelopmentService { bool get isRunning => _uri != null; - WebSocket _vmServiceSocket; - Stream _vmServiceStream; + Future get done => _done.future; + Completer _done = Completer(); + + _StreamManager get streamManager => _streamManager; + _StreamManager _streamManager; + + final List<_DartDevelopmentServiceClient> _clients = []; + + json_rpc.Peer _vmServiceClient; + WebSocketChannel _vmServiceSocket; HttpServer _server; } diff --git a/pkg/dds/lib/src/stream_manager.dart b/pkg/dds/lib/src/stream_manager.dart new file mode 100644 index 00000000000..318cc39a78b --- /dev/null +++ b/pkg/dds/lib/src/stream_manager.dart @@ -0,0 +1,113 @@ +// Copyright (c) 2020, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +part of dds; + +class _StreamManager { + _StreamManager(this.dds); + + /// Send `streamNotify` notifications to clients subscribed to `streamId`. + /// + /// If `data` is of type `Uint8List`, the notification is assumed to be a + /// binary event and is forwarded directly over the subscriber's websocket. + /// Otherwise, the event is sent via the JSON RPC client. + void streamNotify(String streamId, data) { + if (streamListeners.containsKey(streamId)) { + final listeners = streamListeners[streamId]; + final isBinaryData = data is Uint8List; + for (final listener in listeners) { + if (isBinaryData) { + listener.ws.sink.add(data); + } else { + listener.sendNotification('streamNotify', data); + } + } + } + } + + /// Start listening for `streamNotify` events from the VM service and forward + /// them to the clients which have subscribed to the stream. + void listen() => dds._vmServiceClient.registerMethod( + 'streamNotify', + (parameters) { + final streamId = parameters['streamId'].asString; + streamNotify(streamId, parameters.value); + }, + ); + + /// Subscribes `client` to a stream. + /// + /// If `client` is the first client to listen to `stream`, DDS will send a + /// `streamListen` request for `stream` to the VM service. + Future streamListen( + _DartDevelopmentServiceClient client, + String stream, + ) async { + assert(stream != null && stream.isNotEmpty); + if (!streamListeners.containsKey(stream)) { + // This will return an RPC exception if the stream doesn't exist. This + // will throw and the exception will be forwarded to the client. + final result = await dds._vmServiceClient.sendRequest('streamListen', { + 'streamId': stream, + }); + assert(result['type'] == 'Success'); + streamListeners[stream] = <_DartDevelopmentServiceClient>[]; + } + if (streamListeners[stream].contains(client)) { + throw kStreamAlreadySubscribedException; + } + streamListeners[stream].add(client); + } + + /// Unsubscribes `client` from a stream. + /// + /// If `client` is the last client to unsubscribe from `stream`, DDS will + /// send a `streamCancel` request for `stream` to the VM service. + Future streamCancel( + _DartDevelopmentServiceClient client, + String stream, + ) async { + assert(stream != null && stream.isNotEmpty); + final listeners = streamListeners[stream]; + if (listeners == null || !listeners.contains(client)) { + throw kStreamNotSubscribedException; + } + listeners.remove(client); + if (listeners.isEmpty) { + streamListeners.remove(stream); + final result = await dds._vmServiceClient.sendRequest('streamCancel', { + 'streamId': stream, + }); + assert(result['type'] == 'Success'); + } else { + streamListeners[stream] = listeners; + } + } + + /// Cleanup stream subscriptions for `client` when it has disconnected. + void clientDisconnect(_DartDevelopmentServiceClient client) { + for (final streamId in streamListeners.keys.toList()) { + streamCancel(client, streamId); + } + } + + // These error codes must be kept in sync with those in vm/json_stream.h and + // vmservice.dart. + static const kStreamAlreadySubscribed = 103; + static const kStreamNotSubscribed = 104; + + // Keep these messages in sync with the VM service. + static final kStreamAlreadySubscribedException = json_rpc.RpcException( + kStreamAlreadySubscribed, + 'Stream already subscribed', + ); + + static final kStreamNotSubscribedException = json_rpc.RpcException( + kStreamNotSubscribed, + 'Stream not subscribed', + ); + + final _DartDevelopmentService dds; + final streamListeners = >{}; +} diff --git a/pkg/dds/pubspec.yaml b/pkg/dds/pubspec.yaml index 8734df5f3cd..9dbb70e8219 100644 --- a/pkg/dds/pubspec.yaml +++ b/pkg/dds/pubspec.yaml @@ -11,13 +11,15 @@ environment: sdk: '>=2.6.0 <3.0.0' dependencies: + async: ^2.4.1 json_rpc_2: ^2.1.0 + pedantic: ^1.7.0 shelf: ^0.7.5 shelf_proxy: ^0.1.0+7 shelf_web_socket: ^0.2.3 + stream_channel: ^2.0.0 web_socket_channel: ^1.1.0 dev_dependencies: - pedantic: ^1.7.0 test: ^1.0.0 vm_service: ^4.0.0 diff --git a/pkg/vm_service/CHANGELOG.md b/pkg/vm_service/CHANGELOG.md index cbe922209b9..f4f24b5cd88 100644 --- a/pkg/vm_service/CHANGELOG.md +++ b/pkg/vm_service/CHANGELOG.md @@ -1,5 +1,9 @@ # Changelog +## Unreleased +- Fixed issue where RPC format did not conform to the JSON-RPC 2.0 + specification. + ## 4.0.0 - **breaking**: RPCs which can return a `Sentinel` will now throw a `SentinelException` if a `Sential` is received as a response. diff --git a/pkg/vm_service/lib/src/vm_service.dart b/pkg/vm_service/lib/src/vm_service.dart index 03d939e28ae..a7660078b2d 100644 --- a/pkg/vm_service/lib/src/vm_service.dart +++ b/pkg/vm_service/lib/src/vm_service.dart @@ -1955,13 +1955,17 @@ class VmService implements VmServiceInterface { Future get onDone => _onDoneCompleter.future; - Future _call(String method, [Map args]) { + Future _call(String method, [Map args = const {}]) { String id = '${++_id}'; Completer completer = Completer(); _completers[id] = completer; _methodCalls[id] = method; - Map m = {'id': id, 'method': method}; - if (args != null) m['params'] = args; + Map m = { + 'jsonrpc': '2.0', + 'id': id, + 'method': method, + 'params': args, + }; String message = jsonEncode(m); _onSend.add(message); _writeMessage(message); diff --git a/pkg/vm_service/tool/dart/generate_dart.dart b/pkg/vm_service/tool/dart/generate_dart.dart index 0e85f27221d..b85bc8cf62a 100644 --- a/pkg/vm_service/tool/dart/generate_dart.dart +++ b/pkg/vm_service/tool/dart/generate_dart.dart @@ -118,13 +118,12 @@ final String _implCode = r''' Future get onDone => _onDoneCompleter.future; - Future _call(String method, [Map args]) { + Future _call(String method, [Map args = const {}]) { String id = '${++_id}'; Completer completer = Completer(); _completers[id] = completer; _methodCalls[id] = method; - Map m = {'id': id, 'method': method}; - if (args != null) m['params'] = args; + Map m = {'jsonrpc': '2.0', 'id': id, 'method': method, 'params': args,}; String message = jsonEncode(m); _onSend.add(message); _writeMessage(message); diff --git a/runtime/observatory/.packages b/runtime/observatory/.packages index 29f9ce27cac..dd7012f2309 100644 --- a/runtime/observatory/.packages +++ b/runtime/observatory/.packages @@ -16,6 +16,7 @@ async:../../third_party/pkg/async/lib boolean_selector:../../third_party/pkg/boolean_selector/lib charcode:../../third_party/pkg/charcode/lib dart_internal:../../pkg/dart_internal/lib +dds:../../pkg/dds/lib expect:../../pkg/expect/lib http:../../third_party/pkg/http/lib http_parser:../../third_party/pkg/http_parser/lib diff --git a/runtime/observatory/lib/service_common.dart b/runtime/observatory/lib/service_common.dart index 66ff984c047..48181ff55d9 100644 --- a/runtime/observatory/lib/service_common.dart +++ b/runtime/observatory/lib/service_common.dart @@ -318,8 +318,12 @@ abstract class CommonWebSocketVM extends VM { 'params': {'id': serial, 'query': request.method} }); } else { - message = json.encode( - {'id': serial, 'method': request.method, 'params': request.params}); + message = json.encode({ + 'jsonrpc': '2.0', + 'id': serial, + 'method': request.method, + 'params': request.params + }); } if (request.method != 'getTagProfile' && request.method != 'getIsolateMetric' && diff --git a/runtime/observatory/tests/service/break_on_default_constructor_test.dart b/runtime/observatory/tests/service/break_on_default_constructor_test.dart index bc504c741d2..d968ca21cfa 100644 --- a/runtime/observatory/tests/service/break_on_default_constructor_test.dart +++ b/runtime/observatory/tests/service/break_on_default_constructor_test.dart @@ -66,7 +66,7 @@ var tests = [ fail("Expected to find function"); } - isolate.resume(); + await isolate.resume(); } ]; diff --git a/runtime/observatory/tests/service/client_name_rpc_test.dart b/runtime/observatory/tests/service/client_name_rpc_test.dart index 41cbe4fb379..fa204302d19 100644 --- a/runtime/observatory/tests/service/client_name_rpc_test.dart +++ b/runtime/observatory/tests/service/client_name_rpc_test.dart @@ -63,4 +63,9 @@ var tests = [ }, ]; -main(args) async => runVMTests(args, tests); +main(args) async => runVMTests( + args, + tests, + // TODO(bkonyi): client names are not yet supported in DDS. + enableDds: false, + ); diff --git a/runtime/observatory/tests/service/client_resume_approvals_approve_then_disconnect_test.dart b/runtime/observatory/tests/service/client_resume_approvals_approve_then_disconnect_test.dart index 427706fdfd2..462e3dc414b 100644 --- a/runtime/observatory/tests/service/client_resume_approvals_approve_then_disconnect_test.dart +++ b/runtime/observatory/tests/service/client_resume_approvals_approve_then_disconnect_test.dart @@ -60,5 +60,12 @@ final test = [ hasStoppedAtExit, ]; -Future main(args) => runIsolateTests(args, test, - testeeConcurrent: fooBar, pause_on_start: true, pause_on_exit: true); +Future main(args) => runIsolateTests( + args, + test, + testeeConcurrent: fooBar, + pause_on_start: true, + pause_on_exit: true, + // TODO(bkonyi): client names are not yet supported in DDS. + enableDds: false, + ); diff --git a/runtime/observatory/tests/service/client_resume_approvals_disconnect_test.dart b/runtime/observatory/tests/service/client_resume_approvals_disconnect_test.dart index 9d235224407..50c5d34609f 100644 --- a/runtime/observatory/tests/service/client_resume_approvals_disconnect_test.dart +++ b/runtime/observatory/tests/service/client_resume_approvals_disconnect_test.dart @@ -58,5 +58,12 @@ final test = [ hasStoppedAtExit, ]; -Future main(args) => runIsolateTests(args, test, - testeeConcurrent: fooBar, pause_on_start: true, pause_on_exit: true); +Future main(args) => runIsolateTests( + args, + test, + testeeConcurrent: fooBar, + pause_on_start: true, + pause_on_exit: true, + // TODO(bkonyi): client names are not yet supported in DDS. + enableDds: false, + ); diff --git a/runtime/observatory/tests/service/client_resume_approvals_identical_names_test.dart b/runtime/observatory/tests/service/client_resume_approvals_identical_names_test.dart index d3a09a962d5..27e026c50e5 100644 --- a/runtime/observatory/tests/service/client_resume_approvals_identical_names_test.dart +++ b/runtime/observatory/tests/service/client_resume_approvals_identical_names_test.dart @@ -40,5 +40,12 @@ final sameClientNamesTest = [ hasStoppedAtExit, ]; -Future main(args) => runIsolateTests(args, sameClientNamesTest, - testeeConcurrent: fooBar, pause_on_start: true, pause_on_exit: true); +Future main(args) => runIsolateTests( + args, + sameClientNamesTest, + testeeConcurrent: fooBar, + pause_on_start: true, + pause_on_exit: true, + // TODO(bkonyi): client names are not yet supported in DDS. + enableDds: false, + ); diff --git a/runtime/observatory/tests/service/client_resume_approvals_multiple_names_test.dart b/runtime/observatory/tests/service/client_resume_approvals_multiple_names_test.dart index 5f26a9c9def..e92442c1c7e 100644 --- a/runtime/observatory/tests/service/client_resume_approvals_multiple_names_test.dart +++ b/runtime/observatory/tests/service/client_resume_approvals_multiple_names_test.dart @@ -58,5 +58,12 @@ final multipleClientNamesTest = [ }, ]; -Future main(args) => runIsolateTests(args, multipleClientNamesTest, - testeeConcurrent: fooBar, pause_on_start: true, pause_on_exit: true); +Future main(args) => runIsolateTests( + args, + multipleClientNamesTest, + testeeConcurrent: fooBar, + pause_on_start: true, + pause_on_exit: true, + // TODO(bkonyi): client names are not yet supported in DDS. + enableDds: false, + ); diff --git a/runtime/observatory/tests/service/client_resume_approvals_name_change_test.dart b/runtime/observatory/tests/service/client_resume_approvals_name_change_test.dart index 3d33112bc54..b20a5936103 100644 --- a/runtime/observatory/tests/service/client_resume_approvals_name_change_test.dart +++ b/runtime/observatory/tests/service/client_resume_approvals_name_change_test.dart @@ -51,5 +51,12 @@ final nameChangeTest = [ hasStoppedAtExit, ]; -Future main(args) => runIsolateTests(args, nameChangeTest, - testeeConcurrent: fooBar, pause_on_start: true, pause_on_exit: true); +Future main(args) => runIsolateTests( + args, + nameChangeTest, + testeeConcurrent: fooBar, + pause_on_start: true, + pause_on_exit: true, + // TODO(bkonyi): client names are not yet supported in DDS. + enableDds: false, + ); diff --git a/runtime/observatory/tests/service/client_resume_approvals_reload_test.dart b/runtime/observatory/tests/service/client_resume_approvals_reload_test.dart index 250d34978ba..50f7f459411 100644 --- a/runtime/observatory/tests/service/client_resume_approvals_reload_test.dart +++ b/runtime/observatory/tests/service/client_resume_approvals_reload_test.dart @@ -60,5 +60,11 @@ final hotReloadTest = [ }, ]; -Future main(args) => runIsolateTests(args, hotReloadTest, - testeeConcurrent: fooBar, pause_on_start: true); +Future main(args) => runIsolateTests( + args, + hotReloadTest, + testeeConcurrent: fooBar, + pause_on_start: true, + // TODO(bkonyi): client names are not yet supported in DDS. + enableDds: false, + ); diff --git a/runtime/observatory/tests/service/enable_service_port_fallback_positive_test.dart b/runtime/observatory/tests/service/enable_service_port_fallback_positive_test.dart index ca25d5f9f5f..bbac57444e9 100644 --- a/runtime/observatory/tests/service/enable_service_port_fallback_positive_test.dart +++ b/runtime/observatory/tests/service/enable_service_port_fallback_positive_test.dart @@ -21,11 +21,15 @@ int selectedPort = 0; main(args) async { selectedPort = await _getUnusedPort(); - await runVMTests(args, tests, - enable_service_port_fallback: true, - // Choose a port number that should always be open. - port: selectedPort, - extraArgs: []); + await runVMTests( + args, tests, + enable_service_port_fallback: true, + // Choose a port number that should always be open. + port: selectedPort, + extraArgs: [], + // TODO(bkonyi): investigate failure. + enableDds: false, + ); } Future _getUnusedPort() async { diff --git a/runtime/observatory/tests/service/external_service_asynchronous_invocation_test.dart b/runtime/observatory/tests/service/external_service_asynchronous_invocation_test.dart index 277816791cc..58d44deba1e 100644 --- a/runtime/observatory/tests/service/external_service_asynchronous_invocation_test.dart +++ b/runtime/observatory/tests/service/external_service_asynchronous_invocation_test.dart @@ -164,4 +164,8 @@ var tests = [ }, ]; -main(args) => runIsolateTests(args, tests); +main(args) => runIsolateTests( + args, tests, + // TODO(bkonyi): service extensions are not yet supported in DDS. + enableDds: false, + ); diff --git a/runtime/observatory/tests/service/external_service_disappear_test.dart b/runtime/observatory/tests/service/external_service_disappear_test.dart index eafe5a698d6..a015686b2e3 100644 --- a/runtime/observatory/tests/service/external_service_disappear_test.dart +++ b/runtime/observatory/tests/service/external_service_disappear_test.dart @@ -89,4 +89,8 @@ var tests = [ }, ]; -main(args) => runIsolateTests(args, tests); +main(args) => runIsolateTests( + args, tests, + // TODO(bkonyi): service extensions are not yet supported in DDS. + enableDds: false, + ); diff --git a/runtime/observatory/tests/service/external_service_notification_invocation_test.dart b/runtime/observatory/tests/service/external_service_notification_invocation_test.dart index 3b1e6953350..a5bfd64fcf3 100644 --- a/runtime/observatory/tests/service/external_service_notification_invocation_test.dart +++ b/runtime/observatory/tests/service/external_service_notification_invocation_test.dart @@ -83,4 +83,9 @@ var tests = [ }, ]; -main(args) => runIsolateTests(args, tests); +main(args) => runIsolateTests( + args, + tests, + // TODO(bkonyi): service extensions are not yet supported in DDS. + enableDds: false, + ); diff --git a/runtime/observatory/tests/service/external_service_registration_test.dart b/runtime/observatory/tests/service/external_service_registration_test.dart index 2ec0d95967d..ce0aea9c7c4 100644 --- a/runtime/observatory/tests/service/external_service_registration_test.dart +++ b/runtime/observatory/tests/service/external_service_registration_test.dart @@ -123,4 +123,9 @@ var tests = [ }, ]; -main(args) => runIsolateTests(args, tests); +main(args) => runIsolateTests( + args, + tests, + // TODO(bkonyi): service extensions are not yet supported in DDS. + enableDds: false, + ); diff --git a/runtime/observatory/tests/service/external_service_registration_via_notification_test.dart b/runtime/observatory/tests/service/external_service_registration_via_notification_test.dart index 333c24e626c..80729ce1839 100644 --- a/runtime/observatory/tests/service/external_service_registration_via_notification_test.dart +++ b/runtime/observatory/tests/service/external_service_registration_via_notification_test.dart @@ -103,4 +103,8 @@ var tests = [ }, ]; -main(args) => runIsolateTests(args, tests); +main(args) => runIsolateTests( + args, tests, + // TODO(bkonyi): service extensions are not yet supported in DDS. + enableDds: false, + ); diff --git a/runtime/observatory/tests/service/external_service_synchronous_invocation_test.dart b/runtime/observatory/tests/service/external_service_synchronous_invocation_test.dart index 97917b33fed..45c58d82490 100644 --- a/runtime/observatory/tests/service/external_service_synchronous_invocation_test.dart +++ b/runtime/observatory/tests/service/external_service_synchronous_invocation_test.dart @@ -126,4 +126,8 @@ var tests = [ }, ]; -main(args) => runIsolateTests(args, tests); +main(args) => runIsolateTests( + args, tests, + // TODO(bkonyi): service extensions are not yet supported in DDS. + enableDds: false, + ); diff --git a/runtime/observatory/tests/service/get_client_name_rpc_test.dart b/runtime/observatory/tests/service/get_client_name_rpc_test.dart index d503a91c362..33f2c21ddd7 100644 --- a/runtime/observatory/tests/service/get_client_name_rpc_test.dart +++ b/runtime/observatory/tests/service/get_client_name_rpc_test.dart @@ -37,4 +37,10 @@ final test = [ }, ]; -Future main(args) => runIsolateTests(args, test, testeeBefore: fooBar); +Future main(args) => runIsolateTests( + args, + test, + testeeBefore: fooBar, + // TODO(bkonyi): client names are not yet supported in DDS. + enableDds: false, + ); diff --git a/runtime/observatory/tests/service/malformed_test.dart b/runtime/observatory/tests/service/malformed_test.dart index 5fff3fdb13d..9e277c8400b 100644 --- a/runtime/observatory/tests/service/malformed_test.dart +++ b/runtime/observatory/tests/service/malformed_test.dart @@ -37,4 +37,11 @@ var tests = [ }, ]; -main(args) => runIsolateTests(args, tests); +main(args) => runIsolateTests( + args, + tests, + // This test hangs with DDS as package:json_rpc_2 can't parse the JSON + // response and is unable to determine the request ID, so the malformed + // JSON request will never complete. + enableDds: false, + ); diff --git a/runtime/observatory/tests/service/observatory_assets_test.dart b/runtime/observatory/tests/service/observatory_assets_test.dart index 4b8f8e2044b..a78f448b8e9 100644 --- a/runtime/observatory/tests/service/observatory_assets_test.dart +++ b/runtime/observatory/tests/service/observatory_assets_test.dart @@ -22,4 +22,8 @@ var tests = [ } ]; -main(args) async => runVMTests(args, tests); +main(args) async => runVMTests( + args, tests, + // TODO(bkonyi): DDS doesn't forward Observatory assets properly yet. + enableDds: false, + ); diff --git a/runtime/observatory/tests/service/pause_on_start_and_exit_with_child_test.dart b/runtime/observatory/tests/service/pause_on_start_and_exit_with_child_test.dart index 1d3a8bf7717..cb4b288d61b 100644 --- a/runtime/observatory/tests/service/pause_on_start_and_exit_with_child_test.dart +++ b/runtime/observatory/tests/service/pause_on_start_and_exit_with_child_test.dart @@ -101,12 +101,16 @@ var tests = [ }, ]; -main(args) => runIsolateTests(args, tests, - testeeConcurrent: testMain, - pause_on_start: true, - pause_on_exit: true, - verbose_vm: true, - extraArgs: [ - '--trace-service', - '--trace-service-verbose', - ]); +main(args) => runIsolateTests( + args, tests, + testeeConcurrent: testMain, + pause_on_start: true, + pause_on_exit: true, + verbose_vm: true, + extraArgs: [ + '--trace-service', + '--trace-service-verbose', + ], + // TODO(bkonyi): investigate failure. + enableDds: false, + ); diff --git a/runtime/observatory/tests/service/pause_on_unhandled_async_exceptions2_test.dart b/runtime/observatory/tests/service/pause_on_unhandled_async_exceptions2_test.dart index 850177a10c2..6f982849d72 100644 --- a/runtime/observatory/tests/service/pause_on_unhandled_async_exceptions2_test.dart +++ b/runtime/observatory/tests/service/pause_on_unhandled_async_exceptions2_test.dart @@ -61,7 +61,13 @@ var tests = [ } ]; -main(args) => runIsolateTests(args, tests, - pause_on_unhandled_exceptions: true, - testeeConcurrent: testeeMain, - extraArgs: extraDebuggingArgs); +main(args) => runIsolateTests( + args, + tests, + pause_on_unhandled_exceptions: true, + testeeConcurrent: testeeMain, + extraArgs: extraDebuggingArgs, + // TODO(bkonyi): causes ASSERT in debug mode, unrelated to DDS. + // See https://github.com/dart-lang/sdk/issues/41379. + enableDds: false, + ); diff --git a/runtime/observatory/tests/service/stream_subscription_test.dart b/runtime/observatory/tests/service/stream_subscription_test.dart new file mode 100644 index 00000000000..56f8420c10a --- /dev/null +++ b/runtime/observatory/tests/service/stream_subscription_test.dart @@ -0,0 +1,58 @@ +// Copyright (c) 2020, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'package:observatory/service_io.dart'; +import 'package:test/test.dart'; + +import 'test_helper.dart'; + +Future streamListen(VM vm, String streamId) async => + await vm.invokeRpcNoUpgrade( + 'streamListen', + { + 'streamId': streamId, + }, + ); + +Future streamCancel(VM vm, String streamId) async => + await vm.invokeRpcNoUpgrade( + 'streamCancel', + { + 'streamId': streamId, + }, + ); + +var tests = [ + // Check double subscription fails. + (VM vm) async { + await streamListen(vm, '_Echo'); + try { + await streamListen(vm, '_Echo'); + fail('Subscribed to stream twice'); + } on ServerRpcException catch (e) { + expect(e.message, 'Stream already subscribed'); + } + }, + // Check double cancellation fails. + (VM vm) async { + await streamCancel(vm, '_Echo'); + try { + await streamCancel(vm, '_Echo'); + fail('Double cancellation of stream successful'); + } on ServerRpcException catch (e) { + expect(e.message, 'Stream not subscribed'); + } + }, + // Check subscription to invalid stream fails. + (VM vm) async { + try { + await streamListen(vm, 'Foo'); + fail('Subscribed to invalid stream'); + } on ServerRpcException catch (e) { + expect(e.message, "streamListen: invalid 'streamId' parameter: Foo"); + } + } +]; + +main(args) => runVMTests(args, tests); diff --git a/runtime/observatory/tests/service/test_helper.dart b/runtime/observatory/tests/service/test_helper.dart index 1dd59892c6e..baa8f09a0e5 100644 --- a/runtime/observatory/tests/service/test_helper.dart +++ b/runtime/observatory/tests/service/test_helper.dart @@ -7,6 +7,7 @@ library test_helper; import 'dart:async'; import 'dart:convert'; import 'dart:io'; +import 'package:dds/dds.dart'; import 'package:observatory/service_io.dart'; import 'package:test/test.dart'; import 'service_test_common.dart'; @@ -97,6 +98,8 @@ class _ServiceTesteeRunner { class _ServiceTesteeLauncher { Process process; final List args; + Future get exited => _processCompleter.future; + final _processCompleter = Completer(); bool killedByTester = false; _ServiceTesteeLauncher() : args = [Platform.script.toFilePath()] {} @@ -286,6 +289,7 @@ class _ServiceTesteeLauncher { throw "Testee exited with $exitCode"; } print("** Process exited"); + _processCompleter.complete(); }); // Wait for the blank line which signals that we're ready to run. @@ -295,9 +299,16 @@ class _ServiceTesteeLauncher { } final content = await serviceInfoFile.readAsString(); final infoJson = json.decode(content); - uri = Uri.parse(infoJson['uri']); + String rawUri = infoJson['uri']; + + // If rawUri ends with a /, Uri.parse will include an empty string as the + // last path segment. Make sure it's not there to ensure we have a + // consistent Uri. + if (rawUri.endsWith('/')) { + rawUri = rawUri.substring(0, rawUri.length - 1); + } + uri = Uri.parse(rawUri); completer.complete(uri); - print('** Signaled to run test queries on $uri'); }); return completer.future; } @@ -314,7 +325,7 @@ class _ServiceTesteeLauncher { void setupAddresses(Uri serverAddress) { serviceWebsocketAddress = - 'ws://${serverAddress.authority}${serverAddress.path}ws'; + 'ws://${serverAddress.authority}${serverAddress.path}/ws'; serviceHttpAddress = 'http://${serverAddress.authority}${serverAddress.path}'; } @@ -331,78 +342,110 @@ class _ServiceTesterRunner { bool pause_on_unhandled_exceptions: false, bool enable_service_port_fallback: false, bool testeeControlsServer: false, + bool enableDds: true, int port = 0, }) { if (executableArgs == null) { executableArgs = Platform.executableArguments; } - final process = new _ServiceTesteeLauncher(); final name = Platform.script.pathSegments.last; - WebSocketVM vm; - setUp(() async { - await process - .launch( - pause_on_start, - pause_on_exit, - pause_on_unhandled_exceptions, - enable_service_port_fallback, - testeeControlsServer, - port, - extraArgs, - executableArgs) - .then((Uri serverAddress) async { - if (mainArgs.contains("--gdb")) { - final pid = process.process.pid; - final wait = new Duration(seconds: 10); - print("Testee has pid $pid, waiting $wait before continuing"); - sleep(wait); - } - setupAddresses(serverAddress); - vm = new WebSocketVM(new WebSocketVMTarget(serviceWebsocketAddress)); - print('Loading VM...'); - await vm.load(); - print('Done loading VM'); - }); - }); - test( - name, - () async { - // Run vm tests. - if (vmTests != null) { - int testIndex = 1; - final totalTests = vmTests.length; - for (var test in vmTests) { - vm.verbose = verbose_vm; - print('Running $name [$testIndex/$totalTests]'); - testIndex++; - await test(vm); + runTest(String name, {bool enableDds: false}) { + test( + '$name (${enableDds ? 'DDS' : 'VM Service'})', + () async { + final process = new _ServiceTesteeLauncher(); + bool testsDone = false; + try { + DartDevelopmentService dds; + WebSocketVM vm; + await process + .launch( + pause_on_start, + pause_on_exit, + pause_on_unhandled_exceptions, + enable_service_port_fallback, + testeeControlsServer, + port, + extraArgs, + executableArgs) + .then((Uri serverAddress) async { + if (mainArgs.contains("--gdb")) { + final pid = process.process.pid; + final wait = new Duration(seconds: 10); + print("Testee has pid $pid, waiting $wait before continuing"); + sleep(wait); + } + if (enableDds) { + dds = await DartDevelopmentService.startDartDevelopmentService( + serverAddress); + setupAddresses(dds.uri); + } else { + setupAddresses(serverAddress); + } + print('** Signaled to run test queries on $serviceHttpAddress' + ' (${enableDds ? "DDS" : "VM Service"})'); + vm = new WebSocketVM( + new WebSocketVMTarget(serviceWebsocketAddress)); + print('Loading VM...'); + await vm.load(); + print('Done loading VM'); + }); + + // Run vm tests. + if (vmTests != null) { + int testIndex = 1; + final totalTests = vmTests.length; + for (var test in vmTests) { + vm.verbose = verbose_vm; + print('Running $name [$testIndex/$totalTests]'); + testIndex++; + await test(vm); + } + } + + // Run isolate tests. + if (isolateTests != null) { + final isolate = await getFirstIsolate(vm); + int testIndex = 1; + final totalTests = isolateTests.length; + for (var test in isolateTests) { + vm.verbose = verbose_vm; + print('Running $name [$testIndex/$totalTests]'); + testIndex++; + await test(isolate); + } + } + + print('All service tests completed successfully.'); + testsDone = true; + if (enableDds) { + await dds.shutdown(); + } + process.requestExit(); + + // Wait for the process to exit so we don't have interleaved + // logging between DDS and VM Service runs. + await process.exited; + } catch (error, stackTrace) { + if (testsDone) { + print('Ignoring late exception during process exit:\n' + '$error\n$stackTrace'); + } else { + rethrow; + } } - } + }, + // Some service tests run fairly long (e.g., valid_source_locations_test). + timeout: Timeout.none, + ); + } - // Run isolate tests. - if (isolateTests != null) { - final isolate = await getFirstIsolate(vm); - int testIndex = 1; - final totalTests = isolateTests.length; - for (var test in isolateTests) { - vm.verbose = verbose_vm; - print('Running $name [$testIndex/$totalTests]'); - testIndex++; - await test(isolate); - } - } - }, - retry: 0, - // Some service tests run fairly long (e.g., valid_source_locations_test). - timeout: Timeout.none, - ); - - tearDown(() { - print('All service tests completed successfully.'); - process.requestExit(); - }); + runTest(name); + if (enableDds) { + runTest(name, enableDds: true); + } } Future getFirstIsolate(WebSocketVM vm) async { @@ -460,6 +503,7 @@ Future runIsolateTests(List mainArgs, List tests, bool verbose_vm: false, bool pause_on_unhandled_exceptions: false, bool testeeControlsServer: false, + bool enableDds: true, List extraArgs}) async { assert(!pause_on_start || testeeBefore == null); if (_isTestee()) { @@ -477,7 +521,8 @@ Future runIsolateTests(List mainArgs, List tests, pause_on_exit: pause_on_exit, verbose_vm: verbose_vm, pause_on_unhandled_exceptions: pause_on_unhandled_exceptions, - testeeControlsServer: testeeControlsServer); + testeeControlsServer: testeeControlsServer, + enableDds: enableDds); } } @@ -529,6 +574,7 @@ Future runVMTests(List mainArgs, List tests, bool verbose_vm: false, bool pause_on_unhandled_exceptions: false, bool enable_service_port_fallback: false, + bool enableDds: true, int port = 0, List extraArgs, List executableArgs}) async { @@ -549,6 +595,7 @@ Future runVMTests(List mainArgs, List tests, verbose_vm: verbose_vm, pause_on_unhandled_exceptions: pause_on_unhandled_exceptions, enable_service_port_fallback: enable_service_port_fallback, + enableDds: enableDds, port: port, ); } diff --git a/runtime/observatory/tests/service/vm_timeline_events_test.dart b/runtime/observatory/tests/service/vm_timeline_events_test.dart index 1b77ed15c1c..a47bcf21aa2 100644 --- a/runtime/observatory/tests/service/vm_timeline_events_test.dart +++ b/runtime/observatory/tests/service/vm_timeline_events_test.dart @@ -9,10 +9,14 @@ import 'package:test/test.dart'; import 'service_test_common.dart'; import 'test_helper.dart'; -primeDartTimeline() { +primeDartTimeline() async { while (true) { Timeline.startSync('apple'); Timeline.finishSync(); + // Give the VM a chance to send the timeline events. This test is + // significantly slower if we loop without yielding control after each + // iteration. + await Future.delayed(const Duration(milliseconds: 1)); } } @@ -22,8 +26,8 @@ List filterEvents(List events, filter) { return events.where(filter).toList(); } -Completer completer = new Completer(); -int eventCount = 0; +Completer completer; +int eventCount; onTimelineEvent(ServiceEvent event) { eventCount++; @@ -34,6 +38,11 @@ onTimelineEvent(ServiceEvent event) { } var tests = [ + (Isolate isolate) async { + // Clear global state. + eventCount = 0; + completer = Completer(); + }, (Isolate isolate) async { // Subscribe to the Timeline stream. await subscribeToStream(isolate.vm, VM.kTimelineStream, onTimelineEvent); diff --git a/sdk/lib/vmservice/vmservice.dart b/sdk/lib/vmservice/vmservice.dart index 6943f130ba6..81a2ac7c8a7 100644 --- a/sdk/lib/vmservice/vmservice.dart +++ b/sdk/lib/vmservice/vmservice.dart @@ -49,7 +49,8 @@ final String serviceAuthToken = _makeAuthToken(); final Map isolateEmbedderData = new Map(); -// These must be kept in sync with the declarations in vm/json_stream.h. +// These must be kept in sync with the declarations in vm/json_stream.h and +// pkg/dds/lib/src/stream_manager.dart. const kParseError = -32700; const kInvalidRequest = -32600; const kMethodNotFound = -32601; diff --git a/sdk_nnbd/lib/vmservice/vmservice.dart b/sdk_nnbd/lib/vmservice/vmservice.dart index f5a176e8b89..807fd000d31 100644 --- a/sdk_nnbd/lib/vmservice/vmservice.dart +++ b/sdk_nnbd/lib/vmservice/vmservice.dart @@ -46,7 +46,8 @@ final serviceAuthToken = _makeAuthToken(); // the cleanup method will be invoked after being removed from the map. final isolateEmbedderData = {}; -// These must be kept in sync with the declarations in vm/json_stream.h. +// These must be kept in sync with the declarations in vm/json_stream.h and +// pkg/dds/lib/src/stream_manager.dart. const kParseError = -32700; const kInvalidRequest = -32600; const kMethodNotFound = -32601; diff --git a/tools/bots/test_matrix.json b/tools/bots/test_matrix.json index 1fc2765c263..79ecfc7ad57 100644 --- a/tools/bots/test_matrix.json +++ b/tools/bots/test_matrix.json @@ -331,6 +331,7 @@ "pkg/dart_internal/", "pkg/dart2native/", "pkg/dart2js_tools/", + "pkg/dds/", "pkg/expect/", "pkg/front_end/", "pkg/js/", @@ -417,6 +418,7 @@ "pkg/dart_internal/", "pkg/dart2native/", "pkg/dart2js_tools/", + "pkg/dds/", "pkg/expect/", "pkg/front_end/", "pkg/js/",