perf(ext/streams): fast path when consuming body of tee'd stream (#16329)

Add a fast path for consuming the body of a cloned `Request`/`Response`,
which is very common, especially when using the Cache API.
Marcos Casagrande 2022-10-24 13:13:20 +02:00 committed by GitHub
parent 873a5ce2ed
commit b7d86b4bed
3 changed files with 84 additions and 0 deletions
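
For context, the pattern this commit optimizes looks like the following (a minimal sketch; the cache name and URL are illustrative):

const cache = await caches.open("v1");
const res = await fetch("https://example.com/data.json");
// clone() tees the response body; cache.put() stores one branch while
// the caller consumes the other.
await cache.put("https://example.com/data.json", res.clone());
const data = await res.text();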

@@ -2459,6 +2459,62 @@ Deno.test(
},
);
Deno.test(
{ permissions: { net: true } },
async function httpServerRequestResponseClone() {
const body = "deno".repeat(64 * 1024);
let httpConn: Deno.HttpConn;
const listener = Deno.listen({ port: 4501 });
const promise = (async () => {
const conn = await listener.accept();
listener.close();
httpConn = Deno.serveHttp(conn);
const reqEvent = await httpConn.nextRequest();
assert(reqEvent);
const { request, respondWith } = reqEvent;
const clone = request.clone();
const reader = clone.body!.getReader();
// read the first chunk from the cloned branch (branch2)
const clonedChunks = [];
const { value, done } = await reader.read();
assert(!done);
clonedChunks.push(value);
// Consume the request after the clone has done a single chunk read.
// readAll should correctly read the rest of the body, and the first
// chunk should remain in the clone's internal buffer.
const body1 = await request.text();
while (true) {
const { value, done } = await reader.read();
if (done) break;
clonedChunks.push(value);
}
let offset = 0;
const body2 = new Uint8Array(body.length);
for (const chunk of clonedChunks) {
body2.set(chunk, offset);
offset += chunk.byteLength;
}
assertEquals(body1, body);
assertEquals(body1, new TextDecoder().decode(body2));
await respondWith(new Response(body));
})();
const response = await fetch("http://localhost:4501", {
body,
method: "POST",
});
const clone = response.clone();
assertEquals(await response.text(), await clone.text());
await promise;
httpConn!.close();
},
);
function chunkedBodyReader(h: Headers, r: BufReader): Deno.Reader {
// Based on https://tools.ietf.org/html/rfc2616#section-19.4.6
const tp = new TextProtoReader(r);
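
The interleaving exercised by the test above can be reproduced without an HTTP server (a reduced sketch using only web-standard APIs):

const body = "deno".repeat(64 * 1024);
const req = new Request("https://localhost/", { method: "POST", body });
const clone = req.clone(); // tees the request body into two branches
const reader = clone.body!.getReader();
const { value: firstChunk } = await reader.read(); // one chunk from the clone
// Consuming the original in full now hits the readAll fast path; the
// clone's remaining chunks stay readable afterwards.
const text1 = await req.text();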

@@ -323,6 +323,7 @@
close: (rid) => ops.op_close(rid),
tryClose: (rid) => ops.op_try_close(rid),
read: opAsync.bind(null, "op_read"),
readAll: opAsync.bind(null, "op_read_all"),
write: opAsync.bind(null, "op_write"),
writeAll: opAsync.bind(null, "op_write_all"),
shutdown: opAsync.bind(null, "op_shutdown"),
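
The new readAll binding wires the op_read_all op into core's JS API, draining a resource in a single async op. Conceptually it replaces a chunked read loop like this sketch (buffer size illustrative):

// Chunked reads: one op round-trip per chunk until EOF.
const chunks: Uint8Array[] = [];
while (true) {
  const buf = new Uint8Array(64 * 1024);
  const n = await core.read(rid, buf);
  if (n === 0) break; // EOF
  chunks.push(buf.subarray(0, n));
}
// Single-op read: all remaining bytes at once.
// const all: Uint8Array = await core.readAll(rid);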

@@ -652,6 +652,9 @@
const RESOURCE_REGISTRY = new FinalizationRegistry((rid) => {
core.tryClose(rid);
});
const _readAll = Symbol("[[readAll]]");
const _original = Symbol("[[original]]");
/**
* Create a new ReadableStream object that is backed by a Resource that
* implements `Resource::read_return`. This object contains enough metadata to
@@ -681,6 +684,17 @@
async pull(controller) {
const v = controller.byobRequest.view;
try {
if (controller[_readAll] === true) {
// Fast path: a tee'd branch is consuming the whole body,
// so drain the resource in a single op.
const chunk = await core.readAll(rid);
if (chunk.byteLength > 0) {
controller.enqueue(chunk);
}
controller.close();
tryClose();
return;
}
const bytesRead = await core.read(rid, v);
if (bytesRead === 0) {
tryClose();
@@ -809,8 +823,17 @@
/** @type {Uint8Array[]} */
const chunks = [];
let totalLength = 0;
// tee'd stream
if (stream[_original]) {
// One of the branches is consuming the stream;
// signal controller.pull that it can drain the rest in a single op.
stream[_original][_controller][_readAll] = true;
}
while (true) {
const { value: chunk, done } = await reader.read();
if (done) break;
if (!ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, chunk)) {
@@ -3029,6 +3052,10 @@
pull2Algorithm,
cancel2Algorithm,
);
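// Mark each branch with a reference to the original stream so that
// collecting a branch's full body can set the readAll fast-path flag.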
branch1[_original] = stream;
branch2[_original] = stream;
forwardReaderError(reader);
return [branch1, branch2];
}
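
Taken together, the three hunks implement a small handshake (a simplified sketch; the plain properties stand in for the private _original/_readAll symbols and internal helpers):

// 1. tee() tags both branches with the source stream.
// 2. Collecting a branch's whole body flags the source's controller.
// 3. The source's next pull() sees the flag and drains the resource
//    with a single op_read_all instead of repeated op_read calls.
async function collectFullBody(branch: any): Promise<Uint8Array[]> {
  if (branch.original) {
    branch.original.controller.readAll = true; // step 2
  }
  const reader = branch.getReader();
  const chunks: Uint8Array[] = [];
  while (true) {
    const { value, done } = await reader.read();
    if (done) break;
    chunks.push(value);
  }
  return chunks;
}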