Add tracking of open processes to the new io resource tracking.

I plan on refactoring the io_resource_info.dart file a bit in a follow up, but I keept this simple

I also plan on adding some sort of sanity checking when the vm exits, to make sure
all 3 maps are empty (in debug mode)

This fixes issue #24314, before we would hold on to process that failed to start
running in the _processes map.

Closes #24314
BUG=
R=johnmccutchan@google.com

Review URL: https://codereview.chromium.org//1331033003 .
This commit is contained in:
Rico Wind 2015-09-14 12:54:36 +02:00
parent de456dac24
commit 38b579fa78
3 changed files with 205 additions and 47 deletions

View file

@ -176,10 +176,9 @@ class _ProcessStartStatus {
// implicit constructor.
class _ProcessImplNativeWrapper extends NativeFieldWrapperClass1 {}
class _ProcessImpl extends _ProcessImplNativeWrapper with _ServiceObject
implements Process {
// Use default Map so we keep order.
static Map<int, _ProcessImpl> _processes = new Map<int, _ProcessImpl>();
class _ProcessImpl extends _ProcessImplNativeWrapper implements Process {
_ProcessResourceInfo _resourceInfo;
static bool connectedResourceHandler = false;
_ProcessImpl(String path,
List<String> arguments,
@ -188,7 +187,14 @@ class _ProcessImpl extends _ProcessImplNativeWrapper with _ServiceObject
bool includeParentEnvironment,
bool runInShell,
ProcessStartMode mode) : super() {
_processes[_serviceId] = this;
if (!connectedResourceHandler) {
registerExtension('__getProcesses',
_ProcessResourceInfo.getStartedProcesses);
registerExtension('__getProcessById',
_ProcessResourceInfo.getProcessInfoMapById);
connectedResourceHandler = true;
}
if (runInShell) {
arguments = _getShellArguments(path, arguments);
path = _getShellCommand();
@ -267,38 +273,6 @@ class _ProcessImpl extends _ProcessImplNativeWrapper with _ServiceObject
_started = false;
}
String get _serviceTypePath => 'io/processes';
String get _serviceTypeName => 'Process';
Map _toJSON(bool ref) {
var r = {
'id': _servicePath,
'type': _serviceType(ref),
'name': '$_path',
'user_name': '$_path',
'pid': '$pid',
'arguments': _arguments.join(' '),
};
if (ref) {
return r;
}
r['started'] = _started;
r['ended'] = _ended;
r['path'] = _path;
r['environment'] = _environment;
r['workingDirectory'] = _workingDirectory == null ? '.' : _workingDirectory;
if (_stdin._sink._nativeSocket.owner != null) {
r['stdin'] = _stdin._sink._nativeSocket._toJSON(true);
}
if (_stdout._stream._nativeSocket.owner != null) {
r['stdout'] = _stdout._stream._nativeSocket._toJSON(true);
}
if (_stderr._stream._nativeSocket.owner != null) {
r['stderr'] = _stderr._stream._nativeSocket._toJSON(true);
}
return r;
}
static String _getShellCommand() {
if (Platform.isWindows) {
return 'cmd.exe';
@ -418,6 +392,7 @@ class _ProcessImpl extends _ProcessImplNativeWrapper with _ServiceObject
}
_started = true;
_resourceInfo = new _ProcessResourceInfo(this);
// Setup an exit handler to handle internal cleanup and possible
// callback when a process terminates.
@ -439,7 +414,7 @@ class _ProcessImpl extends _ProcessImplNativeWrapper with _ServiceObject
_exitCode.complete(exitCode(exitDataBuffer));
// Kill stdin, helping hand if the user forgot to do it.
_stdin._sink.destroy();
_processes.remove(_serviceId);
_resourceInfo.stopped();
}
exitDataBuffer.setRange(
@ -477,6 +452,8 @@ class _ProcessImpl extends _ProcessImplNativeWrapper with _ServiceObject
status._errorCode);
}
_resourceInfo = new _ProcessResourceInfo(this);
var result = _wait(
_stdin._sink._nativeSocket,
_stdout._stream._nativeSocket,
@ -488,7 +465,7 @@ class _ProcessImpl extends _ProcessImplNativeWrapper with _ServiceObject
return encoding.decode(output);
}
_processes.remove(_serviceId);
_resourceInfo.stopped();
return new ProcessResult(
result[0],

View file

@ -0,0 +1,123 @@
// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
import 'dart:convert';
import 'dart:developer';
import 'dart:io' as io;
import 'package:observatory/service_io.dart';
import 'package:unittest/unittest.dart';
import 'test_helper.dart';
Future setupProcesses() async {
var dir = await io.Directory.systemTemp.createTemp('file_service');
var args = ['--pause_isolates_on_start', io.Platform.script.toFilePath()];
var process1;
var process2;
var process3;
void closeDown() {
if (process1 != null) {
process1.kill();
}
if (process2 != null) {
process2.kill();
}
if (process3 != null) {
process3.kill();
}
dir.deleteSync(recursive: true);
}
Future<ServiceExtensionResponse> cleanup(ignored_a, ignored_b) {
closeDown();
var result = JSON.encode({'type' : 'foobar'});
return new Future.value(new ServiceExtensionResponse.result(result));
}
Future<ServiceExtensionResponse> setup(ignored_a, ignored_b) async {
try {
process1 = await io.Process.start(io.Platform.executable, args);
process2 = await io.Process.start(io.Platform.executable,
args..add('foobar'));
var codeFilePath = dir.path + io.Platform.pathSeparator + "other_file";
var codeFile = new io.File(codeFilePath);
await codeFile.writeAsString(
'''
import "dart:io";
void main() async {
await stdin.drain();
}
''');
process3 = await io.Process.start(io.Platform.executable,
[codeFilePath]);
} catch (e) {
closeDown();
throw e;
}
var result =
JSON.encode({'type': 'foobar',
'pids' : [process1.pid, process2.pid, process3.pid]});
return new Future.value(new ServiceExtensionResponse.result(result));
}
Future<ServiceExtensionResponse> closeStdin(ignored_a, ignored_b) async {
process3.stdin.close();
var result = JSON.encode({'type' : 'foobar'});
var returnValue =
new Future.value(new ServiceExtensionResponse.result(result));
return process3.exitCode.then((int exit) => returnValue);
}
registerExtension('__cleanup', cleanup);
registerExtension('__setup', setup);
registerExtension('__closeStdin', closeStdin);
}
var processTests = [
// Initial.
(Isolate isolate) async {
var setup = await isolate.invokeRpcNoUpgrade('__setup', {});
try {
var all = await isolate.invokeRpcNoUpgrade('__getProcesses', {});
expect(all['type'], equals('_startedprocesses'));
expect(all['data'].length, equals(3));
var first = await isolate.invokeRpcNoUpgrade(
'__getProcessById', { 'id' : all['data'][0]['id'] });
expect(first['name'], io.Platform.executable);
expect(first['pid'], equals(setup['pids'][0]));
expect(first['arguments'].contains('foobar'), isFalse);
var second = await isolate.invokeRpcNoUpgrade(
'__getProcessById', { 'id' : all['data'][1]['id'] });
expect(second['name'], io.Platform.executable);
expect(second['pid'], equals(setup['pids'][1]));
expect(second['arguments'].contains('foobar'), isTrue);
expect(second['pid'] != first['pid'], isTrue);
var third = await isolate.invokeRpcNoUpgrade(
'__getProcessById', { 'id' : all['data'][2]['id'] });
expect(third['name'], io.Platform.executable);
expect(third['pid'], equals(setup['pids'][2]));
expect(third['pid'] != first['pid'], isTrue);
expect(third['pid'] != second['pid'], isTrue);
await isolate.invokeRpcNoUpgrade('__closeStdin', {});
all = await isolate.invokeRpcNoUpgrade('__getProcesses', {});
expect(all['type'], equals('_startedprocesses'));
expect(all['data'].length, equals(2));
} finally {
await isolate.invokeRpcNoUpgrade('__cleanup', {});
}
},
];
main(args) async => runIsolateTests(args, processTests,
testeeBefore:setupProcesses);

View file

@ -10,9 +10,9 @@ abstract class _IOResourceInfo {
String get name;
static int _count = 0;
_IOResourceInfo(this.type) : id = _IOResourceInfo.getNextID();
static final Stopwatch _sw = new Stopwatch()..start();
String toJSON();
_IOResourceInfo(this.type) : id = _IOResourceInfo.getNextID();
/// Get the full set of values for a specific implementation. This is normally
/// looked up based on an id from a referenceValueMap.
@ -39,8 +39,8 @@ abstract class _ReadWriteResourceInfo extends _IOResourceInfo {
double lastRead;
double lastWrite;
static final Stopwatch _sw = new Stopwatch()..start();
static double get timestamp => _sw.elapsedMicroseconds / 1000000.0;
static double get timestamp =>
_IOResourceInfo._sw.elapsedMicroseconds / 1000000.0;
// Not all call sites use this. In some cases, e.g., a socket, a read does
// not always mean that we actually read some bytes (we may do a read to see
@ -83,10 +83,6 @@ abstract class _ReadWriteResourceInfo extends _IOResourceInfo {
'last_read': lastRead,
'last_write': lastWrite
};
String toJSON() {
return JSON.encode(fullValueMap);
}
}
class _FileResourceInfo extends _ReadWriteResourceInfo {
@ -141,6 +137,68 @@ class _FileResourceInfo extends _ReadWriteResourceInfo {
}
}
class _ProcessResourceInfo extends _IOResourceInfo{
static const String TYPE = '_process';
final process;
final int startedAt;
static Map<int, _ProcessResourceInfo> startedProcesses =
new Map<int, _ProcessResourceInfo>();
_ProcessResourceInfo(this.process) :
startedAt = new DateTime.now().millisecondsSinceEpoch,
super(TYPE) {
ProcessStarted(this);
}
String get name => process._path;
void stopped() => ProcessStopped(this);
Map<String, String> get fullValueMap =>
{
'type': type,
'id': id,
'name': name,
'pid': process.pid,
'started_at': startedAt,
'arguments': process._arguments,
'working_directory':
process._workingDirectory == null ? '.' : process._workingDirectory,
};
static ProcessStarted(_ProcessResourceInfo info) {
assert(!startedProcesses.containsKey(info.id));
startedProcesses[info.id] = info;
}
static ProcessStopped(_ProcessResourceInfo info) {
assert(startedProcesses.containsKey(info.id));
startedProcesses.remove(info.id);
}
static Iterable<Map<String, String>> getStartedProcessesList() =>
new List.from(startedProcesses.values.map((e) => e.referenceValueMap));
static Future<ServiceExtensionResponse> getStartedProcesses(
String function, Map<String, String> params) {
assert(function == '__getProcesses');
var data = {'type': '_startedprocesses', 'data': getStartedProcessesList()};
var json = JSON.encode(data);
return new Future.value(new ServiceExtensionResponse.result(json));
}
static Future<ServiceExtensionResponse> getProcessInfoMapById(
String function, Map<String, String> params) {
var id = int.parse(params['id']);
var result = startedProcesses.containsKey(id)
? startedProcesses[id].fullValueMap
: {};
var json = JSON.encode(result);
return new Future.value(new ServiceExtensionResponse.result(json));
}
}
class _SocketResourceInfo extends _ReadWriteResourceInfo {
static const String TCP_STRING = 'TCP';
static const String UDP_STRING = 'UDP';