[ package:dds ] Add locking when modifying DDS state via client requests

Fixes https://github.com/dart-lang/sdk/issues/46696

Change-Id: I666b59a0661f4df3b1f0a47aba52096133f5fbb7
Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/209140
Reviewed-by: Anna Gringauze <annagrin@google.com>
This commit is contained in:
Ben Konyi 2021-08-05 17:56:01 +00:00
parent b8b041ffb4
commit 0ecfc7da6f
6 changed files with 218 additions and 135 deletions

View file

@ -11,7 +11,7 @@
"constraint, update this by running tools/generate_package_config.dart."
],
"configVersion": 2,
"generated": "2021-08-04T16:42:24.433381",
"generated": "2021-08-05T11:33:04.746536",
"generator": "tools/generate_package_config.dart",
"packages": [
{
@ -256,7 +256,7 @@
"name": "dds",
"rootUri": "../pkg/dds",
"packageUri": "lib/",
"languageVersion": "2.12"
"languageVersion": "2.14"
},
{
"name": "dev_compiler",

View file

@ -2,6 +2,7 @@
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'package:dds/src/utils/mutex.dart';
import 'package:json_rpc_2/json_rpc_2.dart' as json_rpc;
import 'client.dart';
@ -139,32 +140,36 @@ class IsolateManager {
}
void _updateIsolateState(String id, String name, String eventKind) {
switch (eventKind) {
case ServiceEvents.isolateStart:
isolateStarted(id, name);
break;
case ServiceEvents.isolateExit:
isolateExited(id);
break;
default:
final isolate = isolates[id];
_mutex.runGuarded(
() {
switch (eventKind) {
case ServiceEvents.pauseExit:
isolate!.pausedOnExit();
case ServiceEvents.isolateStart:
isolateStarted(id, name);
break;
case ServiceEvents.pausePostRequest:
isolate!.pausedPostRequest();
break;
case ServiceEvents.pauseStart:
isolate!.pausedOnStart();
break;
case ServiceEvents.resume:
isolate!.resumed();
case ServiceEvents.isolateExit:
isolateExited(id);
break;
default:
break;
final isolate = isolates[id];
switch (eventKind) {
case ServiceEvents.pauseExit:
isolate!.pausedOnExit();
break;
case ServiceEvents.pausePostRequest:
isolate!.pausedPostRequest();
break;
case ServiceEvents.pauseStart:
isolate!.pausedOnStart();
break;
case ServiceEvents.resume:
isolate!.resumed();
break;
default:
break;
}
}
}
},
);
}
/// Initializes the set of running isolates.
@ -172,25 +177,30 @@ class IsolateManager {
if (_initialized) {
return;
}
final vm = await dds.vmServiceClient.sendRequest('getVM');
final List<Map> isolateRefs = vm['isolates'].cast<Map<String, dynamic>>();
// Check the pause event for each isolate to determine whether or not the
// isolate is already paused.
for (final isolateRef in isolateRefs) {
final id = isolateRef['id'];
final isolate = await dds.vmServiceClient.sendRequest('getIsolate', {
'isolateId': id,
});
final name = isolate['name'];
if (isolate.containsKey('pauseEvent')) {
isolates[id] = _RunningIsolate(this, id, name);
final eventKind = isolate['pauseEvent']['kind'];
_updateIsolateState(id, name, eventKind);
} else {
// If the isolate doesn't have a pauseEvent, assume it's running.
isolateStarted(id, name);
}
}
await _mutex.runGuarded(
() async {
final vm = await dds.vmServiceClient.sendRequest('getVM');
final List<Map> isolateRefs =
vm['isolates'].cast<Map<String, dynamic>>();
// Check the pause event for each isolate to determine whether or not the
// isolate is already paused.
for (final isolateRef in isolateRefs) {
final id = isolateRef['id'];
final isolate = await dds.vmServiceClient.sendRequest('getIsolate', {
'isolateId': id,
});
final name = isolate['name'];
if (isolate.containsKey('pauseEvent')) {
isolates[id] = _RunningIsolate(this, id, name);
final eventKind = isolate['pauseEvent']['kind'];
_updateIsolateState(id, name, eventKind);
} else {
// If the isolate doesn't have a pauseEvent, assume it's running.
isolateStarted(id, name);
}
}
},
);
_initialized = true;
}
@ -218,16 +228,20 @@ class IsolateManager {
DartDevelopmentServiceClient client,
json_rpc.Parameters parameters,
) async {
final isolateId = parameters['isolateId'].asString;
final isolate = isolates[isolateId];
if (isolate == null) {
return RPCResponses.collectedSentinel;
}
if (isolate.shouldResume(resumingClient: client)) {
isolate.clearResumeApprovals();
return await _sendResumeRequest(isolateId, parameters);
}
return RPCResponses.success;
return await _mutex.runGuarded(
() async {
final isolateId = parameters['isolateId'].asString;
final isolate = isolates[isolateId];
if (isolate == null) {
return RPCResponses.collectedSentinel;
}
if (isolate.shouldResume(resumingClient: client)) {
isolate.clearResumeApprovals();
return await _sendResumeRequest(isolateId, parameters);
}
return RPCResponses.success;
},
);
}
/// Forwards a `resume` request to the VM service.
@ -248,5 +262,6 @@ class IsolateManager {
bool _initialized = false;
final DartDevelopmentServiceImpl dds;
final _mutex = Mutex();
final Map<String, _RunningIsolate> isolates = {};
}

View file

@ -10,6 +10,7 @@ import 'client.dart';
import 'dds_impl.dart';
import 'logging_repository.dart';
import 'rpc_error_codes.dart';
import 'utils/mutex.dart';
class StreamManager {
StreamManager(this.dds);
@ -133,51 +134,56 @@ class StreamManager {
DartDevelopmentServiceClient? client,
String stream,
) async {
assert(stream.isNotEmpty);
if (!streamListeners.containsKey(stream)) {
// Initialize the list of clients for the new stream before we do
// anything else to ensure multiple clients registering for the same
// stream in quick succession doesn't result in multiple streamListen
// requests being sent to the VM service.
streamListeners[stream] = <DartDevelopmentServiceClient>[];
if ((stream == kDebugStream && client == null) ||
stream != kDebugStream) {
// This will return an RPC exception if the stream doesn't exist. This
// will throw and the exception will be forwarded to the client.
final result = await dds.vmServiceClient.sendRequest('streamListen', {
'streamId': stream,
});
assert(result['type'] == 'Success');
}
}
if (streamListeners[stream]!.contains(client)) {
throw kStreamAlreadySubscribedException;
}
if (client != null) {
streamListeners[stream]!.add(client);
if (loggingRepositories.containsKey(stream)) {
loggingRepositories[stream]!.sendHistoricalLogs(client);
} else if (stream == kServiceStream) {
// Send all previously registered service extensions when a client
// subscribes to the Service stream.
for (final c in dds.clientManager.clients) {
if (c == client) {
continue;
}
final namespace = dds.getNamespace(c);
for (final service in c.services.keys) {
client.sendNotification(
'streamNotify',
_buildStreamRegisteredEvent(
namespace!,
service,
c.services[service]!,
),
);
await _mutex.runGuarded(
() async {
assert(stream.isNotEmpty);
if (!streamListeners.containsKey(stream)) {
// Initialize the list of clients for the new stream before we do
// anything else to ensure multiple clients registering for the same
// stream in quick succession doesn't result in multiple streamListen
// requests being sent to the VM service.
streamListeners[stream] = <DartDevelopmentServiceClient>[];
if ((stream == kDebugStream && client == null) ||
stream != kDebugStream) {
// This will return an RPC exception if the stream doesn't exist. This
// will throw and the exception will be forwarded to the client.
final result =
await dds.vmServiceClient.sendRequest('streamListen', {
'streamId': stream,
});
assert(result['type'] == 'Success');
}
}
}
}
if (streamListeners[stream]!.contains(client)) {
throw kStreamAlreadySubscribedException;
}
if (client != null) {
streamListeners[stream]!.add(client);
if (loggingRepositories.containsKey(stream)) {
loggingRepositories[stream]!.sendHistoricalLogs(client);
} else if (stream == kServiceStream) {
// Send all previously registered service extensions when a client
// subscribes to the Service stream.
for (final c in dds.clientManager.clients) {
if (c == client) {
continue;
}
final namespace = dds.getNamespace(c);
for (final service in c.services.keys) {
client.sendNotification(
'streamNotify',
_buildStreamRegisteredEvent(
namespace!,
service,
c.services[service]!,
),
);
}
}
}
}
},
);
}
List<Map<String, dynamic>>? getStreamHistory(String stream) {
@ -198,27 +204,32 @@ class StreamManager {
String stream, {
bool cancelCoreStream = false,
}) async {
assert(stream.isNotEmpty);
final listeners = streamListeners[stream];
if (listeners == null || client != null && !listeners.contains(client)) {
throw kStreamNotSubscribedException;
}
listeners.remove(client);
// Don't cancel streams DDS needs to function.
if (listeners.isEmpty &&
(!ddsCoreStreams.contains(stream) || cancelCoreStream)) {
streamListeners.remove(stream);
// Ensure the VM service hasn't shutdown.
if (dds.vmServiceClient.isClosed) {
return;
}
final result = await dds.vmServiceClient.sendRequest('streamCancel', {
'streamId': stream,
});
assert(result['type'] == 'Success');
} else {
streamListeners[stream] = listeners;
}
await _mutex.runGuarded(
() async {
assert(stream.isNotEmpty);
final listeners = streamListeners[stream];
if (listeners == null ||
client != null && !listeners.contains(client)) {
throw kStreamNotSubscribedException;
}
listeners.remove(client);
// Don't cancel streams DDS needs to function.
if (listeners.isEmpty &&
(!ddsCoreStreams.contains(stream) || cancelCoreStream)) {
streamListeners.remove(stream);
// Ensure the VM service hasn't shutdown.
if (dds.vmServiceClient.isClosed) {
return;
}
final result = await dds.vmServiceClient.sendRequest('streamCancel', {
'streamId': stream,
});
assert(result['type'] == 'Success');
} else {
streamListeners[stream] = listeners;
}
},
);
}
/// Cleanup stream subscriptions for `client` when it has disconnected.
@ -280,4 +291,5 @@ class StreamManager {
final DartDevelopmentServiceImpl dds;
final streamListeners = <String, List<DartDevelopmentServiceClient>>{};
final _mutex = Mutex();
}

View file

@ -0,0 +1,48 @@
// Copyright (c) 2021, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
import 'dart:collection';
typedef _LockRequest = Completer<void>;
/// Used to protect global state accessed in blocks containing calls to
/// asynchronous methods.
class Mutex {
/// Executes a block of code containing asynchronous calls atomically.
///
/// If no other asynchronous context is currently executing within
/// [criticalSection], it will immediately be called. Otherwise, the caller
/// will be suspended and entered into a queue to be resumed once the lock is
/// released.
Future<T> runGuarded<T>(FutureOr<T> Function() criticalSection) async {
try {
await _acquireLock();
return await criticalSection();
} finally {
_releaseLock();
}
}
Future<void> _acquireLock() async {
if (!_locked) {
_locked = true;
return;
}
final request = _LockRequest();
_outstandingRequests.add(request);
await request.future;
}
void _releaseLock() {
_locked = false;
if (_outstandingRequests.isNotEmpty) {
final request = _outstandingRequests.removeFirst();
request.complete();
}
}
bool _locked = false;
final _outstandingRequests = Queue<_LockRequest>();
}

View file

@ -8,7 +8,7 @@ version: 2.0.2
homepage: https://github.com/dart-lang/sdk/tree/master/pkg/dds
environment:
sdk: '>=2.12.0 <3.0.0'
sdk: '>=2.14.0 <3.0.0'
dependencies:
async: ^2.4.1

View file

@ -3,10 +3,13 @@
// BSD-style license that can be found in the LICENSE file.
import 'dart:io';
import 'dart:math';
import 'package:dds/dds.dart';
import 'package:test/test.dart';
import 'package:vm_service/vm_service.dart';
import 'package:vm_service/vm_service_io.dart';
import 'common/test_helper.dart';
void main() {
@ -27,27 +30,32 @@ void main() {
process.kill();
});
Future<void> streamSubscribeUnsubscribe(
VmService client, {
required bool delay,
}) async {
await client.streamListen('Service');
await Future.delayed(
Duration(milliseconds: delay ? Random().nextInt(200) : 0),
);
await client.streamCancel('Service');
}
test('Ensure streamListen and streamCancel calls are handled atomically',
() async {
dds = await DartDevelopmentService.startDartDevelopmentService(
remoteVmServiceUri,
);
expect(dds.isRunning, true);
final connection1 = await vmServiceConnectUri(dds.wsUri.toString());
final connection2 = await vmServiceConnectUri(dds.wsUri.toString());
for (int i = 0; i < 100; ++i) {
dds = await DartDevelopmentService.startDartDevelopmentService(
remoteVmServiceUri,
);
expect(dds.isRunning, true);
final connection1 = await vmServiceConnectUri(dds.wsUri.toString());
final connection2 = await vmServiceConnectUri(dds.wsUri.toString());
for (int i = 0; i < 50; ++i) {
final listenFutures = <Future>[
connection1.streamListen('Service'),
connection2.streamListen('Service'),
];
await Future.wait(listenFutures);
final cancelFutures = <Future>[
connection1.streamCancel('Service'),
connection2.streamCancel('Service'),
];
await Future.wait(cancelFutures);
await Future.wait([
streamSubscribeUnsubscribe(connection1, delay: true),
streamSubscribeUnsubscribe(connection2, delay: false),
]);
await dds.shutdown();
}
});
}