Implement Isolate.pause in dart2js.

This is a trivial implementation of Isolate.pause.
It introduces out-of-band control messages to the control port
int a simple way (if receiveport == controlport treat specially).

Control port is now created for each isolate, along with a Capability allowing
pausing.

R=floitsch@google.com

Review URL: https://codereview.chromium.org//153553009

git-svn-id: https://dart.googlecode.com/svn/branches/bleeding_edge/dart@32485 260f80e4-7a28-3924-810f-c04153c831b5
This commit is contained in:
lrn@google.com 2014-02-10 08:33:14 +00:00
parent 60e39dcfa2
commit f7ac102369
4 changed files with 89 additions and 23 deletions

View file

@ -22,8 +22,6 @@ import 'dart:_foreign_helper' show DART_CLOSURE_TO_JS,
IsolateContext;
import 'dart:_interceptors' show JSExtendableArray;
ReceivePort controlPort;
/**
* Called by the compiler to support switching
* between isolates when we get a callback from the DOM.
@ -236,6 +234,39 @@ class _IsolateContext implements IsolateContext {
// native object containing all globals of an isolate.
final isolateStatics = JS_CREATE_ISOLATE();
final RawReceivePortImpl controlPort = new RawReceivePortImpl._controlPort();
final Capability pauseCapability = new Capability();
// TODO(lrn): Store these in single "pausestate" object, so they don't take
// up as much room when not pausing.
bool isPaused = false;
List<_IsolateEvent> delayedEvents = [];
Set<Capability> pauseTokens = new Set();
_IsolateContext() {
this.registerWeak(controlPort._id, controlPort);
}
void addPause(Capability authentification, Capability resume) {
if (pauseCapability != authentification) return;
if (pauseTokens.add(resume) && !isPaused) {
isPaused = true;
}
}
void removePause(Capability resume) {
if (!isPaused) return;
pauseTokens.remove(resume);
if (pauseTokens.isEmpty) {
while(delayedEvents.isNotEmpty) {
_IsolateEvent event = delayedEvents.removeLast();
_globalState.topEventLoop.prequeue(event);
}
isPaused = false;
}
}
/**
* Run [code] in the context of the isolate represented by [this].
*/
@ -257,15 +288,32 @@ class _IsolateContext implements IsolateContext {
JS_SET_CURRENT_ISOLATE(isolateStatics);
}
void handleControlMessage(message) {
switch (message[0]) {
case "pause":
addPause(message[1], message[2]);
break;
case "resume":
removePause(message[1]);
break;
default:
print("UNKOWN MESSAGE: $message");
}
}
/** Looks up a port registered for this isolate. */
RawReceivePortImpl lookup(int portId) => ports[portId];
/** Registers a port on this isolate. */
void register(int portId, RawReceivePortImpl port) {
void _addRegistration(int portId, RawReceivePortImpl port) {
if (ports.containsKey(portId)) {
throw new Exception("Registry: ports must be registered only once.");
}
ports[portId] = port;
}
/** Registers a port on this isolate. */
void register(int portId, RawReceivePortImpl port) {
_addRegistration(portId, port);
_updateGlobalState();
}
@ -276,8 +324,7 @@ class _IsolateContext implements IsolateContext {
*/
void registerWeak(int portId, RawReceivePortImpl port) {
weakPorts.add(portId);
// 'register' updates the global state.
register(portId, port);
_addRegistration(portId, port);
}
_updateGlobalState() {
@ -291,6 +338,7 @@ class _IsolateContext implements IsolateContext {
/** Unregister a port on this isolate. */
void unregister(int portId) {
ports.remove(portId);
weakPorts.remove(portId);
_updateGlobalState();
}
}
@ -306,6 +354,10 @@ class _EventLoop {
events.addLast(new _IsolateEvent(isolate, fn, msg));
}
void prequeue(_IsolateEvent event) {
events.addFirst(event);
}
_IsolateEvent dequeue() {
if (events.isEmpty) return null;
return events.removeFirst();
@ -383,6 +435,10 @@ class _IsolateEvent {
_IsolateEvent(this.isolate, this.fn, this.message);
void process() {
if (isolate.isPaused) {
isolate.delayedEvents.add(this);
return;
}
isolate.eval(fn);
}
}
@ -582,7 +638,7 @@ class IsolateNatives {
return JS("", "new #()", ctor);
}
static Future<SendPort> spawnFunction(void topLevelFunction(message),
static Future<List> spawnFunction(void topLevelFunction(message),
message) {
final name = _getJSFunctionName(topLevelFunction);
if (name == null) {
@ -592,25 +648,25 @@ class IsolateNatives {
return spawn(name, null, null, message, false, false);
}
static Future<SendPort> spawnUri(Uri uri, List<String> args, message) {
static Future<List> spawnUri(Uri uri, List<String> args, message) {
return spawn(null, uri.toString(), args, message, false, true);
}
// TODO(sigmund): clean up above, after we make the new API the default:
/// If [uri] is `null` it is replaced with the current script.
static Future<SendPort> spawn(String functionName, String uri,
List<String> args, message,
bool isLight, bool isSpawnUri) {
static Future<List> spawn(String functionName, String uri,
List<String> args, message,
bool isLight, bool isSpawnUri) {
// Assume that the compiled version of the Dart file lives just next to the
// dart file.
// TODO(floitsch): support precompiled version of dart2js output.
if (uri != null && uri.endsWith(".dart")) uri += ".js";
ReceivePort port = new ReceivePort();
Future<SendPort> result = port.first.then((msg) {
Future<List> result = port.first.then((msg) {
assert(msg[0] == _SPAWNED_SIGNAL);
return msg[1];
return msg;
});
SendPort signalReply = port.sendPort;
@ -666,8 +722,9 @@ class IsolateNatives {
_IsolateContext context = JS_CURRENT_ISOLATE_CONTEXT();
Primitives.initializeStatics(context.id);
// The isolate's port does not keep the isolate open.
controlPort = new ReceivePortImpl.weak();
replyTo.send([_SPAWNED_SIGNAL, controlPort.sendPort]);
replyTo.send([_SPAWNED_SIGNAL,
context.controlPort.sendPort,
context.pauseCapability]);
if (!isSpawnUri) {
topLevel(message);
} else if (topLevel is _MainFunctionArgsMessage) {
@ -763,6 +820,10 @@ class _NativeJsSendPort extends _BaseSendPort implements SendPort {
if (shouldSerialize) {
msg = _serializeMessage(msg);
}
if (isolate.controlPort == _receivePort) {
isolate.handleControlMessage(msg);
return;
}
_globalState.topEventLoop.enqueue(isolate, () {
if (!_receivePort._isClosed) {
if (shouldSerialize) {
@ -823,18 +884,23 @@ class _WorkerSendPort extends _BaseSendPort implements SendPort {
class RawReceivePortImpl implements RawReceivePort {
static int _nextFreeId = 1;
final int _id = _nextFreeId++;
final int _id;
Function _handler;
bool _isClosed = false;
RawReceivePortImpl(this._handler) {
RawReceivePortImpl(this._handler) : _id = _nextFreeId++ {
_globalState.currentContext.register(_id, this);
}
RawReceivePortImpl.weak(this._handler) {
RawReceivePortImpl.weak(this._handler) : _id = _nextFreeId++ {
_globalState.currentContext.registerWeak(_id, this);
}
// Creates the control port of an isolate.
// This is created before the isolate context object itself,
// so it cannot access the static _nextFreeId field.
RawReceivePortImpl._controlPort() : _handler = null, _id = 0;
void set handler(Function newHandler) {
_handler = newHandler;
}

View file

@ -15,7 +15,7 @@ patch class Isolate {
patch static Future<Isolate> spawn(void entryPoint(message), var message) {
try {
return IsolateNatives.spawnFunction(entryPoint, message)
.then((controlPort) => new Isolate._fromControlPort(controlPort));
.then((msg) => new Isolate._fromControlPort(msg[1], msg[2]));
} catch (e, st) {
return new Future<Isolate>.error(e, st);
}
@ -34,7 +34,7 @@ patch class Isolate {
throw new ArgumentError("Args must be a list of Strings $args");
}
return IsolateNatives.spawnUri(uri, args, message)
.then((controlPort) => new Isolate._fromControlPort(controlPort));
.then((msg) => new Isolate._fromControlPort(msg[1], msg[2]));
} catch (e, st) {
return new Future<Isolate>.error(e, st);
}

View file

@ -11,17 +11,17 @@ serialization_test: SkipByDesign # Tests dart2js-specific serialization code
isolate_throws_test/01: Skip # Issue 12587
compile_time_error_test/01: Skip # Issue 12587
capability_test: Fail # Not implemented yet
pause_test: Fail
pause_test: Fail # Not implemented yet
[ $compiler == dart2js && $jscl ]
browser/*: SkipByDesign # Browser specific tests
pause_test: Fail # non-zero timer not supported.
[ $compiler == dart2js && $runtime == drt ]
isolate_stress_test: Pass, Crash # Issue 14438
[ $compiler == dart2js ]
serialization_test: RuntimeError # Issue 1882, tries to access class TestingOnly declared in isolate_patch.dart
pause_test: Fail
[ $compiler == dart2js && $runtime == ie9 ]
browser/typed_data_message_test: Fail # Issue 12624

View file

@ -24,10 +24,10 @@ main(){
reply.handler = completer.complete;
Isolate.spawn(isomain1, reply.sendPort).then((Isolate iso) {
isolate = iso;
resume = isolate.pause();
return completer.future;
}).then((echoPort) {
// Isolate has been created, and first response has been returned.
resume = isolate.pause();
echoPort.send(24);
reply.handler = (v) {
throw "RESPONSE WHILE PAUSED?!?";