refactor: remove "ext/flash" (#18578)

With https://github.com/denoland/deno/pull/18568 landed, we no longer need "ext/flash".

This commit removes the "deno_flash" extension completely.

This should reduce both the binary size and the snapshot size.

Closes https://github.com/denoland/deno/issues/17356
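For reference, the user-facing API is unchanged by this removal: `Deno.serve`, which after #18568 is backed by "ext/http", keeps the same `(handler, options)` call shape used throughout the benchmarks and tests in this diff. A minimal sketch, with illustrative address values:

```ts
// Minimal sketch only; hostname, port and onListen values are illustrative.
Deno.serve((_req) => new Response("Hello World"), {
  hostname: "127.0.0.1",
  port: 4500,
  onListen({ hostname, port }) {
    console.log(`Listening on http://${hostname}:${port}/`);
  },
});
```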
Bartek Iwańczuk 2023-04-03 19:01:02 +02:00 committed by GitHub
parent 2846bbe0a3
commit 51d3fb78ad
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
27 changed files with 1 addition and 3125 deletions

Cargo.lock generated
View file

@@ -957,25 +957,6 @@ dependencies = [
"winapi",
]
[[package]]
name = "deno_flash"
version = "0.33.0"
dependencies = [
"deno_core",
"deno_tls",
"deno_websocket",
"http",
"httparse",
"libc",
"log",
"mio",
"rustls",
"rustls-pemfile",
"serde",
"socket2",
"tokio",
]
[[package]]
name = "deno_fs"
version = "0.7.0"
@@ -1181,7 +1162,6 @@ dependencies = [
"deno_crypto",
"deno_fetch",
"deno_ffi",
"deno_flash",
"deno_fs",
"deno_http",
"deno_io",

View file

@@ -18,7 +18,6 @@ members = [
"ext/console",
"ext/crypto",
"ext/fetch",
"ext/flash",
"ext/ffi",
"ext/fs",
"ext/http",
@@ -62,7 +61,6 @@ deno_console = { version = "0.97.0", path = "./ext/console" }
deno_crypto = { version = "0.111.0", path = "./ext/crypto" }
deno_fetch = { version = "0.121.0", path = "./ext/fetch" }
deno_ffi = { version = "0.84.0", path = "./ext/ffi" }
deno_flash = { version = "0.33.0", path = "./ext/flash" }
deno_fs = { version = "0.7.0", path = "./ext/fs" }
deno_http = { version = "0.92.0", path = "./ext/http" }
deno_io = { version = "0.7.0", path = "./ext/io" }
@@ -265,8 +263,6 @@ opt-level = 3
opt-level = 3
[profile.release.package.deno_http]
opt-level = 3
[profile.release.package.deno_flash]
opt-level = 3
[profile.release.package.deno_net]
opt-level = 3
[profile.release.package.deno_web]

View file

@@ -1,37 +0,0 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
// deno-lint-ignore-file
const {
opAsync,
ops: { op_flash_make_request, op_flash_serve },
encode,
} = Deno[Deno.internal].core;
const addr = Deno.args[0] || "127.0.0.1:4500";
const [hostname, port] = addr.split(":");
const serverId = op_flash_serve({ hostname, port, reuseport: true });
const serverPromise = opAsync("op_flash_drive_server", serverId);
const fastOps = op_flash_make_request();
function nextRequest() {
return fastOps.nextRequest();
}
function respond(token, response) {
return fastOps.respond(token, response, true);
}
const response = encode(
"HTTP/1.1 200 OK\r\nContent-Length: 11\r\n\r\nHello World",
);
let offset = 0;
while (true) {
let token = nextRequest();
if (token === 0) token = await opAsync("op_flash_next_async", serverId);
for (let i = offset; i < offset + token; i++) {
respond(
i,
response,
);
}
offset += token;
}
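The removed benchmark above drives flash through its private ops (`op_flash_serve`, `op_flash_make_request`, batched `respond` calls). A rough equivalent written against the public serve API that replaces it, keeping the same address convention, could look like the following sketch (not a file added by this commit; the `reusePort` option mirrors the `reuseport` flag above and is assumed to be supported by the replacement):

```ts
// Hypothetical Deno.serve version of the "Hello World" benchmark above.
const addr = Deno.args[0] || "127.0.0.1:4500";
const [hostname, port] = addr.split(":");
Deno.serve(() => new Response("Hello World"), {
  hostname,
  port: Number(port),
  reusePort: true, // assumed equivalent of `reuseport: true` above
});
```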

View file

@@ -1,18 +0,0 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
if (Deno.build.os !== "linux") {
throw new Error("SO_REUSEPORT is only supported on Linux");
}
const executable = Deno.execPath();
const path = new URL("./deno_http_flash_ops.js", import.meta.url).pathname;
// single flash instance runs on ~1.8 cores
const cpus = navigator.hardwareConcurrency / 2;
const processes = new Array(cpus);
for (let i = 0; i < cpus; i++) {
const proc = Deno.run({
cmd: [executable, "run", "-A", "--unstable", path, Deno.args[0]],
});
processes.push(proc.status());
}
await Promise.all(processes);

View file

@@ -1,16 +0,0 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
const addr = Deno.args[0] || "127.0.0.1:4500";
const [hostname, port] = addr.split(":");
const { serve } = Deno;
async function handler(request) {
try {
const buffer = await request.arrayBuffer();
return new Response(buffer.byteLength);
} catch (e) {
console.log(e);
}
}
serve(handler, { hostname, port });

View file

@@ -1,5 +0,0 @@
wrk.method = "POST"
wrk.headers["Content-Type"] = "application/octet-stream"
file = io.open("./cli/bench/testdata/128k.bin", "rb")
wrk.body = file:read("*a")

View file

@@ -1,18 +0,0 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
if (Deno.build.os !== "linux") {
throw new Error("SO_REUSEPORT is only supported on Linux");
}
const executable = Deno.execPath();
const path = new URL("./deno_http_flash.js", import.meta.url).pathname;
// single flash instance runs on ~1.8 cores
const cpus = navigator.hardwareConcurrency / 2;
const processes = new Array(cpus);
for (let i = 0; i < cpus; i++) {
const proc = Deno.run({
cmd: [executable, "run", "-A", "--unstable", path, Deno.args[0]],
});
processes.push(proc.status());
}
await Promise.all(processes);

View file

@@ -362,7 +362,6 @@ fn create_cli_snapshot(snapshot_path: PathBuf) {
deno_http::deno_http::init_ops(),
deno_io::deno_io::init_ops(Default::default()),
deno_fs::deno_fs::init_ops::<PermissionsContainer>(false),
deno_flash::deno_flash::init_ops::<PermissionsContainer>(false), // No --unstable
deno_node::deno_node::init_ops::<deno_runtime::RuntimeNodeEnv>(None),
cli::init_ops_and_esm(), // NOTE: This needs to be init_ops_and_esm!
];

View file

@@ -4230,13 +4230,6 @@ itest!(config_file_lock_true {
exit_code: 10,
});
// TODO(bartlomieju): this test is flaky on CI, reenable it after debugging
// // Check https://github.com/denoland/deno_std/issues/2882
// itest!(flash_shutdown {
// args: "run --unstable --allow-net run/flash_shutdown/main.ts",
// exit_code: 0,
// });
itest!(permission_args {
args: "run run/001_hello.js --allow-net",
output: "run/permission_args.out",

View file

@@ -1,23 +0,0 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
// Deno.serve caused segfault with this example after #16383
// refs:
// - https://github.com/denoland/deno/pull/16383
// - https://github.com/denoland/deno_std/issues/2882
// - revert https://github.com/denoland/deno/pull/16610
const ctl = new AbortController();
Deno.serve(() =>
new Promise((resolve) => {
resolve(new Response(new TextEncoder().encode("ok")));
ctl.abort();
}), {
signal: ctl.signal,
async onListen({ port }) {
const a = await fetch(`http://localhost:${port}`, {
method: "POST",
body: "",
});
await a.text();
},
});

View file

@@ -1,793 +0,0 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
const core = globalThis.Deno.core;
const ops = core.ops;
const primordials = globalThis.__bootstrap.primordials;
import { BlobPrototype } from "ext:deno_web/09_file.js";
import { TcpConn } from "ext:deno_net/01_net.js";
import { toInnerResponse } from "ext:deno_fetch/23_response.js";
import { _flash, fromFlashRequest } from "ext:deno_fetch/23_request.js";
import { Event } from "ext:deno_web/02_event.js";
import {
_state,
getReadableStreamResourceBacking,
ReadableStream,
readableStreamClose,
ReadableStreamPrototype,
} from "ext:deno_web/06_streams.js";
import {
_eventLoop,
_idleTimeoutDuration,
_idleTimeoutTimeout,
_protocol,
_readyState,
_rid,
_serverHandleIdleTimeout,
WebSocket,
} from "ext:deno_websocket/01_websocket.js";
import { _ws } from "ext:deno_http/01_http.js";
const {
ObjectPrototypeIsPrototypeOf,
PromisePrototype,
PromisePrototypeCatch,
PromisePrototypeThen,
SafePromiseAll,
TypedArrayPrototypeGetByteLength,
TypedArrayPrototypeGetSymbolToStringTag,
TypedArrayPrototypeSet,
TypedArrayPrototypeSubarray,
TypeError,
Uint8Array,
} = primordials;
const statusCodes = {
100: "Continue",
101: "Switching Protocols",
102: "Processing",
200: "OK",
201: "Created",
202: "Accepted",
203: "Non Authoritative Information",
204: "No Content",
205: "Reset Content",
206: "Partial Content",
207: "Multi-Status",
208: "Already Reported",
226: "IM Used",
300: "Multiple Choices",
301: "Moved Permanently",
302: "Found",
303: "See Other",
304: "Not Modified",
305: "Use Proxy",
307: "Temporary Redirect",
308: "Permanent Redirect",
400: "Bad Request",
401: "Unauthorized",
402: "Payment Required",
403: "Forbidden",
404: "Not Found",
405: "Method Not Allowed",
406: "Not Acceptable",
407: "Proxy Authentication Required",
408: "Request Timeout",
409: "Conflict",
410: "Gone",
411: "Length Required",
412: "Precondition Failed",
413: "Payload Too Large",
414: "URI Too Long",
415: "Unsupported Media Type",
416: "Range Not Satisfiable",
418: "I'm a teapot",
421: "Misdirected Request",
422: "Unprocessable Entity",
423: "Locked",
424: "Failed Dependency",
426: "Upgrade Required",
428: "Precondition Required",
429: "Too Many Requests",
431: "Request Header Fields Too Large",
451: "Unavailable For Legal Reasons",
500: "Internal Server Error",
501: "Not Implemented",
502: "Bad Gateway",
503: "Service Unavailable",
504: "Gateway Timeout",
505: "HTTP Version Not Supported",
506: "Variant Also Negotiates",
507: "Insufficient Storage",
508: "Loop Detected",
510: "Not Extended",
511: "Network Authentication Required",
};
const methods = {
0: "GET",
1: "HEAD",
2: "CONNECT",
3: "PUT",
4: "DELETE",
5: "OPTIONS",
6: "TRACE",
7: "POST",
8: "PATCH",
};
let dateInterval;
let date;
/**
* Construct an HTTP response message.
* All HTTP/1.1 messages consist of a start-line followed by a sequence
* of octets.
*
* HTTP-message = start-line
* *( header-field CRLF )
* CRLF
* [ message-body ]
*
* @param {keyof typeof methods} method
* @param {keyof typeof statusCodes} status
* @param {[name: string, value: string][]} headerList
* @param {Uint8Array | string | null} body
* @param {number} bodyLen
* @param {boolean} earlyEnd
* @returns {Uint8Array | string}
*/
function http1Response(
method,
status,
headerList,
body,
bodyLen,
earlyEnd = false,
) {
// HTTP uses a "<major>.<minor>" numbering scheme
// HTTP-version = HTTP-name "/" DIGIT "." DIGIT
// HTTP-name = %x48.54.54.50 ; "HTTP", case-sensitive
//
// status-line = HTTP-version SP status-code SP reason-phrase CRLF
// Date header: https://datatracker.ietf.org/doc/html/rfc7231#section-7.1.1.2
let str = `HTTP/1.1 ${status} ${statusCodes[status]}\r\nDate: ${date}\r\n`;
for (let i = 0; i < headerList.length; ++i) {
const { 0: name, 1: value } = headerList[i];
// header-field = field-name ":" OWS field-value OWS
str += `${name}: ${value}\r\n`;
}
// https://datatracker.ietf.org/doc/html/rfc7231#section-6.3.6
if (status === 205 || status === 304) {
// MUST NOT generate a payload in a 205 response.
// indicate a zero-length body for the response by
// including a Content-Length header field with a value of 0.
str += "Content-Length: 0\r\n\r\n";
return str;
}
// MUST NOT send Content-Length or Transfer-Encoding if status code is 1xx or 204.
if (status === 204 || status < 200) {
str += "\r\n";
return str;
}
if (earlyEnd === true) {
return str;
}
// null body status is validated by initializeAResponse in ext/fetch
if (body !== null && body !== undefined) {
str += `Content-Length: ${bodyLen}\r\n\r\n`;
} else {
str += "Transfer-Encoding: chunked\r\n\r\n";
return str;
}
// A HEAD request.
if (method === 1) return str;
if (typeof body === "string") {
str += body ?? "";
} else {
const head = core.encode(str);
const response = new Uint8Array(
TypedArrayPrototypeGetByteLength(head) + bodyLen,
);
TypedArrayPrototypeSet(response, head, 0);
TypedArrayPrototypeSet(
response,
body,
TypedArrayPrototypeGetByteLength(head),
);
return response;
}
return str;
}
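// Worked example (illustrative; not from the original source): for a GET
// request (method 0) with a string body,
//   http1Response(0, 200, [["content-type", "text/plain"]], "Hello World", 11)
// returns
//   "HTTP/1.1 200 OK\r\nDate: <cached date>\r\ncontent-type: text/plain\r\n" +
//   "Content-Length: 11\r\n\r\nHello World"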
function prepareFastCalls() {
return ops.op_flash_make_request();
}
function hostnameForDisplay(hostname) {
// If the hostname is "0.0.0.0", we display "localhost" in console
// because browsers in Windows don't resolve "0.0.0.0".
// See the discussion in https://github.com/denoland/deno_std/issues/1165
return hostname === "0.0.0.0" ? "localhost" : hostname;
}
function writeFixedResponse(
server,
requestId,
response,
responseLen,
end,
respondFast,
) {
let nwritten = 0;
// TypedArray
if (typeof response !== "string") {
nwritten = respondFast(requestId, response, end);
} else {
// string
nwritten = ops.op_flash_respond(
server,
requestId,
response,
end,
);
}
if (nwritten < responseLen) {
core.opAsync(
"op_flash_respond_async",
server,
requestId,
response.slice(nwritten),
end,
);
}
}
// TODO(@littledivy): Woah woah, cut down the number of arguments.
async function handleResponse(
req,
resp,
body,
hasBody,
method,
serverId,
i,
respondFast,
respondChunked,
tryRespondChunked,
) {
// there might've been an HTTP upgrade.
if (resp === undefined) {
return;
}
const innerResp = toInnerResponse(resp);
// If response body length is known, it will be sent synchronously in a
// single op; otherwise a "response body" resource will be created and
// we'll be streaming it.
/** @type {ReadableStream<Uint8Array> | Uint8Array | string | null} */
let respBody = null;
let isStreamingResponseBody = false;
if (innerResp.body !== null) {
if (typeof innerResp.body.streamOrStatic?.body === "string") {
if (innerResp.body.streamOrStatic.consumed === true) {
throw new TypeError("Body is unusable.");
}
innerResp.body.streamOrStatic.consumed = true;
respBody = innerResp.body.streamOrStatic.body;
isStreamingResponseBody = false;
} else if (
ObjectPrototypeIsPrototypeOf(
ReadableStreamPrototype,
innerResp.body.streamOrStatic,
)
) {
if (innerResp.body.unusable()) {
throw new TypeError("Body is unusable.");
}
if (
innerResp.body.length === null ||
ObjectPrototypeIsPrototypeOf(
BlobPrototype,
innerResp.body.source,
)
) {
respBody = innerResp.body.stream;
} else {
const reader = innerResp.body.stream.getReader();
const r1 = await reader.read();
if (r1.done) {
respBody = new Uint8Array(0);
} else {
respBody = r1.value;
const r2 = await reader.read();
if (!r2.done) throw new TypeError("Unreachable");
}
}
isStreamingResponseBody = !(
typeof respBody === "string" ||
TypedArrayPrototypeGetSymbolToStringTag(respBody) === "Uint8Array"
);
} else {
if (innerResp.body.streamOrStatic.consumed === true) {
throw new TypeError("Body is unusable.");
}
innerResp.body.streamOrStatic.consumed = true;
respBody = innerResp.body.streamOrStatic.body;
}
} else {
respBody = new Uint8Array(0);
}
const ws = resp[_ws];
if (isStreamingResponseBody === false) {
const length = typeof respBody === "string"
? core.byteLength(respBody)
: TypedArrayPrototypeGetByteLength(respBody);
const responseStr = http1Response(
method,
innerResp.status ?? 200,
innerResp.headerList,
respBody,
length,
);
// A HEAD request always ignores body, but includes the correct content-length size.
const responseLen = method === 1 ? core.byteLength(responseStr) : length;
writeFixedResponse(
serverId,
i,
responseStr,
responseLen,
!ws, // Don't close socket if there is a deferred websocket upgrade.
respondFast,
);
}
return (async () => {
if (!ws) {
if (hasBody && body[_state] !== "closed") {
// TODO(@littledivy): Optimize by draining in a single op.
try {
await req.arrayBuffer();
} catch { /* pass */ }
}
}
if (isStreamingResponseBody === true) {
const resourceBacking = getReadableStreamResourceBacking(respBody);
if (resourceBacking) {
if (respBody.locked) {
throw new TypeError("ReadableStream is locked.");
}
const reader = respBody.getReader(); // Acquire JS lock.
try {
PromisePrototypeThen(
core.opAsync(
"op_flash_write_resource",
http1Response(
method,
innerResp.status ?? 200,
innerResp.headerList,
null,
0, // Content-Length will be set by the op.
true,
),
serverId,
i,
resourceBacking.rid,
resourceBacking.autoClose,
),
() => {
// Release JS lock.
readableStreamClose(respBody);
},
);
} catch (error) {
await reader.cancel(error);
throw error;
}
} else {
const reader = respBody.getReader();
// Best case: sends headers + first chunk in a single go.
const { value, done } = await reader.read();
writeFixedResponse(
serverId,
i,
http1Response(
method,
innerResp.status ?? 200,
innerResp.headerList,
null,
// deno-lint-ignore prefer-primordials
respBody.byteLength,
),
// deno-lint-ignore prefer-primordials
respBody.byteLength,
false,
respondFast,
);
await tryRespondChunked(
i,
value,
done,
);
if (!done) {
while (true) {
const chunk = await reader.read();
await respondChunked(
i,
chunk.value,
chunk.done,
);
if (chunk.done) break;
}
}
}
}
if (ws) {
const wsRid = await core.opAsync(
"op_flash_upgrade_websocket",
serverId,
i,
);
ws[_rid] = wsRid;
ws[_protocol] = resp.headers.get("sec-websocket-protocol");
ws[_readyState] = WebSocket.OPEN;
const event = new Event("open");
ws.dispatchEvent(event);
ws[_eventLoop]();
if (ws[_idleTimeoutDuration]) {
ws.addEventListener(
"close",
() => clearTimeout(ws[_idleTimeoutTimeout]),
);
}
ws[_serverHandleIdleTimeout]();
}
})();
}
function createServe(opFn) {
return async function serve(arg1, arg2) {
let options = undefined;
let handler = undefined;
if (typeof arg1 === "function") {
handler = arg1;
options = arg2;
} else if (typeof arg2 === "function") {
handler = arg2;
options = arg1;
} else {
options = arg1;
}
if (handler === undefined) {
if (options === undefined) {
throw new TypeError(
"No handler was provided, so an options bag is mandatory.",
);
}
handler = options.handler;
}
if (typeof handler !== "function") {
throw new TypeError("A handler function must be provided.");
}
if (options === undefined) {
options = {};
}
const signal = options.signal;
const onError = options.onError ?? function (error) {
console.error(error);
return new Response("Internal Server Error", { status: 500 });
};
const onListen = options.onListen ?? function ({ port }) {
console.log(
`Listening on http://${
hostnameForDisplay(listenOpts.hostname)
}:${port}/`,
);
};
const listenOpts = {
hostname: options.hostname ?? "127.0.0.1",
port: options.port ?? 9000,
reuseport: options.reusePort ?? false,
};
if (options.cert || options.key) {
if (!options.cert || !options.key) {
throw new TypeError(
"Both cert and key must be provided to enable HTTPS.",
);
}
listenOpts.cert = options.cert;
listenOpts.key = options.key;
}
const serverId = opFn(listenOpts);
const serverPromise = core.opAsync("op_flash_drive_server", serverId);
PromisePrototypeCatch(
PromisePrototypeThen(
core.opAsync("op_flash_wait_for_listening", serverId),
(port) => {
onListen({ hostname: listenOpts.hostname, port });
},
),
() => {},
);
const finishedPromise = PromisePrototypeCatch(serverPromise, () => {});
const server = {
id: serverId,
transport: listenOpts.cert && listenOpts.key ? "https" : "http",
hostname: listenOpts.hostname,
port: listenOpts.port,
closed: false,
finished: finishedPromise,
async close() {
if (server.closed) {
return;
}
server.closed = true;
await core.opAsync("op_flash_close_server", serverId);
await server.finished;
},
async serve() {
let offset = 0;
while (true) {
if (server.closed) {
break;
}
let tokens = nextRequestSync();
if (tokens === 0) {
tokens = await core.opAsync("op_flash_next_async", serverId);
if (server.closed) {
break;
}
}
for (let i = offset; i < offset + tokens; i++) {
let body = null;
// There might be a body, but we don't expose it for GET/HEAD requests.
// It will be closed automatically once the request has been handled and
// the response has been sent.
const method = getMethodSync(i);
let hasBody = method > 2; // Not GET/HEAD/CONNECT
if (hasBody) {
body = createRequestBodyStream(serverId, i);
if (body === null) {
hasBody = false;
}
}
const req = fromFlashRequest(
serverId,
/* streamRid */
i,
body,
/* methodCb */
() => methods[method],
/* urlCb */
() => {
const path = ops.op_flash_path(serverId, i);
return `${server.transport}://${server.hostname}:${server.port}${path}`;
},
/* headersCb */
() => ops.op_flash_headers(serverId, i),
);
let resp;
let remoteAddr;
try {
resp = handler(req, {
get remoteAddr() {
if (!remoteAddr) {
const { 0: hostname, 1: port } = core.ops.op_flash_addr(
serverId,
i,
);
remoteAddr = { hostname, port };
}
return remoteAddr;
},
});
if (ObjectPrototypeIsPrototypeOf(PromisePrototype, resp)) {
PromisePrototypeCatch(
PromisePrototypeThen(
resp,
(resp) =>
handleResponse(
req,
resp,
body,
hasBody,
method,
serverId,
i,
respondFast,
respondChunked,
tryRespondChunked,
),
),
onError,
);
} else if (typeof resp?.then === "function") {
resp.then((resp) =>
handleResponse(
req,
resp,
body,
hasBody,
method,
serverId,
i,
respondFast,
respondChunked,
tryRespondChunked,
)
).catch(onError);
} else {
handleResponse(
req,
resp,
body,
hasBody,
method,
serverId,
i,
respondFast,
respondChunked,
tryRespondChunked,
).catch(onError);
}
} catch (e) {
resp = await onError(e);
}
}
offset += tokens;
}
await server.finished;
},
};
signal?.addEventListener("abort", () => {
clearInterval(dateInterval);
PromisePrototypeThen(server.close(), () => {}, () => {});
}, {
once: true,
});
function tryRespondChunked(token, chunk, shutdown) {
const nwritten = ops.op_try_flash_respond_chunked(
serverId,
token,
chunk ?? new Uint8Array(),
shutdown,
);
if (nwritten > 0) {
return core.opAsync(
"op_flash_respond_chunked",
serverId,
token,
chunk,
shutdown,
nwritten,
);
}
}
function respondChunked(token, chunk, shutdown) {
return core.opAsync(
"op_flash_respond_chunked",
serverId,
token,
chunk,
shutdown,
);
}
const fastOp = prepareFastCalls();
let nextRequestSync = () => fastOp.nextRequest();
let getMethodSync = (token) => fastOp.getMethod(token);
let respondFast = (token, response, shutdown) =>
fastOp.respond(token, response, shutdown);
if (serverId > 0) {
nextRequestSync = () => ops.op_flash_next_server(serverId);
getMethodSync = (token) => ops.op_flash_method(serverId, token);
respondFast = (token, response, shutdown) =>
ops.op_flash_respond(serverId, token, response, null, shutdown);
}
if (!dateInterval) {
date = new Date().toUTCString();
dateInterval = setInterval(() => {
date = new Date().toUTCString();
}, 1000);
}
await SafePromiseAll([
PromisePrototypeCatch(server.serve(), console.error),
serverPromise,
]);
};
}
function createRequestBodyStream(serverId, token) {
// The first packet is the leftover bytes after parsing the request
const firstRead = ops.op_flash_first_packet(
serverId,
token,
);
if (!firstRead) return null;
let firstEnqueued = TypedArrayPrototypeGetByteLength(firstRead) === 0;
return new ReadableStream({
type: "bytes",
async pull(controller) {
try {
if (firstEnqueued === false) {
controller.enqueue(firstRead);
firstEnqueued = true;
return;
}
// This is the largest possible size for a single packet on a TLS
// stream.
const chunk = new Uint8Array(16 * 1024 + 256);
const read = await core.opAsync(
"op_flash_read_body",
serverId,
token,
chunk,
);
if (read > 0) {
// We read some data. Enqueue it onto the stream.
controller.enqueue(TypedArrayPrototypeSubarray(chunk, 0, read));
} else {
// We have reached the end of the body, so we close the stream.
controller.close();
}
} catch (err) {
// There was an error while reading a chunk of the body, so we
// error.
controller.error(err);
controller.close();
}
},
});
}
function upgradeHttpRaw(req) {
if (!req[_flash]) {
throw new TypeError(
"Non-flash requests can not be upgraded with `upgradeHttpRaw`. Use `upgradeHttp` instead.",
);
}
// NOTE(bartlomieju):
// Access these fields so they are cached on `req` object, otherwise
// they wouldn't be available after the connection gets upgraded.
req.url;
req.method;
req.headers;
const { serverId, streamRid } = req[_flash];
const connRid = ops.op_flash_upgrade_http(streamRid, serverId);
// TODO(@littledivy): return already read first packet too.
return [new TcpConn(connRid), new Uint8Array()];
}
export { createServe, upgradeHttpRaw };
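Most of `handleResponse` above splits into two paths: fixed-length bodies are written in a single `writeFixedResponse` call, while streaming bodies are sent with `Transfer-Encoding: chunked` (or handed to `op_flash_write_resource` when backed by a resource). A response built like the generic web-API sketch below, which is not code from this repository, takes the streaming path:

```ts
// A ReadableStream body has no known length, so it is sent chunked.
const body = new ReadableStream<Uint8Array>({
  start(controller) {
    controller.enqueue(new TextEncoder().encode("hello "));
    controller.enqueue(new TextEncoder().encode("world"));
    controller.close();
  },
});
// `streamingResponse` would then be returned from a serve handler.
const streamingResponse = new Response(body);
```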

View file

@@ -1,30 +0,0 @@
# Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
[package]
name = "deno_flash"
version = "0.33.0"
authors.workspace = true
edition.workspace = true
license.workspace = true
readme = "README.md"
repository.workspace = true
description = "Fast HTTP/1 server implementation for Deno"
[lib]
path = "lib.rs"
[dependencies]
deno_core.workspace = true
deno_tls.workspace = true
# For HTTP/2 and websocket upgrades
deno_websocket.workspace = true
http.workspace = true
httparse = "1.8"
libc.workspace = true
log.workspace = true
mio = { version = "0.8.1", features = ["os-poll", "net"] }
rustls.workspace = true
rustls-pemfile.workspace = true
serde.workspace = true
socket2.workspace = true
tokio.workspace = true

View file

@@ -1,7 +0,0 @@
# flash
Flash is a fast HTTP/1.1 server implementation for Deno.
```js
serve({ fetch: (req) => new Response("Hello World") });
```

View file

@@ -1,273 +0,0 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
//
// Based on https://github.com/frewsxcv/rust-chunked-transfer/blob/5c08614458580f9e7a85124021006d83ce1ed6e9/src/decoder.rs
// Copyright 2015 The tiny-http Contributors
// Copyright 2015 The rust-chunked-transfer Contributors
use std::error::Error;
use std::fmt;
use std::io::Error as IoError;
use std::io::ErrorKind;
use std::io::Read;
use std::io::Result as IoResult;
pub struct Decoder<R> {
pub source: R,
// remaining size of the chunk being read
// none if we are not in a chunk
pub remaining_chunks_size: Option<usize>,
pub end: bool,
}
impl<R> Decoder<R>
where
R: Read,
{
pub fn new(source: R, remaining_chunks_size: Option<usize>) -> Decoder<R> {
Decoder {
source,
remaining_chunks_size,
end: false,
}
}
fn read_chunk_size(&mut self) -> IoResult<usize> {
let mut chunk_size_bytes = Vec::new();
let mut has_ext = false;
loop {
let byte = match self.source.by_ref().bytes().next() {
Some(b) => b?,
None => {
return Err(IoError::new(ErrorKind::InvalidInput, DecoderError))
}
};
if byte == b'\r' {
break;
}
if byte == b';' {
has_ext = true;
break;
}
chunk_size_bytes.push(byte);
}
// Ignore extensions for now
if has_ext {
loop {
let byte = match self.source.by_ref().bytes().next() {
Some(b) => b?,
None => {
return Err(IoError::new(ErrorKind::InvalidInput, DecoderError))
}
};
if byte == b'\r' {
break;
}
}
}
self.read_line_feed()?;
let chunk_size = String::from_utf8(chunk_size_bytes)
.ok()
.and_then(|c| usize::from_str_radix(c.trim(), 16).ok())
.ok_or_else(|| IoError::new(ErrorKind::InvalidInput, DecoderError))?;
Ok(chunk_size)
}
fn read_carriage_return(&mut self) -> IoResult<()> {
match self.source.by_ref().bytes().next() {
Some(Ok(b'\r')) => Ok(()),
_ => Err(IoError::new(ErrorKind::InvalidInput, DecoderError)),
}
}
fn read_line_feed(&mut self) -> IoResult<()> {
match self.source.by_ref().bytes().next() {
Some(Ok(b'\n')) => Ok(()),
_ => Err(IoError::new(ErrorKind::InvalidInput, DecoderError)),
}
}
}
impl<R> Read for Decoder<R>
where
R: Read,
{
fn read(&mut self, buf: &mut [u8]) -> IoResult<usize> {
let remaining_chunks_size = match self.remaining_chunks_size {
Some(c) => c,
None => {
// first possibility: we are not in a chunk, so we'll attempt to determine
// the chunk size
let chunk_size = self.read_chunk_size()?;
// if the chunk size is 0, we are at EOF
if chunk_size == 0 {
self.read_carriage_return()?;
self.read_line_feed()?;
self.end = true;
return Ok(0);
}
chunk_size
}
};
// second possibility: we continue reading from a chunk
if buf.len() < remaining_chunks_size {
let read = self.source.read(buf)?;
self.remaining_chunks_size = Some(remaining_chunks_size - read);
return Ok(read);
}
// third possibility: the read request goes further than the current chunk
// we simply read until the end of the chunk and return
let buf = &mut buf[..remaining_chunks_size];
let read = self.source.read(buf)?;
self.remaining_chunks_size = if read == remaining_chunks_size {
self.read_carriage_return()?;
self.read_line_feed()?;
None
} else {
Some(remaining_chunks_size - read)
};
Ok(read)
}
}
#[derive(Debug, Copy, Clone)]
struct DecoderError;
impl fmt::Display for DecoderError {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
write!(fmt, "Error while decoding chunks")
}
}
impl Error for DecoderError {
fn description(&self) -> &str {
"Error while decoding chunks"
}
}
#[cfg(test)]
mod test {
use super::Decoder;
use std::io;
use std::io::Read;
/// This unit test is taken from Hyper
/// https://github.com/hyperium/hyper
/// Copyright (c) 2014 Sean McArthur
#[test]
fn test_read_chunk_size() {
fn read(s: &str, expected: usize) {
let mut decoded = Decoder::new(s.as_bytes(), None);
let actual = decoded.read_chunk_size().unwrap();
assert_eq!(expected, actual);
}
fn read_err(s: &str) {
let mut decoded = Decoder::new(s.as_bytes(), None);
let err_kind = decoded.read_chunk_size().unwrap_err().kind();
assert_eq!(err_kind, io::ErrorKind::InvalidInput);
}
read("1\r\n", 1);
read("01\r\n", 1);
read("0\r\n", 0);
read("00\r\n", 0);
read("A\r\n", 10);
read("a\r\n", 10);
read("Ff\r\n", 255);
read("Ff \r\n", 255);
// Missing LF or CRLF
read_err("F\rF");
read_err("F");
// Invalid hex digit
read_err("X\r\n");
read_err("1X\r\n");
read_err("-\r\n");
read_err("-1\r\n");
// Acceptable (if not fully valid) extensions do not influence the size
read("1;extension\r\n", 1);
read("a;ext name=value\r\n", 10);
read("1;extension;extension2\r\n", 1);
read("1;;; ;\r\n", 1);
read("2; extension...\r\n", 2);
read("3 ; extension=123\r\n", 3);
read("3 ;\r\n", 3);
read("3 ; \r\n", 3);
// Invalid extensions cause an error
read_err("1 invalid extension\r\n");
read_err("1 A\r\n");
read_err("1;no CRLF");
}
#[test]
fn test_valid_chunk_decode() {
let source = io::Cursor::new(
"3\r\nhel\r\nb\r\nlo world!!!\r\n0\r\n\r\n"
.to_string()
.into_bytes(),
);
let mut decoded = Decoder::new(source, None);
let mut string = String::new();
decoded.read_to_string(&mut string).unwrap();
assert_eq!(string, "hello world!!!");
}
#[test]
fn test_decode_zero_length() {
let mut decoder = Decoder::new(b"0\r\n\r\n" as &[u8], None);
let mut decoded = String::new();
decoder.read_to_string(&mut decoded).unwrap();
assert_eq!(decoded, "");
}
#[test]
fn test_decode_invalid_chunk_length() {
let mut decoder = Decoder::new(b"m\r\n\r\n" as &[u8], None);
let mut decoded = String::new();
assert!(decoder.read_to_string(&mut decoded).is_err());
}
#[test]
fn invalid_input1() {
let source = io::Cursor::new(
"2\r\nhel\r\nb\r\nlo world!!!\r\n0\r\n"
.to_string()
.into_bytes(),
);
let mut decoded = Decoder::new(source, None);
let mut string = String::new();
assert!(decoded.read_to_string(&mut string).is_err());
}
#[test]
fn invalid_input2() {
let source = io::Cursor::new(
"3\rhel\r\nb\r\nlo world!!!\r\n0\r\n"
.to_string()
.into_bytes(),
);
let mut decoded = Decoder::new(source, None);
let mut string = String::new();
assert!(decoded.read_to_string(&mut string).is_err());
}
}
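The removed `Decoder` implements HTTP/1.1 chunked transfer decoding: each chunk is a hexadecimal size line (optionally followed by a `;extension`), CRLF, the chunk payload, CRLF, and a `0`-sized chunk terminates the body, so `"3\r\nhel\r\nb\r\nlo world!!!\r\n0\r\n\r\n"` decodes to `"hello world!!!"` as in the tests. The size-line rule alone, as a small TypeScript sketch with hypothetical names of my own choosing:

```ts
// Sketch of the chunk-size parsing rule above; not a helper from this repo.
// Input is one size line with the trailing CRLF already stripped.
function readChunkSize(sizeLine: string): number {
  const sizePart = sizeLine.split(";")[0]; // drop any chunk extension
  if (!/^[0-9a-fA-F]+\s*$/.test(sizePart)) {
    throw new TypeError("invalid chunk size");
  }
  return parseInt(sizePart, 16);
}

readChunkSize("3"); // 3
readChunkSize("Ff "); // 255; trailing whitespace is tolerated
readChunkSize("a;ext name=value"); // 10; extension ignored
```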

File diff suppressed because it is too large

View file

@@ -1,49 +0,0 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
use crate::Stream;
use std::pin::Pin;
#[derive(Debug)]
pub struct InnerRequest {
/// Backing buffer for the request.
pub buffer: Pin<Box<[u8]>>,
/// Owned headers; we have to keep them around since they're referenced in `req`.
pub _headers: Vec<httparse::Header<'static>>,
/// Fully parsed request.
pub req: httparse::Request<'static, 'static>,
pub body_offset: usize,
pub body_len: usize,
}
#[derive(Debug)]
pub struct Request {
pub inner: InnerRequest,
// Pointer to stream owned by the server loop thread.
//
// Dereferencing is safe until server thread finishes and
// op_flash_serve resolves or websocket upgrade is performed.
pub socket: *mut Stream,
pub keep_alive: bool,
pub content_read: usize,
pub content_length: Option<u64>,
pub remaining_chunk_size: Option<usize>,
pub te_chunked: bool,
pub expect_continue: bool,
}
// SAFETY: Sent from server thread to JS thread.
// See comment above for `socket`.
unsafe impl Send for Request {}
impl Request {
#[inline(always)]
pub fn socket<'a>(&self) -> &'a mut Stream {
// SAFETY: Dereferencing is safe until server thread detaches socket or finishes.
unsafe { &mut *self.socket }
}
#[inline(always)]
pub fn method(&self) -> &str {
self.inner.req.method.unwrap()
}
}

View file

@@ -1,82 +0,0 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
// Forked from https://github.com/Thomasdezeeuw/sendfile/blob/024f82cd4dede9048392a5bd6d8afcd4d5aa83d5/src/lib.rs
use std::future::Future;
use std::io;
use std::os::unix::io::RawFd;
use std::pin::Pin;
use std::task::Poll;
use std::task::{self};
pub struct SendFile {
pub io: (RawFd, RawFd),
pub written: usize,
}
impl SendFile {
#[inline]
pub fn try_send(&mut self) -> Result<usize, std::io::Error> {
#[cfg(target_os = "linux")]
{
// This is the maximum the Linux kernel will write in a single call.
let count = 0x7ffff000;
let mut offset = self.written as libc::off_t;
let res =
// SAFETY: call to libc::sendfile()
unsafe { libc::sendfile(self.io.1, self.io.0, &mut offset, count) };
if res == -1 {
Err(io::Error::last_os_error())
} else {
self.written = offset as usize;
Ok(res as usize)
}
}
#[cfg(target_os = "macos")]
{
// Send all bytes.
let mut length = 0;
// On macOS `length` is a value-result parameter. It determines the number
// of bytes to write and returns the number of bytes written also in
// case of `EAGAIN` errors.
// SAFETY: call to libc::sendfile()
let res = unsafe {
libc::sendfile(
self.io.0,
self.io.1,
self.written as libc::off_t,
&mut length,
std::ptr::null_mut(),
0,
)
};
self.written += length as usize;
if res == -1 {
Err(io::Error::last_os_error())
} else {
Ok(length as usize)
}
}
}
}
impl Future for SendFile {
type Output = Result<(), std::io::Error>;
fn poll(
mut self: Pin<&mut Self>,
_: &mut task::Context<'_>,
) -> Poll<Self::Output> {
loop {
match self.try_send() {
Ok(0) => break Poll::Ready(Ok(())),
Ok(_) => continue,
Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
break Poll::Pending
}
Err(ref err) if err.kind() == io::ErrorKind::Interrupted => continue, // Try again.
Err(err) => break Poll::Ready(Err(err)),
}
}
}
}
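The wrapper above polls `sendfile(2)` until the whole file has been copied to the socket in kernel space. In userland Deno code the closest equivalent, without the zero-copy behaviour, is piping a file's readable stream into a connection's writable stream; a generic sketch, not code from this commit, with illustrative path and address:

```ts
// Stream a file to a TCP connection; pipeTo closes the destination when the
// source ends.
const conn = await Deno.connect({ hostname: "127.0.0.1", port: 4500 });
const file = await Deno.open("./payload.bin", { read: true });
await file.readable.pipeTo(conn.writable);
```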

View file

@@ -1,151 +0,0 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
use std::cell::UnsafeCell;
use std::future::Future;
use std::io::Read;
use std::io::Write;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::Mutex;
use deno_core::error::AnyError;
use mio::net::TcpStream;
use tokio::sync::mpsc;
use crate::ParseStatus;
type TlsTcpStream = rustls::StreamOwned<rustls::ServerConnection, TcpStream>;
pub enum InnerStream {
Tcp(TcpStream),
Tls(Box<TlsTcpStream>),
}
pub struct Stream {
pub inner: InnerStream,
pub detached: bool,
pub read_rx: Option<mpsc::Receiver<()>>,
pub read_tx: Option<mpsc::Sender<()>>,
pub parse_done: ParseStatus,
pub buffer: UnsafeCell<Vec<u8>>,
pub read_lock: Arc<Mutex<()>>,
pub addr: std::net::SocketAddr,
}
impl Stream {
pub fn detach_ownership(&mut self) {
self.detached = true;
}
/// Try to write to the socket.
#[inline]
pub fn try_write(&mut self, buf: &[u8]) -> usize {
let mut nwritten = 0;
while nwritten < buf.len() {
match self.write(&buf[nwritten..]) {
Ok(n) => nwritten += n,
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
break;
}
Err(e) => {
log::trace!("Error writing to socket: {}", e);
break;
}
}
}
nwritten
}
#[inline]
pub fn shutdown(&mut self) {
match &mut self.inner {
InnerStream::Tcp(stream) => {
// Typically shutdown shouldn't fail.
let _ = stream.shutdown(std::net::Shutdown::Both);
}
InnerStream::Tls(stream) => {
let _ = stream.sock.shutdown(std::net::Shutdown::Both);
}
}
}
pub fn as_std(&mut self) -> std::net::TcpStream {
#[cfg(unix)]
let std_stream = {
use std::os::unix::prelude::AsRawFd;
use std::os::unix::prelude::FromRawFd;
let fd = match self.inner {
InnerStream::Tcp(ref tcp) => tcp.as_raw_fd(),
_ => todo!(),
};
// SAFETY: `fd` is a valid file descriptor.
unsafe { std::net::TcpStream::from_raw_fd(fd) }
};
#[cfg(windows)]
let std_stream = {
use std::os::windows::prelude::AsRawSocket;
use std::os::windows::prelude::FromRawSocket;
let fd = match self.inner {
InnerStream::Tcp(ref tcp) => tcp.as_raw_socket(),
_ => todo!(),
};
// SAFETY: `fd` is a valid file descriptor.
unsafe { std::net::TcpStream::from_raw_socket(fd) }
};
std_stream
}
#[inline]
pub async fn with_async_stream<F, T>(&mut self, f: F) -> Result<T, AnyError>
where
F: FnOnce(
&mut tokio::net::TcpStream,
) -> Pin<Box<dyn '_ + Future<Output = Result<T, AnyError>>>>,
{
let mut async_stream = tokio::net::TcpStream::from_std(self.as_std())?;
let result = f(&mut async_stream).await?;
forget_stream(async_stream.into_std()?);
Ok(result)
}
}
#[inline]
pub fn forget_stream(stream: std::net::TcpStream) {
#[cfg(unix)]
{
use std::os::unix::prelude::IntoRawFd;
let _ = stream.into_raw_fd();
}
#[cfg(windows)]
{
use std::os::windows::prelude::IntoRawSocket;
let _ = stream.into_raw_socket();
}
}
impl Write for Stream {
#[inline]
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
match self.inner {
InnerStream::Tcp(ref mut stream) => stream.write(buf),
InnerStream::Tls(ref mut stream) => stream.write(buf),
}
}
#[inline]
fn flush(&mut self) -> std::io::Result<()> {
match self.inner {
InnerStream::Tcp(ref mut stream) => stream.flush(),
InnerStream::Tls(ref mut stream) => stream.flush(),
}
}
}
impl Read for Stream {
#[inline]
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
match self.inner {
InnerStream::Tcp(ref mut stream) => stream.read(buf),
InnerStream::Tls(ref mut stream) => stream.read(buf),
}
}
}
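`try_write` above keeps writing until the non-blocking mio socket reports `WouldBlock` and returns how many bytes actually went out. With the public async `Deno.Conn` API the analogous pattern is to await until the whole buffer is written; a generic sketch, not from this commit:

```ts
// Write an entire buffer to a connection, looping over partial writes.
async function writeAll(conn: Deno.Conn, buf: Uint8Array): Promise<void> {
  let nwritten = 0;
  while (nwritten < buf.length) {
    nwritten += await conn.write(buf.subarray(nwritten));
  }
}
```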

View file

@@ -41,7 +41,6 @@ deno_core.workspace = true
deno_crypto.workspace = true
deno_fetch.workspace = true
deno_ffi.workspace = true
deno_flash.workspace = true
deno_fs.workspace = true
deno_http.workspace = true
deno_io.workspace = true
@@ -68,7 +67,6 @@ deno_core.workspace = true
deno_crypto.workspace = true
deno_fetch.workspace = true
deno_ffi.workspace = true
deno_flash.workspace = true
deno_fs.workspace = true
deno_http.workspace = true
deno_io.workspace = true

View file

@@ -120,16 +120,6 @@ mod startup_snapshot {
}
}
impl deno_flash::FlashPermissions for Permissions {
fn check_net<T: AsRef<str>>(
&mut self,
_host: &(T, Option<u16>),
_api_name: &str,
) -> Result<(), deno_core::error::AnyError> {
unreachable!("snapshotting!")
}
}
impl deno_node::NodePermissions for Permissions {
fn check_read(
&mut self,
@@ -244,7 +234,6 @@ mod startup_snapshot {
deno_net,
deno_napi,
deno_http,
deno_flash,
deno_io,
deno_fs
],
@@ -322,7 +311,6 @@ mod startup_snapshot {
deno_http::deno_http::init_ops_and_esm(),
deno_io::deno_io::init_ops_and_esm(Default::default()),
deno_fs::deno_fs::init_ops_and_esm::<Permissions>(false),
deno_flash::deno_flash::init_ops_and_esm::<Permissions>(false), // No --unstable
runtime::init_ops_and_esm(),
// FIXME(bartlomieju): these extensions are specified last, because they
// depend on `runtime`, even though it should be the other way around

View file

@@ -9,7 +9,6 @@ import * as ffi from "ext:deno_ffi/00_ffi.js";
import * as net from "ext:deno_net/01_net.js";
import * as tls from "ext:deno_net/02_tls.js";
import * as http from "ext:deno_http/01_http.js";
import * as flash from "ext:deno_flash/01_http.js";
import * as errors from "ext:runtime/01_errors.js";
import * as version from "ext:runtime/01_version.ts";
import * as permissions from "ext:runtime/10_permissions.js";
@@ -172,7 +171,6 @@ const denoNsUnstable = {
funlock: fs.funlock,
funlockSync: fs.funlockSync,
upgradeHttp: http.upgradeHttp,
upgradeHttpRaw: flash.upgradeHttpRaw,
serve: http.serve,
openKv: kv.openKv,
Kv: kv.Kv,

View file

@@ -7,7 +7,6 @@ pub use deno_core;
pub use deno_crypto;
pub use deno_fetch;
pub use deno_ffi;
pub use deno_flash;
pub use deno_fs;
pub use deno_http;
pub use deno_io;

View file

@@ -29,7 +29,7 @@ use tokio::net::UnixStream;
deno_core::extension!(
deno_http_runtime,
ops = [op_http_start, op_http_upgrade, op_flash_upgrade_http],
ops = [op_http_start, op_http_upgrade],
customizer = |ext: &mut deno_core::ExtensionBuilder| {
ext.force_op_registration();
},
@@ -91,23 +91,6 @@ fn op_http_start(
Err(bad_resource_id())
}
#[op]
fn op_flash_upgrade_http(
state: &mut OpState,
token: u32,
server_id: u32,
) -> Result<deno_core::ResourceId, AnyError> {
let flash_ctx = state.borrow_mut::<deno_flash::FlashContext>();
let ctx = flash_ctx.servers.get_mut(&server_id).unwrap();
let tcp_stream = deno_flash::detach_socket(ctx, token)?;
Ok(
state
.resource_table
.add(TcpStreamResource::new(tcp_stream.into_split())),
)
}
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct HttpUpgradeResult {

View file

@@ -1826,17 +1826,6 @@ impl PermissionsContainer {
}
}
impl deno_flash::FlashPermissions for PermissionsContainer {
#[inline(always)]
fn check_net<T: AsRef<str>>(
&mut self,
host: &(T, Option<u16>),
api_name: &str,
) -> Result<(), AnyError> {
self.0.lock().net.check(host, Some(api_name))
}
}
impl deno_node::NodePermissions for PermissionsContainer {
#[inline(always)]
fn check_read(&mut self, path: &Path) -> Result<(), AnyError> {

View file

@@ -440,7 +440,6 @@ impl WebWorker {
deno_http::deno_http::init_ops(),
deno_io::deno_io::init_ops(Some(options.stdio)),
deno_fs::deno_fs::init_ops::<PermissionsContainer>(unstable),
deno_flash::deno_flash::init_ops::<PermissionsContainer>(unstable),
deno_node::deno_node::init_ops::<crate::RuntimeNodeEnv>(
options.npm_resolver,
),

View file

@@ -264,7 +264,6 @@ impl MainWorker {
deno_http::deno_http::init_ops(),
deno_io::deno_io::init_ops(Some(options.stdio)),
deno_fs::deno_fs::init_ops::<PermissionsContainer>(unstable),
deno_flash::deno_flash::init_ops::<PermissionsContainer>(unstable),
deno_node::deno_node::init_ops::<crate::RuntimeNodeEnv>(
options.npm_resolver,
),