feat(unstable): Add Deno.Conn.ref()/unref() (#17170)

This commit adds "Deno.Conn.ref()" and "Deno.Conn.unref()" methods.

These methods can be used to make connection block or not block the
event loop from finishing. Refing/unrefing only influences "read" 
operations - ie. scheduling writes to a connection _do_ keep event 
loop alive.

Required for https://github.com/denoland/deno/issues/16710
This commit is contained in:
Bartek Iwańczuk 2022-12-28 10:29:48 +01:00 committed by GitHub
parent 7ce2b58bcf
commit bece1ce057
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 110 additions and 16 deletions

View file

@ -906,6 +906,55 @@ Deno.test({
listener.close();
});
Deno.test(
{ permissions: { net: true, read: true, run: true } },
async function netConnUnref() {
const listener = Deno.listen({ port: 3500 });
const intervalId = setInterval(() => {}); // This keeps event loop alive.
const program = execCode(`
async function main() {
const conn = await Deno.connect({ port: 3500 });
conn.unref();
await conn.read(new Uint8Array(10)); // The program exits here
throw new Error(); // The program doesn't reach here
}
main();
`);
const conn = await listener.accept();
const [statusCode, _output] = await program;
conn.close();
listener.close();
clearInterval(intervalId);
assertEquals(statusCode, 0);
},
);
Deno.test(
{ permissions: { net: true, read: true, run: true } },
async function netConnUnrefReadable() {
const listener = Deno.listen({ port: 3500 });
const intervalId = setInterval(() => {}); // This keeps event loop alive.
const program = execCode(`
async function main() {
const conn = await Deno.connect({ port: 3500 });
conn.unref();
const reader = conn.readable.getReader();
await reader.read(); // The program exits here
throw new Error(); // The program doesn't reach here
}
main();
`);
const conn = await listener.accept();
const [statusCode, _output] = await program;
conn.close();
listener.close();
clearInterval(intervalId);
assertEquals(statusCode, 0);
},
);
Deno.test({ permissions: { net: true } }, async function netTcpReuseAddr() {
const listener1 = Deno.listen({
hostname: "127.0.0.1",

View file

@ -4,8 +4,12 @@
((window) => {
const core = window.Deno.core;
const { BadResourcePrototype, InterruptedPrototype, ops } = core;
const { readableStreamForRid, writableStreamForRid } =
window.__bootstrap.streams;
const {
readableStreamForRidUnrefable,
readableStreamForRidUnrefableRef,
readableStreamForRidUnrefableUnref,
writableStreamForRid,
} = window.__bootstrap.streams;
const {
Error,
ObjectPrototypeIsPrototypeOf,
@ -19,17 +23,6 @@
const promiseIdSymbol = SymbolFor("Deno.core.internalPromiseId");
async function read(
rid,
buffer,
) {
if (buffer.length === 0) {
return 0;
}
const nread = await core.read(rid, buffer);
return nread === 0 ? null : nread;
}
async function write(rid, data) {
return await core.write(rid, data);
}
@ -46,6 +39,8 @@
#rid = 0;
#remoteAddr = null;
#localAddr = null;
#unref = false;
#pendingReadPromiseIds = [];
#readable;
#writable;
@ -72,8 +67,25 @@
return write(this.rid, p);
}
read(p) {
return read(this.rid, p);
async read(buffer) {
if (buffer.length === 0) {
return 0;
}
const promise = core.read(this.rid, buffer);
const promiseId = promise[promiseIdSymbol];
if (this.#unref) core.unrefOp(promiseId);
this.#pendingReadPromiseIds.push(promiseId);
let nread;
try {
nread = await promise;
} catch (e) {
throw e;
} finally {
this.#pendingReadPromiseIds = this.#pendingReadPromiseIds.filter((id) =>
id !== promiseId
);
}
return nread === 0 ? null : nread;
}
close() {
@ -86,7 +98,10 @@
get readable() {
if (this.#readable === undefined) {
this.#readable = readableStreamForRid(this.rid);
this.#readable = readableStreamForRidUnrefable(this.rid);
if (this.#unref) {
readableStreamForRidUnrefableUnref(this.#readable);
}
}
return this.#readable;
}
@ -97,6 +112,22 @@
}
return this.#writable;
}
ref() {
this.#unref = false;
if (this.#readable) {
readableStreamForRidUnrefableRef(this.#readable);
}
this.#pendingReadPromiseIds.forEach((id) => core.refOp(id));
}
unref() {
this.#unref = true;
if (this.#readable) {
readableStreamForRidUnrefableUnref(this.#readable);
}
this.#pendingReadPromiseIds.forEach((id) => core.unrefOp(id));
}
}
class TcpConn extends Conn {

View file

@ -61,6 +61,20 @@ declare namespace Deno {
* callers should just use `close()`. */
closeWrite(): Promise<void>;
/** **UNSTABLE**: New API, yet to be vetted.
*
* Make the connection block the event loop from finishing.
*
* Note: the connection blocks the event loop from finishing by default.
* This method is only meaningful after `.unref()` is called.
*/
ref(): void;
/** **UNSTABLE**: New API, yet to be vetted.
*
* Make the connection not block the event loop from finishing.
*/
unref(): void;
readonly readable: ReadableStream<Uint8Array>;
readonly writable: WritableStream<Uint8Array>;
}