Solidify, test and document DTD errors

Change-Id: Ied0f1af43954e47a2c51837bd2fc8d7ce0e03fa7
Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/345800
Commit-Queue: Dan Chevalier <danchevalier@google.com>
Reviewed-by: Ben Konyi <bkonyi@google.com>
This commit is contained in:
Dan Chevalier 2024-01-18 16:07:31 +00:00 committed by Commit Queue
parent c9cf5b9916
commit a995f7930b
10 changed files with 378 additions and 99 deletions

View file

@ -5,17 +5,8 @@
import 'package:dtd_impl/dart_tooling_daemon.dart';
void main(List<String> args) async {
final argParser = DartToolingDaemonOptions.createArgParser();
final results = argParser.parse(args);
if (results.wasParsed(DartToolingDaemonOptions.train.name)) {
return;
}
final dartToolingDaemon = await DartToolingDaemon.startService(
await DartToolingDaemon.startService(
args,
shouldLogRequests: true,
); // TODO(@danchevalier): turn off logging
print(
'The Dart Tooling Daemon is listening on ${dartToolingDaemon.uri?.host}:${dartToolingDaemon.uri?.port}',
);
}

View file

@ -117,6 +117,9 @@ any client calls [postEvent](#postevent) on the _streamId_ stream.
If successful responds with [Success](#success).
If the client is already subscribed to the stream, the _103_ (Stream already
subscribed) [RPC error](#rpc-error) code is returned.
#### Code Sample
```json
@ -151,6 +154,9 @@ calls for events on the _streamId_ stream.
If successful responds with [Success](#success).
If the client is not subscribed to the stream, the _104_ (Stream not
subscribed) [RPC error](#rpc-error) code is returned.
#### Code Sample
```json
@ -229,6 +235,10 @@ removed.
If successful responds with [Success](#success).
If the _service_ has already been registered by another client , the _111_ (Service already registered) [RPC error](#rpc-error) code is returned.
If the _method_ has already been registered on the _service_, the _132_ (Service method already registered) [RPC error](#rpc-error) code is returned.
#### Code Sample
```json
@ -269,6 +279,9 @@ the service method.
The result is defined on a case by case basis based on the implementer of the
service method.
If service method does not exist, the -32601 (Method not found)
[RPC error](#rpc-error) code is returned.
#### Code Sample
Assume that a client has registered a service method with:
@ -388,3 +401,40 @@ Methods that respond with Success do so with the following RPC.
```json
{"id": "2", "type": "Success"}
```
## RPC Error
When an RPC encounters an error, it is provided in the _error_
property of the response object. JSON-RPC errors always provide
_code_, _message_, and _data_ properties.
Here is an example error response for our [streamListen](#streamlisten)
request above. This error would be generated if we were attempting to
subscribe to the _GC_ stream multiple times from the same client.
```json
{
"jsonrpc": "2.0",
"error": {
"code": 103,
"message": "Stream already subscribed",
"data": {
"details": "The stream 'GC' is already subscribed"
}
}
"id": "2"
}
```
In addition to the
[error codes](http://www.jsonrpc.org/specification#error_object) specified in
the JSON-RPC spec, we use the following application specific error codes:
code | message | meaning
---- | ------- | -------
-32601 | Method not found | The method does not exist / is not available.
-32602 | Invalid params | Invalid params. Invalid method parameter(s).
103 | Stream already subscribed | The client is already subscribed to the specified _streamId_.
104 | Stream not subscribed | The client is not subscribed to the specified _streamId_.
111 | Service already registered | Service with such name has already been registered by this client.
112 | Service disappeared | Failed to fulfill service request, likely service handler is no longer available.
132 | Service method already registered | Method for the given service has already been registered by this client.

View file

@ -6,7 +6,6 @@ import 'dart:async';
import 'dart:io';
import 'package:args/args.dart';
import 'package:dart_service_protocol_shared/dart_service_protocol_shared.dart';
import 'package:shelf/shelf.dart';
import 'package:sse/server/sse_handler.dart';
import 'package:web_socket_channel/web_socket_channel.dart';
@ -74,21 +73,21 @@ class DartToolingDaemon {
late final DTDStreamManager streamManager;
/// Manages the connected clients of the current [DartToolingDaemon] service.
late final ClientManager clientManager;
late final DTDClientManager clientManager;
final bool _ipv6;
late HttpServer _server;
final List<dtd.DTDConnection> _auxilliaryServices = [];
final bool _shouldLogRequests;
/// The uri of the current [DartToolingDaemon] service.
Uri? get uri => _uri;
Uri? _uri;
Future<void> _startService() async {
Future<void> _startService({required int port}) async {
final host =
(_ipv6 ? InternetAddress.loopbackIPv6 : InternetAddress.loopbackIPv4)
.host;
var port = 0;
// Start the DTD server. Run in an error Zone to ensure that asynchronous
// exceptions encountered during request handling are handled, as exceptions
@ -135,16 +134,27 @@ class DartToolingDaemon {
/// Set [ipv6] to true to have the service use ipv6 instead of ipv4.
///
/// Set [shouldLogRequests] to true to enable logging.
static Future<DartToolingDaemon> startService({
static Future<DartToolingDaemon?> startService(
List<String> args, {
bool ipv6 = false,
bool shouldLogRequests = false,
int port = 0,
}) async {
final argParser = DartToolingDaemonOptions.createArgParser();
final results = argParser.parse(args);
if (results.wasParsed(DartToolingDaemonOptions.train.name)) {
return null;
}
final dtd = DartToolingDaemon._(
ipv6: ipv6,
shouldLogRequests: shouldLogRequests,
);
await dtd._startService();
await dtd._startService(port: port);
await dtd._startAuxilliaryServices();
print(
'The Dart Tooling Daemon is listening on ${dtd.uri?.host}:${dtd.uri?.port}',
);
return dtd;
}
@ -184,6 +194,15 @@ class DartToolingDaemon {
Future<void> _startAuxilliaryServices() async {
final fileService = await dtd.DartToolingDaemon.connect(_uri!);
await DTDFileService.register(fileService);
_auxilliaryServices.add(fileService);
}
Future<void> close() async {
for (var e in _auxilliaryServices) {
await e.close();
}
await clientManager.shutdown();
await _server.close(force: true);
}
}

View file

@ -86,7 +86,9 @@ class DTDClient extends Client {
_clientPeer.registerMethod('streamCancel', _streamCancel);
_clientPeer.registerMethod('postEvent', _postEvent);
_clientPeer.registerMethod('registerService', _registerService);
_clientPeer.registerMethod('getRegisteredStreams', _getRegisteredStreams);
// TODO(danchevalier) implement and document _getRegisteredStreams
// _clientPeer.registerMethod('getRegisteredStreams', _getRegisteredStreams);
// Handle service extension invocations.
_clientPeer.registerFallback(_fallback);
@ -98,10 +100,19 @@ class DTDClient extends Client {
/// 'streamId': the stream to be cancelled.
_streamListen(parameters) async {
final streamId = parameters['streamId'].asString;
await dtd.streamManager.streamListen(
this,
streamId,
);
try {
await dtd.streamManager.streamListen(
this,
streamId,
);
} on StreamAlreadyListeningException catch (_) {
throw RpcErrorCodes.buildRpcException(
RpcErrorCodes.kStreamAlreadySubscribed,
data: {
'details': "The stream '$streamId' is already subscribed",
},
);
}
return RPCResponses.success;
}
@ -111,6 +122,15 @@ class DTDClient extends Client {
/// 'streamId': the stream that the client would like to stop listening to.
_streamCancel(parameters) async {
final streamId = parameters['streamId'].asString;
if (!dtd.streamManager.isSubscribed(this, streamId)) {
throw RpcErrorCodes.buildRpcException(
RpcErrorCodes.kStreamNotSubscribed,
data: {
'details': "Client is not listening to '$streamId'",
},
);
}
await dtd.streamManager.streamCancel(this, streamId);
return RPCResponses.success;
}
@ -139,22 +159,33 @@ class DTDClient extends Client {
final method = parameters['method'].asString;
final combinedName = '$serviceName.$method';
// TODO(danchevalier): enforce only one client can register methods to a
// service.
if (services.containsKey(combinedName)) {
final existingServiceOwnerClient =
dtd.clientManager.findClientThatOwnsService(serviceName);
if (existingServiceOwnerClient != null &&
existingServiceOwnerClient != this) {
throw RpcErrorCodes.buildRpcException(
RpcErrorCodes.kServiceAlreadyRegistered,
data: {
'details':
"Service '$serviceName' is already registered by another client. "
"Only 1 client at a time may register methods to a service.",
},
);
}
if (services.containsKey(combinedName)) {
throw RpcErrorCodes.buildRpcException(
RpcErrorCodes.kServiceMethodAlreadyRegistered,
data: {
'details': "$combinedName has already been registered by the client.",
},
);
}
services[combinedName] = method;
return RPCResponses.success;
}
_getRegisteredStreams(parameters) {
// TODO(danchevalier) implement this.
return [];
}
/// jrpc fallback handler.
///
/// Handles all service method calls that will be forwarded to the respective

View file

@ -15,4 +15,14 @@ class DTDClientManager extends ClientManager {
super.addClient(client);
client.done.then((_) => removeClient(client));
}
/// Finds a client that has [service] registered to it.
Client? findClientThatOwnsService(String service) {
for (final client in clients) {
if (client.services.keys.any((k) => k.startsWith('$service.'))) {
return client;
}
}
return null;
}
}

View file

@ -31,11 +31,8 @@ class DTDStreamManager extends StreamManager {
stream,
<String, dynamic>{
'streamId': stream,
'event': {
'timestamp': DateTime.now().millisecondsSinceEpoch,
'eventData': eventData,
'eventKind': eventKind,
},
'eventKind': eventKind,
'eventData': eventData,
},
);
}

View file

@ -21,29 +21,14 @@ abstract class RpcErrorCodes {
static const kMethodNotFound = -32601;
static const kInvalidParams = -32602;
// static const kInternalError = -32603;
// static const kExtensionError = -32000;
static const kFeatureDisabled = 100;
// static const kCannotAddBreakpoint = 102;
static const kStreamAlreadySubscribed = 103;
static const kStreamNotSubscribed = 104;
// static const kIsolateMustBeRunnable = 105;
static const kIsolateMustBePaused = 106;
// static const kCannotResume = 107;
// static const kIsolateIsReloading = 108;
// static const kIsolateReloadBarred = 109;
// static const kIsolateMustHaveReloaded = 110;
static const kServiceAlreadyRegistered = 111;
static const kServiceDisappeared = 112;
static const kExpressionCompilationError = 113;
// static const kInvalidTimelineRequest = 114;
static const kCustomStreamDoesNotExist = 130;
static const kCoreStreamNotAllowed = 131;
static const kServiceMethodAlreadyRegistered = 132;
// Experimental (used in private rpcs).
// static const kFileSystemAlreadyExists = 1001;
@ -51,13 +36,11 @@ abstract class RpcErrorCodes {
// static const kFileDoesNotExist = 1003;
static const errorMessages = {
kFeatureDisabled: 'Feature is disabled',
kStreamAlreadySubscribed: 'Stream already subscribed',
kStreamNotSubscribed: 'Stream not subscribed',
kServiceAlreadyRegistered: 'Service already registered',
kServiceDisappeared: 'Service has disappeared',
kExpressionCompilationError: 'Expression compilation error',
kCustomStreamDoesNotExist: 'Custom stream does not exist',
kCoreStreamNotAllowed: 'Core streams are not allowed',
kServiceMethodAlreadyRegistered:
'The service method has already been registered',
};
}

View file

@ -24,3 +24,4 @@ dependencies:
# See also https://dart.dev/tools/pub/dependencies.
dev_dependencies:
lints: any
test: any

View file

@ -0,0 +1,241 @@
// Copyright (c) 2024, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
import 'package:dtd_impl/src/rpc_error_codes.dart';
import 'package:json_rpc_2/json_rpc_2.dart';
import 'package:test/test.dart';
import 'package:web_socket_channel/web_socket_channel.dart';
import 'package:dtd_impl/dart_tooling_daemon.dart';
void main() {
late Peer client1;
late Peer client2;
late DartToolingDaemon? dtd;
setUp(() async {
dtd = await DartToolingDaemon.startService([]);
// Wait for server to start and print to the port to stdout.
final uri = dtd!.uri!.toString();
client1 = _createClient(uri);
client2 = _createClient(uri);
});
tearDown(() async {
await client2.close();
await client1.close();
await dtd?.close();
});
group('streams', () {
final streamId = 'testStream';
final eventKind = 'test';
final eventData = {'the': 'data'};
test('basics', () async {
final completer = Completer();
client1.registerMethod('streamNotify', (Parameters parameters) {
completer.complete(parameters.asMap);
});
final listenResult = await client1.sendRequest('streamListen', {
"streamId": streamId,
});
expect(listenResult, {"type": "Success"});
final postResult = await client2.sendRequest(
'postEvent',
{
'streamId': streamId,
'eventKind': eventKind,
'eventData': eventData,
},
);
expect(postResult, {"type": "Success"});
final dataFromTheStream = await completer.future;
expect(dataFromTheStream, {
"streamId": streamId,
"eventKind": eventKind,
"eventData": eventData,
});
});
test('streamListen the same stream', () async {
final listenResult = await client1.sendRequest('streamListen', {
"streamId": streamId,
});
expect(listenResult, {"type": "Success"});
expect(
() => client1.sendRequest('streamListen', {
"streamId": streamId,
}),
throwsA(
predicate(
(e) =>
e is RpcException &&
e.code == RpcErrorCodes.kStreamAlreadySubscribed,
),
),
);
});
test('stop listening to a stream that is not being listened to', () {
expect(
() => client1.sendRequest('streamCancel', {
"streamId": streamId,
}),
throwsA(
predicate(
(e) =>
e is RpcException &&
e.code == RpcErrorCodes.kStreamNotSubscribed,
),
),
);
});
test('postEvent when there are no listeners', () async {
final postResult = await client2.sendRequest(
'postEvent',
{
'streamId': streamId,
'eventKind': eventKind,
'eventData': eventData,
},
);
expect(postResult, {"type": "Success"});
});
});
group('service methods', () {
final service1 = 'foo1';
final method1 = 'bar1';
final method2 = 'bar2';
final data1 = {"data": 1};
final response1 = {"response": 1};
test('basics', () async {
client1.registerMethod('$service1.$method1', (Parameters parameters) {
return response1;
});
final registerResult = await client1.sendRequest('registerService', {
"service": service1,
"method": method1,
});
expect(registerResult, {"type": "Success"});
final register2Result = await client1.sendRequest('registerService', {
"service": service1,
"method": method2,
});
expect(register2Result, {"type": "Success"});
final methodResponse = await client2.sendRequest(
'$service1.$method1',
data1,
);
expect(methodResponse, response1);
});
test('registering a service method that already exists', () async {
final registerResult = await client1.sendRequest('registerService', {
"service": service1,
"method": method1,
});
expect(registerResult, {"type": "Success"});
expect(
() => client1.sendRequest('registerService', {
"service": service1,
"method": method1,
}),
throwsA(
predicate(
(p0) =>
p0 is RpcException &&
p0.code == RpcErrorCodes.kServiceMethodAlreadyRegistered,
),
),
);
});
test('calling a method that does not exist', () {
expect(
() => client1.sendRequest('zoo.abc', {}),
throwsA(
predicate(
(p0) =>
p0 is RpcException &&
p0.code == RpcException.methodNotFound('zoo.abc').code,
),
),
);
});
test('different clients cannot register the same service', () async {
final registerResult = await client1.sendRequest('registerService', {
"service": service1,
"method": method1,
});
expect(registerResult, {"type": "Success"});
expect(
() => client2.sendRequest('registerService', {
"service": service1,
"method": method2,
}),
throwsA(
predicate(
(p0) =>
p0 is RpcException &&
p0.code == RpcErrorCodes.kServiceAlreadyRegistered,
),
),
);
});
test('releases service methods on disconnect', () async {
final registerResult = await client1.sendRequest('registerService', {
"service": service1,
"method": method1,
});
expect(registerResult, {"type": "Success"});
await client1.close();
// TODO: replace this polling when notification streams are implemented.
dynamic client2RegisterResult;
for (var i = 0; i < 10; i++) {
try {
// The service method registration should succeed once client1
// finishes closing.
client2RegisterResult = await client2.sendRequest('registerService', {
"service": service1,
"method": method1,
});
break;
} catch (_) {}
await Future.delayed(Duration(seconds: 1));
}
expect(client2RegisterResult, {"type": "Success"});
});
});
}
Peer _createClient(String uri) {
final channel = WebSocketChannel.connect(
Uri.parse(uri),
);
final client = Peer(channel.cast());
unawaited(client.listen());
return client;
}

View file

@ -1,44 +0,0 @@
// Copyright (c) 2023, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
// TODO(danchevalier) make these tests relevant.
// import 'dart:io';
// import 'package:http/http.dart';
// import 'package:test/test.dart';
void main() {
// final port = '8080';
// final host = 'http://0.0.0.0:$port';
// late Process p;
// setUp(() async {
// p = await Process.start(
// 'dart',
// ['run', 'bin/dtd_server.dart'],
// environment: {'PORT': port},
// );
// // Wait for server to start and print to stdout.
// await p.stdout.first;
// });
// tearDown(() => p.kill());
// test('Root', () async {
// final response = await get(Uri.parse('$host/'));
// expect(response.statusCode, 200);
// expect(response.body, 'Hello, World!\n');
// });
// test('Echo', () async {
// final response = await get(Uri.parse('$host/echo/hello'));
// expect(response.statusCode, 200);
// expect(response.body, 'hello\n');
// });
// test('404', () async {
// final response = await get(Uri.parse('$host/foobar'));
// expect(response.statusCode, 404);
// });
}