Initial Implementation for Service Extension Router

This instance of service_extension_router has been hooked into DDS seperately. The vm_service and DDS tests still worked with the integration.

There will still be some iteration and discussion to be had, but I'd like to get this up as the initial version.

Bug: https://github.com/dart-lang/sdk/issues/53300
Change-Id: Ic72b770a61b2fc61fbd7c4b47ff8996a90f78665
Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/331820
Reviewed-by: Ben Konyi <bkonyi@google.com>
Commit-Queue: Dan Chevalier <danchevalier@google.com>
This commit is contained in:
Dan Chevalier 2023-10-24 21:58:21 +00:00 committed by Commit Queue
parent 1df418af1a
commit c7f122ee9e
5 changed files with 202 additions and 11 deletions

View file

@ -5,4 +5,4 @@
library;
export 'src/stream_manager.dart';
export 'src/service_extension_manager.dart';
export 'src/client.dart';

View file

@ -0,0 +1,13 @@
// Copyright (c) 2023, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
// TODO(danchevalier): Add documentation before major release
abstract class Client {
void streamNotify(String stream, Map<String, Object?> data);
Future<void> close();
Future<dynamic> sendRequest({required String method, dynamic parameters});
final Map<String, String> services = {};
}

View file

@ -1,5 +0,0 @@
// Copyright (c) 2023, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
class ServiceExtensionManager {}

View file

@ -2,4 +2,85 @@
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
class StreamManager {}
import 'client.dart';
// TODO(danchevalier): Add documentation before major release
class StreamManager {
StreamManager({this.onStreamCancel});
Future<void> Function(Client client, String stream)? onStreamCancel;
final _streamListeners =
<String, List<Client>>{}; // TODO should this be set of StreamClient?
List<Client>? getListenersFor({required String stream}) =>
_streamListeners[stream];
/// Returns true if [client] is subscribed to [stream]
bool isSubscribed(Client client, String stream) {
return _streamListeners[stream]?.contains(client) ?? false;
}
/// Returns true if [stream] has any [ServiceExtensionClients] subscribed.
bool hasSubscriptions(String stream) {
return _streamListeners[stream]?.isNotEmpty ?? false;
}
/// Triggers [Client.streamNotify] for all clients subscribed
/// to [stream].
void postEvent(
String stream,
Map<String, Object?> data, {
Client? excludedClient,
}) {
final listeners = _streamListeners[stream] ?? const <Client>[];
for (final listener in listeners) {
if (listener == excludedClient) continue;
listener.streamNotify(stream, data);
}
}
/// Subscribes `client` to a stream.
///
/// If `client` is the first client to listen to `stream`, DDS will send a
/// `streamListen` request for `stream` to the VM service.
void streamListen(
Client client,
String stream,
) async {
_streamListeners.putIfAbsent(stream, () => <Client>[]);
if (_streamListeners[stream]!.contains(client)) {
throw StreamAlreadyListeningException(stream, client);
}
_streamListeners[stream]!.add(client);
}
/// Unsubscribes [client] from [stream].
Future<void> streamCancel(
Client client,
String stream,
) async {
if (!_streamListeners.containsKey(stream)) return;
_streamListeners[stream]!.remove(client);
if (onStreamCancel != null) {
await onStreamCancel!(client, stream);
}
}
Future<void> onClientDisconnect(Client client) async {
await Future.wait([
for (final stream in _streamListeners.keys)
if (_streamListeners[stream]!.contains(client))
streamCancel(client, stream),
]);
}
}
class StreamAlreadyListeningException implements Exception {
const StreamAlreadyListeningException(this.stream, this.client);
final String stream;
final Client client;
@override
String toString() =>
"Client, with hashCode ${client.hashCode}, is already subscribed to stream $stream";
}

View file

@ -2,12 +2,114 @@
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'package:service_extension_router/src/client.dart';
import 'package:service_extension_router/src/stream_manager.dart';
import 'package:test/test.dart';
void main() {
group('StreamManager', () {
setUp(() {});
class TestStreamClient extends Client {
int closeCount = 0;
int sendRequestCount = 0;
int streamNotifyCount = 0;
Map<String, dynamic>? notification;
test('First Test', () {});
@override
Future<void> close() {
closeCount++;
return Future.value();
}
@override
Future sendRequest({required String method, parameters}) {
sendRequestCount++;
return Future.value();
}
@override
void streamNotify(String stream, Map<String, Object?> data) {
streamNotifyCount++;
notification = data;
}
}
void main() {
late TestStreamClient client;
late StreamManager manager;
group('Stream Manager', () {
setUp(() {
client = TestStreamClient();
manager = StreamManager();
});
test('streamListen lets a client recieve messages for post', () {
final message = {'message': 'A message'};
manager.streamListen(client, 'A');
manager.postEvent('B', {'message': 'B message'});
expect(client.streamNotifyCount, equals(0));
expect(client.notification, isNull);
manager.postEvent('A', message);
expect(client.streamNotifyCount, equals(1));
expect(client.notification, equals(message));
});
test('streamCancel removes the client from the stream', () {
final messageA = {'message': 'Message A'};
final messageA2 = {'message': 'Message A2'};
final clientA2 = TestStreamClient();
manager.streamListen(client, 'A');
manager.streamListen(clientA2, 'A');
manager.postEvent('A', messageA);
expect(client.notification, equals(messageA));
expect(client.streamNotifyCount, equals(1));
expect(clientA2.notification, equals(messageA));
expect(clientA2.streamNotifyCount, equals(1));
manager.streamCancel(client, 'A');
manager.postEvent('A', messageA2);
expect(client.notification, equals(messageA));
expect(client.streamNotifyCount, equals(1));
expect(clientA2.notification, equals(messageA2));
expect(clientA2.streamNotifyCount, equals(2));
});
test('postEvent notifies clients', () {
final messageA = {'message': 'Message A'};
final messageB = {'message': 'Message B'};
final clientA2 = TestStreamClient();
final clientB = TestStreamClient();
manager.streamListen(client, 'A');
manager.streamListen(clientA2, 'A');
manager.streamListen(clientB, 'B');
manager.postEvent(
'A',
messageA,
);
expect(client.streamNotifyCount, equals(1));
expect(client.notification, equals(messageA));
expect(clientA2.streamNotifyCount, equals(1));
expect(clientA2.notification, equals(messageA));
expect(clientB.streamNotifyCount, equals(0));
expect(clientB.notification, isNull);
manager.postEvent(
'B',
messageB,
);
expect(client.streamNotifyCount, equals(1));
expect(client.notification, equals(messageA));
expect(clientA2.streamNotifyCount, equals(1));
expect(clientA2.notification, equals(messageA));
expect(clientB.streamNotifyCount, equals(1));
expect(clientB.notification, messageB);
});
});
}