feat: Support Unix Domain Sockets (#4176)

This commit is contained in:
João Souto 2020-03-23 22:02:51 +00:00 committed by GitHub
parent b924e5ab7e
commit 70a5034431
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 740 additions and 268 deletions

View file

@ -54,7 +54,7 @@ sys-info = "=0.5.8" # 0.5.9 and 0.5.10 are broken on windows.
sourcemap = "5.0.0"
tempfile = "3.1.0"
termcolor = "1.1.0"
tokio = { version = "0.2.13", features = ["rt-core", "tcp", "udp", "process", "fs", "blocking", "sync", "io-std", "macros", "time"] }
tokio = { version = "0.2.13", features = ["rt-core", "tcp", "udp", "uds", "process", "fs", "blocking", "sync", "io-std", "macros", "time"] }
tokio-rustls = "0.13.0"
url = "2.1.1"
utime = "0.2.1"

View file

@ -70,12 +70,9 @@ export {
export { metrics, Metrics } from "./ops/runtime.ts";
export { mkdirSync, mkdir, MkdirOptions } from "./ops/fs/mkdir.ts";
export {
Addr,
connect,
listen,
recvfrom,
UDPConn,
UDPAddr,
DatagramConn,
Listener,
Conn,
ShutdownMode,

View file

@ -1546,21 +1546,18 @@ declare namespace Deno {
*
* Requires `allow-plugin` permission. */
export function openPlugin(filename: string): Plugin;
export type Transport = "tcp" | "udp";
export interface Addr {
transport: Transport;
export interface NetAddr {
transport: "tcp" | "udp";
hostname: string;
port: number;
}
export interface UDPAddr {
port: number;
transport?: Transport;
hostname?: string;
export interface UnixAddr {
transport: "unix" | "unixpacket";
address: string;
}
export type Addr = NetAddr | UnixAddr;
/** **UNSTABLE**: Maybe remove `ShutdownMode` entirely.
*
* Corresponds to `SHUT_RD`, `SHUT_WR`, `SHUT_RDWR` on POSIX-like systems.
@ -1585,18 +1582,10 @@ declare namespace Deno {
*/
export function shutdown(rid: number, how: ShutdownMode): void;
/** **UNSTABLE**: new API, yet to be vetted.
*
* Waits for the next message to the passed `rid` and writes it on the passed
* `Uint8Array`.
*
* Resolves to the number of bytes written and the remote address. */
export function recvfrom(rid: number, p: Uint8Array): Promise<[number, Addr]>;
/** **UNSTABLE**: new API, yet to be vetted.
*
* A generic transport listener for message-oriented protocols. */
export interface UDPConn extends AsyncIterable<[Uint8Array, Addr]> {
export interface DatagramConn extends AsyncIterable<[Uint8Array, Addr]> {
/** **UNSTABLE**: new API, yet to be vetted.
*
* Waits for and resolves to the next message to the `UDPConn`. */
@ -1604,7 +1593,7 @@ declare namespace Deno {
/** UNSTABLE: new API, yet to be vetted.
*
* Sends a message to the target. */
send(p: Uint8Array, addr: UDPAddr): Promise<void>;
send(p: Uint8Array, addr: Addr): Promise<void>;
/** UNSTABLE: new API, yet to be vetted.
*
* Close closes the socket. Any pending message promises will be rejected
@ -1624,6 +1613,7 @@ declare namespace Deno {
close(): void;
/** Return the address of the `Listener`. */
readonly addr: Addr;
[Symbol.asyncIterator](): AsyncIterator<Conn>;
}
@ -1648,13 +1638,12 @@ declare namespace Deno {
/** A literal IP address or host name that can be resolved to an IP address.
* If not specified, defaults to `0.0.0.0`. */
hostname?: string;
/** Either `"tcp"` or `"udp"`. Defaults to `"tcp"`.
*
* In the future: `"tcp4"`, `"tcp6"`, `"udp4"`, `"udp6"`, `"ip"`, `"ip4"`,
* `"ip6"`, `"unix"`, `"unixgram"`, and `"unixpacket"`. */
transport?: Transport;
}
export interface UnixListenOptions {
/** A Path to the Unix Socket. */
address: string;
}
/** **UNSTABLE**: new API
*
* Listen announces on the local transport address.
@ -1672,32 +1661,41 @@ declare namespace Deno {
*
* Listen announces on the local transport address.
*
* Deno.listen({ port: 80 })
* Deno.listen({ hostname: "192.0.2.1", port: 80 })
* Deno.listen({ hostname: "[2001:db8::1]", port: 80 });
* Deno.listen({ hostname: "golang.org", port: 80, transport: "tcp" });
* Deno.listen({ address: "/foo/bar.sock", transport: "unix" })
*
* Requires `allow-net` permission. */
* Requires `allow-read` permission. */
export function listen(
options: ListenOptions & { transport: "udp" }
): UDPConn;
options: UnixListenOptions & { transport: "unix" }
): Listener;
/** **UNSTABLE**: new API
*
* Listen announces on the local transport address.
*
* Deno.listen({ port: 80 })
* Deno.listen({ hostname: "192.0.2.1", port: 80 })
* Deno.listen({ hostname: "[2001:db8::1]", port: 80 });
* Deno.listen({ hostname: "golang.org", port: 80, transport: "tcp" });
* Deno.listen({ port: 80, transport: "udp" })
* Deno.listen({ hostname: "golang.org", port: 80, transport: "udp" });
*
* Requires `allow-net` permission. */
export function listen(options: ListenOptions): Listener | UDPConn;
export function listen(
options: ListenOptions & { transport: "udp" }
): DatagramConn;
/** **UNSTABLE**: new API
*
* Listen announces on the local transport address.
*
* Deno.listen({ address: "/foo/bar.sock", transport: "unixpacket" })
*
* Requires `allow-read` permission. */
export function listen(
options: UnixListenOptions & { transport: "unixpacket" }
): DatagramConn;
export interface ListenTLSOptions extends ListenOptions {
/** Server certificate file. */
certFile: string;
/** Server public key file. */
keyFile: string;
transport?: "tcp";
}
/** Listen announces on the local transport address over TLS (transport layer
@ -1714,11 +1712,12 @@ declare namespace Deno {
/** A literal IP address or host name that can be resolved to an IP address.
* If not specified, defaults to `127.0.0.1`. */
hostname?: string;
/** Either `"tcp"` or `"udp"`. Defaults to `"tcp"`.
*
* In the future: `"tcp4"`, `"tcp6"`, `"udp4"`, `"udp6"`, `"ip"`, `"ip4"`,
* `"ip6"`, `"unix"`, `"unixgram"`, and `"unixpacket"`. */
transport?: Transport;
transport?: "tcp";
}
export interface UnixConnectOptions {
transport: "unix";
address: string;
}
/**
@ -1728,10 +1727,13 @@ declare namespace Deno {
* const conn1 = await Deno.connect({ port: 80 })
* const conn2 = await Deno.connect({ hostname: "192.0.2.1", port: 80 })
* const conn3 = await Deno.connect({ hostname: "[2001:db8::1]", port: 80 });
* const conn4 = await Deno.connect({ hostname: "golang.org", port: 80, transport: "tcp" })
* const conn4 = await Deno.connect({ hostname: "golang.org", port: 80, transport: "tcp" });
* const conn5 = await Deno.connect({ address: "/foo/bar.sock", transport: "unix" });
*
* Requires `allow-net` permission. */
export function connect(options: ConnectOptions): Promise<Conn>;
* Requires `allow-net` permission for "tcp" and `allow-read` for unix. */
export function connect(
options: ConnectOptions | UnixConnectOptions
): Promise<Conn>;
export interface ConnectTLSOptions {
/** The port to connect to. */

View file

@ -4,25 +4,13 @@ import { EOF, Reader, Writer, Closer } from "./io.ts";
import { read, write } from "./ops/io.ts";
import { close } from "./ops/resources.ts";
import * as netOps from "./ops/net.ts";
import { Transport } from "./ops/net.ts";
export { ShutdownMode, shutdown, Transport } from "./ops/net.ts";
import { Addr } from "./ops/net.ts";
export { ShutdownMode, shutdown, NetAddr, UnixAddr } from "./ops/net.ts";
export interface Addr {
transport: Transport;
hostname: string;
port: number;
}
export interface UDPAddr {
transport?: Transport;
hostname?: string;
port: number;
}
export interface UDPConn extends AsyncIterable<[Uint8Array, Addr]> {
export interface DatagramConn extends AsyncIterable<[Uint8Array, Addr]> {
receive(p?: Uint8Array): Promise<[Uint8Array, Addr]>;
send(p: Uint8Array, addr: UDPAddr): Promise<void>;
send(p: Uint8Array, addr: Addr): Promise<void>;
close(): void;
@ -73,7 +61,7 @@ export class ListenerImpl implements Listener {
constructor(readonly rid: number, readonly addr: Addr) {}
async accept(): Promise<Conn> {
const res = await netOps.accept(this.rid);
const res = await netOps.accept(this.rid, this.addr.transport);
return new ConnImpl(res.rid, res.remoteAddr, res.localAddr);
}
@ -95,15 +83,7 @@ export class ListenerImpl implements Listener {
}
}
export async function recvfrom(
rid: number,
p: Uint8Array
): Promise<[number, Addr]> {
const { size, remoteAddr } = await netOps.receive(rid, p);
return [size, remoteAddr];
}
export class UDPConnImpl implements UDPConn {
export class DatagramImpl implements DatagramConn {
constructor(
readonly rid: number,
readonly addr: Addr,
@ -112,14 +92,18 @@ export class UDPConnImpl implements UDPConn {
async receive(p?: Uint8Array): Promise<[Uint8Array, Addr]> {
const buf = p || new Uint8Array(this.bufSize);
const [size, remoteAddr] = await recvfrom(this.rid, buf);
const { size, remoteAddr } = await netOps.receive(
this.rid,
this.addr.transport,
buf
);
const sub = buf.subarray(0, size);
return [sub, remoteAddr];
}
async send(p: Uint8Array, addr: UDPAddr): Promise<void> {
async send(p: Uint8Array, addr: Addr): Promise<void> {
const remote = { hostname: "127.0.0.1", transport: "udp", ...addr };
if (remote.transport !== "udp") throw Error("Remote transport must be UDP");
const args = { ...remote, rid: this.rid };
await netOps.send(args as netOps.SendRequest, p);
}
@ -153,38 +137,77 @@ export interface Conn extends Reader, Writer, Closer {
export interface ListenOptions {
port: number;
hostname?: string;
transport?: Transport;
transport?: "tcp" | "udp";
}
export interface UnixListenOptions {
transport: "unix" | "unixpacket";
address: string;
}
export function listen(
options: ListenOptions & { transport?: "tcp" }
): Listener;
export function listen(options: ListenOptions & { transport: "udp" }): UDPConn;
export function listen({
port,
hostname = "0.0.0.0",
transport = "tcp"
}: ListenOptions): Listener | UDPConn {
const res = netOps.listen({ port, hostname, transport });
export function listen(
options: UnixListenOptions & { transport: "unix" }
): Listener;
export function listen(
options: ListenOptions & { transport: "udp" }
): DatagramConn;
export function listen(
options: UnixListenOptions & { transport: "unixpacket" }
): DatagramConn;
export function listen(
options: ListenOptions | UnixListenOptions
): Listener | DatagramConn {
let res;
if (transport === "tcp") {
if (options.transport === "unix" || options.transport === "unixpacket") {
res = netOps.listen(options);
} else {
res = netOps.listen({
transport: "tcp",
hostname: "127.0.0.1",
...(options as ListenOptions)
});
}
if (
!options.transport ||
options.transport === "tcp" ||
options.transport === "unix"
) {
return new ListenerImpl(res.rid, res.localAddr);
} else {
return new UDPConnImpl(res.rid, res.localAddr);
return new DatagramImpl(res.rid, res.localAddr);
}
}
export interface ConnectOptions {
port: number;
hostname?: string;
transport?: Transport;
transport?: "tcp";
}
export interface UnixConnectOptions {
transport: "unix";
address: string;
}
export async function connect(options: UnixConnectOptions): Promise<Conn>;
export async function connect(options: ConnectOptions): Promise<Conn>;
export async function connect(
options: ConnectOptions | UnixConnectOptions
): Promise<Conn> {
let res;
if (options.transport === "unix") {
res = await netOps.connect(options);
} else {
res = await netOps.connect({
transport: "tcp",
hostname: "127.0.0.1",
...options
});
}
export async function connect({
port,
hostname = "127.0.0.1",
transport = "tcp"
}: ConnectOptions): Promise<Conn> {
const res = await netOps.connect({ port, hostname, transport });
return new ConnImpl(res.rid, res.remoteAddr!, res.localAddr!);
}

View file

@ -1,9 +1,18 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
import { sendSync, sendAsync } from "./dispatch_json.ts";
export type Transport = "tcp" | "udp";
// TODO support other types:
// export type Transport = "tcp" | "tcp4" | "tcp6" | "unix" | "unixpacket";
export interface NetAddr {
transport: "tcp" | "udp";
hostname: string;
port: number;
}
export interface UnixAddr {
transport: "unix" | "unixpacket";
address: string;
}
export type Addr = NetAddr | UnixAddr;
export enum ShutdownMode {
// See http://man7.org/linux/man-pages/man2/shutdown.2.html
@ -19,35 +28,22 @@ export function shutdown(rid: number, how: ShutdownMode): void {
interface AcceptResponse {
rid: number;
localAddr: {
hostname: string;
port: number;
transport: Transport;
};
remoteAddr: {
hostname: string;
port: number;
transport: Transport;
};
localAddr: Addr;
remoteAddr: Addr;
}
export function accept(rid: number): Promise<AcceptResponse> {
return sendAsync("op_accept", { rid });
export function accept(
rid: number,
transport: string
): Promise<AcceptResponse> {
return sendAsync("op_accept", { rid, transport });
}
export interface ListenRequest {
transport: Transport;
hostname: string;
port: number;
}
export type ListenRequest = Addr;
interface ListenResponse {
rid: number;
localAddr: {
hostname: string;
port: number;
transport: Transport;
};
localAddr: Addr;
}
export function listen(args: ListenRequest): ListenResponse {
@ -56,23 +52,11 @@ export function listen(args: ListenRequest): ListenResponse {
interface ConnectResponse {
rid: number;
localAddr: {
hostname: string;
port: number;
transport: Transport;
};
remoteAddr: {
hostname: string;
port: number;
transport: Transport;
};
localAddr: Addr;
remoteAddr: Addr;
}
export interface ConnectRequest {
transport: Transport;
hostname: string;
port: number;
}
export type ConnectRequest = Addr;
export function connect(args: ConnectRequest): Promise<ConnectResponse> {
return sendAsync("op_connect", args);
@ -80,26 +64,20 @@ export function connect(args: ConnectRequest): Promise<ConnectResponse> {
interface ReceiveResponse {
size: number;
remoteAddr: {
hostname: string;
port: number;
transport: Transport;
};
remoteAddr: Addr;
}
export function receive(
rid: number,
transport: string,
zeroCopy: Uint8Array
): Promise<ReceiveResponse> {
return sendAsync("op_receive", { rid }, zeroCopy);
return sendAsync("op_receive", { rid, transport }, zeroCopy);
}
export interface SendRequest {
export type SendRequest = {
rid: number;
hostname: string;
port: number;
transport: Transport;
}
} & Addr;
export async function send(
args: SendRequest,

View file

@ -1,9 +1,8 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
import { sendAsync, sendSync } from "./dispatch_json.ts";
import { Transport } from "./net.ts";
export interface ConnectTLSRequest {
transport: Transport;
transport: "tcp";
hostname: string;
port: number;
certFile?: string;
@ -14,12 +13,12 @@ interface ConnectTLSResponse {
localAddr: {
hostname: string;
port: number;
transport: Transport;
transport: "tcp";
};
remoteAddr: {
hostname: string;
port: number;
transport: Transport;
transport: "tcp";
};
}
@ -34,12 +33,12 @@ interface AcceptTLSResponse {
localAddr: {
hostname: string;
port: number;
transport: Transport;
transport: "tcp";
};
remoteAddr: {
hostname: string;
port: number;
transport: Transport;
transport: "tcp";
};
}
@ -50,7 +49,7 @@ export function acceptTLS(rid: number): Promise<AcceptTLSResponse> {
export interface ListenTLSRequest {
port: number;
hostname: string;
transport: Transport;
transport: "tcp";
certFile: string;
keyFile: string;
}
@ -60,7 +59,7 @@ interface ListenTLSResponse {
localAddr: {
hostname: string;
port: number;
transport: Transport;
transport: "tcp";
};
}

View file

@ -10,7 +10,7 @@ import {
unitTest({ perms: { net: true } }, function netTcpListenClose(): void {
const port = randomPort();
const listener = Deno.listen({ hostname: "127.0.0.1", port });
assertEquals(listener.addr.transport, "tcp");
assert(listener.addr.transport === "tcp");
assertEquals(listener.addr.hostname, "127.0.0.1");
assertEquals(listener.addr.port, port);
listener.close();
@ -29,13 +29,41 @@ unitTest(
port,
transport: "udp"
});
assertEquals(socket.addr.transport, "udp");
assert(socket.addr.transport === "udp");
assertEquals(socket.addr.hostname, "127.0.0.1");
assertEquals(socket.addr.port, port);
socket.close();
}
);
unitTest(
{ ignore: Deno.build.os === "win", perms: { read: true, write: true } },
function netUnixListenClose(): void {
const filePath = Deno.makeTempFileSync();
const socket = Deno.listen({
address: filePath,
transport: "unix"
});
assert(socket.addr.transport === "unix");
assertEquals(socket.addr.address, filePath);
socket.close();
}
);
unitTest(
{ ignore: Deno.build.os === "win", perms: { read: true, write: true } },
function netUnixPacketListenClose(): void {
const filePath = Deno.makeTempFileSync();
const socket = Deno.listen({
address: filePath,
transport: "unixpacket"
});
assert(socket.addr.transport === "unixpacket");
assertEquals(socket.addr.address, filePath);
socket.close();
}
);
unitTest(
{
perms: { net: true }
@ -57,6 +85,28 @@ unitTest(
}
);
unitTest(
{ ignore: Deno.build.os === "win", perms: { read: true, write: true } },
async function netUnixCloseWhileAccept(): Promise<void> {
const filePath = await Deno.makeTempFile();
const listener = Deno.listen({
address: filePath,
transport: "unix"
});
const p = listener.accept();
listener.close();
let err;
try {
await p;
} catch (e) {
err = e;
}
assert(!!err);
assert(err instanceof Error);
assertEquals(err.message, "Listener has been closed");
}
);
unitTest(
{ perms: { net: true } },
async function netTcpConcurrentAccept(): Promise<void> {
@ -81,6 +131,31 @@ unitTest(
}
);
// TODO(jsouto): Enable when tokio updates mio to v0.7!
unitTest(
{ ignore: true, perms: { read: true, write: true } },
async function netUnixConcurrentAccept(): Promise<void> {
const filePath = await Deno.makeTempFile();
const listener = Deno.listen({ transport: "unix", address: filePath });
let acceptErrCount = 0;
const checkErr = (e: Error): void => {
if (e.message === "Listener has been closed") {
assertEquals(acceptErrCount, 1);
} else if (e.message === "Another accept task is ongoing") {
acceptErrCount++;
} else {
throw new Error("Unexpected error message");
}
};
const p = listener.accept().catch(checkErr);
const p1 = listener.accept().catch(checkErr);
await Promise.race([p, p1]);
listener.close();
await [p, p1];
assertEquals(acceptErrCount, 1);
}
);
unitTest({ perms: { net: true } }, async function netTcpDialListen(): Promise<
void
> {
@ -89,6 +164,7 @@ unitTest({ perms: { net: true } }, async function netTcpDialListen(): Promise<
listener.accept().then(
async (conn): Promise<void> => {
assert(conn.remoteAddr != null);
assert(conn.localAddr.transport === "tcp");
assertEquals(conn.localAddr.hostname, "127.0.0.1");
assertEquals(conn.localAddr.port, port);
await conn.write(new Uint8Array([1, 2, 3]));
@ -96,6 +172,7 @@ unitTest({ perms: { net: true } }, async function netTcpDialListen(): Promise<
}
);
const conn = await Deno.connect({ hostname: "127.0.0.1", port });
assert(conn.remoteAddr.transport === "tcp");
assertEquals(conn.remoteAddr.hostname, "127.0.0.1");
assertEquals(conn.remoteAddr.port, port);
assert(conn.localAddr != null);
@ -116,25 +193,62 @@ unitTest({ perms: { net: true } }, async function netTcpDialListen(): Promise<
conn.close();
});
unitTest(
{ ignore: Deno.build.os === "win", perms: { read: true, write: true } },
async function netUnixDialListen(): Promise<void> {
const filePath = await Deno.makeTempFile();
const listener = Deno.listen({ address: filePath, transport: "unix" });
listener.accept().then(
async (conn): Promise<void> => {
assert(conn.remoteAddr != null);
assert(conn.localAddr.transport === "unix");
assertEquals(conn.localAddr.address, filePath);
await conn.write(new Uint8Array([1, 2, 3]));
conn.close();
}
);
const conn = await Deno.connect({ address: filePath, transport: "unix" });
assert(conn.remoteAddr.transport === "unix");
assertEquals(conn.remoteAddr.address, filePath);
assert(conn.remoteAddr != null);
const buf = new Uint8Array(1024);
const readResult = await conn.read(buf);
assertEquals(3, readResult);
assertEquals(1, buf[0]);
assertEquals(2, buf[1]);
assertEquals(3, buf[2]);
assert(conn.rid > 0);
assert(readResult !== Deno.EOF);
const readResult2 = await conn.read(buf);
assertEquals(Deno.EOF, readResult2);
listener.close();
conn.close();
}
);
unitTest(
{ ignore: Deno.build.os === "win", perms: { net: true } },
async function netUdpSendReceive(): Promise<void> {
const alicePort = randomPort();
const alice = Deno.listen({ port: alicePort, transport: "udp" });
assert(alice.addr.transport === "udp");
assertEquals(alice.addr.port, alicePort);
assertEquals(alice.addr.hostname, "0.0.0.0");
assertEquals(alice.addr.transport, "udp");
assertEquals(alice.addr.hostname, "127.0.0.1");
const bobPort = randomPort();
const bob = Deno.listen({ port: bobPort, transport: "udp" });
assert(bob.addr.transport === "udp");
assertEquals(bob.addr.port, bobPort);
assertEquals(bob.addr.hostname, "0.0.0.0");
assertEquals(bob.addr.transport, "udp");
assertEquals(bob.addr.hostname, "127.0.0.1");
const sent = new Uint8Array([1, 2, 3]);
await alice.send(sent, bob.addr);
const [recvd, remote] = await bob.receive();
assert(remote.transport === "udp");
assertEquals(remote.port, alicePort);
assertEquals(recvd.length, 3);
assertEquals(1, recvd[0]);
@ -145,6 +259,33 @@ unitTest(
}
);
unitTest(
{ ignore: Deno.build.os === "win", perms: { read: true, write: true } },
async function netUnixPacketSendReceive(): Promise<void> {
const filePath = await Deno.makeTempFile();
const alice = Deno.listen({ address: filePath, transport: "unixpacket" });
assert(alice.addr.transport === "unixpacket");
assertEquals(alice.addr.address, filePath);
const bob = Deno.listen({ address: filePath, transport: "unixpacket" });
assert(bob.addr.transport === "unixpacket");
assertEquals(bob.addr.address, filePath);
const sent = new Uint8Array([1, 2, 3]);
await alice.send(sent, bob.addr);
const [recvd, remote] = await bob.receive();
assert(remote.transport === "unixpacket");
assertEquals(remote.address, filePath);
assertEquals(recvd.length, 3);
assertEquals(1, recvd[0]);
assertEquals(2, recvd[1]);
assertEquals(3, recvd[2]);
alice.close();
bob.close();
}
);
unitTest(
{ perms: { net: true } },
async function netTcpListenCloseWhileIterating(): Promise<void> {
@ -173,6 +314,34 @@ unitTest(
}
);
unitTest(
{ ignore: Deno.build.os === "win", perms: { read: true, write: true } },
async function netUnixListenCloseWhileIterating(): Promise<void> {
const filePath = Deno.makeTempFileSync();
const socket = Deno.listen({ address: filePath, transport: "unix" });
const nextWhileClosing = socket[Symbol.asyncIterator]().next();
socket.close();
assertEquals(await nextWhileClosing, { value: undefined, done: true });
const nextAfterClosing = socket[Symbol.asyncIterator]().next();
assertEquals(await nextAfterClosing, { value: undefined, done: true });
}
);
unitTest(
{ ignore: Deno.build.os === "win", perms: { read: true, write: true } },
async function netUnixPacketListenCloseWhileIterating(): Promise<void> {
const filePath = Deno.makeTempFileSync();
const socket = Deno.listen({ address: filePath, transport: "unixpacket" });
const nextWhileClosing = socket[Symbol.asyncIterator]().next();
socket.close();
assertEquals(await nextWhileClosing, { value: undefined, done: true });
const nextAfterClosing = socket[Symbol.asyncIterator]().next();
assertEquals(await nextAfterClosing, { value: undefined, done: true });
}
);
unitTest(
{
// FIXME(bartlomieju)

View file

@ -1,11 +1,11 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
import * as tlsOps from "./ops/tls.ts";
import { Listener, Transport, Conn, ConnImpl, ListenerImpl } from "./net.ts";
import { Listener, Conn, ConnImpl, ListenerImpl } from "./net.ts";
// TODO(ry) There are many configuration options to add...
// https://docs.rs/rustls/0.16.0/rustls/struct.ClientConfig.html
interface ConnectTLSOptions {
transport?: Transport;
transport?: "tcp";
port: number;
hostname?: string;
certFile?: string;
@ -36,7 +36,7 @@ class TLSListenerImpl extends ListenerImpl {
export interface ListenTLSOptions {
port: number;
hostname?: string;
transport?: Transport;
transport?: "tcp";
certFile: string;
keyFile: string;
}

View file

@ -148,6 +148,8 @@ pub enum StreamResource {
Stderr(tokio::io::Stderr),
FsFile(tokio::fs::File, FileMetadata),
TcpStream(tokio::net::TcpStream),
#[cfg(not(windows))]
UnixStream(tokio::net::UnixStream),
ServerTlsStream(Box<ServerTlsStream<TcpStream>>),
ClientTlsStream(Box<ClientTlsStream<TcpStream>>),
HttpBody(Box<HttpBody>),
@ -183,6 +185,8 @@ impl DenoAsyncRead for StreamResource {
FsFile(f, _) => f,
Stdin(f, _) => f,
TcpStream(f) => f,
#[cfg(not(windows))]
UnixStream(f) => f,
ClientTlsStream(f) => f,
ServerTlsStream(f) => f,
ChildStdout(f) => f,
@ -262,6 +266,8 @@ impl DenoAsyncWrite for StreamResource {
Stdout(f) => f,
Stderr(f) => f,
TcpStream(f) => f,
#[cfg(not(windows))]
UnixStream(f) => f,
ClientTlsStream(f) => f,
ServerTlsStream(f) => f,
ChildStdin(f) => f,
@ -279,6 +285,8 @@ impl DenoAsyncWrite for StreamResource {
Stdout(f) => f,
Stderr(f) => f,
TcpStream(f) => f,
#[cfg(not(windows))]
UnixStream(f) => f,
ClientTlsStream(f) => f,
ServerTlsStream(f) => f,
ChildStdin(f) => f,

View file

@ -15,6 +15,8 @@ pub mod fs;
pub mod fs_events;
pub mod io;
pub mod net;
#[cfg(unix)]
mod net_unix;
pub mod os;
pub mod permissions;
pub mod plugins;

View file

@ -18,6 +18,9 @@ use tokio::net::TcpListener;
use tokio::net::TcpStream;
use tokio::net::UdpSocket;
#[cfg(unix)]
use super::net_unix;
pub fn init(i: &mut Isolate, s: &State) {
i.register_op("op_accept", s.stateful_json_op(op_accept));
i.register_op("op_connect", s.stateful_json_op(op_connect));
@ -30,14 +33,14 @@ pub fn init(i: &mut Isolate, s: &State) {
#[derive(Deserialize)]
struct AcceptArgs {
rid: i32,
transport: String,
}
fn op_accept(
fn accept_tcp(
state: &State,
args: Value,
args: AcceptArgs,
_zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, OpError> {
let args: AcceptArgs = serde_json::from_value(args)?;
let rid = args.rid as u32;
let state_ = state.clone();
{
@ -102,20 +105,36 @@ fn op_accept(
Ok(JsonOp::Async(op.boxed_local()))
}
#[derive(Deserialize)]
struct ReceiveArgs {
rid: i32,
}
fn op_receive(
fn op_accept(
state: &State,
args: Value,
zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, OpError> {
assert!(zero_copy.is_some());
let args: AcceptArgs = serde_json::from_value(args)?;
match args.transport.as_str() {
"tcp" => accept_tcp(state, args, zero_copy),
#[cfg(unix)]
"unix" => net_unix::accept_unix(state, args.rid as u32, zero_copy),
_ => Err(OpError::other(format!(
"Unsupported transport protocol {}",
args.transport
))),
}
}
#[derive(Deserialize)]
struct ReceiveArgs {
rid: i32,
transport: String,
}
fn receive_udp(
state: &State,
args: ReceiveArgs,
zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, OpError> {
let mut buf = zero_copy.unwrap();
let args: ReceiveArgs = serde_json::from_value(args)?;
let rid = args.rid as u32;
let state_ = state.clone();
@ -145,12 +164,32 @@ fn op_receive(
Ok(JsonOp::Async(op.boxed_local()))
}
fn op_receive(
state: &State,
args: Value,
zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, OpError> {
assert!(zero_copy.is_some());
let args: ReceiveArgs = serde_json::from_value(args)?;
match args.transport.as_str() {
"udp" => receive_udp(state, args, zero_copy),
#[cfg(unix)]
"unixpacket" => {
net_unix::receive_unix_packet(state, args.rid as u32, zero_copy)
}
_ => Err(OpError::other(format!(
"Unsupported transport protocol {}",
args.transport
))),
}
}
#[derive(Deserialize)]
struct SendArgs {
rid: i32,
hostname: String,
port: u16,
transport: String,
#[serde(flatten)]
transport_args: ArgsEnum,
}
fn op_send(
@ -160,38 +199,67 @@ fn op_send(
) -> Result<JsonOp, OpError> {
assert!(zero_copy.is_some());
let buf = zero_copy.unwrap();
let args: SendArgs = serde_json::from_value(args)?;
assert_eq!(args.transport, "udp");
let rid = args.rid as u32;
let state_ = state.clone();
state.check_net(&args.hostname, args.port)?;
match serde_json::from_value(args)? {
SendArgs {
rid,
transport,
transport_args: ArgsEnum::Ip(args),
} if transport == "udp" => {
state.check_net(&args.hostname, args.port)?;
let op = async move {
let mut state = state_.borrow_mut();
let resource = state
.resource_table
.get_mut::<UdpSocketResource>(rid)
.ok_or_else(|| {
OpError::bad_resource("Socket has been closed".to_string())
})?;
let op = async move {
let mut state = state_.borrow_mut();
let resource = state
.resource_table
.get_mut::<UdpSocketResource>(rid as u32)
.ok_or_else(|| {
OpError::bad_resource("Socket has been closed".to_string())
})?;
let socket = &mut resource.socket;
let addr = resolve_addr(&args.hostname, args.port).await?;
socket.send_to(&buf, addr).await?;
Ok(json!({}))
};
let socket = &mut resource.socket;
let addr = resolve_addr(&args.hostname, args.port).await?;
socket.send_to(&buf, addr).await?;
Ok(JsonOp::Async(op.boxed_local()))
}
#[cfg(unix)]
SendArgs {
rid,
transport,
transport_args: ArgsEnum::Unix(args),
} if transport == "unixpacket" => {
let address_path = net_unix::Path::new(&args.address);
state.check_read(&address_path)?;
let op = async move {
let mut state = state_.borrow_mut();
let resource = state
.resource_table
.get_mut::<net_unix::UnixDatagramResource>(rid as u32)
.ok_or_else(|| {
OpError::other("Socket has been closed".to_string())
})?;
Ok(json!({}))
};
let socket = &mut resource.socket;
socket
.send_to(&buf, &resource.local_addr.as_pathname().unwrap())
.await?;
Ok(JsonOp::Async(op.boxed_local()))
Ok(json!({}))
};
Ok(JsonOp::Async(op.boxed_local()))
}
_ => Err(OpError::other("Wrong argument format!".to_owned())),
}
}
#[derive(Deserialize)]
struct ConnectArgs {
transport: String,
hostname: String,
port: u16,
#[serde(flatten)]
transport_args: ArgsEnum,
}
fn op_connect(
@ -199,39 +267,78 @@ fn op_connect(
args: Value,
_zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, OpError> {
let args: ConnectArgs = serde_json::from_value(args)?;
assert_eq!(args.transport, "tcp"); // TODO Support others.
let state_ = state.clone();
state.check_net(&args.hostname, args.port)?;
let op = async move {
let addr = resolve_addr(&args.hostname, args.port).await?;
let tcp_stream = TcpStream::connect(&addr).await?;
let local_addr = tcp_stream.local_addr()?;
let remote_addr = tcp_stream.peer_addr()?;
let mut state = state_.borrow_mut();
let rid = state.resource_table.add(
"tcpStream",
Box::new(StreamResourceHolder::new(StreamResource::TcpStream(
tcp_stream,
))),
);
Ok(json!({
"rid": rid,
"localAddr": {
"hostname": local_addr.ip().to_string(),
"port": local_addr.port(),
"transport": args.transport,
},
"remoteAddr": {
"hostname": remote_addr.ip().to_string(),
"port": remote_addr.port(),
"transport": args.transport,
}
}))
};
Ok(JsonOp::Async(op.boxed_local()))
match serde_json::from_value(args)? {
ConnectArgs {
transport,
transport_args: ArgsEnum::Ip(args),
} if transport == "tcp" => {
let state_ = state.clone();
state.check_net(&args.hostname, args.port)?;
let op = async move {
let addr = resolve_addr(&args.hostname, args.port).await?;
let tcp_stream = TcpStream::connect(&addr).await?;
let local_addr = tcp_stream.local_addr()?;
let remote_addr = tcp_stream.peer_addr()?;
let mut state = state_.borrow_mut();
let rid = state.resource_table.add(
"tcpStream",
Box::new(StreamResourceHolder::new(StreamResource::TcpStream(
tcp_stream,
))),
);
Ok(json!({
"rid": rid,
"localAddr": {
"hostname": local_addr.ip().to_string(),
"port": local_addr.port(),
"transport": transport,
},
"remoteAddr": {
"hostname": remote_addr.ip().to_string(),
"port": remote_addr.port(),
"transport": transport,
}
}))
};
Ok(JsonOp::Async(op.boxed_local()))
}
#[cfg(unix)]
ConnectArgs {
transport,
transport_args: ArgsEnum::Unix(args),
} if transport == "unix" => {
let address_path = net_unix::Path::new(&args.address);
let state_ = state.clone();
state.check_read(&address_path)?;
let op = async move {
let address = args.address;
let unix_stream =
net_unix::UnixStream::connect(net_unix::Path::new(&address)).await?;
let local_addr = unix_stream.local_addr()?;
let remote_addr = unix_stream.peer_addr()?;
let mut state = state_.borrow_mut();
let rid = state.resource_table.add(
"unixStream",
Box::new(StreamResourceHolder::new(StreamResource::UnixStream(
unix_stream,
))),
);
Ok(json!({
"rid": rid,
"localAddr": {
"address": local_addr.as_pathname(),
"transport": transport,
},
"remoteAddr": {
"address": remote_addr.as_pathname(),
"transport": transport,
}
}))
};
Ok(JsonOp::Async(op.boxed_local()))
}
_ => Err(OpError::other("Wrong argument format!".to_owned())),
}
}
#[derive(Deserialize)]
@ -265,19 +372,17 @@ fn op_shutdown(
StreamResource::TcpStream(ref mut stream) => {
TcpStream::shutdown(stream, shutdown_mode).map_err(OpError::from)?;
}
#[cfg(unix)]
StreamResource::UnixStream(ref mut stream) => {
net_unix::UnixStream::shutdown(stream, shutdown_mode)
.map_err(OpError::from)?;
}
_ => return Err(OpError::bad_resource_id()),
}
Ok(JsonOp::Sync(json!({})))
}
#[derive(Deserialize)]
struct ListenArgs {
transport: String,
hostname: String,
port: u16,
}
#[allow(dead_code)]
struct TcpListenerResource {
listener: TcpListener,
@ -331,6 +436,27 @@ struct UdpSocketResource {
socket: UdpSocket,
}
#[derive(Deserialize)]
struct IpListenArgs {
hostname: String,
port: u16,
}
#[derive(Deserialize)]
#[serde(untagged)]
enum ArgsEnum {
Ip(IpListenArgs),
#[cfg(unix)]
Unix(net_unix::UnixListenArgs),
}
#[derive(Deserialize)]
struct ListenArgs {
transport: String,
#[serde(flatten)]
transport_args: ArgsEnum,
}
fn listen_tcp(
state: &State,
addr: SocketAddr,
@ -370,33 +496,60 @@ fn op_listen(
args: Value,
_zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, OpError> {
let args: ListenArgs = serde_json::from_value(args)?;
assert!(args.transport == "tcp" || args.transport == "udp");
state.check_net(&args.hostname, args.port)?;
let addr =
futures::executor::block_on(resolve_addr(&args.hostname, args.port))?;
let (rid, local_addr) = if args.transport == "tcp" {
listen_tcp(state, addr)?
} else {
listen_udp(state, addr)?
};
debug!(
"New listener {} {}:{}",
rid,
local_addr.ip().to_string(),
local_addr.port()
);
Ok(JsonOp::Sync(json!({
"rid": rid,
"localAddr": {
"hostname": local_addr.ip().to_string(),
"port": local_addr.port(),
"transport": args.transport,
},
})))
match serde_json::from_value(args)? {
ListenArgs {
transport,
transport_args: ArgsEnum::Ip(args),
} => {
state.check_net(&args.hostname, args.port)?;
let addr =
futures::executor::block_on(resolve_addr(&args.hostname, args.port))?;
let (rid, local_addr) = if transport == "tcp" {
listen_tcp(state, addr)?
} else {
listen_udp(state, addr)?
};
debug!(
"New listener {} {}:{}",
rid,
local_addr.ip().to_string(),
local_addr.port()
);
Ok(JsonOp::Sync(json!({
"rid": rid,
"localAddr": {
"hostname": local_addr.ip().to_string(),
"port": local_addr.port(),
"transport": transport,
},
})))
}
#[cfg(unix)]
ListenArgs {
transport,
transport_args: ArgsEnum::Unix(args),
} if transport == "unix" || transport == "unixpacket" => {
let address_path = net_unix::Path::new(&args.address);
state.check_read(&address_path)?;
let (rid, local_addr) = if transport == "unix" {
net_unix::listen_unix(state, &address_path)?
} else {
net_unix::listen_unix_packet(state, &address_path)?
};
debug!(
"New listener {} {}",
rid,
local_addr.as_pathname().unwrap().display(),
);
Ok(JsonOp::Sync(json!({
"rid": rid,
"localAddr": {
"address": local_addr.as_pathname(),
"transport": transport,
},
})))
}
#[cfg(unix)]
_ => Err(OpError::other("Wrong argument format!".to_owned())),
}
}

142
cli/ops/net_unix.rs Normal file
View file

@ -0,0 +1,142 @@
use super::dispatch_json::{Deserialize, JsonOp};
use super::io::{StreamResource, StreamResourceHolder};
use crate::op_error::OpError;
use crate::state::State;
use futures::future::FutureExt;
use deno_core::*;
use std::fs::remove_file;
use std::os::unix;
pub use std::path::Path;
use tokio::net::UnixDatagram;
use tokio::net::UnixListener;
pub use tokio::net::UnixStream;
struct UnixListenerResource {
listener: UnixListener,
}
pub struct UnixDatagramResource {
pub socket: UnixDatagram,
pub local_addr: unix::net::SocketAddr,
}
#[derive(Deserialize)]
pub struct UnixListenArgs {
pub address: String,
}
pub fn accept_unix(
state: &State,
rid: u32,
_zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, OpError> {
let state_ = state.clone();
{
let state = state.borrow();
state
.resource_table
.get::<UnixListenerResource>(rid)
.ok_or_else(OpError::bad_resource_id)?;
}
let op = async move {
let mut state = state_.borrow_mut();
let listener_resource = state
.resource_table
.get_mut::<UnixListenerResource>(rid)
.ok_or_else(|| {
OpError::bad_resource("Listener has been closed".to_string())
})?;
let (unix_stream, _socket_addr) =
listener_resource.listener.accept().await?;
let local_addr = unix_stream.local_addr()?;
let remote_addr = unix_stream.peer_addr()?;
let rid = state.resource_table.add(
"unixStream",
Box::new(StreamResourceHolder::new(StreamResource::UnixStream(
unix_stream,
))),
);
Ok(json!({
"rid": rid,
"localAddr": {
"address": local_addr.as_pathname(),
"transport": "unix",
},
"remoteAddr": {
"address": remote_addr.as_pathname(),
"transport": "unix",
}
}))
};
Ok(JsonOp::Async(op.boxed_local()))
}
pub fn receive_unix_packet(
state: &State,
rid: u32,
zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, OpError> {
let mut buf = zero_copy.unwrap();
let state_ = state.clone();
let op = async move {
let mut state = state_.borrow_mut();
let resource = state
.resource_table
.get_mut::<UnixDatagramResource>(rid)
.ok_or_else(|| {
OpError::bad_resource("Socket has been closed".to_string())
})?;
let (size, remote_addr) = resource.socket.recv_from(&mut buf).await?;
Ok(json!({
"size": size,
"remoteAddr": {
"address": remote_addr.as_pathname(),
"transport": "unixpacket",
}
}))
};
Ok(JsonOp::Async(op.boxed_local()))
}
pub fn listen_unix(
state: &State,
addr: &Path,
) -> Result<(u32, unix::net::SocketAddr), OpError> {
let mut state = state.borrow_mut();
if addr.exists() {
remove_file(&addr).unwrap();
}
let listener = UnixListener::bind(&addr)?;
let local_addr = listener.local_addr()?;
let listener_resource = UnixListenerResource { listener };
let rid = state
.resource_table
.add("unixListener", Box::new(listener_resource));
Ok((rid, local_addr))
}
pub fn listen_unix_packet(
state: &State,
addr: &Path,
) -> Result<(u32, unix::net::SocketAddr), OpError> {
let mut state = state.borrow_mut();
if addr.exists() {
remove_file(&addr).unwrap();
}
let socket = UnixDatagram::bind(&addr)?;
let local_addr = socket.local_addr()?;
let datagram_resource = UnixDatagramResource {
socket,
local_addr: local_addr.clone(),
};
let rid = state
.resource_table
.add("unixDatagram", Box::new(datagram_resource));
Ok((rid, local_addr))
}

View file

@ -8,7 +8,6 @@ const listener = Deno.listen({ hostname, port: Number(port) });
const response = new TextEncoder().encode(
"HTTP/1.1 200 OK\r\nContent-Length: 12\r\n\r\nHello World\n"
);
async function handle(conn: Deno.Conn): Promise<void> {
const buffer = new Uint8Array(1024);
try {