Migrate lib/src/channel/

R=brianwilkerson@google.com

Change-Id: I2748f92589ef9501325535d79188fdbb103130f9
Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/194118
Reviewed-by: Brian Wilkerson <brianwilkerson@google.com>
Commit-Queue: Konstantin Shcheglov <scheglov@google.com>
This commit is contained in:
Konstantin Shcheglov 2021-04-06 23:16:43 +00:00 committed by commit-bot@chromium.org
parent ad498f34dc
commit 43b5d134b3
5 changed files with 85 additions and 104 deletions

View file

@ -2,8 +2,6 @@
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
// @dart = 2.9
import 'dart:async';
import 'dart:convert';
import 'dart:io';
@ -17,7 +15,6 @@ import 'package:analyzer/instrumentation/instrumentation.dart';
/// [ClientCommunicationChannel] that uses a stream and a sink (typically,
/// standard input and standard output) to communicate with servers.
class ByteStreamClientChannel implements ClientCommunicationChannel {
final Stream input;
final IOSink output;
@override
@ -26,23 +23,37 @@ class ByteStreamClientChannel implements ClientCommunicationChannel {
@override
Stream<Notification> notificationStream;
ByteStreamClientChannel(this.input, this.output) {
Stream jsonStream = input
factory ByteStreamClientChannel(Stream<List<int>> input, IOSink output) {
var jsonStream = input
.transform(const Utf8Decoder())
.transform(LineSplitter())
.transform(JsonStreamDecoder())
.where((json) => json is Map)
.where((json) => json is Map<String, Object?>)
.cast<Map<String, Object?>>()
.asBroadcastStream();
responseStream = jsonStream
var responseStream = jsonStream
.where((json) => json[Notification.EVENT] == null)
.transform(ResponseConverter())
.where((response) => response != null)
.cast<Response>()
.asBroadcastStream();
notificationStream = jsonStream
var notificationStream = jsonStream
.where((json) => json[Notification.EVENT] != null)
.transform(NotificationConverter())
.asBroadcastStream();
return ByteStreamClientChannel._(
output,
responseStream,
notificationStream,
);
}
ByteStreamClientChannel._(
this.output,
this.responseStream,
this.notificationStream,
);
@override
Future close() {
return output.close();
@ -69,7 +80,7 @@ class ByteStreamServerChannel implements ServerCommunicationChannel {
final InstrumentationService _instrumentationService;
/// The helper for recording request / response statistics.
final RequestStatisticsHelper _requestStatistics;
final RequestStatisticsHelper? _requestStatistics;
/// Completer that will be signalled when the input stream is closed.
final Completer _closed = Completer();
@ -79,7 +90,7 @@ class ByteStreamServerChannel implements ServerCommunicationChannel {
ByteStreamServerChannel(
this._input, this._output, this._instrumentationService,
{RequestStatisticsHelper requestStatistics})
{RequestStatisticsHelper? requestStatistics})
: _requestStatistics = requestStatistics {
_requestStatistics?.serverChannel = this;
}
@ -100,12 +111,12 @@ class ByteStreamServerChannel implements ServerCommunicationChannel {
@override
void listen(void Function(Request request) onRequest,
{Function onError, void Function() onDone}) {
{Function? onError, void Function()? onDone}) {
_input.transform(const Utf8Decoder()).transform(LineSplitter()).listen(
(String data) => _readRequest(data, onRequest),
onError: onError, onDone: () {
close();
onDone();
onDone?.call();
});
}
@ -148,7 +159,7 @@ class ByteStreamServerChannel implements ServerCommunicationChannel {
/// Read a request from the given [data] and use the given function to handle
/// the request.
void _readRequest(Object data, void Function(Request request) onRequest) {
void _readRequest(String data, void Function(Request request) onRequest) {
// Ignore any further requests after the communication channel is closed.
if (_closed.isCompleted) {
return;

View file

@ -2,8 +2,6 @@
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
// @dart = 2.9
import 'dart:convert';
import 'package:analysis_server/protocol/protocol.dart';
@ -46,10 +44,10 @@ class ChannelChunkSink<S, T> extends ChunkedConversionSink<S> {
/// receive both [Response]s and [Notification]s.
abstract class ClientCommunicationChannel {
/// The stream of notifications from the server.
Stream<Notification> notificationStream;
Stream<Notification> get notificationStream;
/// The stream of responses from the server.
Stream<Response> responseStream;
Stream<Response> get responseStream;
/// Close the channel to the server. Once called, all future communication
/// with the server via [sendRequest] will silently be ignored.
@ -60,36 +58,40 @@ abstract class ClientCommunicationChannel {
Future<Response> sendRequest(Request request);
}
/// Instances of the class [JsonStreamDecoder] convert JSON strings to JSON
/// maps.
class JsonStreamDecoder extends Converter<String, Map> {
/// Instances of the class [JsonStreamDecoder] convert JSON strings to values.
class JsonStreamDecoder extends Converter<String, Object?> {
@override
Map convert(String text) => json.decode(text);
Object? convert(String text) => json.decode(text);
@override
ChunkedConversionSink<String> startChunkedConversion(Sink<Map> sink) =>
ChannelChunkSink<String, Map>(this, sink);
ChunkedConversionSink<String> startChunkedConversion(Sink<Object?> sink) =>
ChannelChunkSink<String, Object?>(this, sink);
}
/// Instances of the class [NotificationConverter] convert JSON maps to
/// [Notification]s.
class NotificationConverter extends Converter<Map, Notification> {
class NotificationConverter
extends Converter<Map<String, Object?>, Notification> {
@override
Notification convert(Map json) => Notification.fromJson(json);
@override
ChunkedConversionSink<Map> startChunkedConversion(Sink<Notification> sink) =>
ChannelChunkSink<Map, Notification>(this, sink);
ChunkedConversionSink<Map<String, Object?>> startChunkedConversion(
Sink<Notification> sink) =>
ChannelChunkSink<Map<String, Object?>, Notification>(this, sink);
}
/// Instances of the class [ResponseConverter] convert JSON maps to [Response]s.
class ResponseConverter extends Converter<Map, Response> {
class ResponseConverter extends Converter<Map<String, Object?>, Response?> {
@override
Response convert(Map json) => Response.fromJson(json);
Response? convert(Map<String, Object?> json) => Response.fromJson(json);
@override
ChunkedConversionSink<Map> startChunkedConversion(Sink<Response> sink) =>
ChannelChunkSink<Map, Response>(this, sink);
ChunkedConversionSink<Map<String, Object?>> startChunkedConversion(
Sink<Response?> sink,
) {
return ChannelChunkSink<Map<String, Object?>, Response?>(this, sink);
}
}
/// The abstract class [ServerCommunicationChannel] defines the behavior of

View file

@ -2,9 +2,6 @@
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
// @dart = 2.9
import 'dart:convert';
import 'dart:io';
import 'package:analysis_server/protocol/protocol.dart';
@ -20,17 +17,16 @@ class RequestStatisticsHelper {
final Map<String, _RequestStatistics> _statisticsMap = {};
/// The [StringSink] to which performance logger should copy its output.
_ServerLogStringSink _perfLoggerStringSink;
late final _ServerLogStringSink _perfLoggerStringSink =
_ServerLogStringSink(this);
/// The channel to send 'server.log' notifications to.
ByteStreamServerChannel _serverChannel;
ByteStreamServerChannel? _serverChannel;
/// Is `true` if the client subscribed for "server.log" notification.
bool _isNotificationSubscribed = false;
RequestStatisticsHelper() {
_perfLoggerStringSink = _ServerLogStringSink(this);
}
RequestStatisticsHelper();
/// Set whether the client subscribed for "server.log" notification.
set isNotificationSubscribed(bool value) {
@ -93,8 +89,8 @@ class RequestStatisticsHelper {
var id = response.id;
var stat = _statisticsMap.remove(id);
if (stat != null) {
stat.responseTime = DateTime.now();
_sendLogEntry(ServerLogEntryKind.RESPONSE, stat.toJson());
var responseTime = DateTime.now();
_sendLogEntry(ServerLogEntryKind.RESPONSE, stat.toJson(responseTime));
}
}
@ -110,6 +106,11 @@ class RequestStatisticsHelper {
return;
}
var params = notification.params;
if (params == null) {
return;
}
var map = <String, Object>{
'event': event,
};
@ -119,13 +120,13 @@ class RequestStatisticsHelper {
event == 'analysis.navigation' ||
event == 'analysis.outline' ||
event == 'analysis.overrides') {
map['file'] = notification.params['file'];
map['file'] = params['file'] as String;
}
if (event == 'server.status') {
var analysis = notification.params['analysis'];
if (analysis is Map<String, Object>) {
map['isAnalyzing'] = analysis['isAnalyzing'];
var analysis = params['analysis'];
if (analysis is Map<String, Object?>) {
map['isAnalyzing'] = analysis['isAnalyzing'] as bool;
}
}
@ -163,9 +164,11 @@ class RequestStatisticsHelper {
void _sendLogEntry(ServerLogEntryKind kind, Object data) {
if (!_isNotificationSubscribed) return;
if (_serverChannel == null) return;
_serverChannel.sendNotification(
var serverChannel = _serverChannel;
if (serverChannel == null) return;
serverChannel.sendNotification(
Notification(
'server.log',
<String, Object>{
@ -185,7 +188,6 @@ class _RequestStatistics {
final DateTime clientRequestTime;
final DateTime serverRequestTime;
final List<_RequestStatisticsItem> items = [];
DateTime responseTime;
_RequestStatistics(
this.id,
@ -194,7 +196,7 @@ class _RequestStatistics {
this.serverRequestTime,
);
Map<String, Object> toJson() {
Map<String, Object> toJson(DateTime responseTime) {
var map = {
'id': id,
'method': method,
@ -207,21 +209,16 @@ class _RequestStatistics {
}
return map;
}
@override
String toString() {
var map = toJson();
return json.encode(map);
}
}
class _RequestStatisticsItem {
final String name;
final DateTime timeValue;
final DateTime? timeValue;
_RequestStatisticsItem(this.name, {this.timeValue});
Map<String, Object> toJson() {
var timeValue = this.timeValue;
if (timeValue != null) {
return {
'name': name,
@ -238,7 +235,7 @@ class _ServerLogStringSink implements StringSink {
_ServerLogStringSink(this.helper);
@override
void write(Object obj) {
void write(Object? obj) {
throw UnimplementedError();
}
@ -253,7 +250,7 @@ class _ServerLogStringSink implements StringSink {
}
@override
void writeln([Object obj = '']) {
void writeln([Object? obj = '']) {
helper._sendLogEntry(ServerLogEntryKind.RAW, '$obj');
}
}

View file

@ -2,8 +2,6 @@
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
// @dart = 2.9
import 'dart:async';
import 'dart:convert';
import 'dart:io';
@ -23,17 +21,17 @@ void main() {
@reflectiveTest
class ByteStreamClientChannelTest {
ByteStreamClientChannel channel;
late ByteStreamClientChannel channel;
/// Sink that may be used to deliver data to the channel, as though it's
/// coming from the server.
IOSink inputSink;
late IOSink inputSink;
/// Sink through which the channel delivers data to the server.
IOSink outputSink;
late IOSink outputSink;
/// Stream of lines sent back to the client by the channel.
Stream<String> outputLineStream;
late Stream<String> outputLineStream;
void setUp() {
var inputStream = StreamController<List<int>>();
@ -103,23 +101,23 @@ class ByteStreamClientChannelTest {
@reflectiveTest
class ByteStreamServerChannelTest {
ByteStreamServerChannel channel;
late ByteStreamServerChannel channel;
/// Sink that may be used to deliver data to the channel, as though it's
/// coming from the client.
IOSink inputSink;
late IOSink inputSink;
/// Stream of lines sent back to the client by the channel.
Stream<String> outputLineStream;
late Stream<String> outputLineStream;
/// Stream of requests received from the channel via [listen()].
Stream<Request> requestStream;
late Stream<Request> requestStream;
/// Stream of errors received from the channel via [listen()].
Stream errorStream;
late Stream errorStream;
/// Future which is completed when then [listen()] reports [onDone].
Future doneFuture;
late Future doneFuture;
void setUp() {
var inputStream = StreamController<List<int>>();
@ -220,10 +218,10 @@ class ByteStreamServerChannelTest {
Future<void> test_sendNotification_exceptionInSink() async {
// This IOSink asynchronously throws an exception on any writeln().
var outputSink = _IOSinkMock();
var outputSink = _IOSinkThatAsyncThrowsOnWrite();
var channel = ByteStreamServerChannel(
null, outputSink, InstrumentationService.NULL_SERVICE);
var channel = ByteStreamServerChannel(StreamController<List<int>>().stream,
outputSink, InstrumentationService.NULL_SERVICE);
// Attempt to send a notification.
channel.sendNotification(Notification('foo'));
@ -245,39 +243,14 @@ class ByteStreamServerChannelTest {
}
}
class _IOSinkMock implements IOSink {
class _IOSinkThatAsyncThrowsOnWrite implements IOSink {
@override
Encoding encoding;
dynamic noSuchMethod(Invocation invocation) {
return super.noSuchMethod(invocation);
}
@override
Future done;
@override
void add(List<int> data) {}
@override
void addError(Object error, [StackTrace stackTrace]) {}
@override
Future addStream(Stream<List<int>> stream) => null;
@override
Future close() => null;
@override
Future flush() => null;
@override
void write(Object obj) {}
@override
void writeAll(Iterable objects, [String separator = '']) {}
@override
void writeCharCode(int charCode) {}
@override
void writeln([Object obj = '']) {
void writeln([Object? obj = '']) {
Timer(Duration(milliseconds: 10), () {
throw '42';
});

View file

@ -2,8 +2,6 @@
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
// @dart = 2.9
import 'package:test/test.dart';
import 'byte_stream_channel_test.dart' as byte_stream_channel_test;