[ DDS ] Keep event history for Stdout, Stderr, and Extension streams

Change-Id: I14ce732cd408ece1da2801887cac3b518bc36af6
Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/169146
Reviewed-by: Kenzie Schmoll <kenzieschmoll@google.com>
Commit-Queue: Ben Konyi <bkonyi@google.com>
This commit is contained in:
Ben Konyi 2020-11-03 19:09:47 +00:00 committed by commit-bot@chromium.org
parent 090e234172
commit 892cc93f26
12 changed files with 312 additions and 38 deletions

View file

@ -1,3 +1,8 @@
# 1.5.0
- Added event caching for `Stdout`, `Stderr`, and `Extension` streams. When a
client subscribes to one of these streams, they will be sent up to 10,000
historical events from the stream.
# 1.4.1
- Fixed issue where `evaluate` and `evaluateInFrame` requests were not being
forwarded to the VM service properly when no external compilation service

View file

@ -47,6 +47,22 @@ See the corresponding section in the VM Service protocol [here][service-protocol
See the corresponding section in the VM Service protocol [here][service-protocol-ids-and-names].
## Streams
For a list of core VM service streams, see [streamListen][service-protocol-streams].
DDS will keep a history of events on certain streams and send historical events
when a client first subscribes to a stream with history. These streams currently
consist of the following:
- `Logging`
- `Stdout`
- `Stderr`
- `Extension`
In addition, subscribing to the `Service` stream will result in a `ServiceRegistered`
event being sent to the subscribing client for each existing service extension.
## Public RPCs
The DDS Protocol supports all [public RPCs defined in the VM Service protocol][service-protocol-public-rpcs].
@ -187,6 +203,7 @@ version | comments
[service-protocol]: https://github.com/dart-lang/sdk/blob/master/runtime/vm/service/service.md
[service-protocol-rpcs-requests-and-responses]: https://github.com/dart-lang/sdk/blob/master/runtime/vm/service/service.md#rpcs-requests-and-responses
[service-protocol-events]: https://github.com/dart-lang/sdk/blob/master/runtime/vm/service/service.md#events
[service-protocol-streams]: https://github.com/dart-lang/sdk/blob/master/runtime/vm/service/service.md#streamlisten
[service-protocol-binary-events]: https://github.com/dart-lang/sdk/blob/master/runtime/vm/service/service.md#binary-events
[service-protocol-types]: https://github.com/dart-lang/sdk/blob/master/runtime/vm/service/service.md#types
[service-protocol-ids-and-names]: https://github.com/dart-lang/sdk/blob/master/runtime/vm/service/service.md#ids-and-names

View file

@ -140,7 +140,9 @@ class _DartDevelopmentServiceClient {
'getLogHistorySize',
(parameters) => {
'type': 'Size',
'size': dds.loggingRepository.bufferSize,
'size': _StreamManager
.loggingRepositories[_StreamManager.kLoggingStream]
.bufferSize,
});
_clientPeer.registerMethod('setLogHistorySize', (parameters) {
@ -150,7 +152,8 @@ class _DartDevelopmentServiceClient {
"'size' must be greater or equal to zero",
);
}
dds.loggingRepository.resize(size);
_StreamManager.loggingRepositories[_StreamManager.kLoggingStream]
.resize(size);
return _RPCResponses.success;
});

View file

@ -10,7 +10,6 @@ class _DartDevelopmentService implements DartDevelopmentService {
_clientManager = _ClientManager(this);
_expressionEvaluator = _ExpressionEvaluator(this);
_isolateManager = _IsolateManager(this);
_loggingRepository = _LoggingRepository();
_streamManager = _StreamManager(this);
_authCode = _authCodesEnabled ? _makeAuthToken() : '';
}
@ -270,9 +269,6 @@ class _DartDevelopmentService implements DartDevelopmentService {
_IsolateManager get isolateManager => _isolateManager;
_IsolateManager _isolateManager;
_LoggingRepository get loggingRepository => _loggingRepository;
_LoggingRepository _loggingRepository;
_StreamManager get streamManager => _streamManager;
_StreamManager _streamManager;

View file

@ -91,7 +91,14 @@ class _StreamManager {
// The _IsolateManager requires information from both the Debug and
// Isolate streams, so they must always be subscribed to by DDS.
for (final stream in ddsCoreStreams) {
await streamListen(null, stream);
try {
await streamListen(null, stream);
if (loggingRepositoryStreams.contains(stream)) {
loggingRepositories[stream] = _LoggingRepository();
}
} on json_rpc.RpcException {
// Stdout and Stderr streams may not exist.
}
}
dds._vmServiceClient.registerMethod(
'streamNotify',
@ -101,10 +108,10 @@ class _StreamManager {
if (isolateManagerStreams.contains(streamId)) {
dds.isolateManager.handleIsolateEvent(parameters);
}
// Keep a history of log messages to send to clients when they first
// subscribe to the Logging stream.
if (streamId == kLoggingStream) {
dds.loggingRepository.add(parameters.asMap);
// Keep a history of messages to send to clients when they first
// subscribe to a stream with an event history.
if (loggingRepositories.containsKey(streamId)) {
loggingRepositories[streamId].add(parameters.asMap);
}
streamNotify(streamId, parameters.value);
},
@ -137,8 +144,8 @@ class _StreamManager {
}
if (client != null) {
streamListeners[stream].add(client);
if (stream == kLoggingStream) {
dds.loggingRepository.sendHistoricalLogs(client);
if (loggingRepositories.containsKey(stream)) {
loggingRepositories[stream].sendHistoricalLogs(client);
} else if (stream == kServiceStream) {
// Send all previously registered service extensions when a client
// subscribes to the Service stream.
@ -223,8 +230,13 @@ class _StreamManager {
);
static const kDebugStream = 'Debug';
static const kExtensionStream = 'Extension';
static const kIsolateStream = 'Isolate';
static const kLoggingStream = 'Logging';
static const kStderrStream = 'Stderr';
static const kStdoutStream = 'Stdout';
static Map<String, _LoggingRepository> loggingRepositories = {};
// Never cancel the Debug or Isolate stream as `_IsolateManager` requires
// them for isolate state notifications.
@ -233,10 +245,13 @@ class _StreamManager {
kIsolateStream,
};
// Never cancel the Logging stream as `_LoggingRepository` requires it to
// keep a log history.
// Never cancel the logging and extension event streams as `_LoggingRepository`
// requires them keep history.
static const loggingRepositoryStreams = <String>{
kExtensionStream,
kLoggingStream,
kStderrStream,
kStdoutStream,
};
// The set of streams that DDS requires to function.

View file

@ -3,7 +3,7 @@ description: >-
A library used to spawn the Dart Developer Service, used to communicate with
a Dart VM Service instance.
version: 1.4.1
version: 1.5.0
homepage: https://github.com/dart-lang/sdk/tree/master/pkg/dds

View file

@ -0,0 +1,52 @@
// Copyright (c) 2020, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
import 'dart:developer';
import 'package:observatory/service_io.dart';
import 'package:test/test.dart';
import 'client_resume_approvals_common.dart';
import 'service_test_common.dart';
import 'test_helper.dart';
Future testMain() async {
// Post a total of 9 events
for (int i = 1; i <= 9; ++i) {
postEvent('Test', {
'id': i,
});
}
}
var tests = <IsolateTest>[
isPausedAtStart,
resumeIsolate,
(Isolate isolate) async {
final completer = Completer<void>();
int i = 1;
await subscribeToStream(isolate.vm, 'Extension', (event) async {
expect(event.extensionKind, 'Test');
expect(event.extensionData!['id'], i);
i++;
if (i == 10) {
await cancelStreamSubscription('Extension');
completer.complete();
} else if (i > 10) {
fail('Too many log messages');
}
});
await completer.future;
},
];
main(args) => runIsolateTests(
args,
tests,
enableService: false, // DDS specific feature
testeeConcurrent: testMain,
pause_on_start: true,
pause_on_exit: true,
);

View file

@ -0,0 +1,61 @@
// Copyright (c) 2020, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
import 'dart:io';
import 'package:observatory/service_io.dart';
import 'package:test/test.dart';
import 'client_resume_approvals_common.dart';
import 'service_test_common.dart';
import 'test_helper.dart';
Future testMain() async {
// Log a total of 9 messages
for (int i = 1; i <= 9; ++i) {
print('Stdout log$i');
stderr.writeln('Stderr log$i');
}
}
Future streamHistoryTest(Isolate isolate, String stream) async {
final completer = Completer<void>();
int i = 1;
await subscribeToStream(isolate.vm, stream, (event) async {
// Newlines are sent as separate events for some reason. Ignore them.
if (!event.bytesAsString!.startsWith(stream)) {
return;
}
expect(event.bytesAsString, '$stream log$i');
i++;
if (i == 10) {
await cancelStreamSubscription(stream);
completer.complete();
} else if (i > 10) {
fail('Too many log messages');
}
});
await completer.future;
}
var tests = <IsolateTest>[
isPausedAtStart,
resumeIsolate,
(Isolate isolate) async {
await streamHistoryTest(isolate, 'Stdout');
},
(Isolate isolate) async {
await streamHistoryTest(isolate, 'Stderr');
},
];
main(args) => runIsolateTests(
args,
tests,
enableService: false, // DDS specific feature
testeeConcurrent: testMain,
pause_on_start: true,
pause_on_exit: true,
);

View file

@ -56,11 +56,14 @@ Future<void> socketTest() async {
postEvent('socketTest', {'socket': 'test'});
}
void checkFinishEvent(ServiceEvent event) {
bool checkFinishEvent(ServiceEvent event) {
expect(event.kind, equals(ServiceEvent.kExtension));
expect(event.extensionKind, equals('socketTest'));
if (event.extensionKind != 'socketTest') {
return false;
}
expect(event.extensionData, isA<Map>());
expect(event.extensionData!['socket'], equals('test'));
return true;
}
var tests = <IsolateTest>[
@ -102,9 +105,10 @@ var tests = <IsolateTest>[
var sub;
sub = await isolate.vm.listenEventStream(Isolate.kExtensionStream,
(ServiceEvent event) {
checkFinishEvent(event);
sub.cancel();
completer.complete();
if (checkFinishEvent(event)) {
sub.cancel();
completer.complete();
}
});
dynamic result = await isolate.invokeRpc("invoke",
@ -150,9 +154,10 @@ var tests = <IsolateTest>[
completer = Completer();
sub = await isolate.vm.listenEventStream(Isolate.kExtensionStream,
(ServiceEvent event) {
checkFinishEvent(event);
sub.cancel();
completer.complete();
if (checkFinishEvent(event)) {
sub.cancel();
completer.complete();
}
});
dynamic result = await isolate.invokeRpc("invoke",
{"targetId": lib.id, "selector": "socketTest", "argumentIds": []});
@ -197,9 +202,10 @@ var tests = <IsolateTest>[
var sub;
sub = await isolate.vm.listenEventStream(Isolate.kExtensionStream,
(ServiceEvent event) {
checkFinishEvent(event);
sub.cancel();
completer.complete();
if (checkFinishEvent(event)) {
sub.cancel();
completer.complete();
}
});
dynamic result = await isolate.invokeRpc("invoke",

View file

@ -0,0 +1,52 @@
// Copyright (c) 2020, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
import 'dart:developer';
import 'package:observatory_2/service_io.dart';
import 'package:test/test.dart';
import 'client_resume_approvals_common.dart';
import 'service_test_common.dart';
import 'test_helper.dart';
Future testMain() async {
// Post a total of 9 events
for (int i = 1; i <= 9; ++i) {
postEvent('Test', {
'id': i,
});
}
}
var tests = <IsolateTest>[
isPausedAtStart,
resumeIsolate,
(Isolate isolate) async {
final completer = Completer<void>();
int i = 1;
await subscribeToStream(isolate.vm, 'Extension', (event) async {
expect(event.extensionKind, 'Test');
expect(event.extensionData['id'], i);
i++;
if (i == 10) {
await cancelStreamSubscription('Extension');
completer.complete();
} else if (i > 10) {
fail('Too many log messages');
}
});
await completer.future;
},
];
main(args) => runIsolateTests(
args,
tests,
enableService: false, // DDS specific feature
testeeConcurrent: testMain,
pause_on_start: true,
pause_on_exit: true,
);

View file

@ -0,0 +1,61 @@
// Copyright (c) 2020, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
import 'dart:io';
import 'package:observatory_2/service_io.dart';
import 'package:test/test.dart';
import 'client_resume_approvals_common.dart';
import 'service_test_common.dart';
import 'test_helper.dart';
Future testMain() async {
// Log a total of 9 messages
for (int i = 1; i <= 9; ++i) {
print('Stdout log$i');
stderr.writeln('Stderr log$i');
}
}
Future streamHistoryTest(Isolate isolate, String stream) async {
final completer = Completer<void>();
int i = 1;
await subscribeToStream(isolate.vm, stream, (event) async {
// Newlines are sent as separate events for some reason. Ignore them.
if (!event.bytesAsString.startsWith(stream)) {
return;
}
expect(event.bytesAsString, '$stream log$i');
i++;
if (i == 10) {
await cancelStreamSubscription(stream);
completer.complete();
} else if (i > 10) {
fail('Too many log messages');
}
});
await completer.future;
}
var tests = <IsolateTest>[
isPausedAtStart,
resumeIsolate,
(Isolate isolate) async {
await streamHistoryTest(isolate, 'Stdout');
},
(Isolate isolate) async {
await streamHistoryTest(isolate, 'Stderr');
},
];
main(args) => runIsolateTests(
args,
tests,
enableService: false, // DDS specific feature
testeeConcurrent: testMain,
pause_on_start: true,
pause_on_exit: true,
);

View file

@ -56,11 +56,14 @@ Future<void> socketTest() async {
postEvent('socketTest', {'socket': 'test'});
}
Future<void> checkFinishEvent(ServiceEvent event) {
bool checkFinishEvent(ServiceEvent event) {
expect(event.kind, equals(ServiceEvent.kExtension));
expect(event.extensionKind, equals('socketTest'));
if (event.extensionKind != 'socketTest') {
return false;
}
expect(event.extensionData, isA<Map>());
expect(event.extensionData['socket'], equals('test'));
return true;
}
var tests = <IsolateTest>[
@ -102,9 +105,10 @@ var tests = <IsolateTest>[
var sub;
sub = await isolate.vm.listenEventStream(Isolate.kExtensionStream,
(ServiceEvent event) {
checkFinishEvent(event);
sub.cancel();
completer.complete();
if (checkFinishEvent(event)) {
sub.cancel();
completer.complete();
}
});
dynamic result = await isolate.invokeRpc("invoke",
@ -150,9 +154,10 @@ var tests = <IsolateTest>[
completer = Completer();
sub = await isolate.vm.listenEventStream(Isolate.kExtensionStream,
(ServiceEvent event) {
checkFinishEvent(event);
sub.cancel();
completer.complete();
if (checkFinishEvent(event)) {
sub.cancel();
completer.complete();
}
});
dynamic result = await isolate.invokeRpc("invoke",
{"targetId": lib.id, "selector": "socketTest", "argumentIds": []});
@ -197,9 +202,10 @@ var tests = <IsolateTest>[
var sub;
sub = await isolate.vm.listenEventStream(Isolate.kExtensionStream,
(ServiceEvent event) {
checkFinishEvent(event);
sub.cancel();
completer.complete();
if (checkFinishEvent(event)) {
sub.cancel();
completer.complete();
}
});
dynamic result = await isolate.invokeRpc("invoke",