Add obfuscation token to DTD uri

Adding this token adds a small layer of security since the caller needs to have gathered the uri in some way rather than just stumbling on the DTD port.

Change-Id: I0665f4718d162daf94cb49fa1c6f4206c83d77cd
Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/355220
Reviewed-by: Kenzie Davisson <kenzieschmoll@google.com>
Commit-Queue: Dan Chevalier <danchevalier@google.com>
Reviewed-by: Ben Konyi <bkonyi@google.com>
This commit is contained in:
Dan Chevalier 2024-03-07 19:33:19 +00:00 committed by Commit Queue
parent 9d043ce763
commit 68c74e2a97
2 changed files with 279 additions and 214 deletions

View file

@ -101,6 +101,13 @@ class DartToolingDaemon {
final String secret;
/// Any requests to DTD must have this token as the first element of the
/// uri path.
///
/// This provides an obfuscation step to prevent bad actors from stumbling
/// onto the dtd address.
final String _uriToken = _generateSecret();
/// The uri of the current [DartToolingDaemon] service.
Uri? get uri => _uri;
Uri? _uri;
@ -118,7 +125,7 @@ class DartToolingDaemon {
() async {
Future<HttpServer?> startServer() async {
try {
return await io.serve(_handlers().handler, host, port);
return await io.serve(_handlers(), host, port);
} on SocketException catch (e) {
errorMessage = e.message;
if (e.osError != null) {
@ -146,7 +153,7 @@ class DartToolingDaemon {
scheme: 'ws',
host: host,
port: _server.port,
path: '/',
path: '/$_uriToken',
);
}
@ -203,10 +210,26 @@ class DartToolingDaemon {
// Attempt to upgrade HTTP requests to a websocket before processing them as
// standard HTTP requests. The websocket handler will fail quickly if the
// request doesn't appear to be a websocket upgrade request.
Cascade _handlers() {
return Cascade().add(_webSocketHandler()).add(_sseHandler());
Handler _handlers() {
return Pipeline().addMiddleware(_uriTokenHandler).addHandler(
Cascade().add(_webSocketHandler()).add(_sseHandler()).handler,
);
}
Handler _uriTokenHandler(Handler innerHandler) => (Request request) {
final forbidden =
Response.forbidden('missing or invalid authentication code');
final pathSegments = request.url.pathSegments;
if (pathSegments.isEmpty) {
return forbidden;
}
final token = pathSegments[0];
if (token != _uriToken) {
return forbidden;
}
return innerHandler(request);
};
Handler _webSocketHandler() => webSocketHandler((WebSocketChannel ws) {
final client = DTDClient.fromWebSocket(
this,

View file

@ -3,6 +3,7 @@
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
import 'dart:io';
import 'package:dtd/dtd.dart' show RpcErrorCodes;
import 'package:json_rpc_2/json_rpc_2.dart';
@ -14,245 +15,286 @@ void main() {
late Peer client;
late DartToolingDaemon? dtd;
late String uri;
setUp(() async {
dtd = await DartToolingDaemon.startService([]);
// Wait for server to start and print to the port to stdout.
uri = dtd!.uri!.toString();
client = _createClient(uri);
});
tearDown(() async {
await client.close();
await dtd?.close();
});
group('streams', () {
final streamId = 'testStream';
final eventKind = 'test';
final eventData = {'the': 'data'};
test(
'forbids connections where the uri token is not the first element in the path',
() async {
dtd = await DartToolingDaemon.startService([]);
test('basics', () async {
var completer = Completer();
client.registerMethod('streamNotify', (Parameters parameters) {
completer.complete(parameters.asMap);
});
final listenResult = await client.sendRequest('streamListen', {
"streamId": streamId,
});
expect(listenResult, {"type": "Success"});
final postResult = await client.sendRequest(
'postEvent',
{
'streamId': streamId,
'eventKind': eventKind,
'eventData': eventData,
},
);
expect(postResult, {"type": "Success"});
final dataFromTheStream = await completer.future;
expect(dataFromTheStream, {
"streamId": streamId,
"eventKind": eventKind,
"eventData": eventData,
"timestamp": anything,
});
// Now cancel the stream
completer = Completer(); // Reset the completer
final cancelResult = await client.sendRequest(
'streamCancel',
{
'streamId': streamId,
},
);
expect(cancelResult, {"type": "Success"});
final postResult2 = await client.sendRequest(
'postEvent',
{
'streamId': streamId,
'eventKind': eventKind,
'eventData': eventData,
},
);
expect(postResult2, {"type": "Success"});
expect(
completer.future.timeout(
const Duration(seconds: 1),
onTimeout: () => throw TimeoutException('Timed out'),
expect(
() async => await WebSocket.connect(
dtd!.uri!.replace(path: 'someInvalidToken').toString(), // invalid token
),
throwsA(
predicate(
(p0) =>
p0 is WebSocketException &&
RegExp("^Connection to '.*' was not upgraded to websocket\$")
.hasMatch(p0.message),
),
throwsA(predicate((p0) => p0 is TimeoutException)),
);
});
),
);
test('streamListen the same stream', () async {
final listenResult = await client.sendRequest('streamListen', {
"streamId": streamId,
});
expect(listenResult, {"type": "Success"});
expect(
() => client.sendRequest('streamListen', {
"streamId": streamId,
}),
throwsA(
predicate(
(e) =>
e is RpcException &&
e.code == RpcErrorCodes.kStreamAlreadySubscribed,
),
expect(
() async => await WebSocket.connect(
dtd!.uri!.replace(path: '').toString(), // no token
),
throwsA(
predicate(
(p0) =>
p0 is WebSocketException &&
RegExp("^Connection to '.*' was not upgraded to websocket\$")
.hasMatch(p0.message),
),
);
});
test('stop listening to a stream that is not being listened to', () {
expect(
() => client.sendRequest('streamCancel', {
"streamId": streamId,
}),
throwsA(
predicate(
(e) =>
e is RpcException &&
e.code == RpcErrorCodes.kStreamNotSubscribed,
),
),
);
});
test('postEvent when there are no listeners', () async {
final postResult = await client.sendRequest(
'postEvent',
{
'streamId': streamId,
'eventKind': eventKind,
'eventData': eventData,
},
);
expect(postResult, {"type": "Success"});
});
),
);
});
group('service methods', () {
final service1 = 'foo1';
final method1 = 'bar1';
final method2 = 'bar2';
final data1 = {"data": 1};
final response1 = {"response": 1};
group('dtd', () {
setUp(() async {
dtd = await DartToolingDaemon.startService([]);
test('basics', () async {
client.registerMethod('$service1.$method1', (Parameters parameters) {
return response1;
});
final registerResult = await client.sendRequest('registerService', {
"service": service1,
"method": method1,
});
// Wait for server to start and print to the port to stdout.
uri = dtd!.uri!.toString();
expect(registerResult, {"type": "Success"});
final register2Result = await client.sendRequest('registerService', {
"service": service1,
"method": method2,
});
expect(register2Result, {"type": "Success"});
final methodResponse = await client.sendRequest(
'$service1.$method1',
data1,
);
expect(methodResponse, response1);
client = _createClient(uri);
});
test('registering a service method that already exists', () async {
final registerResult = await client.sendRequest('registerService', {
"service": service1,
"method": method1,
tearDown(() async {
await client.close();
});
group('streams', () {
final streamId = 'testStream';
final eventKind = 'test';
final eventData = {'the': 'data'};
test('basics', () async {
var completer = Completer();
client.registerMethod('streamNotify', (Parameters parameters) {
completer.complete(parameters.asMap);
});
final listenResult = await client.sendRequest('streamListen', {
"streamId": streamId,
});
expect(listenResult, {"type": "Success"});
final postResult = await client.sendRequest(
'postEvent',
{
'streamId': streamId,
'eventKind': eventKind,
'eventData': eventData,
},
);
expect(postResult, {"type": "Success"});
final dataFromTheStream = await completer.future;
expect(dataFromTheStream, {
"streamId": streamId,
"eventKind": eventKind,
"eventData": eventData,
"timestamp": anything,
});
// Now cancel the stream
completer = Completer(); // Reset the completer
final cancelResult = await client.sendRequest(
'streamCancel',
{
'streamId': streamId,
},
);
expect(cancelResult, {"type": "Success"});
final postResult2 = await client.sendRequest(
'postEvent',
{
'streamId': streamId,
'eventKind': eventKind,
'eventData': eventData,
},
);
expect(postResult2, {"type": "Success"});
expect(
completer.future.timeout(
const Duration(seconds: 1),
onTimeout: () => throw TimeoutException('Timed out'),
),
throwsA(predicate((p0) => p0 is TimeoutException)),
);
});
expect(registerResult, {"type": "Success"});
expect(
() => client.sendRequest('registerService', {
test('streamListen the same stream', () async {
final listenResult = await client.sendRequest('streamListen', {
"streamId": streamId,
});
expect(listenResult, {"type": "Success"});
expect(
() => client.sendRequest('streamListen', {
"streamId": streamId,
}),
throwsA(
predicate(
(e) =>
e is RpcException &&
e.code == RpcErrorCodes.kStreamAlreadySubscribed,
),
),
);
});
test('stop listening to a stream that is not being listened to', () {
expect(
() => client.sendRequest('streamCancel', {
"streamId": streamId,
}),
throwsA(
predicate(
(e) =>
e is RpcException &&
e.code == RpcErrorCodes.kStreamNotSubscribed,
),
),
);
});
test('postEvent when there are no listeners', () async {
final postResult = await client.sendRequest(
'postEvent',
{
'streamId': streamId,
'eventKind': eventKind,
'eventData': eventData,
},
);
expect(postResult, {"type": "Success"});
});
});
group('service methods', () {
final service1 = 'foo1';
final method1 = 'bar1';
final method2 = 'bar2';
final data1 = {"data": 1};
final response1 = {"response": 1};
test('basics', () async {
client.registerMethod('$service1.$method1', (Parameters parameters) {
return response1;
});
final registerResult = await client.sendRequest('registerService', {
"service": service1,
"method": method1,
}),
throwsA(
predicate(
(p0) =>
p0 is RpcException &&
p0.code == RpcErrorCodes.kServiceMethodAlreadyRegistered,
),
),
);
});
});
test('calling a method that does not exist', () {
expect(
() => client.sendRequest('zoo.abc', {}),
throwsA(
predicate(
(p0) =>
p0 is RpcException &&
p0.code == RpcException.methodNotFound('zoo.abc').code,
),
),
);
});
expect(registerResult, {"type": "Success"});
test('different clients cannot register the same service', () async {
final client2 = _createClient(uri);
final registerResult = await client.sendRequest('registerService', {
"service": service1,
"method": method1,
});
expect(registerResult, {"type": "Success"});
expect(
() => client2.sendRequest('registerService', {
final register2Result = await client.sendRequest('registerService', {
"service": service1,
"method": method2,
}),
throwsA(
predicate(
(p0) =>
p0 is RpcException &&
p0.code == RpcErrorCodes.kServiceAlreadyRegistered,
),
),
);
});
});
expect(register2Result, {"type": "Success"});
test('releases service methods on disconnect', () async {
final client2 = _createClient(uri);
final registerResult = await client.sendRequest('registerService', {
"service": service1,
"method": method1,
final methodResponse = await client.sendRequest(
'$service1.$method1',
data1,
);
expect(methodResponse, response1);
});
expect(registerResult, {"type": "Success"});
await client.close();
test('registering a service method that already exists', () async {
final registerResult = await client.sendRequest('registerService', {
"service": service1,
"method": method1,
});
// TODO: replace this polling when notification streams are implemented.
dynamic client2RegisterResult;
for (var i = 0; i < 10; i++) {
try {
// The service method registration should succeed once the other
// finishes closing.
client2RegisterResult = await client2.sendRequest('registerService', {
expect(registerResult, {"type": "Success"});
expect(
() => client.sendRequest('registerService', {
"service": service1,
"method": method1,
});
break;
} catch (_) {}
await Future.delayed(Duration(seconds: 1));
}
expect(client2RegisterResult, {"type": "Success"});
}),
throwsA(
predicate(
(p0) =>
p0 is RpcException &&
p0.code == RpcErrorCodes.kServiceMethodAlreadyRegistered,
),
),
);
});
test('calling a method that does not exist', () {
expect(
() => client.sendRequest('zoo.abc', {}),
throwsA(
predicate(
(p0) =>
p0 is RpcException &&
p0.code == RpcException.methodNotFound('zoo.abc').code,
),
),
);
});
test('different clients cannot register the same service', () async {
final client2 = _createClient(uri);
final registerResult = await client.sendRequest('registerService', {
"service": service1,
"method": method1,
});
expect(registerResult, {"type": "Success"});
expect(
() => client2.sendRequest('registerService', {
"service": service1,
"method": method2,
}),
throwsA(
predicate(
(p0) =>
p0 is RpcException &&
p0.code == RpcErrorCodes.kServiceAlreadyRegistered,
),
),
);
});
test('releases service methods on disconnect', () async {
final client2 = _createClient(uri);
final registerResult = await client.sendRequest('registerService', {
"service": service1,
"method": method1,
});
expect(registerResult, {"type": "Success"});
await client.close();
// TODO: replace this polling when notification streams are implemented.
dynamic client2RegisterResult;
for (var i = 0; i < 10; i++) {
try {
// The service method registration should succeed once the other
// finishes closing.
client2RegisterResult =
await client2.sendRequest('registerService', {
"service": service1,
"method": method1,
});
break;
} catch (_) {}
await Future.delayed(Duration(seconds: 1));
}
expect(client2RegisterResult, {"type": "Success"});
});
});
});
}