fix(ext/node): UdpSocket ref and unref (#21777)

This commit is contained in:
Divy Srivastava 2024-01-04 08:51:39 +05:30 committed by GitHub
parent a0b6872359
commit ad65440092
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 89 additions and 4 deletions

View file

@ -58,6 +58,7 @@ util::unit_test_factory!(
crypto_key_test = crypto / crypto_key_test,
crypto_sign_test = crypto / crypto_sign_test,
events_test,
dgram_test,
fs_test,
http_test,
http2_test,

View file

@ -0,0 +1,59 @@
// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
import { assertEquals } from "../../../test_util/std/assert/mod.ts";
import { execCode } from "../unit/test_util.ts";
import { createSocket } from "node:dgram";
const listenPort = 4503;
const listenPort2 = 4504;
Deno.test("[node/dgram] udp ref and unref", {
permissions: { read: true, run: true, net: true },
}, async () => {
const { promise, resolve } = Promise.withResolvers<void>();
const udpSocket = createSocket("udp4");
udpSocket.bind(listenPort);
udpSocket.unref();
udpSocket.ref();
let data;
udpSocket.on("message", (buffer, _rinfo) => {
data = Uint8Array.from(buffer);
udpSocket.close();
});
udpSocket.on("close", () => {
resolve();
});
const conn = await Deno.listenDatagram({
port: listenPort2,
transport: "udp",
});
await conn.send(new Uint8Array([0, 1, 2, 3]), {
transport: "udp",
port: listenPort,
hostname: "127.0.0.1",
});
await promise;
conn.close();
assertEquals(data, new Uint8Array([0, 1, 2, 3]));
});
Deno.test("[node/dgram] udp unref", {
permissions: { read: true, run: true, net: true },
}, async () => {
const [statusCode, _output] = await execCode(`
import { createSocket } from "node:dgram";
const udpSocket = createSocket('udp4');
udpSocket.bind(${listenPort2});
// This should let the program to exit without waiting for the
// udp socket to close.
udpSocket.unref();
udpSocket.on('message', (buffer, rinfo) => {
});
`);
assertEquals(statusCode, 0);
});

View file

@ -295,6 +295,8 @@ class Listener {
class Datagram {
#rid = 0;
#addr = null;
#unref = false;
#promise = null;
constructor(rid, addr, bufSize = 1024) {
this.#rid = rid;
@ -367,10 +369,12 @@ class Datagram {
let remoteAddr;
switch (this.addr.transport) {
case "udp": {
({ 0: nread, 1: remoteAddr } = await op_net_recv_udp(
this.#promise = op_net_recv_udp(
this.rid,
buf,
));
);
if (this.#unref) core.unrefOpPromise(this.#promise);
({ 0: nread, 1: remoteAddr } = await this.#promise);
remoteAddr.transport = "udp";
break;
}
@ -413,6 +417,20 @@ class Datagram {
core.close(this.rid);
}
ref() {
this.#unref = false;
if (this.#promise !== null) {
core.refOpPromise(this.#promise);
}
}
unref() {
this.#unref = true;
if (this.#promise !== null) {
core.unrefOpPromise(this.#promise);
}
}
async *[SymbolAsyncIterator]() {
while (true) {
try {

View file

@ -78,6 +78,7 @@ export class UDP extends HandleWrap {
#listener?: Deno.DatagramConn;
#receiving = false;
#unrefed = false;
#recvBufferSize = UDP_DGRAM_MAXSIZE;
#sendBufferSize = UDP_DGRAM_MAXSIZE;
@ -273,7 +274,8 @@ export class UDP extends HandleWrap {
}
override ref() {
notImplemented("udp.UDP.prototype.ref");
this.#listener?.ref();
this.#unrefed = false;
}
send(
@ -315,7 +317,8 @@ export class UDP extends HandleWrap {
}
override unref() {
notImplemented("udp.UDP.prototype.unref");
this.#listener?.unref();
this.#unrefed = true;
}
#doBind(ip: string, port: number, _flags: number, family: number): number {
@ -443,6 +446,10 @@ export class UDP extends HandleWrap {
let remoteAddr: Deno.NetAddr | null;
let nread: number | null;
if (this.#unrefed) {
this.#listener!.unref();
}
try {
[buf, remoteAddr] = (await this.#listener!.receive(p)) as [
Uint8Array,