diff --git a/sdk/lib/_internal/lib/isolate_helper.dart b/sdk/lib/_internal/lib/isolate_helper.dart index 05208648a9a..cd3ee4ad56d 100644 --- a/sdk/lib/_internal/lib/isolate_helper.dart +++ b/sdk/lib/_internal/lib/isolate_helper.dart @@ -22,8 +22,6 @@ import 'dart:_foreign_helper' show DART_CLOSURE_TO_JS, IsolateContext; import 'dart:_interceptors' show JSExtendableArray; -ReceivePort controlPort; - /** * Called by the compiler to support switching * between isolates when we get a callback from the DOM. @@ -236,6 +234,39 @@ class _IsolateContext implements IsolateContext { // native object containing all globals of an isolate. final isolateStatics = JS_CREATE_ISOLATE(); + final RawReceivePortImpl controlPort = new RawReceivePortImpl._controlPort(); + + final Capability pauseCapability = new Capability(); + + // TODO(lrn): Store these in single "pausestate" object, so they don't take + // up as much room when not pausing. + bool isPaused = false; + List<_IsolateEvent> delayedEvents = []; + Set pauseTokens = new Set(); + + _IsolateContext() { + this.registerWeak(controlPort._id, controlPort); + } + + void addPause(Capability authentification, Capability resume) { + if (pauseCapability != authentification) return; + if (pauseTokens.add(resume) && !isPaused) { + isPaused = true; + } + } + + void removePause(Capability resume) { + if (!isPaused) return; + pauseTokens.remove(resume); + if (pauseTokens.isEmpty) { + while(delayedEvents.isNotEmpty) { + _IsolateEvent event = delayedEvents.removeLast(); + _globalState.topEventLoop.prequeue(event); + } + isPaused = false; + } + } + /** * Run [code] in the context of the isolate represented by [this]. */ @@ -257,15 +288,32 @@ class _IsolateContext implements IsolateContext { JS_SET_CURRENT_ISOLATE(isolateStatics); } + void handleControlMessage(message) { + switch (message[0]) { + case "pause": + addPause(message[1], message[2]); + break; + case "resume": + removePause(message[1]); + break; + default: + print("UNKOWN MESSAGE: $message"); + } + } + /** Looks up a port registered for this isolate. */ RawReceivePortImpl lookup(int portId) => ports[portId]; - /** Registers a port on this isolate. */ - void register(int portId, RawReceivePortImpl port) { + void _addRegistration(int portId, RawReceivePortImpl port) { if (ports.containsKey(portId)) { throw new Exception("Registry: ports must be registered only once."); } ports[portId] = port; + } + + /** Registers a port on this isolate. */ + void register(int portId, RawReceivePortImpl port) { + _addRegistration(portId, port); _updateGlobalState(); } @@ -276,8 +324,7 @@ class _IsolateContext implements IsolateContext { */ void registerWeak(int portId, RawReceivePortImpl port) { weakPorts.add(portId); - // 'register' updates the global state. - register(portId, port); + _addRegistration(portId, port); } _updateGlobalState() { @@ -291,6 +338,7 @@ class _IsolateContext implements IsolateContext { /** Unregister a port on this isolate. */ void unregister(int portId) { ports.remove(portId); + weakPorts.remove(portId); _updateGlobalState(); } } @@ -306,6 +354,10 @@ class _EventLoop { events.addLast(new _IsolateEvent(isolate, fn, msg)); } + void prequeue(_IsolateEvent event) { + events.addFirst(event); + } + _IsolateEvent dequeue() { if (events.isEmpty) return null; return events.removeFirst(); @@ -383,6 +435,10 @@ class _IsolateEvent { _IsolateEvent(this.isolate, this.fn, this.message); void process() { + if (isolate.isPaused) { + isolate.delayedEvents.add(this); + return; + } isolate.eval(fn); } } @@ -582,7 +638,7 @@ class IsolateNatives { return JS("", "new #()", ctor); } - static Future spawnFunction(void topLevelFunction(message), + static Future spawnFunction(void topLevelFunction(message), message) { final name = _getJSFunctionName(topLevelFunction); if (name == null) { @@ -592,25 +648,25 @@ class IsolateNatives { return spawn(name, null, null, message, false, false); } - static Future spawnUri(Uri uri, List args, message) { + static Future spawnUri(Uri uri, List args, message) { return spawn(null, uri.toString(), args, message, false, true); } // TODO(sigmund): clean up above, after we make the new API the default: /// If [uri] is `null` it is replaced with the current script. - static Future spawn(String functionName, String uri, - List args, message, - bool isLight, bool isSpawnUri) { + static Future spawn(String functionName, String uri, + List args, message, + bool isLight, bool isSpawnUri) { // Assume that the compiled version of the Dart file lives just next to the // dart file. // TODO(floitsch): support precompiled version of dart2js output. if (uri != null && uri.endsWith(".dart")) uri += ".js"; ReceivePort port = new ReceivePort(); - Future result = port.first.then((msg) { + Future result = port.first.then((msg) { assert(msg[0] == _SPAWNED_SIGNAL); - return msg[1]; + return msg; }); SendPort signalReply = port.sendPort; @@ -666,8 +722,9 @@ class IsolateNatives { _IsolateContext context = JS_CURRENT_ISOLATE_CONTEXT(); Primitives.initializeStatics(context.id); // The isolate's port does not keep the isolate open. - controlPort = new ReceivePortImpl.weak(); - replyTo.send([_SPAWNED_SIGNAL, controlPort.sendPort]); + replyTo.send([_SPAWNED_SIGNAL, + context.controlPort.sendPort, + context.pauseCapability]); if (!isSpawnUri) { topLevel(message); } else if (topLevel is _MainFunctionArgsMessage) { @@ -763,6 +820,10 @@ class _NativeJsSendPort extends _BaseSendPort implements SendPort { if (shouldSerialize) { msg = _serializeMessage(msg); } + if (isolate.controlPort == _receivePort) { + isolate.handleControlMessage(msg); + return; + } _globalState.topEventLoop.enqueue(isolate, () { if (!_receivePort._isClosed) { if (shouldSerialize) { @@ -823,18 +884,23 @@ class _WorkerSendPort extends _BaseSendPort implements SendPort { class RawReceivePortImpl implements RawReceivePort { static int _nextFreeId = 1; - final int _id = _nextFreeId++; + final int _id; Function _handler; bool _isClosed = false; - RawReceivePortImpl(this._handler) { + RawReceivePortImpl(this._handler) : _id = _nextFreeId++ { _globalState.currentContext.register(_id, this); } - RawReceivePortImpl.weak(this._handler) { + RawReceivePortImpl.weak(this._handler) : _id = _nextFreeId++ { _globalState.currentContext.registerWeak(_id, this); } + // Creates the control port of an isolate. + // This is created before the isolate context object itself, + // so it cannot access the static _nextFreeId field. + RawReceivePortImpl._controlPort() : _handler = null, _id = 0; + void set handler(Function newHandler) { _handler = newHandler; } diff --git a/sdk/lib/_internal/lib/isolate_patch.dart b/sdk/lib/_internal/lib/isolate_patch.dart index 09b79553abb..9d931a95c99 100644 --- a/sdk/lib/_internal/lib/isolate_patch.dart +++ b/sdk/lib/_internal/lib/isolate_patch.dart @@ -15,7 +15,7 @@ patch class Isolate { patch static Future spawn(void entryPoint(message), var message) { try { return IsolateNatives.spawnFunction(entryPoint, message) - .then((controlPort) => new Isolate._fromControlPort(controlPort)); + .then((msg) => new Isolate._fromControlPort(msg[1], msg[2])); } catch (e, st) { return new Future.error(e, st); } @@ -34,7 +34,7 @@ patch class Isolate { throw new ArgumentError("Args must be a list of Strings $args"); } return IsolateNatives.spawnUri(uri, args, message) - .then((controlPort) => new Isolate._fromControlPort(controlPort)); + .then((msg) => new Isolate._fromControlPort(msg[1], msg[2])); } catch (e, st) { return new Future.error(e, st); } diff --git a/tests/isolate/isolate.status b/tests/isolate/isolate.status index a753ec2dbc5..b2517d12f4b 100644 --- a/tests/isolate/isolate.status +++ b/tests/isolate/isolate.status @@ -11,17 +11,17 @@ serialization_test: SkipByDesign # Tests dart2js-specific serialization code isolate_throws_test/01: Skip # Issue 12587 compile_time_error_test/01: Skip # Issue 12587 capability_test: Fail # Not implemented yet -pause_test: Fail +pause_test: Fail # Not implemented yet [ $compiler == dart2js && $jscl ] browser/*: SkipByDesign # Browser specific tests +pause_test: Fail # non-zero timer not supported. [ $compiler == dart2js && $runtime == drt ] isolate_stress_test: Pass, Crash # Issue 14438 [ $compiler == dart2js ] serialization_test: RuntimeError # Issue 1882, tries to access class TestingOnly declared in isolate_patch.dart -pause_test: Fail [ $compiler == dart2js && $runtime == ie9 ] browser/typed_data_message_test: Fail # Issue 12624 diff --git a/tests/isolate/pause_test.dart b/tests/isolate/pause_test.dart index 71691adfc8c..22a4fce6018 100644 --- a/tests/isolate/pause_test.dart +++ b/tests/isolate/pause_test.dart @@ -24,10 +24,10 @@ main(){ reply.handler = completer.complete; Isolate.spawn(isomain1, reply.sendPort).then((Isolate iso) { isolate = iso; - resume = isolate.pause(); return completer.future; }).then((echoPort) { // Isolate has been created, and first response has been returned. + resume = isolate.pause(); echoPort.send(24); reply.handler = (v) { throw "RESPONSE WHILE PAUSED?!?";