Migrate more of analysis_server_client

Added a few new methods to the listener, tests.

Change-Id: I477a17da3fd31db04c4960effee076680db62301
Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/193895
Reviewed-by: Brian Wilkerson <brianwilkerson@google.com>
Commit-Queue: Konstantin Shcheglov <scheglov@google.com>
This commit is contained in:
Konstantin Shcheglov 2021-04-02 23:36:52 +00:00 committed by commit-bot@chromium.org
parent ada329f6b1
commit 48c0eadb86
8 changed files with 201 additions and 121 deletions

View file

@ -2,8 +2,6 @@
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
// @dart = 2.9
import 'dart:async';
import 'dart:io' show Directory, Platform, ProcessSignal, exit;
@ -37,7 +35,7 @@ void main(List<String> args) async {
AnalysisSetAnalysisRootsParams([target], const []).toJson());
// Continue to watch for analysis until the user presses Ctrl-C
StreamSubscription<ProcessSignal> subscription;
late StreamSubscription<ProcessSignal> subscription;
subscription = ProcessSignal.sigint.watch().listen((_) async {
print('Exiting...');
// ignore: unawaited_futures
@ -109,15 +107,14 @@ class _Handler with NotificationHandler, ConnectionHandler {
} else {
print('Server Error: ${params.message}');
}
if (params.stackTrace != null) {
print(params.stackTrace);
}
print(params.stackTrace);
super.onServerError(params);
}
@override
void onServerStatus(ServerStatusParams params) {
if (!params.analysis.isAnalyzing) {
var analysisStatus = params.analysis;
if (analysisStatus != null && !analysisStatus.isAnalyzing) {
// Whenever the server stops analyzing,
// print a brief summary of what issues have been found.
if (errorCount == 0) {

View file

@ -1,12 +1,6 @@
// Copyright (c) 2018, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
//
// This file has been automatically generated. Please do not edit it manually.
// To regenerate the file, use the script
// "pkg/analysis_server/tool/spec/generate_files".
// @dart = 2.9
import 'dart:async';
@ -60,7 +54,7 @@ mixin ConnectionHandler implements NotificationHandler {
/// Return a future that completes with a `bool` indicating whether
/// a connection was successfully established with the server.
Future<bool> serverConnected({Duration timeLimit}) {
Future<bool> serverConnected({Duration? timeLimit}) {
var future = _connected.future;
if (timeLimit != null) {
future = future.timeout(timeLimit, onTimeout: () {

View file

@ -43,16 +43,28 @@ mixin ServerListener {
/// Called when the [Server] receives an unexpected message
/// which is not a notification or response.
void unexpectedMessage(Map<String, dynamic> message) {
void unexpectedMessage(Map<String, Object?> message) {
log('Unexpected message from server:', '$message');
}
/// Called when the [Server] received a response, which is not an error,
/// but the result is not an JSON object.
void unexpectedNotificationFormat(Map<String, Object?> message) {
log('Unexpected notification format from server', '$message');
}
/// Called when the [Server] recieved an unexpected response
/// where the [id] does not match the [id] of an outstanding request.
void unexpectedResponse(Map<String, dynamic> message, id) {
void unexpectedResponse(Map<String, Object?> message, Object id) {
log('Unexpected response from server', 'id=$id');
}
/// Called when the [Server] received a response, which is not an error,
/// but the result is not an JSON object.
void unexpectedResponseFormat(Map<String, Object?> message) {
log('Unexpected response format from server', '$message');
}
/// Called when the server process unexpectedly exits
/// with a non-zero exit code.
void unexpectedStop(int exitCode) {

View file

@ -2,8 +2,6 @@
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
// @dart = 2.9
import 'dart:async';
import 'dart:convert';
import 'dart:io';
@ -23,18 +21,20 @@ export 'package:analysis_server_client/src/server_base.dart'
class Server extends ServerBase {
/// Server process object, or `null` if server hasn't been started yet
/// or if the server has already been stopped.
Process _process;
Process? _process;
/// The stderr subscription or `null` if either
/// [listenToOutput] has not been called or [stop] has been called.
StreamSubscription<String> _stderrSubscription;
StreamSubscription<String>? _stderrSubscription;
/// The stdout subscription or `null` if either
/// [listenToOutput] has not been called or [stop] has been called.
StreamSubscription<String> _stdoutSubscription;
StreamSubscription<String>? _stdoutSubscription;
Server(
{ServerListener listener, Process process, bool stdioPassthrough = false})
{ServerListener? listener,
Process? process,
bool stdioPassthrough = false})
: _process = process,
super(listener: listener, stdioPassthrough: stdioPassthrough);
@ -42,7 +42,7 @@ class Server extends ServerBase {
@override
Future<int> kill({String reason = 'none'}) {
listener?.killingServerProcess(reason);
final process = _process;
final process = _process!;
_process = null;
process.kill();
return process.exitCode;
@ -51,12 +51,12 @@ class Server extends ServerBase {
/// Start listening to output from the server,
/// and deliver notifications to [notificationProcessor].
@override
void listenToOutput({NotificationProcessor notificationProcessor}) {
_stdoutSubscription = _process.stdout
void listenToOutput({NotificationProcessor? notificationProcessor}) {
_stdoutSubscription = _process!.stdout
.transform(utf8.decoder)
.transform(LineSplitter())
.listen((line) => outputProcessor(line, notificationProcessor));
_stderrSubscription = _process.stderr
_stderrSubscription = _process!.stderr
.transform(utf8.decoder)
.transform(LineSplitter())
.listen((line) => errorProcessor(line, notificationProcessor));
@ -70,9 +70,9 @@ class Server extends ServerBase {
/// If the server acknowledges the command with an error response,
/// the future will be completed with an error.
@override
Future<Map<String, dynamic>> send(
String method, Map<String, dynamic> params) =>
sendCommandWith(method, params, _process.stdin.add);
Future<Map<String, Object?>?> send(
String method, Map<String, Object?>? params) =>
sendCommandWith(method, params, _process!.stdin.add);
/// Start the server.
///
@ -88,14 +88,14 @@ class Server extends ServerBase {
/// locally for debugging.
@override
Future start({
String clientId,
String clientVersion,
int diagnosticPort,
String instrumentationLogFile,
String? clientId,
String? clientVersion,
int? diagnosticPort,
String? instrumentationLogFile,
bool profileServer = false,
String sdkPath,
String serverPath,
int servicesPort,
String? sdkPath,
String? serverPath,
int? servicesPort,
bool suppressAnalytics = true,
bool useAnalysisHighlight2 = false,
bool enableAsserts = false,
@ -154,9 +154,10 @@ class Server extends ServerBase {
useAnalysisHighlight2: useAnalysisHighlight2));
listener?.startingServer(dartBinary, arguments);
_process = await Process.start(dartBinary, arguments);
final process = await Process.start(dartBinary, arguments);
_process = process;
// ignore: unawaited_futures
_process.exitCode.then((int code) {
process.exitCode.then((int code) {
if (code != 0 && _process != null) {
// Report an error if server abruptly terminated
listener?.unexpectedStop(code);
@ -167,19 +168,21 @@ class Server extends ServerBase {
/// Attempt to gracefully shutdown the server.
/// If that fails, then kill the process.
@override
Future<int> stop({Duration timeLimit}) async {
Future<int> stop({Duration? timeLimit}) async {
timeLimit ??= const Duration(seconds: 5);
if (_process == null) {
final process = _process;
if (process == null) {
// Process already exited
return -1;
}
final future = send(SERVER_REQUEST_SHUTDOWN, null);
final process = _process;
_process = null;
await future
// fall through to wait for exit
.timeout(timeLimit, onTimeout: () {
return null;
return {};
}).whenComplete(() async {
await _stderrSubscription?.cancel();
_stderrSubscription = null;

View file

@ -34,7 +34,7 @@ class Notification {
/// A table mapping the names of notification parameters to their values, or
/// `null` if there are no notification parameters.
final Map<String, Object>? params;
final Map<String, Object?>? params;
/// Initialize a newly created [Notification] to have the given [event] name.
/// If [params] is provided, it will be used as the params; otherwise no
@ -44,7 +44,7 @@ class Notification {
/// Initialize a newly created instance based on the given JSON data.
factory Notification.fromJson(Map json) {
return Notification(json[Notification.EVENT],
json[Notification.PARAMS] as Map<String, Object>);
json[Notification.PARAMS] as Map<String, Object?>);
}
/// Return a table representing the structure of the Json object that will be

View file

@ -2,8 +2,6 @@
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
// @dart = 2.9
import 'dart:async';
import 'dart:convert';
import 'dart:io';
@ -17,12 +15,12 @@ import 'package:analysis_server_client/protocol.dart';
/// TODO(danrubel): Consider moving all cmdline argument consts
/// out of analysis_server and into analysis_server_client.
List<String> getServerArguments({
String clientId,
String clientVersion,
int diagnosticPort,
String instrumentationLogFile,
String sdkPath,
bool suppressAnalytics,
String? clientId,
String? clientVersion,
int? diagnosticPort,
String? instrumentationLogFile,
String? sdkPath,
bool suppressAnalytics = true,
bool useAnalysisHighlight2 = false,
}) {
var arguments = <String>[];
@ -74,24 +72,24 @@ abstract class ServerBase {
/// If not `null`, [_listener] will be sent information
/// about interactions with the server.
final ServerListener _listener;
final ServerListener? _listener;
/// Commands that have been sent to the server but not yet acknowledged,
/// and the [Completer] objects which should be completed
/// when acknowledgement is received.
final _pendingCommands = <String, Completer<Map<String, dynamic>>>{};
final _pendingCommands = <String, Completer<Map<String, Object?>?>>{};
ServerBase({ServerListener listener, bool stdioPassthrough = false})
ServerBase({ServerListener? listener, bool stdioPassthrough = false})
: _listener = listener,
_stdioPassthrough = stdioPassthrough;
ServerListener get listener => _listener;
ServerListener? get listener => _listener;
/// If the implementation of [ServerBase] captures an error stream,
/// it can use this to forward the errors to [listener] and [stderr] if
/// appropriate.
void errorProcessor(
String line, NotificationProcessor notificationProcessor) {
String line, NotificationProcessor? notificationProcessor) {
if (_stdioPassthrough) stderr.writeln(line);
var trimmedLine = line.trim();
listener?.errorMessage(trimmedLine);
@ -110,7 +108,7 @@ abstract class ServerBase {
/// decoding or message synchronization using [listener], and replicates
/// raw data to [stdout] as appropriate.
void outputProcessor(
String line, NotificationProcessor notificationProcessor) {
String line, NotificationProcessor? notificationProcessor) {
if (_stdioPassthrough) stdout.writeln(line);
var trimmedLine = line.trim();
@ -127,7 +125,7 @@ abstract class ServerBase {
}
listener?.messageReceived(trimmedLine);
Map<String, dynamic> message;
Map<String, Object?> message;
try {
message = json.decoder.convert(trimmedLine);
} catch (exception) {
@ -135,31 +133,47 @@ abstract class ServerBase {
return;
}
// Handle response.
final id = message[Response.ID];
if (id != null) {
// Handle response
final completer = _pendingCommands.remove(id);
if (completer == null) {
listener?.unexpectedResponse(message, id);
return;
}
if (message.containsKey(Response.ERROR)) {
completer.completeError(RequestError.fromJson(
ResponseDecoder(null), '.error', message[Response.ERROR]));
} else {
completer.complete(message[Response.RESULT]);
final errorJson = message[Response.ERROR];
if (errorJson != null) {
completer.completeError(
RequestError.fromJson(ResponseDecoder(null), '.error', errorJson));
return;
}
} else {
// Handle notification
final String event = message[Notification.EVENT];
if (event != null) {
final resultJson = message[Response.RESULT];
if (resultJson is Map<String, Object?>?) {
completer.complete(resultJson);
return;
}
listener?.unexpectedResponseFormat(message);
return;
}
// Handle notification.
final event = message[Notification.EVENT];
if (event is String) {
final paramsJson = message[Notification.PARAMS];
if (paramsJson is Map<String, Object?>) {
if (notificationProcessor != null) {
notificationProcessor(
Notification(event, message[Notification.PARAMS]));
notificationProcessor(Notification(event, paramsJson));
}
} else {
listener?.unexpectedMessage(message);
listener?.unexpectedNotificationFormat(message);
}
return;
}
listener?.unexpectedMessage(message);
}
/// Send a command to the server. An 'id' will be automatically assigned.
@ -169,18 +183,19 @@ abstract class ServerBase {
/// the future will be completed with the 'result' field from the response.
/// If the server acknowledges the command with an error response,
/// the future will be completed with an error.
Future<Map<String, dynamic>> send(String method, Map<String, dynamic> params);
Future<Map<String, Object?>?> send(
String method, Map<String, Object?>? params);
/// Encodes a request for transmission and sends it as a utf8 encoded byte
/// string with [sendWith].
Future<Map<String, dynamic>> sendCommandWith(
String method, Map<String, dynamic> params, CommandSender sendWith) {
Future<Map<String, Object?>?> sendCommandWith(
String method, Map<String, Object?>? params, CommandSender sendWith) {
var id = '${_nextId++}';
var command = <String, dynamic>{Request.ID: id, Request.METHOD: method};
if (params != null) {
command[Request.PARAMS] = params;
}
final completer = Completer<Map<String, dynamic>>();
final completer = Completer<Map<String, Object?>?>();
_pendingCommands[id] = completer;
var line = json.encode(command);
listener?.requestSent(line);

View file

@ -2,8 +2,6 @@
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
// @dart = 2.9
import 'package:analysis_server_client/handler/connection_handler.dart';
import 'package:analysis_server_client/handler/notification_handler.dart';
import 'package:analysis_server_client/listener/server_listener.dart';

View file

@ -2,23 +2,24 @@
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
// @dart = 2.9
import 'dart:async';
import 'dart:convert';
import 'dart:io';
import 'package:analysis_server_client/listener/server_listener.dart';
import 'package:analysis_server_client/protocol.dart';
import 'package:analysis_server_client/server.dart';
import 'package:test/test.dart';
void main() {
MockProcess process;
Server server;
late _ServerListener listener;
late MockProcess process;
late Server server;
setUp(() async {
process = MockProcess();
server = Server(process: process);
listener = _ServerListener();
server = Server(process: process, listener: listener);
});
group('listenToOutput', () {
@ -29,7 +30,7 @@ void main() {
final future = server.send('blahMethod', null);
server.listenToOutput();
final response = await future;
final response = (await future)!;
expect(response['foo'], 'bar');
});
@ -45,6 +46,7 @@ void main() {
expect(error.code, RequestErrorCode.UNKNOWN_REQUEST);
expect(error.message, 'something went wrong');
expect(error.stackTrace, 'some long stack trace');
return <String, Object?>{};
});
server.listenToOutput();
});
@ -56,9 +58,10 @@ void main() {
final completer = Completer();
void eventHandler(Notification notification) {
expect(notification.event, 'fooEvent');
expect(notification.params.length, 2);
expect(notification.params['foo'] as String, 'bar');
expect(notification.params['baz'] as String, 'bang');
var params = notification.params!;
expect(params.length, 2);
expect(params['foo'] as String, 'bar');
expect(params['baz'] as String, 'bang');
completer.complete();
}
@ -67,6 +70,61 @@ void main() {
server.listenToOutput(notificationProcessor: eventHandler);
await completer.future;
});
test('unexpected message', () async {
// No 'id', so not a response.
// No 'event', so not a notification.
process.stdout = Stream.value(
utf8.encoder.convert(json.encode({'foo': 'bar'})),
);
process.stderr = _noMessage();
server.listenToOutput();
// Must happen for the test to pass.
await listener.unexpectedMessageController.stream.first;
});
test('unexpected notification format', () async {
process.stdout = Stream.value(
utf8.encoder.convert(json.encode({'event': 'foo', 'noParams': '42'})),
);
process.stderr = _noMessage();
server.listenToOutput();
// Must happen for the test to pass.
await listener.unexpectedNotificationFormatCompleter.stream.first;
});
test('unexpected response', () async {
// We have no asked anything, but got a response.
process.stdout = Stream.value(
utf8.encoder.convert(json.encode({'id': '0'})),
);
process.stderr = _noMessage();
server.listenToOutput();
// Must happen for the test to pass.
await listener.unexpectedResponseCompleter.stream.first;
});
test('unexpected response format', () async {
// We expect that the first request has id `0`.
// The response is invalid - the "result" field is not an object.
process.stdout = Stream.value(
utf8.encoder.convert(json.encode({'id': '0', 'result': '42'})),
);
process.stderr = _noMessage();
// ignore: unawaited_futures
server.send('blahMethod', null);
server.listenToOutput();
// Must happen for the test to pass.
await listener.unexpectedResponseFormatCompleter.stream.first;
});
});
group('stop', () {
@ -151,16 +209,13 @@ class MockProcess implements Process {
bool killed = false;
@override
Stream<List<int>> stderr;
late Stream<List<int>> stderr;
@override
Stream<List<int>> stdout;
late Stream<List<int>> stdout;
@override
Future<int> exitCode;
@override
int get pid => null;
late Future<int> exitCode;
@override
IOSink get stdin => mockin;
@ -171,43 +226,49 @@ class MockProcess implements Process {
killed = true;
return !wasKilled;
}
@override
dynamic noSuchMethod(Invocation invocation) => super.noSuchMethod(invocation);
}
class MockStdin implements IOSink {
final controller = StreamController<String>();
@override
Encoding encoding;
@override
Future get done => null;
@override
void add(List<int> data) {
controller.add(utf8.decode(data));
}
@override
void addError(Object error, [StackTrace stackTrace]) {}
@override
Future addStream(Stream<List<int>> stream) => null;
@override
Future close() => null;
@override
Future flush() => null;
@override
void write(Object obj) {}
@override
void writeAll(Iterable objects, [String separator = '']) {}
@override
void writeCharCode(int charCode) {}
@override
void writeln([Object obj = '']) {}
dynamic noSuchMethod(Invocation invocation) => super.noSuchMethod(invocation);
}
class _ServerListener with ServerListener {
final unexpectedMessageController = StreamController<Object>();
final unexpectedNotificationFormatCompleter = StreamController<Object>();
final unexpectedResponseCompleter = StreamController<Object>();
final unexpectedResponseFormatCompleter = StreamController<Object>();
@override
void log(String prefix, String details) {}
@override
void unexpectedMessage(Map<String, Object?> message) {
unexpectedMessageController.add(message);
}
@override
void unexpectedNotificationFormat(Map<String, Object?> message) {
unexpectedNotificationFormatCompleter.add(message);
}
@override
void unexpectedResponse(Map<String, Object?> message, Object id) {
unexpectedResponseCompleter.add(message);
}
@override
void unexpectedResponseFormat(Map<String, Object?> message) {
unexpectedResponseFormatCompleter.add(message);
}
}