Revert "[ VM Service / DDS ] Add custom service stream support"

This reverts commit 4981cbffe2.

Reason for revert: Mac regression test failed.

Original change's description:
> [ VM Service / DDS ] Add custom service stream support
>
> Setting the `stream` parameter on `developer.postEvent` will now forward those events to a custom stream inside DDS.
>
>
> The first use of this will be for widget inspection. A navigation event will be posted to a custom stream. Our IDE DAP can listen for the Event and react to it by navigating to the desired location in the code.
>
> TEST=Updated observatory tests. Created new developer test to check assertions. Added DDS tests for new custom stream behaviour. Manually tested the postEvent and StreamListen with multiple clients
>
> https://github.com/flutter/devtools/issues/4533
>
> Change-Id: I870dc634c9a9a7d2ee3a6605319c2a18517ad197
> Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/274061
> Reviewed-by: Ben Konyi <bkonyi@google.com>
> Commit-Queue: Dan Chevalier <danchevalier@google.com>

TBR=bkonyi@google.com,dart-scoped@luci-project-accounts.iam.gserviceaccount.com,danchevalier@google.com

Change-Id: Ia1dce25444a6329c0553c931c9a6dbbec65ee583
No-Presubmit: true
No-Tree-Checks: true
No-Try: true
Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/274802
Reviewed-by: Ben Konyi <bkonyi@google.com>
Commit-Queue: Daco Harkes <dacoharkes@google.com>
Reviewed-by: Dan Chevalier <danchevalier@google.com>
This commit is contained in:
Dan Chevalier 2022-12-12 11:24:37 +00:00 committed by Commit Queue
parent ef286f6039
commit 3ed65601b0
14 changed files with 34 additions and 488 deletions

View file

@ -19,7 +19,7 @@ abstract class RpcErrorCodes {
// static const kInvalidRequest = -32600;
static const kMethodNotFound = -32601;
static const kInvalidParams = -32602;
// static const kInvalidParams = -32602;
// static const kInternalError = -32603;
// static const kExtensionError = -32000;

View file

@ -29,9 +29,8 @@ class StreamManager {
data, {
DartDevelopmentServiceClient? excludedClient,
}) {
final listeners =
streamListeners[streamId] ?? customStreamListeners[streamId];
if (listeners != null) {
if (streamListeners.containsKey(streamId)) {
final listeners = streamListeners[streamId]!;
final isBinaryData = data is Uint8List;
for (final listener in listeners) {
if (listener == excludedClient) {
@ -143,29 +142,6 @@ class StreamManager {
final streamId = parameters['streamId'].asString;
final event =
Event.parse(parameters['event'].asMap.cast<String, dynamic>())!;
final destinationStreamId =
event.extensionData?.data[destinationStreamKey];
if (destinationStreamId != null) {
// Strip [destinationStreamKey] from the extension data so it is not
// passed along.
(parameters.value['event']['extensionData'] as Map<String, dynamic>)
.remove(destinationStreamKey);
if (destinationStreamId != kExtensionStream) {
if (streamListeners.containsKey(destinationStreamId)) {
// __destinationStream is only used by developer.postEvent.
// developer.postEvent is only supposed to postEvents to the
// Extension stream or to custom streams
return;
}
final values = parameters.value;
values['streamId'] = destinationStreamId;
streamNotify(destinationStreamId, values);
return;
}
}
// Forward events from the streams IsolateManager subscribes to.
if (isolateManagerStreams.contains(streamId)) {
@ -205,46 +181,27 @@ class StreamManager {
() async {
assert(stream.isNotEmpty);
bool streamNewlySubscribed = false;
bool isNewCustomStream = false;
if (!streamListeners.containsKey(stream) &&
!customStreamListeners.containsKey(stream)) {
if (!streamListeners.containsKey(stream)) {
// Initialize the list of clients for the new stream before we do
// anything else to ensure multiple clients registering for the same
// stream in quick succession doesn't result in multiple streamListen
// requests being sent to the VM service.
streamNewlySubscribed = true;
streamListeners[stream] = <DartDevelopmentServiceClient>[];
if ((stream == kDebugStream && client == null) ||
stream != kDebugStream) {
// This will return an RPC exception if the stream doesn't exist. This
// will throw and the exception will be forwarded to the client.
try {
final result =
await dds.vmServiceClient.sendRequest('streamListen', {
'streamId': stream,
if (includePrivates != null)
'_includePrivateMembers': includePrivates,
});
assert(result['type'] == 'Success');
} on json_rpc.RpcException catch (e) {
if (e.code == RpcErrorCodes.kInvalidParams) {
// catching kInvalid params means that the vmServiceClient
// does not know about the stream we passed. So assume that
// the stream is a custom stream.
isNewCustomStream = true;
} else {
rethrow;
}
}
}
if (isNewCustomStream) {
customStreamListeners[stream] = <DartDevelopmentServiceClient>[];
} else {
streamListeners[stream] = <DartDevelopmentServiceClient>[];
final result =
await dds.vmServiceClient.sendRequest('streamListen', {
'streamId': stream,
if (includePrivates != null)
'_includePrivateMembers': includePrivates,
});
assert(result['type'] == 'Success');
}
}
if (streamListeners[stream]?.contains(client) == true ||
customStreamListeners[stream]?.contains(client) == true) {
if (streamListeners[stream]!.contains(client)) {
throw kStreamAlreadySubscribedException;
} else if (!streamNewlySubscribed && includePrivates != null) {
try {
@ -262,11 +219,7 @@ class StreamManager {
}
}
if (client != null) {
if (isNewCustomStream || customStreamListeners[stream] != null) {
customStreamListeners[stream]!.add(client);
} else {
streamListeners[stream]!.add(client);
}
streamListeners[stream]!.add(client);
if (loggingRepositories.containsKey(stream)) {
loggingRepositories[stream]!.sendHistoricalLogs(client);
} else if (stream == kServiceStream) {
@ -315,12 +268,6 @@ class StreamManager {
await _streamSubscriptionMutex.runGuarded(
() async {
assert(stream.isNotEmpty);
final customListeners = customStreamListeners[stream];
if (customListeners != null && customListeners.contains(client)) {
customListeners.remove(client);
return;
}
final listeners = streamListeners[stream];
if (listeners == null ||
client != null && !listeners.contains(client)) {
@ -370,12 +317,7 @@ class StreamManager {
/// Cleanup stream subscriptions for `client` when it has disconnected.
void clientDisconnect(DartDevelopmentServiceClient client) {
final allStreamListenerKeys = <String>[
...streamListeners.keys,
...customStreamListeners.keys,
];
for (final streamId in allStreamListenerKeys) {
for (final streamId in streamListeners.keys.toList()) {
streamCancel(client, streamId).catchError(
(_) => null,
// Ignore 'stream not subscribed' errors and StateErrors which arise
@ -412,8 +354,6 @@ class StreamManager {
static const kStderrStream = 'Stderr';
static const kStdoutStream = 'Stdout';
static const destinationStreamKey = '__destinationStream';
static Map<String, LoggingRepository> loggingRepositories = {};
// Never cancel the Debug or Isolate stream as `IsolateManager` requires
@ -446,7 +386,6 @@ class StreamManager {
final DartDevelopmentServiceImpl dds;
final streamListeners = <String, List<DartDevelopmentServiceClient>>{};
final customStreamListeners = <String, List<DartDevelopmentServiceClient>>{};
final _profilerUserTagSubscriptions = <String>{};
final _streamSubscriptionMutex = Mutex();
final _profilerUserTagSubscriptionsMutex = Mutex();

View file

@ -1,14 +0,0 @@
// Copyright (c) 2022, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:developer';
final eventKind = 'customEventKindTest';
final eventData = {'part1': 1, 'part2': '2'};
final customStreamId = 'a-custom-stream-id';
void main() {
postEvent('customEventKindTest', eventData, stream: customStreamId);
postEvent('customEventKindTest', eventData, stream: 'Extension');
}

View file

@ -1,151 +0,0 @@
// Copyright (c) 2022, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
import 'dart:io';
import 'package:dds/dds.dart';
import 'package:test/test.dart';
import 'package:vm_service/vm_service.dart';
import 'package:vm_service/vm_service_io.dart';
import 'common/test_helper.dart';
import 'post_event_custom_stream_script.dart' as script;
void main() {
late Process process;
late DartDevelopmentService dds;
setUp(() async {
process = await spawnDartProcess(
'post_event_custom_stream_script.dart',
);
});
tearDown(() async {
await dds.shutdown();
process.kill();
});
Future<IsolateRef> getIsolate(VmService service) async {
IsolateRef isolate;
while (true) {
final vm = await service.getVM();
if (vm.isolates!.isNotEmpty) {
isolate = vm.isolates!.first;
break;
}
await Future.delayed(const Duration(milliseconds: 50));
}
return isolate;
}
test('sends a postEvent over a custom stream to multiple listeners',
() async {
dds = await DartDevelopmentService.startDartDevelopmentService(
remoteVmServiceUri,
);
expect(dds.isRunning, true);
final service1 = await vmServiceConnectUri(dds.wsUri.toString());
final service2 = await vmServiceConnectUri(dds.wsUri.toString());
final completer1 = Completer<Event>();
final completer2 = Completer<Event>();
final isolateId = (await getIsolate(service1)).id!;
await service1.streamListen(script.customStreamId);
service1.onEvent(script.customStreamId).listen((event) {
completer1.complete(event);
});
await service2.streamListen(script.customStreamId);
service2.onEvent(script.customStreamId).listen((event) {
completer2.complete(event);
});
await service1.resume(isolateId);
final event1 = await completer1.future;
final event2 = await completer2.future;
expect(event1.extensionKind, equals(script.eventKind));
expect(event1.extensionData?.data, equals(script.eventData));
expect(event2.extensionKind, equals(script.eventKind));
expect(event2.extensionData?.data, equals(script.eventData));
});
test('can cancel custom stream listeners', () async {
dds = await DartDevelopmentService.startDartDevelopmentService(
remoteVmServiceUri,
);
expect(dds.isRunning, true);
final service1 = await vmServiceConnectUri(dds.wsUri.toString());
(await getIsolate(service1)).id!;
await service1.streamListen(script.customStreamId);
// We should be able to cancel
await service1.streamCancel(script.customStreamId);
try {
await service1.streamCancel(script.customStreamId);
fail('Re-Canceling the custom stream should have failed');
} on RPCError catch (e) {
expect(
e.message,
'Stream not subscribed',
);
}
});
test('canceling a custom stream does not cancel other listeners', () async {
dds = await DartDevelopmentService.startDartDevelopmentService(
remoteVmServiceUri,
);
expect(dds.isRunning, true);
final service1 = await vmServiceConnectUri(dds.wsUri.toString());
final isolateId = (await getIsolate(service1)).id!;
final extensionCompleter = Completer<Event>();
await service1.streamListen(script.customStreamId);
await service1.streamListen('Extension');
service1.onEvent('Extension').listen((event) {
extensionCompleter.complete(event);
});
await service1.streamCancel(script.customStreamId);
await service1.resume(isolateId);
final event1 = await extensionCompleter.future;
expect(event1.extensionKind, equals(script.eventKind));
expect(event1.extensionData?.data, equals(script.eventData));
});
test('Canceling a normal stream does not cancel custom listeners', () async {
dds = await DartDevelopmentService.startDartDevelopmentService(
remoteVmServiceUri,
);
expect(dds.isRunning, true);
final service1 = await vmServiceConnectUri(dds.wsUri.toString());
final isolateId = (await getIsolate(service1)).id!;
final customStreamCompleter = Completer<Event>();
await service1.streamListen(script.customStreamId);
await service1.streamListen('Extension');
service1.onEvent(script.customStreamId).listen((event) {
customStreamCompleter.complete(event);
});
await service1.streamCancel('Extension');
await service1.resume(isolateId);
final event1 = await customStreamCompleter.future;
expect(event1.extensionKind, equals(script.eventKind));
expect(event1.extensionData?.data, equals(script.eventData));
});
}

View file

@ -1,35 +0,0 @@
// Copyright (c) 2022, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'package:observatory/service_io.dart';
import 'package:test/test.dart';
import 'test_helper.dart';
Future streamListen(VM vm, String streamId) async =>
await vm.invokeRpcNoUpgrade(
'streamListen',
{
'streamId': streamId,
},
);
var tests = <VMTest>[
// Ensure the VM Service throws an exception when encountering a custom
// stream.
(VM vm) async {
try {
await streamListen(vm, 'Foo');
} on ServerRpcException catch (e) {
expect(e.message, "streamListen: invalid 'streamId' parameter: Foo");
}
}
];
main(args) => runVMTests(
args,
tests,
enableService: true,
enableDds: false,
);

View file

@ -1,34 +0,0 @@
// Copyright (c) 2022, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'package:observatory/service_io.dart';
import 'package:test/test.dart';
import 'test_helper.dart';
Future streamListen(VM vm, String streamId) async =>
await vm.invokeRpcNoUpgrade(
'streamListen',
{
'streamId': streamId,
},
);
var tests = <VMTest>[
// Ensure the DDS allows for listening to a custom stream
(VM vm) async {
try {
await streamListen(vm, 'Foo');
} catch (e) {
fail('Unable to subscribe to a custom stream: $e');
}
}
];
main(args) => runVMTests(
args,
tests,
enableService: false,
enableDds: true,
);

View file

@ -44,6 +44,15 @@ var tests = <VMTest>[
expect(e.message, 'Stream not subscribed');
}
},
// Check subscription to invalid stream fails.
(VM vm) async {
try {
await streamListen(vm, 'Foo');
fail('Subscribed to invalid stream');
} on ServerRpcException catch (e) {
expect(e.message, "streamListen: invalid 'streamId' parameter: Foo");
}
}
];
main(args) => runVMTests(args, tests);

View file

@ -1,35 +0,0 @@
// Copyright (c) 2022, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'package:observatory_2/service_io.dart';
import 'package:test/test.dart';
import 'test_helper.dart';
Future streamListen(VM vm, String streamId) async =>
await vm.invokeRpcNoUpgrade(
'streamListen',
{
'streamId': streamId,
},
);
var tests = <VMTest>[
// Ensure the VM Service throws an exception when encountering a custom
// stream.
(VM vm) async {
try {
await streamListen(vm, 'Foo');
} on ServerRpcException catch (e) {
expect(e.message, "streamListen: invalid 'streamId' parameter: Foo");
}
}
];
main(args) => runVMTests(
args,
tests,
enableService: true,
enableDds: false,
);

View file

@ -1,34 +0,0 @@
// Copyright (c) 2022, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'package:observatory_2/service_io.dart';
import 'package:test/test.dart';
import 'test_helper.dart';
Future streamListen(VM vm, String streamId) async =>
await vm.invokeRpcNoUpgrade(
'streamListen',
{
'streamId': streamId,
},
);
var tests = <VMTest>[
// Ensure the DDS allows for listening to a custom stream
(VM vm) async {
try {
await streamListen(vm, 'Foo');
} catch (e) {
fail('Unable to subscribe to a custom stream: $e');
}
}
];
main(args) => runVMTests(
args,
tests,
enableService: false,
enableDds: true,
);

View file

@ -44,6 +44,15 @@ var tests = <VMTest>[
expect(e.message, 'Stream not subscribed');
}
},
// Check subscription to invalid stream fails.
(VM vm) async {
try {
await streamListen(vm, 'Foo');
fail('Subscribed to invalid stream');
} on ServerRpcException catch (e) {
expect(e.message, "streamListen: invalid 'streamId' parameter: Foo");
}
}
];
main(args) => runVMTests(args, tests);

View file

@ -409,8 +409,6 @@ StreamInfo Service::profiler_stream("Profiler");
const uint8_t* Service::dart_library_kernel_ = NULL;
intptr_t Service::dart_library_kernel_len_ = 0;
// Keep streams_ in sync with the protected streams in
// lib/developer/extension.dart
static StreamInfo* const streams_[] = {
&Service::vm_stream, &Service::isolate_stream,
&Service::debug_stream, &Service::gc_stream,

View file

@ -146,41 +146,13 @@ external bool get extensionStreamHasListener;
/// event stream.
///
/// If [extensionStreamHasListener] is false, this method is a no-op.
/// Override [stream] to set the destination stream that the event should be
/// posted to. The [stream] may not start with an underscore or be a core VM
/// Service stream.
void postEvent(String eventKind, Map eventData, {String stream = 'Extension'}) {
const destinationStreamKey = '__destinationStream';
// Keep protected streams in sync with `streams_` in runtime/vm/service.cc
// `Extension` is the only stream that should not be protected here.
final protectedStreams = <String>[
'VM',
'Isolate',
'Debug',
'GC',
'_Echo',
'HeapSnapshot',
'Logging',
'Timeline',
'Profiler',
];
if (protectedStreams.contains(stream)) {
throw ArgumentError.value(
stream, 'stream', 'Cannot be a protected stream.');
} else if (stream.startsWith('_')) {
throw ArgumentError.value(
stream, 'stream', 'Cannot start with and underscore.');
}
void postEvent(String eventKind, Map eventData) {
if (!extensionStreamHasListener) {
return;
}
// TODO: When NNBD is complete, delete the following two lines.
checkNotNullable(eventKind, 'eventKind');
checkNotNullable(eventData, 'eventData');
checkNotNullable(stream, 'stream');
eventData[destinationStreamKey] = stream;
String eventDataAsString = json.encode(eventData);
_postEvent(eventKind, eventDataAsString);
}

View file

@ -1,39 +0,0 @@
// Copyright (c) 2022, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:convert';
import 'dart:async';
import 'dart:developer';
import 'package:expect/expect.dart';
final protectedStreams = [
'VM',
'Isolate',
'Debug',
'GC',
'_Echo',
'HeapSnapshot',
'Logging',
'Timeline',
'Profiler',
'_aStreamThatStartsWithAnUnderScore'
];
main() {
for (final protectedStream in protectedStreams) {
Expect.throws(
() {
postEvent('theEvent', {'the': 'data'}, stream: protectedStream);
},
(_) => true,
'Should not allow posting to $protectedStream protected stream',
);
}
// The Extension stream in not protected so calling this should not fail
postEvent('theEvent', {'the': 'data'}, stream: 'Extension');
// Should be allowed to post to a non-protecvted custom stream
postEvent('theEvent', {'the': 'data'}, stream: 'someCustomStream');
}

View file

@ -1,39 +0,0 @@
// Copyright (c) 2022, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:convert';
import 'dart:async';
import 'dart:developer';
import 'package:expect/expect.dart';
final protectedStreams = [
'VM',
'Isolate',
'Debug',
'GC',
'_Echo',
'HeapSnapshot',
'Logging',
'Timeline',
'Profiler',
'_aStreamThatStartsWithAnUnderScore'
];
main() {
for (final protectedStream in protectedStreams) {
Expect.throws(
() {
postEvent('theEvent', {'the': 'data'}, stream: protectedStream);
},
(_) => true,
'Should not allow posting to $protectedStream protected stream',
);
}
// The Extension stream in not protected so calling this should not fail
postEvent('theEvent', {'the': 'data'}, stream: 'Extension');
// Should be allowed to post to a non-protecvted custom stream
postEvent('theEvent', {'the': 'data'}, stream: 'someCustomStream');
}