deno/cli/tests/unit/streams_transform_test.ts
2020-05-20 17:52:51 -04:00

563 lines
15 KiB
TypeScript

// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
import {
unitTest,
assert,
assertEquals,
assertNotEquals,
assertThrows,
} from "./test_util.ts";
function delay(seconds: number): Promise<void> {
return new Promise<void>((resolve) => {
setTimeout(() => {
resolve();
}, seconds);
});
}
function readableStreamToArray<R>(
readable: { getReader(): ReadableStreamDefaultReader<R> },
reader?: ReadableStreamDefaultReader<R>
): Promise<R[]> {
if (reader === undefined) {
reader = readable.getReader();
}
const chunks: R[] = [];
return pump();
function pump(): Promise<R[]> {
return reader!.read().then((result) => {
if (result.done) {
return chunks;
}
chunks.push(result.value);
return pump();
});
}
}
unitTest(function transformStreamConstructedWithTransformFunction() {
new TransformStream({ transform(): void {} });
});
unitTest(function transformStreamConstructedNoTransform() {
new TransformStream();
new TransformStream({});
});
unitTest(function transformStreamIntstancesHaveProperProperties() {
const ts = new TransformStream({ transform(): void {} });
const proto = Object.getPrototypeOf(ts);
const writableStream = Object.getOwnPropertyDescriptor(proto, "writable");
assert(writableStream !== undefined, "it has a writable property");
assert(!writableStream.enumerable, "writable should be non-enumerable");
assertEquals(
typeof writableStream.get,
"function",
"writable should have a getter"
);
assertEquals(
writableStream.set,
undefined,
"writable should not have a setter"
);
assert(writableStream.configurable, "writable should be configurable");
assert(
ts.writable instanceof WritableStream,
"writable is an instance of WritableStream"
);
assert(
WritableStream.prototype.getWriter.call(ts.writable),
"writable should pass WritableStream brand check"
);
const readableStream = Object.getOwnPropertyDescriptor(proto, "readable");
assert(readableStream !== undefined, "it has a readable property");
assert(!readableStream.enumerable, "readable should be non-enumerable");
assertEquals(
typeof readableStream.get,
"function",
"readable should have a getter"
);
assertEquals(
readableStream.set,
undefined,
"readable should not have a setter"
);
assert(readableStream.configurable, "readable should be configurable");
assert(
ts.readable instanceof ReadableStream,
"readable is an instance of ReadableStream"
);
assertNotEquals(
ReadableStream.prototype.getReader.call(ts.readable),
undefined,
"readable should pass ReadableStream brand check"
);
});
unitTest(function transformStreamWritableStartsAsWritable() {
const ts = new TransformStream({ transform(): void {} });
const writer = ts.writable.getWriter();
assertEquals(writer.desiredSize, 1, "writer.desiredSize should be 1");
});
unitTest(async function transformStreamReadableCanReadOutOfWritable() {
const ts = new TransformStream();
const writer = ts.writable.getWriter();
writer.write("a");
assertEquals(
writer.desiredSize,
0,
"writer.desiredSize should be 0 after write()"
);
const result = await ts.readable.getReader().read();
assertEquals(
result.value,
"a",
"result from reading the readable is the same as was written to writable"
);
assert(!result.done, "stream should not be done");
await delay(0);
assert(writer.desiredSize === 1, "desiredSize should be 1 again");
});
unitTest(async function transformStreamCanReadWhatIsWritten() {
let c: TransformStreamDefaultController;
const ts = new TransformStream({
start(controller: TransformStreamDefaultController): void {
c = controller;
},
transform(chunk: string): void {
c.enqueue(chunk.toUpperCase());
},
});
const writer = ts.writable.getWriter();
writer.write("a");
const result = await ts.readable.getReader().read();
assertEquals(
result.value,
"A",
"result from reading the readable is the transformation of what was written to writable"
);
assert(!result.done, "stream should not be done");
});
unitTest(async function transformStreamCanReadBothChunks() {
let c: TransformStreamDefaultController;
const ts = new TransformStream({
start(controller: TransformStreamDefaultController): void {
c = controller;
},
transform(chunk: string): void {
c.enqueue(chunk.toUpperCase());
c.enqueue(chunk.toUpperCase());
},
});
const writer = ts.writable.getWriter();
writer.write("a");
const reader = ts.readable.getReader();
const result1 = await reader.read();
assertEquals(
result1.value,
"A",
"the first chunk read is the transformation of the single chunk written"
);
assert(!result1.done, "stream should not be done");
const result2 = await reader.read();
assertEquals(
result2.value,
"A",
"the second chunk read is also the transformation of the single chunk written"
);
assert(!result2.done, "stream should not be done");
});
unitTest(async function transformStreamCanReadWhatIsWritten() {
let c: TransformStreamDefaultController;
const ts = new TransformStream({
start(controller: TransformStreamDefaultController): void {
c = controller;
},
transform(chunk: string): Promise<void> {
return delay(0).then(() => c.enqueue(chunk.toUpperCase()));
},
});
const writer = ts.writable.getWriter();
writer.write("a");
const result = await ts.readable.getReader().read();
assertEquals(
result.value,
"A",
"result from reading the readable is the transformation of what was written to writable"
);
assert(!result.done, "stream should not be done");
});
unitTest(async function transformStreamAsyncReadMultipleChunks() {
let doSecondEnqueue: () => void;
let returnFromTransform: () => void;
const ts = new TransformStream({
transform(
chunk: string,
controller: TransformStreamDefaultController
): Promise<void> {
delay(0).then(() => controller.enqueue(chunk.toUpperCase()));
doSecondEnqueue = (): void => controller.enqueue(chunk.toUpperCase());
return new Promise((resolve) => {
returnFromTransform = resolve;
});
},
});
const reader = ts.readable.getReader();
const writer = ts.writable.getWriter();
writer.write("a");
const result1 = await reader.read();
assertEquals(
result1.value,
"A",
"the first chunk read is the transformation of the single chunk written"
);
assert(!result1.done, "stream should not be done");
doSecondEnqueue!();
const result2 = await reader.read();
assertEquals(
result2.value,
"A",
"the second chunk read is also the transformation of the single chunk written"
);
assert(!result2.done, "stream should not be done");
returnFromTransform!();
});
unitTest(function transformStreamClosingWriteClosesRead() {
const ts = new TransformStream({ transform(): void {} });
const writer = ts.writable.getWriter();
writer.close();
return Promise.all([writer.closed, ts.readable.getReader().closed]).then(
undefined
);
});
unitTest(async function transformStreamCloseWaitAwaitsTransforms() {
let transformResolve: () => void;
const transformPromise = new Promise<void>((resolve) => {
transformResolve = resolve;
});
const ts = new TransformStream(
{
transform(): Promise<void> {
return transformPromise;
},
},
undefined,
{ highWaterMark: 1 }
);
const writer = ts.writable.getWriter();
writer.write("a");
writer.close();
let rsClosed = false;
ts.readable.getReader().closed.then(() => {
rsClosed = true;
});
await delay(0);
assertEquals(rsClosed, false, "readable is not closed after a tick");
transformResolve!();
await writer.closed;
// TODO: Is this expectation correct?
assertEquals(rsClosed, true, "readable is closed at that point");
});
unitTest(async function transformStreamCloseWriteAfterSyncEnqueues() {
let c: TransformStreamDefaultController<string>;
const ts = new TransformStream<string, string>({
start(controller: TransformStreamDefaultController): void {
c = controller;
},
transform(): Promise<void> {
c.enqueue("x");
c.enqueue("y");
return delay(0);
},
});
const writer = ts.writable.getWriter();
writer.write("a");
writer.close();
const readableChunks = readableStreamToArray(ts.readable);
await writer.closed;
const chunks = await readableChunks;
assertEquals(
chunks,
["x", "y"],
"both enqueued chunks can be read from the readable"
);
});
unitTest(async function transformStreamWritableCloseAsyncAfterAsyncEnqueues() {
let c: TransformStreamDefaultController<string>;
const ts = new TransformStream<string, string>({
start(controller: TransformStreamDefaultController<string>): void {
c = controller;
},
transform(): Promise<void> {
return delay(0)
.then(() => c.enqueue("x"))
.then(() => c.enqueue("y"))
.then(() => delay(0));
},
});
const writer = ts.writable.getWriter();
writer.write("a");
writer.close();
const readableChunks = readableStreamToArray(ts.readable);
await writer.closed;
const chunks = await readableChunks;
assertEquals(
chunks,
["x", "y"],
"both enqueued chunks can be read from the readable"
);
});
unitTest(async function transformStreamTransformerMethodsCalledAsMethods() {
let c: TransformStreamDefaultController<string>;
const transformer = {
suffix: "-suffix",
start(controller: TransformStreamDefaultController<string>): void {
c = controller;
c.enqueue("start" + this.suffix);
},
transform(chunk: string): void {
c.enqueue(chunk + this.suffix);
},
flush(): void {
c.enqueue("flushed" + this.suffix);
},
};
const ts = new TransformStream(transformer);
const writer = ts.writable.getWriter();
writer.write("a");
writer.close();
const readableChunks = readableStreamToArray(ts.readable);
await writer.closed;
const chunks = await readableChunks;
assertEquals(
chunks,
["start-suffix", "a-suffix", "flushed-suffix"],
"all enqueued chunks have suffixes"
);
});
unitTest(async function transformStreamMethodsShouldNotBeAppliedOrCalled() {
function functionWithOverloads(): void {}
functionWithOverloads.apply = (): void => {
throw new Error("apply() should not be called");
};
functionWithOverloads.call = (): void => {
throw new Error("call() should not be called");
};
const ts = new TransformStream({
start: functionWithOverloads,
transform: functionWithOverloads,
flush: functionWithOverloads,
});
const writer = ts.writable.getWriter();
writer.write("a");
writer.close();
await readableStreamToArray(ts.readable);
});
unitTest(async function transformStreamCallTransformSync() {
let transformCalled = false;
const ts = new TransformStream(
{
transform(): void {
transformCalled = true;
},
},
undefined,
{ highWaterMark: Infinity }
);
// transform() is only called synchronously when there is no backpressure and
// all microtasks have run.
await delay(0);
const writePromise = ts.writable.getWriter().write(undefined);
assert(transformCalled, "transform() should have been called");
await writePromise;
});
unitTest(function transformStreamCloseWriteCloesesReadWithNoChunks() {
const ts = new TransformStream({}, undefined, { highWaterMark: 0 });
const writer = ts.writable.getWriter();
writer.close();
return Promise.all([writer.closed, ts.readable.getReader().closed]).then(
undefined
);
});
unitTest(function transformStreamEnqueueThrowsAfterTerminate() {
new TransformStream({
start(controller: TransformStreamDefaultController): void {
controller.terminate();
assertThrows(() => {
controller.enqueue(undefined);
}, TypeError);
},
});
});
unitTest(function transformStreamEnqueueThrowsAfterReadableCancel() {
let controller: TransformStreamDefaultController;
const ts = new TransformStream({
start(c: TransformStreamDefaultController): void {
controller = c;
},
});
const cancelPromise = ts.readable.cancel();
assertThrows(
() => controller.enqueue(undefined),
TypeError,
undefined,
"enqueue should throw"
);
return cancelPromise;
});
unitTest(function transformStreamSecondTerminateNoOp() {
new TransformStream({
start(controller: TransformStreamDefaultController): void {
controller.terminate();
controller.terminate();
},
});
});
unitTest(async function transformStreamTerminateAfterReadableCancelIsNoop() {
let controller: TransformStreamDefaultController;
const ts = new TransformStream({
start(c: TransformStreamDefaultController): void {
controller = c;
},
});
const cancelReason = { name: "cancelReason" };
const cancelPromise = ts.readable.cancel(cancelReason);
controller!.terminate();
await cancelPromise;
try {
await ts.writable.getWriter().closed;
} catch (e) {
assert(e === cancelReason);
return;
}
throw new Error("closed should have rejected");
});
unitTest(async function transformStreamStartCalledOnce() {
let calls = 0;
new TransformStream({
start(): void {
++calls;
},
});
await delay(0);
assertEquals(calls, 1, "start() should have been called exactly once");
});
unitTest(function transformStreamReadableTypeThrows() {
assertThrows(
// eslint-disable-next-line
() => new TransformStream({ readableType: "bytes" as any }),
RangeError,
undefined,
"constructor should throw"
);
});
unitTest(function transformStreamWirtableTypeThrows() {
assertThrows(
// eslint-disable-next-line
() => new TransformStream({ writableType: "bytes" as any }),
RangeError,
undefined,
"constructor should throw"
);
});
unitTest(function transformStreamSubclassable() {
class Subclass extends TransformStream {
extraFunction(): boolean {
return true;
}
}
assert(
Object.getPrototypeOf(Subclass.prototype) === TransformStream.prototype,
"Subclass.prototype's prototype should be TransformStream.prototype"
);
assert(
Object.getPrototypeOf(Subclass) === TransformStream,
"Subclass's prototype should be TransformStream"
);
const sub = new Subclass();
assert(
sub instanceof TransformStream,
"Subclass object should be an instance of TransformStream"
);
assert(
sub instanceof Subclass,
"Subclass object should be an instance of Subclass"
);
const readableGetter = Object.getOwnPropertyDescriptor(
TransformStream.prototype,
"readable"
)!.get;
assert(
readableGetter!.call(sub) === sub.readable,
"Subclass object should pass brand check"
);
assert(
sub.extraFunction(),
"extraFunction() should be present on Subclass object"
);
});