feat: Add WritableStreams (and enable ReadableStreams piping) (#4980)

This commit is contained in:
Kitson Kelly 2020-05-01 00:40:10 +10:00 committed by GitHub
parent 84d687e958
commit 81c75332fb
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
18 changed files with 2094 additions and 212 deletions

View file

@ -23,6 +23,8 @@ import * as workers from "./web/workers.ts";
import * as performanceUtil from "./web/performance.ts";
import * as request from "./web/request.ts";
import * as readableStream from "./web/streams/readable_stream.ts";
import * as queuingStrategy from "./web/streams/queuing_strategy.ts";
import * as writableStream from "./web/streams/writable_stream.ts";
// These imports are not exposed and therefore are fine to just import the
// symbols required.
@ -216,6 +218,10 @@ export const windowOrWorkerGlobalScopeProperties = {
AbortController: nonEnumerable(abortController.AbortControllerImpl),
AbortSignal: nonEnumerable(abortSignal.AbortSignalImpl),
Blob: nonEnumerable(blob.DenoBlob),
ByteLengthQueuingStrategy: nonEnumerable(
queuingStrategy.ByteLengthQueuingStrategyImpl
),
CountQueuingStrategy: nonEnumerable(queuingStrategy.CountQueuingStrategyImpl),
File: nonEnumerable(domFile.DomFileImpl),
CustomEvent: nonEnumerable(customEvent.CustomEventImpl),
DOMException: nonEnumerable(domException.DOMExceptionImpl),
@ -232,6 +238,7 @@ export const windowOrWorkerGlobalScopeProperties = {
Response: nonEnumerable(fetchTypes.Response),
performance: writable(new performanceUtil.Performance()),
Worker: nonEnumerable(workers.WorkerImpl),
WritableStream: nonEnumerable(writableStream.WritableStreamImpl),
};
// eslint-disable-next-line @typescript-eslint/no-explicit-any

View file

@ -309,6 +309,21 @@ interface QueuingStrategy<T = any> {
size?: QueuingStrategySizeCallback<T>;
}
/** This Streams API interface provides a built-in byte length queuing strategy
* that can be used when constructing streams. */
declare class CountQueuingStrategy implements QueuingStrategy {
constructor(options: { highWaterMark: number });
highWaterMark: number;
size(chunk: any): 1;
}
declare class ByteLengthQueuingStrategy
implements QueuingStrategy<ArrayBufferView> {
constructor(options: { highWaterMark: number });
highWaterMark: number;
size(chunk: ArrayBufferView): number;
}
/** This Streams API interface represents a readable stream of byte data. The
* Fetch API offers a concrete instance of a ReadableStream through the body
* property of a Response object. */
@ -347,13 +362,58 @@ declare var ReadableStream: {
): ReadableStream<R>;
};
/** This Streams API interface provides a standard abstraction for writing streaming data to a destination, known as a sink. This object comes with built-in backpressure and queuing. */
interface WritableStream<W = any> {
interface WritableStreamDefaultControllerCloseCallback {
(): void | PromiseLike<void>;
}
interface WritableStreamDefaultControllerStartCallback {
(controller: WritableStreamDefaultController): void | PromiseLike<void>;
}
interface WritableStreamDefaultControllerWriteCallback<W> {
(chunk: W, controller: WritableStreamDefaultController): void | PromiseLike<
void
>;
}
interface WritableStreamErrorCallback {
(reason: any): void | PromiseLike<void>;
}
interface UnderlyingSink<W = any> {
abort?: WritableStreamErrorCallback;
close?: WritableStreamDefaultControllerCloseCallback;
start?: WritableStreamDefaultControllerStartCallback;
type?: undefined;
write?: WritableStreamDefaultControllerWriteCallback<W>;
}
/** This Streams API interface provides a standard abstraction for writing
* streaming data to a destination, known as a sink. This object comes with
* built-in backpressure and queuing. */
declare class WritableStream<W = any> {
constructor(
underlyingSink?: UnderlyingSink<W>,
strategy?: QueuingStrategy<W>
);
readonly locked: boolean;
abort(reason?: any): Promise<void>;
close(): Promise<void>;
getWriter(): WritableStreamDefaultWriter<W>;
}
/** This Streams API interface represents a controller allowing control of a
* WritableStream's state. When constructing a WritableStream, the underlying
* sink is given a corresponding WritableStreamDefaultController instance to
* manipulate. */
interface WritableStreamDefaultController {
error(error?: any): void;
}
/** This Streams API interface is the object returned by
* WritableStream.getWriter() and once created locks the < writer to the
* WritableStream ensuring that no other streams can write to the underlying
* sink. */
interface WritableStreamDefaultWriter<W = any> {
readonly closed: Promise<void>;
readonly desiredSize: number | null;

View file

@ -0,0 +1,131 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
import { unitTest, assert, assertEquals } from "./test_util.ts";
import { assertThrowsAsync } from "../../../std/testing/asserts.ts";
unitTest(function streamPipeLocks() {
const rs = new ReadableStream();
const ws = new WritableStream();
assertEquals(rs.locked, false);
assertEquals(ws.locked, false);
rs.pipeTo(ws);
assert(rs.locked);
assert(ws.locked);
});
unitTest(async function streamPipeFinishUnlocks() {
const rs = new ReadableStream({
start(controller: ReadableStreamDefaultController): void {
controller.close();
},
});
const ws = new WritableStream();
await rs.pipeTo(ws);
assertEquals(rs.locked, false);
assertEquals(ws.locked, false);
});
unitTest(async function streamPipeReadableStreamLocked() {
const rs = new ReadableStream();
const ws = new WritableStream();
rs.getReader();
await assertThrowsAsync(async () => {
await rs.pipeTo(ws);
}, TypeError);
});
unitTest(async function streamPipeReadableStreamLocked() {
const rs = new ReadableStream();
const ws = new WritableStream();
ws.getWriter();
await assertThrowsAsync(async () => {
await rs.pipeTo(ws);
}, TypeError);
});
unitTest(async function streamPipeLotsOfChunks() {
const CHUNKS = 10;
const rs = new ReadableStream<number>({
start(c: ReadableStreamDefaultController): void {
for (let i = 0; i < CHUNKS; ++i) {
c.enqueue(i);
}
c.close();
},
});
const written: Array<string | number> = [];
const ws = new WritableStream(
{
write(chunk: number): void {
written.push(chunk);
},
close(): void {
written.push("closed");
},
},
new CountQueuingStrategy({ highWaterMark: CHUNKS })
);
await rs.pipeTo(ws);
const targetValues = [];
for (let i = 0; i < CHUNKS; ++i) {
targetValues.push(i);
}
targetValues.push("closed");
assertEquals(written, targetValues, "the correct values must be written");
// Ensure both readable and writable are closed by the time the pipe finishes.
await Promise.all([rs.getReader().closed, ws.getWriter().closed]);
});
for (const preventAbort of [true, false]) {
unitTest(function undefinedRejectionFromPull() {
const rs = new ReadableStream({
pull(): Promise<void> {
return Promise.reject(undefined);
},
});
return rs.pipeTo(new WritableStream(), { preventAbort }).then(
() => {
throw new Error("pipeTo promise should be rejected");
},
(value) =>
assertEquals(value, undefined, "rejection value should be undefined")
);
});
}
for (const preventCancel of [true, false]) {
unitTest(function undefinedRejectionWithPreventCancel() {
const rs = new ReadableStream({
pull(controller: ReadableStreamDefaultController<number>): void {
controller.enqueue(0);
},
});
const ws = new WritableStream({
write(): Promise<void> {
return Promise.reject(undefined);
},
});
return rs.pipeTo(ws, { preventCancel }).then(
() => {
throw new Error("pipeTo promise should be rejected");
},
(value) =>
assertEquals(value, undefined, "rejection value should be undefined")
);
});
}

View file

@ -0,0 +1,253 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
import { unitTest, assert, assertEquals, assertThrows } from "./test_util.ts";
unitTest(function writableStreamDesiredSizeOnReleasedWriter() {
const ws = new WritableStream();
const writer = ws.getWriter();
writer.releaseLock();
assertThrows(() => {
writer.desiredSize;
}, TypeError);
});
unitTest(function writableStreamDesiredSizeInitialValue() {
const ws = new WritableStream();
const writer = ws.getWriter();
assertEquals(writer.desiredSize, 1);
});
unitTest(async function writableStreamDesiredSizeClosed() {
const ws = new WritableStream();
const writer = ws.getWriter();
await writer.close();
assertEquals(writer.desiredSize, 0);
});
unitTest(function writableStreamStartThrowsDesiredSizeNull() {
const ws = new WritableStream({
start(c): void {
c.error();
},
});
const writer = ws.getWriter();
assertEquals(writer.desiredSize, null, "desiredSize should be null");
});
unitTest(function getWriterOnClosingStream() {
const ws = new WritableStream({});
const writer = ws.getWriter();
writer.close();
writer.releaseLock();
ws.getWriter();
});
unitTest(async function getWriterOnClosedStream() {
const ws = new WritableStream({});
const writer = ws.getWriter();
await writer.close();
writer.releaseLock();
ws.getWriter();
});
unitTest(function getWriterOnAbortedStream() {
const ws = new WritableStream({});
const writer = ws.getWriter();
writer.abort();
writer.releaseLock();
ws.getWriter();
});
unitTest(function getWriterOnErroredStream() {
const ws = new WritableStream({
start(c): void {
c.error();
},
});
const writer = ws.getWriter();
return writer.closed.then(
(v) => {
throw new Error(`writer.closed fulfilled unexpectedly with: ${v}`);
},
() => {
writer.releaseLock();
ws.getWriter();
}
);
});
unitTest(function closedAndReadyOnReleasedWriter() {
const ws = new WritableStream({});
const writer = ws.getWriter();
writer.releaseLock();
return writer.closed.then(
(v) => {
throw new Error("writer.closed fulfilled unexpectedly with: " + v);
},
(closedRejection) => {
assertEquals(
closedRejection.name,
"TypeError",
"closed promise should reject with a TypeError"
);
return writer.ready.then(
(v) => {
throw new Error("writer.ready fulfilled unexpectedly with: " + v);
},
(readyRejection) =>
assertEquals(
readyRejection,
closedRejection,
"ready promise should reject with the same error"
)
);
}
);
});
unitTest(function sinkMethodsCalledAsMethods() {
let thisObject: Sink | null = null;
// Calls to Sink methods after the first are implicitly ignored. Only the
// first value that is passed to the resolver is used.
class Sink {
start(): void {
assertEquals(this, thisObject, "start should be called as a method");
}
write(): void {
assertEquals(this, thisObject, "write should be called as a method");
}
close(): void {
assertEquals(this, thisObject, "close should be called as a method");
}
abort(): void {
assertEquals(this, thisObject, "abort should be called as a method");
}
}
const theSink = new Sink();
thisObject = theSink;
const ws = new WritableStream(theSink);
const writer = ws.getWriter();
writer.write("a");
const closePromise = writer.close();
const ws2 = new WritableStream(theSink);
const writer2 = ws2.getWriter();
const abortPromise = writer2.abort();
return Promise.all([closePromise, abortPromise]).then(undefined);
});
unitTest(function sizeShouldNotBeCalledAsMethod() {
const strategy = {
size(): number {
if (this !== undefined) {
throw new Error("size called as a method");
}
return 1;
},
};
const ws = new WritableStream({}, strategy);
const writer = ws.getWriter();
return writer.write("a");
});
unitTest(function redundantReleaseLockIsNoOp() {
const ws = new WritableStream();
const writer1 = ws.getWriter();
assertEquals(
undefined,
writer1.releaseLock(),
"releaseLock() should return undefined"
);
const writer2 = ws.getWriter();
assertEquals(
undefined,
writer1.releaseLock(),
"no-op releaseLock() should return undefined"
);
// Calling releaseLock() on writer1 should not interfere with writer2. If it did, then the ready promise would be
// rejected.
return writer2.ready;
});
unitTest(function readyPromiseShouldFireBeforeReleaseLock() {
const events: string[] = [];
const ws = new WritableStream();
const writer = ws.getWriter();
return writer.ready.then(() => {
// Force the ready promise back to a pending state.
const writerPromise = writer.write("dummy");
const readyPromise = writer.ready.catch(() => events.push("ready"));
const closedPromise = writer.closed.catch(() => events.push("closed"));
writer.releaseLock();
return Promise.all([readyPromise, closedPromise]).then(() => {
assertEquals(
events,
["ready", "closed"],
"ready promise should fire before closed promise"
);
// Stop the writer promise hanging around after the test has finished.
return Promise.all([writerPromise, ws.abort()]).then(undefined);
});
});
});
unitTest(function subclassingWritableStream() {
class Subclass extends WritableStream {
extraFunction(): boolean {
return true;
}
}
assert(
Object.getPrototypeOf(Subclass.prototype) === WritableStream.prototype,
"Subclass.prototype's prototype should be WritableStream.prototype"
);
assert(
Object.getPrototypeOf(Subclass) === WritableStream,
"Subclass's prototype should be WritableStream"
);
const sub = new Subclass();
assert(
sub instanceof WritableStream,
"Subclass object should be an instance of WritableStream"
);
assert(
sub instanceof Subclass,
"Subclass object should be an instance of Subclass"
);
const lockedGetter = Object.getOwnPropertyDescriptor(
WritableStream.prototype,
"locked"
)!.get!;
assert(
lockedGetter.call(sub) === sub.locked,
"Subclass object should pass brand check"
);
assert(
sub.extraFunction(),
"extraFunction() should be present on Subclass object"
);
});
unitTest(function lockedGetterShouldReturnTrue() {
const ws = new WritableStream();
assert(!ws.locked, "stream should not be locked");
ws.getWriter();
assert(ws.locked, "stream should be locked");
});

View file

@ -53,6 +53,8 @@ import "./request_test.ts";
import "./resources_test.ts";
import "./signal_test.ts";
import "./stat_test.ts";
import "./streams_piping_test.ts";
import "./streams_writable_test.ts";
import "./symlink_test.ts";
import "./text_encoding_test.ts";
import "./testing_test.ts";

View file

@ -20,9 +20,17 @@ export function log(...args: unknown[]): void {
}
// @internal
export function assert(cond: unknown, msg = "assert"): asserts cond {
export class AssertionError extends Error {
constructor(msg?: string) {
super(msg);
this.name = "AssertionError";
}
}
// @internal
export function assert(cond: unknown, msg = "Assertion failed."): asserts cond {
if (!cond) {
throw Error(msg);
throw new AssertionError(msg);
}
}

View file

@ -253,22 +253,6 @@ export interface Body {
text(): Promise<string>;
}
export interface WritableStream<W = any> {
readonly locked: boolean;
abort(reason?: any): Promise<void>;
getWriter(): WritableStreamDefaultWriter<W>;
}
export interface WritableStreamDefaultWriter<W = any> {
readonly closed: Promise<void>;
readonly desiredSize: number | null;
readonly ready: Promise<void>;
abort(reason?: any): Promise<void>;
close(): Promise<void>;
releaseLock(): void;
write(chunk: W): Promise<void>;
}
export interface RequestInit {
body?: BodyInit | null;
cache?: RequestCache;

View file

@ -258,7 +258,7 @@ class Body
pipeThrough<T>(
_: {
writable: domTypes.WritableStream<Uint8Array>;
writable: WritableStream<Uint8Array>;
readable: ReadableStream<T>;
},
_options?: PipeOptions
@ -267,7 +267,7 @@ class Body
}
pipeTo(
_dest: domTypes.WritableStream<Uint8Array>,
_dest: WritableStream<Uint8Array>,
_options?: PipeOptions
): Promise<void> {
return notImplemented();

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,53 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
import { setFunctionName } from "./internals.ts";
import { customInspect } from "../console.ts";
export class CountQueuingStrategyImpl implements CountQueuingStrategy {
highWaterMark: number;
constructor({ highWaterMark }: { highWaterMark: number }) {
this.highWaterMark = highWaterMark;
}
size(): 1 {
return 1;
}
[customInspect](): string {
return `${this.constructor.name} { highWaterMark: ${String(
this.highWaterMark
)}, size: f }`;
}
}
Object.defineProperty(CountQueuingStrategyImpl.prototype, "size", {
enumerable: true,
});
setFunctionName(CountQueuingStrategyImpl, "CountQueuingStrategy");
export class ByteLengthQueuingStrategyImpl
implements ByteLengthQueuingStrategy {
highWaterMark: number;
constructor({ highWaterMark }: { highWaterMark: number }) {
this.highWaterMark = highWaterMark;
}
size(chunk: ArrayBufferView): number {
return chunk.byteLength;
}
[customInspect](): string {
return `${this.constructor.name} { highWaterMark: ${String(
this.highWaterMark
)}, size: f }`;
}
}
Object.defineProperty(ByteLengthQueuingStrategyImpl.prototype, "size", {
enumerable: true,
});
setFunctionName(CountQueuingStrategyImpl, "CountQueuingStrategy");

View file

@ -18,11 +18,12 @@ import {
readableStreamHasDefaultReader,
readableStreamGetNumReadRequests,
readableStreamCreateReadResult,
setFunctionName,
} from "./internals.ts";
import { ReadableStreamImpl } from "./readable_stream.ts";
import * as sym from "./symbols.ts";
import { assert } from "../../util.ts";
import { customInspect } from "../../web/console.ts";
import { customInspect } from "../console.ts";
export class ReadableByteStreamControllerImpl
implements ReadableByteStreamController {
@ -136,8 +137,13 @@ export class ReadableByteStreamControllerImpl
}
[customInspect](): string {
return `ReadableByteStreamController { byobRequest: ${String(
return `${this.constructor.name} { byobRequest: ${String(
this.byobRequest
)}, desiredSize: ${String(this.desiredSize)} }`;
}
}
setFunctionName(
ReadableByteStreamControllerImpl,
"ReadableByteStreamController"
);

View file

@ -6,9 +6,14 @@ import {
isReadableStream,
isReadableStreamLocked,
isUnderlyingByteSource,
isWritableStream,
isWritableStreamLocked,
makeSizeAlgorithmFromSizeFunction,
setFunctionName,
setPromiseIsHandledToTrue,
readableStreamCancel,
ReadableStreamGenericReader,
readableStreamPipeTo,
readableStreamTee,
setUpReadableByteStreamControllerFromUnderlyingSource,
setUpReadableStreamDefaultControllerFromUnderlyingSource,
@ -18,8 +23,8 @@ import { ReadableByteStreamControllerImpl } from "./readable_byte_stream_control
import { ReadableStreamAsyncIteratorPrototype } from "./readable_stream_async_iterator.ts";
import { ReadableStreamDefaultControllerImpl } from "./readable_stream_default_controller.ts";
import * as sym from "./symbols.ts";
import { notImplemented } from "../../util.ts";
import { customInspect } from "../../web/console.ts";
import { customInspect } from "../console.ts";
import { AbortSignalImpl } from "../abort_signal.ts";
// eslint-disable-next-line @typescript-eslint/no-explicit-any
export class ReadableStreamImpl<R = any> implements ReadableStream<R> {
@ -119,80 +124,81 @@ export class ReadableStreamImpl<R = any> implements ReadableStream<R> {
throw new RangeError(`Unsupported mode "${mode}"`);
}
pipeThrough<T>(): // {
// writable,
// readable,
// }: {
// writable: WritableStream<R>;
// readable: ReadableStream<T>;
// },
// { preventClose, preventAbort, preventCancel, signal }: PipeOptions = {},
ReadableStream<T> {
return notImplemented();
// if (!isReadableStream(this)) {
// throw new TypeError("Invalid ReadableStream.");
// }
// if (!isWritableStream(writable)) {
// throw new TypeError("writable is not a valid WritableStream.");
// }
// if (!isReadableStream(readable)) {
// throw new TypeError("readable is not a valid ReadableStream.");
// }
// preventClose = Boolean(preventClose);
// preventAbort = Boolean(preventAbort);
// preventCancel = Boolean(preventCancel);
// if (signal && !(signal instanceof AbortSignalImpl)) {
// throw new TypeError("Invalid signal.");
// }
// if (isReadableStreamLocked(this)) {
// throw new TypeError("ReadableStream is locked.");
// }
// if (isWritableStreamLocked(writable)) {
// throw new TypeError("writable is locked.");
// }
// readableStreamPipeTo(
// this,
// writable,
// preventClose,
// preventAbort,
// preventCancel,
// signal,
// );
// return readable;
pipeThrough<T>(
{
writable,
readable,
}: {
writable: WritableStream<R>;
readable: ReadableStream<T>;
},
{ preventClose, preventAbort, preventCancel, signal }: PipeOptions = {}
): ReadableStream<T> {
if (!isReadableStream(this)) {
throw new TypeError("Invalid ReadableStream.");
}
if (!isWritableStream(writable)) {
throw new TypeError("writable is not a valid WritableStream.");
}
if (!isReadableStream(readable)) {
throw new TypeError("readable is not a valid ReadableStream.");
}
preventClose = Boolean(preventClose);
preventAbort = Boolean(preventAbort);
preventCancel = Boolean(preventCancel);
if (signal && !(signal instanceof AbortSignalImpl)) {
throw new TypeError("Invalid signal.");
}
if (isReadableStreamLocked(this)) {
throw new TypeError("ReadableStream is locked.");
}
if (isWritableStreamLocked(writable)) {
throw new TypeError("writable is locked.");
}
const promise = readableStreamPipeTo(
this,
writable,
preventClose,
preventAbort,
preventCancel,
signal
);
setPromiseIsHandledToTrue(promise);
return readable;
}
pipeTo(): // dest: WritableStream<R>,
// { preventClose, preventAbort, preventCancel, signal }: PipeOptions = {},
Promise<void> {
return notImplemented();
// if (!isReadableStream(this)) {
// return Promise.reject(new TypeError("Invalid ReadableStream."));
// }
// if (!isWritableStream(dest)) {
// return Promise.reject(
// new TypeError("dest is not a valid WritableStream."),
// );
// }
// preventClose = Boolean(preventClose);
// preventAbort = Boolean(preventAbort);
// preventCancel = Boolean(preventCancel);
// if (signal && !(signal instanceof AbortSignalImpl)) {
// return Promise.reject(new TypeError("Invalid signal."));
// }
// if (isReadableStreamLocked(this)) {
// return Promise.reject(new TypeError("ReadableStream is locked."));
// }
// if (isWritableStreamLocked(this)) {
// return Promise.reject(new TypeError("dest is locked."));
// }
// return readableStreamPipeTo(
// this,
// dest,
// preventClose,
// preventAbort,
// preventCancel,
// signal,
// );
pipeTo(
dest: WritableStream<R>,
{ preventClose, preventAbort, preventCancel, signal }: PipeOptions = {}
): Promise<void> {
if (!isReadableStream(this)) {
return Promise.reject(new TypeError("Invalid ReadableStream."));
}
if (!isWritableStream(dest)) {
return Promise.reject(
new TypeError("dest is not a valid WritableStream.")
);
}
preventClose = Boolean(preventClose);
preventAbort = Boolean(preventAbort);
preventCancel = Boolean(preventCancel);
if (signal && !(signal instanceof AbortSignalImpl)) {
return Promise.reject(new TypeError("Invalid signal."));
}
if (isReadableStreamLocked(this)) {
return Promise.reject(new TypeError("ReadableStream is locked."));
}
if (isWritableStreamLocked(dest)) {
return Promise.reject(new TypeError("dest is locked."));
}
return readableStreamPipeTo(
this,
dest,
preventClose,
preventAbort,
preventCancel,
signal
);
}
tee(): [ReadableStreamImpl<R>, ReadableStreamImpl<R>] {
@ -203,7 +209,7 @@ export class ReadableStreamImpl<R = any> implements ReadableStream<R> {
}
[customInspect](): string {
return `ReadableStream { locked: ${String(this.locked)} }`;
return `${this.constructor.name} { locked: ${String(this.locked)} }`;
}
[Symbol.asyncIterator](
@ -214,3 +220,5 @@ export class ReadableStreamImpl<R = any> implements ReadableStream<R> {
return this.getIterator(options);
}
}
setFunctionName(ReadableStreamImpl, "ReadableStream");

View file

@ -18,10 +18,11 @@ import {
readableStreamDefaultControllerGetDesiredSize,
resetQueue,
SizeAlgorithm,
setFunctionName,
} from "./internals.ts";
import { ReadableStreamImpl } from "./readable_stream.ts";
import * as sym from "./symbols.ts";
import { customInspect } from "../../web/console.ts";
import { customInspect } from "../console.ts";
// eslint-disable-next-line @typescript-eslint/no-explicit-any
export class ReadableStreamDefaultControllerImpl<R = any>
@ -113,8 +114,13 @@ export class ReadableStreamDefaultControllerImpl<R = any>
}
[customInspect](): string {
return `ReadableStreamDefaultController { desiredSize: ${String(
return `${this.constructor.name} { desiredSize: ${String(
this.desiredSize
)} }`;
}
}
setFunctionName(
ReadableStreamDefaultControllerImpl,
"ReadableStreamDefaultController"
);

View file

@ -9,10 +9,11 @@ import {
readableStreamReaderGenericCancel,
readableStreamReaderGenericInitialize,
readableStreamReaderGenericRelease,
setFunctionName,
} from "./internals.ts";
import { ReadableStreamImpl } from "./readable_stream.ts";
import * as sym from "./symbols.ts";
import { customInspect } from "../../web/console.ts";
import { customInspect } from "../console.ts";
// eslint-disable-next-line @typescript-eslint/no-explicit-any
export class ReadableStreamDefaultReaderImpl<R = any>
@ -84,6 +85,8 @@ export class ReadableStreamDefaultReaderImpl<R = any>
}
[customInspect](): string {
return `ReadableStreamDefaultReader { closed: Promise }`;
return `${this.constructor.name} { closed: Promise }`;
}
}
setFunctionName(ReadableStreamDefaultReaderImpl, "ReadableStreamDefaultReader");

View file

@ -6,21 +6,32 @@
// this data from the public, therefore we will use unique symbols which are
// not available in the runtime.
export const abortAlgorithm = Symbol("abortAlgorithm");
export const abortSteps = Symbol("abortSteps");
export const asyncIteratorReader = Symbol("asyncIteratorReader");
export const autoAllocateChunkSize = Symbol("autoAllocateChunkSize");
export const backpressure = Symbol("backpressure");
export const byobRequest = Symbol("byobRequest");
export const cancelAlgorithm = Symbol("cancelAlgorithm");
export const cancelSteps = Symbol("cancelSteps");
export const closeAlgorithm = Symbol("closeAlgorithm");
export const closedPromise = Symbol("closedPromise");
export const closeRequest = Symbol("closeRequest");
export const closeRequested = Symbol("closeRequested");
export const controlledReadableByteStream = Symbol(
"controlledReadableByteStream"
);
export const controlledReadableStream = Symbol("controlledReadableStream");
export const controlledWritableStream = Symbol("controlledWritableStream");
export const disturbed = Symbol("disturbed");
export const errorSteps = Symbol("errorSteps");
export const forAuthorCode = Symbol("forAuthorCode");
export const inFlightWriteRequest = Symbol("inFlightWriteRequest");
export const inFlightCloseRequest = Symbol("inFlightCloseRequest");
export const isFakeDetached = Symbol("isFakeDetached");
export const ownerReadableStream = Symbol("ownerReadableStream");
export const ownerWritableStream = Symbol("ownerWritableStream");
export const pendingAbortRequest = Symbol("pendingAbortRequest");
export const preventCancel = Symbol("preventCancel");
export const pullAgain = Symbol("pullAgain");
export const pullAlgorithm = Symbol("pullAlgorithm");
@ -31,8 +42,13 @@ export const queueTotalSize = Symbol("queueTotalSize");
export const readableStreamController = Symbol("readableStreamController");
export const reader = Symbol("reader");
export const readRequests = Symbol("readRequests");
export const readyPromise = Symbol("readyPromise");
export const started = Symbol("started");
export const state = Symbol("state");
export const storedError = Symbol("storedError");
export const strategyHWM = Symbol("strategyHWM");
export const strategySizeAlgorithm = Symbol("strategySizeAlgorithm");
export const writableStreamController = Symbol("writableStreamController");
export const writeAlgorithm = Symbol("writeAlgorithm");
export const writer = Symbol("writer");
export const writeRequests = Symbol("writeRequests");

View file

@ -0,0 +1,107 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
import {
AbortRequest,
acquireWritableStreamDefaultWriter,
Deferred,
initializeWritableStream,
isWritableStream,
isWritableStreamLocked,
makeSizeAlgorithmFromSizeFunction,
setFunctionName,
setUpWritableStreamDefaultControllerFromUnderlyingSink,
writableStreamAbort,
writableStreamClose,
writableStreamCloseQueuedOrInFlight,
validateAndNormalizeHighWaterMark,
} from "./internals.ts";
import * as sym from "./symbols.ts";
import { WritableStreamDefaultControllerImpl } from "./writable_stream_default_controller.ts";
import { WritableStreamDefaultWriterImpl } from "./writable_stream_default_writer.ts";
import { customInspect } from "../console.ts";
// eslint-disable-next-line @typescript-eslint/no-explicit-any
export class WritableStreamImpl<W = any> implements WritableStream<W> {
[sym.backpressure]: boolean;
[sym.closeRequest]?: Deferred<void>;
[sym.inFlightWriteRequest]?: Required<Deferred<void>>;
[sym.inFlightCloseRequest]?: Deferred<void>;
[sym.pendingAbortRequest]?: AbortRequest;
[sym.state]: "writable" | "closed" | "erroring" | "errored";
// eslint-disable-next-line @typescript-eslint/no-explicit-any
[sym.storedError]?: any;
[sym.writableStreamController]?: WritableStreamDefaultControllerImpl<W>;
[sym.writer]?: WritableStreamDefaultWriterImpl<W>;
[sym.writeRequests]: Array<Required<Deferred<void>>>;
constructor(
underlyingSink: UnderlyingSink = {},
strategy: QueuingStrategy = {}
) {
initializeWritableStream(this);
const size = strategy.size;
let highWaterMark = strategy.highWaterMark ?? 1;
const { type } = underlyingSink;
if (type !== undefined) {
throw new RangeError(`Sink type of "${String(type)}" not supported.`);
}
const sizeAlgorithm = makeSizeAlgorithmFromSizeFunction(size);
highWaterMark = validateAndNormalizeHighWaterMark(highWaterMark);
setUpWritableStreamDefaultControllerFromUnderlyingSink(
this,
underlyingSink,
highWaterMark,
sizeAlgorithm
);
}
get locked(): boolean {
if (!isWritableStream(this)) {
throw new TypeError("Invalid WritableStream.");
}
return isWritableStreamLocked(this);
}
// eslint-disable-next-line @typescript-eslint/no-explicit-any
abort(reason: any): Promise<void> {
if (!isWritableStream(this)) {
return Promise.reject(new TypeError("Invalid WritableStream."));
}
if (isWritableStreamLocked(this)) {
return Promise.reject(
new TypeError("Cannot abort a locked WritableStream.")
);
}
return writableStreamAbort(this, reason);
}
close(): Promise<void> {
if (!isWritableStream(this)) {
return Promise.reject(new TypeError("Invalid WritableStream."));
}
if (isWritableStreamLocked(this)) {
return Promise.reject(
new TypeError("Cannot abort a locked WritableStream.")
);
}
if (writableStreamCloseQueuedOrInFlight(this)) {
return Promise.reject(
new TypeError("Cannot close an already closing WritableStream.")
);
}
return writableStreamClose(this);
}
getWriter(): WritableStreamDefaultWriter<W> {
if (!isWritableStream(this)) {
throw new TypeError("Invalid WritableStream.");
}
return acquireWritableStreamDefaultWriter(this);
}
[customInspect](): string {
return `${this.constructor.name} { locked: ${String(this.locked)} }`;
}
}
setFunctionName(WritableStreamImpl, "WritableStream");

View file

@ -0,0 +1,68 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
import {
AbortAlgorithm,
CloseAlgorithm,
isWritableStreamDefaultController,
Pair,
resetQueue,
setFunctionName,
SizeAlgorithm,
WriteAlgorithm,
writableStreamDefaultControllerClearAlgorithms,
writableStreamDefaultControllerError,
} from "./internals.ts";
import * as sym from "./symbols.ts";
import { WritableStreamImpl } from "./writable_stream.ts";
import { customInspect } from "../console.ts";
export class WritableStreamDefaultControllerImpl<W>
implements WritableStreamDefaultController {
[sym.abortAlgorithm]: AbortAlgorithm;
[sym.closeAlgorithm]: CloseAlgorithm;
[sym.controlledWritableStream]: WritableStreamImpl;
[sym.queue]: Array<Pair<{ chunk: W } | "close">>;
[sym.queueTotalSize]: number;
[sym.started]: boolean;
[sym.strategyHWM]: number;
[sym.strategySizeAlgorithm]: SizeAlgorithm<W>;
[sym.writeAlgorithm]: WriteAlgorithm<W>;
private constructor() {
throw new TypeError(
"WritableStreamDefaultController's constructor cannot be called."
);
}
// eslint-disable-next-line @typescript-eslint/no-explicit-any
error(e: any): void {
if (!isWritableStreamDefaultController(this)) {
throw new TypeError("Invalid WritableStreamDefaultController.");
}
const state = this[sym.controlledWritableStream][sym.state];
if (state !== "writable") {
return;
}
writableStreamDefaultControllerError(this, e);
}
// eslint-disable-next-line @typescript-eslint/no-explicit-any
[sym.abortSteps](reason: any): PromiseLike<void> {
const result = this[sym.abortAlgorithm](reason);
writableStreamDefaultControllerClearAlgorithms(this);
return result;
}
[sym.errorSteps](): void {
resetQueue(this);
}
[customInspect](): string {
return `${this.constructor.name} { }`;
}
}
setFunctionName(
WritableStreamDefaultControllerImpl,
"WritableStreamDefaultController"
);

View file

@ -0,0 +1,164 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
import {
Deferred,
getDeferred,
isWritableStream,
isWritableStreamDefaultWriter,
isWritableStreamLocked,
setFunctionName,
setPromiseIsHandledToTrue,
writableStreamCloseQueuedOrInFlight,
writableStreamDefaultWriterAbort,
writableStreamDefaultWriterClose,
writableStreamDefaultWriterGetDesiredSize,
writableStreamDefaultWriterRelease,
writableStreamDefaultWriterWrite,
} from "./internals.ts";
import * as sym from "./symbols.ts";
import { WritableStreamImpl } from "./writable_stream.ts";
import { customInspect } from "../console.ts";
import { assert } from "../../util.ts";
export class WritableStreamDefaultWriterImpl<W>
implements WritableStreamDefaultWriter<W> {
[sym.closedPromise]: Deferred<void>;
[sym.ownerWritableStream]: WritableStreamImpl<W>;
[sym.readyPromise]: Deferred<void>;
constructor(stream: WritableStreamImpl<W>) {
if (!isWritableStream(stream)) {
throw new TypeError("Invalid stream.");
}
if (isWritableStreamLocked(stream)) {
throw new TypeError("Cannot create a reader for a locked stream.");
}
this[sym.ownerWritableStream] = stream;
stream[sym.writer] = this;
const state = stream[sym.state];
if (state === "writable") {
if (
!writableStreamCloseQueuedOrInFlight(stream) &&
stream[sym.backpressure]
) {
this[sym.readyPromise] = getDeferred();
} else {
this[sym.readyPromise] = { promise: Promise.resolve() };
}
this[sym.closedPromise] = getDeferred();
} else if (state === "erroring") {
this[sym.readyPromise] = {
promise: Promise.reject(stream[sym.storedError]),
};
setPromiseIsHandledToTrue(this[sym.readyPromise].promise);
this[sym.closedPromise] = getDeferred();
} else if (state === "closed") {
this[sym.readyPromise] = { promise: Promise.resolve() };
this[sym.closedPromise] = { promise: Promise.resolve() };
} else {
assert(state === "errored");
const storedError = stream[sym.storedError];
this[sym.readyPromise] = { promise: Promise.reject(storedError) };
setPromiseIsHandledToTrue(this[sym.readyPromise].promise);
this[sym.closedPromise] = { promise: Promise.reject(storedError) };
setPromiseIsHandledToTrue(this[sym.closedPromise].promise);
}
}
get closed(): Promise<void> {
if (!isWritableStreamDefaultWriter(this)) {
return Promise.reject(
new TypeError("Invalid WritableStreamDefaultWriter.")
);
}
return this[sym.closedPromise].promise;
}
get desiredSize(): number | null {
if (!isWritableStreamDefaultWriter(this)) {
throw new TypeError("Invalid WritableStreamDefaultWriter.");
}
if (!this[sym.ownerWritableStream]) {
throw new TypeError("WritableStreamDefaultWriter has no owner.");
}
return writableStreamDefaultWriterGetDesiredSize(this);
}
get ready(): Promise<void> {
if (!isWritableStreamDefaultWriter(this)) {
return Promise.reject(
new TypeError("Invalid WritableStreamDefaultWriter.")
);
}
return this[sym.readyPromise].promise;
}
// eslint-disable-next-line @typescript-eslint/no-explicit-any
abort(reason: any): Promise<void> {
if (!isWritableStreamDefaultWriter(this)) {
return Promise.reject(
new TypeError("Invalid WritableStreamDefaultWriter.")
);
}
if (!this[sym.ownerWritableStream]) {
Promise.reject(
new TypeError("WritableStreamDefaultWriter has no owner.")
);
}
return writableStreamDefaultWriterAbort(this, reason);
}
close(): Promise<void> {
if (!isWritableStreamDefaultWriter(this)) {
return Promise.reject(
new TypeError("Invalid WritableStreamDefaultWriter.")
);
}
const stream = this[sym.ownerWritableStream];
if (!stream) {
Promise.reject(
new TypeError("WritableStreamDefaultWriter has no owner.")
);
}
if (writableStreamCloseQueuedOrInFlight(stream)) {
Promise.reject(
new TypeError("Stream is in an invalid state to be closed.")
);
}
return writableStreamDefaultWriterClose(this);
}
releaseLock(): void {
if (!isWritableStreamDefaultWriter(this)) {
throw new TypeError("Invalid WritableStreamDefaultWriter.");
}
const stream = this[sym.ownerWritableStream];
if (!stream) {
return;
}
assert(stream[sym.writer]);
writableStreamDefaultWriterRelease(this);
}
write(chunk: W): Promise<void> {
if (!isWritableStreamDefaultWriter(this)) {
return Promise.reject(
new TypeError("Invalid WritableStreamDefaultWriter.")
);
}
if (!this[sym.ownerWritableStream]) {
Promise.reject(
new TypeError("WritableStreamDefaultWriter has no owner.")
);
}
return writableStreamDefaultWriterWrite(this, chunk);
}
[customInspect](): string {
return `${this.constructor.name} { closed: Promise, desiredSize: ${String(
this.desiredSize
)}, ready: Promise }`;
}
}
setFunctionName(WritableStreamDefaultWriterImpl, "WritableStreamDefaultWriter");