mirror of
https://github.com/dart-lang/sdk
synced 2024-09-15 23:39:48 +00:00
d5cabea15f
Fixes https://github.com/dart-lang/sdk/issues/49184 Change-Id: Ibebab647fd8479e3a33aa0b71ca267c37e6553be Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/247362 Reviewed-by: Devon Carew <devoncarew@google.com> Commit-Queue: Ben Konyi <bkonyi@google.com>
561 lines
19 KiB
Dart
561 lines
19 KiB
Dart
// Copyright (c) 2019, the Dart project authors. Please see the AUTHORS file
|
|
// for details. All rights reserved. Use of this source code is governed by a
|
|
// BSD-style license that can be found in the LICENSE file.
|
|
|
|
@TestOn('vm')
|
|
import 'dart:async';
|
|
import 'dart:convert';
|
|
|
|
import 'package:async/async.dart';
|
|
import 'package:mockito/mockito.dart';
|
|
import 'package:test/test.dart';
|
|
import 'package:vm_service/vm_service.dart';
|
|
|
|
void main() {
|
|
late MockVmService serviceMock;
|
|
late StreamController<Map<String, Object>> requestsController;
|
|
late StreamController<Map<String, Object?>> responsesController;
|
|
late ServiceExtensionRegistry serviceRegistry;
|
|
|
|
setUp(() {
|
|
serviceMock = MockVmService();
|
|
requestsController = StreamController<Map<String, Object>>();
|
|
responsesController = StreamController<Map<String, Object?>>();
|
|
serviceRegistry = ServiceExtensionRegistry();
|
|
VmServerConnection(requestsController.stream, responsesController.sink,
|
|
serviceRegistry, serviceMock);
|
|
});
|
|
|
|
tearDown(() {
|
|
requestsController.close();
|
|
responsesController.close();
|
|
});
|
|
|
|
group('method delegation', () {
|
|
test('works for simple methods', () {
|
|
var request = rpcRequest("getVersion");
|
|
var version = Version(major: 1, minor: 0);
|
|
when(serviceMock.getVersion()).thenAnswer((_) => Future.value(version));
|
|
expect(responsesController.stream, emits(rpcResponse(version)));
|
|
requestsController.add(request);
|
|
});
|
|
test('works for methods with parameters', () {
|
|
var isolate = Isolate(
|
|
name: 'isolate',
|
|
exceptionPauseMode: ExceptionPauseMode.kNone,
|
|
id: '123',
|
|
number: '0',
|
|
startTime: 1,
|
|
runnable: true,
|
|
livePorts: 2,
|
|
isolateFlags: [],
|
|
pauseOnExit: false,
|
|
pauseEvent: Event(
|
|
kind: EventKind.kResume,
|
|
timestamp: 3,
|
|
),
|
|
libraries: [],
|
|
breakpoints: [],
|
|
isSystemIsolate: false,
|
|
);
|
|
var request =
|
|
rpcRequest("getIsolate", params: {'isolateId': isolate.id!});
|
|
when(serviceMock.getIsolate(isolate.id!))
|
|
.thenAnswer((Invocation invocation) {
|
|
expect(invocation.positionalArguments, equals([isolate.id]));
|
|
return Future.value(isolate);
|
|
});
|
|
expect(responsesController.stream, emits(rpcResponse(isolate)));
|
|
requestsController.add(request);
|
|
});
|
|
|
|
test('works for methods with list parameters', () {
|
|
var isolate = Isolate(
|
|
name: 'isolate',
|
|
exceptionPauseMode: ExceptionPauseMode.kNone,
|
|
id: '123',
|
|
number: '0',
|
|
startTime: 1,
|
|
runnable: true,
|
|
livePorts: 2,
|
|
isolateFlags: [],
|
|
pauseOnExit: false,
|
|
pauseEvent: Event(
|
|
kind: EventKind.kResume,
|
|
timestamp: 3,
|
|
),
|
|
libraries: [],
|
|
breakpoints: [],
|
|
isSystemIsolate: false,
|
|
);
|
|
var request = rpcRequest("setVMTimelineFlags", params: {
|
|
'isolateId': isolate.id!,
|
|
// Note: the dynamic list below is intentional in order to exercise the
|
|
// code under test.
|
|
'recordedStreams': <dynamic>['GC', 'Dart', 'Embedder'],
|
|
});
|
|
var response = Success();
|
|
when(serviceMock.getIsolate(isolate.id!))
|
|
.thenAnswer((Invocation invocation) {
|
|
expect(invocation.namedArguments,
|
|
equals({Symbol('isolateId'): null, Symbol('args'): null}));
|
|
return Future.value(isolate);
|
|
});
|
|
expect(responsesController.stream, emits(rpcResponse(response)));
|
|
requestsController.add(request);
|
|
});
|
|
|
|
group('custom service extensions', () {
|
|
test('with no params or isolateId', () {
|
|
var extension = 'ext.cool';
|
|
var request = rpcRequest(extension, params: null);
|
|
var response = Response()..json = {"hello": "world"};
|
|
when(serviceMock.callServiceExtension(
|
|
extension,
|
|
isolateId: argThat(isNull, named: 'isolateId'),
|
|
args: argThat(isNull, named: 'args'),
|
|
)).thenAnswer((Invocation invocation) {
|
|
expect(invocation.namedArguments,
|
|
equals({Symbol('isolateId'): null, Symbol('args'): null}));
|
|
return Future.value(response);
|
|
});
|
|
expect(responsesController.stream, emits(rpcResponse(response)));
|
|
requestsController.add(request);
|
|
});
|
|
|
|
test('with isolateId and no other params', () {
|
|
var extension = 'ext.cool';
|
|
var request = rpcRequest(extension, params: {'isolateId': '1'});
|
|
var response = Response()..json = {"hello": "world"};
|
|
when(serviceMock.callServiceExtension(
|
|
extension,
|
|
isolateId: argThat(equals('1'), named: 'isolateId'),
|
|
args: argThat(equals({}), named: 'args'),
|
|
)).thenAnswer((Invocation invocation) {
|
|
expect(invocation.namedArguments,
|
|
equals({Symbol('isolateId'): '1', Symbol('args'): {}}));
|
|
return Future.value(response);
|
|
});
|
|
expect(responsesController.stream, emits(rpcResponse(response)));
|
|
requestsController.add(request);
|
|
});
|
|
|
|
test('with params and no isolateId', () {
|
|
var extension = 'ext.cool';
|
|
var params = {'cool': 'option'};
|
|
var request = rpcRequest(extension, params: params);
|
|
var response = Response()..json = {"hello": "world"};
|
|
when(serviceMock.callServiceExtension(
|
|
extension,
|
|
isolateId: argThat(isNull, named: 'isolateId'),
|
|
args: argThat(equals(params), named: 'args'),
|
|
)).thenAnswer((Invocation invocation) {
|
|
expect(invocation.namedArguments,
|
|
equals({Symbol('isolateId'): null, Symbol('args'): params}));
|
|
return Future.value(response);
|
|
});
|
|
expect(responsesController.stream, emits(rpcResponse(response)));
|
|
requestsController.add(request);
|
|
});
|
|
|
|
test('with params and isolateId', () {
|
|
var extension = 'ext.cool';
|
|
var params = {'cool': 'option'};
|
|
var request =
|
|
rpcRequest(extension, params: Map.of(params)..['isolateId'] = '1');
|
|
var response = Response()..json = {"hello": "world"};
|
|
when(serviceMock.callServiceExtension(
|
|
extension,
|
|
isolateId: argThat(equals("1"), named: 'isolateId'),
|
|
args: argThat(equals(params), named: 'args'),
|
|
)).thenAnswer((Invocation invocation) {
|
|
expect(invocation.namedArguments,
|
|
equals({Symbol('isolateId'): '1', Symbol('args'): params}));
|
|
return Future.value(response);
|
|
});
|
|
expect(responsesController.stream, emits(rpcResponse(response)));
|
|
requestsController.add(request);
|
|
});
|
|
});
|
|
});
|
|
|
|
group('error handling', () {
|
|
test('special cases RPCError instances', () {
|
|
var request = rpcRequest("getVersion");
|
|
var error =
|
|
RPCError('getVersion', 1234, 'custom message', {'custom': 'data'});
|
|
when(serviceMock.getVersion()).thenAnswer((_) => Future.error(error));
|
|
expect(responsesController.stream, emits(rpcErrorResponse(error)));
|
|
requestsController.add(request);
|
|
});
|
|
|
|
test('has a fallback for generic exceptions', () {
|
|
var request = rpcRequest("getVersion");
|
|
var error = UnimplementedError();
|
|
when(serviceMock.getVersion()).thenAnswer((_) => Future.error(error));
|
|
expect(
|
|
responsesController.stream.map((response) => '$response'),
|
|
emits(startsWith(
|
|
'{jsonrpc: 2.0, id: 1, error: {code: -32603, message: getVersion: UnimplementedError')));
|
|
requestsController.add(request);
|
|
});
|
|
});
|
|
|
|
group('streams', () {
|
|
test('can be listened to and canceled', () async {
|
|
var streamId = 'Isolate';
|
|
var responseQueue = StreamQueue(responsesController.stream);
|
|
StreamController<Event> eventController;
|
|
{
|
|
var request =
|
|
rpcRequest('streamListen', params: {'streamId': streamId});
|
|
var response = Success();
|
|
when(serviceMock.streamListen(streamId))
|
|
.thenAnswer((_) => Future.value(response));
|
|
requestsController.add(request);
|
|
await expectLater(responseQueue, emitsThrough(rpcResponse(response)));
|
|
|
|
eventController = serviceMock.streamControllers[streamId]!;
|
|
|
|
var events = [
|
|
Event(
|
|
kind: EventKind.kIsolateStart,
|
|
timestamp: 0,
|
|
),
|
|
Event(
|
|
kind: EventKind.kIsolateExit,
|
|
timestamp: 1,
|
|
)
|
|
];
|
|
events.forEach(eventController.add);
|
|
await expectLater(
|
|
responseQueue,
|
|
emitsInOrder(
|
|
events.map((event) => streamNotifyResponse(streamId, event))));
|
|
}
|
|
{
|
|
var request =
|
|
rpcRequest('streamCancel', params: {'streamId': streamId});
|
|
var response = Success();
|
|
when(serviceMock.streamListen(streamId))
|
|
.thenAnswer((_) => Future.value(response));
|
|
requestsController.add(request);
|
|
await expectLater(responseQueue, emitsThrough(rpcResponse(response)));
|
|
|
|
var nextEvent = Event(
|
|
kind: EventKind.kIsolateReload,
|
|
timestamp: 2,
|
|
);
|
|
eventController.add(nextEvent);
|
|
expect(responseQueue,
|
|
neverEmits(streamNotifyResponse(streamId, nextEvent)));
|
|
|
|
await pumpEventQueue();
|
|
await eventController.close();
|
|
await responsesController.close();
|
|
}
|
|
});
|
|
test("can't be listened to twice", () {
|
|
var streamId = 'Isolate';
|
|
var responseQueue = StreamQueue(responsesController.stream);
|
|
{
|
|
var request =
|
|
rpcRequest('streamListen', params: {'streamId': streamId});
|
|
var response = Success();
|
|
when(serviceMock.streamListen(streamId))
|
|
.thenAnswer((_) => Future.value(response));
|
|
requestsController.add(request);
|
|
expect(responseQueue, emitsThrough(rpcResponse(response)));
|
|
}
|
|
{
|
|
var request =
|
|
rpcRequest('streamListen', params: {'streamId': streamId});
|
|
var response = Success();
|
|
when(serviceMock.streamListen(streamId))
|
|
.thenAnswer((_) => Future.value(response));
|
|
requestsController.add(request);
|
|
expect(
|
|
responseQueue,
|
|
emitsThrough(rpcErrorResponse(
|
|
RPCError('streamSubcribe', 103, 'Stream already subscribed', {
|
|
'details': "The stream '$streamId' is already subscribed",
|
|
}))));
|
|
}
|
|
});
|
|
|
|
test("can't cancel a stream that isn't being listened to", () {
|
|
var streamId = 'Isolate';
|
|
var responseQueue = StreamQueue(responsesController.stream);
|
|
|
|
var request = rpcRequest('streamCancel', params: {'streamId': streamId});
|
|
var response = Success();
|
|
when(serviceMock.streamListen(streamId))
|
|
.thenAnswer((_) => Future.value(response));
|
|
requestsController.add(request);
|
|
expect(
|
|
responseQueue,
|
|
emitsThrough(rpcErrorResponse(
|
|
RPCError('streamCancel', 104, 'Stream not subscribed', {
|
|
'details': "The stream '$streamId' is not subscribed",
|
|
}))));
|
|
});
|
|
|
|
group('Service', () {
|
|
final serviceStream = 'Service';
|
|
|
|
test('gives register and unregister events', () async {
|
|
var serviceId = 'ext.test.service';
|
|
var serviceRegisteredEvent = streamNotifyResponse(
|
|
serviceStream,
|
|
Event(
|
|
kind: EventKind.kServiceRegistered,
|
|
timestamp: 0,
|
|
method: serviceId,
|
|
service: serviceId,
|
|
),
|
|
);
|
|
var serviceUnRegisteredEvent = streamNotifyResponse(
|
|
serviceStream,
|
|
Event(
|
|
kind: EventKind.kServiceUnregistered,
|
|
timestamp: 0,
|
|
method: serviceId,
|
|
service: serviceId,
|
|
),
|
|
);
|
|
|
|
requestsController.add(
|
|
rpcRequest('streamListen', params: {'streamId': serviceStream}));
|
|
requestsController
|
|
.add(rpcRequest('registerService', params: {'service': serviceId}));
|
|
await expectLater(
|
|
responsesController.stream
|
|
.map((Map response) => stripEventTimestamp(response)),
|
|
emitsThrough(serviceRegisteredEvent));
|
|
|
|
// Connect another client to get the previous register events and the
|
|
// unregister event.
|
|
var requestsController2 = StreamController<Map<String, Object>>();
|
|
var responsesController2 = StreamController<Map<String, Object?>>();
|
|
addTearDown(() {
|
|
requestsController2.close();
|
|
responsesController2.close();
|
|
});
|
|
|
|
VmServerConnection(
|
|
requestsController2.stream,
|
|
responsesController2.sink,
|
|
serviceRegistry,
|
|
VmService(Stream.empty(), (String _) => null),
|
|
);
|
|
|
|
expect(
|
|
responsesController2.stream
|
|
.map((Map response) => stripEventTimestamp(response)),
|
|
emitsThrough(emitsInOrder(
|
|
[serviceRegisteredEvent, serviceUnRegisteredEvent])));
|
|
|
|
// Should get the previously registered extension event, as well as
|
|
// the unregister event when the client disconnects.
|
|
requestsController2.add(
|
|
rpcRequest('streamListen', params: {'streamId': serviceStream}));
|
|
// Need to give the client a chance to subscribe.
|
|
await pumpEventQueue();
|
|
unawaited(requestsController.close());
|
|
// Give the old client a chance to shut down
|
|
await pumpEventQueue();
|
|
|
|
// Connect yet another client, it should get zero registration or
|
|
// unregistration events.
|
|
var requestsController3 = StreamController<Map<String, Object>>();
|
|
var responsesController3 = StreamController<Map<String, Object?>>();
|
|
|
|
VmServerConnection(
|
|
requestsController3.stream,
|
|
responsesController3.sink,
|
|
serviceRegistry,
|
|
VmService(Stream.empty(), (String _) => null),
|
|
);
|
|
expect(
|
|
responsesController3.stream,
|
|
neverEmits(
|
|
anyOf(serviceRegisteredEvent, serviceUnRegisteredEvent)));
|
|
// Give it a chance to deliver events.
|
|
await pumpEventQueue();
|
|
// Disconnect the client so the test can shut down.
|
|
unawaited(requestsController3.close());
|
|
unawaited(responsesController3.close());
|
|
});
|
|
});
|
|
});
|
|
|
|
group('registerService', () {
|
|
test('can delegate requests between clients', () async {
|
|
var serviceId = 'ext.test.service';
|
|
var responseQueue = StreamQueue(responsesController.stream);
|
|
|
|
var clientInputController =
|
|
StreamController<Map<String, Object?>>.broadcast();
|
|
var clientOutputController =
|
|
StreamController<Map<String, Object>>.broadcast();
|
|
var client = VmService(
|
|
clientInputController.stream.map(jsonEncode),
|
|
(String message) => clientOutputController
|
|
.add(jsonDecode(message).cast<String, Object>()),
|
|
disposeHandler: () async {
|
|
await clientInputController.close();
|
|
await clientOutputController.close();
|
|
});
|
|
|
|
var clientConnection = VmServerConnection(clientOutputController.stream,
|
|
clientInputController.sink, serviceRegistry, serviceMock);
|
|
|
|
var requestParams = {'foo': 'bar'};
|
|
var expectedResponse = Response()..json = {'zap': 'zip'};
|
|
await client.registerService(serviceId, 'service');
|
|
// Duplicate registrations should fail.
|
|
expect(client.registerService(serviceId, 'service'),
|
|
throwsA(const TypeMatcher<RPCError>()));
|
|
|
|
client.registerServiceCallback(serviceId, (request) async {
|
|
expect(request, equals(requestParams));
|
|
return {'result': expectedResponse.toJson()};
|
|
});
|
|
|
|
var serviceRequest = rpcRequest(serviceId, params: requestParams);
|
|
|
|
requestsController.add(serviceRequest);
|
|
expect(await responseQueue.next, rpcResponse(expectedResponse));
|
|
|
|
// Kill the client that registered the handler, it should now fall back
|
|
// on `callServiceExtension`.
|
|
await client.dispose();
|
|
// This should complete as well.
|
|
await clientConnection.done;
|
|
|
|
var mockResponse = Response()..json = {'mock': 'response'};
|
|
when(serviceMock.callServiceExtension(serviceId,
|
|
args: argThat(equals(requestParams), named: 'args'),
|
|
isolateId: argThat(isNull, named: 'isolateId')))
|
|
.thenAnswer((_) async => mockResponse);
|
|
|
|
requestsController.add(serviceRequest);
|
|
expect(await responseQueue.next, rpcResponse(mockResponse));
|
|
});
|
|
});
|
|
}
|
|
|
|
Map<String, Object> rpcRequest(String method,
|
|
{Map<String, Object>? params = const {}, String id = "1"}) =>
|
|
{
|
|
"jsonrpc": "2.0",
|
|
"method": method,
|
|
if (params != null) "params": params,
|
|
"id": id,
|
|
};
|
|
|
|
Map<String, Object> rpcResponse(Response response, {String id = "1"}) => {
|
|
'jsonrpc': '2.0',
|
|
'id': id,
|
|
'result': response.toJson(),
|
|
};
|
|
|
|
Map<String, Object> rpcErrorResponse(Object error, {String id = "1"}) {
|
|
Map<String, Object> errorJson;
|
|
if (error is RPCError) {
|
|
errorJson = {
|
|
'code': error.code,
|
|
'message': error.message,
|
|
};
|
|
if (error.data != null) {
|
|
errorJson['data'] = error.data!;
|
|
}
|
|
} else {
|
|
errorJson = {
|
|
'code': -32603,
|
|
'message': error.toString(),
|
|
};
|
|
}
|
|
return {
|
|
'jsonrpc': '2.0',
|
|
'id': id,
|
|
'error': errorJson,
|
|
};
|
|
}
|
|
|
|
Map<String, Object> streamNotifyResponse(String streamId, Event event) {
|
|
return {
|
|
'jsonrpc': '2.0',
|
|
'method': 'streamNotify',
|
|
'params': {
|
|
'streamId': '$streamId',
|
|
'event': event.toJson(),
|
|
},
|
|
};
|
|
}
|
|
|
|
Map<String, Object?> stripEventTimestamp(Map response) {
|
|
if (response.containsKey('params') &&
|
|
response['params'].containsKey('event')) {
|
|
response['params']['event']['timestamp'] = 0;
|
|
}
|
|
return response as Map<String, Object?>;
|
|
}
|
|
|
|
class MockVmService extends Mock implements VmServiceInterface {
|
|
final streamControllers = <String, StreamController<Event>>{};
|
|
|
|
@override
|
|
Future<Version> getVersion() {
|
|
return super.noSuchMethod(Invocation.method(#getVersion, []),
|
|
returnValue: Future.value(Version(major: 0, minor: 0)));
|
|
}
|
|
|
|
@override
|
|
Future<Isolate> getIsolate(String isolateId) {
|
|
return super.noSuchMethod(Invocation.method(#getIsolate, [isolateId]),
|
|
returnValue: Future.value(Isolate(
|
|
id: null,
|
|
number: null,
|
|
name: null,
|
|
isSystemIsolate: null,
|
|
isolateFlags: null,
|
|
startTime: null,
|
|
runnable: null,
|
|
livePorts: null,
|
|
pauseOnExit: null,
|
|
pauseEvent: null,
|
|
libraries: null,
|
|
breakpoints: null,
|
|
exceptionPauseMode: null)));
|
|
}
|
|
|
|
@override
|
|
Future<Response> callServiceExtension(
|
|
String method, {
|
|
String? isolateId,
|
|
Map<String, dynamic>? args,
|
|
}) {
|
|
return super.noSuchMethod(
|
|
Invocation.method(#callServiceExtension, [method],
|
|
{#isolateId: isolateId, #args: args}),
|
|
returnValue: Future.value(Response()));
|
|
}
|
|
|
|
@override
|
|
Future<Success> streamListen(String streamId) {
|
|
return super.noSuchMethod(Invocation.method(#streamListen, [streamId]),
|
|
returnValue: Future.value(Success()));
|
|
}
|
|
|
|
@override
|
|
Stream<Event> onEvent(String streamId) => streamControllers
|
|
.putIfAbsent(streamId, () => StreamController<Event>())
|
|
.stream;
|
|
|
|
@override
|
|
Future<Success> setVMTimelineFlags(List<String> recordedStreams) async {
|
|
return Success();
|
|
}
|
|
}
|