mirror of
https://github.com/dart-lang/sdk
synced 2024-11-02 12:24:24 +00:00
[ DDS ] Don't await getIsolate calls in DDS initialization
Fix an issue where DDS would fail to initialize when an isolate in the target process was unable to handle service requests (e.g., when executing FFI code or blocked on a system call). Fixes b/323386606 Change-Id: I659ebaf750e2c800e9819809d1104e024cb059da Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/354681 Reviewed-by: Derek Xu <derekx@google.com> Commit-Queue: Ben Konyi <bkonyi@google.com>
This commit is contained in:
parent
3775357116
commit
3786c5e5ee
7 changed files with 197 additions and 99 deletions
|
@ -1,5 +1,6 @@
|
|||
# 3.3.0-unreleased
|
||||
- **Breaking change:** [DAP] Several signatures in DAP debug adapter classes have been updated to use `Uri`s where they previously used `String path`s. This is to support communicating with the DAP client using URIs instead of file paths. URIs may be used only when the client sets the custom `supportsDartUris` client capability during initialization.
|
||||
- Fixed issue where DDS would fail to initialize when an isolate in the target process was unable to handle service requests (b/323386606).
|
||||
- Updated `package:dap` version to 1.2.0.
|
||||
|
||||
# 3.2.1
|
||||
|
|
|
@ -107,28 +107,45 @@ class DartDevelopmentServiceImpl implements DartDevelopmentService {
|
|||
},
|
||||
),
|
||||
);
|
||||
// Run in an error Zone to ensure that asynchronous exceptions encountered
|
||||
// during request handling are handled, as exceptions thrown during request
|
||||
// handling shouldn't take down the entire service.
|
||||
await runZonedGuarded(
|
||||
() async {
|
||||
try {
|
||||
// Setup stream event handling.
|
||||
await streamManager.listen();
|
||||
|
||||
try {
|
||||
// Setup stream event handling.
|
||||
await streamManager.listen();
|
||||
// Populate initial isolate state.
|
||||
await _isolateManager.initialize();
|
||||
|
||||
// Populate initial isolate state.
|
||||
await _isolateManager.initialize();
|
||||
|
||||
// Once we have a connection to the VM service, we're ready to spawn the intermediary.
|
||||
await _startDDSServer();
|
||||
_initializationComplete = true;
|
||||
} on StateError {
|
||||
// Handle json-rpc state errors.
|
||||
//
|
||||
// It's possible that ordering of events on the event queue can result in
|
||||
// the cleanup code above being called after this function has returned,
|
||||
// resulting in an invalid DDS instance being released into the wild.
|
||||
//
|
||||
// If initialization hasn't completed and the error hasn't already been
|
||||
// set, set it now.
|
||||
error ??= DartDevelopmentServiceException.failedToStart();
|
||||
}
|
||||
// Once we have a connection to the VM service, we're ready to spawn
|
||||
// the intermediary.
|
||||
await _startDDSServer();
|
||||
_initializationComplete = true;
|
||||
} on StateError {
|
||||
// Handle json-rpc state errors.
|
||||
//
|
||||
// It's possible that ordering of events on the event queue can
|
||||
// result in the cleanup code above being called after this function
|
||||
// has returned,
|
||||
// resulting in an invalid DDS instance being released into the wild.
|
||||
//
|
||||
// If initialization hasn't completed and the error hasn't already
|
||||
// been set, set it now.
|
||||
error ??= DartDevelopmentServiceException.failedToStart();
|
||||
} on DartDevelopmentServiceException catch (e) {
|
||||
// Forward any DartDevelopmentServiceExceptions thrown when starting
|
||||
// the server.
|
||||
error = e;
|
||||
}
|
||||
},
|
||||
(error, stack) {
|
||||
if (shouldLogRequests) {
|
||||
print('Asynchronous error: $error\n$stack');
|
||||
}
|
||||
},
|
||||
);
|
||||
|
||||
// Check if we encountered any errors during startup, cleanup, and throw.
|
||||
if (error != null) {
|
||||
|
@ -155,38 +172,28 @@ class DartDevelopmentServiceImpl implements DartDevelopmentService {
|
|||
}
|
||||
pipeline = pipeline.addMiddleware(_authCodeMiddleware);
|
||||
final handler = pipeline.addHandler(_handlers().handler);
|
||||
// Start the DDS server. Run in an error Zone to ensure that asynchronous
|
||||
// exceptions encountered during request handling are handled, as exceptions
|
||||
// thrown during request handling shouldn't take down the entire service.
|
||||
// Start the DDS server.
|
||||
late String errorMessage;
|
||||
final tmpServer = await runZonedGuarded(
|
||||
() async {
|
||||
Future<HttpServer?> startServer() async {
|
||||
try {
|
||||
return await io.serve(handler, host, port);
|
||||
} on SocketException catch (e) {
|
||||
if (_enableServicePortFallback && port != 0) {
|
||||
// Try again, this time with a random port.
|
||||
port = 0;
|
||||
return await startServer();
|
||||
}
|
||||
errorMessage = e.message;
|
||||
if (e.osError != null) {
|
||||
errorMessage += ' (${e.osError!.message})';
|
||||
}
|
||||
errorMessage += ': ${e.address?.host}:${e.port}';
|
||||
return null;
|
||||
}
|
||||
Future<HttpServer?> startServer() async {
|
||||
try {
|
||||
return await io.serve(handler, host, port);
|
||||
} on SocketException catch (e) {
|
||||
if (_enableServicePortFallback && port != 0) {
|
||||
// Try again, this time with a random port.
|
||||
port = 0;
|
||||
return await startServer();
|
||||
}
|
||||
errorMessage = e.message;
|
||||
if (e.osError != null) {
|
||||
errorMessage += ' (${e.osError!.message})';
|
||||
}
|
||||
errorMessage += ': ${e.address?.host}:${e.port}';
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
final tmpServer = await startServer();
|
||||
|
||||
return await startServer();
|
||||
},
|
||||
(error, stack) {
|
||||
if (shouldLogRequests) {
|
||||
print('Asynchronous error: $error\n$stack');
|
||||
}
|
||||
},
|
||||
);
|
||||
if (tmpServer == null) {
|
||||
throw DartDevelopmentServiceException.connectionIssue(errorMessage);
|
||||
}
|
||||
|
|
|
@ -2,6 +2,7 @@
|
|||
// for details. All rights reserved. Use of this source code is governed by a
|
||||
// BSD-style license that can be found in the LICENSE file.
|
||||
|
||||
import 'dart:async';
|
||||
import 'dart:convert' show base64Decode, base64Encode;
|
||||
|
||||
import 'package:json_rpc_2/json_rpc_2.dart' as json_rpc;
|
||||
|
@ -37,6 +38,7 @@ enum _IsolateState {
|
|||
pauseStart,
|
||||
pauseExit,
|
||||
pausePostRequest,
|
||||
unknown,
|
||||
}
|
||||
|
||||
class RunningIsolate {
|
||||
|
@ -44,7 +46,8 @@ class RunningIsolate {
|
|||
: cpuSamplesManager = CpuSamplesManager(
|
||||
isolateManager.dds,
|
||||
id,
|
||||
);
|
||||
),
|
||||
_state = _IsolateState.unknown;
|
||||
|
||||
// State setters.
|
||||
void pausedOnExit() => _state = _IsolateState.pauseExit;
|
||||
|
@ -146,7 +149,7 @@ class RunningIsolate {
|
|||
final String name;
|
||||
final String id;
|
||||
final Set<String?> _resumeApprovalsByName = {};
|
||||
_IsolateState? _state;
|
||||
_IsolateState _state;
|
||||
}
|
||||
|
||||
class IsolateManager {
|
||||
|
@ -217,49 +220,70 @@ class IsolateManager {
|
|||
if (_initialized) {
|
||||
return;
|
||||
}
|
||||
await _mutex.runGuarded(
|
||||
() async {
|
||||
final vm = await dds.vmServiceClient.sendRequest('getVM');
|
||||
final List<Map> isolateRefs =
|
||||
vm['isolates'].cast<Map<String, dynamic>>();
|
||||
// Check the pause event for each isolate to determine whether or not the
|
||||
// isolate is already paused.
|
||||
for (final isolateRef in isolateRefs) {
|
||||
final id = isolateRef['id'];
|
||||
final isolate = await dds.vmServiceClient.sendRequest('getIsolate', {
|
||||
'isolateId': id,
|
||||
});
|
||||
// If the isolate has shutdown after the getVM request, ignore it and
|
||||
// continue to the next isolate.
|
||||
if (isolate['type'] == 'Sentinel') {
|
||||
continue;
|
||||
}
|
||||
final name = isolate['name'];
|
||||
if (isolate.containsKey('pauseEvent')) {
|
||||
isolates[id] = RunningIsolate(this, id, name);
|
||||
final eventKind = isolate['pauseEvent']['kind'];
|
||||
_updateIsolateState(id, name, eventKind);
|
||||
} else {
|
||||
// If the isolate doesn't have a pauseEvent, assume it's running.
|
||||
isolateStarted(id, name);
|
||||
}
|
||||
}
|
||||
if (dds.cachedUserTags.isNotEmpty) {
|
||||
await dds.vmServiceClient.sendRequestAndIgnoreMethodNotFound(
|
||||
'streamCpuSamplesWithUserTag',
|
||||
{
|
||||
'userTags': dds.cachedUserTags,
|
||||
final vm = await dds.vmServiceClient.sendRequest('getVM');
|
||||
final isolateRefs = vm['isolates'].cast<Map<String, dynamic>>();
|
||||
// Check the pause event for each isolate to determine whether or not the
|
||||
// isolate is already paused.
|
||||
for (final isolateRef in isolateRefs) {
|
||||
final id = isolateRef['id'];
|
||||
final name = isolateRef['name'];
|
||||
|
||||
// Create an entry for the running isolate.
|
||||
initializeRunningIsolate(id, name);
|
||||
|
||||
// The calls to `getIsolate` are intentionally unawaited as it's
|
||||
// possible for the isolate to be in a state where it is unable to
|
||||
// process service messages, potentially indefinitely. For example,
|
||||
// an isolate that invoked FFI code will be blocked until control is
|
||||
// returned from native code.
|
||||
//
|
||||
// See b/323386606 for details.
|
||||
unawaited(
|
||||
dds.vmServiceClient.sendRequest('getIsolate', {
|
||||
'isolateId': id,
|
||||
}).then(
|
||||
(isolate) async => await _mutex.runGuarded(
|
||||
() {
|
||||
// If the isolate has shutdown after the getVM request, ignore it and
|
||||
// continue to the next isolate.
|
||||
if (isolate['type'] == 'Sentinel') {
|
||||
return;
|
||||
}
|
||||
if (isolate.containsKey('pauseEvent')) {
|
||||
isolates[id] = RunningIsolate(this, id, name);
|
||||
final eventKind = isolate['pauseEvent']['kind'];
|
||||
_updateIsolateState(id, name, eventKind);
|
||||
} else {
|
||||
// If the isolate doesn't have a pauseEvent, assume it's running.
|
||||
isolateStarted(id, name);
|
||||
}
|
||||
},
|
||||
);
|
||||
}
|
||||
},
|
||||
);
|
||||
),
|
||||
),
|
||||
);
|
||||
if (dds.cachedUserTags.isNotEmpty) {
|
||||
await dds.vmServiceClient.sendRequestAndIgnoreMethodNotFound(
|
||||
'streamCpuSamplesWithUserTag',
|
||||
{
|
||||
'userTags': dds.cachedUserTags,
|
||||
},
|
||||
);
|
||||
}
|
||||
}
|
||||
_initialized = true;
|
||||
}
|
||||
|
||||
/// This method creates an entry for a running isolate, leaves its run state
|
||||
/// as [_IsolateState.unknown].
|
||||
RunningIsolate initializeRunningIsolate(String id, String name) =>
|
||||
isolates.putIfAbsent(
|
||||
id,
|
||||
() => RunningIsolate(this, id, name),
|
||||
);
|
||||
|
||||
/// Initializes state for a newly started isolate.
|
||||
void isolateStarted(String id, String name) {
|
||||
final isolate = RunningIsolate(this, id, name);
|
||||
final isolate = initializeRunningIsolate(id, name);
|
||||
isolate.running();
|
||||
isolates[id] = isolate;
|
||||
}
|
||||
|
|
|
@ -38,6 +38,7 @@ Future<io.Process> spawnDartProcess(
|
|||
bool serveObservatory = true,
|
||||
bool pauseOnStart = true,
|
||||
bool disableServiceAuthCodes = false,
|
||||
bool subscribeToStdio = true,
|
||||
}) async {
|
||||
final executable = io.Platform.executable;
|
||||
final tmpDir = await io.Directory.systemTemp.createTemp('dart_service');
|
||||
|
@ -55,12 +56,14 @@ Future<io.Process> spawnDartProcess(
|
|||
io.Platform.script.resolve(script).toString(),
|
||||
];
|
||||
final process = await io.Process.start(executable, arguments);
|
||||
process.stdout
|
||||
.transform(utf8.decoder)
|
||||
.listen((line) => print('TESTEE OUT: $line'));
|
||||
process.stderr
|
||||
.transform(utf8.decoder)
|
||||
.listen((line) => print('TESTEE ERR: $line'));
|
||||
if (subscribeToStdio) {
|
||||
process.stdout
|
||||
.transform(utf8.decoder)
|
||||
.listen((line) => print('TESTEE OUT: $line'));
|
||||
process.stderr
|
||||
.transform(utf8.decoder)
|
||||
.listen((line) => print('TESTEE ERR: $line'));
|
||||
}
|
||||
while ((await serviceInfoFile.length()) <= 5) {
|
||||
await Future.delayed(const Duration(milliseconds: 50));
|
||||
}
|
||||
|
|
10
pkg/dds/test/long_sleep_script.dart
Normal file
10
pkg/dds/test/long_sleep_script.dart
Normal file
|
@ -0,0 +1,10 @@
|
|||
// Copyright (c) 2024, the Dart project authors. Please see the AUTHORS file
|
||||
// for details. All rights reserved. Use of this source code is governed by a
|
||||
// BSD-style license that can be found in the LICENSE file.
|
||||
|
||||
import 'dart:io';
|
||||
|
||||
void main() {
|
||||
print('started');
|
||||
sleep(Duration(minutes: 5));
|
||||
}
|
|
@ -28,17 +28,24 @@ void main() {
|
|||
process.kill();
|
||||
});
|
||||
|
||||
Future<IsolateRef> getIsolate(VmService service) async {
|
||||
IsolateRef isolate;
|
||||
Future<Isolate> getIsolate(VmService service) async {
|
||||
while (true) {
|
||||
final vm = await service.getVM();
|
||||
if (vm.isolates!.isNotEmpty) {
|
||||
isolate = vm.isolates!.first;
|
||||
break;
|
||||
final isolateId = vm.isolates!.first.id!;
|
||||
Isolate isolate;
|
||||
bool retry;
|
||||
do {
|
||||
isolate = await service.getIsolate(isolateId);
|
||||
retry = isolate.pauseEvent?.kind != EventKind.kPauseStart;
|
||||
if (retry) {
|
||||
await Future.delayed(const Duration(milliseconds: 50));
|
||||
}
|
||||
} while (retry);
|
||||
return isolate;
|
||||
}
|
||||
await Future.delayed(const Duration(milliseconds: 50));
|
||||
}
|
||||
return isolate;
|
||||
}
|
||||
|
||||
test('sends a postEvent over a custom stream to multiple listeners',
|
||||
|
|
46
pkg/dds/test/regress_b323386606_test.dart
Normal file
46
pkg/dds/test/regress_b323386606_test.dart
Normal file
|
@ -0,0 +1,46 @@
|
|||
// Copyright (c) 2024, the Dart project authors. Please see the AUTHORS file
|
||||
// for details. All rights reserved. Use of this source code is governed by a
|
||||
// BSD-style license that can be found in the LICENSE file.
|
||||
|
||||
import 'dart:convert';
|
||||
import 'dart:io';
|
||||
|
||||
import 'package:dds/dds.dart';
|
||||
import 'package:test/test.dart';
|
||||
|
||||
import 'common/test_helper.dart';
|
||||
|
||||
void main() {
|
||||
late Process process;
|
||||
late DartDevelopmentService dds;
|
||||
|
||||
setUp(() async {
|
||||
process = await spawnDartProcess(
|
||||
'long_sleep_script.dart',
|
||||
pauseOnStart: false,
|
||||
subscribeToStdio: false,
|
||||
);
|
||||
});
|
||||
|
||||
tearDown(() async {
|
||||
await dds.shutdown();
|
||||
process.kill();
|
||||
});
|
||||
|
||||
test('Ensure DDS starts when isolate is blocked', () async {
|
||||
// Wait for the Dart program to start running, then wait a bit more to make
|
||||
// sure the isolate is actually blocked on the sleep(...) call.
|
||||
await process.stdout.transform(utf8.decoder).first;
|
||||
await Future.delayed(const Duration(milliseconds: 500));
|
||||
|
||||
print('Starting DDS...');
|
||||
// Before the fix for b/323386606, this call would hang as the isolate
|
||||
// waiting on the sleep(...) call would never respond to a service request,
|
||||
// preventing DDS initialization from completing.
|
||||
dds = await DartDevelopmentService.startDartDevelopmentService(
|
||||
remoteVmServiceUri,
|
||||
);
|
||||
print('DDS started');
|
||||
expect(dds.isRunning, true);
|
||||
});
|
||||
}
|
Loading…
Reference in a new issue