From c7f122ee9ebb5b168c5bdda195878080e68887c7 Mon Sep 17 00:00:00 2001 From: Dan Chevalier Date: Tue, 24 Oct 2023 21:58:21 +0000 Subject: [PATCH] Initial Implementation for Service Extension Router This instance of service_extension_router has been hooked into DDS seperately. The vm_service and DDS tests still worked with the integration. There will still be some iteration and discussion to be had, but I'd like to get this up as the initial version. Bug: https://github.com/dart-lang/sdk/issues/53300 Change-Id: Ic72b770a61b2fc61fbd7c4b47ff8996a90f78665 Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/331820 Reviewed-by: Ben Konyi Commit-Queue: Dan Chevalier --- .../lib/service_extension_router.dart | 2 +- .../lib/src/client.dart | 13 +++ .../lib/src/service_extension_manager.dart | 5 - .../lib/src/stream_manager.dart | 83 ++++++++++++- .../test/stream_manager_test.dart | 110 +++++++++++++++++- 5 files changed, 202 insertions(+), 11 deletions(-) create mode 100644 pkg/service_extension_router/lib/src/client.dart delete mode 100644 pkg/service_extension_router/lib/src/service_extension_manager.dart diff --git a/pkg/service_extension_router/lib/service_extension_router.dart b/pkg/service_extension_router/lib/service_extension_router.dart index f2197de02cc..9adbadf01db 100644 --- a/pkg/service_extension_router/lib/service_extension_router.dart +++ b/pkg/service_extension_router/lib/service_extension_router.dart @@ -5,4 +5,4 @@ library; export 'src/stream_manager.dart'; -export 'src/service_extension_manager.dart'; +export 'src/client.dart'; diff --git a/pkg/service_extension_router/lib/src/client.dart b/pkg/service_extension_router/lib/src/client.dart new file mode 100644 index 00000000000..95cb8470551 --- /dev/null +++ b/pkg/service_extension_router/lib/src/client.dart @@ -0,0 +1,13 @@ +// Copyright (c) 2023, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +// TODO(danchevalier): Add documentation before major release +abstract class Client { + void streamNotify(String stream, Map data); + + Future close(); + Future sendRequest({required String method, dynamic parameters}); + + final Map services = {}; +} diff --git a/pkg/service_extension_router/lib/src/service_extension_manager.dart b/pkg/service_extension_router/lib/src/service_extension_manager.dart deleted file mode 100644 index a0c62a770a0..00000000000 --- a/pkg/service_extension_router/lib/src/service_extension_manager.dart +++ /dev/null @@ -1,5 +0,0 @@ -// Copyright (c) 2023, the Dart project authors. Please see the AUTHORS file -// for details. All rights reserved. Use of this source code is governed by a -// BSD-style license that can be found in the LICENSE file. - -class ServiceExtensionManager {} diff --git a/pkg/service_extension_router/lib/src/stream_manager.dart b/pkg/service_extension_router/lib/src/stream_manager.dart index 1908e51ab76..f1e59313d3c 100644 --- a/pkg/service_extension_router/lib/src/stream_manager.dart +++ b/pkg/service_extension_router/lib/src/stream_manager.dart @@ -2,4 +2,85 @@ // for details. All rights reserved. Use of this source code is governed by a // BSD-style license that can be found in the LICENSE file. -class StreamManager {} +import 'client.dart'; + +// TODO(danchevalier): Add documentation before major release +class StreamManager { + StreamManager({this.onStreamCancel}); + Future Function(Client client, String stream)? onStreamCancel; + + final _streamListeners = + >{}; // TODO should this be set of StreamClient? + List? getListenersFor({required String stream}) => + _streamListeners[stream]; + + /// Returns true if [client] is subscribed to [stream] + bool isSubscribed(Client client, String stream) { + return _streamListeners[stream]?.contains(client) ?? false; + } + + /// Returns true if [stream] has any [ServiceExtensionClients] subscribed. + bool hasSubscriptions(String stream) { + return _streamListeners[stream]?.isNotEmpty ?? false; + } + + /// Triggers [Client.streamNotify] for all clients subscribed + /// to [stream]. + void postEvent( + String stream, + Map data, { + Client? excludedClient, + }) { + final listeners = _streamListeners[stream] ?? const []; + for (final listener in listeners) { + if (listener == excludedClient) continue; + listener.streamNotify(stream, data); + } + } + + /// Subscribes `client` to a stream. + /// + /// If `client` is the first client to listen to `stream`, DDS will send a + /// `streamListen` request for `stream` to the VM service. + void streamListen( + Client client, + String stream, + ) async { + _streamListeners.putIfAbsent(stream, () => []); + if (_streamListeners[stream]!.contains(client)) { + throw StreamAlreadyListeningException(stream, client); + } + _streamListeners[stream]!.add(client); + } + + /// Unsubscribes [client] from [stream]. + Future streamCancel( + Client client, + String stream, + ) async { + if (!_streamListeners.containsKey(stream)) return; + + _streamListeners[stream]!.remove(client); + if (onStreamCancel != null) { + await onStreamCancel!(client, stream); + } + } + + Future onClientDisconnect(Client client) async { + await Future.wait([ + for (final stream in _streamListeners.keys) + if (_streamListeners[stream]!.contains(client)) + streamCancel(client, stream), + ]); + } +} + +class StreamAlreadyListeningException implements Exception { + const StreamAlreadyListeningException(this.stream, this.client); + final String stream; + final Client client; + + @override + String toString() => + "Client, with hashCode ${client.hashCode}, is already subscribed to stream $stream"; +} diff --git a/pkg/service_extension_router/test/stream_manager_test.dart b/pkg/service_extension_router/test/stream_manager_test.dart index f0b2076343c..f10e4cbdf19 100644 --- a/pkg/service_extension_router/test/stream_manager_test.dart +++ b/pkg/service_extension_router/test/stream_manager_test.dart @@ -2,12 +2,114 @@ // for details. All rights reserved. Use of this source code is governed by a // BSD-style license that can be found in the LICENSE file. +import 'package:service_extension_router/src/client.dart'; +import 'package:service_extension_router/src/stream_manager.dart'; + import 'package:test/test.dart'; -void main() { - group('StreamManager', () { - setUp(() {}); +class TestStreamClient extends Client { + int closeCount = 0; + int sendRequestCount = 0; + int streamNotifyCount = 0; + Map? notification; - test('First Test', () {}); + @override + Future close() { + closeCount++; + return Future.value(); + } + + @override + Future sendRequest({required String method, parameters}) { + sendRequestCount++; + return Future.value(); + } + + @override + void streamNotify(String stream, Map data) { + streamNotifyCount++; + notification = data; + } +} + +void main() { + late TestStreamClient client; + late StreamManager manager; + group('Stream Manager', () { + setUp(() { + client = TestStreamClient(); + manager = StreamManager(); + }); + + test('streamListen lets a client recieve messages for post', () { + final message = {'message': 'A message'}; + manager.streamListen(client, 'A'); + + manager.postEvent('B', {'message': 'B message'}); + expect(client.streamNotifyCount, equals(0)); + expect(client.notification, isNull); + + manager.postEvent('A', message); + expect(client.streamNotifyCount, equals(1)); + expect(client.notification, equals(message)); + }); + + test('streamCancel removes the client from the stream', () { + final messageA = {'message': 'Message A'}; + final messageA2 = {'message': 'Message A2'}; + + final clientA2 = TestStreamClient(); + manager.streamListen(client, 'A'); + manager.streamListen(clientA2, 'A'); + + manager.postEvent('A', messageA); + + expect(client.notification, equals(messageA)); + expect(client.streamNotifyCount, equals(1)); + expect(clientA2.notification, equals(messageA)); + expect(clientA2.streamNotifyCount, equals(1)); + + manager.streamCancel(client, 'A'); + + manager.postEvent('A', messageA2); + + expect(client.notification, equals(messageA)); + expect(client.streamNotifyCount, equals(1)); + expect(clientA2.notification, equals(messageA2)); + expect(clientA2.streamNotifyCount, equals(2)); + }); + + test('postEvent notifies clients', () { + final messageA = {'message': 'Message A'}; + final messageB = {'message': 'Message B'}; + final clientA2 = TestStreamClient(); + final clientB = TestStreamClient(); + manager.streamListen(client, 'A'); + manager.streamListen(clientA2, 'A'); + manager.streamListen(clientB, 'B'); + + manager.postEvent( + 'A', + messageA, + ); + + expect(client.streamNotifyCount, equals(1)); + expect(client.notification, equals(messageA)); + expect(clientA2.streamNotifyCount, equals(1)); + expect(clientA2.notification, equals(messageA)); + expect(clientB.streamNotifyCount, equals(0)); + expect(clientB.notification, isNull); + + manager.postEvent( + 'B', + messageB, + ); + expect(client.streamNotifyCount, equals(1)); + expect(client.notification, equals(messageA)); + expect(clientA2.streamNotifyCount, equals(1)); + expect(clientA2.notification, equals(messageA)); + expect(clientB.streamNotifyCount, equals(1)); + expect(clientB.notification, messageB); + }); }); }