Connecting DTD and DTD_impl together.

This change adds the happy path for DTD_impl and DTD being able to:
- register streams
- register serviceMethods
- postEvents to streams
- call serviceMethods

Change-Id: I73865071745ef19a4493f86714e0855930243dd5
Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/341700
Reviewed-by: Ben Konyi <bkonyi@google.com>
Reviewed-by: Kenzie Davisson <kenzieschmoll@google.com>
Commit-Queue: Dan Chevalier <danchevalier@google.com>
This commit is contained in:
Dan Chevalier 2023-12-19 15:36:22 +00:00 committed by Commit Queue
parent 08e3d3ee5a
commit e58efcb049
17 changed files with 821 additions and 68 deletions

View file

@ -92,8 +92,7 @@ class DartDevelopmentServiceClient {
return await _clientPeer.sendRequest(method, parameters);
}
/// Registers handlers for JSON RPC methods which need to be intercepted by
/// DDS as well as fallback request forwarder.
/// Registers handlers for JSON RPC method endpoints.
void _registerJsonRpcMethods() {
_clientPeer.registerMethod('streamListen', (parameters) async {
final streamId = parameters['streamId'].asString;
@ -107,12 +106,22 @@ class DartDevelopmentServiceClient {
return RPCResponses.success;
});
/// jrpc endpoint for cancelling a stream.
///
/// Parameters:
/// 'streamId': the stream to be cancelled.
_clientPeer.registerMethod('streamCancel', (parameters) async {
final streamId = parameters['streamId'].asString;
await dds.streamManager.streamCancel(this, streamId);
return RPCResponses.success;
});
/// jrpc endpoint for posting an event to a stream.
///
/// Parameters:
/// 'eventKind': the kind of event being sent.
/// 'data': the data being sent over the stream.
/// 'stream: the stream that is being posted to.
_clientPeer.registerMethod('postEvent', (parameters) async {
final eventKind = parameters['eventKind'].asString;
final eventData = parameters['eventData'].asMap;

View file

@ -4,3 +4,4 @@ linter:
rules:
- avoid_void_async
- unawaited_futures
- require_trailing_commas

View file

@ -5,6 +5,9 @@
import 'package:dtd/dtd.dart';
void main() {
// TODO(@danchevalier) make simple testing services for ide's to know that
// things are working.
// TODO(@danchevalier): make this example meaningful
DartToolingDaemon.connect(Uri.parse('wss://127.0.0.1:12345'));
}

View file

@ -0,0 +1,68 @@
// Copyright (c) 2023, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
import 'package:dtd/dtd.dart';
//Extension side
class Bar extends DTDResponse {
late String baz;
late int bazCount;
late String bazDescription;
// ignore: use_super_parameters
Bar.fromDTDResponse(DTDResponse response) : super.fromDTDResponse(response) {
baz = result['baz'] as String;
bazCount = result['bazCount'] as int;
bazDescription = result['bazDescription'] as String;
}
@override
String toString() {
return 'Bar(baz:$baz, bazCount:$bazCount, bazDescription:$bazDescription)';
}
}
extension FooServiceExtension on DTDConnection {
Future<Bar> barExtension() async {
final result = await call(
'Foo',
'bar',
params: {
'baz': 'the baz',
'bazCount': 1,
'bazDescription': 'there is one baz',
},
);
return Bar.fromDTDResponse(result);
}
}
void main(List<String> args) async {
final url = args[0]; // pass the url as a param to the example
final fooService = await DartToolingDaemon.connect(Uri.parse('ws://$url'));
final client = await DartToolingDaemon.connect(Uri.parse('ws://$url'));
await fooService.registerService(
'Foo',
'bar',
(params) async {
final baz = params['baz'].value;
final bazCount = params['bazCount'].value;
final bazDescription = params['bazDescription'].value;
final result = {
'type': 'Bar',
'baz': baz,
'bazCount': bazCount,
'bazDescription': bazDescription,
};
return result;
},
);
final response = await client.barExtension();
final bar = Bar.fromDTDResponse(response);
print('Got a bar response: $bar');
}

View file

@ -0,0 +1,28 @@
import 'package:dtd/dtd.dart';
void main(List<String> args) async {
final url = args[0]; // pass the url as a param to the example
final clientA = await DartToolingDaemon.connect(Uri.parse('ws://$url'));
final clientB = await DartToolingDaemon.connect(Uri.parse('ws://$url'));
clientA.onEvent('Foo').listen((event) {
print('A Received $event from Foo Stream');
});
clientB.onEvent('Foo').listen((event) {
print('B Received $event from Foo Stream');
});
await clientA.streamListen('Foo');
await clientB.streamListen('Foo');
clientA.postEvent('Foo', 'kind1', {'event': 1});
clientB.postEvent('Foo', 'kind2', {'event': 2});
// delayed so the Daemon connection is still up by the time the events come
// back.
await Future<void>.delayed(const Duration(seconds: 10));
await clientA.close();
await clientB.close();
}

View file

@ -3,13 +3,15 @@
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
import 'package:web_socket_channel/web_socket_channel.dart';
import 'dtd_connection.dart';
// TODO(@danchevalier) make sure that there is html documentation just like the
// analysis server.
abstract class DartToolingDaemon {
// TODO(@danchevalier)
// TODO(@danchevalier) Dart Docs
static Future<DTDConnection> connect(Uri uri) async {
final channel = WebSocketChannel.connect(uri);
return DTDConnection(channel);

View file

@ -2,43 +2,94 @@
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
import 'dart:convert';
import 'package:json_rpc_2/json_rpc_2.dart';
import 'package:stream_channel/stream_channel.dart';
typedef DTDServiceCallback = Future<Map<String, Object?>> Function(
Parameters params,
);
// TODO(danchevalier): add a serviceMethodIsAvailable experience. it will listen
// to a stream that announces servicemethods getting registered and
// unregistered. The state can then be presented as a listenable so that clients
// can gate their behaviour on a serviceMethod going up/down.
// TODO(danchevalier) dart docs
class DTDConnection {
DTDConnection(this._connectionChannel) {
// TODO(@danchevalier);
late final Peer _clientPeer;
late final Future _done;
final _subscribedStreamControllers = <String, StreamController<DTDEvent>>{};
DTDConnection(this._connectionChannel)
: _clientPeer = Peer(_connectionChannel.cast<String>()) {
_clientPeer.registerMethod('streamNotify', (Parameters params) {
final streamId = params['streamId'].value as String;
final event = params['event'];
final eventKind = event['eventKind'].value as String;
final eventData = event['eventData'].value as Map<String, Object?>;
final timestamp = event['timestamp'].value as int;
_subscribedStreamControllers[streamId]?.add(
DTDEvent(streamId, eventKind, eventData, timestamp),
);
});
_done = _clientPeer.listen();
}
/// Terminates the connection with the Dart Tooling Daemon.
Future<void> close() async {
// TODO(@danchevalier)
return;
}
Future<void> close() => _clientPeer.close();
// TODO(@danchevalier)
/// A `Future` that completes when the connection with the Dart Tooling Daemon
/// is terminated.
Future<void> get done async => Future.value();
Future<void> get done => _done;
/// Returns the current list of services available.
Future<List<String>> getRegisteredServices() async {
// TODO(@danchevalier)
return Future.value([]);
return await _clientPeer.sendRequest(
'getRegisteredServices',
) as List<String>;
}
/// Returns the current list of streams with active subscribers.
Future<List<String>> getRegisteredStreams() async {
// TODO(@danchevalier)
return Future.value([]);
return await _clientPeer.sendRequest(
'getRegisteredStreams',
) as List<String>;
}
Future<void> registerService(
String service,
String method,
DTDServiceCallback callback,
) async {
final combinedName = '$service.$method';
await _clientPeer.sendRequest('registerService', {
'service': service,
'method': method,
});
_clientPeer.registerMethod(
combinedName,
callback,
);
}
/// Subscribes this client to events posted on [streamId].
///
/// If this client is already subscribed to [streamId], an exception will be
/// thrown.
Future<void> streamListen(String streamId) async {
Future<void> streamListen(String streamId) {
// TODO(@danchevalier)
return;
return _clientPeer.sendRequest(
'streamListen',
{
'streamId': streamId,
},
);
}
/// Cancel the subscription to [streamId].
@ -48,9 +99,14 @@ class DTDConnection {
///
/// If this client was not subscribed to [streamId], an exception will be
/// thrown.
Future<void> streamCancel(Stream streamId) async {
Future<void> streamCancel(Stream streamId) {
// TODO(@danchevalier)
return;
return _clientPeer.sendRequest(
'streamCancel',
{
'streamId': streamId,
},
);
}
/// Creates a `Stream` for events received on [streamId].
@ -59,15 +115,30 @@ class DTDConnection {
/// events aren't dropped. [streamListen(streamId)] must be called before any
/// events will appear on the returned stream.
Stream<DTDEvent> onEvent(String streamId) {
// TODO(@danchevalier)
return const Stream.empty();
return _subscribedStreamControllers
.putIfAbsent(
streamId,
StreamController<DTDEvent>.new,
)
.stream;
}
/// Posts an [DTDEvent] with [eventData] to [streamId].
///
/// If no clients are subscribed to [streamId], the event will be dropped.
void postEvent(String streamId, Map<String, Object?> eventData) {
// TODO(@danchevalier)
void postEvent(
String streamId,
String eventKind,
Map<String, Object?> eventData,
) {
_clientPeer.sendRequest(
'postEvent',
{
'streamId': streamId,
'eventKind': eventKind,
'eventData': eventData,
},
);
}
/// Invokes a service with the name `serviceName.methodName`.
@ -80,27 +151,86 @@ class DTDConnection {
///
/// If the parameters included in [params] are invalid, an exception will be
/// thrown.
Future<T> call<T extends DTDResponse>(
Future<DTDResponse> call(
String serviceName,
String methodName, {
Map<String, Object>? params,
}) async {
// TODO(@danchevalier)
// ignore: null_argument_to_non_null_type
return Future.value();
final json = await _clientPeer.sendRequest(
'$serviceName.$methodName',
params ?? <String, dynamic>{},
) as Map<String, Object?>;
final type = json['type'] as String?;
if (type == null) {
throw DTDConnectionException.callResponseMissingType(json);
}
//TODO(danchevalier): Find out how to get access to the id.
return DTDResponse('-1', type, json);
}
// ignore: unused_field
final StreamChannel _connectionChannel;
}
abstract class DTDResponse {
String get id;
class DTDResponse {
DTDResponse(this._id, this._type, this._result);
String get type;
DTDResponse.fromDTDResponse(DTDResponse other)
: this(
other.id,
other.type,
other.result,
);
final String _id;
final String _type;
final Map<String, Object?> _result;
Map<String, Object?> get json;
String get id => _id;
String get type => _type;
Map<String, Object?> get result => _result;
}
// TODO(@danchevalier): is this how event should be done?
abstract class DTDEvent {}
class DTDEvent {
DTDEvent(this.stream, this.kind, this.data, this.timestamp);
String stream;
int timestamp;
String kind;
Map<String, Object?> data;
@override
String toString() {
return jsonEncode({
'stream': stream,
'timestamp': timestamp,
'kind': kind,
'data': data,
});
}
}
class DTDConnectionException implements Exception {
static const int callParamsMissingTypeError = 1;
/// The response to a call method is missing the top level type parameter.
factory DTDConnectionException.callResponseMissingType(
Map<String, Object?> json,
) {
return DTDConnectionException._(
callParamsMissingTypeError,
'call received an invalid response, '
"it is missing the 'type' param. Got: $json",
);
}
DTDConnectionException._(this.errorCode, this.message);
@override
String toString() => 'DTDConnectionException: $message';
final int errorCode;
final String message;
}

View file

@ -7,6 +7,7 @@ environment:
sdk: ">=3.0.0 <4.0.0"
dependencies:
json_rpc_2: ^3.0.2
stream_channel: ^2.1.2
web_socket_channel: ^2.4.0

View file

@ -4,3 +4,4 @@ linter:
rules:
- avoid_void_async
- unawaited_futures
- require_trailing_commas

View file

@ -0,0 +1,19 @@
// Copyright (c) 2023, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
abstract class RPCResponses {
static const success = <String, dynamic>{
'type': 'Success',
};
static const collectedSentinel = <String, dynamic>{
'type': 'Sentinel',
'kind': 'Collected',
'valueAsString': '<collected>',
};
}
// Give connections time to reestablish before considering them closed.
// Required to reestablish connections killed by UberProxy.
const sseKeepAlive = Duration(seconds: 30);

View file

@ -0,0 +1,197 @@
// Copyright (c) 2023, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
import 'dart:io';
import 'package:dart_service_protocol_shared/dart_service_protocol_shared.dart';
import 'package:shelf/shelf.dart';
import 'package:sse/server/sse_handler.dart';
import 'package:web_socket_channel/web_socket_channel.dart';
import 'package:shelf_web_socket/shelf_web_socket.dart';
import 'package:shelf/shelf_io.dart' as io;
import 'constants.dart';
import 'dtd_client.dart';
import 'dtd_client_manager.dart';
import 'dtd_stream_manager.dart';
/// A service that facilitates communication between dart tools.
class DartToolingDaemon {
DartToolingDaemon._({
bool ipv6 = false,
bool shouldLogRequests = false,
}) : _ipv6 = ipv6,
_shouldLogRequests = shouldLogRequests {
streamManager = DTDStreamManager(this);
clientManager = DTDClientManager();
}
static const _kSseHandlerPath = '\$debugHandler';
/// Manages the streams for the current [DartToolingDaemon] service.
late final DTDStreamManager streamManager;
/// Manages the connected clients of the current [DartToolingDaemon] service.
late final ClientManager clientManager;
final bool _ipv6;
late HttpServer _server;
final bool _shouldLogRequests;
/// The uri of the current [DartToolingDaemon] service.
Uri? get uri => _uri;
Uri? _uri;
Future<void> _startService() async {
final host =
(_ipv6 ? InternetAddress.loopbackIPv6 : InternetAddress.loopbackIPv4)
.host;
var port = 0;
// Start the DTD server. Run in an error Zone to ensure that asynchronous
// exceptions encountered during request handling are handled, as exceptions
// thrown during request handling shouldn't take down the entire service.
late String errorMessage;
final tmpServer = await runZonedGuarded(
() async {
Future<HttpServer?> startServer() async {
try {
return await io.serve(_handlers().handler, host, port);
} on SocketException catch (e) {
errorMessage = e.message;
if (e.osError != null) {
errorMessage += ' (${e.osError!.message})';
}
errorMessage += ': ${e.address?.host}:${e.port}';
return null;
}
}
return await startServer();
},
(error, stack) {
if (_shouldLogRequests) {
print('Asynchronous error: $error\n$stack');
}
},
);
if (tmpServer == null) {
throw DartToolingDaemonException.connectionIssue(errorMessage);
}
_server = tmpServer;
_uri = Uri(
scheme: 'http',
host: host,
port: _server.port,
path: '/',
);
}
/// Starts a [DartToolingDaemon] service.
///
/// Set [ipv6] to true to have the service use ipv6 instead of ipv4.
///
/// Set [shouldLogRequests] to true to enable logging.
static Future<DartToolingDaemon> startService({
bool ipv6 = false,
bool shouldLogRequests = false,
}) async {
final dtd = DartToolingDaemon._(
ipv6: ipv6,
shouldLogRequests: shouldLogRequests,
);
await dtd._startService();
return dtd;
}
// Attempt to upgrade HTTP requests to a websocket before processing them as
// standard HTTP requests. The websocket handler will fail quickly if the
// request doesn't appear to be a websocket upgrade request.
Cascade _handlers() {
return Cascade().add(_webSocketHandler()).add(_sseHandler());
}
Handler _webSocketHandler() => webSocketHandler((WebSocketChannel ws) {
final client = DTDClient.fromWebSocket(
this,
ws,
);
clientManager.addClient(client);
});
Handler _sseHandler() {
final handler = SseHandler(
Uri.parse('/$_kSseHandlerPath'),
keepAlive: sseKeepAlive,
);
handler.connections.rest.listen((sseConnection) {
final client = DTDClient.fromSSEConnection(
this,
sseConnection,
);
clientManager.addClient(client);
});
return handler.handler;
}
}
// TODO(danchevalier): clean up these exceptions so they are more relevant to
// DTD. Also add docs to the factories that remain.
class DartToolingDaemonException implements Exception {
// TODO(danchevalier): add a relevant dart doc here
static const int existingDtdInstanceError = 1;
/// Set when the connection to the remote VM service terminates unexpectedly
/// during Dart Development Service startup.
static const int failedToStartError = 2;
/// Set when a connection error has occurred after startup.
static const int connectionError = 3;
factory DartToolingDaemonException.existingDtdInstance(
String message, {
Uri? dtdUri,
}) {
return ExistingDTDImplException._(message, dtdUri: dtdUri);
}
factory DartToolingDaemonException.failedToStart() {
return DartToolingDaemonException._(
failedToStartError,
'Failed to start Dart Development Service',
);
}
factory DartToolingDaemonException.connectionIssue(String message) {
return DartToolingDaemonException._(connectionError, message);
}
DartToolingDaemonException._(this.errorCode, this.message);
@override
String toString() => 'DartDevelopmentServiceException: $message';
final int errorCode;
final String message;
}
class ExistingDTDImplException extends DartToolingDaemonException {
ExistingDTDImplException._(
String message, {
this.dtdUri,
}) : super._(
DartToolingDaemonException.existingDtdInstanceError,
message,
);
/// The URI of the existing DTD instance, if available.
///
/// This URL is the base HTTP URI such as `http://127.0.0.1:1234/AbcDefg=/`,
/// not the WebSocket URI (which can be obtained by mapping the scheme to
/// `ws` (or `wss`) and appending `ws` to the path segments).
final Uri? dtdUri;
}

View file

@ -0,0 +1,183 @@
// Copyright (c) 2023, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
import 'package:dart_service_protocol_shared/dart_service_protocol_shared.dart';
import 'package:sse/server/sse_handler.dart';
import 'package:json_rpc_2/json_rpc_2.dart' as json_rpc;
import 'package:stream_channel/stream_channel.dart';
import 'package:web_socket_channel/web_socket_channel.dart';
import 'constants.dart';
import 'dart_tooling_daemon.dart';
import 'rpc_error_codes.dart';
/// Represents a client that is connected to a DTD service.
class DTDClient extends Client {
final StreamChannel connection;
late json_rpc.Peer _clientPeer;
final DartToolingDaemon dtd;
late final Future _done;
Future get done => _done;
DTDClient.fromWebSocket(
DartToolingDaemon dtd,
WebSocketChannel ws,
) : this._(
dtd,
ws,
);
DTDClient.fromSSEConnection(
DartToolingDaemon dtd,
SseConnection sse,
) : this._(
dtd,
sse,
);
DTDClient._(
this.dtd,
this.connection,
) {
_clientPeer = json_rpc.Peer(
connection.cast<String>(),
strictProtocolChecks: false,
);
_registerJsonRpcMethods();
_done = listen();
}
@override
Future<void> close() => _clientPeer.close();
@override
Future<dynamic> sendRequest({
required String method,
dynamic parameters,
}) async {
if (_clientPeer.isClosed) {
return;
}
return await _clientPeer.sendRequest(method, parameters.asMap);
}
@override
void streamNotify(String streamId, Object eventData) {
_clientPeer.sendNotification('streamNotify', eventData);
}
/// Start receiving JSON RPC requests from the client.
///
/// Returned future completes when the peer is closed.
Future<void> listen() => _clientPeer.listen().then(
(_) => dtd.streamManager.onClientDisconnect(this),
);
/// Registers handlers for the Dart Tooling Daemon JSON RPC method endpoints.
void _registerJsonRpcMethods() {
// TODO(danchevalier): do a once over of all methods and ensure that we have
// all necessary validations.
_clientPeer.registerMethod('streamListen', _streamListen);
_clientPeer.registerMethod('streamCancel', _streamCancel);
_clientPeer.registerMethod('postEvent', _postEvent);
_clientPeer.registerMethod('registerService', _registerService);
_clientPeer.registerMethod('getRegisteredStreams', _getRegisteredStreams);
// Handle service extension invocations.
_clientPeer.registerFallback(_fallback);
}
/// jrpc endpoint for cancelling a stream.
///
/// Parameters:
/// 'streamId': the stream to be cancelled.
_streamListen(parameters) async {
final streamId = parameters['streamId'].asString;
await dtd.streamManager.streamListen(
this,
streamId,
);
return RPCResponses.success;
}
/// jrpc endpoint for stopping listening to a stream.
///
/// Parameters:
/// 'streamId': the stream that the client would like to stop listening to.
_streamCancel(parameters) async {
final streamId = parameters['streamId'].asString;
await dtd.streamManager.streamCancel(this, streamId);
return RPCResponses.success;
}
/// jrpc endpoint for posting an event to a stream.
///
/// Parameters:
/// 'eventKind': the kind of event being sent.
/// 'data': the data being sent over the stream.
/// 'stream: the stream that is being posted to.
_postEvent(parameters) async {
final eventKind = parameters['eventKind'].asString;
final eventData = parameters['eventData'].asMap;
final stream = parameters['streamId'].asString;
dtd.streamManager.postEventHelper(stream, eventKind, eventData);
return RPCResponses.success;
}
/// jrpc endpoint for registering a service to the tooling daemon.
///
/// Parameters:
/// 'service': the name of the service that is being registered to.
/// 'method': the name of the method that is being registered on the service.
_registerService(parameters) {
final serviceName = parameters['service'].asString;
final method = parameters['method'].asString;
final combinedName = '$serviceName.$method';
// TODO(danchevalier): enforce only one client can register methods to a
// service.
if (services.containsKey(combinedName)) {
throw RpcErrorCodes.buildRpcException(
RpcErrorCodes.kServiceAlreadyRegistered,
);
}
services[combinedName] = method;
return RPCResponses.success;
}
_getRegisteredStreams(parameters) {
// TODO(danchevalier) implement this.
return [];
}
/// jrpc fallback handler.
///
/// Handles all service method calls that will be forwarded to the respective
/// client which registered that service method.
_fallback(parameters) async {
// Lookup the client associated with the service extension's namespace.
// If the client exists and that client has registered the specified
// method, forward the request to that client.
final serviceMethod = parameters.method;
final client = dtd.clientManager.findFirstClientThatHandlesService(
serviceMethod,
);
if (client == null) {
throw json_rpc.RpcException(
RpcErrorCodes.kMethodNotFound,
'Unknown service method: $serviceMethod',
);
}
return await client.sendRequest(
method: serviceMethod,
parameters: parameters,
);
}
}

View file

@ -0,0 +1,18 @@
// Copyright (c) 2023, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'package:dart_service_protocol_shared/dart_service_protocol_shared.dart';
import 'dtd_client.dart';
/// Used for keeping track and managing clients that are connected to a given
/// service.
class DTDClientManager extends ClientManager {
@override
void addClient(Client client) {
client as DTDClient;
super.addClient(client);
client.done.then((_) => removeClient(client));
}
}

View file

@ -2,38 +2,14 @@
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:io';
import 'package:shelf/shelf.dart';
import 'package:shelf/shelf_io.dart';
import 'package:shelf_router/shelf_router.dart';
// Configure routes.
final _router = Router()
..get('/', _rootHandler)
..get('/echo/<message>', _echoHandler);
Response _rootHandler(Request req) {
return Response.ok('Hello, World!\n');
}
Response _echoHandler(Request request) {
final message = request.params['message'];
return Response.ok('$message\n');
}
import 'dart_tooling_daemon.dart';
void main(List<String> args) async {
// Use any available host or container IP (usually `0.0.0.0`).
final ip = InternetAddress.anyIPv4;
final dartToolingDaemon = await DartToolingDaemon.startService(
shouldLogRequests: true,
); // TODO(@danchevalier): turn off logging
// Configure a pipeline that logs requests.
final handler =
Pipeline().addMiddleware(logRequests()).addHandler(_router.call);
// For running in containers, we respect the PORT environment variable.
final port = int.parse(Platform.environment['PORT'] ?? '8080');
final server = await serve(handler, ip, port);
print(
'The Dart Tooling Daemon is listening on ${server.address.host}:${server.port}',
'The Dart Tooling Daemon is listening on ${dartToolingDaemon.uri?.host}:${dartToolingDaemon.uri?.port}',
);
}

View file

@ -0,0 +1,50 @@
// Copyright (c) 2023, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'package:dart_service_protocol_shared/dart_service_protocol_shared.dart';
import 'dart_tooling_daemon.dart';
/// Manages state related to stream subscriptions made by [DTDClient]s.
class DTDStreamManager extends StreamManager {
DTDStreamManager(this.dtd);
final DartToolingDaemon dtd;
/// Send an event to the [stream].
///
/// [stream] must be a registered custom stream (i.e., not a stream specified
/// as part of the VM service protocol).
///
/// If [stream] is not a registered custom stream, an [RPCError] with code
/// [kCustomStreamDoesNotExist] will be thrown.
///
/// If [stream] is a core stream, an [RPCError] with code
/// [kCoreStreamNotAllowed] will be thrown.
void postEventHelper(
String stream,
String eventKind,
Map<String, Object?> eventData,
) {
super.postEvent(
stream,
<String, dynamic>{
'streamId': stream,
'event': {
'timestamp': DateTime.now().millisecondsSinceEpoch,
'eventData': eventData,
'eventKind': eventKind,
},
},
);
}
/// Send `streamNotify` notifications to clients subscribed to `streamId`.
void streamNotify(
String streamId,
Map<String, Object?> data,
) {
super.postEvent(streamId, data);
}
}

View file

@ -0,0 +1,63 @@
// Copyright (c) 2023, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'package:json_rpc_2/json_rpc_2.dart' as json_rpc;
// TODO(danchevalier): get this from DDS instead.
abstract class RpcErrorCodes {
static json_rpc.RpcException buildRpcException(int code, {dynamic data}) {
return json_rpc.RpcException(
code,
errorMessages[code]!,
data: data,
);
}
// These error codes must be kept in sync with those in vm/json_stream.h and
// vmservice.dart.
// static const kParseError = -32700;
// static const kInvalidRequest = -32600;
static const kMethodNotFound = -32601;
static const kInvalidParams = -32602;
// static const kInternalError = -32603;
// static const kExtensionError = -32000;
static const kFeatureDisabled = 100;
// static const kCannotAddBreakpoint = 102;
static const kStreamAlreadySubscribed = 103;
static const kStreamNotSubscribed = 104;
// static const kIsolateMustBeRunnable = 105;
static const kIsolateMustBePaused = 106;
// static const kCannotResume = 107;
// static const kIsolateIsReloading = 108;
// static const kIsolateReloadBarred = 109;
// static const kIsolateMustHaveReloaded = 110;
static const kServiceAlreadyRegistered = 111;
static const kServiceDisappeared = 112;
static const kExpressionCompilationError = 113;
// static const kInvalidTimelineRequest = 114;
static const kCustomStreamDoesNotExist = 130;
static const kCoreStreamNotAllowed = 131;
// Experimental (used in private rpcs).
// static const kFileSystemAlreadyExists = 1001;
// static const kFileSystemDoesNotExist = 1002;
// static const kFileDoesNotExist = 1003;
static const errorMessages = {
kFeatureDisabled: 'Feature is disabled',
kStreamAlreadySubscribed: 'Stream already subscribed',
kStreamNotSubscribed: 'Stream not subscribed',
kServiceAlreadyRegistered: 'Service already registered',
kServiceDisappeared: 'Service has disappeared',
kExpressionCompilationError: 'Expression compilation error',
kCustomStreamDoesNotExist: 'Custom stream does not exist',
kCoreStreamNotAllowed: 'Core streams are not allowed',
};
}

View file

@ -4,17 +4,21 @@ repository: https://github.com/dart-lang/sdk/tree/main/pkg/dtd_impl
publish_to: none
environment:
sdk: ^3.3.0-152.0.dev
sdk: ">=3.0.0 <4.0.0"
# Use 'any' constraints here; we get our versions from the DEPS file.
dependencies:
dart_service_protocol_shared: any
json_rpc_2: any
shelf: any
shelf_router: any
shelf_web_socket: any
sse: any
stream_channel: any
web_socket_channel: any
# We use 'any' version constraints here as we get our package versions from
# the dart-lang/sdk repo's DEPS file. Note that this is a special case; the
# best practice for packages is to specify their compatible version ranges.
# See also https://dart.dev/tools/pub/dependencies.
# dev_dependencies:
# http: any
# lints: any
# test: any
dev_dependencies:
lints: any