fix(ext/http): flush gzip streaming response (#23991)

This commit changes `gzip` compression in `Deno.serve` API to flush data
after each write. There's a slight performance regression, but provided
test shows a scenario that was not possible before.

---------

Co-authored-by: Divy Srivastava <dj.srivastava23@gmail.com>
This commit is contained in:
Bartek Iwańczuk 2024-05-28 21:46:04 +01:00 committed by GitHub
parent 69da5d8290
commit 7d8a8a0461
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 68 additions and 13 deletions

View file

@ -394,7 +394,7 @@ impl PollFrame for GZipResponseStream {
stm.compress(&[], &mut buf, flate2::FlushCompress::Finish)
}
ResponseStreamResult::NonEmptyBuf(mut input) => {
let res = stm.compress(&input, &mut buf, flate2::FlushCompress::None);
let res = stm.compress(&input, &mut buf, flate2::FlushCompress::Sync);
let len_in = (stm.total_in() - start_in) as usize;
debug_assert!(len_in <= input.len());
this.crc.update(&input[..len_in]);

View file

@ -671,6 +671,9 @@ class ClientRequest extends OutgoingMessage {
(async () => {
try {
const res = await op_fetch_send(this._req.requestRid);
if (this._req.cancelHandleRid !== null) {
core.tryClose(this._req.cancelHandleRid);
}
try {
cb?.();
} catch (_) {
@ -709,10 +712,6 @@ class ClientRequest extends OutgoingMessage {
Object.entries(res.headers).flat().length,
);
if (this._req.cancelHandleRid !== null) {
core.tryClose(this._req.cancelHandleRid);
}
if (incoming.upgrade) {
if (this.listenerCount("upgrade") === 0) {
// No listeners, so we got nothing to do

View file

@ -11,6 +11,7 @@ import {
curlRequest,
curlRequestWithStdErr,
execCode,
execCode3,
fail,
tmpUnixSocketPath,
} from "./test_util.ts";
@ -3985,3 +3986,59 @@ Deno.test(
assert(respText === "Internal Server Error");
},
);
Deno.test(
{
permissions: { net: true, run: true, read: true },
ignore: Deno.build.os !== "linux",
},
async function gzipFlushResponseStream() {
const { promise, resolve } = Promise.withResolvers<void>();
const ac = new AbortController();
console.log("Starting server", servePort);
let timer: number | undefined = undefined;
let _controller;
const server = Deno.serve(
{
port: servePort,
onListen: onListen(resolve),
signal: ac.signal,
},
() => {
const body = new ReadableStream({
start(controller) {
timer = setInterval(() => {
const message = `It is ${new Date().toISOString()}\n`;
controller.enqueue(new TextEncoder().encode(message));
}, 1000);
_controller = controller;
},
cancel() {
if (timer !== undefined) {
clearInterval(timer);
}
},
});
return new Response(body, {
headers: {
"content-type": "text/plain",
"x-content-type-options": "nosniff",
},
});
},
);
await promise;
const e = await execCode3("/usr/bin/sh", [
"-c",
`curl --stderr - -N --compressed --no-progress-meter http://localhost:${servePort}`,
]);
await e.waitStdoutText("It is ");
clearTimeout(timer);
_controller!.close();
await e.finished();
ac.abort();
await server.finished;
},
);

View file

@ -35,14 +35,9 @@ export function execCode(code: string): Promise<readonly [number, string]> {
return execCode2(code).finished();
}
export function execCode2(code: string) {
const command = new Deno.Command(Deno.execPath(), {
args: [
"eval",
"--unstable",
"--no-check",
code,
],
export function execCode3(cmd: string, args: string[]) {
const command = new Deno.Command(cmd, {
args,
stdout: "piped",
stderr: "inherit",
});
@ -82,6 +77,10 @@ export function execCode2(code: string) {
};
}
export function execCode2(code: string) {
return execCode3(Deno.execPath(), ["eval", "--unstable", "--no-check", code]);
}
export function tmpUnixSocketPath(): string {
const folder = Deno.makeTempDirSync();
return join(folder, "socket");