diff --git a/pkg/service_extension_router/lib/service_extension_router.dart b/pkg/service_extension_router/lib/service_extension_router.dart index f2197de02cc..9adbadf01db 100644 --- a/pkg/service_extension_router/lib/service_extension_router.dart +++ b/pkg/service_extension_router/lib/service_extension_router.dart @@ -5,4 +5,4 @@ library; export 'src/stream_manager.dart'; -export 'src/service_extension_manager.dart'; +export 'src/client.dart'; diff --git a/pkg/service_extension_router/lib/src/client.dart b/pkg/service_extension_router/lib/src/client.dart new file mode 100644 index 00000000000..95cb8470551 --- /dev/null +++ b/pkg/service_extension_router/lib/src/client.dart @@ -0,0 +1,13 @@ +// Copyright (c) 2023, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +// TODO(danchevalier): Add documentation before major release +abstract class Client { + void streamNotify(String stream, Map data); + + Future close(); + Future sendRequest({required String method, dynamic parameters}); + + final Map services = {}; +} diff --git a/pkg/service_extension_router/lib/src/service_extension_manager.dart b/pkg/service_extension_router/lib/src/service_extension_manager.dart deleted file mode 100644 index a0c62a770a0..00000000000 --- a/pkg/service_extension_router/lib/src/service_extension_manager.dart +++ /dev/null @@ -1,5 +0,0 @@ -// Copyright (c) 2023, the Dart project authors. Please see the AUTHORS file -// for details. All rights reserved. Use of this source code is governed by a -// BSD-style license that can be found in the LICENSE file. - -class ServiceExtensionManager {} diff --git a/pkg/service_extension_router/lib/src/stream_manager.dart b/pkg/service_extension_router/lib/src/stream_manager.dart index 1908e51ab76..f1e59313d3c 100644 --- a/pkg/service_extension_router/lib/src/stream_manager.dart +++ b/pkg/service_extension_router/lib/src/stream_manager.dart @@ -2,4 +2,85 @@ // for details. All rights reserved. Use of this source code is governed by a // BSD-style license that can be found in the LICENSE file. -class StreamManager {} +import 'client.dart'; + +// TODO(danchevalier): Add documentation before major release +class StreamManager { + StreamManager({this.onStreamCancel}); + Future Function(Client client, String stream)? onStreamCancel; + + final _streamListeners = + >{}; // TODO should this be set of StreamClient? + List? getListenersFor({required String stream}) => + _streamListeners[stream]; + + /// Returns true if [client] is subscribed to [stream] + bool isSubscribed(Client client, String stream) { + return _streamListeners[stream]?.contains(client) ?? false; + } + + /// Returns true if [stream] has any [ServiceExtensionClients] subscribed. + bool hasSubscriptions(String stream) { + return _streamListeners[stream]?.isNotEmpty ?? false; + } + + /// Triggers [Client.streamNotify] for all clients subscribed + /// to [stream]. + void postEvent( + String stream, + Map data, { + Client? excludedClient, + }) { + final listeners = _streamListeners[stream] ?? const []; + for (final listener in listeners) { + if (listener == excludedClient) continue; + listener.streamNotify(stream, data); + } + } + + /// Subscribes `client` to a stream. + /// + /// If `client` is the first client to listen to `stream`, DDS will send a + /// `streamListen` request for `stream` to the VM service. + void streamListen( + Client client, + String stream, + ) async { + _streamListeners.putIfAbsent(stream, () => []); + if (_streamListeners[stream]!.contains(client)) { + throw StreamAlreadyListeningException(stream, client); + } + _streamListeners[stream]!.add(client); + } + + /// Unsubscribes [client] from [stream]. + Future streamCancel( + Client client, + String stream, + ) async { + if (!_streamListeners.containsKey(stream)) return; + + _streamListeners[stream]!.remove(client); + if (onStreamCancel != null) { + await onStreamCancel!(client, stream); + } + } + + Future onClientDisconnect(Client client) async { + await Future.wait([ + for (final stream in _streamListeners.keys) + if (_streamListeners[stream]!.contains(client)) + streamCancel(client, stream), + ]); + } +} + +class StreamAlreadyListeningException implements Exception { + const StreamAlreadyListeningException(this.stream, this.client); + final String stream; + final Client client; + + @override + String toString() => + "Client, with hashCode ${client.hashCode}, is already subscribed to stream $stream"; +} diff --git a/pkg/service_extension_router/test/stream_manager_test.dart b/pkg/service_extension_router/test/stream_manager_test.dart index f0b2076343c..f10e4cbdf19 100644 --- a/pkg/service_extension_router/test/stream_manager_test.dart +++ b/pkg/service_extension_router/test/stream_manager_test.dart @@ -2,12 +2,114 @@ // for details. All rights reserved. Use of this source code is governed by a // BSD-style license that can be found in the LICENSE file. +import 'package:service_extension_router/src/client.dart'; +import 'package:service_extension_router/src/stream_manager.dart'; + import 'package:test/test.dart'; -void main() { - group('StreamManager', () { - setUp(() {}); +class TestStreamClient extends Client { + int closeCount = 0; + int sendRequestCount = 0; + int streamNotifyCount = 0; + Map? notification; - test('First Test', () {}); + @override + Future close() { + closeCount++; + return Future.value(); + } + + @override + Future sendRequest({required String method, parameters}) { + sendRequestCount++; + return Future.value(); + } + + @override + void streamNotify(String stream, Map data) { + streamNotifyCount++; + notification = data; + } +} + +void main() { + late TestStreamClient client; + late StreamManager manager; + group('Stream Manager', () { + setUp(() { + client = TestStreamClient(); + manager = StreamManager(); + }); + + test('streamListen lets a client recieve messages for post', () { + final message = {'message': 'A message'}; + manager.streamListen(client, 'A'); + + manager.postEvent('B', {'message': 'B message'}); + expect(client.streamNotifyCount, equals(0)); + expect(client.notification, isNull); + + manager.postEvent('A', message); + expect(client.streamNotifyCount, equals(1)); + expect(client.notification, equals(message)); + }); + + test('streamCancel removes the client from the stream', () { + final messageA = {'message': 'Message A'}; + final messageA2 = {'message': 'Message A2'}; + + final clientA2 = TestStreamClient(); + manager.streamListen(client, 'A'); + manager.streamListen(clientA2, 'A'); + + manager.postEvent('A', messageA); + + expect(client.notification, equals(messageA)); + expect(client.streamNotifyCount, equals(1)); + expect(clientA2.notification, equals(messageA)); + expect(clientA2.streamNotifyCount, equals(1)); + + manager.streamCancel(client, 'A'); + + manager.postEvent('A', messageA2); + + expect(client.notification, equals(messageA)); + expect(client.streamNotifyCount, equals(1)); + expect(clientA2.notification, equals(messageA2)); + expect(clientA2.streamNotifyCount, equals(2)); + }); + + test('postEvent notifies clients', () { + final messageA = {'message': 'Message A'}; + final messageB = {'message': 'Message B'}; + final clientA2 = TestStreamClient(); + final clientB = TestStreamClient(); + manager.streamListen(client, 'A'); + manager.streamListen(clientA2, 'A'); + manager.streamListen(clientB, 'B'); + + manager.postEvent( + 'A', + messageA, + ); + + expect(client.streamNotifyCount, equals(1)); + expect(client.notification, equals(messageA)); + expect(clientA2.streamNotifyCount, equals(1)); + expect(clientA2.notification, equals(messageA)); + expect(clientB.streamNotifyCount, equals(0)); + expect(clientB.notification, isNull); + + manager.postEvent( + 'B', + messageB, + ); + expect(client.streamNotifyCount, equals(1)); + expect(client.notification, equals(messageA)); + expect(clientA2.streamNotifyCount, equals(1)); + expect(clientA2.notification, equals(messageA)); + expect(clientB.streamNotifyCount, equals(1)); + expect(clientB.notification, messageB); + }); }); }