From 0ecfc7da6fced1f7bd7c7e95c7f8393056881da6 Mon Sep 17 00:00:00 2001 From: Ben Konyi Date: Thu, 5 Aug 2021 17:56:01 +0000 Subject: [PATCH] [ package:dds ] Add locking when modifying DDS state via client requests Fixes https://github.com/dart-lang/sdk/issues/46696 Change-Id: I666b59a0661f4df3b1f0a47aba52096133f5fbb7 Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/209140 Reviewed-by: Anna Gringauze --- .dart_tool/package_config.json | 4 +- pkg/dds/lib/src/isolate_manager.dart | 115 ++++++++++++---------- pkg/dds/lib/src/stream_manager.dart | 140 +++++++++++++++------------ pkg/dds/lib/src/utils/mutex.dart | 48 +++++++++ pkg/dds/pubspec.yaml | 2 +- pkg/dds/test/regress_45569_test.dart | 44 +++++---- 6 files changed, 218 insertions(+), 135 deletions(-) create mode 100644 pkg/dds/lib/src/utils/mutex.dart diff --git a/.dart_tool/package_config.json b/.dart_tool/package_config.json index d2df8f388ac..98aef421093 100644 --- a/.dart_tool/package_config.json +++ b/.dart_tool/package_config.json @@ -11,7 +11,7 @@ "constraint, update this by running tools/generate_package_config.dart." ], "configVersion": 2, - "generated": "2021-08-04T16:42:24.433381", + "generated": "2021-08-05T11:33:04.746536", "generator": "tools/generate_package_config.dart", "packages": [ { @@ -256,7 +256,7 @@ "name": "dds", "rootUri": "../pkg/dds", "packageUri": "lib/", - "languageVersion": "2.12" + "languageVersion": "2.14" }, { "name": "dev_compiler", diff --git a/pkg/dds/lib/src/isolate_manager.dart b/pkg/dds/lib/src/isolate_manager.dart index e9e14dff303..96769b7328e 100644 --- a/pkg/dds/lib/src/isolate_manager.dart +++ b/pkg/dds/lib/src/isolate_manager.dart @@ -2,6 +2,7 @@ // for details. All rights reserved. Use of this source code is governed by a // BSD-style license that can be found in the LICENSE file. +import 'package:dds/src/utils/mutex.dart'; import 'package:json_rpc_2/json_rpc_2.dart' as json_rpc; import 'client.dart'; @@ -139,32 +140,36 @@ class IsolateManager { } void _updateIsolateState(String id, String name, String eventKind) { - switch (eventKind) { - case ServiceEvents.isolateStart: - isolateStarted(id, name); - break; - case ServiceEvents.isolateExit: - isolateExited(id); - break; - default: - final isolate = isolates[id]; + _mutex.runGuarded( + () { switch (eventKind) { - case ServiceEvents.pauseExit: - isolate!.pausedOnExit(); + case ServiceEvents.isolateStart: + isolateStarted(id, name); break; - case ServiceEvents.pausePostRequest: - isolate!.pausedPostRequest(); - break; - case ServiceEvents.pauseStart: - isolate!.pausedOnStart(); - break; - case ServiceEvents.resume: - isolate!.resumed(); + case ServiceEvents.isolateExit: + isolateExited(id); break; default: - break; + final isolate = isolates[id]; + switch (eventKind) { + case ServiceEvents.pauseExit: + isolate!.pausedOnExit(); + break; + case ServiceEvents.pausePostRequest: + isolate!.pausedPostRequest(); + break; + case ServiceEvents.pauseStart: + isolate!.pausedOnStart(); + break; + case ServiceEvents.resume: + isolate!.resumed(); + break; + default: + break; + } } - } + }, + ); } /// Initializes the set of running isolates. @@ -172,25 +177,30 @@ class IsolateManager { if (_initialized) { return; } - final vm = await dds.vmServiceClient.sendRequest('getVM'); - final List isolateRefs = vm['isolates'].cast>(); - // Check the pause event for each isolate to determine whether or not the - // isolate is already paused. - for (final isolateRef in isolateRefs) { - final id = isolateRef['id']; - final isolate = await dds.vmServiceClient.sendRequest('getIsolate', { - 'isolateId': id, - }); - final name = isolate['name']; - if (isolate.containsKey('pauseEvent')) { - isolates[id] = _RunningIsolate(this, id, name); - final eventKind = isolate['pauseEvent']['kind']; - _updateIsolateState(id, name, eventKind); - } else { - // If the isolate doesn't have a pauseEvent, assume it's running. - isolateStarted(id, name); - } - } + await _mutex.runGuarded( + () async { + final vm = await dds.vmServiceClient.sendRequest('getVM'); + final List isolateRefs = + vm['isolates'].cast>(); + // Check the pause event for each isolate to determine whether or not the + // isolate is already paused. + for (final isolateRef in isolateRefs) { + final id = isolateRef['id']; + final isolate = await dds.vmServiceClient.sendRequest('getIsolate', { + 'isolateId': id, + }); + final name = isolate['name']; + if (isolate.containsKey('pauseEvent')) { + isolates[id] = _RunningIsolate(this, id, name); + final eventKind = isolate['pauseEvent']['kind']; + _updateIsolateState(id, name, eventKind); + } else { + // If the isolate doesn't have a pauseEvent, assume it's running. + isolateStarted(id, name); + } + } + }, + ); _initialized = true; } @@ -218,16 +228,20 @@ class IsolateManager { DartDevelopmentServiceClient client, json_rpc.Parameters parameters, ) async { - final isolateId = parameters['isolateId'].asString; - final isolate = isolates[isolateId]; - if (isolate == null) { - return RPCResponses.collectedSentinel; - } - if (isolate.shouldResume(resumingClient: client)) { - isolate.clearResumeApprovals(); - return await _sendResumeRequest(isolateId, parameters); - } - return RPCResponses.success; + return await _mutex.runGuarded( + () async { + final isolateId = parameters['isolateId'].asString; + final isolate = isolates[isolateId]; + if (isolate == null) { + return RPCResponses.collectedSentinel; + } + if (isolate.shouldResume(resumingClient: client)) { + isolate.clearResumeApprovals(); + return await _sendResumeRequest(isolateId, parameters); + } + return RPCResponses.success; + }, + ); } /// Forwards a `resume` request to the VM service. @@ -248,5 +262,6 @@ class IsolateManager { bool _initialized = false; final DartDevelopmentServiceImpl dds; + final _mutex = Mutex(); final Map isolates = {}; } diff --git a/pkg/dds/lib/src/stream_manager.dart b/pkg/dds/lib/src/stream_manager.dart index 94f791a3a34..fd1290ca0e6 100644 --- a/pkg/dds/lib/src/stream_manager.dart +++ b/pkg/dds/lib/src/stream_manager.dart @@ -10,6 +10,7 @@ import 'client.dart'; import 'dds_impl.dart'; import 'logging_repository.dart'; import 'rpc_error_codes.dart'; +import 'utils/mutex.dart'; class StreamManager { StreamManager(this.dds); @@ -133,51 +134,56 @@ class StreamManager { DartDevelopmentServiceClient? client, String stream, ) async { - assert(stream.isNotEmpty); - if (!streamListeners.containsKey(stream)) { - // Initialize the list of clients for the new stream before we do - // anything else to ensure multiple clients registering for the same - // stream in quick succession doesn't result in multiple streamListen - // requests being sent to the VM service. - streamListeners[stream] = []; - if ((stream == kDebugStream && client == null) || - stream != kDebugStream) { - // This will return an RPC exception if the stream doesn't exist. This - // will throw and the exception will be forwarded to the client. - final result = await dds.vmServiceClient.sendRequest('streamListen', { - 'streamId': stream, - }); - assert(result['type'] == 'Success'); - } - } - if (streamListeners[stream]!.contains(client)) { - throw kStreamAlreadySubscribedException; - } - if (client != null) { - streamListeners[stream]!.add(client); - if (loggingRepositories.containsKey(stream)) { - loggingRepositories[stream]!.sendHistoricalLogs(client); - } else if (stream == kServiceStream) { - // Send all previously registered service extensions when a client - // subscribes to the Service stream. - for (final c in dds.clientManager.clients) { - if (c == client) { - continue; - } - final namespace = dds.getNamespace(c); - for (final service in c.services.keys) { - client.sendNotification( - 'streamNotify', - _buildStreamRegisteredEvent( - namespace!, - service, - c.services[service]!, - ), - ); + await _mutex.runGuarded( + () async { + assert(stream.isNotEmpty); + if (!streamListeners.containsKey(stream)) { + // Initialize the list of clients for the new stream before we do + // anything else to ensure multiple clients registering for the same + // stream in quick succession doesn't result in multiple streamListen + // requests being sent to the VM service. + streamListeners[stream] = []; + if ((stream == kDebugStream && client == null) || + stream != kDebugStream) { + // This will return an RPC exception if the stream doesn't exist. This + // will throw and the exception will be forwarded to the client. + final result = + await dds.vmServiceClient.sendRequest('streamListen', { + 'streamId': stream, + }); + assert(result['type'] == 'Success'); } } - } - } + if (streamListeners[stream]!.contains(client)) { + throw kStreamAlreadySubscribedException; + } + if (client != null) { + streamListeners[stream]!.add(client); + if (loggingRepositories.containsKey(stream)) { + loggingRepositories[stream]!.sendHistoricalLogs(client); + } else if (stream == kServiceStream) { + // Send all previously registered service extensions when a client + // subscribes to the Service stream. + for (final c in dds.clientManager.clients) { + if (c == client) { + continue; + } + final namespace = dds.getNamespace(c); + for (final service in c.services.keys) { + client.sendNotification( + 'streamNotify', + _buildStreamRegisteredEvent( + namespace!, + service, + c.services[service]!, + ), + ); + } + } + } + } + }, + ); } List>? getStreamHistory(String stream) { @@ -198,27 +204,32 @@ class StreamManager { String stream, { bool cancelCoreStream = false, }) async { - assert(stream.isNotEmpty); - final listeners = streamListeners[stream]; - if (listeners == null || client != null && !listeners.contains(client)) { - throw kStreamNotSubscribedException; - } - listeners.remove(client); - // Don't cancel streams DDS needs to function. - if (listeners.isEmpty && - (!ddsCoreStreams.contains(stream) || cancelCoreStream)) { - streamListeners.remove(stream); - // Ensure the VM service hasn't shutdown. - if (dds.vmServiceClient.isClosed) { - return; - } - final result = await dds.vmServiceClient.sendRequest('streamCancel', { - 'streamId': stream, - }); - assert(result['type'] == 'Success'); - } else { - streamListeners[stream] = listeners; - } + await _mutex.runGuarded( + () async { + assert(stream.isNotEmpty); + final listeners = streamListeners[stream]; + if (listeners == null || + client != null && !listeners.contains(client)) { + throw kStreamNotSubscribedException; + } + listeners.remove(client); + // Don't cancel streams DDS needs to function. + if (listeners.isEmpty && + (!ddsCoreStreams.contains(stream) || cancelCoreStream)) { + streamListeners.remove(stream); + // Ensure the VM service hasn't shutdown. + if (dds.vmServiceClient.isClosed) { + return; + } + final result = await dds.vmServiceClient.sendRequest('streamCancel', { + 'streamId': stream, + }); + assert(result['type'] == 'Success'); + } else { + streamListeners[stream] = listeners; + } + }, + ); } /// Cleanup stream subscriptions for `client` when it has disconnected. @@ -280,4 +291,5 @@ class StreamManager { final DartDevelopmentServiceImpl dds; final streamListeners = >{}; + final _mutex = Mutex(); } diff --git a/pkg/dds/lib/src/utils/mutex.dart b/pkg/dds/lib/src/utils/mutex.dart new file mode 100644 index 00000000000..7f67ea6857d --- /dev/null +++ b/pkg/dds/lib/src/utils/mutex.dart @@ -0,0 +1,48 @@ +// Copyright (c) 2021, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; +import 'dart:collection'; + +typedef _LockRequest = Completer; + +/// Used to protect global state accessed in blocks containing calls to +/// asynchronous methods. +class Mutex { + /// Executes a block of code containing asynchronous calls atomically. + /// + /// If no other asynchronous context is currently executing within + /// [criticalSection], it will immediately be called. Otherwise, the caller + /// will be suspended and entered into a queue to be resumed once the lock is + /// released. + Future runGuarded(FutureOr Function() criticalSection) async { + try { + await _acquireLock(); + return await criticalSection(); + } finally { + _releaseLock(); + } + } + + Future _acquireLock() async { + if (!_locked) { + _locked = true; + return; + } + final request = _LockRequest(); + _outstandingRequests.add(request); + await request.future; + } + + void _releaseLock() { + _locked = false; + if (_outstandingRequests.isNotEmpty) { + final request = _outstandingRequests.removeFirst(); + request.complete(); + } + } + + bool _locked = false; + final _outstandingRequests = Queue<_LockRequest>(); +} diff --git a/pkg/dds/pubspec.yaml b/pkg/dds/pubspec.yaml index cfbf77958b4..64480845647 100644 --- a/pkg/dds/pubspec.yaml +++ b/pkg/dds/pubspec.yaml @@ -8,7 +8,7 @@ version: 2.0.2 homepage: https://github.com/dart-lang/sdk/tree/master/pkg/dds environment: - sdk: '>=2.12.0 <3.0.0' + sdk: '>=2.14.0 <3.0.0' dependencies: async: ^2.4.1 diff --git a/pkg/dds/test/regress_45569_test.dart b/pkg/dds/test/regress_45569_test.dart index d08c9c163fa..a630d6ac3f8 100644 --- a/pkg/dds/test/regress_45569_test.dart +++ b/pkg/dds/test/regress_45569_test.dart @@ -3,10 +3,13 @@ // BSD-style license that can be found in the LICENSE file. import 'dart:io'; +import 'dart:math'; import 'package:dds/dds.dart'; import 'package:test/test.dart'; +import 'package:vm_service/vm_service.dart'; import 'package:vm_service/vm_service_io.dart'; + import 'common/test_helper.dart'; void main() { @@ -27,27 +30,32 @@ void main() { process.kill(); }); + Future streamSubscribeUnsubscribe( + VmService client, { + required bool delay, + }) async { + await client.streamListen('Service'); + await Future.delayed( + Duration(milliseconds: delay ? Random().nextInt(200) : 0), + ); + await client.streamCancel('Service'); + } + test('Ensure streamListen and streamCancel calls are handled atomically', () async { - dds = await DartDevelopmentService.startDartDevelopmentService( - remoteVmServiceUri, - ); - expect(dds.isRunning, true); - final connection1 = await vmServiceConnectUri(dds.wsUri.toString()); - final connection2 = await vmServiceConnectUri(dds.wsUri.toString()); + for (int i = 0; i < 100; ++i) { + dds = await DartDevelopmentService.startDartDevelopmentService( + remoteVmServiceUri, + ); + expect(dds.isRunning, true); + final connection1 = await vmServiceConnectUri(dds.wsUri.toString()); + final connection2 = await vmServiceConnectUri(dds.wsUri.toString()); - for (int i = 0; i < 50; ++i) { - final listenFutures = [ - connection1.streamListen('Service'), - connection2.streamListen('Service'), - ]; - await Future.wait(listenFutures); - - final cancelFutures = [ - connection1.streamCancel('Service'), - connection2.streamCancel('Service'), - ]; - await Future.wait(cancelFutures); + await Future.wait([ + streamSubscribeUnsubscribe(connection1, delay: true), + streamSubscribeUnsubscribe(connection2, delay: false), + ]); + await dds.shutdown(); } }); }