perf: improve async op santizer speed and accuracy (#20501)

This commit improves async op sanitizer speed by only delaying metrics
collection if there are pending ops. This
results in a speedup of around 30% for small CPU bound unit tests.

It performs this check and possible delay on every collection now,
fixing an issue with parent test leaks into steps.
This commit is contained in:
Luca Casonato 2023-09-16 07:48:31 +02:00 committed by GitHub
parent bf07604113
commit 430b63c2c4
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
12 changed files with 185 additions and 37 deletions

View file

@ -40,25 +40,39 @@ let hasSetOpSanitizerDelayMacrotask = false;
// that resolves when it's (probably) fine to run the op sanitizer.
//
// This is implemented by adding a macrotask callback that runs after the
// timer macrotasks, so we can guarantee that a currently running interval
// will have an associated op. An additional `setTimeout` of 0 is needed
// before that, though, in order to give time for worker message ops to finish
// (since timeouts of 0 don't queue tasks in the timer queue immediately).
function opSanitizerDelay() {
// all ready async ops resolve, and the timer macrotask. Using just a macrotask
// callback without delaying is sufficient, because when the macrotask callback
// runs after async op dispatch, we know that all async ops that can currently
// return `Poll::Ready` have done so, and have been dispatched to JS.
//
// Worker ops are an exception to this, because there is no way for the user to
// await shutdown of the worker from the thread calling `worker.terminate()`.
// Because of this, we give extra leeway for worker ops to complete, by waiting
// for a whole millisecond if there are pending worker ops.
function opSanitizerDelay(hasPendingWorkerOps) {
if (!hasSetOpSanitizerDelayMacrotask) {
core.setMacrotaskCallback(handleOpSanitizerDelayMacrotask);
hasSetOpSanitizerDelayMacrotask = true;
}
return new Promise((resolve) => {
const p = new Promise((resolve) => {
// Schedule an async op to complete immediately to ensure the macrotask is
// run. We rely on the fact that enqueueing the resolver callback during the
// timeout callback will mean that the resolver gets called in the same
// event loop tick as the timeout callback.
setTimeout(() => {
ArrayPrototypePush(opSanitizerDelayResolveQueue, resolve);
}, 1);
}, hasPendingWorkerOps ? 1 : 0);
});
return p;
}
function handleOpSanitizerDelayMacrotask() {
ArrayPrototypeShift(opSanitizerDelayResolveQueue)?.();
return opSanitizerDelayResolveQueue.length === 0;
const resolve = ArrayPrototypeShift(opSanitizerDelayResolveQueue);
if (resolve) {
resolve();
return opSanitizerDelayResolveQueue.length === 0;
}
return undefined; // we performed no work, so can skip microtasks checkpoint
}
// An async operation to $0 was started in this test, but never completed. This is often caused by not $1.
@ -126,7 +140,8 @@ const OP_DETAILS = {
"op_tls_start": ["start a TLS connection", "awaiting a `Deno.startTls` call"],
"op_truncate_async": ["truncate a file", "awaiting the result of a `Deno.truncate` call"],
"op_utime_async": ["change file timestamps", "awaiting the result of a `Deno.utime` call"],
"op_worker_recv_message": ["receive a message from a web worker", "terminating a `Worker`"],
"op_host_recv_message": ["receive a message from a web worker", "terminating a `Worker`"],
"op_host_recv_ctrl": ["receive a message from a web worker", "terminating a `Worker`"],
"op_ws_close": ["close a WebSocket", "awaiting until the `close` event is emitted on a `WebSocket`, or the `WebSocketStream#closed` promise resolves"],
"op_ws_create": ["create a WebSocket", "awaiting until the `open` event is emitted on a `WebSocket`, or the result of a `WebSocketStream#connection` promise"],
"op_ws_next_event": ["receive the next message on a WebSocket", "closing a `WebSocket` or `WebSocketStream`"],
@ -136,6 +151,28 @@ const OP_DETAILS = {
"op_ws_send_pong": ["send a message on a WebSocket", "closing a `WebSocket` or `WebSocketStream`"],
};
function collectReliableOpMetrics() {
let metrics = core.metrics();
if (metrics.opsDispatched > metrics.opsCompleted) {
// If there are still async ops pending, we drain the event loop to the
// point where all ops that can return `Poll::Ready` have done so, to ensure
// that any ops are ready because of user cleanup code are completed.
const hasPendingWorkerOps = metrics.ops.op_host_recv_message && (
metrics.ops.op_host_recv_message.opsDispatched >
metrics.ops.op_host_recv_message.opsCompleted ||
metrics.ops.op_host_recv_ctrl.opsDispatched >
metrics.ops.op_host_recv_ctrl.opsCompleted
);
return opSanitizerDelay(hasPendingWorkerOps).then(() => {
metrics = core.metrics();
const traces = new Map(core.opCallTraces);
return { metrics, traces };
});
}
const traces = new Map(core.opCallTraces);
return { metrics, traces };
}
// Wrap test function in additional assertion that makes sure
// the test case does not leak async "ops" - ie. number of async
// completed ops after the test is the same as number of dispatched
@ -144,19 +181,26 @@ const OP_DETAILS = {
function assertOps(fn) {
/** @param desc {TestDescription | TestStepDescription} */
return async function asyncOpSanitizer(desc) {
const pre = core.metrics();
const preTraces = new Map(core.opCallTraces);
let metrics = collectReliableOpMetrics();
if (metrics.then) {
// We're delaying so await to get the result asynchronously.
metrics = await metrics;
}
const { metrics: pre, traces: preTraces } = metrics;
let post;
let postTraces;
try {
const innerResult = await fn(desc);
if (innerResult) return innerResult;
} finally {
// Defer until next event loop turn - that way timeouts and intervals
// cleared can actually be removed from resource table, otherwise
// false positives may occur (https://github.com/denoland/deno/issues/4591)
await opSanitizerDelay();
let metrics = collectReliableOpMetrics();
if (metrics.then) {
// We're delaying so await to get the result asynchronously.
metrics = await metrics;
}
({ metrics: post, traces: postTraces } = metrics);
}
const post = core.metrics();
const postTraces = new Map(core.opCallTraces);
// We're checking diff because one might spawn HTTP server in the background
// that will be a pending async op before test starts.
@ -232,6 +276,7 @@ function assertOps(fn) {
ArrayPrototypePush(details, message);
}
}
return { failed: { leakedOps: [details, core.isOpCallTracingEnabled()] } };
};
}

View file

@ -0,0 +1,10 @@
Check [WILDCARD]/cli/tests/testdata/test/ops_sanitizer_step_leak.ts
running 1 test from ./cli/tests/testdata/test/ops_sanitizer_step_leak.ts
timeout ...
step ... ok [WILDCARD]
------- output -------
done
----- output end -----
timeout ... ok [WILDCARD]
ok | 1 passed (1 step) | 0 failed [WILDCARD]

View file

@ -0,0 +1,10 @@
Deno.test("timeout", async (t) => {
const timer = setTimeout(() => {
console.log("timeout");
}, 10000);
clearTimeout(timer);
await t.step("step", async () => {
await new Promise<void>((resolve) => setTimeout(() => resolve(), 10));
});
console.log("done");
});

View file

@ -1752,7 +1752,8 @@ Deno.test(
// if transfer-encoding is sent, content-length is ignored
// even if it has an invalid value (content-length > totalLength)
const listener = invalidServer(addr, body);
const response = await fetch(`http://${addr}/`);
const client = Deno.createHttpClient({});
const response = await fetch(`http://${addr}/`, { client });
const res = await response.arrayBuffer();
const buf = new TextEncoder().encode(data);
@ -1760,6 +1761,7 @@ Deno.test(
assertEquals(new Uint8Array(res), buf);
listener.close();
client.close();
},
);
@ -1781,21 +1783,23 @@ Deno.test(
// It should fail if multiple content-length headers with different values are sent
const listener = invalidServer(addr, body);
const client = Deno.createHttpClient({});
await assertRejects(
async () => {
await fetch(`http://${addr}/`);
await fetch(`http://${addr}/`, { client });
},
TypeError,
"invalid content-length parsed",
);
listener.close();
client.close();
},
);
Deno.test(
{ permissions: { net: true } },
async function fetchWithInvalidContentLength(): Promise<
async function fetchWithInvalidContentLength2(): Promise<
void
> {
const addr = `127.0.0.1:${listenPort}`;
@ -1807,7 +1811,8 @@ Deno.test(
);
const listener = invalidServer(addr, body);
const response = await fetch(`http://${addr}/`);
const client = Deno.createHttpClient({});
const response = await fetch(`http://${addr}/`, { client });
// If content-length < totalLength, a maximum of content-length bytes
// should be returned.
@ -1817,12 +1822,13 @@ Deno.test(
assertEquals(new Uint8Array(res), buf.subarray(contentLength));
listener.close();
client.close();
},
);
Deno.test(
{ permissions: { net: true } },
async function fetchWithInvalidContentLength(): Promise<
async function fetchWithInvalidContentLength3(): Promise<
void
> {
const addr = `127.0.0.1:${listenPort}`;
@ -1834,7 +1840,8 @@ Deno.test(
);
const listener = invalidServer(addr, body);
const response = await fetch(`http://${addr}/`);
const client = Deno.createHttpClient({});
const response = await fetch(`http://${addr}/`, { client });
// If content-length > totalLength, a maximum of content-length bytes
// should be returned.
await assertRejects(
@ -1846,6 +1853,7 @@ Deno.test(
);
listener.close();
client.close();
},
);
@ -1935,10 +1943,12 @@ Deno.test(
},
});
const client = Deno.createHttpClient({});
const err = await assertRejects(() =>
fetch(`http://localhost:${listenPort}/`, {
body: stream,
method: "POST",
client,
})
);
@ -1948,6 +1958,7 @@ Deno.test(
assertEquals(err.cause.message, "foo");
await server;
client.close();
},
);

View file

@ -198,12 +198,14 @@ Deno.test(
await respondWith(new Response(stream.readable));
})();
const resp = await fetch(`http://127.0.0.1:${listenPort}/`);
const client = Deno.createHttpClient({});
const resp = await fetch(`http://127.0.0.1:${listenPort}/`, { client });
const respBody = await resp.text();
assertEquals("hello world", respBody);
await promise;
httpConn!.close();
listener.close();
client.close();
},
);
@ -216,8 +218,8 @@ Deno.test(
writer.write(new TextEncoder().encode("world"));
writer.close();
const listener = Deno.listen({ port: listenPort });
const promise = (async () => {
const listener = Deno.listen({ port: listenPort });
const conn = await listener.accept();
const httpConn = Deno.serveHttp(conn);
const evt = await httpConn.nextRequest();
@ -235,14 +237,17 @@ Deno.test(
listener.close();
})();
const client = Deno.createHttpClient({});
const resp = await fetch(`http://127.0.0.1:${listenPort}/`, {
body: stream.readable,
method: "POST",
headers: { "connection": "close" },
client,
});
await resp.arrayBuffer();
await promise;
client.close();
},
);
@ -375,9 +380,11 @@ Deno.test(
await respondWith(new Response("response"));
})();
const client = Deno.createHttpClient({});
const resp = await fetch(`http://127.0.0.1:${listenPort}/`, {
method: "POST",
body: "request",
client,
});
const respBody = await resp.text();
assertEquals("response", respBody);
@ -385,6 +392,7 @@ Deno.test(
httpConn!.close();
listener.close();
client.close();
},
);
@ -427,9 +435,11 @@ Deno.test(
listener.close();
})();
const client = Deno.createHttpClient({});
const resp = await fetch(`http://127.0.0.1:${listenPort}/`);
await resp.body!.cancel();
await promise;
client.close();
},
);
@ -788,7 +798,11 @@ Deno.test({ permissions: { net: true } }, async function httpServerWebSocket() {
socket.send(m.data);
socket.close(1001);
};
const close = new Promise<void>((resolve) => {
socket.onclose = () => resolve();
});
await respondWith(response);
await close;
})();
const def = deferred();
@ -1228,11 +1242,15 @@ Deno.test(
async function client() {
const socket = new WebSocket(`ws://${hostname}:${port}/`);
socket.onopen = () => socket.send("bla bla");
const closed = new Promise<void>((resolve) => {
socket.onclose = () => resolve();
});
const { data } = await new Promise<MessageEvent<string>>((res) =>
socket.onmessage = res
);
assertStrictEquals(data, "bla bla");
socket.close();
await closed;
}
await Promise.all([server(), client()]);

View file

@ -901,7 +901,7 @@ Deno.test(
);
Deno.test({ permissions: { net: true } }, async function whatwgStreams() {
(async () => {
const server = (async () => {
const listener = Deno.listen({ hostname: "127.0.0.1", port: listenPort });
const conn = await listener.accept();
await conn.readable.pipeTo(conn.writable);
@ -920,6 +920,7 @@ Deno.test({ permissions: { net: true } }, async function whatwgStreams() {
assert(!done);
assertEquals(decoder.decode(value), "Hello World");
await reader.cancel();
await server;
});
Deno.test(
@ -973,7 +974,7 @@ Deno.test(
Deno.test(
{ permissions: { read: true, run: true } },
async function netListenUnref() {
async function netListenUnref2() {
const [statusCode, _output] = await execCode(`
async function main() {
const listener = Deno.listen({ port: ${listenPort} });

View file

@ -965,7 +965,8 @@ function createStreamTest(count: number, delay: number, action: string) {
try {
await listeningPromise;
const resp = await fetch(`http://127.0.0.1:${servePort}/`);
const client = Deno.createHttpClient({});
const resp = await fetch(`http://127.0.0.1:${servePort}/`, { client });
if (action == "Throw") {
await assertRejects(async () => {
await resp.text();
@ -980,6 +981,7 @@ function createStreamTest(count: number, delay: number, action: string) {
assertEquals(text, expected);
}
client.close();
} finally {
ac.abort();
await server.shutdown();
@ -1096,12 +1098,14 @@ Deno.test(
});
await listeningPromise;
const resp = await fetch(`http://127.0.0.1:${servePort}/`);
const client = Deno.createHttpClient({});
const resp = await fetch(`http://127.0.0.1:${servePort}/`, { client });
const respBody = await resp.text();
assertEquals("", respBody);
ac.abort();
await server.finished;
client.close();
},
);
@ -1138,12 +1142,14 @@ Deno.test(
});
await listeningPromise;
const resp = await fetch(`http://127.0.0.1:${servePort}/`);
const client = Deno.createHttpClient({});
const resp = await fetch(`http://127.0.0.1:${servePort}/`, { client });
// Incorrectly implemented reader ReadableStream should reject.
assertStringIncludes(await resp.text(), "Failed to execute 'enqueue'");
await errorPromise;
ac.abort();
await server.finished;
client.close();
},
);
@ -1187,6 +1193,7 @@ Deno.test(
Deno.test({ permissions: { net: true } }, async function httpServerWebSocket() {
const ac = new AbortController();
const listeningPromise = deferred();
const done = deferred();
const server = Deno.serve({
handler: (request) => {
const {
@ -1201,6 +1208,7 @@ Deno.test({ permissions: { net: true } }, async function httpServerWebSocket() {
socket.send(m.data);
socket.close(1001);
};
socket.onclose = () => done.resolve();
return response;
},
port: servePort,
@ -1221,6 +1229,7 @@ Deno.test({ permissions: { net: true } }, async function httpServerWebSocket() {
ws.onopen = () => ws.send("foo");
await def;
await done;
ac.abort();
await server.finished;
});
@ -1308,6 +1317,7 @@ Deno.test(
{ permissions: { net: true } },
async function httpServerWebSocketUpgradeTwice() {
const ac = new AbortController();
const done = deferred();
const listeningPromise = deferred();
const server = Deno.serve({
handler: (request) => {
@ -1330,6 +1340,7 @@ Deno.test(
socket.send(m.data);
socket.close(1001);
};
socket.onclose = () => done.resolve();
return response;
},
port: servePort,
@ -1350,6 +1361,7 @@ Deno.test(
ws.onopen = () => ws.send("foo");
await def;
await done;
ac.abort();
await server.finished;
},
@ -1359,6 +1371,7 @@ Deno.test(
{ permissions: { net: true } },
async function httpServerWebSocketCloseFast() {
const ac = new AbortController();
const done = deferred();
const listeningPromise = deferred();
const server = Deno.serve({
handler: (request) => {
@ -1367,6 +1380,7 @@ Deno.test(
socket,
} = Deno.upgradeWebSocket(request);
socket.onopen = () => socket.close();
socket.onclose = () => done.resolve();
return response;
},
port: servePort,
@ -1385,6 +1399,7 @@ Deno.test(
ws.onclose = () => def.resolve();
await def;
await done;
ac.abort();
await server.finished;
},
@ -1394,6 +1409,7 @@ Deno.test(
{ permissions: { net: true } },
async function httpServerWebSocketCanAccessRequest() {
const ac = new AbortController();
const done = deferred();
const listeningPromise = deferred();
const server = Deno.serve({
handler: (request) => {
@ -1409,6 +1425,7 @@ Deno.test(
socket.send(request.url.toString());
socket.close(1001);
};
socket.onclose = () => done.resolve();
return response;
},
port: servePort,
@ -1430,6 +1447,7 @@ Deno.test(
ws.onopen = () => ws.send("foo");
await def;
await done;
ac.abort();
await server.finished;
},
@ -1588,9 +1606,11 @@ Deno.test(
);
const { readable, writable } = new TransformStream();
const client = Deno.createHttpClient({});
const resp = await fetch(`http://127.0.0.1:${servePort}/`, {
method: "POST",
body: readable,
client,
});
await promise;
@ -1598,6 +1618,7 @@ Deno.test(
await testDuplex(resp.body.getReader(), writable.getWriter());
ac.abort();
await server.finished;
client.close();
},
);
@ -1632,9 +1653,11 @@ Deno.test(
);
const { readable, writable } = new TransformStream();
const client = Deno.createHttpClient({});
const resp = await fetch(`http://127.0.0.1:${servePort}/`, {
method: "POST",
body: readable,
client,
});
await promise;
@ -1642,6 +1665,7 @@ Deno.test(
await testDuplex(resp.body.getReader(), writable.getWriter());
ac.abort();
await server.finished;
client.close();
},
);
@ -2600,9 +2624,12 @@ for (const testCase of compressionTestCases) {
});
try {
await listeningPromise;
const client = Deno.createHttpClient({});
const resp = await fetch(`http://127.0.0.1:${servePort}/`, {
headers: testCase.in as HeadersInit,
client,
});
client.close();
await promise;
const body = await resp.arrayBuffer();
if (testCase.expect == null) {
@ -3192,14 +3219,16 @@ Deno.test(
let count = 0;
const server = Deno.serve({
async onListen({ port }: { port: number }) {
const res1 = await fetch(`http://localhost:${port}/`);
const client = Deno.createHttpClient({});
const res1 = await fetch(`http://localhost:${port}/`, { client });
assertEquals(await res1.text(), "hello world 1");
const res2 = await fetch(`http://localhost:${port}/`);
const res2 = await fetch(`http://localhost:${port}/`, { client });
assertEquals(await res2.text(), "hello world 2");
promise.resolve();
ac.abort();
client.close();
},
signal: ac.signal,
}, () => {
@ -3252,13 +3281,16 @@ Deno.test(
try {
const port = await listeningPromise;
const client = Deno.createHttpClient({});
const resp = await fetch(`http://localhost:${port}/`, {
headers: { connection: "close" },
method: "POST",
body: '{"sus":true}',
client,
});
const text = await resp.text();
assertEquals(text, "ok");
client.close();
} finally {
ac.abort();
await server.finished;

View file

@ -1,6 +1,5 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
import { fail } from "https://deno.land/std@v0.42.0/testing/asserts.ts";
import { assertEquals, Deferred, deferred } from "./test_util.ts";
import { assertEquals, Deferred, deferred, fail } from "./test_util.ts";
const {
core,

View file

@ -497,9 +497,15 @@ Deno.test({
);
const childProcess = spawn(Deno.execPath(), ["run", script]);
const p = withTimeout();
const pStdout = withTimeout();
const pStderr = withTimeout();
childProcess.on("exit", () => p.resolve());
childProcess.stdout.on("close", () => pStdout.resolve());
childProcess.stderr.on("close", () => pStderr.resolve());
childProcess.kill("SIGKILL");
await p;
await pStdout;
await pStderr;
assert(childProcess.killed);
assertEquals(childProcess.signalCode, "SIGKILL");
assertExists(childProcess.exitCode);
@ -633,9 +639,15 @@ Deno.test({
// Spawn an infinite cat
const cp = spawn("cat", ["-"]);
const p = withTimeout();
const pStdout = withTimeout();
const pStderr = withTimeout();
cp.on("exit", () => p.resolve());
cp.stdout.on("close", () => pStdout.resolve());
cp.stderr.on("close", () => pStderr.resolve());
cp.kill("SIGIOT");
await p;
await pStdout;
await pStderr;
assert(cp.killed);
assertEquals(cp.signalCode, "SIGIOT");
},

View file

@ -143,12 +143,17 @@ Deno.test("[node/net] multiple Sockets should get correct server data", async ()
}
const finished = deferred();
const serverSocketsClosed: Deferred<undefined>[] = [];
const server = net.createServer();
server.on("connection", (socket) => {
assert(socket !== undefined);
const i = serverSocketsClosed.push(deferred());
socket.on("data", (data) => {
socket.write(new TextDecoder().decode(data));
});
socket.on("close", () => {
serverSocketsClosed[i - 1].resolve();
});
});
const sockets: TestSocket[] = [];
@ -190,6 +195,7 @@ Deno.test("[node/net] multiple Sockets should get correct server data", async ()
});
await finished;
await Promise.all(serverSocketsClosed);
for (let i = 0; i < socketCount; i++) {
assertEquals(sockets[i].events, [`${i}`.repeat(3), `${i}`.repeat(3)]);

View file

@ -15,8 +15,9 @@ import DOMException from "ext:deno_web/01_dom_exception.js";
const primordials = globalThis.__bootstrap.primordials;
const {
ArrayPrototypeIndexOf,
ArrayPrototypeSplice,
ArrayPrototypePush,
ArrayPrototypeSplice,
PromisePrototypeThen,
Symbol,
Uint8Array,
} = primordials;
@ -70,7 +71,7 @@ function dispatch(source, name, data) {
// for that reason: it lets promises make forward progress but can
// still starve other parts of the event loop.
function defer(go) {
setTimeout(go, 1);
PromisePrototypeThen(core.ops.op_void_async_deferred(), () => go());
}
class BroadcastChannel extends EventTarget {

View file

@ -284,6 +284,9 @@ async function mainFetch(req, recursive, terminator) {
cause: requestSendError,
});
}
if (requestBodyRid !== null) {
core.tryClose(requestBodyRid);
}
throw err;
} finally {
if (cancelHandleRid !== null) {