From 430b63c2c4d6567a77e77980058ef13b45a9f30e Mon Sep 17 00:00:00 2001 From: Luca Casonato Date: Sat, 16 Sep 2023 07:48:31 +0200 Subject: [PATCH] perf: improve async op santizer speed and accuracy (#20501) This commit improves async op sanitizer speed by only delaying metrics collection if there are pending ops. This results in a speedup of around 30% for small CPU bound unit tests. It performs this check and possible delay on every collection now, fixing an issue with parent test leaks into steps. --- cli/js/40_testing.js | 81 ++++++++++++++----- .../testdata/test/ops_sanitizer_step_leak.out | 10 +++ .../testdata/test/ops_sanitizer_step_leak.ts | 10 +++ cli/tests/unit/fetch_test.ts | 23 ++++-- cli/tests/unit/http_test.ts | 22 ++++- cli/tests/unit/net_test.ts | 5 +- cli/tests/unit/serve_test.ts | 42 ++++++++-- cli/tests/unit/streams_test.ts | 3 +- cli/tests/unit_node/child_process_test.ts | 12 +++ cli/tests/unit_node/net_test.ts | 6 ++ ext/broadcast_channel/01_broadcast_channel.js | 5 +- ext/fetch/26_fetch.js | 3 + 12 files changed, 185 insertions(+), 37 deletions(-) create mode 100644 cli/tests/testdata/test/ops_sanitizer_step_leak.out create mode 100644 cli/tests/testdata/test/ops_sanitizer_step_leak.ts diff --git a/cli/js/40_testing.js b/cli/js/40_testing.js index 443b91596d..30905fb547 100644 --- a/cli/js/40_testing.js +++ b/cli/js/40_testing.js @@ -40,25 +40,39 @@ let hasSetOpSanitizerDelayMacrotask = false; // that resolves when it's (probably) fine to run the op sanitizer. // // This is implemented by adding a macrotask callback that runs after the -// timer macrotasks, so we can guarantee that a currently running interval -// will have an associated op. An additional `setTimeout` of 0 is needed -// before that, though, in order to give time for worker message ops to finish -// (since timeouts of 0 don't queue tasks in the timer queue immediately). -function opSanitizerDelay() { +// all ready async ops resolve, and the timer macrotask. Using just a macrotask +// callback without delaying is sufficient, because when the macrotask callback +// runs after async op dispatch, we know that all async ops that can currently +// return `Poll::Ready` have done so, and have been dispatched to JS. +// +// Worker ops are an exception to this, because there is no way for the user to +// await shutdown of the worker from the thread calling `worker.terminate()`. +// Because of this, we give extra leeway for worker ops to complete, by waiting +// for a whole millisecond if there are pending worker ops. +function opSanitizerDelay(hasPendingWorkerOps) { if (!hasSetOpSanitizerDelayMacrotask) { core.setMacrotaskCallback(handleOpSanitizerDelayMacrotask); hasSetOpSanitizerDelayMacrotask = true; } - return new Promise((resolve) => { + const p = new Promise((resolve) => { + // Schedule an async op to complete immediately to ensure the macrotask is + // run. We rely on the fact that enqueueing the resolver callback during the + // timeout callback will mean that the resolver gets called in the same + // event loop tick as the timeout callback. setTimeout(() => { ArrayPrototypePush(opSanitizerDelayResolveQueue, resolve); - }, 1); + }, hasPendingWorkerOps ? 1 : 0); }); + return p; } function handleOpSanitizerDelayMacrotask() { - ArrayPrototypeShift(opSanitizerDelayResolveQueue)?.(); - return opSanitizerDelayResolveQueue.length === 0; + const resolve = ArrayPrototypeShift(opSanitizerDelayResolveQueue); + if (resolve) { + resolve(); + return opSanitizerDelayResolveQueue.length === 0; + } + return undefined; // we performed no work, so can skip microtasks checkpoint } // An async operation to $0 was started in this test, but never completed. This is often caused by not $1. @@ -126,7 +140,8 @@ const OP_DETAILS = { "op_tls_start": ["start a TLS connection", "awaiting a `Deno.startTls` call"], "op_truncate_async": ["truncate a file", "awaiting the result of a `Deno.truncate` call"], "op_utime_async": ["change file timestamps", "awaiting the result of a `Deno.utime` call"], - "op_worker_recv_message": ["receive a message from a web worker", "terminating a `Worker`"], + "op_host_recv_message": ["receive a message from a web worker", "terminating a `Worker`"], + "op_host_recv_ctrl": ["receive a message from a web worker", "terminating a `Worker`"], "op_ws_close": ["close a WebSocket", "awaiting until the `close` event is emitted on a `WebSocket`, or the `WebSocketStream#closed` promise resolves"], "op_ws_create": ["create a WebSocket", "awaiting until the `open` event is emitted on a `WebSocket`, or the result of a `WebSocketStream#connection` promise"], "op_ws_next_event": ["receive the next message on a WebSocket", "closing a `WebSocket` or `WebSocketStream`"], @@ -136,6 +151,28 @@ const OP_DETAILS = { "op_ws_send_pong": ["send a message on a WebSocket", "closing a `WebSocket` or `WebSocketStream`"], }; +function collectReliableOpMetrics() { + let metrics = core.metrics(); + if (metrics.opsDispatched > metrics.opsCompleted) { + // If there are still async ops pending, we drain the event loop to the + // point where all ops that can return `Poll::Ready` have done so, to ensure + // that any ops are ready because of user cleanup code are completed. + const hasPendingWorkerOps = metrics.ops.op_host_recv_message && ( + metrics.ops.op_host_recv_message.opsDispatched > + metrics.ops.op_host_recv_message.opsCompleted || + metrics.ops.op_host_recv_ctrl.opsDispatched > + metrics.ops.op_host_recv_ctrl.opsCompleted + ); + return opSanitizerDelay(hasPendingWorkerOps).then(() => { + metrics = core.metrics(); + const traces = new Map(core.opCallTraces); + return { metrics, traces }; + }); + } + const traces = new Map(core.opCallTraces); + return { metrics, traces }; +} + // Wrap test function in additional assertion that makes sure // the test case does not leak async "ops" - ie. number of async // completed ops after the test is the same as number of dispatched @@ -144,19 +181,26 @@ const OP_DETAILS = { function assertOps(fn) { /** @param desc {TestDescription | TestStepDescription} */ return async function asyncOpSanitizer(desc) { - const pre = core.metrics(); - const preTraces = new Map(core.opCallTraces); + let metrics = collectReliableOpMetrics(); + if (metrics.then) { + // We're delaying so await to get the result asynchronously. + metrics = await metrics; + } + const { metrics: pre, traces: preTraces } = metrics; + let post; + let postTraces; + try { const innerResult = await fn(desc); if (innerResult) return innerResult; } finally { - // Defer until next event loop turn - that way timeouts and intervals - // cleared can actually be removed from resource table, otherwise - // false positives may occur (https://github.com/denoland/deno/issues/4591) - await opSanitizerDelay(); + let metrics = collectReliableOpMetrics(); + if (metrics.then) { + // We're delaying so await to get the result asynchronously. + metrics = await metrics; + } + ({ metrics: post, traces: postTraces } = metrics); } - const post = core.metrics(); - const postTraces = new Map(core.opCallTraces); // We're checking diff because one might spawn HTTP server in the background // that will be a pending async op before test starts. @@ -232,6 +276,7 @@ function assertOps(fn) { ArrayPrototypePush(details, message); } } + return { failed: { leakedOps: [details, core.isOpCallTracingEnabled()] } }; }; } diff --git a/cli/tests/testdata/test/ops_sanitizer_step_leak.out b/cli/tests/testdata/test/ops_sanitizer_step_leak.out new file mode 100644 index 0000000000..296c5e5733 --- /dev/null +++ b/cli/tests/testdata/test/ops_sanitizer_step_leak.out @@ -0,0 +1,10 @@ +Check [WILDCARD]/cli/tests/testdata/test/ops_sanitizer_step_leak.ts +running 1 test from ./cli/tests/testdata/test/ops_sanitizer_step_leak.ts +timeout ... + step ... ok [WILDCARD] +------- output ------- +done +----- output end ----- +timeout ... ok [WILDCARD] + +ok | 1 passed (1 step) | 0 failed [WILDCARD] \ No newline at end of file diff --git a/cli/tests/testdata/test/ops_sanitizer_step_leak.ts b/cli/tests/testdata/test/ops_sanitizer_step_leak.ts new file mode 100644 index 0000000000..3fb9b397e5 --- /dev/null +++ b/cli/tests/testdata/test/ops_sanitizer_step_leak.ts @@ -0,0 +1,10 @@ +Deno.test("timeout", async (t) => { + const timer = setTimeout(() => { + console.log("timeout"); + }, 10000); + clearTimeout(timer); + await t.step("step", async () => { + await new Promise((resolve) => setTimeout(() => resolve(), 10)); + }); + console.log("done"); +}); diff --git a/cli/tests/unit/fetch_test.ts b/cli/tests/unit/fetch_test.ts index 83386d2ee5..4b6f3450d4 100644 --- a/cli/tests/unit/fetch_test.ts +++ b/cli/tests/unit/fetch_test.ts @@ -1752,7 +1752,8 @@ Deno.test( // if transfer-encoding is sent, content-length is ignored // even if it has an invalid value (content-length > totalLength) const listener = invalidServer(addr, body); - const response = await fetch(`http://${addr}/`); + const client = Deno.createHttpClient({}); + const response = await fetch(`http://${addr}/`, { client }); const res = await response.arrayBuffer(); const buf = new TextEncoder().encode(data); @@ -1760,6 +1761,7 @@ Deno.test( assertEquals(new Uint8Array(res), buf); listener.close(); + client.close(); }, ); @@ -1781,21 +1783,23 @@ Deno.test( // It should fail if multiple content-length headers with different values are sent const listener = invalidServer(addr, body); + const client = Deno.createHttpClient({}); await assertRejects( async () => { - await fetch(`http://${addr}/`); + await fetch(`http://${addr}/`, { client }); }, TypeError, "invalid content-length parsed", ); listener.close(); + client.close(); }, ); Deno.test( { permissions: { net: true } }, - async function fetchWithInvalidContentLength(): Promise< + async function fetchWithInvalidContentLength2(): Promise< void > { const addr = `127.0.0.1:${listenPort}`; @@ -1807,7 +1811,8 @@ Deno.test( ); const listener = invalidServer(addr, body); - const response = await fetch(`http://${addr}/`); + const client = Deno.createHttpClient({}); + const response = await fetch(`http://${addr}/`, { client }); // If content-length < totalLength, a maximum of content-length bytes // should be returned. @@ -1817,12 +1822,13 @@ Deno.test( assertEquals(new Uint8Array(res), buf.subarray(contentLength)); listener.close(); + client.close(); }, ); Deno.test( { permissions: { net: true } }, - async function fetchWithInvalidContentLength(): Promise< + async function fetchWithInvalidContentLength3(): Promise< void > { const addr = `127.0.0.1:${listenPort}`; @@ -1834,7 +1840,8 @@ Deno.test( ); const listener = invalidServer(addr, body); - const response = await fetch(`http://${addr}/`); + const client = Deno.createHttpClient({}); + const response = await fetch(`http://${addr}/`, { client }); // If content-length > totalLength, a maximum of content-length bytes // should be returned. await assertRejects( @@ -1846,6 +1853,7 @@ Deno.test( ); listener.close(); + client.close(); }, ); @@ -1935,10 +1943,12 @@ Deno.test( }, }); + const client = Deno.createHttpClient({}); const err = await assertRejects(() => fetch(`http://localhost:${listenPort}/`, { body: stream, method: "POST", + client, }) ); @@ -1948,6 +1958,7 @@ Deno.test( assertEquals(err.cause.message, "foo"); await server; + client.close(); }, ); diff --git a/cli/tests/unit/http_test.ts b/cli/tests/unit/http_test.ts index 8d246b9796..4fef626d64 100644 --- a/cli/tests/unit/http_test.ts +++ b/cli/tests/unit/http_test.ts @@ -198,12 +198,14 @@ Deno.test( await respondWith(new Response(stream.readable)); })(); - const resp = await fetch(`http://127.0.0.1:${listenPort}/`); + const client = Deno.createHttpClient({}); + const resp = await fetch(`http://127.0.0.1:${listenPort}/`, { client }); const respBody = await resp.text(); assertEquals("hello world", respBody); await promise; httpConn!.close(); listener.close(); + client.close(); }, ); @@ -216,8 +218,8 @@ Deno.test( writer.write(new TextEncoder().encode("world")); writer.close(); + const listener = Deno.listen({ port: listenPort }); const promise = (async () => { - const listener = Deno.listen({ port: listenPort }); const conn = await listener.accept(); const httpConn = Deno.serveHttp(conn); const evt = await httpConn.nextRequest(); @@ -235,14 +237,17 @@ Deno.test( listener.close(); })(); + const client = Deno.createHttpClient({}); const resp = await fetch(`http://127.0.0.1:${listenPort}/`, { body: stream.readable, method: "POST", headers: { "connection": "close" }, + client, }); await resp.arrayBuffer(); await promise; + client.close(); }, ); @@ -375,9 +380,11 @@ Deno.test( await respondWith(new Response("response")); })(); + const client = Deno.createHttpClient({}); const resp = await fetch(`http://127.0.0.1:${listenPort}/`, { method: "POST", body: "request", + client, }); const respBody = await resp.text(); assertEquals("response", respBody); @@ -385,6 +392,7 @@ Deno.test( httpConn!.close(); listener.close(); + client.close(); }, ); @@ -427,9 +435,11 @@ Deno.test( listener.close(); })(); + const client = Deno.createHttpClient({}); const resp = await fetch(`http://127.0.0.1:${listenPort}/`); await resp.body!.cancel(); await promise; + client.close(); }, ); @@ -788,7 +798,11 @@ Deno.test({ permissions: { net: true } }, async function httpServerWebSocket() { socket.send(m.data); socket.close(1001); }; + const close = new Promise((resolve) => { + socket.onclose = () => resolve(); + }); await respondWith(response); + await close; })(); const def = deferred(); @@ -1228,11 +1242,15 @@ Deno.test( async function client() { const socket = new WebSocket(`ws://${hostname}:${port}/`); socket.onopen = () => socket.send("bla bla"); + const closed = new Promise((resolve) => { + socket.onclose = () => resolve(); + }); const { data } = await new Promise>((res) => socket.onmessage = res ); assertStrictEquals(data, "bla bla"); socket.close(); + await closed; } await Promise.all([server(), client()]); diff --git a/cli/tests/unit/net_test.ts b/cli/tests/unit/net_test.ts index ca4c5191cb..54edf31fc7 100644 --- a/cli/tests/unit/net_test.ts +++ b/cli/tests/unit/net_test.ts @@ -901,7 +901,7 @@ Deno.test( ); Deno.test({ permissions: { net: true } }, async function whatwgStreams() { - (async () => { + const server = (async () => { const listener = Deno.listen({ hostname: "127.0.0.1", port: listenPort }); const conn = await listener.accept(); await conn.readable.pipeTo(conn.writable); @@ -920,6 +920,7 @@ Deno.test({ permissions: { net: true } }, async function whatwgStreams() { assert(!done); assertEquals(decoder.decode(value), "Hello World"); await reader.cancel(); + await server; }); Deno.test( @@ -973,7 +974,7 @@ Deno.test( Deno.test( { permissions: { read: true, run: true } }, - async function netListenUnref() { + async function netListenUnref2() { const [statusCode, _output] = await execCode(` async function main() { const listener = Deno.listen({ port: ${listenPort} }); diff --git a/cli/tests/unit/serve_test.ts b/cli/tests/unit/serve_test.ts index cfe74e6594..2e7836b6a9 100644 --- a/cli/tests/unit/serve_test.ts +++ b/cli/tests/unit/serve_test.ts @@ -965,7 +965,8 @@ function createStreamTest(count: number, delay: number, action: string) { try { await listeningPromise; - const resp = await fetch(`http://127.0.0.1:${servePort}/`); + const client = Deno.createHttpClient({}); + const resp = await fetch(`http://127.0.0.1:${servePort}/`, { client }); if (action == "Throw") { await assertRejects(async () => { await resp.text(); @@ -980,6 +981,7 @@ function createStreamTest(count: number, delay: number, action: string) { assertEquals(text, expected); } + client.close(); } finally { ac.abort(); await server.shutdown(); @@ -1096,12 +1098,14 @@ Deno.test( }); await listeningPromise; - const resp = await fetch(`http://127.0.0.1:${servePort}/`); + const client = Deno.createHttpClient({}); + const resp = await fetch(`http://127.0.0.1:${servePort}/`, { client }); const respBody = await resp.text(); assertEquals("", respBody); ac.abort(); await server.finished; + client.close(); }, ); @@ -1138,12 +1142,14 @@ Deno.test( }); await listeningPromise; - const resp = await fetch(`http://127.0.0.1:${servePort}/`); + const client = Deno.createHttpClient({}); + const resp = await fetch(`http://127.0.0.1:${servePort}/`, { client }); // Incorrectly implemented reader ReadableStream should reject. assertStringIncludes(await resp.text(), "Failed to execute 'enqueue'"); await errorPromise; ac.abort(); await server.finished; + client.close(); }, ); @@ -1187,6 +1193,7 @@ Deno.test( Deno.test({ permissions: { net: true } }, async function httpServerWebSocket() { const ac = new AbortController(); const listeningPromise = deferred(); + const done = deferred(); const server = Deno.serve({ handler: (request) => { const { @@ -1201,6 +1208,7 @@ Deno.test({ permissions: { net: true } }, async function httpServerWebSocket() { socket.send(m.data); socket.close(1001); }; + socket.onclose = () => done.resolve(); return response; }, port: servePort, @@ -1221,6 +1229,7 @@ Deno.test({ permissions: { net: true } }, async function httpServerWebSocket() { ws.onopen = () => ws.send("foo"); await def; + await done; ac.abort(); await server.finished; }); @@ -1308,6 +1317,7 @@ Deno.test( { permissions: { net: true } }, async function httpServerWebSocketUpgradeTwice() { const ac = new AbortController(); + const done = deferred(); const listeningPromise = deferred(); const server = Deno.serve({ handler: (request) => { @@ -1330,6 +1340,7 @@ Deno.test( socket.send(m.data); socket.close(1001); }; + socket.onclose = () => done.resolve(); return response; }, port: servePort, @@ -1350,6 +1361,7 @@ Deno.test( ws.onopen = () => ws.send("foo"); await def; + await done; ac.abort(); await server.finished; }, @@ -1359,6 +1371,7 @@ Deno.test( { permissions: { net: true } }, async function httpServerWebSocketCloseFast() { const ac = new AbortController(); + const done = deferred(); const listeningPromise = deferred(); const server = Deno.serve({ handler: (request) => { @@ -1367,6 +1380,7 @@ Deno.test( socket, } = Deno.upgradeWebSocket(request); socket.onopen = () => socket.close(); + socket.onclose = () => done.resolve(); return response; }, port: servePort, @@ -1385,6 +1399,7 @@ Deno.test( ws.onclose = () => def.resolve(); await def; + await done; ac.abort(); await server.finished; }, @@ -1394,6 +1409,7 @@ Deno.test( { permissions: { net: true } }, async function httpServerWebSocketCanAccessRequest() { const ac = new AbortController(); + const done = deferred(); const listeningPromise = deferred(); const server = Deno.serve({ handler: (request) => { @@ -1409,6 +1425,7 @@ Deno.test( socket.send(request.url.toString()); socket.close(1001); }; + socket.onclose = () => done.resolve(); return response; }, port: servePort, @@ -1430,6 +1447,7 @@ Deno.test( ws.onopen = () => ws.send("foo"); await def; + await done; ac.abort(); await server.finished; }, @@ -1588,9 +1606,11 @@ Deno.test( ); const { readable, writable } = new TransformStream(); + const client = Deno.createHttpClient({}); const resp = await fetch(`http://127.0.0.1:${servePort}/`, { method: "POST", body: readable, + client, }); await promise; @@ -1598,6 +1618,7 @@ Deno.test( await testDuplex(resp.body.getReader(), writable.getWriter()); ac.abort(); await server.finished; + client.close(); }, ); @@ -1632,9 +1653,11 @@ Deno.test( ); const { readable, writable } = new TransformStream(); + const client = Deno.createHttpClient({}); const resp = await fetch(`http://127.0.0.1:${servePort}/`, { method: "POST", body: readable, + client, }); await promise; @@ -1642,6 +1665,7 @@ Deno.test( await testDuplex(resp.body.getReader(), writable.getWriter()); ac.abort(); await server.finished; + client.close(); }, ); @@ -2600,9 +2624,12 @@ for (const testCase of compressionTestCases) { }); try { await listeningPromise; + const client = Deno.createHttpClient({}); const resp = await fetch(`http://127.0.0.1:${servePort}/`, { headers: testCase.in as HeadersInit, + client, }); + client.close(); await promise; const body = await resp.arrayBuffer(); if (testCase.expect == null) { @@ -3192,14 +3219,16 @@ Deno.test( let count = 0; const server = Deno.serve({ async onListen({ port }: { port: number }) { - const res1 = await fetch(`http://localhost:${port}/`); + const client = Deno.createHttpClient({}); + const res1 = await fetch(`http://localhost:${port}/`, { client }); assertEquals(await res1.text(), "hello world 1"); - const res2 = await fetch(`http://localhost:${port}/`); + const res2 = await fetch(`http://localhost:${port}/`, { client }); assertEquals(await res2.text(), "hello world 2"); promise.resolve(); ac.abort(); + client.close(); }, signal: ac.signal, }, () => { @@ -3252,13 +3281,16 @@ Deno.test( try { const port = await listeningPromise; + const client = Deno.createHttpClient({}); const resp = await fetch(`http://localhost:${port}/`, { headers: { connection: "close" }, method: "POST", body: '{"sus":true}', + client, }); const text = await resp.text(); assertEquals(text, "ok"); + client.close(); } finally { ac.abort(); await server.finished; diff --git a/cli/tests/unit/streams_test.ts b/cli/tests/unit/streams_test.ts index 4ecea39da1..3460e8ecd3 100644 --- a/cli/tests/unit/streams_test.ts +++ b/cli/tests/unit/streams_test.ts @@ -1,6 +1,5 @@ // Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. -import { fail } from "https://deno.land/std@v0.42.0/testing/asserts.ts"; -import { assertEquals, Deferred, deferred } from "./test_util.ts"; +import { assertEquals, Deferred, deferred, fail } from "./test_util.ts"; const { core, diff --git a/cli/tests/unit_node/child_process_test.ts b/cli/tests/unit_node/child_process_test.ts index a282977e01..8d1c9a6b73 100644 --- a/cli/tests/unit_node/child_process_test.ts +++ b/cli/tests/unit_node/child_process_test.ts @@ -497,9 +497,15 @@ Deno.test({ ); const childProcess = spawn(Deno.execPath(), ["run", script]); const p = withTimeout(); + const pStdout = withTimeout(); + const pStderr = withTimeout(); childProcess.on("exit", () => p.resolve()); + childProcess.stdout.on("close", () => pStdout.resolve()); + childProcess.stderr.on("close", () => pStderr.resolve()); childProcess.kill("SIGKILL"); await p; + await pStdout; + await pStderr; assert(childProcess.killed); assertEquals(childProcess.signalCode, "SIGKILL"); assertExists(childProcess.exitCode); @@ -633,9 +639,15 @@ Deno.test({ // Spawn an infinite cat const cp = spawn("cat", ["-"]); const p = withTimeout(); + const pStdout = withTimeout(); + const pStderr = withTimeout(); cp.on("exit", () => p.resolve()); + cp.stdout.on("close", () => pStdout.resolve()); + cp.stderr.on("close", () => pStderr.resolve()); cp.kill("SIGIOT"); await p; + await pStdout; + await pStderr; assert(cp.killed); assertEquals(cp.signalCode, "SIGIOT"); }, diff --git a/cli/tests/unit_node/net_test.ts b/cli/tests/unit_node/net_test.ts index b9d9b796a3..ca9d214e19 100644 --- a/cli/tests/unit_node/net_test.ts +++ b/cli/tests/unit_node/net_test.ts @@ -143,12 +143,17 @@ Deno.test("[node/net] multiple Sockets should get correct server data", async () } const finished = deferred(); + const serverSocketsClosed: Deferred[] = []; const server = net.createServer(); server.on("connection", (socket) => { assert(socket !== undefined); + const i = serverSocketsClosed.push(deferred()); socket.on("data", (data) => { socket.write(new TextDecoder().decode(data)); }); + socket.on("close", () => { + serverSocketsClosed[i - 1].resolve(); + }); }); const sockets: TestSocket[] = []; @@ -190,6 +195,7 @@ Deno.test("[node/net] multiple Sockets should get correct server data", async () }); await finished; + await Promise.all(serverSocketsClosed); for (let i = 0; i < socketCount; i++) { assertEquals(sockets[i].events, [`${i}`.repeat(3), `${i}`.repeat(3)]); diff --git a/ext/broadcast_channel/01_broadcast_channel.js b/ext/broadcast_channel/01_broadcast_channel.js index ab315eaa6e..e5e4169b66 100644 --- a/ext/broadcast_channel/01_broadcast_channel.js +++ b/ext/broadcast_channel/01_broadcast_channel.js @@ -15,8 +15,9 @@ import DOMException from "ext:deno_web/01_dom_exception.js"; const primordials = globalThis.__bootstrap.primordials; const { ArrayPrototypeIndexOf, - ArrayPrototypeSplice, ArrayPrototypePush, + ArrayPrototypeSplice, + PromisePrototypeThen, Symbol, Uint8Array, } = primordials; @@ -70,7 +71,7 @@ function dispatch(source, name, data) { // for that reason: it lets promises make forward progress but can // still starve other parts of the event loop. function defer(go) { - setTimeout(go, 1); + PromisePrototypeThen(core.ops.op_void_async_deferred(), () => go()); } class BroadcastChannel extends EventTarget { diff --git a/ext/fetch/26_fetch.js b/ext/fetch/26_fetch.js index 311a197a87..f1c771dc04 100644 --- a/ext/fetch/26_fetch.js +++ b/ext/fetch/26_fetch.js @@ -284,6 +284,9 @@ async function mainFetch(req, recursive, terminator) { cause: requestSendError, }); } + if (requestBodyRid !== null) { + core.tryClose(requestBodyRid); + } throw err; } finally { if (cancelHandleRid !== null) {