Add portForwarder for ProxiedDevice. (#100111)

This commit is contained in:
Lau Ching Jun 2022-03-15 14:30:21 -07:00 committed by GitHub
parent 8010a53761
commit 722cbc52b8
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 349 additions and 40 deletions

View file

@ -5,6 +5,8 @@
import 'dart:async';
import 'dart:typed_data';
import 'package:meta/meta.dart';
import '../application_package.dart';
import '../base/common.dart';
import '../base/file_system.dart';
@ -207,11 +209,15 @@ class ProxiedDevice extends Device {
bool includePastLogs = false,
}) => _ProxiedLogReader(connection, this, app);
_ProxiedPortForwarder? _proxiedPortForwarder;
_ProxiedPortForwarder get proxiedPortForwarder => _proxiedPortForwarder ??= _ProxiedPortForwarder(connection, logger: _logger);
ProxiedPortForwarder? _proxiedPortForwarder;
/// [proxiedPortForwarder] forwards a port from the remote host to local host.
ProxiedPortForwarder get proxiedPortForwarder => _proxiedPortForwarder ??= ProxiedPortForwarder(connection, logger: _logger);
DevicePortForwarder? _portForwarder;
/// [portForwarder] forwards a port from the remote device to remote host, and
/// then forward the port from remote host to local host.
@override
DevicePortForwarder get portForwarder => throw UnimplementedError();
DevicePortForwarder get portForwarder => _portForwarder ??= ProxiedPortForwarder(connection, deviceId: id, logger: _logger);
@override
void clearLogs() => throw UnimplementedError();
@ -390,29 +396,114 @@ class _ProxiedLogReader extends DeviceLogReader {
}
}
/// A port forwarded by a [ProxiedPortForwarder].
class _ProxiedForwardedPort extends ForwardedPort {
_ProxiedForwardedPort(this.connection, {
required int hostPort,
required int devicePort,
required this.remoteDevicePort,
required this.deviceId,
required this.serverSocket
}): super(hostPort, devicePort);
/// [DaemonConnection] used to communicate with the daemon.
final DaemonConnection connection;
/// The forwarded port on the remote device.
final int? remoteDevicePort;
/// The device identifier of the remote device.
final String? deviceId;
/// The [ServerSocket] that is serving the local forwarded port.
final ServerSocket serverSocket;
@override
void dispose() {
unforward();
}
/// Unforwards the remote port, and stops the local server.
Future<void> unforward() async {
await serverSocket.close();
if (remoteDevicePort != null && deviceId != null) {
await connection.sendRequest('device.unforward', <String, Object>{
'deviceId': deviceId!,
'devicePort': remoteDevicePort!,
'hostPort': devicePort,
});
}
}
}
typedef CreateSocketServer = Future<ServerSocket> Function(Logger logger, int? hostPort);
/// A [DevicePortForwarder] for a proxied device.
class _ProxiedPortForwarder extends DevicePortForwarder {
_ProxiedPortForwarder(this.connection, {
///
/// If [deviceId] is not null, the port forwarder forwards ports from the remote
/// device, to the remote host, and then to the local host.
///
/// If [deviceId] is null, then the port forwarder only forwards ports from the
/// remote host to the local host.
@visibleForTesting
class ProxiedPortForwarder extends DevicePortForwarder {
ProxiedPortForwarder(this.connection, {
String? deviceId,
required Logger logger,
}) : _logger = logger;
@visibleForTesting CreateSocketServer createSocketServer = _defaultCreateServerSocket,
}) : _logger = logger,
_deviceId = deviceId,
_createSocketServer = createSocketServer;
final String? _deviceId;
DaemonConnection connection;
final Logger _logger;
final CreateSocketServer _createSocketServer;
@override
final List<ForwardedPort> forwardedPorts = <ForwardedPort>[];
List<ForwardedPort> get forwardedPorts => _hostPortToForwardedPorts.values.toList();
final Map<int, _ProxiedForwardedPort> _hostPortToForwardedPorts = <int, _ProxiedForwardedPort>{};
final List<Socket> _connectedSockets = <Socket>[];
final Map<int, ServerSocket> _serverSockets = <int, ServerSocket>{};
@override
Future<int> forward(int devicePort, { int? hostPort }) async {
final ServerSocket serverSocket = await ServerSocket.bind(InternetAddress.loopbackIPv4, hostPort ?? 0);
int? remoteDevicePort;
final String? deviceId = _deviceId;
_serverSockets[serverSocket.port] = serverSocket;
forwardedPorts.add(ForwardedPort(serverSocket.port, devicePort));
// If deviceId is set, we need to forward the remote device port to remote host as well.
// And then, forward the remote host port to a local host port.
if (deviceId != null) {
final Map<String, Object?> result = _cast<Map<String, Object?>>(
await connection.sendRequest('device.forward', <String, Object>{
'deviceId': deviceId,
'devicePort': devicePort,
}));
remoteDevicePort = devicePort;
devicePort = result['hostPort']! as int;
}
final ServerSocket serverSocket = await _startProxyServer(devicePort, hostPort);
_hostPortToForwardedPorts[serverSocket.port] = _ProxiedForwardedPort(
connection,
hostPort: serverSocket.port,
devicePort: devicePort,
remoteDevicePort: remoteDevicePort,
deviceId: deviceId,
serverSocket: serverSocket,
);
return serverSocket.port;
}
Future<ServerSocket> _startProxyServer(int devicePort, int? hostPort) async {
final ServerSocket serverSocket = await _createSocketServer(_logger, hostPort);
serverSocket.listen((Socket socket) async {
final String id = _cast<String>(await connection.sendRequest('proxy.connect', <String, Object>{
@ -423,6 +514,9 @@ class _ProxiedPortForwarder extends DevicePortForwarder {
final Future<DaemonEventData> disconnectFuture = connection.listenToEvent('proxy.disconnected.$id').first;
unawaited(disconnectFuture.then((_) {
socket.close();
}).catchError((_) {
// The event is not guaranteed to be sent if we initiated the disconnection.
// Do nothing here.
}));
socket.listen((Uint8List data) {
connection.sendRequest('proxy.write', <String, Object>{
@ -446,30 +540,21 @@ class _ProxiedPortForwarder extends DevicePortForwarder {
_logger.printWarning('Server socket error: $error');
_logger.printTrace('Server socket error: $error, stack trace: $stackTrace');
});
return serverSocket.port;
return serverSocket;
}
@override
Future<void> unforward(ForwardedPort forwardedPort) async {
if (!forwardedPorts.remove(forwardedPort)) {
// Not in list. Nothing to remove.
return;
}
forwardedPort.dispose();
final ServerSocket? serverSocket = _serverSockets.remove(forwardedPort.hostPort);
await serverSocket?.close();
// Look for the forwarded port entry in our own map.
final _ProxiedForwardedPort? proxiedForwardedPort = _hostPortToForwardedPorts.remove(forwardedPort.hostPort);
await proxiedForwardedPort?.unforward();
}
@override
Future<void> dispose() async {
for (final ForwardedPort forwardedPort in forwardedPorts) {
forwardedPort.dispose();
}
for (final ServerSocket serverSocket in _serverSockets.values) {
await serverSocket.close();
for (final _ProxiedForwardedPort forwardedPort in _hostPortToForwardedPorts.values) {
await forwardedPort.unforward();
}
for (final Socket socket in _connectedSockets) {
@ -477,3 +562,15 @@ class _ProxiedPortForwarder extends DevicePortForwarder {
}
}
}
Future<ServerSocket> _defaultCreateServerSocket(Logger logger, int? hostPort) async {
try {
return await ServerSocket.bind(InternetAddress.loopbackIPv4, hostPort ?? 0);
} on SocketException {
logger.printTrace('Bind on $hostPort failed with IPv4, retrying on IPv6');
}
// If binding on ipv4 failed, try binding on ipv6.
// Omit try catch here, let the failure fallthrough.
return ServerSocket.bind(InternetAddress.loopbackIPv6, hostPort ?? 0);
}

View file

@ -13,18 +13,6 @@ import 'package:flutter_tools/src/proxied_devices/file_transfer.dart';
import '../../src/common.dart';
void main() {
// late BufferLogger bufferLogger;
// late FakeDaemonStreams daemonStreams;
// late DaemonConnection daemonConnection;
// setUp(() {
// bufferLogger = BufferLogger.test();
// daemonStreams = FakeDaemonStreams();
// daemonConnection = DaemonConnection(
// daemonStreams: daemonStreams,
// logger: bufferLogger,
// );
// });
group('convertToChunks', () {
test('works correctly', () async {
final StreamController<Uint8List> controller = StreamController<Uint8List>();

View file

@ -0,0 +1,224 @@
// Copyright 2014 The Flutter Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
import 'dart:async';
import 'dart:io';
import 'dart:typed_data';
import 'package:flutter_tools/src/base/common.dart';
import 'package:flutter_tools/src/base/logger.dart';
import 'package:flutter_tools/src/daemon.dart';
import 'package:flutter_tools/src/proxied_devices/devices.dart';
import 'package:test/fake.dart';
import '../../src/common.dart';
void main() {
group('ProxiedPortForwarder', () {
late BufferLogger bufferLogger;
late DaemonConnection serverDaemonConnection;
late DaemonConnection clientDaemonConnection;
setUp(() {
bufferLogger = BufferLogger.test();
final FakeDaemonStreams serverDaemonStreams = FakeDaemonStreams();
serverDaemonConnection = DaemonConnection(
daemonStreams: serverDaemonStreams,
logger: bufferLogger,
);
final FakeDaemonStreams clientDaemonStreams = FakeDaemonStreams();
clientDaemonConnection = DaemonConnection(
daemonStreams: clientDaemonStreams,
logger: bufferLogger,
);
serverDaemonStreams.inputs.addStream(clientDaemonStreams.outputs.stream);
clientDaemonStreams.inputs.addStream(serverDaemonStreams.outputs.stream);
});
tearDown(() async {
await serverDaemonConnection.dispose();
await clientDaemonConnection.dispose();
});
testWithoutContext('works correctly without device id', () async {
final FakeServerSocket fakeServerSocket = FakeServerSocket(200);
final ProxiedPortForwarder portForwarder = ProxiedPortForwarder(
clientDaemonConnection,
logger: bufferLogger,
createSocketServer: (Logger logger, int? hostPort) async =>
fakeServerSocket,
);
final int result = await portForwarder.forward(100);
expect(result, 200);
final FakeSocket fakeSocket = FakeSocket();
fakeServerSocket.controller.add(fakeSocket);
final Stream<DaemonMessage> broadcastOutput = serverDaemonConnection.incomingCommands.asBroadcastStream();
DaemonMessage message = await broadcastOutput.first;
expect(message.data['id'], isNotNull);
expect(message.data['method'], 'proxy.connect');
expect(message.data['params'], <String, Object?>{'port': 100});
const String id = 'random_id';
serverDaemonConnection.sendResponse(message.data['id']!, id);
// Forwards the data received from socket to daemon.
fakeSocket.controller.add(Uint8List.fromList(<int>[1, 2, 3]));
message = await broadcastOutput.first;
expect(message.data['method'], 'proxy.write');
expect(message.data['params'], <String, Object?>{'id': id});
expect(message.binary, isNotNull);
final List<List<int>> binary = await message.binary!.toList();
expect(binary, <List<int>>[<int>[1, 2, 3]]);
// Forwards data received as event to socket.
expect(fakeSocket.addedData.isEmpty, true);
serverDaemonConnection.sendEvent('proxy.data.$id', null, <int>[4, 5, 6]);
await pumpEventQueue();
expect(fakeSocket.addedData.isNotEmpty, true);
expect(fakeSocket.addedData[0], <int>[4, 5, 6]);
// Closes the socket after the remote end disconnects
expect(fakeSocket.closeCalled, false);
serverDaemonConnection.sendEvent('proxy.disconnected.$id');
await pumpEventQueue();
expect(fakeSocket.closeCalled, true);
});
testWithoutContext('forwards the port from the remote end with device id', () async {
final FakeServerSocket fakeServerSocket = FakeServerSocket(400);
final ProxiedPortForwarder portForwarder = ProxiedPortForwarder(
clientDaemonConnection,
deviceId: 'device_id',
logger: bufferLogger,
createSocketServer: (Logger logger, int? hostPort) async =>
fakeServerSocket,
);
final Stream<DaemonMessage> broadcastOutput = serverDaemonConnection.incomingCommands.asBroadcastStream();
final Future<int> result = portForwarder.forward(300);
DaemonMessage message = await broadcastOutput.first;
expect(message.data['id'], isNotNull);
expect(message.data['method'], 'device.forward');
expect(message.data['params'], <String, Object?>{'deviceId': 'device_id', 'devicePort': 300});
serverDaemonConnection.sendResponse(message.data['id']!, <String, Object?>{'hostPort': 350});
expect(await result, 400);
final FakeSocket fakeSocket = FakeSocket();
fakeServerSocket.controller.add(fakeSocket);
message = await broadcastOutput.first;
expect(message.data['id'], isNotNull);
expect(message.data['method'], 'proxy.connect');
expect(message.data['params'], <String, Object?>{'port': 350});
const String id = 'random_id';
serverDaemonConnection.sendResponse(message.data['id']!, id);
// Unforward will try to disconnect the remote port.
portForwarder.forwardedPorts.single.dispose();
expect(fakeServerSocket.closeCalled, true);
message = await broadcastOutput.first;
expect(message.data['id'], isNotNull);
expect(message.data['method'], 'device.unforward');
expect(message.data['params'], <String, Object?>{
'deviceId': 'device_id',
'devicePort': 300,
'hostPort': 350,
});
});
});
}
class FakeDaemonStreams implements DaemonStreams {
final StreamController<DaemonMessage> inputs = StreamController<DaemonMessage>();
final StreamController<DaemonMessage> outputs = StreamController<DaemonMessage>();
@override
Stream<DaemonMessage> get inputStream {
return inputs.stream;
}
@override
void send(Map<String, dynamic> message, [List<int>? binary]) {
outputs.add(DaemonMessage(message, binary != null ? Stream<List<int>>.value(binary) : null));
}
@override
Future<void> dispose() async {
await inputs.close();
// In some tests, outputs have no listeners. We don't wait for outputs to close.
unawaited(outputs.close());
}
}
class FakeServerSocket extends Fake implements ServerSocket {
FakeServerSocket(this.port);
@override
final int port;
bool closeCalled = false;
final StreamController<Socket> controller = StreamController<Socket>();
@override
StreamSubscription<Socket> listen(
void Function(Socket event)? onData, {
Function? onError,
void Function()? onDone,
bool? cancelOnError,
}) {
return controller.stream.listen(onData,
onError: onError, onDone: onDone, cancelOnError: cancelOnError);
}
@override
Future<ServerSocket> close() async {
closeCalled = true;
return this;
}
}
class FakeSocket extends Fake implements Socket {
bool closeCalled = false;
final StreamController<Uint8List> controller = StreamController<Uint8List>();
final List<List<int>> addedData = <List<int>>[];
final Completer<bool> doneCompleter = Completer<bool>();
@override
StreamSubscription<Uint8List> listen(
void Function(Uint8List event)? onData, {
Function? onError,
void Function()? onDone,
bool? cancelOnError,
}) {
return controller.stream.listen(onData,
onError: onError, onDone: onDone, cancelOnError: cancelOnError);
}
@override
void add(List<int> data) {
addedData.add(data);
}
@override
Future<void> close() async {
closeCalled = true;
}
@override
Future<bool> get done => doneCompleter.future;
@override
void destroy() {}
}