Add postEvent to dds client.

This will allow for communication to custom streams through a connection to DDS. The primary use case for this is so Dart DevTools can communicate to the running app's custom streams, and by extension VSCode.

Bug: https://github.com/flutter/devtools/issues/5819
Change-Id: Ib22181a55a15baa4a85f49fb20d86d1ca8f0e5e7
Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/304981
Reviewed-by: Ben Konyi <bkonyi@google.com>
Commit-Queue: Dan Chevalier <danchevalier@google.com>
This commit is contained in:
Dan Chevalier 2023-06-02 19:07:39 +00:00 committed by Commit Queue
parent 25c0ae1c42
commit 46d9f76f7e
11 changed files with 192 additions and 7 deletions

View file

@ -1,3 +1,7 @@
# 2.9.0
- Updated DDS protocol to version 1.6.
- Add `postEvent RPC.
# 2.8.3
- Pass-through expression evaluation types, method and class information.

View file

@ -17,22 +17,23 @@ The Service Protocol uses [JSON-RPC 2.0][].
- [Revision History](#revision-history)
- [Public RPCs](#public-rpcs)
- [getAvailableCachedCpuSamples](#getavailablecachedcpusamples)
- [getCachedCpuSamples](#getcachedcpusamples)[
- [getCachedCpuSamples](#getcachedcpusamples)
- [getClientName](#getclientname)
- [getDartDevelopmentServiceVersion](#getdartdevelopmentserviceversion)
- [getLogHistorySize](#getloghistorysize)
- [getPerfettoVMTimeline](#getperfettovmtimeline)
- [getStreamHistory](#getstreamhistory)[
- [getStreamHistory](#getstreamhistory)
- [postEvent](#postevent)
- [requirePermissionToResume](#requirepermissiontoresume)
- [setClientName](#setclientname)
- [setLogHistorySize](#setloghistorysize)
- [Public Types](#public-types)
- [AvailableCachedCpuSamples](#availablecachedcpusamples)[
- [AvailableCachedCpuSamples](#availablecachedcpusamples)
- [CachedCpuSamples](#cachedcpusamples)
- [ClientName](#clientname)
- [DartDevelopmentServiceVersion](#dartdevelopmentserviceversion)
- [Size](#size)
- [StreamHistory](#streamhistory)[
- [StreamHistory](#streamhistory)
## RPCs, Requests, and Responses
@ -164,6 +165,13 @@ streams).
See [StreamHistory](#streamhistory).
### postEvent
```
void postEvent(String stream, String eventKind, Map eventData)
```
The _postEvent_ RPC is used to send events to custom Event streams.
### requirePermissionToResume
```
@ -302,6 +310,7 @@ version | comments
1.3 | Added `getAvailableCachedCpuSamples` and `getCachedCpuSamples` RPCs.
1.4 | Added the ability to subscribe to custom streams (which can be specified when calling `dart:developer`'s `postEvent`).
1.5 | Added `getPerfettoCpuSamples` RPC.
1.6 | Added `postEvent` RPC.
[resume]: https://github.com/dart-lang/sdk/blob/main/runtime/vm/service/service.md#resume
[success]: https://github.com/dart-lang/sdk/blob/main/runtime/vm/service/service.md#success

View file

@ -160,7 +160,7 @@ abstract class DartDevelopmentService {
/// The version of the DDS protocol supported by this [DartDevelopmentService]
/// instance.
static const String protocolVersion = '1.5';
static const String protocolVersion = '1.6';
}
class DartDevelopmentServiceException implements Exception {

View file

@ -113,6 +113,14 @@ class DartDevelopmentServiceClient {
return RPCResponses.success;
});
_clientPeer.registerMethod('postEvent', (parameters) async {
final eventKind = parameters['eventKind'].asString;
final eventData = parameters['eventData'].asMap;
final stream = parameters['stream'].asString;
dds.streamManager.postEvent(stream, eventKind, eventData);
return RPCResponses.success;
});
_clientPeer.registerMethod('streamCpuSamplesWithUserTag',
(parameters) async {
final userTags = parameters['userTags'].asList.cast<String>();

View file

@ -41,6 +41,8 @@ abstract class RpcErrorCodes {
static const kExpressionCompilationError = 113;
// static const kInvalidTimelineRequest = 114;
static const kCustomStreamDoesNotExist = 130;
static const kCoreStreamNotAllowed = 131;
// Experimental (used in private rpcs).
// static const kFileSystemAlreadyExists = 1001;
@ -54,5 +56,7 @@ abstract class RpcErrorCodes {
kServiceAlreadyRegistered: 'Service already registered',
kServiceDisappeared: 'Service has disappeared',
kExpressionCompilationError: 'Expression compilation error',
kCustomStreamDoesNotExist: 'Custom stream does not exist',
kCoreStreamNotAllowed: 'Core streams are not allowed',
};
}

View file

@ -187,6 +187,36 @@ class StreamManager {
);
}
/// Send an event to the [stream].
///
/// [stream] must be a registered custom stream (i.e., not a stream specified
/// as part of the VM service protocol).
///
/// If [stream] is not a registered custom stream, an [RPCError] with code
/// [kCustomStreamDoesNotExist] will be thrown.
///
/// If [stream] is a core stream, an [RPCError] with code
/// [kCoreStreamNotAllowed] will be thrown.
void postEvent(
String stream, String eventKind, Map<String, Object?> eventData) {
if (coreStreams.contains(stream)) {
throw kCoreStreamNotAllowed;
}
if (!customStreamListenerKeys.contains(stream)) {
throw kCustomStreamDoesNotExist;
}
streamNotify(stream, <String, dynamic>{
'streamId': stream,
'event': {
'kind': 'Extension',
'timestamp': DateTime.now().millisecondsSinceEpoch,
'extensionData': eventData,
'extensionKind': eventKind,
},
});
}
/// Subscribes `client` to a stream.
///
/// If `client` is the first client to listen to `stream`, DDS will send a
@ -386,16 +416,43 @@ class StreamManager {
RpcErrorCodes.kStreamNotSubscribed,
);
static final kCustomStreamDoesNotExist = RpcErrorCodes.buildRpcException(
RpcErrorCodes.kCustomStreamDoesNotExist,
);
static final kCoreStreamNotAllowed = RpcErrorCodes.buildRpcException(
RpcErrorCodes.kCoreStreamNotAllowed,
);
static const kEchoStream = '_Echo';
static const kDebugStream = 'Debug';
static const kExtensionStream = 'Extension';
static const kHeapSnapshotStream = 'HeapSnapshot';
static const kIsolateStream = 'Isolate';
static const kGCStream = 'GC';
static const kLoggingStream = 'Logging';
static const kProfilerStream = 'Profiler';
static const kStderrStream = 'Stderr';
static const kStdoutStream = 'Stdout';
static const kTimelineStream = 'Timeline';
static const kVMStream = 'VM';
static const destinationStreamKey = '__destinationStream';
static const coreStreams = <String>[
kEchoStream,
kDebugStream,
kExtensionStream,
kHeapSnapshotStream,
kIsolateStream,
kGCStream,
kLoggingStream,
kProfilerStream,
kStderrStream,
kStdoutStream,
kTimelineStream,
kVMStream,
];
static Map<String, LoggingRepository> loggingRepositories = {};
// Never cancel the Debug or Isolate stream as `IsolateManager` requires

View file

@ -1,5 +1,5 @@
name: dds
version: 2.8.3
version: 2.9.0
description: >-
A library used to spawn the Dart Developer Service, used to communicate with
a Dart VM Service instance.

View file

@ -0,0 +1,75 @@
// Copyright (c) 2023, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
import 'dart:io';
import 'package:dds/dds.dart';
import 'package:dds/src/rpc_error_codes.dart';
import 'package:dds_service_extensions/dds_service_extensions.dart';
import 'package:test/test.dart';
import 'package:vm_service/src/vm_service.dart';
import 'package:vm_service/vm_service.dart' as vm;
import 'package:vm_service/vm_service_io.dart';
import 'common/test_helper.dart';
void main() {
late final Process process;
late final DartDevelopmentService dds;
setUp(() async {
// We don't care what's actually running in the target process for this
// test, so we're just using an existing one.
process = await spawnDartProcess(
'get_stream_history_script.dart',
pauseOnStart: false,
);
dds = await DartDevelopmentService.startDartDevelopmentService(
remoteVmServiceUri,
);
});
tearDown(() async {
await dds.shutdown();
process.kill();
});
test('Ensure postEvent behaves as expected', () async {
expect(dds.isRunning, true);
int caughtCount = 0;
final service = await vmServiceConnectUri(dds.wsUri.toString());
final originalExtensionData = {'some': 'testData'};
// Test if the custom stream doesn't exist yet, we throw an error.
try {
await service.postEvent('testStream', 'testKind', originalExtensionData);
} on vm.RPCError catch (e) {
expect(e.code, RpcErrorCodes.kCustomStreamDoesNotExist);
caughtCount++;
}
expect(caughtCount, 1);
// Test if using a core stream, we throw an error.
try {
await service.postEvent('Logging', 'testKind', originalExtensionData);
} on vm.RPCError catch (e) {
expect(e.code, RpcErrorCodes.kCoreStreamNotAllowed);
caughtCount++;
}
expect(caughtCount, 2);
// Test when the stream exists that the event is propagated.
final completer = Completer<void>();
ExtensionData? eventExtensionData;
service.onEvent('testStream').listen((event) {
eventExtensionData = event.extensionData;
completer.complete();
});
await service.streamListen('testStream');
await service.postEvent('testStream', 'testKind', originalExtensionData);
await completer.future;
expect(eventExtensionData?.data, equals(originalExtensionData));
});
}

View file

@ -1,3 +1,6 @@
## 1.5.0
- Added `DdsExtension.postEvent`.
## 1.4.0
- Added `DdsExtension.getPerfettoVMTimelineWithCpuSamples`.

View file

@ -56,6 +56,31 @@ extension DdsExtension on VmService {
});
}
/// Send an event to the [stream].
///
/// [stream] must be a registered custom stream (i.e., not a stream specified
/// as part of the VM service protocol).
///
/// If [stream] is not a registered custom stream, an [RPCError] with code
/// [kCustomStreamDoesNotExist] will be thrown.
///
/// If [stream] is a core stream, an [RPCError] with code
/// [kCoreStreamNotAllowed] will be thrown.
Future<void> postEvent(
String stream,
String eventKind,
Map<String, Object?> eventData,
) async {
if (!(await _versionCheck(1, 6))) {
throw UnimplementedError('postEvent requires DDS version 1.6');
}
return _callHelper<void>('postEvent', args: {
'eventKind': eventKind,
'eventData': eventData,
'stream': stream,
});
}
/// The [getAvailableCachedCpuSamples] RPC is used to determine which caches of CPU samples
/// are available. Caches are associated with individual [UserTag] names and are specified
/// when DDS is started via the `cachedUserTags` parameter.

View file

@ -1,5 +1,5 @@
name: dds_service_extensions
version: 1.4.0
version: 1.5.0
description: >-
Extension methods for `package:vm_service`, used to make requests a
Dart Development Service (DDS) instance.