[ DDS ] Add R/W-like locking around streamListen and streamCancel

Reduces latency for concurrent stream subscriptions while no
subscription cancellations are pending.

TEST=Existing regression tests

Fixes https://github.com/dart-lang/sdk/issues/47042

Change-Id: I548bc5d1f30fddcf03af397a2c2b3ed8beeb09f2
Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/239862
Reviewed-by: Kenzie Davisson <kenzieschmoll@google.com>
Commit-Queue: Ben Konyi <bkonyi@google.com>
This commit is contained in:
Ben Konyi 2022-04-04 23:16:42 +00:00 committed by Commit Bot
parent dbe6d1d574
commit 2aed3ea034
4 changed files with 55 additions and 8 deletions

View file

@ -1,5 +1,8 @@
# 2.2.1
- Reduce latency of `streamListen` calls through improved locking behavior.
# 2.2.0
- Add support for serving DevTools via `package:dds/devtools_server.dart`
- Add support for serving DevTools via `package:dds/devtools_server.dart`.
# 2.1.7
- Re-release 2.1.6+1.

View file

@ -173,7 +173,11 @@ class StreamManager {
String stream, {
bool? includePrivates,
}) async {
await _streamSubscriptionMutex.runGuarded(
// Weakly guard stream listening as it's safe to perform multiple listens
// on a stream concurrently. However, cancelling streams while listening
// to them concurrently can put things in a bad state. Use weak guarding to
// improve latency of stream subscription.
await _streamSubscriptionMutex.runGuardedWeak(
() async {
assert(stream.isNotEmpty);
bool streamNewlySubscribed = false;

View file

@ -11,9 +11,9 @@ class Mutex {
/// Executes a block of code containing asynchronous calls atomically.
///
/// If no other asynchronous context is currently executing within
/// [criticalSection], it will immediately be called. Otherwise, the caller
/// will be suspended and entered into a queue to be resumed once the lock is
/// released.
/// [criticalSection] or a [runGuardedWeak] scope, it will immediately be
/// called. Otherwise, the caller will be suspended and entered into a queue
/// to be resumed once the lock is released.
Future<T> runGuarded<T>(FutureOr<T> Function() criticalSection) async {
try {
await _acquireLock();
@ -23,11 +23,49 @@ class Mutex {
}
}
Future<void> _acquireLock() async {
/// Executes a block of code containing asynchronous calls, allowing for other
/// weakly guarded sections to be executed concurrently.
///
/// If no other asynchronous context is currently executing within a
/// [runGuarded] scope, [criticalSection] will immediately be called.
/// Otherwise, the caller will be suspended and entered into a queue to be
/// resumed once the lock is released.
Future<T> runGuardedWeak<T>(FutureOr<T> Function() criticalSection) async {
_weakGuards++;
if (_weakGuards == 1) {
// Reinitialize if this is the only weakly guarded scope.
_outstandingReadersCompleter = Completer<void>();
}
final result;
try {
await _acquireLock(strong: false);
result = await criticalSection();
} finally {
_weakGuards--;
if (_weakGuards == 0) {
// Notify callers of `runGuarded` that they can try to execute again.
_outstandingReadersCompleter.complete();
}
}
return result;
}
Future<void> _acquireLock({bool strong = true}) async {
// The lock cannot be acquired by `runGuarded` if there is outstanding
// execution in weakly guarded sections. Loop in case we've entered another
// weakly guarded scope before we've woken up.
while (strong && _weakGuards > 0) {
await _outstandingReadersCompleter.future;
}
if (!_locked) {
_locked = true;
if (strong) {
// Don't actually lock for weakly guarded sections, just make sure the
// lock isn't held before entering.
_locked = true;
}
return;
}
final request = Completer<void>();
_outstandingRequests.add(request);
await request.future;
@ -41,6 +79,8 @@ class Mutex {
}
}
int _weakGuards = 0;
bool _locked = false;
var _outstandingReadersCompleter = Completer<void>();
final _outstandingRequests = Queue<Completer<void>>();
}

View file

@ -3,7 +3,7 @@ description: >-
A library used to spawn the Dart Developer Service, used to communicate with
a Dart VM Service instance.
version: 2.2.0
version: 2.2.1
homepage: https://github.com/dart-lang/sdk/tree/master/pkg/dds