Prepare analysis_server_client for isolate implementation.

The isolate implementation will not be included in analysis_server_client
(as that is a public package), but to allow sharing of code this
refactors Server with a base class for common code between the
implementations.  This is intended to be a non-breaking change for
the interface of analysis_server_client.

Bug: https://github.com/dart-lang/sdk/issues/41056
Change-Id: I60fc5233193527f19cbc4b937a784150a53e86b3
Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/140140
Reviewed-by: Paul Berry <paulberry@google.com>
Commit-Queue: Janice Collins <jcollins@google.com>
This commit is contained in:
Janice Collins 2020-03-20 19:10:48 +00:00 committed by commit-bot@chromium.org
parent 4f2cb5afbe
commit 7878d4437e
2 changed files with 236 additions and 129 deletions

View file

@ -6,39 +6,22 @@ import 'dart:async';
import 'dart:convert';
import 'dart:io';
import 'package:analysis_server_client/src/server_base.dart';
import 'package:analysis_server_client/listener/server_listener.dart';
import 'package:analysis_server_client/protocol.dart';
import 'package:path/path.dart';
/// Type of callbacks used to process notifications.
typedef NotificationProcessor = void Function(Notification notification);
export 'package:analysis_server_client/src/server_base.dart'
show NotificationProcessor;
/// Instances of the class [Server] manage a server process,
/// and facilitate communication to and from the server.
///
/// Clients may not extend, implement or mix-in this class.
class Server {
/// If not `null`, [_listener] will be sent information
/// about interactions with the server.
final ServerListener _listener;
class Server extends ServerBase {
/// Server process object, or `null` if server hasn't been started yet
/// or if the server has already been stopped.
Process _process;
/// Replicate all stdout/stderr data from the server process to stdout/stderr,
/// when true.
final bool _stdioPassthrough;
/// Commands that have been sent to the server but not yet acknowledged,
/// and the [Completer] objects which should be completed
/// when acknowledgement is received.
final _pendingCommands = <String, Completer<Map<String, dynamic>>>{};
/// Number which should be used to compute the 'id'
/// to send in the next command sent to the server.
int _nextId = 0;
/// The stderr subscription or `null` if either
/// [listenToOutput] has not been called or [stop] has been called.
StreamSubscription<String> _stderrSubscription;
@ -49,13 +32,13 @@ class Server {
Server(
{ServerListener listener, Process process, bool stdioPassthrough = false})
: _listener = listener,
_process = process,
_stdioPassthrough = stdioPassthrough;
: _process = process,
super(listener: listener, stdioPassthrough: stdioPassthrough);
/// Force kill the server. Returns exit code future.
@override
Future<int> kill({String reason = 'none'}) {
_listener?.killingServerProcess(reason);
listener?.killingServerProcess(reason);
final process = _process;
_process = null;
process.kill();
@ -64,69 +47,16 @@ class Server {
/// Start listening to output from the server,
/// and deliver notifications to [notificationProcessor].
@override
void listenToOutput({NotificationProcessor notificationProcessor}) {
_stdoutSubscription = _process.stdout
.transform(utf8.decoder)
.transform(LineSplitter())
.listen((String line) {
if (_stdioPassthrough) stdout.writeln(line);
String trimmedLine = line.trim();
// Guard against lines like:
// {"event":"server.connected","params":{...}}Observatory listening on ...
const observatoryMessage = 'Observatory listening on ';
if (trimmedLine.contains(observatoryMessage)) {
trimmedLine = trimmedLine
.substring(0, trimmedLine.indexOf(observatoryMessage))
.trim();
}
if (trimmedLine.isEmpty) {
return;
}
_listener?.messageReceived(trimmedLine);
Map<String, dynamic> message;
try {
message = json.decoder.convert(trimmedLine);
} catch (exception) {
_listener?.badMessage(trimmedLine, exception);
return;
}
final id = message[Response.ID];
if (id != null) {
// Handle response
final completer = _pendingCommands.remove(id);
if (completer == null) {
_listener?.unexpectedResponse(message, id);
}
if (message.containsKey(Response.ERROR)) {
completer.completeError(RequestError.fromJson(
ResponseDecoder(null), '.error', message[Response.ERROR]));
} else {
completer.complete(message[Response.RESULT]);
}
} else {
// Handle notification
final String event = message[Notification.EVENT];
if (event != null) {
if (notificationProcessor != null) {
notificationProcessor(
Notification(event, message[Notification.PARAMS]));
}
} else {
_listener?.unexpectedMessage(message);
}
}
});
.listen((line) => outputProcessor(line, notificationProcessor));
_stderrSubscription = _process.stderr
.transform(utf8.decoder)
.transform(LineSplitter())
.listen((String line) {
if (_stdioPassthrough) stderr.writeln(line);
String trimmedLine = line.trim();
_listener?.errorMessage(trimmedLine);
});
.listen((line) => errorProcessor(line, notificationProcessor));
}
/// Send a command to the server. An 'id' will be automatically assigned.
@ -136,23 +66,10 @@ class Server {
/// the future will be completed with the 'result' field from the response.
/// If the server acknowledges the command with an error response,
/// the future will be completed with an error.
@override
Future<Map<String, dynamic>> send(
String method, Map<String, dynamic> params) {
String id = '${_nextId++}';
Map<String, dynamic> command = <String, dynamic>{
Request.ID: id,
Request.METHOD: method
};
if (params != null) {
command[Request.PARAMS] = params;
}
final completer = Completer<Map<String, dynamic>>();
_pendingCommands[id] = completer;
String line = json.encode(command);
_listener?.requestSent(line);
_process.stdin.add(utf8.encoder.convert('$line\n'));
return completer.future;
}
String method, Map<String, dynamic> params) =>
sendCommandWith(method, params, _process.stdin.add);
/// Start the server.
///
@ -166,6 +83,7 @@ class Server {
/// If [enableAsserts] is specified, then asserts will be enabled in the new
/// dart process for that server. This is typically just useful to enable
/// locally for debugging.
@override
Future start({
String clientId,
String clientVersion,
@ -222,48 +140,30 @@ class Server {
// Add the server executable.
//
arguments.add(serverPath);
//
// Add server arguments.
//
// TODO(danrubel): Consider moving all cmdline argument consts
// out of analysis_server and into analysis_server_client
if (clientId != null) {
arguments.add('--client-id');
arguments.add(clientId);
}
if (clientVersion != null) {
arguments.add('--client-version');
arguments.add(clientVersion);
}
if (suppressAnalytics) {
arguments.add('--suppress-analytics');
}
if (diagnosticPort != null) {
arguments.add('--port');
arguments.add(diagnosticPort.toString());
}
if (instrumentationLogFile != null) {
arguments.add('--instrumentation-log-file=$instrumentationLogFile');
}
if (sdkPath != null) {
arguments.add('--sdk=$sdkPath');
}
if (useAnalysisHighlight2) {
arguments.add('--useAnalysisHighlight2');
}
_listener?.startingServer(dartBinary, arguments);
arguments.addAll(getServerArguments(
clientId: clientId,
clientVersion: clientVersion,
suppressAnalytics: suppressAnalytics,
diagnosticPort: diagnosticPort,
instrumentationLogFile: instrumentationLogFile,
sdkPath: sdkPath,
useAnalysisHighlight2: useAnalysisHighlight2));
listener?.startingServer(dartBinary, arguments);
_process = await Process.start(dartBinary, arguments);
// ignore: unawaited_futures
_process.exitCode.then((int code) {
if (code != 0 && _process != null) {
// Report an error if server abruptly terminated
_listener?.unexpectedStop(code);
listener?.unexpectedStop(code);
}
});
}
/// Attempt to gracefully shutdown the server.
/// If that fails, then kill the process.
@override
Future<int> stop({Duration timeLimit}) async {
timeLimit ??= const Duration(seconds: 5);
if (_process == null) {
@ -286,7 +186,7 @@ class Server {
return process.exitCode.timeout(
timeLimit,
onTimeout: () {
_listener?.killingServerProcess('server failed to exit');
listener?.killingServerProcess('server failed to exit');
process.kill();
return process.exitCode;
},

View file

@ -0,0 +1,207 @@
// Copyright (c) 2020, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
import 'dart:convert';
import 'dart:io';
import 'package:analysis_server_client/listener/server_listener.dart';
import 'package:analysis_server_client/protocol.dart';
/// Type of callbacks used to process notifications.
typedef NotificationProcessor = void Function(Notification notification);
/// A function via which data can be sent to a started server.
typedef CommandSender = void Function(List<int> utf8bytes);
///
/// Add server arguments.
///
/// TODO(danrubel): Consider moving all cmdline argument consts
/// out of analysis_server and into analysis_server_client.
List<String> getServerArguments({
String clientId,
String clientVersion,
int diagnosticPort,
String instrumentationLogFile,
String sdkPath,
bool suppressAnalytics,
bool useAnalysisHighlight2 = false,
}) {
List<String> arguments = [];
if (clientId != null) {
arguments.add('--client-id');
arguments.add(clientId);
}
if (clientVersion != null) {
arguments.add('--client-version');
arguments.add(clientVersion);
}
if (suppressAnalytics) {
arguments.add('--suppress-analytics');
}
if (diagnosticPort != null) {
arguments.add('--port');
arguments.add(diagnosticPort.toString());
}
if (instrumentationLogFile != null) {
arguments.add('--instrumentation-log-file=$instrumentationLogFile');
}
if (sdkPath != null) {
arguments.add('--sdk=$sdkPath');
}
if (useAnalysisHighlight2) {
arguments.add('--useAnalysisHighlight2');
}
return arguments;
}
/// Implementations of the class [ServerBase] manage an analysis server,
/// and facilitate communication to and from the server.
///
/// Clients outside of this package may not extend or implement this class.
abstract class ServerBase {
/// Replicate all data from the server process to stdout/stderr, when true.
final bool _stdioPassthrough;
/// Number which should be used to compute the 'id'
/// to send in the next command sent to the server.
int _nextId = 0;
/// If not `null`, [_listener] will be sent information
/// about interactions with the server.
final ServerListener _listener;
ServerListener get listener => _listener;
ServerBase({ServerListener listener, bool stdioPassthrough = false})
: _listener = listener,
_stdioPassthrough = stdioPassthrough;
/// Commands that have been sent to the server but not yet acknowledged,
/// and the [Completer] objects which should be completed
/// when acknowledgement is received.
final _pendingCommands = <String, Completer<Map<String, dynamic>>>{};
/// Force kill the server. Returns a future that completes when the server
/// stops.
Future kill({String reason = 'none'});
/// Start listening to output from the server,
/// and deliver notifications to [notificationProcessor].
void listenToOutput({NotificationProcessor notificationProcessor});
/// Send a command to the server. An 'id' will be automatically assigned.
/// The returned [Future] will be completed when the server acknowledges
/// the command with a response.
/// If the server acknowledges the command with a normal (non-error) response,
/// the future will be completed with the 'result' field from the response.
/// If the server acknowledges the command with an error response,
/// the future will be completed with an error.
Future<Map<String, dynamic>> send(String method, Map<String, dynamic> params);
/// Encodes a request for transmission and sends it as a utf8 encoded byte
/// string with [sendWith].
Future<Map<String, dynamic>> sendCommandWith(
String method, Map<String, dynamic> params, CommandSender sendWith) {
String id = '${_nextId++}';
Map<String, dynamic> command = <String, dynamic>{
Request.ID: id,
Request.METHOD: method
};
if (params != null) {
command[Request.PARAMS] = params;
}
final completer = Completer<Map<String, dynamic>>();
_pendingCommands[id] = completer;
String line = json.encode(command);
listener?.requestSent(line);
sendWith(utf8.encoder.convert('$line\n'));
return completer.future;
}
/// If the implementation of [ServerBase] captures an error stream,
/// it can use this to forward the errors to [listener] and [stderr] if
/// appropriate.
void errorProcessor(
String line, NotificationProcessor notificationProcessor) {
if (_stdioPassthrough) stderr.writeln(line);
String trimmedLine = line.trim();
listener?.errorMessage(trimmedLine);
}
/// Handle a (possibly) json encoded object, completing the [Completer] in
/// [_pendingCommands] corresponding to the response. Reports problems in
/// decoding or message synchronization using [listener], and replicates
/// raw data to [stdout] as appropriate.
void outputProcessor(
String line, NotificationProcessor notificationProcessor) {
if (_stdioPassthrough) stdout.writeln(line);
String trimmedLine = line.trim();
// Guard against lines like:
// {"event":"server.connected","params":{...}}Observatory listening on ...
const observatoryMessage = 'Observatory listening on ';
if (trimmedLine.contains(observatoryMessage)) {
trimmedLine = trimmedLine
.substring(0, trimmedLine.indexOf(observatoryMessage))
.trim();
}
if (trimmedLine.isEmpty) {
return;
}
listener?.messageReceived(trimmedLine);
Map<String, dynamic> message;
try {
message = json.decoder.convert(trimmedLine);
} catch (exception) {
listener?.badMessage(trimmedLine, exception);
return;
}
final id = message[Response.ID];
if (id != null) {
// Handle response
final completer = _pendingCommands.remove(id);
if (completer == null) {
listener?.unexpectedResponse(message, id);
}
if (message.containsKey(Response.ERROR)) {
completer.completeError(RequestError.fromJson(
ResponseDecoder(null), '.error', message[Response.ERROR]));
} else {
completer.complete(message[Response.RESULT]);
}
} else {
// Handle notification
final String event = message[Notification.EVENT];
if (event != null) {
if (notificationProcessor != null) {
notificationProcessor(
Notification(event, message[Notification.PARAMS]));
}
} else {
listener?.unexpectedMessage(message);
}
}
}
/// Start the server. The returned future completes when the server
/// is started and it is valid to call [listenToOutput].
Future start({
String clientId,
String clientVersion,
int diagnosticPort,
String instrumentationLogFile,
String sdkPath,
bool suppressAnalytics,
bool useAnalysisHighlight2,
});
/// Attempt to gracefully shutdown the server.
/// If that fails, then force it to shut down.
Future stop({Duration timeLimit});
}