mirror of
https://github.com/denoland/deno
synced 2024-11-05 18:45:24 +00:00
fix: [http] Consume unread body and trailers before reading next request (#3990)
- Added `ServerRequest.finalize()`: consuming all unread body stream and trailers. - This is cleanup method for reading next request from same keep-alive connection. - Needed when handler didn't consume all body and trailers even after responding. - refactor: `ServerRequest._bodyStream()`, `ServerRequestBody` are removed. - Now using `bodyReader()` and `chunkedBodyReader()` instead. - fix: Trailers should only be read `transfer-encoding` is `chunked` and `trailer` header is set and its value is valid. - fix: use `Headers.append()` on reading trailers. - fix: delete `trailer` field from headers after reading trailers. - reorg: Several functions related to IO are moved into `http/io.ts`
This commit is contained in:
parent
5c1ab080cd
commit
22f88b9f37
7 changed files with 555 additions and 247 deletions
|
@ -131,6 +131,7 @@ mod tests {
|
|||
let expected: Vec<Url> = vec![
|
||||
format!("{}/cookie_test.ts", root_url),
|
||||
format!("{}/file_server_test.ts", root_url),
|
||||
format!("{}/io_test.ts", root_url),
|
||||
format!("{}/racing_server_test.ts", root_url),
|
||||
format!("{}/server_test.ts", root_url),
|
||||
]
|
||||
|
|
213
std/http/io.ts
Normal file
213
std/http/io.ts
Normal file
|
@ -0,0 +1,213 @@
|
|||
import { BufReader, UnexpectedEOFError, BufWriter } from "../io/bufio.ts";
|
||||
import { TextProtoReader } from "../textproto/mod.ts";
|
||||
import { assert } from "../testing/asserts.ts";
|
||||
import { encoder } from "../strings/mod.ts";
|
||||
|
||||
export function emptyReader(): Deno.Reader {
|
||||
return {
|
||||
async read(_: Uint8Array): Promise<number | Deno.EOF> {
|
||||
return Deno.EOF;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
export function bodyReader(contentLength: number, r: BufReader): Deno.Reader {
|
||||
let totalRead = 0;
|
||||
let finished = false;
|
||||
async function read(buf: Uint8Array): Promise<number | Deno.EOF> {
|
||||
if (finished) return Deno.EOF;
|
||||
let result: number | Deno.EOF;
|
||||
const remaining = contentLength - totalRead;
|
||||
if (remaining >= buf.byteLength) {
|
||||
result = await r.read(buf);
|
||||
} else {
|
||||
const readBuf = buf.subarray(0, remaining);
|
||||
result = await r.read(readBuf);
|
||||
}
|
||||
if (result !== Deno.EOF) {
|
||||
totalRead += result;
|
||||
}
|
||||
finished = totalRead === contentLength;
|
||||
return result;
|
||||
}
|
||||
return { read };
|
||||
}
|
||||
|
||||
export function chunkedBodyReader(h: Headers, r: BufReader): Deno.Reader {
|
||||
// Based on https://tools.ietf.org/html/rfc2616#section-19.4.6
|
||||
const tp = new TextProtoReader(r);
|
||||
let finished = false;
|
||||
const chunks: Array<{
|
||||
offset: number;
|
||||
data: Uint8Array;
|
||||
}> = [];
|
||||
async function read(buf: Uint8Array): Promise<number | Deno.EOF> {
|
||||
if (finished) return Deno.EOF;
|
||||
const [chunk] = chunks;
|
||||
if (chunk) {
|
||||
const chunkRemaining = chunk.data.byteLength - chunk.offset;
|
||||
const readLength = Math.min(chunkRemaining, buf.byteLength);
|
||||
for (let i = 0; i < readLength; i++) {
|
||||
buf[i] = chunk.data[chunk.offset + i];
|
||||
}
|
||||
chunk.offset += readLength;
|
||||
if (chunk.offset === chunk.data.byteLength) {
|
||||
chunks.shift();
|
||||
// Consume \r\n;
|
||||
if ((await tp.readLine()) === Deno.EOF) {
|
||||
throw new UnexpectedEOFError();
|
||||
}
|
||||
}
|
||||
return readLength;
|
||||
}
|
||||
const line = await tp.readLine();
|
||||
if (line === Deno.EOF) throw new UnexpectedEOFError();
|
||||
// TODO: handle chunk extension
|
||||
const [chunkSizeString] = line.split(";");
|
||||
const chunkSize = parseInt(chunkSizeString, 16);
|
||||
if (Number.isNaN(chunkSize) || chunkSize < 0) {
|
||||
throw new Error("Invalid chunk size");
|
||||
}
|
||||
if (chunkSize > 0) {
|
||||
if (chunkSize > buf.byteLength) {
|
||||
let eof = await r.readFull(buf);
|
||||
if (eof === Deno.EOF) {
|
||||
throw new UnexpectedEOFError();
|
||||
}
|
||||
const restChunk = new Uint8Array(chunkSize - buf.byteLength);
|
||||
eof = await r.readFull(restChunk);
|
||||
if (eof === Deno.EOF) {
|
||||
throw new UnexpectedEOFError();
|
||||
} else {
|
||||
chunks.push({
|
||||
offset: 0,
|
||||
data: restChunk
|
||||
});
|
||||
}
|
||||
return buf.byteLength;
|
||||
} else {
|
||||
const bufToFill = buf.subarray(0, chunkSize);
|
||||
const eof = await r.readFull(bufToFill);
|
||||
if (eof === Deno.EOF) {
|
||||
throw new UnexpectedEOFError();
|
||||
}
|
||||
// Consume \r\n
|
||||
if ((await tp.readLine()) === Deno.EOF) {
|
||||
throw new UnexpectedEOFError();
|
||||
}
|
||||
return chunkSize;
|
||||
}
|
||||
} else {
|
||||
assert(chunkSize === 0);
|
||||
// Consume \r\n
|
||||
if ((await r.readLine()) === Deno.EOF) {
|
||||
throw new UnexpectedEOFError();
|
||||
}
|
||||
await readTrailers(h, r);
|
||||
finished = true;
|
||||
return Deno.EOF;
|
||||
}
|
||||
}
|
||||
return { read };
|
||||
}
|
||||
|
||||
const kProhibitedTrailerHeaders = [
|
||||
"transfer-encoding",
|
||||
"content-length",
|
||||
"trailer"
|
||||
];
|
||||
|
||||
/**
|
||||
* Read trailer headers from reader and append values to headers.
|
||||
* "trailer" field will be deleted.
|
||||
* */
|
||||
export async function readTrailers(
|
||||
headers: Headers,
|
||||
r: BufReader
|
||||
): Promise<void> {
|
||||
const keys = parseTrailer(headers.get("trailer"));
|
||||
if (!keys) return;
|
||||
const tp = new TextProtoReader(r);
|
||||
const result = await tp.readMIMEHeader();
|
||||
assert(result != Deno.EOF, "trailer must be set");
|
||||
for (const [k, v] of result) {
|
||||
if (!keys.has(k)) {
|
||||
throw new Error("Undeclared trailer field");
|
||||
}
|
||||
keys.delete(k);
|
||||
headers.append(k, v);
|
||||
}
|
||||
assert(keys.size === 0, "Missing trailers");
|
||||
headers.delete("trailer");
|
||||
}
|
||||
|
||||
function parseTrailer(field: string | null): Set<string> | undefined {
|
||||
if (field == null) {
|
||||
return undefined;
|
||||
}
|
||||
const keys = field.split(",").map(v => v.trim());
|
||||
if (keys.length === 0) {
|
||||
throw new Error("Empty trailer");
|
||||
}
|
||||
for (const invalid of kProhibitedTrailerHeaders) {
|
||||
if (keys.includes(invalid)) {
|
||||
throw new Error(`Prohibited field for trailer`);
|
||||
}
|
||||
}
|
||||
return new Set(keys);
|
||||
}
|
||||
|
||||
export async function writeChunkedBody(
|
||||
w: Deno.Writer,
|
||||
r: Deno.Reader
|
||||
): Promise<void> {
|
||||
const writer = BufWriter.create(w);
|
||||
for await (const chunk of Deno.toAsyncIterator(r)) {
|
||||
if (chunk.byteLength <= 0) continue;
|
||||
const start = encoder.encode(`${chunk.byteLength.toString(16)}\r\n`);
|
||||
const end = encoder.encode("\r\n");
|
||||
await writer.write(start);
|
||||
await writer.write(chunk);
|
||||
await writer.write(end);
|
||||
}
|
||||
|
||||
const endChunk = encoder.encode("0\r\n\r\n");
|
||||
await writer.write(endChunk);
|
||||
}
|
||||
|
||||
/** write trailer headers to writer. it mostly should be called after writeResponse */
|
||||
export async function writeTrailers(
|
||||
w: Deno.Writer,
|
||||
headers: Headers,
|
||||
trailers: Headers
|
||||
): Promise<void> {
|
||||
const trailer = headers.get("trailer");
|
||||
if (trailer === null) {
|
||||
throw new Error('response headers must have "trailer" header field');
|
||||
}
|
||||
const transferEncoding = headers.get("transfer-encoding");
|
||||
if (transferEncoding === null || !transferEncoding.match(/^chunked/)) {
|
||||
throw new Error(
|
||||
`trailer headers is only allowed for "transfer-encoding: chunked": got "${transferEncoding}"`
|
||||
);
|
||||
}
|
||||
const writer = BufWriter.create(w);
|
||||
const trailerHeaderFields = trailer
|
||||
.split(",")
|
||||
.map(s => s.trim().toLowerCase());
|
||||
for (const f of trailerHeaderFields) {
|
||||
assert(
|
||||
!kProhibitedTrailerHeaders.includes(f),
|
||||
`"${f}" is prohibited for trailer header`
|
||||
);
|
||||
}
|
||||
for (const [key, value] of trailers) {
|
||||
assert(
|
||||
trailerHeaderFields.includes(key),
|
||||
`Not trailer header field: ${key}`
|
||||
);
|
||||
await writer.write(encoder.encode(`${key}: ${value}\r\n`));
|
||||
}
|
||||
await writer.write(encoder.encode("\r\n"));
|
||||
await writer.flush();
|
||||
}
|
167
std/http/io_test.ts
Normal file
167
std/http/io_test.ts
Normal file
|
@ -0,0 +1,167 @@
|
|||
import {
|
||||
AssertionError,
|
||||
assertThrowsAsync,
|
||||
assertEquals
|
||||
} from "../testing/asserts.ts";
|
||||
import { bodyReader, writeTrailers, readTrailers } from "./io.ts";
|
||||
import { encode, decode } from "../strings/mod.ts";
|
||||
import { BufReader } from "../io/bufio.ts";
|
||||
import { chunkedBodyReader } from "./io.ts";
|
||||
const { test, Buffer } = Deno;
|
||||
|
||||
test("bodyReader", async () => {
|
||||
const text = "Hello, Deno";
|
||||
const r = bodyReader(text.length, new BufReader(new Buffer(encode(text))));
|
||||
assertEquals(decode(await Deno.readAll(r)), text);
|
||||
});
|
||||
function chunkify(n: number, char: string): string {
|
||||
const v = Array.from({ length: n })
|
||||
.map(() => `${char}`)
|
||||
.join("");
|
||||
return `${n.toString(16)}\r\n${v}\r\n`;
|
||||
}
|
||||
test("chunkedBodyReader", async () => {
|
||||
const body = [
|
||||
chunkify(3, "a"),
|
||||
chunkify(5, "b"),
|
||||
chunkify(11, "c"),
|
||||
chunkify(22, "d"),
|
||||
chunkify(0, "")
|
||||
].join("");
|
||||
const h = new Headers();
|
||||
const r = chunkedBodyReader(h, new BufReader(new Buffer(encode(body))));
|
||||
let result: number | Deno.EOF;
|
||||
// Use small buffer as some chunks exceed buffer size
|
||||
const buf = new Uint8Array(5);
|
||||
const dest = new Buffer();
|
||||
while ((result = await r.read(buf)) !== Deno.EOF) {
|
||||
const len = Math.min(buf.byteLength, result);
|
||||
await dest.write(buf.subarray(0, len));
|
||||
}
|
||||
const exp = "aaabbbbbcccccccccccdddddddddddddddddddddd";
|
||||
assertEquals(dest.toString(), exp);
|
||||
});
|
||||
|
||||
test("chunkedBodyReader with trailers", async () => {
|
||||
const body = [
|
||||
chunkify(3, "a"),
|
||||
chunkify(5, "b"),
|
||||
chunkify(11, "c"),
|
||||
chunkify(22, "d"),
|
||||
chunkify(0, ""),
|
||||
"deno: land\r\n",
|
||||
"node: js\r\n",
|
||||
"\r\n"
|
||||
].join("");
|
||||
const h = new Headers({
|
||||
trailer: "deno,node"
|
||||
});
|
||||
const r = chunkedBodyReader(h, new BufReader(new Buffer(encode(body))));
|
||||
assertEquals(h.has("trailer"), true);
|
||||
assertEquals(h.has("deno"), false);
|
||||
assertEquals(h.has("node"), false);
|
||||
const act = decode(await Deno.readAll(r));
|
||||
const exp = "aaabbbbbcccccccccccdddddddddddddddddddddd";
|
||||
assertEquals(act, exp);
|
||||
assertEquals(h.has("trailer"), false);
|
||||
assertEquals(h.get("deno"), "land");
|
||||
assertEquals(h.get("node"), "js");
|
||||
});
|
||||
|
||||
test("readTrailers", async () => {
|
||||
const h = new Headers({
|
||||
trailer: "deno,node"
|
||||
});
|
||||
const trailer = ["deno: land", "node: js", "", ""].join("\r\n");
|
||||
await readTrailers(h, new BufReader(new Buffer(encode(trailer))));
|
||||
assertEquals(h.has("trailer"), false);
|
||||
assertEquals(h.get("deno"), "land");
|
||||
assertEquals(h.get("node"), "js");
|
||||
});
|
||||
|
||||
test("readTrailer should throw if undeclared headers found in trailer", async () => {
|
||||
const patterns = [
|
||||
["deno,node", "deno: land\r\nnode: js\r\ngo: lang\r\n\r\n"],
|
||||
["deno", "node: js\r\n\r\n"],
|
||||
["deno", "node:js\r\ngo: lang\r\n\r\n"]
|
||||
];
|
||||
for (const [header, trailer] of patterns) {
|
||||
const h = new Headers({
|
||||
trailer: header
|
||||
});
|
||||
await assertThrowsAsync(
|
||||
async () => {
|
||||
await readTrailers(h, new BufReader(new Buffer(encode(trailer))));
|
||||
},
|
||||
Error,
|
||||
"Undeclared trailer field"
|
||||
);
|
||||
}
|
||||
});
|
||||
|
||||
test("readTrailer should throw if trailer contains prohibited fields", async () => {
|
||||
for (const f of ["content-length", "trailer", "transfer-encoding"]) {
|
||||
const h = new Headers({
|
||||
trailer: f
|
||||
});
|
||||
await assertThrowsAsync(
|
||||
async () => {
|
||||
await readTrailers(h, new BufReader(new Buffer()));
|
||||
},
|
||||
Error,
|
||||
"Prohibited field for trailer"
|
||||
);
|
||||
}
|
||||
});
|
||||
|
||||
test("writeTrailer", async () => {
|
||||
const w = new Buffer();
|
||||
await writeTrailers(
|
||||
w,
|
||||
new Headers({ "transfer-encoding": "chunked", trailer: "deno,node" }),
|
||||
new Headers({ deno: "land", node: "js" })
|
||||
);
|
||||
assertEquals(w.toString(), "deno: land\r\nnode: js\r\n\r\n");
|
||||
});
|
||||
|
||||
test("writeTrailer should throw", async () => {
|
||||
const w = new Buffer();
|
||||
await assertThrowsAsync(
|
||||
() => {
|
||||
return writeTrailers(w, new Headers(), new Headers());
|
||||
},
|
||||
Error,
|
||||
'must have "trailer"'
|
||||
);
|
||||
await assertThrowsAsync(
|
||||
() => {
|
||||
return writeTrailers(w, new Headers({ trailer: "deno" }), new Headers());
|
||||
},
|
||||
Error,
|
||||
"only allowed"
|
||||
);
|
||||
for (const f of ["content-length", "trailer", "transfer-encoding"]) {
|
||||
await assertThrowsAsync(
|
||||
() => {
|
||||
return writeTrailers(
|
||||
w,
|
||||
new Headers({ "transfer-encoding": "chunked", trailer: f }),
|
||||
new Headers({ [f]: "1" })
|
||||
);
|
||||
},
|
||||
AssertionError,
|
||||
"prohibited"
|
||||
);
|
||||
}
|
||||
await assertThrowsAsync(
|
||||
() => {
|
||||
return writeTrailers(
|
||||
w,
|
||||
new Headers({ "transfer-encoding": "chunked", trailer: "deno" }),
|
||||
new Headers({ node: "js" })
|
||||
);
|
||||
},
|
||||
AssertionError,
|
||||
"Not trailer"
|
||||
);
|
||||
});
|
|
@ -5,12 +5,15 @@ import { delay } from "../util/async.ts";
|
|||
const addr = Deno.args[1] || "127.0.0.1:4501";
|
||||
const server = serve(addr);
|
||||
|
||||
const body = new TextEncoder().encode("Hello 1\n");
|
||||
const body4 = new TextEncoder().encode("World 4\n");
|
||||
|
||||
async function delayedRespond(request: ServerRequest): Promise<void> {
|
||||
function body(i: number): string {
|
||||
return `Step${i}\n`;
|
||||
}
|
||||
async function delayedRespond(
|
||||
request: ServerRequest,
|
||||
step: number
|
||||
): Promise<void> {
|
||||
await delay(3000);
|
||||
await request.respond({ status: 200, body });
|
||||
await request.respond({ status: 200, body: body(step) });
|
||||
}
|
||||
|
||||
async function largeRespond(request: ServerRequest, c: string): Promise<void> {
|
||||
|
@ -19,6 +22,13 @@ async function largeRespond(request: ServerRequest, c: string): Promise<void> {
|
|||
await request.respond({ status: 200, body: b });
|
||||
}
|
||||
|
||||
async function ignoreToConsume(
|
||||
request: ServerRequest,
|
||||
step: number
|
||||
): Promise<void> {
|
||||
await request.respond({ status: 200, body: body(step) });
|
||||
}
|
||||
|
||||
console.log("Racing server listening...\n");
|
||||
|
||||
let step = 1;
|
||||
|
@ -28,7 +38,7 @@ for await (const request of server) {
|
|||
// Try to wait long enough.
|
||||
// For pipelining, this should cause all the following response
|
||||
// to block.
|
||||
delayedRespond(request);
|
||||
delayedRespond(request, step);
|
||||
break;
|
||||
case 2:
|
||||
// HUGE body.
|
||||
|
@ -38,8 +48,20 @@ for await (const request of server) {
|
|||
// HUGE body.
|
||||
largeRespond(request, "b");
|
||||
break;
|
||||
case 4:
|
||||
// Ignore to consume body (content-length)
|
||||
ignoreToConsume(request, step);
|
||||
break;
|
||||
case 5:
|
||||
// Ignore to consume body (chunked)
|
||||
ignoreToConsume(request, step);
|
||||
break;
|
||||
case 6:
|
||||
// Ignore to consume body (chunked + trailers)
|
||||
ignoreToConsume(request, step);
|
||||
break;
|
||||
default:
|
||||
request.respond({ status: 200, body: body4 });
|
||||
request.respond({ status: 200, body: body(step) });
|
||||
break;
|
||||
}
|
||||
step++;
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
const { connect, run } = Deno;
|
||||
|
||||
import { assert, assertEquals } from "../testing/asserts.ts";
|
||||
import { BufReader } from "../io/bufio.ts";
|
||||
import { BufReader, BufWriter } from "../io/bufio.ts";
|
||||
import { TextProtoReader } from "../textproto/mod.ts";
|
||||
|
||||
let server: Deno.Process;
|
||||
|
@ -21,20 +21,20 @@ function killServer(): void {
|
|||
server.stdout?.close();
|
||||
}
|
||||
|
||||
const input = `GET / HTTP/1.1
|
||||
|
||||
GET / HTTP/1.1
|
||||
|
||||
GET / HTTP/1.1
|
||||
|
||||
GET / HTTP/1.1
|
||||
|
||||
`;
|
||||
const input = [
|
||||
"GET / HTTP/1.1\r\n\r\n",
|
||||
"GET / HTTP/1.1\r\n\r\n",
|
||||
"GET / HTTP/1.1\r\n\r\n",
|
||||
"POST / HTTP/1.1\r\ncontent-length: 4\r\n\r\ndeno",
|
||||
"POST / HTTP/1.1\r\ntransfer-encoding: chunked\r\n\r\n4\r\ndeno\r\n0\r\n\r\n",
|
||||
"POST / HTTP/1.1\r\ntransfer-encoding: chunked\r\ntrailer: deno\r\n\r\n4\r\ndeno\r\n0\r\n\r\ndeno: land\r\n\r\n",
|
||||
"GET / HTTP/1.1\r\n\r\n"
|
||||
].join("");
|
||||
const HUGE_BODY_SIZE = 1024 * 1024;
|
||||
const output = `HTTP/1.1 200 OK
|
||||
content-length: 8
|
||||
content-length: 6
|
||||
|
||||
Hello 1
|
||||
Step1
|
||||
HTTP/1.1 200 OK
|
||||
content-length: ${HUGE_BODY_SIZE}
|
||||
|
||||
|
@ -42,9 +42,21 @@ ${"a".repeat(HUGE_BODY_SIZE)}HTTP/1.1 200 OK
|
|||
content-length: ${HUGE_BODY_SIZE}
|
||||
|
||||
${"b".repeat(HUGE_BODY_SIZE)}HTTP/1.1 200 OK
|
||||
content-length: 8
|
||||
content-length: 6
|
||||
|
||||
World 4
|
||||
Step4
|
||||
HTTP/1.1 200 OK
|
||||
content-length: 6
|
||||
|
||||
Step5
|
||||
HTTP/1.1 200 OK
|
||||
content-length: 6
|
||||
|
||||
Step6
|
||||
HTTP/1.1 200 OK
|
||||
content-length: 6
|
||||
|
||||
Step7
|
||||
`;
|
||||
|
||||
Deno.test(async function serverPipelineRace(): Promise<void> {
|
||||
|
@ -52,7 +64,9 @@ Deno.test(async function serverPipelineRace(): Promise<void> {
|
|||
|
||||
const conn = await connect({ port: 4501 });
|
||||
const r = new TextProtoReader(new BufReader(conn));
|
||||
await conn.write(new TextEncoder().encode(input));
|
||||
const w = new BufWriter(conn);
|
||||
await w.write(new TextEncoder().encode(input));
|
||||
await w.flush();
|
||||
const outLines = output.split("\n");
|
||||
// length - 1 to disregard last empty line
|
||||
for (let i = 0; i < outLines.length - 1; i++) {
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
|
||||
const { listen, listenTLS, copy, toAsyncIterator } = Deno;
|
||||
const { listen, listenTLS, copy } = Deno;
|
||||
type Listener = Deno.Listener;
|
||||
type Conn = Deno.Conn;
|
||||
type Reader = Deno.Reader;
|
||||
|
@ -9,6 +9,13 @@ import { TextProtoReader } from "../textproto/mod.ts";
|
|||
import { STATUS_TEXT } from "./http_status.ts";
|
||||
import { assert } from "../testing/asserts.ts";
|
||||
import { deferred, Deferred, MuxAsyncIterator } from "../util/async.ts";
|
||||
import {
|
||||
bodyReader,
|
||||
chunkedBodyReader,
|
||||
writeChunkedBody,
|
||||
writeTrailers,
|
||||
emptyReader
|
||||
} from "./io.ts";
|
||||
|
||||
const encoder = new TextEncoder();
|
||||
|
||||
|
@ -30,64 +37,6 @@ export function setContentLength(r: Response): void {
|
|||
}
|
||||
}
|
||||
|
||||
async function writeChunkedBody(w: Writer, r: Reader): Promise<void> {
|
||||
const writer = BufWriter.create(w);
|
||||
|
||||
for await (const chunk of toAsyncIterator(r)) {
|
||||
if (chunk.byteLength <= 0) continue;
|
||||
const start = encoder.encode(`${chunk.byteLength.toString(16)}\r\n`);
|
||||
const end = encoder.encode("\r\n");
|
||||
await writer.write(start);
|
||||
await writer.write(chunk);
|
||||
await writer.write(end);
|
||||
}
|
||||
|
||||
const endChunk = encoder.encode("0\r\n\r\n");
|
||||
await writer.write(endChunk);
|
||||
}
|
||||
const kProhibitedTrailerHeaders = [
|
||||
"transfer-encoding",
|
||||
"content-length",
|
||||
"trailer"
|
||||
];
|
||||
|
||||
/** write trailer headers to writer. it mostly should be called after writeResponse */
|
||||
export async function writeTrailers(
|
||||
w: Writer,
|
||||
headers: Headers,
|
||||
trailers: Headers
|
||||
): Promise<void> {
|
||||
const trailer = headers.get("trailer");
|
||||
if (trailer === null) {
|
||||
throw new Error('response headers must have "trailer" header field');
|
||||
}
|
||||
const transferEncoding = headers.get("transfer-encoding");
|
||||
if (transferEncoding === null || !transferEncoding.match(/^chunked/)) {
|
||||
throw new Error(
|
||||
`trailer headers is only allowed for "transfer-encoding: chunked": got "${transferEncoding}"`
|
||||
);
|
||||
}
|
||||
const writer = BufWriter.create(w);
|
||||
const trailerHeaderFields = trailer
|
||||
.split(",")
|
||||
.map(s => s.trim().toLowerCase());
|
||||
for (const f of trailerHeaderFields) {
|
||||
assert(
|
||||
!kProhibitedTrailerHeaders.includes(f),
|
||||
`"${f}" is prohibited for trailer header`
|
||||
);
|
||||
}
|
||||
for (const [key, value] of trailers) {
|
||||
assert(
|
||||
trailerHeaderFields.includes(key),
|
||||
`Not trailer header field: ${key}`
|
||||
);
|
||||
await writer.write(encoder.encode(`${key}: ${value}\r\n`));
|
||||
}
|
||||
await writer.write(encoder.encode("\r\n"));
|
||||
await writer.flush();
|
||||
}
|
||||
|
||||
export async function writeResponse(w: Writer, r: Response): Promise<void> {
|
||||
const protoMajor = 1;
|
||||
const protoMinor = 1;
|
||||
|
@ -138,17 +87,6 @@ export async function writeResponse(w: Writer, r: Response): Promise<void> {
|
|||
await writer.flush();
|
||||
}
|
||||
|
||||
export class ServerRequestBody implements Reader {
|
||||
constructor(private it: AsyncIterator<number, undefined, Uint8Array>) {}
|
||||
async read(p: Uint8Array): Promise<number | Deno.EOF> {
|
||||
const res = await this.it.next(p);
|
||||
if (res.done) {
|
||||
return Deno.EOF;
|
||||
}
|
||||
return res.value;
|
||||
}
|
||||
}
|
||||
|
||||
export class ServerRequest {
|
||||
url!: string;
|
||||
method!: string;
|
||||
|
@ -184,7 +122,7 @@ export class ServerRequest {
|
|||
return this._contentLength;
|
||||
}
|
||||
|
||||
private _body: ServerRequestBody | null = null;
|
||||
private _body: Deno.Reader | null = null;
|
||||
|
||||
/**
|
||||
* Body of the request.
|
||||
|
@ -200,102 +138,30 @@ export class ServerRequest {
|
|||
* bufSlice = bufSlice.subarray(nread);
|
||||
* }
|
||||
*/
|
||||
get body(): ServerRequestBody {
|
||||
get body(): Deno.Reader {
|
||||
if (!this._body) {
|
||||
const stream = this._bodyStream();
|
||||
stream.next(); // drop dummy such that first read is not empty.
|
||||
this._body = new ServerRequestBody(stream);
|
||||
if (this.contentLength != null) {
|
||||
this._body = bodyReader(this.contentLength, this.r);
|
||||
} else {
|
||||
const transferEncoding = this.headers.get("transfer-encoding");
|
||||
if (transferEncoding != null) {
|
||||
const parts = transferEncoding
|
||||
.split(",")
|
||||
.map((e): string => e.trim().toLowerCase());
|
||||
assert(
|
||||
parts.includes("chunked"),
|
||||
'transfer-encoding must include "chunked" if content-length is not set'
|
||||
);
|
||||
this._body = chunkedBodyReader(this.headers, this.r);
|
||||
} else {
|
||||
// Neither content-length nor transfer-encoding: chunked
|
||||
this._body = emptyReader();
|
||||
}
|
||||
}
|
||||
}
|
||||
return this._body;
|
||||
}
|
||||
|
||||
/**
|
||||
* Internal: actually reading body. Each step, buf to use is passed
|
||||
* in through yield result.
|
||||
* Returns on no more data to read or error.
|
||||
*/
|
||||
private async *_bodyStream(): AsyncIterator<number, undefined, Uint8Array> {
|
||||
let buf = yield 0; // dummy yield to retrieve user provided buf.
|
||||
if (this.headers.has("content-length")) {
|
||||
const len = this.contentLength;
|
||||
if (len === null) {
|
||||
return;
|
||||
}
|
||||
let rr = await this.r.read(buf);
|
||||
let nread = rr === Deno.EOF ? 0 : rr;
|
||||
let nreadTotal = nread;
|
||||
while (rr !== Deno.EOF && nreadTotal < len) {
|
||||
buf = yield nread;
|
||||
rr = await this.r.read(buf);
|
||||
nread = rr === Deno.EOF ? 0 : rr;
|
||||
nreadTotal += nread;
|
||||
}
|
||||
yield nread;
|
||||
} else {
|
||||
const transferEncoding = this.headers.get("transfer-encoding");
|
||||
if (transferEncoding) {
|
||||
const parts = transferEncoding
|
||||
.split(",")
|
||||
.map((e): string => e.trim().toLowerCase());
|
||||
if (parts.includes("chunked")) {
|
||||
// Based on https://tools.ietf.org/html/rfc2616#section-19.4.6
|
||||
const tp = new TextProtoReader(this.r);
|
||||
let line = await tp.readLine();
|
||||
if (line === Deno.EOF) throw new UnexpectedEOFError();
|
||||
// TODO: handle chunk extension
|
||||
const [chunkSizeString] = line.split(";");
|
||||
let chunkSize = parseInt(chunkSizeString, 16);
|
||||
if (Number.isNaN(chunkSize) || chunkSize < 0) {
|
||||
throw new Error("Invalid chunk size");
|
||||
}
|
||||
while (chunkSize > 0) {
|
||||
let currChunkOffset = 0;
|
||||
// Since given readBuffer might be smaller, loop.
|
||||
while (currChunkOffset < chunkSize) {
|
||||
// Try to be as large as chunkSize. Might be smaller though.
|
||||
const bufferToFill = buf.subarray(0, chunkSize);
|
||||
if ((await this.r.readFull(bufferToFill)) === Deno.EOF) {
|
||||
throw new UnexpectedEOFError();
|
||||
}
|
||||
currChunkOffset += bufferToFill.length;
|
||||
buf = yield bufferToFill.length;
|
||||
}
|
||||
await this.r.readLine(); // Consume \r\n
|
||||
line = await tp.readLine();
|
||||
if (line === Deno.EOF) throw new UnexpectedEOFError();
|
||||
chunkSize = parseInt(line, 16);
|
||||
}
|
||||
const entityHeaders = await tp.readMIMEHeader();
|
||||
if (entityHeaders !== Deno.EOF) {
|
||||
for (const [k, v] of entityHeaders) {
|
||||
this.headers.set(k, v);
|
||||
}
|
||||
}
|
||||
/* Pseudo code from https://tools.ietf.org/html/rfc2616#section-19.4.6
|
||||
length := 0
|
||||
read chunk-size, chunk-extension (if any) and CRLF
|
||||
while (chunk-size > 0) {
|
||||
read chunk-data and CRLF
|
||||
append chunk-data to entity-body
|
||||
length := length + chunk-size
|
||||
read chunk-size and CRLF
|
||||
}
|
||||
read entity-header
|
||||
while (entity-header not empty) {
|
||||
append entity-header to existing header fields
|
||||
read entity-header
|
||||
}
|
||||
Content-Length := length
|
||||
Remove "chunked" from Transfer-Encoding
|
||||
*/
|
||||
return; // Must return here to avoid fall through
|
||||
}
|
||||
// TODO: handle other transfer-encoding types
|
||||
}
|
||||
// Otherwise... Do nothing
|
||||
}
|
||||
}
|
||||
|
||||
async respond(r: Response): Promise<void> {
|
||||
let err: Error | undefined;
|
||||
try {
|
||||
|
@ -316,6 +182,16 @@ export class ServerRequest {
|
|||
throw err;
|
||||
}
|
||||
}
|
||||
|
||||
private finalized = false;
|
||||
async finalize(): Promise<void> {
|
||||
if (this.finalized) return;
|
||||
// Consume unread body
|
||||
const body = this.body;
|
||||
const buf = new Uint8Array(1024);
|
||||
while ((await body.read(buf)) !== Deno.EOF) {}
|
||||
this.finalized = true;
|
||||
}
|
||||
}
|
||||
|
||||
function fixLength(req: ServerRequest): void {
|
||||
|
@ -462,6 +338,8 @@ export class Server implements AsyncIterable<ServerRequest> {
|
|||
// req.done implies this connection already closed, so we can just return.
|
||||
return;
|
||||
}
|
||||
// Consume unread body and trailers if receiver didn't consume those data
|
||||
await req.finalize();
|
||||
}
|
||||
|
||||
if (req === Deno.EOF) {
|
||||
|
|
|
@ -11,8 +11,6 @@ import {
|
|||
assert,
|
||||
assertEquals,
|
||||
assertNotEquals,
|
||||
assertThrowsAsync,
|
||||
AssertionError,
|
||||
assertNotEOF
|
||||
} from "../testing/asserts.ts";
|
||||
import {
|
||||
|
@ -21,8 +19,7 @@ import {
|
|||
writeResponse,
|
||||
serve,
|
||||
readRequest,
|
||||
parseHTTPVersion,
|
||||
writeTrailers
|
||||
parseHTTPVersion
|
||||
} from "./server.ts";
|
||||
import {
|
||||
BufReader,
|
||||
|
@ -32,6 +29,7 @@ import {
|
|||
} from "../io/bufio.ts";
|
||||
import { delay, deferred } from "../util/async.ts";
|
||||
import { StringReader } from "../io/readers.ts";
|
||||
import { encode } from "../strings/mod.ts";
|
||||
|
||||
interface ResponseTest {
|
||||
response: Response;
|
||||
|
@ -43,7 +41,7 @@ const dec = new TextDecoder();
|
|||
|
||||
type Handler = () => void;
|
||||
|
||||
const mockConn = {
|
||||
const mockConn = (): Deno.Conn => ({
|
||||
localAddr: {
|
||||
transport: "tcp",
|
||||
hostname: "",
|
||||
|
@ -64,7 +62,7 @@ const mockConn = {
|
|||
return -1;
|
||||
},
|
||||
close: (): void => {}
|
||||
};
|
||||
});
|
||||
|
||||
const responseTests: ResponseTest[] = [
|
||||
// Default response
|
||||
|
@ -100,7 +98,7 @@ test(async function responseWrite(): Promise<void> {
|
|||
const request = new ServerRequest();
|
||||
request.w = bufw;
|
||||
|
||||
request.conn = mockConn as Deno.Conn;
|
||||
request.conn = mockConn();
|
||||
|
||||
await request.respond(testCase.response);
|
||||
assertEquals(buf.toString(), testCase.raw);
|
||||
|
@ -142,6 +140,25 @@ test(async function requestContentLength(): Promise<void> {
|
|||
}
|
||||
});
|
||||
|
||||
interface TotalReader extends Deno.Reader {
|
||||
total: number;
|
||||
}
|
||||
function totalReader(r: Deno.Reader): TotalReader {
|
||||
let _total = 0;
|
||||
async function read(p: Uint8Array): Promise<number | Deno.EOF> {
|
||||
const result = await r.read(p);
|
||||
if (typeof result === "number") {
|
||||
_total += result;
|
||||
}
|
||||
return result;
|
||||
}
|
||||
return {
|
||||
read,
|
||||
get total(): number {
|
||||
return _total;
|
||||
}
|
||||
};
|
||||
}
|
||||
test(async function requestBodyWithContentLength(): Promise<void> {
|
||||
{
|
||||
const req = new ServerRequest();
|
||||
|
@ -164,8 +181,53 @@ test(async function requestBodyWithContentLength(): Promise<void> {
|
|||
const body = dec.decode(await Deno.readAll(req.body));
|
||||
assertEquals(body, longText);
|
||||
}
|
||||
// Handler ignored to consume body
|
||||
});
|
||||
test("ServerRequest.finalize() should consume unread body / content-length", async () => {
|
||||
const text = "deno.land";
|
||||
const req = new ServerRequest();
|
||||
req.headers = new Headers();
|
||||
req.headers.set("content-length", "" + text.length);
|
||||
const tr = totalReader(new Buffer(encode(text)));
|
||||
req.r = new BufReader(tr);
|
||||
req.w = new BufWriter(new Buffer());
|
||||
await req.respond({ status: 200, body: "ok" });
|
||||
assertEquals(tr.total, 0);
|
||||
await req.finalize();
|
||||
assertEquals(tr.total, text.length);
|
||||
});
|
||||
test("ServerRequest.finalize() should consume unread body / chunked, trailers", async () => {
|
||||
const text = [
|
||||
"5",
|
||||
"Hello",
|
||||
"4",
|
||||
"Deno",
|
||||
"0",
|
||||
"",
|
||||
"deno: land",
|
||||
"node: js",
|
||||
"",
|
||||
""
|
||||
].join("\r\n");
|
||||
const req = new ServerRequest();
|
||||
req.headers = new Headers();
|
||||
req.headers.set("transfer-encoding", "chunked");
|
||||
req.headers.set("trailer", "deno,node");
|
||||
const body = encode(text);
|
||||
const tr = totalReader(new Buffer(body));
|
||||
req.r = new BufReader(tr);
|
||||
req.w = new BufWriter(new Buffer());
|
||||
await req.respond({ status: 200, body: "ok" });
|
||||
assertEquals(tr.total, 0);
|
||||
assertEquals(req.headers.has("trailer"), true);
|
||||
assertEquals(req.headers.has("deno"), false);
|
||||
assertEquals(req.headers.has("node"), false);
|
||||
await req.finalize();
|
||||
assertEquals(tr.total, body.byteLength);
|
||||
assertEquals(req.headers.has("trailer"), false);
|
||||
assertEquals(req.headers.get("deno"), "land");
|
||||
assertEquals(req.headers.get("node"), "js");
|
||||
});
|
||||
|
||||
test(async function requestBodyWithTransferEncoding(): Promise<void> {
|
||||
{
|
||||
const shortText = "Hello";
|
||||
|
@ -465,7 +527,7 @@ malformedHeader
|
|||
const reader = new BufReader(new StringReader(input));
|
||||
let err;
|
||||
try {
|
||||
await readRequest(mockConn as Deno.Conn, reader);
|
||||
await readRequest(mockConn(), reader);
|
||||
} catch (e) {
|
||||
err = e;
|
||||
}
|
||||
|
@ -543,7 +605,7 @@ test(async function testReadRequestError(): Promise<void> {
|
|||
let err;
|
||||
let req: ServerRequest | Deno.EOF | undefined;
|
||||
try {
|
||||
req = await readRequest(mockConn as Deno.Conn, reader);
|
||||
req = await readRequest(mockConn(), reader);
|
||||
} catch (e) {
|
||||
err = e;
|
||||
}
|
||||
|
@ -655,7 +717,10 @@ test({
|
|||
try {
|
||||
const r = new TextProtoReader(new BufReader(p.stdout!));
|
||||
const s = await r.readLine();
|
||||
assert(s !== Deno.EOF && s.includes("server listening"));
|
||||
assert(
|
||||
s !== Deno.EOF && s.includes("server listening"),
|
||||
"server must be started"
|
||||
);
|
||||
|
||||
let serverIsRunning = true;
|
||||
p.status()
|
||||
|
@ -765,55 +830,3 @@ if (Deno.build.os !== "win") {
|
|||
}
|
||||
});
|
||||
}
|
||||
|
||||
test("writeTrailer", async () => {
|
||||
const w = new Buffer();
|
||||
await writeTrailers(
|
||||
w,
|
||||
new Headers({ "transfer-encoding": "chunked", trailer: "deno,node" }),
|
||||
new Headers({ deno: "land", node: "js" })
|
||||
);
|
||||
assertEquals(w.toString(), "deno: land\r\nnode: js\r\n\r\n");
|
||||
});
|
||||
|
||||
test("writeTrailer should throw", async () => {
|
||||
const w = new Buffer();
|
||||
await assertThrowsAsync(
|
||||
() => {
|
||||
return writeTrailers(w, new Headers(), new Headers());
|
||||
},
|
||||
Error,
|
||||
'must have "trailer"'
|
||||
);
|
||||
await assertThrowsAsync(
|
||||
() => {
|
||||
return writeTrailers(w, new Headers({ trailer: "deno" }), new Headers());
|
||||
},
|
||||
Error,
|
||||
"only allowed"
|
||||
);
|
||||
for (const f of ["content-length", "trailer", "transfer-encoding"]) {
|
||||
await assertThrowsAsync(
|
||||
() => {
|
||||
return writeTrailers(
|
||||
w,
|
||||
new Headers({ "transfer-encoding": "chunked", trailer: f }),
|
||||
new Headers({ [f]: "1" })
|
||||
);
|
||||
},
|
||||
AssertionError,
|
||||
"prohibited"
|
||||
);
|
||||
}
|
||||
await assertThrowsAsync(
|
||||
() => {
|
||||
return writeTrailers(
|
||||
w,
|
||||
new Headers({ "transfer-encoding": "chunked", trailer: "deno" }),
|
||||
new Headers({ node: "js" })
|
||||
);
|
||||
},
|
||||
AssertionError,
|
||||
"Not trailer"
|
||||
);
|
||||
});
|
||||
|
|
Loading…
Reference in a new issue