[dart2wasm] Implement micro-task scheduling

Use JS `setTimeout` for events and `queueMicrotask` for micro-tasks.

dart2js event loop implementation is copied in `run_wasm.js` to be able
to use `clearTimeout`, `setInterval`, `clearInterval`, and
`scheduleMicrotask`, which are not not available in d8, and `setTimeout`
in d8 does not wait before calling a callback.

New passing d8 tests:

- co19/LibTest/async/Future/Future.delayed_A01_t02
- co19/LibTest/async/Stream/Stream.periodic_A01_t01
- co19/LibTest/async/Stream/Stream.periodic_all_t01
- co19/LibTest/async/Stream/Stream.periodic_all_t02
- co19/LibTest/async/Stream/timeout_A04_t01
- co19/LibTest/async/StreamController/stream_all_A01_t01
- co19/LibTest/async/StreamController/stream_all_A01_t02
- co19/LibTest/async/StreamController/stream_all_A02_t01
- co19/LibTest/async/StreamController/stream_all_A02_t02
- co19/LibTest/async/StreamController/StreamController.broadcast_Stream_all_A01_t01
- co19/LibTest/async/StreamController/StreamController.broadcast_Stream_all_A01_t02
- co19/LibTest/async/StreamController/StreamController.broadcast_Stream_all_A02_t01
- co19/LibTest/async/StreamController/StreamController.broadcast_Stream_all_A02_t02
- co19/LibTest/async/Timer/Timer.periodic_A01_t01
- co19/LibTest/async/Timer/Timer_A01_t01
- co19/LibTest/core/Stopwatch/elapsedTicks_A01_t01
- language/async/call_test
- language/regress/regress21795_test
- lib/async/multiple_timer_test
- lib/async/periodic_timer2_test
- lib/async/periodic_timer3_test
- lib/async/periodic_timer4_test
- lib/async/schedule_microtask3_test
- lib/async/schedule_microtask_test
- lib/async/stream_timeout_test
- lib/async/timer_isActive_test
- lib/async/timer_repeat_test
- lib/async/timer_test

New passing Chrome tests:

- co19/LibTest/async/Stream/timeout_A04_t01
- language/async/call_test
- lib/async/schedule_microtask3_test

Tests below fail because of async* desugaring issues and will be fixed
separately:

- language/async_star/no_cancel2_test
- language/async_star/no_cancel_test

Tests below fail because of an existing issue (#29615):

- co19/LibTest/async/StreamController/StreamController.broadcast_Stream_all_A01_t03
- co19/LibTest/async/StreamController/StreamController.broadcast_Stream_all_A02_t03
- co19/LibTest/async/StreamController/stream_all_A02_t03

Fixes #51599.

Change-Id: Ib313e99bf3b3cb3bebeddc9e47dc77425ef94481
Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/305201
Reviewed-by: Aske Simon Christensen <askesc@google.com>
Reviewed-by: Joshua Litt <joshualitt@google.com>
Commit-Queue: Ömer Ağacan <omersa@google.com>
This commit is contained in:
Ömer Sinan Ağacan 2023-06-19 10:21:35 +00:00 committed by Commit Queue
parent 6dff4755e6
commit 87a3de41b1
5 changed files with 406 additions and 48 deletions

View file

@ -28,6 +28,285 @@ const jsRuntimeArg = 0;
const wasmArg = 1;
const ffiArg = 2;
// d8's `setTimeout` doesn't work as expected (it doesn't wait before calling
// the callback), and d8 also doesn't have `setInterval` and `queueMicrotask`.
// So we define our own event loop with these functions.
//
// The code below is copied form dart2js, with some modifications:
// sdk/lib/_internal/js_runtime/lib/preambles/d8.js
(function(self, scriptArguments) {
// Using strict mode to avoid accidentally defining global variables.
"use strict"; // Should be first statement of this function.
// Task queue as cyclic list queue.
var taskQueue = new Array(8); // Length is power of 2.
var head = 0;
var tail = 0;
var mask = taskQueue.length - 1;
function addTask(elem) {
taskQueue[head] = elem;
head = (head + 1) & mask;
if (head == tail) _growTaskQueue();
}
function removeTask() {
if (head == tail) return;
var result = taskQueue[tail];
taskQueue[tail] = undefined;
tail = (tail + 1) & mask;
return result;
}
function _growTaskQueue() {
// head == tail.
var length = taskQueue.length;
var split = head;
taskQueue.length = length * 2;
if (split * 2 < length) { // split < length / 2
for (var i = 0; i < split; i++) {
taskQueue[length + i] = taskQueue[i];
taskQueue[i] = undefined;
}
head += length;
} else {
for (var i = split; i < length; i++) {
taskQueue[length + i] = taskQueue[i];
taskQueue[i] = undefined;
}
tail += length;
}
mask = taskQueue.length - 1;
}
// Mapping from timer id to timer function.
// The timer id is written on the function as .$timerId.
// That field is cleared when the timer is cancelled, but it is not returned
// from the queue until its time comes.
var timerIds = {};
var timerIdCounter = 1; // Counter used to assign ids.
// Zero-timer queue as simple array queue using push/shift.
var zeroTimerQueue = [];
function addTimer(f, ms) {
var id = timerIdCounter++;
// A callback can be scheduled at most once.
console.assert(f.$timerId === undefined);
f.$timerId = id;
timerIds[id] = f;
if (ms == 0 && !isNextTimerDue()) {
zeroTimerQueue.push(f);
} else {
addDelayedTimer(f, ms);
}
return id;
}
function nextZeroTimer() {
while (zeroTimerQueue.length > 0) {
var action = zeroTimerQueue.shift();
if (action.$timerId !== undefined) return action;
}
}
function nextEvent() {
var action = removeTask();
if (action) {
return action;
}
do {
action = nextZeroTimer();
if (action) break;
var nextList = nextDelayedTimerQueue();
if (!nextList) {
return;
}
var newTime = nextList.shift();
advanceTimeTo(newTime);
zeroTimerQueue = nextList;
} while (true)
var id = action.$timerId;
clearTimerId(action, id);
return action;
}
// Mocking time.
var timeOffset = 0;
var now = function() {
// Install the mock Date object only once.
// Following calls to "now" will just use the new (mocked) Date.now
// method directly.
installMockDate();
now = Date.now;
return Date.now();
};
var originalDate = Date;
var originalNow = originalDate.now;
function advanceTimeTo(time) {
var now = originalNow();
if (timeOffset < time - now) {
timeOffset = time - now;
}
}
function installMockDate() {
var NewDate = function Date(Y, M, D, h, m, s, ms) {
if (this instanceof Date) {
// Assume a construct call.
switch (arguments.length) {
case 0: return new originalDate(originalNow() + timeOffset);
case 1: return new originalDate(Y);
case 2: return new originalDate(Y, M);
case 3: return new originalDate(Y, M, D);
case 4: return new originalDate(Y, M, D, h);
case 5: return new originalDate(Y, M, D, h, m);
case 6: return new originalDate(Y, M, D, h, m, s);
default: return new originalDate(Y, M, D, h, m, s, ms);
}
}
return new originalDate(originalNow() + timeOffset).toString();
};
NewDate.UTC = originalDate.UTC;
NewDate.parse = originalDate.parse;
NewDate.now = function now() { return originalNow() + timeOffset; };
NewDate.prototype = originalDate.prototype;
originalDate.prototype.constructor = NewDate;
Date = NewDate;
}
// Heap priority queue with key index.
// Each entry is list of [timeout, callback1 ... callbackn].
var timerHeap = [];
var timerIndex = {};
function addDelayedTimer(f, ms) {
var timeout = now() + ms;
var timerList = timerIndex[timeout];
if (timerList == null) {
timerList = [timeout, f];
timerIndex[timeout] = timerList;
var index = timerHeap.length;
timerHeap.length += 1;
bubbleUp(index, timeout, timerList);
} else {
timerList.push(f);
}
}
function isNextTimerDue() {
if (timerHeap.length == 0) return false;
var head = timerHeap[0];
return head[0] < originalNow() + timeOffset;
}
function nextDelayedTimerQueue() {
if (timerHeap.length == 0) return null;
var result = timerHeap[0];
var last = timerHeap.pop();
if (timerHeap.length > 0) {
bubbleDown(0, last[0], last);
}
return result;
}
function bubbleUp(index, key, value) {
while (index != 0) {
var parentIndex = (index - 1) >> 1;
var parent = timerHeap[parentIndex];
var parentKey = parent[0];
if (key > parentKey) break;
timerHeap[index] = parent;
index = parentIndex;
}
timerHeap[index] = value;
}
function bubbleDown(index, key, value) {
while (true) {
var leftChildIndex = index * 2 + 1;
if (leftChildIndex >= timerHeap.length) break;
var minChildIndex = leftChildIndex;
var minChild = timerHeap[leftChildIndex];
var minChildKey = minChild[0];
var rightChildIndex = leftChildIndex + 1;
if (rightChildIndex < timerHeap.length) {
var rightChild = timerHeap[rightChildIndex];
var rightKey = rightChild[0];
if (rightKey < minChildKey) {
minChildIndex = rightChildIndex;
minChild = rightChild;
minChildKey = rightKey;
}
}
if (minChildKey > key) break;
timerHeap[index] = minChild;
index = minChildIndex;
}
timerHeap[index] = value;
}
function addInterval(f, ms) {
var id = timerIdCounter++;
function repeat() {
// Reactivate with the same id.
repeat.$timerId = id;
timerIds[id] = repeat;
addDelayedTimer(repeat, ms);
f();
}
repeat.$timerId = id;
timerIds[id] = repeat;
addDelayedTimer(repeat, ms);
return id;
}
function cancelTimer(id) {
var f = timerIds[id];
if (f == null) return;
clearTimerId(f, id);
}
function clearTimerId(f, id) {
f.$timerId = undefined;
delete timerIds[id];
}
async function eventLoop(action) {
while (action) {
try {
await action();
} catch (e) {
if (typeof onerror == "function") {
onerror(e, null, -1);
} else {
throw e;
}
}
action = nextEvent();
}
}
// Global properties. "self" refers to the global object, so adding a
// property to "self" defines a global variable.
self.self = self;
self.dartMainRunner = function(main, ignored_args) {
// Initialize.
var action = async function() { await main(scriptArguments, null); }
eventLoop(action);
};
self.setTimeout = addTimer;
self.clearTimeout = cancelTimer;
self.setInterval = addInterval;
self.clearInterval = cancelTimer;
self.queueMicrotask = addTask;
// Signals `Stopwatch._initTicker` to use `Date.now` to get ticks instead of
// `performance.now`, as it's not available in d8.
self.dartUseDateNowForTicks = true;
})(this, []);
// We would like this itself to be a ES module rather than a script, but
// unfortunately d8 does not return a failed error code if an unhandled
// exception occurs asynchronously in an ES module.
@ -71,4 +350,5 @@ const main = async () => {
// alive even after `main` returns.
await dart2wasm.invoke(dartInstance, ...dartArgs);
};
main();
dartMainRunner(main, []);

View file

@ -1,4 +1,6 @@
import 'dart:_internal' show scheduleCallback, patch, _AsyncCompleter;
import 'dart:_internal' show _AsyncCompleter, patch;
import 'dart:_js_helper' show JS;
import 'dart:_wasm';

View file

@ -182,13 +182,5 @@ List<String> _makeStringList() => <String>[];
@pragma("wasm:export", "\$listAdd")
void _listAdd(List<dynamic> list, dynamic item) => list.add(item);
// Schedule a callback from JS via setTimeout.
void scheduleCallback(double millis, dynamic Function() callback) {
JS<void>(r"""(ms, c) =>
setTimeout(
() => dartInstance.exports.$invokeCallback(c),ms)""", millis,
callback);
}
String jsonEncode(String object) => JS<String>(
"s => stringToDartString(JSON.stringify(stringFromDartString(s)))", object);

View file

@ -4,21 +4,43 @@
part of "core_patch.dart";
double _performanceNow() => JS<double>("() => performance.now()");
@patch
class Stopwatch {
static int Function() _timerTicks = () {
return JS<double>("() => Date.now()").toInt();
};
@patch
static int _initTicker() {
if (JS<bool>("() => typeof dartUseDateNowForTicks !== \"undefined\"")) {
// Millisecond precision, as int.
return 1000;
} else {
// Microsecond precision as double. Convert to int without losing
// precision.
_timerTicks = () {
return JS<double>("() => 1000 * performance.now()").toInt();
};
return 1000000;
}
}
@patch
static int _now() => _performanceNow().toInt();
static int _now() => _timerTicks();
@patch
int get elapsedMicroseconds => 1000 * elapsedTicks;
int get elapsedMicroseconds {
int ticks = elapsedTicks;
if (_frequency == 1000000) return ticks;
assert(_frequency == 1000);
return ticks * 1000;
}
@patch
int get elapsedMilliseconds => elapsedTicks;
int get elapsedMilliseconds {
int ticks = elapsedTicks;
if (_frequency == 1000) return ticks;
assert(_frequency == 1000000);
return ticks ~/ 1000;
}
}

View file

@ -6,6 +6,44 @@ part of "async_patch.dart";
// Implementation of `Timer` and `scheduleMicrotask` via the JS event loop.
/// JS event loop and timer functions, in a private class to avoid leaking the
/// definitions to users.
class _JSEventLoop {
/// Schedule a callback from JS via `setTimeout`.
static int _setTimeout(double ms, dynamic Function() callback) => JS<double>(
r"""(ms, c) =>
setTimeout(() => dartInstance.exports.$invokeCallback(c),ms)""",
ms,
callback)
.toInt();
/// Cancel a callback scheduled with `setTimeout`.
static void _clearTimeout(int handle) =>
JS<void>(r"""(handle) => clearTimeout(handle)""", handle.toDouble());
/// Schedule a periodic callback from JS via `setInterval`.
static int _setInterval(double ms, dynamic Function() callback) => JS<double>(
r"""(ms, c) =>
setInterval(() => dartInstance.exports.$invokeCallback(c), ms)""",
ms,
callback)
.toInt();
/// Cancel a callback scheduled with `setInterval`.
static void _clearInterval(int handle) =>
JS<void>(r"""(handle) => clearInterval(handle)""", handle.toDouble());
/// Schedule a callback from JS via `queueMicrotask`.
static void _queueMicrotask(dynamic Function() callback) => JS<void>(
r"""(c) =>
queueMicrotask(() => dartInstance.exports.$invokeCallback(c))""",
callback);
/// JS `Date.now()`, returns the number of milliseconds elapsed since the
/// epoch.
static int _dateNow() => JS<double>('() => Date.now()').toInt();
}
@patch
class Timer {
@patch
@ -21,53 +59,77 @@ class Timer {
}
abstract class _Timer implements Timer {
final double milliseconds;
bool isActive;
int tick;
final int _milliseconds;
int _tick;
int? _handle;
@override
int get tick => _tick;
@override
bool get isActive => _handle != null;
_Timer(Duration duration)
: milliseconds = duration.inMilliseconds.toDouble(),
isActive = true,
tick = 0 {
: _milliseconds = duration.inMilliseconds,
_tick = 0,
_handle = null {
_schedule();
}
void _schedule() {
scheduleCallback(milliseconds, () {
if (isActive) {
tick++;
_run();
}
});
}
void _run();
@override
void cancel() {
isActive = false;
}
void _schedule();
}
class _OneShotTimer extends _Timer {
final void Function() callback;
final void Function() _callback;
_OneShotTimer(Duration duration, this.callback) : super(duration);
_OneShotTimer(Duration duration, this._callback) : super(duration);
void _run() {
isActive = false;
callback();
@override
void _schedule() {
_handle = _JSEventLoop._setTimeout(_milliseconds.toDouble(), () {
_tick++;
_handle = null;
_callback();
});
}
@override
void cancel() {
final int? handle = _handle;
if (handle != null) {
_JSEventLoop._clearTimeout(handle);
_handle = null;
}
}
}
class _PeriodicTimer extends _Timer {
final void Function(Timer) callback;
final void Function(Timer) _callback;
_PeriodicTimer(Duration duration, this.callback) : super(duration);
_PeriodicTimer(Duration duration, this._callback) : super(duration);
void _run() {
_schedule();
callback(this);
@override
void _schedule() {
final int start = _JSEventLoop._dateNow();
_handle = _JSEventLoop._setInterval(_milliseconds.toDouble(), () {
_tick++;
if (_milliseconds > 0) {
final int duration = _JSEventLoop._dateNow() - start;
if (duration > _tick * _milliseconds) {
_tick = duration ~/ _milliseconds;
}
}
_callback(this);
});
}
@override
void cancel() {
final int? handle = _handle;
if (handle != null) {
_JSEventLoop._clearInterval(handle);
_handle = null;
}
}
}
@ -75,6 +137,6 @@ class _PeriodicTimer extends _Timer {
class _AsyncRun {
@patch
static void _scheduleImmediate(void callback()) {
scheduleCallback(0, callback);
_JSEventLoop._queueMicrotask(callback);
}
}