[ Service / package:dds ] Add stream support to package:dds and enable DDS for VM service tests

This change adds stream forwarding to the Dart Development Service,
allowing for clients to subscribe to service protocol streams with DDS
instead of the VM service directly. DDS will maintain a single
subscription for each stream as long as at least one client is listening
to that stream. A DDS stream subscription will be closed when the last
client listening to that stream either disconnects or calls
streamCancel.

This change also enables DDS for most of the Observatory services tests,
excluding thoses which utilize:

- Service extensions
- Client naming
- Client isolate resume synchronization

Change-Id: I5641e879a7626fcd5e4d28434ed480dd72fc7659
Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/142083
Reviewed-by: Ryan Macnak <rmacnak@google.com>
Commit-Queue: Ben Konyi <bkonyi@google.com>
This commit is contained in:
Ben Konyi 2020-04-07 23:18:45 +00:00 committed by commit-bot@chromium.org
parent 732d4ef961
commit 6b2419ddaf
38 changed files with 649 additions and 134 deletions

2
DEPS
View file

@ -147,7 +147,7 @@ vars = {
"usage_tag": "3.4.0",
"watcher_rev": "0.9.7+14",
"web_components_rev": "8f57dac273412a7172c8ade6f361b407e2e4ed02",
"web_socket_channel_tag": "1.0.15",
"web_socket_channel_tag": "1.1.0",
"WebCore_rev": "fb11e887f77919450e497344da570d780e078bc8",
"yaml_tag": "2.2.0",
"zlib_rev": "c44fb7248079cc3d5563b14b3f758aee60d6b415",

View file

@ -7,15 +7,24 @@
library dds;
import 'dart:async';
import 'dart:convert';
import 'dart:io';
import 'dart:typed_data';
import 'package:async/async.dart';
import 'package:json_rpc_2/json_rpc_2.dart' as json_rpc;
import 'package:pedantic/pedantic.dart';
import 'package:shelf/shelf.dart';
import 'package:shelf/shelf_io.dart' as io;
import 'package:shelf_proxy/shelf_proxy.dart';
import 'package:shelf_web_socket/shelf_web_socket.dart';
import 'package:stream_channel/stream_channel.dart';
import 'package:web_socket_channel/web_socket_channel.dart';
part 'src/binary_compatible_peer.dart';
part 'src/client.dart';
part 'src/dds_impl.dart';
part 'src/stream_manager.dart';
/// An intermediary between a Dart VM service and its clients that offers
/// additional functionality on top of the standard VM service protocol.

View file

@ -0,0 +1,60 @@
// Copyright (c) 2020, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
part of dds;
/// Adds support for binary events send from the VM service, which are not part
/// of the official JSON RPC 2.0 specification.
///
/// A binary event from the VM service has the form:
/// ```
/// type BinaryEvent {
/// dataOffset : uint32,
/// metadata : uint8[dataOffset-4],
/// data : uint8[],
/// }
/// ```
/// where `metadata` is the JSON body of the event.
///
/// [_BinaryCompatiblePeer] assumes that only stream events can contain a
/// binary payload (e.g., clients cannot send a `BinaryEvent` to the VM service).
class _BinaryCompatiblePeer extends json_rpc.Peer {
_BinaryCompatiblePeer(WebSocketChannel ws, _StreamManager streamManager)
: super(
ws.transform<String>(
StreamChannelTransformer(
StreamTransformer<dynamic, String>.fromHandlers(
handleData: (data, EventSink<String> sink) =>
_transformStream(streamManager, data, sink)),
StreamSinkTransformer<String, dynamic>.fromHandlers(
handleData: (String data, EventSink<dynamic> sink) {
sink.add(data);
},
),
),
),
);
static void _transformStream(
_StreamManager streamManager, dynamic data, EventSink<String> sink) {
if (data is String) {
// Non-binary messages come in as Strings. Simply forward to the sink.
sink.add(data);
} else if (data is Uint8List) {
// Only binary events will result in `data` being of type Uint8List. We
// need to manually forward them here.
final bytesView =
ByteData.view(data.buffer, data.offsetInBytes, data.lengthInBytes);
const metadataOffset = 4;
final dataOffset = bytesView.getUint32(0, Endian.little);
final metadataLength = dataOffset - metadataOffset;
final metadata = Utf8Decoder().convert(new Uint8List.view(
bytesView.buffer,
bytesView.offsetInBytes + metadataOffset,
metadataLength));
final decodedMetadata = json.decode(metadata);
streamManager.streamNotify(decodedMetadata['params']['streamId'], data);
}
}
}

View file

@ -0,0 +1,68 @@
// Copyright (c) 2020, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
part of dds;
/// Representation of a single DDS client which manages the connection and
/// DDS request intercepting / forwarding.
class _DartDevelopmentServiceClient {
_DartDevelopmentServiceClient(
this.dds,
this.ws,
json_rpc.Peer vmServicePeer,
) : _vmServicePeer = vmServicePeer {
_clientPeer = json_rpc.Peer(ws.cast<String>());
_registerJsonRpcMethods();
}
/// Start receiving JSON RPC requests from the client.
///
/// Returned future completes when the peer is closed.
Future<void> listen() => _clientPeer.listen().then(
(_) => dds.streamManager.clientDisconnect(this),
);
/// Close the connection to the client.
Future<void> close() async {
// Cleanup the JSON RPC server for this connection if DDS has shutdown.
await _clientPeer.close();
}
/// Send a JSON RPC notification to the client.
void sendNotification(String method, [dynamic parameters]) async {
if (_clientPeer.isClosed) {
return;
}
_clientPeer.sendNotification(method, parameters);
}
/// Registers handlers for JSON RPC methods which need to be intercepted by
/// DDS as well as fallback request forwarder.
void _registerJsonRpcMethods() {
_clientPeer.registerMethod('streamListen', (parameters) async {
final streamId = parameters['streamId'].asString;
await dds.streamManager.streamListen(this, streamId);
return _success;
});
_clientPeer.registerMethod('streamCancel', (parameters) async {
final streamId = parameters['streamId'].asString;
await dds.streamManager.streamCancel(this, streamId);
return _success;
});
// Unless otherwise specified, the request is forwarded to the VM service.
_clientPeer.registerFallback((parameters) async =>
await _vmServicePeer.sendRequest(parameters.method, parameters.asMap));
}
static const _success = <String, dynamic>{
'type': 'Success',
};
final _DartDevelopmentService dds;
final json_rpc.Peer _vmServicePeer;
final WebSocketChannel ws;
json_rpc.Peer _clientPeer;
}

View file

@ -5,12 +5,20 @@
part of dds;
class _DartDevelopmentService implements DartDevelopmentService {
_DartDevelopmentService(this._remoteVmServiceUri, this._uri);
_DartDevelopmentService(this._remoteVmServiceUri, this._uri) {
_streamManager = _StreamManager(this);
}
Future<void> startService() async {
// Establish the connection to the VM service.
_vmServiceSocket = await WebSocket.connect(remoteVmServiceWsUri.toString());
_vmServiceStream = _vmServiceSocket.asBroadcastStream();
_vmServiceSocket = WebSocketChannel.connect(remoteVmServiceWsUri);
_vmServiceClient = _BinaryCompatiblePeer(_vmServiceSocket, _streamManager);
// Setup the JSON RPC client with the VM service.
unawaited(_vmServiceClient.listen());
// Setup stream event handling.
streamManager.listen();
// Once we have a connection to the VM service, we're ready to spawn the intermediary.
await _startDDSServer();
}
@ -28,8 +36,20 @@ class _DartDevelopmentService implements DartDevelopmentService {
/// Stop accepting requests after gracefully handling existing requests.
Future<void> shutdown() async {
// Don't accept anymore HTTP requests.
await _server.close();
await _vmServiceSocket.close();
// Close all incoming websocket connections.
final futures = <Future>[];
for (final client in _clients) {
futures.add(client.close());
}
await Future.wait(futures);
// Close connection to VM service.
await _vmServiceSocket.sink.close();
_done.complete();
}
// Attempt to upgrade HTTP requests to a websocket before processing them as
@ -38,16 +58,18 @@ class _DartDevelopmentService implements DartDevelopmentService {
Cascade _handlers() => Cascade().add(_webSocketHandler()).add(_httpHandler());
Handler _webSocketHandler() => webSocketHandler((WebSocketChannel ws) {
// TODO(bkonyi): actually process requests instead of blindly forwarding them.
_vmServiceStream.listen(
(event) => ws.sink.add(event),
onDone: () => ws.sink.close(),
final client = _DartDevelopmentServiceClient(
this,
ws,
_vmServiceClient,
);
ws.stream.listen((event) => _vmServiceSocket.add(event));
_clients.add(client);
client.listen().then((_) => _clients.remove(client));
});
Handler _httpHandler() {
// TODO(bkonyi): actually process requests instead of blindly forwarding them.
// DDS doesn't support any HTTP requests itself, so we just forward all of
// them to the VM service.
final cascade = Cascade().add(proxyHandler(remoteVmServiceUri));
return cascade.handler;
}
@ -79,7 +101,15 @@ class _DartDevelopmentService implements DartDevelopmentService {
bool get isRunning => _uri != null;
WebSocket _vmServiceSocket;
Stream _vmServiceStream;
Future<void> get done => _done.future;
Completer _done = Completer<void>();
_StreamManager get streamManager => _streamManager;
_StreamManager _streamManager;
final List<_DartDevelopmentServiceClient> _clients = [];
json_rpc.Peer _vmServiceClient;
WebSocketChannel _vmServiceSocket;
HttpServer _server;
}

View file

@ -0,0 +1,113 @@
// Copyright (c) 2020, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
part of dds;
class _StreamManager {
_StreamManager(this.dds);
/// Send `streamNotify` notifications to clients subscribed to `streamId`.
///
/// If `data` is of type `Uint8List`, the notification is assumed to be a
/// binary event and is forwarded directly over the subscriber's websocket.
/// Otherwise, the event is sent via the JSON RPC client.
void streamNotify(String streamId, data) {
if (streamListeners.containsKey(streamId)) {
final listeners = streamListeners[streamId];
final isBinaryData = data is Uint8List;
for (final listener in listeners) {
if (isBinaryData) {
listener.ws.sink.add(data);
} else {
listener.sendNotification('streamNotify', data);
}
}
}
}
/// Start listening for `streamNotify` events from the VM service and forward
/// them to the clients which have subscribed to the stream.
void listen() => dds._vmServiceClient.registerMethod(
'streamNotify',
(parameters) {
final streamId = parameters['streamId'].asString;
streamNotify(streamId, parameters.value);
},
);
/// Subscribes `client` to a stream.
///
/// If `client` is the first client to listen to `stream`, DDS will send a
/// `streamListen` request for `stream` to the VM service.
Future<void> streamListen(
_DartDevelopmentServiceClient client,
String stream,
) async {
assert(stream != null && stream.isNotEmpty);
if (!streamListeners.containsKey(stream)) {
// This will return an RPC exception if the stream doesn't exist. This
// will throw and the exception will be forwarded to the client.
final result = await dds._vmServiceClient.sendRequest('streamListen', {
'streamId': stream,
});
assert(result['type'] == 'Success');
streamListeners[stream] = <_DartDevelopmentServiceClient>[];
}
if (streamListeners[stream].contains(client)) {
throw kStreamAlreadySubscribedException;
}
streamListeners[stream].add(client);
}
/// Unsubscribes `client` from a stream.
///
/// If `client` is the last client to unsubscribe from `stream`, DDS will
/// send a `streamCancel` request for `stream` to the VM service.
Future<void> streamCancel(
_DartDevelopmentServiceClient client,
String stream,
) async {
assert(stream != null && stream.isNotEmpty);
final listeners = streamListeners[stream];
if (listeners == null || !listeners.contains(client)) {
throw kStreamNotSubscribedException;
}
listeners.remove(client);
if (listeners.isEmpty) {
streamListeners.remove(stream);
final result = await dds._vmServiceClient.sendRequest('streamCancel', {
'streamId': stream,
});
assert(result['type'] == 'Success');
} else {
streamListeners[stream] = listeners;
}
}
/// Cleanup stream subscriptions for `client` when it has disconnected.
void clientDisconnect(_DartDevelopmentServiceClient client) {
for (final streamId in streamListeners.keys.toList()) {
streamCancel(client, streamId);
}
}
// These error codes must be kept in sync with those in vm/json_stream.h and
// vmservice.dart.
static const kStreamAlreadySubscribed = 103;
static const kStreamNotSubscribed = 104;
// Keep these messages in sync with the VM service.
static final kStreamAlreadySubscribedException = json_rpc.RpcException(
kStreamAlreadySubscribed,
'Stream already subscribed',
);
static final kStreamNotSubscribedException = json_rpc.RpcException(
kStreamNotSubscribed,
'Stream not subscribed',
);
final _DartDevelopmentService dds;
final streamListeners = <String, List<_DartDevelopmentServiceClient>>{};
}

View file

@ -11,13 +11,15 @@ environment:
sdk: '>=2.6.0 <3.0.0'
dependencies:
async: ^2.4.1
json_rpc_2: ^2.1.0
pedantic: ^1.7.0
shelf: ^0.7.5
shelf_proxy: ^0.1.0+7
shelf_web_socket: ^0.2.3
stream_channel: ^2.0.0
web_socket_channel: ^1.1.0
dev_dependencies:
pedantic: ^1.7.0
test: ^1.0.0
vm_service: ^4.0.0

View file

@ -1,5 +1,9 @@
# Changelog
## Unreleased
- Fixed issue where RPC format did not conform to the JSON-RPC 2.0
specification.
## 4.0.0
- **breaking**: RPCs which can return a `Sentinel` will now throw a `SentinelException`
if a `Sential` is received as a response.

View file

@ -1955,13 +1955,17 @@ class VmService implements VmServiceInterface {
Future get onDone => _onDoneCompleter.future;
Future<T> _call<T>(String method, [Map args]) {
Future<T> _call<T>(String method, [Map args = const {}]) {
String id = '${++_id}';
Completer<T> completer = Completer<T>();
_completers[id] = completer;
_methodCalls[id] = method;
Map m = {'id': id, 'method': method};
if (args != null) m['params'] = args;
Map m = {
'jsonrpc': '2.0',
'id': id,
'method': method,
'params': args,
};
String message = jsonEncode(m);
_onSend.add(message);
_writeMessage(message);

View file

@ -118,13 +118,12 @@ final String _implCode = r'''
Future get onDone => _onDoneCompleter.future;
Future<T> _call<T>(String method, [Map args]) {
Future<T> _call<T>(String method, [Map args = const {}]) {
String id = '${++_id}';
Completer<T> completer = Completer<T>();
_completers[id] = completer;
_methodCalls[id] = method;
Map m = {'id': id, 'method': method};
if (args != null) m['params'] = args;
Map m = {'jsonrpc': '2.0', 'id': id, 'method': method, 'params': args,};
String message = jsonEncode(m);
_onSend.add(message);
_writeMessage(message);

View file

@ -16,6 +16,7 @@ async:../../third_party/pkg/async/lib
boolean_selector:../../third_party/pkg/boolean_selector/lib
charcode:../../third_party/pkg/charcode/lib
dart_internal:../../pkg/dart_internal/lib
dds:../../pkg/dds/lib
expect:../../pkg/expect/lib
http:../../third_party/pkg/http/lib
http_parser:../../third_party/pkg/http_parser/lib

View file

@ -318,8 +318,12 @@ abstract class CommonWebSocketVM extends VM {
'params': {'id': serial, 'query': request.method}
});
} else {
message = json.encode(
{'id': serial, 'method': request.method, 'params': request.params});
message = json.encode({
'jsonrpc': '2.0',
'id': serial,
'method': request.method,
'params': request.params
});
}
if (request.method != 'getTagProfile' &&
request.method != 'getIsolateMetric' &&

View file

@ -66,7 +66,7 @@ var tests = <IsolateTest>[
fail("Expected to find function");
}
isolate.resume();
await isolate.resume();
}
];

View file

@ -63,4 +63,9 @@ var tests = <VMTest>[
},
];
main(args) async => runVMTests(args, tests);
main(args) async => runVMTests(
args,
tests,
// TODO(bkonyi): client names are not yet supported in DDS.
enableDds: false,
);

View file

@ -60,5 +60,12 @@ final test = <IsolateTest>[
hasStoppedAtExit,
];
Future<void> main(args) => runIsolateTests(args, test,
testeeConcurrent: fooBar, pause_on_start: true, pause_on_exit: true);
Future<void> main(args) => runIsolateTests(
args,
test,
testeeConcurrent: fooBar,
pause_on_start: true,
pause_on_exit: true,
// TODO(bkonyi): client names are not yet supported in DDS.
enableDds: false,
);

View file

@ -58,5 +58,12 @@ final test = <IsolateTest>[
hasStoppedAtExit,
];
Future<void> main(args) => runIsolateTests(args, test,
testeeConcurrent: fooBar, pause_on_start: true, pause_on_exit: true);
Future<void> main(args) => runIsolateTests(
args,
test,
testeeConcurrent: fooBar,
pause_on_start: true,
pause_on_exit: true,
// TODO(bkonyi): client names are not yet supported in DDS.
enableDds: false,
);

View file

@ -40,5 +40,12 @@ final sameClientNamesTest = <IsolateTest>[
hasStoppedAtExit,
];
Future<void> main(args) => runIsolateTests(args, sameClientNamesTest,
testeeConcurrent: fooBar, pause_on_start: true, pause_on_exit: true);
Future<void> main(args) => runIsolateTests(
args,
sameClientNamesTest,
testeeConcurrent: fooBar,
pause_on_start: true,
pause_on_exit: true,
// TODO(bkonyi): client names are not yet supported in DDS.
enableDds: false,
);

View file

@ -58,5 +58,12 @@ final multipleClientNamesTest = <IsolateTest>[
},
];
Future<void> main(args) => runIsolateTests(args, multipleClientNamesTest,
testeeConcurrent: fooBar, pause_on_start: true, pause_on_exit: true);
Future<void> main(args) => runIsolateTests(
args,
multipleClientNamesTest,
testeeConcurrent: fooBar,
pause_on_start: true,
pause_on_exit: true,
// TODO(bkonyi): client names are not yet supported in DDS.
enableDds: false,
);

View file

@ -51,5 +51,12 @@ final nameChangeTest = <IsolateTest>[
hasStoppedAtExit,
];
Future<void> main(args) => runIsolateTests(args, nameChangeTest,
testeeConcurrent: fooBar, pause_on_start: true, pause_on_exit: true);
Future<void> main(args) => runIsolateTests(
args,
nameChangeTest,
testeeConcurrent: fooBar,
pause_on_start: true,
pause_on_exit: true,
// TODO(bkonyi): client names are not yet supported in DDS.
enableDds: false,
);

View file

@ -60,5 +60,11 @@ final hotReloadTest = <IsolateTest>[
},
];
Future<void> main(args) => runIsolateTests(args, hotReloadTest,
testeeConcurrent: fooBar, pause_on_start: true);
Future<void> main(args) => runIsolateTests(
args,
hotReloadTest,
testeeConcurrent: fooBar,
pause_on_start: true,
// TODO(bkonyi): client names are not yet supported in DDS.
enableDds: false,
);

View file

@ -21,11 +21,15 @@ int selectedPort = 0;
main(args) async {
selectedPort = await _getUnusedPort();
await runVMTests(args, tests,
enable_service_port_fallback: true,
// Choose a port number that should always be open.
port: selectedPort,
extraArgs: []);
await runVMTests(
args, tests,
enable_service_port_fallback: true,
// Choose a port number that should always be open.
port: selectedPort,
extraArgs: [],
// TODO(bkonyi): investigate failure.
enableDds: false,
);
}
Future<int> _getUnusedPort() async {

View file

@ -164,4 +164,8 @@ var tests = <IsolateTest>[
},
];
main(args) => runIsolateTests(args, tests);
main(args) => runIsolateTests(
args, tests,
// TODO(bkonyi): service extensions are not yet supported in DDS.
enableDds: false,
);

View file

@ -89,4 +89,8 @@ var tests = <IsolateTest>[
},
];
main(args) => runIsolateTests(args, tests);
main(args) => runIsolateTests(
args, tests,
// TODO(bkonyi): service extensions are not yet supported in DDS.
enableDds: false,
);

View file

@ -83,4 +83,9 @@ var tests = <IsolateTest>[
},
];
main(args) => runIsolateTests(args, tests);
main(args) => runIsolateTests(
args,
tests,
// TODO(bkonyi): service extensions are not yet supported in DDS.
enableDds: false,
);

View file

@ -123,4 +123,9 @@ var tests = <IsolateTest>[
},
];
main(args) => runIsolateTests(args, tests);
main(args) => runIsolateTests(
args,
tests,
// TODO(bkonyi): service extensions are not yet supported in DDS.
enableDds: false,
);

View file

@ -103,4 +103,8 @@ var tests = <IsolateTest>[
},
];
main(args) => runIsolateTests(args, tests);
main(args) => runIsolateTests(
args, tests,
// TODO(bkonyi): service extensions are not yet supported in DDS.
enableDds: false,
);

View file

@ -126,4 +126,8 @@ var tests = <IsolateTest>[
},
];
main(args) => runIsolateTests(args, tests);
main(args) => runIsolateTests(
args, tests,
// TODO(bkonyi): service extensions are not yet supported in DDS.
enableDds: false,
);

View file

@ -37,4 +37,10 @@ final test = <IsolateTest>[
},
];
Future<void> main(args) => runIsolateTests(args, test, testeeBefore: fooBar);
Future<void> main(args) => runIsolateTests(
args,
test,
testeeBefore: fooBar,
// TODO(bkonyi): client names are not yet supported in DDS.
enableDds: false,
);

View file

@ -37,4 +37,11 @@ var tests = <IsolateTest>[
},
];
main(args) => runIsolateTests(args, tests);
main(args) => runIsolateTests(
args,
tests,
// This test hangs with DDS as package:json_rpc_2 can't parse the JSON
// response and is unable to determine the request ID, so the malformed
// JSON request will never complete.
enableDds: false,
);

View file

@ -22,4 +22,8 @@ var tests = <VMTest>[
}
];
main(args) async => runVMTests(args, tests);
main(args) async => runVMTests(
args, tests,
// TODO(bkonyi): DDS doesn't forward Observatory assets properly yet.
enableDds: false,
);

View file

@ -101,12 +101,16 @@ var tests = <IsolateTest>[
},
];
main(args) => runIsolateTests(args, tests,
testeeConcurrent: testMain,
pause_on_start: true,
pause_on_exit: true,
verbose_vm: true,
extraArgs: [
'--trace-service',
'--trace-service-verbose',
]);
main(args) => runIsolateTests(
args, tests,
testeeConcurrent: testMain,
pause_on_start: true,
pause_on_exit: true,
verbose_vm: true,
extraArgs: [
'--trace-service',
'--trace-service-verbose',
],
// TODO(bkonyi): investigate failure.
enableDds: false,
);

View file

@ -61,7 +61,13 @@ var tests = <IsolateTest>[
}
];
main(args) => runIsolateTests(args, tests,
pause_on_unhandled_exceptions: true,
testeeConcurrent: testeeMain,
extraArgs: extraDebuggingArgs);
main(args) => runIsolateTests(
args,
tests,
pause_on_unhandled_exceptions: true,
testeeConcurrent: testeeMain,
extraArgs: extraDebuggingArgs,
// TODO(bkonyi): causes ASSERT in debug mode, unrelated to DDS.
// See https://github.com/dart-lang/sdk/issues/41379.
enableDds: false,
);

View file

@ -0,0 +1,58 @@
// Copyright (c) 2020, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'package:observatory/service_io.dart';
import 'package:test/test.dart';
import 'test_helper.dart';
Future streamListen(VM vm, String streamId) async =>
await vm.invokeRpcNoUpgrade(
'streamListen',
{
'streamId': streamId,
},
);
Future streamCancel(VM vm, String streamId) async =>
await vm.invokeRpcNoUpgrade(
'streamCancel',
{
'streamId': streamId,
},
);
var tests = <VMTest>[
// Check double subscription fails.
(VM vm) async {
await streamListen(vm, '_Echo');
try {
await streamListen(vm, '_Echo');
fail('Subscribed to stream twice');
} on ServerRpcException catch (e) {
expect(e.message, 'Stream already subscribed');
}
},
// Check double cancellation fails.
(VM vm) async {
await streamCancel(vm, '_Echo');
try {
await streamCancel(vm, '_Echo');
fail('Double cancellation of stream successful');
} on ServerRpcException catch (e) {
expect(e.message, 'Stream not subscribed');
}
},
// Check subscription to invalid stream fails.
(VM vm) async {
try {
await streamListen(vm, 'Foo');
fail('Subscribed to invalid stream');
} on ServerRpcException catch (e) {
expect(e.message, "streamListen: invalid 'streamId' parameter: Foo");
}
}
];
main(args) => runVMTests(args, tests);

View file

@ -7,6 +7,7 @@ library test_helper;
import 'dart:async';
import 'dart:convert';
import 'dart:io';
import 'package:dds/dds.dart';
import 'package:observatory/service_io.dart';
import 'package:test/test.dart';
import 'service_test_common.dart';
@ -97,6 +98,8 @@ class _ServiceTesteeRunner {
class _ServiceTesteeLauncher {
Process process;
final List<String> args;
Future<void> get exited => _processCompleter.future;
final _processCompleter = Completer<void>();
bool killedByTester = false;
_ServiceTesteeLauncher() : args = [Platform.script.toFilePath()] {}
@ -286,6 +289,7 @@ class _ServiceTesteeLauncher {
throw "Testee exited with $exitCode";
}
print("** Process exited");
_processCompleter.complete();
});
// Wait for the blank line which signals that we're ready to run.
@ -295,9 +299,16 @@ class _ServiceTesteeLauncher {
}
final content = await serviceInfoFile.readAsString();
final infoJson = json.decode(content);
uri = Uri.parse(infoJson['uri']);
String rawUri = infoJson['uri'];
// If rawUri ends with a /, Uri.parse will include an empty string as the
// last path segment. Make sure it's not there to ensure we have a
// consistent Uri.
if (rawUri.endsWith('/')) {
rawUri = rawUri.substring(0, rawUri.length - 1);
}
uri = Uri.parse(rawUri);
completer.complete(uri);
print('** Signaled to run test queries on $uri');
});
return completer.future;
}
@ -314,7 +325,7 @@ class _ServiceTesteeLauncher {
void setupAddresses(Uri serverAddress) {
serviceWebsocketAddress =
'ws://${serverAddress.authority}${serverAddress.path}ws';
'ws://${serverAddress.authority}${serverAddress.path}/ws';
serviceHttpAddress = 'http://${serverAddress.authority}${serverAddress.path}';
}
@ -331,78 +342,110 @@ class _ServiceTesterRunner {
bool pause_on_unhandled_exceptions: false,
bool enable_service_port_fallback: false,
bool testeeControlsServer: false,
bool enableDds: true,
int port = 0,
}) {
if (executableArgs == null) {
executableArgs = Platform.executableArguments;
}
final process = new _ServiceTesteeLauncher();
final name = Platform.script.pathSegments.last;
WebSocketVM vm;
setUp(() async {
await process
.launch(
pause_on_start,
pause_on_exit,
pause_on_unhandled_exceptions,
enable_service_port_fallback,
testeeControlsServer,
port,
extraArgs,
executableArgs)
.then((Uri serverAddress) async {
if (mainArgs.contains("--gdb")) {
final pid = process.process.pid;
final wait = new Duration(seconds: 10);
print("Testee has pid $pid, waiting $wait before continuing");
sleep(wait);
}
setupAddresses(serverAddress);
vm = new WebSocketVM(new WebSocketVMTarget(serviceWebsocketAddress));
print('Loading VM...');
await vm.load();
print('Done loading VM');
});
});
test(
name,
() async {
// Run vm tests.
if (vmTests != null) {
int testIndex = 1;
final totalTests = vmTests.length;
for (var test in vmTests) {
vm.verbose = verbose_vm;
print('Running $name [$testIndex/$totalTests]');
testIndex++;
await test(vm);
runTest(String name, {bool enableDds: false}) {
test(
'$name (${enableDds ? 'DDS' : 'VM Service'})',
() async {
final process = new _ServiceTesteeLauncher();
bool testsDone = false;
try {
DartDevelopmentService dds;
WebSocketVM vm;
await process
.launch(
pause_on_start,
pause_on_exit,
pause_on_unhandled_exceptions,
enable_service_port_fallback,
testeeControlsServer,
port,
extraArgs,
executableArgs)
.then((Uri serverAddress) async {
if (mainArgs.contains("--gdb")) {
final pid = process.process.pid;
final wait = new Duration(seconds: 10);
print("Testee has pid $pid, waiting $wait before continuing");
sleep(wait);
}
if (enableDds) {
dds = await DartDevelopmentService.startDartDevelopmentService(
serverAddress);
setupAddresses(dds.uri);
} else {
setupAddresses(serverAddress);
}
print('** Signaled to run test queries on $serviceHttpAddress'
' (${enableDds ? "DDS" : "VM Service"})');
vm = new WebSocketVM(
new WebSocketVMTarget(serviceWebsocketAddress));
print('Loading VM...');
await vm.load();
print('Done loading VM');
});
// Run vm tests.
if (vmTests != null) {
int testIndex = 1;
final totalTests = vmTests.length;
for (var test in vmTests) {
vm.verbose = verbose_vm;
print('Running $name [$testIndex/$totalTests]');
testIndex++;
await test(vm);
}
}
// Run isolate tests.
if (isolateTests != null) {
final isolate = await getFirstIsolate(vm);
int testIndex = 1;
final totalTests = isolateTests.length;
for (var test in isolateTests) {
vm.verbose = verbose_vm;
print('Running $name [$testIndex/$totalTests]');
testIndex++;
await test(isolate);
}
}
print('All service tests completed successfully.');
testsDone = true;
if (enableDds) {
await dds.shutdown();
}
process.requestExit();
// Wait for the process to exit so we don't have interleaved
// logging between DDS and VM Service runs.
await process.exited;
} catch (error, stackTrace) {
if (testsDone) {
print('Ignoring late exception during process exit:\n'
'$error\n$stackTrace');
} else {
rethrow;
}
}
}
},
// Some service tests run fairly long (e.g., valid_source_locations_test).
timeout: Timeout.none,
);
}
// Run isolate tests.
if (isolateTests != null) {
final isolate = await getFirstIsolate(vm);
int testIndex = 1;
final totalTests = isolateTests.length;
for (var test in isolateTests) {
vm.verbose = verbose_vm;
print('Running $name [$testIndex/$totalTests]');
testIndex++;
await test(isolate);
}
}
},
retry: 0,
// Some service tests run fairly long (e.g., valid_source_locations_test).
timeout: Timeout.none,
);
tearDown(() {
print('All service tests completed successfully.');
process.requestExit();
});
runTest(name);
if (enableDds) {
runTest(name, enableDds: true);
}
}
Future<Isolate> getFirstIsolate(WebSocketVM vm) async {
@ -460,6 +503,7 @@ Future runIsolateTests(List<String> mainArgs, List<IsolateTest> tests,
bool verbose_vm: false,
bool pause_on_unhandled_exceptions: false,
bool testeeControlsServer: false,
bool enableDds: true,
List<String> extraArgs}) async {
assert(!pause_on_start || testeeBefore == null);
if (_isTestee()) {
@ -477,7 +521,8 @@ Future runIsolateTests(List<String> mainArgs, List<IsolateTest> tests,
pause_on_exit: pause_on_exit,
verbose_vm: verbose_vm,
pause_on_unhandled_exceptions: pause_on_unhandled_exceptions,
testeeControlsServer: testeeControlsServer);
testeeControlsServer: testeeControlsServer,
enableDds: enableDds);
}
}
@ -529,6 +574,7 @@ Future runVMTests(List<String> mainArgs, List<VMTest> tests,
bool verbose_vm: false,
bool pause_on_unhandled_exceptions: false,
bool enable_service_port_fallback: false,
bool enableDds: true,
int port = 0,
List<String> extraArgs,
List<String> executableArgs}) async {
@ -549,6 +595,7 @@ Future runVMTests(List<String> mainArgs, List<VMTest> tests,
verbose_vm: verbose_vm,
pause_on_unhandled_exceptions: pause_on_unhandled_exceptions,
enable_service_port_fallback: enable_service_port_fallback,
enableDds: enableDds,
port: port,
);
}

View file

@ -9,10 +9,14 @@ import 'package:test/test.dart';
import 'service_test_common.dart';
import 'test_helper.dart';
primeDartTimeline() {
primeDartTimeline() async {
while (true) {
Timeline.startSync('apple');
Timeline.finishSync();
// Give the VM a chance to send the timeline events. This test is
// significantly slower if we loop without yielding control after each
// iteration.
await Future.delayed(const Duration(milliseconds: 1));
}
}
@ -22,8 +26,8 @@ List filterEvents(List events, filter) {
return events.where(filter).toList();
}
Completer completer = new Completer();
int eventCount = 0;
Completer completer;
int eventCount;
onTimelineEvent(ServiceEvent event) {
eventCount++;
@ -34,6 +38,11 @@ onTimelineEvent(ServiceEvent event) {
}
var tests = <IsolateTest>[
(Isolate isolate) async {
// Clear global state.
eventCount = 0;
completer = Completer<void>();
},
(Isolate isolate) async {
// Subscribe to the Timeline stream.
await subscribeToStream(isolate.vm, VM.kTimelineStream, onTimelineEvent);

View file

@ -49,7 +49,8 @@ final String serviceAuthToken = _makeAuthToken();
final Map<int, IsolateEmbedderData> isolateEmbedderData =
new Map<int, IsolateEmbedderData>();
// These must be kept in sync with the declarations in vm/json_stream.h.
// These must be kept in sync with the declarations in vm/json_stream.h and
// pkg/dds/lib/src/stream_manager.dart.
const kParseError = -32700;
const kInvalidRequest = -32600;
const kMethodNotFound = -32601;

View file

@ -46,7 +46,8 @@ final serviceAuthToken = _makeAuthToken();
// the cleanup method will be invoked after being removed from the map.
final isolateEmbedderData = <int, IsolateEmbedderData>{};
// These must be kept in sync with the declarations in vm/json_stream.h.
// These must be kept in sync with the declarations in vm/json_stream.h and
// pkg/dds/lib/src/stream_manager.dart.
const kParseError = -32700;
const kInvalidRequest = -32600;
const kMethodNotFound = -32601;

View file

@ -331,6 +331,7 @@
"pkg/dart_internal/",
"pkg/dart2native/",
"pkg/dart2js_tools/",
"pkg/dds/",
"pkg/expect/",
"pkg/front_end/",
"pkg/js/",
@ -417,6 +418,7 @@
"pkg/dart_internal/",
"pkg/dart2native/",
"pkg/dart2js_tools/",
"pkg/dds/",
"pkg/expect/",
"pkg/front_end/",
"pkg/js/",