[VM/isolate] Migrate 'dart:isolate' and patch files for NNBD

Change-Id: I1e894815599d9b1dbb0551428eb7a679a23edd6e
Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/131483
Commit-Queue: Siva Annamalai <asiva@google.com>
Reviewed-by: Lasse R.H. Nielsen <lrn@google.com>
Reviewed-by: Alexander Markov <alexmarkov@google.com>
This commit is contained in:
asiva 2020-01-15 18:09:04 +00:00 committed by commit-bot@chromium.org
parent 55637a41a8
commit 1564d596bb
8 changed files with 314 additions and 298 deletions

View file

@ -112,19 +112,13 @@ void _isolateScheduleImmediate(void callback()) {
@pragma("vm:entry-point", "call")
void _runPendingImmediateCallback() {
if (_pendingImmediateCallback != null) {
var callback = _pendingImmediateCallback;
final callback = _pendingImmediateCallback;
if (callback != null) {
_pendingImmediateCallback = null;
callback();
}
}
_ImmediateCallback _removePendingImmediateCallback() {
var callback = _pendingImmediateCallback;
_pendingImmediateCallback = null;
return callback;
}
/// The embedder can execute this function to get hold of
/// [_isolateScheduleImmediate] above.
@pragma("vm:entry-point", "call")

View file

@ -18,7 +18,8 @@ class _TimerHeap {
List<_Timer> _list;
int _used = 0;
_TimerHeap([int initSize = 7]) : _list = new List<_Timer>(initSize);
_TimerHeap([int initSize = 7])
: _list = List<_Timer>.filled(initSize, _Timer._sentinelTimer);
bool get isEmpty => _used == 0;
@ -63,14 +64,15 @@ class _TimerHeap {
}
void _resize() {
var newList = new List<_Timer>(_list.length * 2 + 1);
var newList =
List<_Timer>.filled(_list.length * 2 + 1, _Timer._sentinelTimer);
newList.setRange(0, _used, _list);
_list = newList;
}
void _bubbleUp(_Timer timer) {
while (!isFirst(timer)) {
Timer parent = _parent(timer);
_Timer parent = _parent(timer);
if (timer._compareTo(parent) < 0) {
_swap(timer, parent);
} else {
@ -106,10 +108,7 @@ class _TimerHeap {
_list[second._indexOrNext] = second;
}
Timer _parent(_Timer timer) => _list[_parentIndex(timer._indexOrNext)];
Timer _leftChild(_Timer timer) => _list[_leftChildIndex(timer._indexOrNext)];
Timer _rightChild(_Timer timer) =>
_list[_rightChildIndex(timer._indexOrNext)];
_Timer _parent(_Timer timer) => _list[_parentIndex(timer._indexOrNext)];
static int _parentIndex(int index) => (index - 1) ~/ 2;
static int _leftChildIndex(int index) => 2 * index + 1;
@ -120,6 +119,9 @@ class _Timer implements Timer {
// Cancels the timer in the event handler.
static const _NO_TIMER = -1;
// A sentinel timer.
static _Timer _sentinelTimer = _Timer._sentinel();
// We distinguish what kind of message arrived based on the value being sent.
static const _ZERO_EVENT = 1;
static const _TIMEOUT_EVENT = null;
@ -128,7 +130,7 @@ class _Timer implements Timer {
// end up on the TimerHeap. Timers with a timeout of 0 are queued in a list.
static final _heap = new _TimerHeap();
static _Timer _firstZeroTimer;
static _Timer _lastZeroTimer;
static _Timer _lastZeroTimer = _sentinelTimer;
// We use an id to be able to sort timers with the same expiration time.
// ids are recycled after ID_MASK enqueues or when the timer queue is empty.
@ -137,7 +139,7 @@ class _Timer implements Timer {
static RawReceivePort _receivePort;
static SendPort _sendPort;
static int _scheduledWakeupTime;
static int _scheduledWakeupTime = 0;
static bool _handlingCallbacks = false;
@ -158,11 +160,19 @@ class _Timer implements Timer {
return result;
}
_Timer._sentinel()
: _callback = null,
_wakeupTime = 0,
_milliSeconds = 0,
_repeating = false,
_indexOrNext = null,
_id = -1;
_Timer._internal(
this._callback, this._wakeupTime, this._milliSeconds, this._repeating)
: _id = _nextId();
static Timer _createTimer(
static _Timer _createTimer(
void callback(Timer timer), int milliSeconds, bool repeating) {
// Negative timeouts are treated as if 0 timeout.
if (milliSeconds < 0) {
@ -253,29 +263,33 @@ class _Timer implements Timer {
// Enqueue one message for each zero timer. To be able to distinguish from
// EventHandler messages we send a _ZERO_EVENT instead of a _TIMEOUT_EVENT.
static void _notifyZeroHandler() {
if (_sendPort == null) {
_createTimerHandler();
var port = _sendPort;
if (port == null) {
port = _createTimerHandler();
_sendPort = port;
}
_sendPort.send(_ZERO_EVENT);
port.send(_ZERO_EVENT);
}
// Handle the notification of a zero timer. Make sure to also execute non-zero
// timers with a lower expiration time.
static List _queueFromZeroEvent() {
var pendingTimers = new List();
assert(_firstZeroTimer != null);
// Collect pending timers from the timer heap that have an expiration prior
// to the currently notified zero timer.
var timer;
while (!_heap.isEmpty && (_heap.first._compareTo(_firstZeroTimer) < 0)) {
timer = _heap.removeFirst();
_Timer ftimer = _firstZeroTimer;
if (ftimer != null) {
// Collect pending timers from the timer heap that have an expiration prior
// to the currently notified zero timer.
var timer;
while (!_heap.isEmpty && (_heap.first._compareTo(ftimer) < 0)) {
timer = _heap.removeFirst();
pendingTimers.add(timer);
}
// Append the first zero timer to the pending timers.
timer = _firstZeroTimer;
_firstZeroTimer = timer._indexOrNext;
timer._indexOrNext = null;
pendingTimers.add(timer);
}
// Append the first zero timer to the pending timers.
timer = _firstZeroTimer;
_firstZeroTimer = timer._indexOrNext;
timer._indexOrNext = null;
pendingTimers.add(timer);
return pendingTimers;
}
@ -305,15 +319,15 @@ class _Timer implements Timer {
// Only send a message if the requested wakeup time differs from the
// already scheduled wakeup time.
var wakeupTime = _heap.first._wakeupTime;
if ((_scheduledWakeupTime == null) ||
(wakeupTime != _scheduledWakeupTime)) {
if ((_scheduledWakeupTime == 0) || (wakeupTime != _scheduledWakeupTime)) {
_scheduleWakeup(wakeupTime);
}
}
static List _queueFromTimeoutEvent() {
var pendingTimers = new List();
if (_firstZeroTimer != null) {
_Timer ftimer = _firstZeroTimer;
if (ftimer != null) {
// Collect pending timers from the timer heap that have an expiration
// prior to the next zero timer.
// By definition the first zero timer has been scheduled before the
@ -321,7 +335,7 @@ class _Timer implements Timer {
// timer are expired. The first zero timer will be dispatched when its
// corresponding message is delivered.
var timer;
while (!_heap.isEmpty && (_heap.first._compareTo(_firstZeroTimer) < 0)) {
while (!_heap.isEmpty && (_heap.first._compareTo(ftimer) < 0)) {
timer = _heap.removeFirst();
pendingTimers.add(timer);
}
@ -388,10 +402,7 @@ class _Timer implements Timer {
timer._enqueue();
}
// Execute pending micro tasks.
var immediateCallback = _removePendingImmediateCallback();
if (immediateCallback != null) {
immediateCallback();
}
_runPendingImmediateCallback();
}
}
} finally {
@ -412,7 +423,7 @@ class _Timer implements Timer {
assert(pendingTimers.length > 0);
} else {
assert(msg == _TIMEOUT_EVENT);
_scheduledWakeupTime = null; // Consumed the last scheduled wakeup now.
_scheduledWakeupTime = 0; // Consumed the last scheduled wakeup now.
pendingTimers = _queueFromTimeoutEvent();
}
_runTimers(pendingTimers);
@ -423,10 +434,12 @@ class _Timer implements Timer {
// Tell the event handler to wake this isolate at a specific time.
static void _scheduleWakeup(int wakeupTime) {
if (_sendPort == null) {
_createTimerHandler();
var port = _sendPort;
if (port == null) {
port = _createTimerHandler();
_sendPort = port;
}
VMLibraryHooks.eventHandlerSendData(null, _sendPort, wakeupTime);
VMLibraryHooks.eventHandlerSendData(null, port, wakeupTime);
_scheduledWakeupTime = wakeupTime;
}
@ -434,25 +447,30 @@ class _Timer implements Timer {
static void _cancelWakeup() {
if (_sendPort != null) {
VMLibraryHooks.eventHandlerSendData(null, _sendPort, _NO_TIMER);
_scheduledWakeupTime = null;
_scheduledWakeupTime = 0;
}
}
// Create a receive port and register a message handler for the timer
// events.
static void _createTimerHandler() {
static SendPort _createTimerHandler() {
assert(_receivePort == null);
assert(_sendPort == null);
_receivePort = new RawReceivePort(_handleMessage);
_sendPort = _receivePort.sendPort;
_scheduledWakeupTime = null;
RawReceivePort port = new RawReceivePort(_handleMessage);
_receivePort = port;
_sendPort = port.sendPort;
_scheduledWakeupTime = 0;
return port.sendPort;
}
static void _shutdownTimerHandler() {
_receivePort.close();
_receivePort = null;
var port = _receivePort;
if (port != null) {
port.close();
_receivePort = null;
}
_sendPort = null;
_scheduledWakeupTime = null;
_scheduledWakeupTime = 0;
}
// The Timer factory registered with the dart:async library by the embedder.

View file

@ -61,7 +61,7 @@ class RawReceivePort {
* event is received.
*/
@patch
factory RawReceivePort([Function handler]) {
factory RawReceivePort([Function? handler]) {
_RawReceivePortImpl result = new _RawReceivePortImpl();
result.handler = handler;
return result;
@ -71,8 +71,9 @@ class RawReceivePort {
class _ReceivePortImpl extends Stream implements ReceivePort {
_ReceivePortImpl() : this.fromRawReceivePort(new RawReceivePort());
_ReceivePortImpl.fromRawReceivePort(this._rawPort) {
_controller = new StreamController(onCancel: close, sync: true);
_ReceivePortImpl.fromRawReceivePort(this._rawPort)
: _controller = new StreamController(sync: true) {
_controller.onCancel = close;
_rawPort.handler = _controller.add;
}
@ -80,8 +81,8 @@ class _ReceivePortImpl extends Stream implements ReceivePort {
return _rawPort.sendPort;
}
StreamSubscription listen(void onData(var message),
{Function onError, void onDone(), bool cancelOnError}) {
StreamSubscription listen(void onData(var message)?,
{Function? onError, void onDone()?, bool cancelOnError = true}) {
return _controller.stream.listen(onData,
onError: onError, onDone: onDone, cancelOnError: cancelOnError);
}
@ -92,13 +93,13 @@ class _ReceivePortImpl extends Stream implements ReceivePort {
}
final RawReceivePort _rawPort;
StreamController _controller;
final StreamController _controller;
}
typedef void _ImmediateCallback();
/// The callback that has been registered through `scheduleImmediate`.
_ImmediateCallback _pendingImmediateCallback;
_ImmediateCallback? _pendingImmediateCallback;
/// The closure that should be used as scheduleImmediateClosure, when the VM
/// is responsible for the event loop.
@ -110,19 +111,13 @@ void _isolateScheduleImmediate(void callback()) {
@pragma("vm:entry-point", "call")
void _runPendingImmediateCallback() {
if (_pendingImmediateCallback != null) {
var callback = _pendingImmediateCallback;
final callback = _pendingImmediateCallback;
if (callback != null) {
_pendingImmediateCallback = null;
callback();
}
}
_ImmediateCallback _removePendingImmediateCallback() {
var callback = _pendingImmediateCallback;
_pendingImmediateCallback = null;
return callback;
}
/// The embedder can execute this function to get hold of
/// [_isolateScheduleImmediate] above.
@pragma("vm:entry-point", "call")
@ -176,7 +171,7 @@ class _RawReceivePortImpl implements RawReceivePort {
// Call into the VM to close the VM maintained mappings.
_closeInternal() native "RawReceivePortImpl_closeInternal";
void set handler(Function value) {
void set handler(Function? value) {
_handlerMap[this._get_id()] = value;
}
@ -224,8 +219,8 @@ class _SendPortImpl implements SendPort {
}
typedef _NullaryFunction();
typedef _UnaryFunction(Null args);
typedef _BinaryFunction(Null args, Null message);
typedef _UnaryFunction(Never args);
typedef _BinaryFunction(Never args, Never message);
/**
* Takes the real entry point as argument and invokes it with the
@ -261,31 +256,31 @@ Function _getStartMainIsolateFunction() {
*/
@pragma("vm:entry-point", "call")
void _startIsolate(
SendPort parentPort,
SendPort? parentPort,
Function entryPoint,
List<String> args,
var message,
List<String>? args,
Object? message,
bool isSpawnUri,
RawReceivePort controlPort,
List capabilities) {
RawReceivePort? controlPort,
List? capabilities) {
// The control port (aka the main isolate port) does not handle any messages.
if (controlPort != null) {
controlPort.handler = (_) {}; // Nobody home on the control port.
}
if (parentPort != null) {
// Build a message to our parent isolate providing access to the
// current isolate's control port and capabilities.
//
// TODO(floitsch): Send an error message if we can't find the entry point.
var readyMessage = new List(2);
readyMessage[0] = controlPort.sendPort;
readyMessage[1] = capabilities;
if (parentPort != null) {
// Build a message to our parent isolate providing access to the
// current isolate's control port and capabilities.
//
// TODO(floitsch): Send an error message if we can't find the entry point.
var readyMessage = new List<Object?>.filled(2, null);
readyMessage[0] = controlPort.sendPort;
readyMessage[1] = capabilities;
// Out of an excess of paranoia we clear the capabilities from the
// stack. Not really necessary.
capabilities = null;
parentPort.send(readyMessage);
// Out of an excess of paranoia we clear the capabilities from the
// stack. Not really necessary.
capabilities = null;
parentPort.send(readyMessage);
}
}
assert(capabilities == null);
@ -353,35 +348,34 @@ class Isolate {
@patch
static Future<Isolate> spawn<T>(void entryPoint(T message), T message,
{bool paused: false,
bool errorsAreFatal,
SendPort onExit,
SendPort onError,
String debugName}) async {
{bool paused = false,
bool errorsAreFatal = true,
SendPort? onExit,
SendPort? onError,
String? debugName}) async {
// `paused` isn't handled yet.
RawReceivePort readyPort;
// Check for the type of `entryPoint` on the spawning isolate to make
// error-handling easier.
if (entryPoint is! _UnaryFunction) {
throw new ArgumentError(entryPoint);
}
// The VM will invoke [_startIsolate] with entryPoint as argument.
// We do not inherit the package config settings from the parent isolate,
// instead we use the values that were set on the command line.
var packageConfig = VMLibraryHooks.packageConfigString;
var script = VMLibraryHooks.platformScript;
if (script == null) {
// We do not have enough information to support spawning the new
// isolate.
throw new UnsupportedError("Isolate.spawn");
}
if (script.isScheme("package")) {
script = await Isolate.resolvePackageUri(script);
}
final RawReceivePort readyPort = new RawReceivePort();
try {
// Check for the type of `entryPoint` on the spawning isolate to make
// error-handling easier.
if (entryPoint is! _UnaryFunction) {
throw new ArgumentError(entryPoint);
}
// The VM will invoke [_startIsolate] with entryPoint as argument.
readyPort = new RawReceivePort();
// We do not inherit the package config settings from the parent isolate,
// instead we use the values that were set on the command line.
var packageConfig = VMLibraryHooks.packageConfigString;
var script = VMLibraryHooks.platformScript;
if (script == null) {
// We do not have enough information to support spawning the new
// isolate.
throw new UnsupportedError("Isolate.spawn");
}
if (script.scheme == "package") {
script = await Isolate.resolvePackageUri(script);
}
_spawnFunction(
readyPort.sendPort,
script.toString(),
@ -396,26 +390,23 @@ class Isolate {
debugName);
return await _spawnCommon(readyPort);
} catch (e, st) {
if (readyPort != null) {
readyPort.close();
}
readyPort.close();
return await new Future<Isolate>.error(e, st);
}
}
@patch
static Future<Isolate> spawnUri(Uri uri, List<String> args, var message,
{bool paused: false,
SendPort onExit,
SendPort onError,
bool errorsAreFatal,
bool checked,
Map<String, String> environment,
Uri packageRoot,
Uri packageConfig,
bool automaticPackageResolution: false,
String debugName}) async {
RawReceivePort readyPort;
{bool paused = false,
SendPort? onExit,
SendPort? onError,
bool errorsAreFatal = true,
bool? checked,
Map<String, String>? environment,
Uri? packageRoot,
Uri? packageConfig,
bool automaticPackageResolution = false,
String? debugName}) async {
if (environment != null) {
throw new UnimplementedError("environment");
}
@ -438,38 +429,38 @@ class Isolate {
"packageRoot and a packageConfig.");
}
}
// Resolve the uri against the current isolate's root Uri first.
var spawnedUri = _rootUri!.resolveUri(uri);
// Inherit this isolate's package resolution setup if not overridden.
if (!automaticPackageResolution &&
(packageRoot == null) &&
(packageConfig == null)) {
if (Isolate._packageSupported()) {
packageRoot = await Isolate.packageRoot;
packageConfig = await Isolate.packageConfig;
}
}
// Ensure to resolve package: URIs being handed in as parameters.
if (packageRoot != null) {
// `packages/` directory is no longer supported. Force it null.
// TODO(mfairhurst) Should this throw an exception?
packageRoot = null;
} else if (packageConfig != null) {
// Avoid calling resolvePackageUri if not strictly necessary in case
// the API is not supported.
if (packageConfig.isScheme("package")) {
packageConfig = await Isolate.resolvePackageUri(packageConfig);
}
}
// The VM will invoke [_startIsolate] and not `main`.
var packageRootString = packageRoot?.toString();
var packageConfigString = packageConfig?.toString();
final RawReceivePort readyPort = new RawReceivePort();
try {
// Resolve the uri against the current isolate's root Uri first.
var spawnedUri = _rootUri.resolveUri(uri);
// Inherit this isolate's package resolution setup if not overridden.
if (!automaticPackageResolution &&
(packageRoot == null) &&
(packageConfig == null)) {
if (Isolate._packageSupported()) {
packageRoot = await Isolate.packageRoot;
packageConfig = await Isolate.packageConfig;
}
}
// Ensure to resolve package: URIs being handed in as parameters.
if (packageRoot != null) {
// `packages/` directory is no longer supported. Force it null.
// TODO(mfairhurst) Should this throw an exception?
packageRoot = null;
} else if (packageConfig != null) {
// Avoid calling resolvePackageUri if not strictly necessary in case
// the API is not supported.
if (packageConfig.scheme == "package") {
packageConfig = await Isolate.resolvePackageUri(packageConfig);
}
}
// The VM will invoke [_startIsolate] and not `main`.
readyPort = new RawReceivePort();
var packageRootString = packageRoot?.toString();
var packageConfigString = packageConfig?.toString();
_spawnUri(
readyPort.sendPort,
spawnedUri.toString(),
@ -487,15 +478,13 @@ class Isolate {
debugName);
return await _spawnCommon(readyPort);
} catch (e) {
if (readyPort != null) {
readyPort.close();
}
readyPort.close();
rethrow;
}
}
static Future<Isolate> _spawnCommon(RawReceivePort readyPort) {
Completer completer = new Completer<Isolate>.sync();
final completer = new Completer<Isolate>.sync();
readyPort.handler = (readyMessage) {
readyPort.close();
if (readyMessage is List && readyMessage.length == 2) {
@ -538,11 +527,11 @@ class Isolate {
var message,
bool paused,
bool errorsAreFatal,
SendPort onExit,
SendPort onError,
String packageRoot,
String packageConfig,
String debugName) native "Isolate_spawnFunction";
SendPort? onExit,
SendPort? onError,
String? packageRoot,
String? packageConfig,
String? debugName) native "Isolate_spawnFunction";
static void _spawnUri(
SendPort readyPort,
@ -550,14 +539,14 @@ class Isolate {
List<String> args,
var message,
bool paused,
SendPort onExit,
SendPort onError,
SendPort? onExit,
SendPort? onError,
bool errorsAreFatal,
bool checked,
List environment,
String packageRoot,
String packageConfig,
String debugName) native "Isolate_spawnUri";
bool? checked,
List? environment,
String? packageRoot,
String? packageConfig,
String? debugName) native "Isolate_spawnUri";
static void _sendOOB(port, msg) native "Isolate_sendOOB";
@ -566,7 +555,9 @@ class Isolate {
@patch
void _pause(Capability resumeCapability) {
var msg = new List(4)
// _sendOOB expects a fixed length array and hence we create a fixed
// length array and assign values to it instead of using [ ... ].
var msg = new List<Object?>.filled(4, null)
..[0] = 0 // Make room for OOB message type.
..[1] = _PAUSE
..[2] = pauseCapability
@ -576,7 +567,7 @@ class Isolate {
@patch
void resume(Capability resumeCapability) {
var msg = new List(4)
var msg = new List<Object?>.filled(4, null)
..[0] = 0 // Make room for OOB message type.
..[1] = _RESUME
..[2] = pauseCapability
@ -585,8 +576,8 @@ class Isolate {
}
@patch
void addOnExitListener(SendPort responsePort, {Object response}) {
var msg = new List(4)
void addOnExitListener(SendPort responsePort, {Object? response}) {
var msg = new List<Object?>.filled(4, null)
..[0] = 0 // Make room for OOB message type.
..[1] = _ADD_EXIT
..[2] = responsePort
@ -596,7 +587,7 @@ class Isolate {
@patch
void removeOnExitListener(SendPort responsePort) {
var msg = new List(3)
var msg = new List<Object?>.filled(3, null)
..[0] = 0 // Make room for OOB message type.
..[1] = _DEL_EXIT
..[2] = responsePort;
@ -605,7 +596,7 @@ class Isolate {
@patch
void setErrorsFatal(bool errorsAreFatal) {
var msg = new List(4)
var msg = new List<Object?>.filled(4, null)
..[0] = 0 // Make room for OOB message type.
..[1] = _ERROR_FATAL
..[2] = terminateCapability
@ -615,7 +606,7 @@ class Isolate {
@patch
void kill({int priority: beforeNextEvent}) {
var msg = new List(4)
var msg = new List<Object?>.filled(4, null)
..[0] = 0 // Make room for OOB message type.
..[1] = _KILL
..[2] = terminateCapability
@ -624,8 +615,9 @@ class Isolate {
}
@patch
void ping(SendPort responsePort, {Object response, int priority: immediate}) {
var msg = new List(5)
void ping(SendPort responsePort,
{Object? response, int priority: immediate}) {
var msg = new List<Object?>.filled(5, null)
..[0] = 0 // Make room for OOM message type.
..[1] = _PING
..[2] = responsePort
@ -636,7 +628,7 @@ class Isolate {
@patch
void addErrorListener(SendPort port) {
var msg = new List(3)
var msg = new List<Object?>.filled(3, null)
..[0] = 0 // Make room for OOB message type.
..[1] = _ADD_ERROR
..[2] = port;
@ -645,7 +637,7 @@ class Isolate {
@patch
void removeErrorListener(SendPort port) {
var msg = new List(3)
var msg = new List<Object?>.filled(3, null)
..[0] = 0 // Make room for OOB message type.
..[1] = _DEL_ERROR
..[2] = port;
@ -662,7 +654,7 @@ class Isolate {
static List _getPortAndCapabilitiesOfCurrentIsolate()
native "Isolate_getPortAndCapabilitiesOfCurrentIsolate";
static Uri _getCurrentRootUri() {
static Uri? _getCurrentRootUri() {
try {
return Uri.parse(_getCurrentRootUriStr());
} catch (e) {

View file

@ -16,7 +16,8 @@ class _TimerHeap {
List<_Timer> _list;
int _used = 0;
_TimerHeap([int initSize = 7]) : _list = new List<_Timer>(initSize);
_TimerHeap([int initSize = 7])
: _list = List<_Timer>.filled(initSize, _Timer._sentinelTimer);
bool get isEmpty => _used == 0;
@ -61,14 +62,15 @@ class _TimerHeap {
}
void _resize() {
var newList = new List<_Timer>(_list.length * 2 + 1);
var newList =
List<_Timer>.filled(_list.length * 2 + 1, _Timer._sentinelTimer);
newList.setRange(0, _used, _list);
_list = newList;
}
void _bubbleUp(_Timer timer) {
while (!isFirst(timer)) {
Timer parent = _parent(timer);
_Timer parent = _parent(timer);
if (timer._compareTo(parent) < 0) {
_swap(timer, parent);
} else {
@ -104,10 +106,7 @@ class _TimerHeap {
_list[second._indexOrNext] = second;
}
Timer _parent(_Timer timer) => _list[_parentIndex(timer._indexOrNext)];
Timer _leftChild(_Timer timer) => _list[_leftChildIndex(timer._indexOrNext)];
Timer _rightChild(_Timer timer) =>
_list[_rightChildIndex(timer._indexOrNext)];
_Timer _parent(_Timer timer) => _list[_parentIndex(timer._indexOrNext)];
static int _parentIndex(int index) => (index - 1) ~/ 2;
static int _leftChildIndex(int index) => 2 * index + 1;
@ -118,6 +117,10 @@ class _Timer implements Timer {
// Cancels the timer in the event handler.
static const _NO_TIMER = -1;
// A generic null timer object that is used to populate unused slots
// in TimerHeap.
static final _sentinelTimer = _Timer._sentinel();
// We distinguish what kind of message arrived based on the value being sent.
static const _ZERO_EVENT = 1;
static const _TIMEOUT_EVENT = null;
@ -125,21 +128,21 @@ class _Timer implements Timer {
// Timers are ordered by wakeup time. Timers with a timeout value of > 0 do
// end up on the TimerHeap. Timers with a timeout of 0 are queued in a list.
static final _heap = new _TimerHeap();
static _Timer _firstZeroTimer;
static _Timer _lastZeroTimer;
static _Timer? _firstZeroTimer;
static _Timer _lastZeroTimer = _sentinelTimer;
// We use an id to be able to sort timers with the same expiration time.
// ids are recycled after ID_MASK enqueues or when the timer queue is empty.
static const _ID_MASK = 0x1fffffff;
static int _idCount = 0;
static RawReceivePort _receivePort;
static SendPort _sendPort;
static int _scheduledWakeupTime;
static RawReceivePort? _receivePort;
static SendPort? _sendPort;
static int _scheduledWakeupTime = 0;
static bool _handlingCallbacks = false;
Function _callback; // Closure to call when timer fires. null if canceled.
Function? _callback; // Closure to call when timer fires. null if canceled.
int _wakeupTime; // Expiration time.
final int _milliSeconds; // Duration specified at creation.
final bool _repeating; // Indicates periodic timers.
@ -156,11 +159,19 @@ class _Timer implements Timer {
return result;
}
_Timer._sentinel()
: _callback = null,
_wakeupTime = 0,
_milliSeconds = 0,
_repeating = false,
_indexOrNext = null,
_id = -1;
_Timer._internal(
this._callback, this._wakeupTime, this._milliSeconds, this._repeating)
: _id = _nextId();
static Timer _createTimer(
static _Timer _createTimer(
void callback(Timer timer), int milliSeconds, bool repeating) {
// Negative timeouts are treated as if 0 timeout.
if (milliSeconds < 0) {
@ -251,29 +262,33 @@ class _Timer implements Timer {
// Enqueue one message for each zero timer. To be able to distinguish from
// EventHandler messages we send a _ZERO_EVENT instead of a _TIMEOUT_EVENT.
static void _notifyZeroHandler() {
if (_sendPort == null) {
_createTimerHandler();
var port = _sendPort;
if (port == null) {
port = _createTimerHandler();
_sendPort = port;
}
_sendPort.send(_ZERO_EVENT);
port.send(_ZERO_EVENT);
}
// Handle the notification of a zero timer. Make sure to also execute non-zero
// timers with a lower expiration time.
static List _queueFromZeroEvent() {
var pendingTimers = new List();
assert(_firstZeroTimer != null);
// Collect pending timers from the timer heap that have an expiration prior
// to the currently notified zero timer.
var timer;
while (!_heap.isEmpty && (_heap.first._compareTo(_firstZeroTimer) < 0)) {
timer = _heap.removeFirst();
var pendingTimers = [];
final firstTimer = _firstZeroTimer;
if (firstTimer != null) {
// Collect pending timers from the timer heap that have an expiration prior
// to the currently notified zero timer.
var timer;
while (!_heap.isEmpty && (_heap.first._compareTo(firstTimer) < 0)) {
timer = _heap.removeFirst();
pendingTimers.add(timer);
}
// Append the first zero timer to the pending timers.
timer = _firstZeroTimer;
_firstZeroTimer = timer._indexOrNext;
timer._indexOrNext = null;
pendingTimers.add(timer);
}
// Append the first zero timer to the pending timers.
timer = _firstZeroTimer;
_firstZeroTimer = timer._indexOrNext;
timer._indexOrNext = null;
pendingTimers.add(timer);
return pendingTimers;
}
@ -303,15 +318,15 @@ class _Timer implements Timer {
// Only send a message if the requested wakeup time differs from the
// already scheduled wakeup time.
var wakeupTime = _heap.first._wakeupTime;
if ((_scheduledWakeupTime == null) ||
(wakeupTime != _scheduledWakeupTime)) {
if ((_scheduledWakeupTime == 0) || (wakeupTime != _scheduledWakeupTime)) {
_scheduleWakeup(wakeupTime);
}
}
static List _queueFromTimeoutEvent() {
var pendingTimers = new List();
if (_firstZeroTimer != null) {
final firstTimer = _firstZeroTimer;
if (firstTimer != null) {
// Collect pending timers from the timer heap that have an expiration
// prior to the next zero timer.
// By definition the first zero timer has been scheduled before the
@ -319,7 +334,7 @@ class _Timer implements Timer {
// timer are expired. The first zero timer will be dispatched when its
// corresponding message is delivered.
var timer;
while (!_heap.isEmpty && (_heap.first._compareTo(_firstZeroTimer) < 0)) {
while (!_heap.isEmpty && (_heap.first._compareTo(firstTimer) < 0)) {
timer = _heap.removeFirst();
pendingTimers.add(timer);
}
@ -386,10 +401,7 @@ class _Timer implements Timer {
timer._enqueue();
}
// Execute pending micro tasks.
var immediateCallback = _removePendingImmediateCallback();
if (immediateCallback != null) {
immediateCallback();
}
_runPendingImmediateCallback();
}
}
} finally {
@ -410,7 +422,7 @@ class _Timer implements Timer {
assert(pendingTimers.length > 0);
} else {
assert(msg == _TIMEOUT_EVENT);
_scheduledWakeupTime = null; // Consumed the last scheduled wakeup now.
_scheduledWakeupTime = 0; // Consumed the last scheduled wakeup now.
pendingTimers = _queueFromTimeoutEvent();
}
_runTimers(pendingTimers);
@ -421,10 +433,12 @@ class _Timer implements Timer {
// Tell the event handler to wake this isolate at a specific time.
static void _scheduleWakeup(int wakeupTime) {
if (_sendPort == null) {
_createTimerHandler();
var port = _sendPort;
if (port == null) {
port = _createTimerHandler();
_sendPort = port;
}
VMLibraryHooks.eventHandlerSendData(null, _sendPort, wakeupTime);
VMLibraryHooks.eventHandlerSendData(null, port, wakeupTime);
_scheduledWakeupTime = wakeupTime;
}
@ -432,25 +446,27 @@ class _Timer implements Timer {
static void _cancelWakeup() {
if (_sendPort != null) {
VMLibraryHooks.eventHandlerSendData(null, _sendPort, _NO_TIMER);
_scheduledWakeupTime = null;
_scheduledWakeupTime = 0;
}
}
// Create a receive port and register a message handler for the timer
// events.
static void _createTimerHandler() {
static SendPort _createTimerHandler() {
assert(_receivePort == null);
assert(_sendPort == null);
_receivePort = new RawReceivePort(_handleMessage);
_sendPort = _receivePort.sendPort;
_scheduledWakeupTime = null;
final port = new RawReceivePort(_handleMessage);
final sendPort = port.sendPort;
_receivePort = port;
_sendPort = sendPort;
_scheduledWakeupTime = 0;
return sendPort;
}
static void _shutdownTimerHandler() {
_receivePort.close();
_receivePort = null;
_sendPort = null;
_scheduledWakeupTime = null;
_scheduledWakeupTime = 0;
_receivePort!.close();
}
// The Timer factory registered with the dart:async library by the embedder.

View file

@ -2,8 +2,6 @@
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
// @dart = 2.5
part of dart.isolate;
/**

View file

@ -2,8 +2,6 @@
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
// @dart = 2.5
/**
* Concurrent programming using _isolates_:
* independent workers that are similar to threads
@ -103,7 +101,7 @@ class Isolate {
* If the isolate is spawned in a paused state, use this capability as
* argument to the [resume] method in order to resume the paused isolate.
*/
final Capability pauseCapability;
final Capability? pauseCapability;
/**
* Capability granting the ability to terminate the isolate.
@ -113,7 +111,7 @@ class Isolate {
* capability of the isolate identified by [controlPort],
* then calls to those methods will have no effect.
*/
final Capability terminateCapability;
final Capability? terminateCapability;
/**
* The name of the [Isolate] displayed for debug purposes.
@ -128,7 +126,7 @@ class Isolate {
* [IsolateMirror].
*/
@Since("2.3")
external String get debugName;
external String? get debugName;
/**
* Create a new [Isolate] object with a restricted set of capabilities.
@ -180,7 +178,7 @@ class Isolate {
* in Dart 2.
*/
@Deprecated('packages/ directory resolution is not supported in Dart 2.')
external static Future<Uri> get packageRoot;
external static Future<Uri?> get packageRoot;
/**
* The package root of the current isolate, if any.
@ -189,7 +187,7 @@ class Isolate {
* setup for package resolution, this getter returns `null`, otherwise it
* returns the package config URI.
*/
external static Future<Uri> get packageConfig;
external static Future<Uri?> get packageConfig;
/**
* Maps a package: URI to a non-package Uri.
@ -198,7 +196,7 @@ class Isolate {
* isolate, then this call returns `null`. Non-package: URIs are
* returned unmodified.
*/
external static Future<Uri> resolvePackageUri(Uri packageUri);
external static Future<Uri?> resolvePackageUri(Uri packageUri);
/**
* Creates and spawns an isolate that shares the same code as the current
@ -250,11 +248,11 @@ class Isolate {
*/
external static Future<Isolate> spawn<T>(
void entryPoint(T message), T message,
{bool paused: false,
bool errorsAreFatal,
SendPort onExit,
SendPort onError,
@Since("2.3") String debugName});
{bool paused = false,
bool errorsAreFatal = true,
SendPort? onExit,
SendPort? onError,
@Since("2.3") String? debugName});
/**
* Creates and spawns an isolate that runs the code from the library with
@ -332,18 +330,18 @@ class Isolate {
Uri uri,
List<String> args,
var message,
{bool paused: false,
SendPort onExit,
SendPort onError,
bool errorsAreFatal,
bool checked,
Map<String, String> environment,
{bool paused = false,
SendPort? onExit,
SendPort? onError,
bool errorsAreFatal = true,
bool? checked,
Map<String, String>? environment,
@Deprecated('The packages/ dir is not supported in Dart 2')
Uri packageRoot,
Uri packageConfig,
bool automaticPackageResolution: false,
Uri? packageConfig,
bool automaticPackageResolution = false,
@Since("2.3")
String debugName});
String? debugName});
/**
* Requests the isolate to pause.
@ -378,7 +376,7 @@ class Isolate {
* of the isolate identified by [controlPort],
* the pause request is ignored by the receiving isolate.
*/
Capability pause([Capability resumeCapability]) {
Capability pause([Capability? resumeCapability]) {
resumeCapability ??= new Capability();
_pause(resumeCapability);
return resumeCapability;
@ -431,7 +429,7 @@ class Isolate {
/* TODO(lrn): Can we do better? Can the system recognize this message and
* send a reply if the receiving isolate is dead?
*/
external void addOnExitListener(SendPort responsePort, {Object response});
external void addOnExitListener(SendPort responsePort, {Object? response});
/**
* Stops listening for exit messages from the isolate.
@ -498,7 +496,7 @@ class Isolate {
* of the isolate identified by [controlPort],
* the kill request is ignored by the receiving isolate.
*/
external void kill({int priority: beforeNextEvent});
external void kill({int priority = beforeNextEvent});
/**
* Requests that the isolate send [response] on the [responsePort].
@ -524,7 +522,7 @@ class Isolate {
* are completed.
*/
external void ping(SendPort responsePort,
{Object response, int priority: immediate});
{Object response, int priority = immediate});
/**
* Requests that uncaught errors of the isolate are sent back to [port].
@ -579,27 +577,27 @@ class Isolate {
* This stream is based on [addErrorListener] and [removeErrorListener].
*/
Stream get errors {
StreamController controller;
RawReceivePort port;
void handleError(message) {
List listMessage = message;
String errorDescription = listMessage[0];
String stackDescription = listMessage[1];
StreamController controller = StreamController.broadcast(sync: true);
RawReceivePort? port;
void handleError(Object? message) {
var listMessage = message as List<Object?>;
var errorDescription = listMessage[0] as String;
var stackDescription = listMessage[1] as String;
var error = new RemoteError(errorDescription, stackDescription);
controller.addError(error, error.stackTrace);
}
controller = new StreamController.broadcast(
sync: true,
onListen: () {
port = new RawReceivePort(handleError);
this.addErrorListener(port.sendPort);
},
onCancel: () {
this.removeErrorListener(port.sendPort);
port.close();
port = null;
});
controller.onListen = () {
RawReceivePort receivePort = new RawReceivePort(handleError);
port = receivePort;
this.addErrorListener(receivePort.sendPort);
};
controller.onCancel = () {
var listenPort = port!;
port = null;
this.removeErrorListener(listenPort.sendPort);
listenPort.close();
};
return controller.stream;
}
}
@ -632,7 +630,7 @@ abstract class SendPort implements Capability {
* port can receive the message as soon as its isolate's event loop is ready
* to deliver it, independently of what the sending isolate is doing.
*/
void send(var message);
void send(Object? message);
/**
* Tests whether [other] is a [SendPort] pointing to the same
@ -662,7 +660,7 @@ abstract class SendPort implements Capability {
*
* A [ReceivePort] may have many [SendPort]s.
*/
abstract class ReceivePort implements Stream {
abstract class ReceivePort implements Stream<dynamic> {
/**
* Opens a long-lived port for receiving messages.
*
@ -692,8 +690,8 @@ abstract class ReceivePort implements Stream {
* The [onDone] handler will be called when the stream closes.
* The stream closes when [close] is called.
*/
StreamSubscription listen(void onData(var message),
{Function onError, void onDone(), bool cancelOnError});
StreamSubscription<dynamic> listen(void onData(var message)?,
{Function? onError, void onDone()?, bool cancelOnError = true});
/**
* Closes `this`.
@ -719,14 +717,14 @@ abstract class RawReceivePort {
* can not be paused. The data-handler must be set before the first
* event is received.
*/
external factory RawReceivePort([Function handler]);
external factory RawReceivePort([Function? handler]);
/**
* Sets the handler that is invoked for every incoming message.
*
* The handler is invoked in the root-zone ([Zone.root]).
*/
void set handler(Function newHandler);
void set handler(Function? newHandler);
/**
* Closes the port.

View file

@ -119,10 +119,10 @@
]
},
"isolate": {
"uri": "../../sdk/lib/isolate/isolate.dart",
"uri": "isolate/isolate.dart",
"patches": [
"../../sdk/lib/_internal/vm/lib/isolate_patch.dart",
"../../sdk/lib/_internal/vm/lib/timer_impl.dart"
"_internal/vm/lib/isolate_patch.dart",
"_internal/vm/lib/timer_impl.dart"
]
},
"math": {
@ -437,8 +437,8 @@
"supported": false
},
"isolate": {
"uri": "isolate/isolate.dart",
"patches": "_internal/js_dev_runtime/patch/isolate_patch.dart",
"uri": "../../sdk/lib/isolate/isolate.dart",
"patches": "../../sdk/lib/_internal/js_dev_runtime/patch/isolate_patch.dart",
"supported": false
},
"mirrors": {

View file

@ -121,10 +121,10 @@ vm:
- "../../sdk/lib/_internal/vm/bin/sync_socket_patch.dart"
isolate:
uri: "../../sdk/lib/isolate/isolate.dart"
uri: "isolate/isolate.dart"
patches:
- "../../sdk/lib/_internal/vm/lib/isolate_patch.dart"
- "../../sdk/lib/_internal/vm/lib/timer_impl.dart"
- "_internal/vm/lib/isolate_patch.dart"
- "_internal/vm/lib/timer_impl.dart"
math:
uri: "../../sdk/lib/math/math.dart"
@ -430,8 +430,8 @@ dartdevc:
supported: false
isolate:
uri: "isolate/isolate.dart"
patches: "_internal/js_dev_runtime/patch/isolate_patch.dart"
uri: "../../sdk/lib/isolate/isolate.dart"
patches: "../../sdk/lib/_internal/js_dev_runtime/patch/isolate_patch.dart"
supported: false
mirrors: