Enforce HTTP/1.1 pipeline response order (denoland/deno_std#331)

Original: 144ef0e08d
This commit is contained in:
Kevin (Kun) "Kassimo" Qian 2019-04-13 12:23:56 -07:00 committed by Ryan Dahl
parent 733fbfd555
commit 236cedc7cb
5 changed files with 224 additions and 18 deletions

53
http/racing_server.ts Normal file
View file

@ -0,0 +1,53 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
import { serve, ServerRequest } from "./server.ts";
const addr = Deno.args[1] || "127.0.0.1:4501";
const server = serve(addr);
const body = new TextEncoder().encode("Hello 1\n");
const body4 = new TextEncoder().encode("World 4\n");
function sleep(ms: number): Promise<void> {
return new Promise(res => setTimeout(res, ms));
}
async function delayedRespond(request: ServerRequest): Promise<void> {
await sleep(3000);
await request.respond({ status: 200, body });
}
async function largeRespond(request: ServerRequest, c: string): Promise<void> {
const b = new Uint8Array(1024 * 1024);
b.fill(c.charCodeAt(0));
await request.respond({ status: 200, body: b });
}
async function main(): Promise<void> {
let step = 1;
for await (const request of server) {
switch (step) {
case 1:
// Try to wait long enough.
// For pipelining, this should cause all the following response
// to block.
delayedRespond(request);
break;
case 2:
// HUGE body.
largeRespond(request, "a");
break;
case 3:
// HUGE body.
largeRespond(request, "b");
break;
default:
request.respond({ status: 200, body: body4 });
break;
}
step++;
}
}
main();
console.log("Racing server listening...\n");

View file

@ -0,0 +1,65 @@
const { dial, run } = Deno;
import { test } from "../testing/mod.ts";
import { assert, assertEquals } from "../testing/asserts.ts";
import { BufReader } from "../io/bufio.ts";
import { TextProtoReader } from "../textproto/mod.ts";
let server;
async function startServer(): Promise<void> {
server = run({
args: ["deno", "-A", "http/racing_server.ts"],
stdout: "piped"
});
// Once fileServer is ready it will write to its stdout.
const r = new TextProtoReader(new BufReader(server.stdout));
const [s, err] = await r.readLine();
assert(err == null);
assert(s.includes("Racing server listening..."));
}
function killServer(): void {
server.close();
server.stdout.close();
}
let input = `GET / HTTP/1.1
GET / HTTP/1.1
GET / HTTP/1.1
GET / HTTP/1.1
`;
const HUGE_BODY_SIZE = 1024 * 1024;
let output = `HTTP/1.1 200 OK
content-length: 8
Hello 1
HTTP/1.1 200 OK
content-length: ${HUGE_BODY_SIZE}
${"a".repeat(HUGE_BODY_SIZE)}HTTP/1.1 200 OK
content-length: ${HUGE_BODY_SIZE}
${"b".repeat(HUGE_BODY_SIZE)}HTTP/1.1 200 OK
content-length: 8
World 4
`;
test(async function serverPipelineRace(): Promise<void> {
await startServer();
const conn = await dial("tcp", "127.0.0.1:4501");
const r = new TextProtoReader(new BufReader(conn));
await conn.write(new TextEncoder().encode(input));
const outLines = output.split("\n");
// length - 1 to disregard last empty line
for (let i = 0; i < outLines.length - 1; i++) {
const [s, err] = await r.readLine();
assert(!err);
assertEquals(s, outLines[i]);
}
killServer();
});

View file

@ -13,6 +13,42 @@ interface Deferred {
resolve: () => void;
reject: () => void;
}
function deferred(isResolved = false): Deferred {
let resolve, reject;
const promise = new Promise((res, rej) => {
resolve = res;
reject = rej;
});
if (isResolved) {
resolve();
}
return {
promise,
resolve,
reject
};
}
interface HttpConn extends Conn {
// When read by a newly created request B, lastId is the id pointing to a previous
// request A, such that we must wait for responses to A to complete before
// writing B's response.
lastPipelineId: number;
pendingDeferredMap: Map<number, Deferred>;
}
function createHttpConn(c: Conn): HttpConn {
const httpConn = Object.assign(c, {
lastPipelineId: 0,
pendingDeferredMap: new Map()
});
const resolvedDeferred = deferred(true);
httpConn.pendingDeferredMap.set(0, resolvedDeferred);
return httpConn;
}
function bufWriter(w: Writer): BufWriter {
if (w instanceof BufWriter) {
return w;
@ -115,11 +151,12 @@ async function readAllIterator(
}
export class ServerRequest {
pipelineId: number;
url: string;
method: string;
proto: string;
headers: Headers;
conn: Conn;
conn: HttpConn;
r: BufReader;
w: BufWriter;
@ -204,23 +241,26 @@ export class ServerRequest {
}
async respond(r: Response): Promise<void> {
return writeResponse(this.w, r);
// Check and wait if the previous request is done responding.
const lastPipelineId = this.pipelineId - 1;
const lastPipelineDeferred = this.conn.pendingDeferredMap.get(
lastPipelineId
);
assert(!!lastPipelineDeferred);
await lastPipelineDeferred.promise;
// If yes, delete old deferred and proceed with writing.
this.conn.pendingDeferredMap.delete(lastPipelineId);
// Write our response!
await writeResponse(this.w, r);
// Signal the next pending request that it can start writing.
const currPipelineDeferred = this.conn.pendingDeferredMap.get(
this.pipelineId
);
assert(!!currPipelineDeferred);
currPipelineDeferred.resolve();
}
}
function deferred(): Deferred {
let resolve, reject;
const promise = new Promise((res, rej) => {
resolve = res;
reject = rej;
});
return {
promise,
resolve,
reject
};
}
interface ServeEnv {
reqQueue: ServerRequest[];
serveDeferred: Deferred;
@ -235,7 +275,7 @@ interface ServeEnv {
* See https://v8.dev/blog/fast-async
*/
async function readRequest(
c: Conn,
c: HttpConn,
bufr?: BufReader
): Promise<[ServerRequest, BufState]> {
if (!bufr) {
@ -243,6 +283,13 @@ async function readRequest(
}
const bufw = new BufWriter(c);
const req = new ServerRequest();
// Set and incr pipeline id;
req.pipelineId = ++c.lastPipelineId;
// Set a new pipeline deferred associated with this request
// for future requests to wait for.
c.pendingDeferredMap.set(req.pipelineId, deferred());
req.conn = c;
req.r = bufr!;
req.w = bufw;
@ -277,7 +324,7 @@ function maybeHandleReq(
env.serveDeferred.resolve(); // signal while loop to process it
}
function serveConn(env: ServeEnv, conn: Conn, bufr?: BufReader): void {
function serveConn(env: ServeEnv, conn: HttpConn, bufr?: BufReader): void {
readRequest(conn, bufr).then(maybeHandleReq.bind(null, env, conn));
}
@ -298,7 +345,8 @@ export async function* serve(
listener.accept().then(handleConn);
};
handleConn = (conn: Conn) => {
serveConn(env, conn); // don't block
const httpConn = createHttpConn(conn);
serveConn(env, httpConn); // don't block
scheduleAccept(); // schedule next accept
};

View file

@ -19,6 +19,28 @@ interface ResponseTest {
const enc = new TextEncoder();
const dec = new TextDecoder();
interface Deferred {
promise: Promise<{}>;
resolve: () => void;
reject: () => void;
}
function deferred(isResolved = false): Deferred {
let resolve, reject;
const promise = new Promise((res, rej) => {
resolve = res;
reject = rej;
});
if (isResolved) {
resolve();
}
return {
promise,
resolve,
reject
};
}
const responseTests: ResponseTest[] = [
// Default response
{
@ -44,7 +66,24 @@ test(async function responseWrite() {
const buf = new Buffer();
const bufw = new BufWriter(buf);
const request = new ServerRequest();
request.pipelineId = 1;
request.w = bufw;
request.conn = {
localAddr: "",
remoteAddr: "",
rid: -1,
closeRead: () => {},
closeWrite: () => {},
read: async () => {
return { eof: true, nread: 0 };
},
write: async () => {
return -1;
},
close: () => {},
lastPipelineId: 0,
pendingDeferredMap: new Map([[0, deferred(true)], [1, deferred()]])
};
await request.respond(testCase.response);
assertEquals(buf.toString(), testCase.raw);

View file

@ -1,3 +1,4 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
import "./server_test.ts";
import "./file_server_test.ts";
import "./racing_server_test.ts";