feat(core): add Deno.core.writeAll(rid, chunk) (#16228)

This commit adds a new op_write_all to core that allows writing an
entire chunk in a single async op call. Internally this calls
`Resource::write_all`.

The `writableStreamForRid` has been moved to `06_streams.js` now, and
uses this new op. Various other code paths now also use this new op.

Closes #16227
This commit is contained in:
Luca Casonato 2022-10-10 10:28:35 +02:00 committed by GitHub
parent 4d6aed1b52
commit 1ab3691b09
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 85 additions and 43 deletions

View file

@ -329,6 +329,7 @@
tryClose: (rid) => ops.op_try_close(rid),
read: opAsync.bind(null, "op_read"),
write: opAsync.bind(null, "op_write"),
writeAll: opAsync.bind(null, "op_write_all"),
shutdown: opAsync.bind(null, "op_shutdown"),
print: (msg, isErr) => ops.op_print(msg, isErr),
setMacrotaskCallback: (fn) => ops.op_set_macrotask_callback(fn),

View file

@ -23,7 +23,7 @@ async function serve(rid) {
try {
while (true) {
await Deno.core.read(rid, requestBuf);
await Deno.core.write(rid, responseBuf);
await Deno.core.writeAll(rid, responseBuf);
}
} catch (e) {
if (

View file

@ -61,6 +61,11 @@ declare namespace Deno {
*/
function write(rid: number, buf: Uint8Array): Promise<number>;
/**
* Write to a (stream) resource that implements write()
*/
function writeAll(rid: number, buf: Uint8Array): Promise<void>;
/**
* Print a message to stdout or stderr
*/

View file

@ -38,6 +38,7 @@ pub(crate) fn init_builtins() -> Extension {
op_read::decl(),
op_read_all::decl(),
op_write::decl(),
op_write_all::decl(),
op_shutdown::decl(),
op_metrics::decl(),
op_format_file_name::decl(),
@ -253,6 +254,18 @@ async fn op_write(
Ok(resp.nwritten() as u32)
}
#[op]
async fn op_write_all(
state: Rc<RefCell<OpState>>,
rid: ResourceId,
buf: ZeroCopyBuf,
) -> Result<(), Error> {
let resource = state.borrow().resource_table.get_any(rid)?;
let view = BufView::from(buf);
resource.write_all(view).await?;
Ok(())
}
#[op]
async fn op_shutdown(
state: Rc<RefCell<OpState>>,

View file

@ -148,7 +148,7 @@
await core.shutdown(rid);
break;
}
await core.write(rid, value);
await core.writeAll(rid, value);
}
} finally {
core.close(rid);

View file

@ -225,7 +225,7 @@
}
try {
await PromisePrototypeCatch(
core.write(requestBodyRid, value),
core.writeAll(requestBodyRid, value),
(err) => {
if (terminator.aborted) return;
throw err;

View file

@ -4,7 +4,8 @@
((window) => {
const core = window.Deno.core;
const { BadResourcePrototype, InterruptedPrototype, ops } = core;
const { WritableStream, readableStreamForRid } = window.__bootstrap.streams;
const { readableStreamForRid, writableStreamForRid } =
window.__bootstrap.streams;
const {
Error,
ObjectPrototypeIsPrototypeOf,
@ -65,39 +66,6 @@
return core.opAsync("op_dns_resolve", { query, recordType, options });
}
function tryClose(rid) {
try {
core.close(rid);
} catch {
// Ignore errors
}
}
function writableStreamForRid(rid) {
return new WritableStream({
async write(chunk, controller) {
try {
let nwritten = 0;
while (nwritten < chunk.length) {
nwritten += await write(
rid,
TypedArrayPrototypeSubarray(chunk, nwritten),
);
}
} catch (e) {
controller.error(e);
tryClose(rid);
}
},
close() {
tryClose(rid);
},
abort() {
tryClose(rid);
},
});
}
class Conn {
#rid = 0;
#remoteAddr = null;
@ -353,7 +321,4 @@
Datagram,
resolveDns,
};
window.__bootstrap.streamUtils = {
writableStreamForRid,
};
})(this);

View file

@ -826,6 +826,62 @@
return finalBuffer;
}
/**
* Create a new Writable object that is backed by a Resource that implements
* `Resource::write` / `Resource::write_all`. This object contains enough
* metadata to allow callers to bypass the JavaScript WritableStream
* implementation and write directly to the underlying resource if they so
* choose (FastStream).
*
* @param {number} rid The resource ID to write to.
* @param {boolean=} autoClose If the resource should be auto-closed when the stream closes. Defaults to true.
* @returns {ReadableStream<Uint8Array>}
*/
function writableStreamForRid(rid, autoClose = true) {
const stream = webidl.createBranded(WritableStream);
stream[_resourceBacking] = { rid, autoClose };
const tryClose = () => {
if (!autoClose) return;
RESOURCE_REGISTRY.unregister(stream);
core.tryClose(rid);
};
if (autoClose) {
RESOURCE_REGISTRY.register(stream, rid, stream);
}
const underlyingSink = {
async write(chunk, controller) {
try {
await core.writeAll(rid, chunk);
} catch (e) {
controller.error(e);
tryClose();
}
},
close() {
tryClose();
},
abort() {
tryClose();
},
};
initializeWritableStream(stream);
setUpWritableStreamDefaultControllerFromUnderlyingSink(
stream,
underlyingSink,
underlyingSink,
1,
() => 1,
);
return stream;
}
function getWritableStreamResourceBacking(stream) {
return stream[_resourceBacking];
}
/*
* @param {ReadableStream} stream
*/
@ -6059,6 +6115,8 @@
readableStreamForRidUnrefableUnref,
readableStreamThrowIfErrored,
getReadableStreamResourceBacking,
writableStreamForRid,
getWritableStreamResourceBacking,
Deferred,
// Exposed in global runtime scope
ByteLengthQueuingStrategy,

View file

@ -7,8 +7,8 @@
const { read, readSync, write, writeSync } = window.__bootstrap.io;
const { ftruncate, ftruncateSync, fstat, fstatSync } = window.__bootstrap.fs;
const { pathFromURL } = window.__bootstrap.util;
const { writableStreamForRid } = window.__bootstrap.streamUtils;
const { readableStreamForRid } = window.__bootstrap.streams;
const { readableStreamForRid, writableStreamForRid } =
window.__bootstrap.streams;
const {
ArrayPrototypeFilter,
Error,

View file

@ -20,8 +20,8 @@
readableStreamForRidUnrefable,
readableStreamForRidUnrefableRef,
readableStreamForRidUnrefableUnref,
writableStreamForRid,
} = window.__bootstrap.streams;
const { writableStreamForRid } = window.__bootstrap.streamUtils;
const promiseIdSymbol = SymbolFor("Deno.core.internalPromiseId");