[ VM Service / DDS ] Add custom service stream support (Revised)

NOTES:
Original CL: https://dart-review.googlesource.com/c/sdk/+/274061
Revert CL: https://dart-review.googlesource.com/c/sdk/+/274802

Regression tests that failed after merging Original CL:
- https://ci.chromium.org/ui/p/dart/builders/ci.sandbox/pkg-mac-release/24046/overview
- https://ci.chromium.org/ui/p/dart/builders/ci.sandbox/pkg-mac-release-arm64/5999/overview

----------
DESCRIPTION:
Setting the `stream` parameter on `developer.postEvent` will now forward those events to a custom stream inside DDS.


The first use of this will be for widget inspection. A navigation event will be posted to a custom stream. Our IDE DAP can listen for the Event and react to it by navigating to the desired location in the code.

TEST=Made sure that regression from original PR could be reproduced, and then resolved by applying this change. Updated observatory tests. Created new developer test to check assertions. Added DDS tests for new custom stream behaviour. Manually tested the postEvent and StreamListen with multiple clients

https://github.com/flutter/devtools/issues/4533

Change-Id: I2b04a84b4daf11dd9d72f899928b5e1f62a5ae02
Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/275121
Reviewed-by: Ben Konyi <bkonyi@google.com>
Commit-Queue: Ben Konyi <bkonyi@google.com>
This commit is contained in:
Dan Chevalier 2022-12-13 16:31:37 +00:00 committed by Commit Queue
parent 010b3dff91
commit fd6fa010ec
14 changed files with 466 additions and 27 deletions

View file

@ -19,7 +19,7 @@ abstract class RpcErrorCodes {
// static const kInvalidRequest = -32600;
static const kMethodNotFound = -32601;
// static const kInvalidParams = -32602;
static const kInvalidParams = -32602;
// static const kInternalError = -32603;
// static const kExtensionError = -32000;

View file

@ -142,6 +142,29 @@ class StreamManager {
final streamId = parameters['streamId'].asString;
final event =
Event.parse(parameters['event'].asMap.cast<String, dynamic>())!;
final destinationStreamId =
event.extensionData?.data[destinationStreamKey];
if (destinationStreamId != null) {
// Strip [destinationStreamKey] from the extension data so it is not
// passed along.
(parameters.value['event']['extensionData'] as Map<String, dynamic>)
.remove(destinationStreamKey);
if (destinationStreamId != kExtensionStream) {
if (!customStreamListenerKeys.contains(destinationStreamId)) {
// __destinationStream is only used by developer.postEvent.
// developer.postEvent is only supposed to postEvents to the
// Extension stream or to custom streams
return;
}
final values = parameters.value;
values['streamId'] = destinationStreamId;
streamNotify(destinationStreamId, values);
return;
}
}
// Forward events from the streams IsolateManager subscribes to.
if (isolateManagerStreams.contains(streamId)) {
@ -192,13 +215,24 @@ class StreamManager {
stream != kDebugStream) {
// This will return an RPC exception if the stream doesn't exist. This
// will throw and the exception will be forwarded to the client.
final result =
await dds.vmServiceClient.sendRequest('streamListen', {
'streamId': stream,
if (includePrivates != null)
'_includePrivateMembers': includePrivates,
});
assert(result['type'] == 'Success');
try {
final result =
await dds.vmServiceClient.sendRequest('streamListen', {
'streamId': stream,
if (includePrivates != null)
'_includePrivateMembers': includePrivates,
});
assert(result['type'] == 'Success');
} on json_rpc.RpcException catch (e) {
if (e.code == RpcErrorCodes.kInvalidParams) {
// catching kInvalid params means that the vmServiceClient
// does not know about the stream we passed. So assume that
// the stream is a custom stream.
customStreamListenerKeys.add(stream);
} else {
rethrow;
}
}
}
}
if (streamListeners[stream]!.contains(client)) {
@ -273,6 +307,15 @@ class StreamManager {
client != null && !listeners.contains(client)) {
throw kStreamNotSubscribedException;
}
if (customStreamListenerKeys.contains(stream)) {
streamListeners[stream]!.remove(client);
if (streamListeners[stream]!.isEmpty) {
streamListeners.remove(stream);
}
return;
}
listeners.remove(client);
// Don't cancel streams DDS needs to function.
if (listeners.isEmpty &&
@ -354,6 +397,8 @@ class StreamManager {
static const kStderrStream = 'Stderr';
static const kStdoutStream = 'Stdout';
static const destinationStreamKey = '__destinationStream';
static Map<String, LoggingRepository> loggingRepositories = {};
// Never cancel the Debug or Isolate stream as `IsolateManager` requires
@ -386,6 +431,7 @@ class StreamManager {
final DartDevelopmentServiceImpl dds;
final streamListeners = <String, List<DartDevelopmentServiceClient>>{};
final customStreamListenerKeys = <String>{};
final _profilerUserTagSubscriptions = <String>{};
final _streamSubscriptionMutex = Mutex();
final _profilerUserTagSubscriptionsMutex = Mutex();

View file

@ -0,0 +1,14 @@
// Copyright (c) 2022, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:developer';
final eventKind = 'customEventKindTest';
final eventData = {'part1': 1, 'part2': '2'};
final customStreamId = 'a-custom-stream-id';
void main() {
postEvent('customEventKindTest', eventData, stream: customStreamId);
postEvent('customEventKindTest', eventData, stream: 'Extension');
}

View file

@ -0,0 +1,151 @@
// Copyright (c) 2022, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
import 'dart:io';
import 'package:dds/dds.dart';
import 'package:test/test.dart';
import 'package:vm_service/vm_service.dart';
import 'package:vm_service/vm_service_io.dart';
import 'common/test_helper.dart';
import 'post_event_custom_stream_script.dart' as script;
void main() {
late Process process;
late DartDevelopmentService dds;
setUp(() async {
process = await spawnDartProcess(
'post_event_custom_stream_script.dart',
);
});
tearDown(() async {
await dds.shutdown();
process.kill();
});
Future<IsolateRef> getIsolate(VmService service) async {
IsolateRef isolate;
while (true) {
final vm = await service.getVM();
if (vm.isolates!.isNotEmpty) {
isolate = vm.isolates!.first;
break;
}
await Future.delayed(const Duration(milliseconds: 50));
}
return isolate;
}
test('sends a postEvent over a custom stream to multiple listeners',
() async {
dds = await DartDevelopmentService.startDartDevelopmentService(
remoteVmServiceUri,
);
expect(dds.isRunning, true);
final service1 = await vmServiceConnectUri(dds.wsUri.toString());
final service2 = await vmServiceConnectUri(dds.wsUri.toString());
final completer1 = Completer<Event>();
final completer2 = Completer<Event>();
final isolateId = (await getIsolate(service1)).id!;
await service1.streamListen(script.customStreamId);
service1.onEvent(script.customStreamId).listen((event) {
completer1.complete(event);
});
await service2.streamListen(script.customStreamId);
service2.onEvent(script.customStreamId).listen((event) {
completer2.complete(event);
});
await service1.resume(isolateId);
final event1 = await completer1.future;
final event2 = await completer2.future;
expect(event1.extensionKind, equals(script.eventKind));
expect(event1.extensionData?.data, equals(script.eventData));
expect(event2.extensionKind, equals(script.eventKind));
expect(event2.extensionData?.data, equals(script.eventData));
});
test('can cancel custom stream listeners', () async {
dds = await DartDevelopmentService.startDartDevelopmentService(
remoteVmServiceUri,
);
expect(dds.isRunning, true);
final service1 = await vmServiceConnectUri(dds.wsUri.toString());
(await getIsolate(service1)).id!;
await service1.streamListen(script.customStreamId);
// We should be able to cancel
await service1.streamCancel(script.customStreamId);
try {
await service1.streamCancel(script.customStreamId);
fail('Re-Canceling the custom stream should have failed');
} on RPCError catch (e) {
expect(
e.message,
'Stream not subscribed',
);
}
});
test('canceling a custom stream does not cancel other listeners', () async {
dds = await DartDevelopmentService.startDartDevelopmentService(
remoteVmServiceUri,
);
expect(dds.isRunning, true);
final service1 = await vmServiceConnectUri(dds.wsUri.toString());
final isolateId = (await getIsolate(service1)).id!;
final extensionCompleter = Completer<Event>();
await service1.streamListen(script.customStreamId);
await service1.streamListen('Extension');
service1.onEvent('Extension').listen((event) {
extensionCompleter.complete(event);
});
await service1.streamCancel(script.customStreamId);
await service1.resume(isolateId);
final event1 = await extensionCompleter.future;
expect(event1.extensionKind, equals(script.eventKind));
expect(event1.extensionData?.data, equals(script.eventData));
});
test('Canceling a normal stream does not cancel custom listeners', () async {
dds = await DartDevelopmentService.startDartDevelopmentService(
remoteVmServiceUri,
);
expect(dds.isRunning, true);
final service1 = await vmServiceConnectUri(dds.wsUri.toString());
final isolateId = (await getIsolate(service1)).id!;
final customStreamCompleter = Completer<Event>();
await service1.streamListen(script.customStreamId);
await service1.streamListen('Extension');
service1.onEvent(script.customStreamId).listen((event) {
customStreamCompleter.complete(event);
});
await service1.streamCancel('Extension');
await service1.resume(isolateId);
final event1 = await customStreamCompleter.future;
expect(event1.extensionKind, equals(script.eventKind));
expect(event1.extensionData?.data, equals(script.eventData));
});
}

View file

@ -0,0 +1,35 @@
// Copyright (c) 2022, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'package:observatory/service_io.dart';
import 'package:test/test.dart';
import 'test_helper.dart';
Future streamListen(VM vm, String streamId) async =>
await vm.invokeRpcNoUpgrade(
'streamListen',
{
'streamId': streamId,
},
);
var tests = <VMTest>[
// Ensure the VM Service throws an exception when encountering a custom
// stream.
(VM vm) async {
try {
await streamListen(vm, 'Foo');
} on ServerRpcException catch (e) {
expect(e.message, "streamListen: invalid 'streamId' parameter: Foo");
}
}
];
main(args) => runVMTests(
args,
tests,
enableService: true,
enableDds: false,
);

View file

@ -0,0 +1,34 @@
// Copyright (c) 2022, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'package:observatory/service_io.dart';
import 'package:test/test.dart';
import 'test_helper.dart';
Future streamListen(VM vm, String streamId) async =>
await vm.invokeRpcNoUpgrade(
'streamListen',
{
'streamId': streamId,
},
);
var tests = <VMTest>[
// Ensure the DDS allows for listening to a custom stream
(VM vm) async {
try {
await streamListen(vm, 'Foo');
} catch (e) {
fail('Unable to subscribe to a custom stream: $e');
}
}
];
main(args) => runVMTests(
args,
tests,
enableService: false,
enableDds: true,
);

View file

@ -44,15 +44,6 @@ var tests = <VMTest>[
expect(e.message, 'Stream not subscribed');
}
},
// Check subscription to invalid stream fails.
(VM vm) async {
try {
await streamListen(vm, 'Foo');
fail('Subscribed to invalid stream');
} on ServerRpcException catch (e) {
expect(e.message, "streamListen: invalid 'streamId' parameter: Foo");
}
}
];
main(args) => runVMTests(args, tests);

View file

@ -0,0 +1,35 @@
// Copyright (c) 2022, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'package:observatory_2/service_io.dart';
import 'package:test/test.dart';
import 'test_helper.dart';
Future streamListen(VM vm, String streamId) async =>
await vm.invokeRpcNoUpgrade(
'streamListen',
{
'streamId': streamId,
},
);
var tests = <VMTest>[
// Ensure the VM Service throws an exception when encountering a custom
// stream.
(VM vm) async {
try {
await streamListen(vm, 'Foo');
} on ServerRpcException catch (e) {
expect(e.message, "streamListen: invalid 'streamId' parameter: Foo");
}
}
];
main(args) => runVMTests(
args,
tests,
enableService: true,
enableDds: false,
);

View file

@ -0,0 +1,34 @@
// Copyright (c) 2022, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'package:observatory_2/service_io.dart';
import 'package:test/test.dart';
import 'test_helper.dart';
Future streamListen(VM vm, String streamId) async =>
await vm.invokeRpcNoUpgrade(
'streamListen',
{
'streamId': streamId,
},
);
var tests = <VMTest>[
// Ensure the DDS allows for listening to a custom stream
(VM vm) async {
try {
await streamListen(vm, 'Foo');
} catch (e) {
fail('Unable to subscribe to a custom stream: $e');
}
}
];
main(args) => runVMTests(
args,
tests,
enableService: false,
enableDds: true,
);

View file

@ -44,15 +44,6 @@ var tests = <VMTest>[
expect(e.message, 'Stream not subscribed');
}
},
// Check subscription to invalid stream fails.
(VM vm) async {
try {
await streamListen(vm, 'Foo');
fail('Subscribed to invalid stream');
} on ServerRpcException catch (e) {
expect(e.message, "streamListen: invalid 'streamId' parameter: Foo");
}
}
];
main(args) => runVMTests(args, tests);

View file

@ -409,6 +409,8 @@ StreamInfo Service::profiler_stream("Profiler");
const uint8_t* Service::dart_library_kernel_ = NULL;
intptr_t Service::dart_library_kernel_len_ = 0;
// Keep streams_ in sync with the protected streams in
// lib/developer/extension.dart
static StreamInfo* const streams_[] = {
&Service::vm_stream, &Service::isolate_stream,
&Service::debug_stream, &Service::gc_stream,

View file

@ -146,13 +146,41 @@ external bool get extensionStreamHasListener;
/// event stream.
///
/// If [extensionStreamHasListener] is false, this method is a no-op.
void postEvent(String eventKind, Map eventData) {
/// Override [stream] to set the destination stream that the event should be
/// posted to. The [stream] may not start with an underscore or be a core VM
/// Service stream.
void postEvent(String eventKind, Map eventData, {String stream = 'Extension'}) {
const destinationStreamKey = '__destinationStream';
// Keep protected streams in sync with `streams_` in runtime/vm/service.cc
// `Extension` is the only stream that should not be protected here.
final protectedStreams = <String>[
'VM',
'Isolate',
'Debug',
'GC',
'_Echo',
'HeapSnapshot',
'Logging',
'Timeline',
'Profiler',
];
if (protectedStreams.contains(stream)) {
throw ArgumentError.value(
stream, 'stream', 'Cannot be a protected stream.');
} else if (stream.startsWith('_')) {
throw ArgumentError.value(
stream, 'stream', 'Cannot start with and underscore.');
}
if (!extensionStreamHasListener) {
return;
}
// TODO: When NNBD is complete, delete the following two lines.
checkNotNullable(eventKind, 'eventKind');
checkNotNullable(eventData, 'eventData');
checkNotNullable(stream, 'stream');
eventData[destinationStreamKey] = stream;
String eventDataAsString = json.encode(eventData);
_postEvent(eventKind, eventDataAsString);
}

View file

@ -0,0 +1,39 @@
// Copyright (c) 2022, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:convert';
import 'dart:async';
import 'dart:developer';
import 'package:expect/expect.dart';
final protectedStreams = [
'VM',
'Isolate',
'Debug',
'GC',
'_Echo',
'HeapSnapshot',
'Logging',
'Timeline',
'Profiler',
'_aStreamThatStartsWithAnUnderScore'
];
main() {
for (final protectedStream in protectedStreams) {
Expect.throws(
() {
postEvent('theEvent', {'the': 'data'}, stream: protectedStream);
},
(_) => true,
'Should not allow posting to $protectedStream protected stream',
);
}
// The Extension stream in not protected so calling this should not fail
postEvent('theEvent', {'the': 'data'}, stream: 'Extension');
// Should be allowed to post to a non-protecvted custom stream
postEvent('theEvent', {'the': 'data'}, stream: 'someCustomStream');
}

View file

@ -0,0 +1,39 @@
// Copyright (c) 2022, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:convert';
import 'dart:async';
import 'dart:developer';
import 'package:expect/expect.dart';
final protectedStreams = [
'VM',
'Isolate',
'Debug',
'GC',
'_Echo',
'HeapSnapshot',
'Logging',
'Timeline',
'Profiler',
'_aStreamThatStartsWithAnUnderScore'
];
main() {
for (final protectedStream in protectedStreams) {
Expect.throws(
() {
postEvent('theEvent', {'the': 'data'}, stream: protectedStream);
},
(_) => true,
'Should not allow posting to $protectedStream protected stream',
);
}
// The Extension stream in not protected so calling this should not fail
postEvent('theEvent', {'the': 'data'}, stream: 'Extension');
// Should be allowed to post to a non-protecvted custom stream
postEvent('theEvent', {'the': 'data'}, stream: 'someCustomStream');
}