feat: support UDP sockets (#3946)

This commit is contained in:
hazæ41 2020-02-21 17:26:54 +01:00 committed by GitHub
parent dd8a109481
commit 08686cbc3a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 389 additions and 46 deletions

View file

@ -56,7 +56,7 @@ source-map-mappings = "0.5.0"
sys-info = "0.5.8"
tempfile = "3.1.0"
termcolor = "1.0.5"
tokio = { version = "0.2", features = ["rt-core", "tcp", "process", "fs", "blocking", "sync", "io-std", "macros", "time"] }
tokio = { version = "0.2", features = ["rt-core", "tcp", "udp", "process", "fs", "blocking", "sync", "io-std", "macros", "time"] }
tokio-rustls = "0.12.1"
url = "2.1.0"
utime = "0.2.1"

View file

@ -73,8 +73,12 @@ export {
export { metrics, Metrics } from "./metrics.ts";
export { mkdirSync, mkdir } from "./mkdir.ts";
export {
Addr,
connect,
listen,
recvfrom,
UDPConn,
UDPAddr,
Listener,
Conn,
ShutdownMode,

View file

@ -30,6 +30,8 @@ export let OP_REPL_START: number;
export let OP_REPL_READLINE: number;
export let OP_ACCEPT: number;
export let OP_ACCEPT_TLS: number;
export let OP_RECEIVE: number;
export let OP_SEND: number;
export let OP_CONNECT: number;
export let OP_SHUTDOWN: number;
export let OP_LISTEN: number;

View file

@ -1387,14 +1387,20 @@ declare namespace Deno {
*/
export function openPlugin(filename: string): Plugin;
type Transport = "tcp";
export type Transport = "tcp" | "udp";
interface Addr {
export interface Addr {
transport: Transport;
hostname: string;
port: number;
}
export interface UDPAddr {
transport?: Transport;
hostname?: string;
port: number;
}
/** UNSTABLE: Maybe remove ShutdownMode entirely. */
export enum ShutdownMode {
// See http://man7.org/linux/man-pages/man2/shutdown.2.html
@ -1417,6 +1423,36 @@ declare namespace Deno {
*/
export function shutdown(rid: number, how: ShutdownMode): void;
/** UNSTABLE: new API
* Waits for the next message to the passed rid and writes it on the passed buffer.
* Returns the number of bytes written and the remote address.
*/
export function recvfrom(rid: number, p: Uint8Array): Promise<[number, Addr]>;
/** UNSTABLE: new API
* A socket is a generic transport listener for message-oriented protocols
*/
export interface UDPConn extends AsyncIterator<[Uint8Array, Addr]> {
/** UNSTABLE: new API
* Waits for and resolves to the next message to the `Socket`. */
receive(p?: Uint8Array): Promise<[Uint8Array, Addr]>;
/** UNSTABLE: new API
* Sends a message to the target. */
send(p: Uint8Array, addr: UDPAddr): Promise<void>;
/** UNSTABLE: new API
* Close closes the socket. Any pending message promises will be rejected
* with errors.
*/
close(): void;
/** Return the address of the `Socket`. */
addr: Addr;
[Symbol.asyncIterator](): AsyncIterator<[Uint8Array, Addr]>;
}
/** A Listener is a generic network listener for stream-oriented protocols. */
export interface Listener extends AsyncIterator<Conn> {
/** Waits for and resolves to the next connection to the `Listener`. */
@ -1457,7 +1493,9 @@ declare namespace Deno {
transport?: Transport;
}
/** Listen announces on the local transport address.
/** UNSTABLE: new API
*
* Listen announces on the local transport address.
*
* Requires the allow-net permission.
*
@ -1476,7 +1514,13 @@ declare namespace Deno {
* listen({ hostname: "[2001:db8::1]", port: 80 });
* listen({ hostname: "golang.org", port: 80, transport: "tcp" })
*/
export function listen(options: ListenOptions): Listener;
export function listen(
options: ListenOptions & { transport?: "tcp" }
): Listener;
export function listen(
options: ListenOptions & { transport: "udp" }
): UDPConn;
export function listen(options: ListenOptions): Listener | UDPConn;
export interface ListenTLSOptions {
port: number;

View file

@ -4,7 +4,7 @@ import { read, write, close } from "./files.ts";
import * as dispatch from "./dispatch.ts";
import { sendSync, sendAsync } from "./dispatch_json.ts";
export type Transport = "tcp";
export type Transport = "tcp" | "udp";
// TODO support other types:
// export type Transport = "tcp" | "tcp4" | "tcp6" | "unix" | "unixpacket";
@ -14,6 +14,31 @@ export interface Addr {
port: number;
}
export interface UDPAddr {
transport?: Transport;
hostname?: string;
port: number;
}
/** A socket is a generic transport listener for message-oriented protocols */
export interface UDPConn extends AsyncIterator<[Uint8Array, Addr]> {
/** Waits for and resolves to the next message to the `Socket`. */
receive(p?: Uint8Array): Promise<[Uint8Array, Addr]>;
/** Sends a message to the target. */
send(p: Uint8Array, addr: UDPAddr): Promise<void>;
/** Close closes the socket. Any pending message promises will be rejected
* with errors.
*/
close(): void;
/** Return the address of the `Socket`. */
addr: Addr;
[Symbol.asyncIterator](): AsyncIterator<[Uint8Array, Addr]>;
}
/** A Listener is a generic transport listener for stream-oriented protocols. */
export interface Listener extends AsyncIterator<Conn> {
/** Waits for and resolves to the next connection to the `Listener`. */
@ -87,7 +112,7 @@ export class ConnImpl implements Conn {
export class ListenerImpl implements Listener {
constructor(
readonly rid: number,
public addr: Addr,
readonly addr: Addr,
private closing: boolean = false
) {}
@ -123,6 +148,63 @@ export class ListenerImpl implements Listener {
}
}
export async function recvfrom(
rid: number,
p: Uint8Array
): Promise<[number, Addr]> {
const { size, remoteAddr } = await sendAsync(dispatch.OP_RECEIVE, { rid }, p);
return [size, remoteAddr];
}
export class UDPConnImpl implements UDPConn {
constructor(
readonly rid: number,
readonly addr: Addr,
public bufSize: number = 1024,
private closing: boolean = false
) {}
async receive(p?: Uint8Array): Promise<[Uint8Array, Addr]> {
const buf = p || new Uint8Array(this.bufSize);
const [size, remoteAddr] = await recvfrom(this.rid, buf);
const sub = buf.subarray(0, size);
return [sub, remoteAddr];
}
async send(p: Uint8Array, addr: UDPAddr): Promise<void> {
const remote = { hostname: "127.0.0.1", transport: "udp", ...addr };
if (remote.transport !== "udp") throw Error("Remote transport must be UDP");
const args = { ...remote, rid: this.rid };
await sendAsync(dispatch.OP_SEND, args, p);
}
close(): void {
this.closing = true;
close(this.rid);
}
async next(): Promise<IteratorResult<[Uint8Array, Addr]>> {
if (this.closing) {
return { value: undefined, done: true };
}
return await this.receive()
.then(value => ({ value, done: false }))
.catch(e => {
// It wouldn't be correct to simply check this.closing here.
// TODO: Get a proper error kind for this case, don't check the message.
// The current error kind is Other.
if (e.message == "Socket has been closed") {
return { value: undefined, done: true };
}
throw e;
});
}
[Symbol.asyncIterator](): AsyncIterator<[Uint8Array, Addr]> {
return this;
}
}
export interface Conn extends Reader, Writer, Closer {
/** The local address of the connection. */
localAddr: Addr;
@ -146,14 +228,16 @@ export interface ListenOptions {
transport?: Transport;
}
const listenDefaults = { hostname: "0.0.0.0", transport: "tcp" };
/** Listen announces on the local transport address.
*
* @param options
* @param options.port The port to connect to. (Required.)
* @param options.hostname A literal IP address or host name that can be
* resolved to an IP address. If not specified, defaults to 0.0.0.0
* @param options.transport Defaults to "tcp". Later we plan to add "tcp4",
* "tcp6", "udp", "udp4", "udp6", "ip", "ip4", "ip6", "unix", "unixgram" and
* @param options.transport Must be "tcp" or "udp". Defaults to "tcp". Later we plan to add "tcp4",
* "tcp6", "udp4", "udp6", "ip", "ip4", "ip6", "unix", "unixgram" and
* "unixpacket".
*
* Examples:
@ -163,16 +247,19 @@ export interface ListenOptions {
* listen({ hostname: "[2001:db8::1]", port: 80 });
* listen({ hostname: "golang.org", port: 80, transport: "tcp" })
*/
export function listen(options: ListenOptions): Listener {
const hostname = options.hostname || "0.0.0.0";
const transport = options.transport || "tcp";
export function listen(
options: ListenOptions & { transport?: "tcp" }
): Listener;
export function listen(options: ListenOptions & { transport: "udp" }): UDPConn;
export function listen(options: ListenOptions): Listener | UDPConn {
const args = { ...listenDefaults, ...options };
const res = sendSync(dispatch.OP_LISTEN, args);
const res = sendSync(dispatch.OP_LISTEN, {
hostname,
port: options.port,
transport
});
return new ListenerImpl(res.rid, res.localAddr);
if (args.transport === "tcp") {
return new ListenerImpl(res.rid, res.localAddr);
} else {
return new UDPConnImpl(res.rid, res.localAddr);
}
}
export interface ConnectOptions {
@ -189,8 +276,8 @@ const connectDefaults = { hostname: "127.0.0.1", transport: "tcp" };
* @param options.port The port to connect to. (Required.)
* @param options.hostname A literal IP address or host name that can be
* resolved to an IP address. If not specified, defaults to 127.0.0.1
* @param options.transport Defaults to "tcp". Later we plan to add "tcp4",
* "tcp6", "udp", "udp4", "udp6", "ip", "ip4", "ip6", "unix", "unixgram" and
* @param options.transport Must be "tcp" or "udp". Defaults to "tcp". Later we plan to add "tcp4",
* "tcp6", "udp4", "udp6", "ip", "ip4", "ip6", "unix", "unixgram" and
* "unixpacket".
*
* Examples:

View file

@ -1,7 +1,7 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
import { testPerm, assert, assertEquals } from "./test_util.ts";
testPerm({ net: true }, function netListenClose(): void {
testPerm({ net: true }, function netTcpListenClose(): void {
const listener = Deno.listen({ hostname: "127.0.0.1", port: 4500 });
assertEquals(listener.addr.transport, "tcp");
assertEquals(listener.addr.hostname, "127.0.0.1");
@ -9,7 +9,21 @@ testPerm({ net: true }, function netListenClose(): void {
listener.close();
});
testPerm({ net: true }, async function netCloseWhileAccept(): Promise<void> {
testPerm({ net: true }, function netUdpListenClose(): void {
if (Deno.build.os === "win") return; // TODO
const socket = Deno.listen({
hostname: "127.0.0.1",
port: 4500,
transport: "udp"
});
assertEquals(socket.addr.transport, "udp");
assertEquals(socket.addr.hostname, "127.0.0.1");
assertEquals(socket.addr.port, 4500);
socket.close();
});
testPerm({ net: true }, async function netTcpCloseWhileAccept(): Promise<void> {
const listener = Deno.listen({ port: 4501 });
const p = listener.accept();
listener.close();
@ -24,7 +38,7 @@ testPerm({ net: true }, async function netCloseWhileAccept(): Promise<void> {
assertEquals(err.message, "Listener has been closed");
});
testPerm({ net: true }, async function netConcurrentAccept(): Promise<void> {
testPerm({ net: true }, async function netTcpConcurrentAccept(): Promise<void> {
const listener = Deno.listen({ port: 4502 });
let acceptErrCount = 0;
const checkErr = (e: Error): void => {
@ -44,7 +58,7 @@ testPerm({ net: true }, async function netConcurrentAccept(): Promise<void> {
assertEquals(acceptErrCount, 1);
});
testPerm({ net: true }, async function netDialListen(): Promise<void> {
testPerm({ net: true }, async function netTcpDialListen(): Promise<void> {
const listener = Deno.listen({ port: 4500 });
listener.accept().then(
async (conn): Promise<void> => {
@ -76,18 +90,58 @@ testPerm({ net: true }, async function netDialListen(): Promise<void> {
conn.close();
});
testPerm({ net: true }, async function netListenCloseWhileIterating(): Promise<
void
> {
const listener = Deno.listen({ port: 8000 });
const nextWhileClosing = listener[Symbol.asyncIterator]().next();
listener.close();
assertEquals(await nextWhileClosing, { value: undefined, done: true });
testPerm({ net: true }, async function netUdpSendReceive(): Promise<void> {
if (Deno.build.os === "win") return; // TODO
const nextAfterClosing = listener[Symbol.asyncIterator]().next();
assertEquals(await nextAfterClosing, { value: undefined, done: true });
const alice = Deno.listen({ port: 4500, transport: "udp" });
assertEquals(alice.addr.port, 4500);
assertEquals(alice.addr.hostname, "0.0.0.0");
assertEquals(alice.addr.transport, "udp");
const bob = Deno.listen({ port: 4501, transport: "udp" });
assertEquals(bob.addr.port, 4501);
assertEquals(bob.addr.hostname, "0.0.0.0");
assertEquals(bob.addr.transport, "udp");
const sent = new Uint8Array([1, 2, 3]);
await alice.send(sent, bob.addr);
const [recvd, remote] = await bob.receive();
assertEquals(remote.port, 4500);
assertEquals(recvd.length, 3);
assertEquals(1, recvd[0]);
assertEquals(2, recvd[1]);
assertEquals(3, recvd[2]);
});
testPerm(
{ net: true },
async function netTcpListenCloseWhileIterating(): Promise<void> {
const listener = Deno.listen({ port: 8000 });
const nextWhileClosing = listener[Symbol.asyncIterator]().next();
listener.close();
assertEquals(await nextWhileClosing, { value: undefined, done: true });
const nextAfterClosing = listener[Symbol.asyncIterator]().next();
assertEquals(await nextAfterClosing, { value: undefined, done: true });
}
);
testPerm(
{ net: true },
async function netUdpListenCloseWhileIterating(): Promise<void> {
if (Deno.build.os === "win") return; // TODO
const socket = Deno.listen({ port: 8000, transport: "udp" });
const nextWhileClosing = socket[Symbol.asyncIterator]().next();
socket.close();
assertEquals(await nextWhileClosing, { value: undefined, done: true });
const nextAfterClosing = socket[Symbol.asyncIterator]().next();
assertEquals(await nextAfterClosing, { value: undefined, done: true });
}
);
/* TODO(ry) Re-enable this test.
testPerm({ net: true }, async function netListenAsyncIterator(): Promise<void> {
const listener = Deno.listen(":4500");

View file

@ -18,12 +18,15 @@ use std::task::Poll;
use tokio;
use tokio::net::TcpListener;
use tokio::net::TcpStream;
use tokio::net::UdpSocket;
pub fn init(i: &mut Isolate, s: &State) {
i.register_op("accept", s.core_op(json_op(s.stateful_op(op_accept))));
i.register_op("connect", s.core_op(json_op(s.stateful_op(op_connect))));
i.register_op("shutdown", s.core_op(json_op(s.stateful_op(op_shutdown))));
i.register_op("listen", s.core_op(json_op(s.stateful_op(op_listen))));
i.register_op("receive", s.core_op(json_op(s.stateful_op(op_receive))));
i.register_op("send", s.core_op(json_op(s.stateful_op(op_send))));
}
#[derive(Debug, PartialEq)]
@ -137,6 +140,121 @@ fn op_accept(
Ok(JsonOp::Async(op.boxed_local()))
}
pub struct Receive<'a> {
state: &'a State,
rid: ResourceId,
buf: ZeroCopyBuf,
}
impl Future for Receive<'_> {
type Output = Result<(usize, SocketAddr), ErrBox>;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let inner = self.get_mut();
let mut state = inner.state.borrow_mut();
let resource = state
.resource_table
.get_mut::<UdpSocketResource>(inner.rid)
.ok_or_else(|| {
let e = std::io::Error::new(
std::io::ErrorKind::Other,
"Socket has been closed",
);
ErrBox::from(e)
})?;
let socket = &mut resource.socket;
socket
.poll_recv_from(cx, &mut inner.buf)
.map_err(ErrBox::from)
}
}
#[derive(Deserialize)]
struct ReceiveArgs {
rid: i32,
}
fn receive(state: &State, rid: ResourceId, buf: ZeroCopyBuf) -> Receive {
Receive { state, rid, buf }
}
fn op_receive(
state: &State,
args: Value,
zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox> {
assert!(zero_copy.is_some());
let buf = zero_copy.unwrap();
let args: ReceiveArgs = serde_json::from_value(args)?;
let rid = args.rid as u32;
let state_ = state.clone();
let op = async move {
let (size, remote_addr) = receive(&state_, rid, buf).await?;
Ok(json!({
"size": size,
"remoteAddr": {
"hostname": remote_addr.ip().to_string(),
"port": remote_addr.port(),
"transport": "udp",
}
}))
};
Ok(JsonOp::Async(op.boxed_local()))
}
#[derive(Deserialize)]
struct SendArgs {
rid: i32,
hostname: String,
port: u16,
transport: String,
}
fn op_send(
state: &State,
args: Value,
zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox> {
assert!(zero_copy.is_some());
let buf = zero_copy.unwrap();
let args: SendArgs = serde_json::from_value(args)?;
assert_eq!(args.transport, "udp");
let rid = args.rid as u32;
let state_ = state.clone();
state.check_net(&args.hostname, args.port)?;
let op = async move {
let mut state = state_.borrow_mut();
let resource = state
.resource_table
.get_mut::<UdpSocketResource>(rid)
.ok_or_else(|| {
let e = std::io::Error::new(
std::io::ErrorKind::Other,
"Socket has been closed",
);
ErrBox::from(e)
})?;
let socket = &mut resource.socket;
let addr = resolve_addr(&args.hostname, args.port).await?;
socket.send_to(&buf, addr).await?;
Ok(json!({}))
};
Ok(JsonOp::Async(op.boxed_local()))
}
#[derive(Deserialize)]
struct ConnectArgs {
transport: String,
@ -278,18 +396,15 @@ impl TcpListenerResource {
}
}
fn op_listen(
struct UdpSocketResource {
socket: UdpSocket,
}
fn listen_tcp(
state: &State,
args: Value,
_zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox> {
let args: ListenArgs = serde_json::from_value(args)?;
assert_eq!(args.transport, "tcp");
state.check_net(&args.hostname, args.port)?;
let addr =
futures::executor::block_on(resolve_addr(&args.hostname, args.port))?;
addr: SocketAddr,
) -> Result<(u32, SocketAddr), ErrBox> {
let mut state = state.borrow_mut();
let listener = futures::executor::block_on(TcpListener::bind(&addr))?;
let local_addr = listener.local_addr()?;
let listener_resource = TcpListenerResource {
@ -297,10 +412,47 @@ fn op_listen(
waker: None,
local_addr,
};
let mut state = state.borrow_mut();
let rid = state
.resource_table
.add("tcpListener", Box::new(listener_resource));
Ok((rid, local_addr))
}
fn listen_udp(
state: &State,
addr: SocketAddr,
) -> Result<(u32, SocketAddr), ErrBox> {
let mut state = state.borrow_mut();
let socket = futures::executor::block_on(UdpSocket::bind(&addr))?;
let local_addr = socket.local_addr()?;
let socket_resource = UdpSocketResource { socket };
let rid = state
.resource_table
.add("udpSocket", Box::new(socket_resource));
Ok((rid, local_addr))
}
fn op_listen(
state: &State,
args: Value,
_zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox> {
let args: ListenArgs = serde_json::from_value(args)?;
assert!(args.transport == "tcp" || args.transport == "udp");
state.check_net(&args.hostname, args.port)?;
let addr =
futures::executor::block_on(resolve_addr(&args.hostname, args.port))?;
let (rid, local_addr) = if args.transport == "tcp" {
listen_tcp(state, addr)?
} else {
listen_udp(state, addr)?
};
debug!(
"New listener {} {}:{}",
rid,

View file

@ -138,7 +138,7 @@ def http_benchmark(build_dir):
return {
# "deno_tcp" was once called "deno"
"deno_tcp": deno_tcp(deno_exe),
# "deno_http" was once called "deno_net_http"
# "deno_udp": deno_udp(deno_exe),
"deno_http": deno_http(deno_exe),
"deno_proxy": deno_http_proxy(deno_exe, hyper_hello_exe),
"deno_proxy_tcp": deno_tcp_proxy(deno_exe, hyper_hello_exe),