diff --git a/pkg/dds/CHANGELOG.md b/pkg/dds/CHANGELOG.md index cc06d36ae16..a003ccc1b06 100644 --- a/pkg/dds/CHANGELOG.md +++ b/pkg/dds/CHANGELOG.md @@ -1,3 +1,7 @@ +# 2.9.0 +- Updated DDS protocol to version 1.6. +- Add `postEvent RPC. + # 2.8.3 - Pass-through expression evaluation types, method and class information. diff --git a/pkg/dds/dds_protocol.md b/pkg/dds/dds_protocol.md index fcf8ae28ff8..49f84123566 100644 --- a/pkg/dds/dds_protocol.md +++ b/pkg/dds/dds_protocol.md @@ -17,22 +17,23 @@ The Service Protocol uses [JSON-RPC 2.0][]. - [Revision History](#revision-history) - [Public RPCs](#public-rpcs) - [getAvailableCachedCpuSamples](#getavailablecachedcpusamples) - - [getCachedCpuSamples](#getcachedcpusamples)[ + - [getCachedCpuSamples](#getcachedcpusamples) - [getClientName](#getclientname) - [getDartDevelopmentServiceVersion](#getdartdevelopmentserviceversion) - [getLogHistorySize](#getloghistorysize) - [getPerfettoVMTimeline](#getperfettovmtimeline) - - [getStreamHistory](#getstreamhistory)[ + - [getStreamHistory](#getstreamhistory) + - [postEvent](#postevent) - [requirePermissionToResume](#requirepermissiontoresume) - [setClientName](#setclientname) - [setLogHistorySize](#setloghistorysize) - [Public Types](#public-types) - - [AvailableCachedCpuSamples](#availablecachedcpusamples)[ + - [AvailableCachedCpuSamples](#availablecachedcpusamples) - [CachedCpuSamples](#cachedcpusamples) - [ClientName](#clientname) - [DartDevelopmentServiceVersion](#dartdevelopmentserviceversion) - [Size](#size) - - [StreamHistory](#streamhistory)[ + - [StreamHistory](#streamhistory) ## RPCs, Requests, and Responses @@ -164,6 +165,13 @@ streams). See [StreamHistory](#streamhistory). +### postEvent + +``` +void postEvent(String stream, String eventKind, Map eventData) +``` +The _postEvent_ RPC is used to send events to custom Event streams. + ### requirePermissionToResume ``` @@ -302,6 +310,7 @@ version | comments 1.3 | Added `getAvailableCachedCpuSamples` and `getCachedCpuSamples` RPCs. 1.4 | Added the ability to subscribe to custom streams (which can be specified when calling `dart:developer`'s `postEvent`). 1.5 | Added `getPerfettoCpuSamples` RPC. +1.6 | Added `postEvent` RPC. [resume]: https://github.com/dart-lang/sdk/blob/main/runtime/vm/service/service.md#resume [success]: https://github.com/dart-lang/sdk/blob/main/runtime/vm/service/service.md#success diff --git a/pkg/dds/lib/dds.dart b/pkg/dds/lib/dds.dart index d14cf56128e..86de77ca509 100644 --- a/pkg/dds/lib/dds.dart +++ b/pkg/dds/lib/dds.dart @@ -160,7 +160,7 @@ abstract class DartDevelopmentService { /// The version of the DDS protocol supported by this [DartDevelopmentService] /// instance. - static const String protocolVersion = '1.5'; + static const String protocolVersion = '1.6'; } class DartDevelopmentServiceException implements Exception { diff --git a/pkg/dds/lib/src/client.dart b/pkg/dds/lib/src/client.dart index eb22d676ddf..9cda5d06a87 100644 --- a/pkg/dds/lib/src/client.dart +++ b/pkg/dds/lib/src/client.dart @@ -113,6 +113,14 @@ class DartDevelopmentServiceClient { return RPCResponses.success; }); + _clientPeer.registerMethod('postEvent', (parameters) async { + final eventKind = parameters['eventKind'].asString; + final eventData = parameters['eventData'].asMap; + final stream = parameters['stream'].asString; + dds.streamManager.postEvent(stream, eventKind, eventData); + return RPCResponses.success; + }); + _clientPeer.registerMethod('streamCpuSamplesWithUserTag', (parameters) async { final userTags = parameters['userTags'].asList.cast(); diff --git a/pkg/dds/lib/src/rpc_error_codes.dart b/pkg/dds/lib/src/rpc_error_codes.dart index 99895cb862e..31eacf1eae3 100644 --- a/pkg/dds/lib/src/rpc_error_codes.dart +++ b/pkg/dds/lib/src/rpc_error_codes.dart @@ -41,6 +41,8 @@ abstract class RpcErrorCodes { static const kExpressionCompilationError = 113; // static const kInvalidTimelineRequest = 114; + static const kCustomStreamDoesNotExist = 130; + static const kCoreStreamNotAllowed = 131; // Experimental (used in private rpcs). // static const kFileSystemAlreadyExists = 1001; @@ -54,5 +56,7 @@ abstract class RpcErrorCodes { kServiceAlreadyRegistered: 'Service already registered', kServiceDisappeared: 'Service has disappeared', kExpressionCompilationError: 'Expression compilation error', + kCustomStreamDoesNotExist: 'Custom stream does not exist', + kCoreStreamNotAllowed: 'Core streams are not allowed', }; } diff --git a/pkg/dds/lib/src/stream_manager.dart b/pkg/dds/lib/src/stream_manager.dart index 28a0e5b4ee9..4df895cb08c 100644 --- a/pkg/dds/lib/src/stream_manager.dart +++ b/pkg/dds/lib/src/stream_manager.dart @@ -187,6 +187,36 @@ class StreamManager { ); } + /// Send an event to the [stream]. + /// + /// [stream] must be a registered custom stream (i.e., not a stream specified + /// as part of the VM service protocol). + /// + /// If [stream] is not a registered custom stream, an [RPCError] with code + /// [kCustomStreamDoesNotExist] will be thrown. + /// + /// If [stream] is a core stream, an [RPCError] with code + /// [kCoreStreamNotAllowed] will be thrown. + void postEvent( + String stream, String eventKind, Map eventData) { + if (coreStreams.contains(stream)) { + throw kCoreStreamNotAllowed; + } + if (!customStreamListenerKeys.contains(stream)) { + throw kCustomStreamDoesNotExist; + } + + streamNotify(stream, { + 'streamId': stream, + 'event': { + 'kind': 'Extension', + 'timestamp': DateTime.now().millisecondsSinceEpoch, + 'extensionData': eventData, + 'extensionKind': eventKind, + }, + }); + } + /// Subscribes `client` to a stream. /// /// If `client` is the first client to listen to `stream`, DDS will send a @@ -386,16 +416,43 @@ class StreamManager { RpcErrorCodes.kStreamNotSubscribed, ); + static final kCustomStreamDoesNotExist = RpcErrorCodes.buildRpcException( + RpcErrorCodes.kCustomStreamDoesNotExist, + ); + static final kCoreStreamNotAllowed = RpcErrorCodes.buildRpcException( + RpcErrorCodes.kCoreStreamNotAllowed, + ); + + static const kEchoStream = '_Echo'; static const kDebugStream = 'Debug'; static const kExtensionStream = 'Extension'; + static const kHeapSnapshotStream = 'HeapSnapshot'; static const kIsolateStream = 'Isolate'; + static const kGCStream = 'GC'; static const kLoggingStream = 'Logging'; static const kProfilerStream = 'Profiler'; static const kStderrStream = 'Stderr'; static const kStdoutStream = 'Stdout'; + static const kTimelineStream = 'Timeline'; + static const kVMStream = 'VM'; static const destinationStreamKey = '__destinationStream'; + static const coreStreams = [ + kEchoStream, + kDebugStream, + kExtensionStream, + kHeapSnapshotStream, + kIsolateStream, + kGCStream, + kLoggingStream, + kProfilerStream, + kStderrStream, + kStdoutStream, + kTimelineStream, + kVMStream, + ]; + static Map loggingRepositories = {}; // Never cancel the Debug or Isolate stream as `IsolateManager` requires diff --git a/pkg/dds/pubspec.yaml b/pkg/dds/pubspec.yaml index f0193571a49..fd8baef43d0 100644 --- a/pkg/dds/pubspec.yaml +++ b/pkg/dds/pubspec.yaml @@ -1,5 +1,5 @@ name: dds -version: 2.8.3 +version: 2.9.0 description: >- A library used to spawn the Dart Developer Service, used to communicate with a Dart VM Service instance. diff --git a/pkg/dds/test/dds_client_post_event_test.dart b/pkg/dds/test/dds_client_post_event_test.dart new file mode 100644 index 00000000000..4d5ed80678e --- /dev/null +++ b/pkg/dds/test/dds_client_post_event_test.dart @@ -0,0 +1,75 @@ +// Copyright (c) 2023, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; +import 'dart:io'; + +import 'package:dds/dds.dart'; +import 'package:dds/src/rpc_error_codes.dart'; +import 'package:dds_service_extensions/dds_service_extensions.dart'; +import 'package:test/test.dart'; +import 'package:vm_service/src/vm_service.dart'; +import 'package:vm_service/vm_service.dart' as vm; +import 'package:vm_service/vm_service_io.dart'; + +import 'common/test_helper.dart'; + +void main() { + late final Process process; + late final DartDevelopmentService dds; + + setUp(() async { + // We don't care what's actually running in the target process for this + // test, so we're just using an existing one. + process = await spawnDartProcess( + 'get_stream_history_script.dart', + pauseOnStart: false, + ); + dds = await DartDevelopmentService.startDartDevelopmentService( + remoteVmServiceUri, + ); + }); + + tearDown(() async { + await dds.shutdown(); + process.kill(); + }); + + test('Ensure postEvent behaves as expected', () async { + expect(dds.isRunning, true); + int caughtCount = 0; + final service = await vmServiceConnectUri(dds.wsUri.toString()); + final originalExtensionData = {'some': 'testData'}; + // Test if the custom stream doesn't exist yet, we throw an error. + try { + await service.postEvent('testStream', 'testKind', originalExtensionData); + } on vm.RPCError catch (e) { + expect(e.code, RpcErrorCodes.kCustomStreamDoesNotExist); + caughtCount++; + } + expect(caughtCount, 1); + // Test if using a core stream, we throw an error. + try { + await service.postEvent('Logging', 'testKind', originalExtensionData); + } on vm.RPCError catch (e) { + expect(e.code, RpcErrorCodes.kCoreStreamNotAllowed); + caughtCount++; + } + expect(caughtCount, 2); + + // Test when the stream exists that the event is propagated. + final completer = Completer(); + ExtensionData? eventExtensionData; + + service.onEvent('testStream').listen((event) { + eventExtensionData = event.extensionData; + completer.complete(); + }); + await service.streamListen('testStream'); + await service.postEvent('testStream', 'testKind', originalExtensionData); + + await completer.future; + expect(eventExtensionData?.data, equals(originalExtensionData)); + }); +} diff --git a/pkg/dds_service_extensions/CHANGELOG.md b/pkg/dds_service_extensions/CHANGELOG.md index 9920d6c1dd2..7a1ba9ae62e 100644 --- a/pkg/dds_service_extensions/CHANGELOG.md +++ b/pkg/dds_service_extensions/CHANGELOG.md @@ -1,3 +1,6 @@ +## 1.5.0 +- Added `DdsExtension.postEvent`. + ## 1.4.0 - Added `DdsExtension.getPerfettoVMTimelineWithCpuSamples`. diff --git a/pkg/dds_service_extensions/lib/dds_service_extensions.dart b/pkg/dds_service_extensions/lib/dds_service_extensions.dart index fc114e92e98..fa63afda5ec 100644 --- a/pkg/dds_service_extensions/lib/dds_service_extensions.dart +++ b/pkg/dds_service_extensions/lib/dds_service_extensions.dart @@ -56,6 +56,31 @@ extension DdsExtension on VmService { }); } + /// Send an event to the [stream]. + /// + /// [stream] must be a registered custom stream (i.e., not a stream specified + /// as part of the VM service protocol). + /// + /// If [stream] is not a registered custom stream, an [RPCError] with code + /// [kCustomStreamDoesNotExist] will be thrown. + /// + /// If [stream] is a core stream, an [RPCError] with code + /// [kCoreStreamNotAllowed] will be thrown. + Future postEvent( + String stream, + String eventKind, + Map eventData, + ) async { + if (!(await _versionCheck(1, 6))) { + throw UnimplementedError('postEvent requires DDS version 1.6'); + } + return _callHelper('postEvent', args: { + 'eventKind': eventKind, + 'eventData': eventData, + 'stream': stream, + }); + } + /// The [getAvailableCachedCpuSamples] RPC is used to determine which caches of CPU samples /// are available. Caches are associated with individual [UserTag] names and are specified /// when DDS is started via the `cachedUserTags` parameter. diff --git a/pkg/dds_service_extensions/pubspec.yaml b/pkg/dds_service_extensions/pubspec.yaml index 70a2ce296da..cc3f4ceb216 100644 --- a/pkg/dds_service_extensions/pubspec.yaml +++ b/pkg/dds_service_extensions/pubspec.yaml @@ -1,5 +1,5 @@ name: dds_service_extensions -version: 1.4.0 +version: 1.5.0 description: >- Extension methods for `package:vm_service`, used to make requests a Dart Development Service (DDS) instance.