[ package:dds ] Add support for caching CPU samples based on UserTag

DDS can be configured to listen for CPU sample events and cache samples
that were collected while certain UserTags are active. These cached
samples are stored in a ring buffer and are stored until the isolate
shuts down.

TEST=pkg/dds/test/get_cached_cpu_samples_test.dart

Change-Id: Ib20770f59f1672c703413486f87795b3bb23f676
Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/207206
Commit-Queue: Ben Konyi <bkonyi@google.com>
Reviewed-by: Kenzie Schmoll <kenzieschmoll@google.com>
This commit is contained in:
Ben Konyi 2021-07-28 00:33:33 +00:00 committed by commit-bot@chromium.org
parent c17e2a13da
commit 746b8f1f5c
20 changed files with 734 additions and 105 deletions

View file

@ -244,7 +244,7 @@
"name": "dds",
"rootUri": "../pkg/dds",
"packageUri": "lib/",
"languageVersion": "2.12"
"languageVersion": "2.14"
},
{
"name": "dev_compiler",

View file

@ -1,6 +1,6 @@
# Dart Development Service Protocol 1.2
# Dart Development Service Protocol 1.3
This document describes _version 1.2_ of the Dart Development Service Protocol.
This document describes _version 1.3_ of the Dart Development Service Protocol.
This protocol is an extension of the Dart VM Service Protocol and implements it
in it's entirety. For details on the VM Service Protocol, see the [Dart VM Service Protocol Specification][service-protocol].
@ -67,6 +67,29 @@ event being sent to the subscribing client for each existing service extension.
The DDS Protocol supports all [public RPCs defined in the VM Service protocol][service-protocol-public-rpcs].
### getAvailableCachedCpuSamples
```
AvailableCachedCpuSamples getAvailableCachedCpuSamples();
```
The _getAvailableCachedCpuSamples_ RPC is used to determine which caches of CPU samples
are available. Caches are associated with individual _UserTag_ names and are specified
when DDS is started via the _cachedUserTags_ parameter.
See [AvailableCachedCpuSamples](#availablecachedcpusamples).
### getCachedCpuSamples
```
CachedCpuSamples getCachedCpuSamples(string isolateId, string userTag);
```
The _getCachedCpuSamples_ RPC is used to retrieve a cache of CPU samples collected
under a _UserTag_ with name _userTag_.
See [CachedCpuSamples](#cachedcpusamples).
### getClientName
```
@ -181,6 +204,37 @@ See [Success](#success).
The DDS Protocol supports all [public types defined in the VM Service protocol][service-protocol-public-types].
### AvailableCachedCpuSamples
```
class AvailableCachedCpuSamples extends Response {
// A list of UserTag names associated with CPU sample caches.
string[] cacheNames;
}
```
A collection of [UserTag] names associated with caches of CPU samples.
See [getAvailableCachedCpuSamples](#getavailablecachedcpusamples).
### CachedCpuSamples
```
class CachedCpuSamples extends CpuSamples {
// The name of the UserTag associated with this cache of samples.
string userTag;
// Provided if the CPU sample cache has filled and older samples have been
// dropped.
bool truncated [optional];
}
```
An extension of [CpuSamples](#cpu-samples) which represents a set of cached
samples, associated with a particular [UserTag] name.
See [getCachedCpuSamples](#getcachedcpusamples).
### ClientName
```
@ -220,10 +274,12 @@ version | comments
1.0 | Initial revision
1.1 | Added `getDartDevelopmentServiceVersion` RPC.
1.2 | Added `getStreamHistory` RPC.
1.3 | Added `getAvailableCachedCpuSamples` and `getCachedCpuSamples` RPCs.
[resume]: https://github.com/dart-lang/sdk/blob/master/runtime/vm/service/service.md#resume
[success]: https://github.com/dart-lang/sdk/blob/master/runtime/vm/service/service.md#success
[version]: https://github.com/dart-lang/sdk/blob/master/runtime/vm/service/service.md#version
[cpu-samples]: https://github.com/dart-lang/sdk/blob/master/runtime/vm/service/service.md#cpusamples
[service-protocol]: https://github.com/dart-lang/sdk/blob/master/runtime/vm/service/service.md
[service-protocol-rpcs-requests-and-responses]: https://github.com/dart-lang/sdk/blob/master/runtime/vm/service/service.md#rpcs-requests-and-responses

View file

@ -42,6 +42,7 @@ abstract class DartDevelopmentService {
Uri? serviceUri,
bool enableAuthCodes = true,
bool ipv6 = false,
List<String> cachedUserTags = const [],
DevToolsConfiguration? devToolsConfiguration,
bool logRequests = false,
}) async {
@ -79,6 +80,7 @@ abstract class DartDevelopmentService {
remoteVmServiceUri,
serviceUri,
enableAuthCodes,
cachedUserTags,
ipv6,
devToolsConfiguration,
logRequests,
@ -136,9 +138,13 @@ abstract class DartDevelopmentService {
/// requests.
bool get isRunning;
/// The list of [UserTag]s used to determine which CPU samples are cached by
/// DDS.
List<String> get cachedUserTags;
/// The version of the DDS protocol supported by this [DartDevelopmentService]
/// instance.
static const String protocolVersion = '1.2';
static const String protocolVersion = '1.3';
}
class DartDevelopmentServiceException implements Exception {

View file

@ -206,6 +206,19 @@ class DartDevelopmentServiceClient {
return supportedProtocols;
});
_clientPeer.registerMethod(
'getAvailableCachedCpuSamples',
(_) => {
'type': 'AvailableCachedCpuSamples',
'cacheNames': dds.cachedUserTags,
},
);
_clientPeer.registerMethod(
'getCachedCpuSamples',
dds.isolateManager.getCachedCpuSamples,
);
// `evaluate` and `evaluateInFrame` actually consist of multiple RPC
// invocations, including a call to `compileExpression` which can be
// overridden by clients which provide their own implementation (e.g.,

View file

@ -0,0 +1,68 @@
// Copyright (c) 2021, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:math';
class RingBuffer<T> {
RingBuffer(this._bufferSize) {
_buffer = List<T?>.filled(
_bufferSize,
null,
);
}
Iterable<T> call() sync* {
for (int i = _size - 1; i >= 0; --i) {
yield _buffer[(_count - i - 1) % _bufferSize]!;
}
}
/// Inserts a new element into the [RingBuffer].
///
/// Returns the element evicted as a result of adding the new element if the
/// buffer is as max capacity, null otherwise.
T? add(T e) {
if (_buffer.isEmpty) {
return null;
}
T? evicted;
final index = _count % _bufferSize;
if (isTruncated) {
evicted = _buffer[index];
}
_buffer[index] = e;
_count++;
return evicted;
}
void resize(int size) {
assert(size >= 0);
if (size == _bufferSize) {
return;
}
final resized = List<T?>.filled(
size,
null,
);
int count = 0;
if (size > 0) {
for (final e in this()) {
resized[count++ % size] = e;
}
}
_count = count;
_bufferSize = size;
_buffer = resized;
}
bool get isTruncated => _count % bufferSize < _count;
int get bufferSize => _bufferSize;
int get _size => min(_count, _bufferSize);
int _bufferSize;
int _count = 0;
late List<T?> _buffer;
}

View file

@ -0,0 +1,201 @@
// Copyright (c) 2021, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'package:dds/src/common/ring_buffer.dart';
import 'package:vm_service/vm_service.dart';
import 'dds_impl.dart';
/// Manages CPU sample caches for an individual [Isolate].
class CpuSamplesManager {
CpuSamplesManager(this.dds, this.isolateId) {
for (final userTag in dds.cachedUserTags) {
cpuSamplesCaches[userTag] = CpuSamplesRepository(userTag);
}
}
void handleUserTagEvent(Event event) {
assert(event.kind! == EventKind.kUserTagChanged);
_currentTag = event.updatedTag!;
final previousTag = event.previousTag!;
if (cpuSamplesCaches.containsKey(previousTag)) {
_lastCachedTag = previousTag;
}
}
void handleCpuSamplesEvent(Event event) {
assert(event.kind! == EventKind.kCpuSamples);
// There might be some samples left in the buffer for the previously set
// user tag. We'll check for them here and then close out the cache.
if (_lastCachedTag != null) {
cpuSamplesCaches[_lastCachedTag]!.cacheSamples(
event.cpuSamples!,
);
_lastCachedTag = null;
}
cpuSamplesCaches[_currentTag]?.cacheSamples(event.cpuSamples!);
}
final DartDevelopmentServiceImpl dds;
final String isolateId;
final cpuSamplesCaches = <String, CpuSamplesRepository>{};
String _currentTag = '';
String? _lastCachedTag;
}
class CpuSamplesRepository extends RingBuffer<CpuSample> {
// TODO: math to figure out proper buffer sizes.
CpuSamplesRepository(
this.tag, [
int bufferSize = 1000000,
]) : super(bufferSize);
void cacheSamples(CpuSamples samples) {
String getFunctionId(ProfileFunction function) {
final functionObject = function.function;
if (functionObject is NativeFunction) {
return 'native/${functionObject.name}';
}
return functionObject.id!;
}
// Initialize upon seeing our first samples.
if (functions.isEmpty) {
samplePeriod = samples.samplePeriod!;
maxStackDepth = samples.maxStackDepth!;
pid = samples.pid!;
functions.addAll(samples.functions!);
// Build the initial id to function index mapping. This allows for us to
// lookup a ProfileFunction in the global function list stored in this
// cache. This works since most ProfileFunction objects will have an
// associated function with a *typically* stable service ID that we can
// use as a key.
//
// TODO(bkonyi): investigate creating some form of stable ID for
// Functions tied to closures.
for (int i = 0; i < functions.length; ++i) {
idToFunctionIndex[getFunctionId(functions[i])] = i;
}
// Clear tick information as we'll need to recalculate these values later
// when a request for samples from this repository is received.
for (final f in functions) {
f.inclusiveTicks = 0;
f.exclusiveTicks = 0;
}
_firstSampleTimestamp = samples.timeOriginMicros!;
} else {
final newFunctions = samples.functions!;
final indexMapping = <int, int>{};
// Check to see if we've got a function object we've never seen before.
for (int i = 0; i < newFunctions.length; ++i) {
final key = getFunctionId(newFunctions[i]);
if (!idToFunctionIndex.containsKey(key)) {
idToFunctionIndex[key] = functions.length;
// Keep track of the original index and the location of the function
// in the master function list so we can update the function indicies
// for each sample in this batch.
indexMapping[i] = functions.length;
functions.add(newFunctions[i]);
// Reset tick state as we'll recalculate later.
functions.last.inclusiveTicks = 0;
functions.last.exclusiveTicks = 0;
}
}
// Update the indicies into the function table for functions that were
// newly processed in the most recent event.
for (final sample in samples.samples!) {
final stack = sample.stack!;
for (int i = 0; i < stack.length; ++i) {
if (indexMapping.containsKey(stack[i])) {
stack[i] = indexMapping[stack[i]]!;
}
}
}
}
final relevantSamples = samples.samples!.where((s) => s.userTag == tag);
for (final sample in relevantSamples) {
add(sample);
}
}
@override
CpuSample? add(CpuSample sample) {
final evicted = super.add(sample);
void updateTicksForSample(CpuSample sample, int increment) {
final stack = sample.stack!;
for (int i = 0; i < stack.length; ++i) {
final function = functions[stack[i]];
function.inclusiveTicks = function.inclusiveTicks! + increment;
if (i + 1 == stack.length) {
function.exclusiveTicks = function.exclusiveTicks! + increment;
}
}
}
if (evicted != null) {
// If a sample is evicted from the cache, we need to decrement the tick
// counters for each function in the sample's stack.
updateTicksForSample(sample, -1);
// We also need to change the first timestamp to that of the next oldest
// sample.
_firstSampleTimestamp = call().first.timestamp!;
}
_lastSampleTimestamp = sample.timestamp!;
// Update function ticks to include the new sample.
updateTicksForSample(sample, 1);
return evicted;
}
Map<String, dynamic> toJson() {
return {
'type': 'CachedCpuSamples',
'userTag': tag,
'truncated': isTruncated,
if (functions.isNotEmpty) ...{
'samplePeriod': samplePeriod,
'maxStackDepth': maxStackDepth,
},
'timeOriginMicros': _firstSampleTimestamp,
'timeExtentMicros': _lastSampleTimestamp - _firstSampleTimestamp,
'functions': [
// TODO(bkonyi): remove functions with no ticks and update sample stacks.
for (final f in functions) f.toJson(),
],
'sampleCount': call().length,
'samples': [
for (final s in call()) s.toJson(),
]
};
}
/// The UserTag associated with all samples stored in this repository.
final String tag;
/// The list of function references with corresponding profiler tick data.
/// ** NOTE **: The tick values here need to be updated as new CpuSamples
/// events are delivered.
final functions = <ProfileFunction>[];
final idToFunctionIndex = <String, int>{};
/// Assume sample period and max stack depth won't change.
late final int samplePeriod;
late final int maxStackDepth;
late final int pid;
int _firstSampleTimestamp = 0;
int _lastSampleTimestamp = 0;
}

View file

@ -3,6 +3,7 @@
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
import 'dart:collection';
import 'dart:convert';
import 'dart:io';
import 'dart:math';
@ -54,6 +55,7 @@ class DartDevelopmentServiceImpl implements DartDevelopmentService {
this._remoteVmServiceUri,
this._uri,
this._authCodesEnabled,
this._cachedUserTags,
this._ipv6,
this._devToolsConfiguration,
this.shouldLogRequests,
@ -381,6 +383,9 @@ class DartDevelopmentServiceImpl implements DartDevelopmentService {
final DevToolsConfiguration? _devToolsConfiguration;
List<String> get cachedUserTags => UnmodifiableListView(_cachedUserTags);
final List<String> _cachedUserTags;
Future<void> get done => _done.future;
Completer _done = Completer<void>();
bool _shuttingDown = false;

View file

@ -2,7 +2,9 @@
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'package:dds/src/cpu_samples_manager.dart';
import 'package:json_rpc_2/json_rpc_2.dart' as json_rpc;
import 'package:vm_service/vm_service.dart';
import 'client.dart';
import 'constants.dart';
@ -35,7 +37,11 @@ enum _IsolateState {
}
class _RunningIsolate {
_RunningIsolate(this.isolateManager, this.id, this.name);
_RunningIsolate(this.isolateManager, this.id, this.name)
: cpuSamplesManager = CpuSamplesManager(
isolateManager.dds,
id,
);
// State setters.
void pausedOnExit() => _state = _IsolateState.pauseExit;
@ -103,6 +109,29 @@ class _RunningIsolate {
/// Should always be called after an isolate is resumed.
void clearResumeApprovals() => _resumeApprovalsByName.clear();
Map<String, dynamic> getCachedCpuSamples(String userTag) {
final repo = cpuSamplesManager.cpuSamplesCaches[userTag];
if (repo == null) {
throw json_rpc.RpcException.invalidParams(
'CPU sample caching is not enabled for tag: "$userTag"',
);
}
return repo.toJson();
}
void handleEvent(Event event) {
switch (event.kind) {
case EventKind.kUserTagChanged:
cpuSamplesManager.handleUserTagEvent(event);
return;
case EventKind.kCpuSamples:
cpuSamplesManager.handleCpuSamplesEvent(event);
return;
default:
return;
}
}
int get _isolateStateMask => isolateStateToMaskMapping[_state] ?? 0;
static const isolateStateToMaskMapping = {
@ -112,6 +141,7 @@ class _RunningIsolate {
};
final IsolateManager isolateManager;
final CpuSamplesManager cpuSamplesManager;
final String name;
final String id;
final Set<String?> _resumeApprovalsByName = {};
@ -122,20 +152,25 @@ class IsolateManager {
IsolateManager(this.dds);
/// Handles state changes for isolates.
void handleIsolateEvent(json_rpc.Parameters parameters) {
final event = parameters['event'];
final eventKind = event['kind'].asString;
void handleIsolateEvent(Event event) {
// There's no interesting information about isolate state associated with
// and IsolateSpawn event.
if (eventKind == ServiceEvents.isolateSpawn) {
// TODO(bkonyi): why isn't IsolateSpawn in package:vm_service
if (event.kind! == ServiceEvents.isolateSpawn) {
return;
}
final isolateData = event['isolate'];
final id = isolateData['id'].asString;
final name = isolateData['name'].asString;
_updateIsolateState(id, name, eventKind);
final isolateData = event.isolate!;
final id = isolateData.id!;
final name = isolateData.name!;
_updateIsolateState(id, name, event.kind!);
}
void routeEventToIsolate(Event event) {
final isolateId = event.isolate!.id!;
if (isolates.containsKey(isolateId)) {
isolates[isolateId]!.handleEvent(event);
}
}
void _updateIsolateState(String id, String name, String eventKind) {
@ -230,6 +265,16 @@ class IsolateManager {
return RPCResponses.success;
}
Map<String, dynamic> getCachedCpuSamples(json_rpc.Parameters parameters) {
final isolateId = parameters['isolateId'].asString;
if (!isolates.containsKey(isolateId)) {
return RPCResponses.collectedSentinel;
}
final isolate = isolates[isolateId]!;
final userTag = parameters['userTag'].asString;
return isolate.getCachedCpuSamples(userTag);
}
/// Forwards a `resume` request to the VM service.
Future<Map<String, dynamic>> _sendResumeRequest(
String isolateId,

View file

@ -2,17 +2,16 @@
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:math';
import 'package:json_rpc_2/json_rpc_2.dart' as json_rpc;
import 'client.dart';
import 'common/ring_buffer.dart';
/// [LoggingRepository] is used to store historical log messages from the
/// target VM service. Clients which connect to DDS and subscribe to the
/// `Logging` stream will be sent all messages contained within this repository
/// upon initial subscription.
class LoggingRepository extends _RingBuffer<Map<String, dynamic>> {
class LoggingRepository extends RingBuffer<Map<String, dynamic>> {
LoggingRepository([int logHistoryLength = 10000]) : super(logHistoryLength) {
// TODO(bkonyi): enforce log history limit when DartDevelopmentService
// allows for this to be set via Dart code.
@ -46,53 +45,3 @@ class LoggingRepository extends _RingBuffer<Map<String, dynamic>> {
static const int _kMaxLogBufferSize = 100000;
}
// TODO(bkonyi): move to standalone file if we decide to use this elsewhere.
class _RingBuffer<T> {
_RingBuffer(this._bufferSize) {
_buffer = List<T?>.filled(
_bufferSize,
null,
);
}
Iterable<T> call() sync* {
for (int i = _size - 1; i >= 0; --i) {
yield _buffer[(_count - i - 1) % _bufferSize]!;
}
}
void add(T e) {
if (_buffer.isEmpty) {
return;
}
_buffer[_count++ % _bufferSize] = e;
}
void resize(int size) {
assert(size >= 0);
if (size == _bufferSize) {
return;
}
final resized = List<T?>.filled(
size,
null,
);
int count = 0;
if (size > 0) {
for (final e in this()) {
resized[count++ % size] = e;
}
}
_count = count;
_bufferSize = size;
_buffer = resized;
}
int get bufferSize => _bufferSize;
int get _size => min(_count, _bufferSize);
int _bufferSize;
int _count = 0;
late List<T?> _buffer;
}

View file

@ -5,6 +5,7 @@
import 'dart:typed_data';
import 'package:json_rpc_2/json_rpc_2.dart' as json_rpc;
import 'package:vm_service/vm_service.dart';
import 'client.dart';
import 'dds_impl.dart';
@ -107,18 +108,31 @@ class StreamManager {
// Stdout and Stderr streams may not exist.
}
}
if (dds.cachedUserTags.isNotEmpty) {
await streamListen(null, EventStreams.kProfiler);
}
dds.vmServiceClient.registerMethod(
'streamNotify',
(parameters) {
(json_rpc.Parameters parameters) {
final streamId = parameters['streamId'].asString;
final event =
Event.parse(parameters['event'].asMap.cast<String, dynamic>())!;
// Forward events from the streams IsolateManager subscribes to.
if (isolateManagerStreams.contains(streamId)) {
dds.isolateManager.handleIsolateEvent(parameters);
dds.isolateManager.handleIsolateEvent(event);
}
// Keep a history of messages to send to clients when they first
// subscribe to a stream with an event history.
if (loggingRepositories.containsKey(streamId)) {
loggingRepositories[streamId]!.add(parameters.asMap);
loggingRepositories[streamId]!.add(
parameters.asMap.cast<String, dynamic>(),
);
}
// If the event contains an isolate, forward the event to the
// corresponding isolate to be handled.
if (event.isolate != null) {
dds.isolateManager.routeEventToIsolate(event);
}
streamNotify(streamId, parameters.value);
},
@ -251,6 +265,7 @@ class StreamManager {
static const kExtensionStream = 'Extension';
static const kIsolateStream = 'Isolate';
static const kLoggingStream = 'Logging';
static const kProfilerStream = 'Profiler';
static const kStderrStream = 'Stderr';
static const kStdoutStream = 'Stdout';
@ -272,10 +287,17 @@ class StreamManager {
kStdoutStream,
};
// Never cancel the profiler stream as `CpuSampleRepository` requires
// `UserTagChanged` events to enable/disable sample caching.
static const cpuSampleRepositoryStreams = <String>{
kProfilerStream,
};
// The set of streams that DDS requires to function.
static final ddsCoreStreams = <String>{
...isolateManagerStreams,
...loggingRepositoryStreams,
...cpuSampleRepositoryStreams,
};
final DartDevelopmentServiceImpl dds;

View file

@ -13,18 +13,46 @@ extension DdsExtension on VmService {
static bool _factoriesRegistered = false;
static Version? _ddsVersion;
/// The _getDartDevelopmentServiceVersion_ RPC is used to determine what version of
/// The [getDartDevelopmentServiceVersion] RPC is used to determine what version of
/// the Dart Development Service Protocol is served by a DDS instance.
///
/// The result of this call is cached for subsequent invocations.
Future<Version> getDartDevelopmentServiceVersion() async {
if (_ddsVersion == null) {
_ddsVersion =
await _callHelper<Version>('getDartDevelopmentServiceVersion');
_ddsVersion = await _callHelper<Version>(
'getDartDevelopmentServiceVersion',
);
}
return _ddsVersion!;
}
/// The [getCachedCpuSamples] RPC is used to retrieve a cache of CPU samples
/// collected under a [UserTag] with name `userTag`.
Future<CachedCpuSamples> getCachedCpuSamples(
String isolateId, String userTag) async {
if (!(await _versionCheck(1, 3))) {
throw UnimplementedError('getCachedCpuSamples requires DDS version 1.3');
}
return _callHelper<CachedCpuSamples>('getCachedCpuSamples', args: {
'isolateId': isolateId,
'userTag': userTag,
});
}
/// The [getAvailableCachedCpuSamples] RPC is used to determine which caches of CPU samples
/// are available. Caches are associated with individual [UserTag] names and are specified
/// when DDS is started via the `cachedUserTags` parameter.
Future<AvailableCachedCpuSamples> getAvailableCachedCpuSamples() async {
if (!(await _versionCheck(1, 3))) {
throw UnimplementedError(
'getAvailableCachedCpuSamples requires DDS version 1.3',
);
}
return _callHelper<AvailableCachedCpuSamples>(
'getAvailableCachedCpuSamples',
);
}
/// Retrieve the event history for `stream`.
///
/// If `stream` does not have event history collected, a parameter error is
@ -126,6 +154,11 @@ extension DdsExtension on VmService {
static void _registerFactories() {
addTypeFactory('StreamHistory', StreamHistory.parse);
addTypeFactory(
'AvailableCachedCpuSamples',
AvailableCachedCpuSamples.parse,
);
addTypeFactory('CachedCpuSamples', CachedCpuSamples.parse);
_factoriesRegistered = true;
}
}
@ -154,3 +187,86 @@ class StreamHistory extends Response {
List<Event> get history => UnmodifiableListView(_history);
final List<Event> _history;
}
/// An extension of [CpuSamples] which represents a set of cached samples,
/// associated with a particular [UserTag] name.
class CachedCpuSamples extends CpuSamples {
static CachedCpuSamples? parse(Map<String, dynamic>? json) =>
json == null ? null : CachedCpuSamples._fromJson(json);
CachedCpuSamples({
required this.userTag,
this.truncated,
required int? samplePeriod,
required int? maxStackDepth,
required int? sampleCount,
required int? timeSpan,
required int? timeOriginMicros,
required int? timeExtentMicros,
required int? pid,
required List<ProfileFunction>? functions,
required List<CpuSample>? samples,
}) : super(
samplePeriod: samplePeriod,
maxStackDepth: maxStackDepth,
sampleCount: sampleCount,
timeSpan: timeSpan,
timeOriginMicros: timeOriginMicros,
timeExtentMicros: timeExtentMicros,
pid: pid,
functions: functions,
samples: samples,
);
CachedCpuSamples._fromJson(Map<String, dynamic> json)
: userTag = json['userTag']!,
truncated = json['truncated'],
super(
samplePeriod: json['samplePeriod'] ?? -1,
maxStackDepth: json['maxStackDepth'] ?? -1,
sampleCount: json['sampleCount'] ?? -1,
timeSpan: json['timeSpan'] ?? -1,
timeOriginMicros: json['timeOriginMicros'] ?? -1,
timeExtentMicros: json['timeExtentMicros'] ?? -1,
pid: json['pid'] ?? -1,
functions: List<ProfileFunction>.from(
createServiceObject(json['functions'], const ['ProfileFunction'])
as List? ??
[],
),
samples: List<CpuSample>.from(
createServiceObject(json['samples'], const ['CpuSample'])
as List? ??
[],
),
);
@override
String get type => 'CachedCpuSamples';
/// The name of the [UserTag] associated with this cache of [CpuSamples].
final String userTag;
/// Provided if the CPU sample cache has filled and older samples have been
/// dropped.
final bool? truncated;
}
/// A collection of [UserTag] names associated with caches of CPU samples.
class AvailableCachedCpuSamples extends Response {
static AvailableCachedCpuSamples? parse(Map<String, dynamic>? json) =>
json == null ? null : AvailableCachedCpuSamples._fromJson(json);
AvailableCachedCpuSamples({
required this.cacheNames,
});
AvailableCachedCpuSamples._fromJson(Map<String, dynamic> json)
: cacheNames = List<String>.from(json['cacheNames']);
@override
String get type => 'AvailableCachedUserTagCpuSamples';
/// A [List] of [UserTag] names associated with CPU sample caches.
final List<String> cacheNames;
}

View file

@ -3,12 +3,12 @@ description: >-
A library used to spawn the Dart Developer Service, used to communicate with
a Dart VM Service instance.
version: 2.0.2
version: 2.1.0
homepage: https://github.com/dart-lang/sdk/tree/master/pkg/dds
environment:
sdk: '>=2.12.0 <3.0.0'
sdk: '>=2.14.0-0 <3.0.0'
dependencies:
async: ^2.4.1
@ -25,7 +25,7 @@ dependencies:
sse: ^4.0.0
stream_channel: ^2.0.0
usage: ^4.0.0
vm_service: ^7.0.0
vm_service: ^7.2.0
web_socket_channel: ^2.0.0
dev_dependencies:

View file

@ -24,6 +24,7 @@ Future<Process> spawnDartProcess(
'--observe=0',
if (pauseOnStart) '--pause-isolates-on-start',
'--write-service-info=$serviceInfoUri',
'--sample-buffer-duration=1',
...Platform.executableArguments,
Platform.script.resolve(script).toString(),
];

View file

@ -0,0 +1,21 @@
// Copyright (c) 2021, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:developer';
fib(int n) {
if (n <= 1) {
return n;
}
return fib(n - 1) + fib(n - 2);
}
void main() {
UserTag('Testing').makeCurrent();
int i = 5;
while (true) {
++i;
fib(i);
}
}

View file

@ -0,0 +1,112 @@
// Copyright (c) 2021, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
import 'dart:io';
import 'package:dds/dds.dart';
import 'package:dds/vm_service_extensions.dart';
import 'package:test/test.dart';
import 'package:vm_service/vm_service.dart';
import 'package:vm_service/vm_service_io.dart';
import 'common/test_helper.dart';
void main() {
late Process process;
late DartDevelopmentService dds;
setUp(() async {
process = await spawnDartProcess(
'get_cached_cpu_samples_script.dart',
);
});
tearDown(() async {
await dds.shutdown();
process.kill();
});
test(
'No UserTags to cache',
() async {
dds = await DartDevelopmentService.startDartDevelopmentService(
remoteVmServiceUri,
);
expect(dds.isRunning, true);
final service = await vmServiceConnectUri(dds.wsUri.toString());
// We didn't provide `cachedUserTags` when starting DDS, so we shouldn't
// be caching anything.
final availableCaches = await service.getAvailableCachedCpuSamples();
expect(availableCaches.cacheNames.length, 0);
final isolate = (await service.getVM()).isolates!.first;
try {
await service.getCachedCpuSamples(isolate.id!, 'Fake');
fail('Invalid userTag did not cause an exception');
} on RPCError catch (e) {
expect(
e.message,
'CPU sample caching is not enabled for tag: "Fake"',
);
}
},
timeout: Timeout.none,
);
test(
'Cache CPU samples for provided UserTag name',
() async {
const kUserTag = 'Testing';
dds = await DartDevelopmentService.startDartDevelopmentService(
remoteVmServiceUri,
cachedUserTags: [kUserTag],
);
expect(dds.isRunning, true);
final service = await vmServiceConnectUri(dds.wsUri.toString());
// Ensure we're caching results for samples under the 'Testing' UserTag.
final availableCaches = await service.getAvailableCachedCpuSamples();
expect(availableCaches.cacheNames.length, 1);
expect(availableCaches.cacheNames.first, kUserTag);
final isolate = (await service.getVM()).isolates!.first;
final completer = Completer<void>();
int i = 0;
int count = 0;
service.onProfilerEvent.listen((event) async {
if (event.kind == EventKind.kCpuSamples &&
event.isolate!.id! == isolate.id!) {
// Pause so we don't evict another block of samples before we've
// retrieved the cached samples after this event.
await service.pause(isolate.id!);
// Ensure the number of CPU samples in the CpuSample event is
// is consistent with the number of samples in the cache.
expect(event.cpuSamples, isNotNull);
count += event.cpuSamples!.samples!
.where((e) => e.userTag == kUserTag)
.length;
final cache = await service.getCachedCpuSamples(
isolate.id!,
availableCaches.cacheNames.first,
);
expect(cache.sampleCount, count);
await service.resume(isolate.id!);
i++;
if (i == 3) {
completer.complete();
}
}
});
await service.streamListen(EventStreams.kProfiler);
await service.resume(isolate.id!);
await completer.future;
},
timeout: Timeout.none,
);
}

View file

@ -1,5 +1,9 @@
# Changelog
## 7.2.0
- Update to version `3.49` of the spec.
- Added `CpuSamples` event kind, and `cpuSamples` property to `Event`.
## 7.1.1
- Update to version `3.48` of the spec.
- Added `shows` and `hides` properties to `LibraryDependency`.

View file

@ -3,7 +3,7 @@ description: >-
A library to communicate with a service implementing the Dart VM
service protocol.
version: 7.1.1
version: 7.2.0
homepage: https://github.com/dart-lang/sdk/tree/master/pkg/vm_service

View file

@ -2500,6 +2500,25 @@ void Isolate::Shutdown() {
ServiceIsolate::SendIsolateShutdownMessage();
#if !defined(PRODUCT)
debugger()->Shutdown();
// Cleanup profiler state.
SampleBlock* cpu_block = current_sample_block();
if (cpu_block != nullptr) {
cpu_block->release_block();
}
SampleBlock* allocation_block = current_allocation_sample_block();
if (allocation_block != nullptr) {
allocation_block->release_block();
}
// Process the previously assigned sample blocks if we're using the
// profiler's sample buffer. Some tests create their own SampleBlockBuffer
// and handle block processing themselves.
if ((cpu_block != nullptr || allocation_block != nullptr) &&
Profiler::sample_block_buffer() != nullptr) {
StackZone zone(thread);
HandleScope handle_scope(thread);
Profiler::sample_block_buffer()->ProcessCompletedBlocks();
}
#endif
}
@ -2558,26 +2577,6 @@ void Isolate::LowLevelCleanup(Isolate* isolate) {
// requests anymore.
Thread::ExitIsolate();
#if !defined(PRODUCT)
// Cleanup profiler state.
SampleBlock* cpu_block = isolate->current_sample_block();
if (cpu_block != nullptr) {
cpu_block->release_block();
}
SampleBlock* allocation_block = isolate->current_allocation_sample_block();
if (allocation_block != nullptr) {
allocation_block->release_block();
}
// Process the previously assigned sample blocks if we're using the
// profiler's sample buffer. Some tests create their own SampleBlockBuffer
// and handle block processing themselves.
if ((cpu_block != nullptr || allocation_block != nullptr) &&
Profiler::sample_block_buffer() != nullptr) {
Profiler::sample_block_buffer()->ProcessCompletedBlocks();
}
#endif // !defined(PRODUCT)
// Now it's safe to delete the isolate.
delete isolate;

View file

@ -226,7 +226,9 @@ void SampleBlockBuffer::ProcessCompletedBlocks() {
int64_t start = Dart_TimelineGetMicros();
for (intptr_t i = 0; i < capacity_; ++i) {
SampleBlock* block = &blocks_[i];
if (block->is_full() && !block->evictable()) {
// Only evict blocks owned by the current thread.
if (block->owner() == thread->isolate() && block->is_full() &&
!block->evictable()) {
if (Service::profiler_stream.enabled()) {
Profile profile(block->owner());
profile.Build(thread, nullptr, block);
@ -327,8 +329,13 @@ Sample* SampleBlockBuffer::ReserveSampleImpl(Isolate* isolate,
isolate->set_current_sample_block(next);
}
next->set_is_allocation_block(allocation_sample);
can_process_block_.store(true);
isolate->mutator_thread()->ScheduleInterrupts(Thread::kVMInterrupt);
bool scheduled = can_process_block_.exchange(true);
// We don't process samples on the kernel isolate.
if (!isolate->is_kernel_isolate() &&
!isolate->is_service_isolate() &&
!scheduled) {
isolate->mutator_thread()->ScheduleInterrupts(Thread::kVMInterrupt);
}
return ReserveSampleImpl(isolate, allocation_sample);
}

View file

@ -450,11 +450,15 @@ ErrorPtr Thread::HandleInterrupts() {
}
#if !defined(PRODUCT)
// Processes completed SampleBlocks and sends CPU sample events over the
// service protocol when applicable.
SampleBlockBuffer* sample_buffer = Profiler::sample_block_buffer();
if (sample_buffer != nullptr && sample_buffer->process_blocks()) {
sample_buffer->ProcessCompletedBlocks();
// Don't block the kernel isolate to process CPU samples as we can
// potentially deadlock when trying to compile source for the main isolate.
if (!isolate()->is_kernel_isolate() && !isolate()->is_service_isolate()) {
// Processes completed SampleBlocks and sends CPU sample events over the
// service protocol when applicable.
SampleBlockBuffer* sample_buffer = Profiler::sample_block_buffer();
if (sample_buffer != nullptr && sample_buffer->process_blocks()) {
sample_buffer->ProcessCompletedBlocks();
}
}
#endif // !defined(PRODUCT)
}