streams - add highWaterMark option

This commit is contained in:
Benjamin Pasero 2020-06-08 12:44:32 +02:00
parent a2983662de
commit 0c73b69495
4 changed files with 156 additions and 9 deletions

View file

@ -214,6 +214,6 @@ export function streamToBufferReadableStream(stream: streams.ReadableStreamEvent
return streams.transform<Uint8Array | string, VSBuffer>(stream, { data: data => typeof data === 'string' ? VSBuffer.fromString(data) : VSBuffer.wrap(data) }, chunks => VSBuffer.concat(chunks));
}
export function newWriteableBufferStream(): streams.WriteableStream<VSBuffer> {
return streams.newWriteableStream<VSBuffer>(chunks => VSBuffer.concat(chunks));
export function newWriteableBufferStream(options?: streams.WriteableStreamOptions): streams.WriteableStream<VSBuffer> {
return streams.newWriteableStream<VSBuffer>(chunks => VSBuffer.concat(chunks), options);
}

View file

@ -49,6 +49,11 @@ export interface ReadableStream<T> extends ReadableStreamEvents<T> {
* Destroys the stream and stops emitting any event.
*/
destroy(): void;
/**
* Allows to remove a listener that was previously added.
*/
removeListener(event: string, callback: Function): void;
}
/**
@ -74,8 +79,14 @@ export interface WriteableStream<T> extends ReadableStream<T> {
* Writing data to the stream will trigger the on('data')
* event listener if the stream is flowing and buffer the
* data otherwise until the stream is flowing.
*
* If a `highWaterMark` is configured and writing to the
* stream reaches this mark, a promise will be returned
* that should be awaited on before writing more data.
* Otherwise there is a risk of buffering a large number
* of data chunks without consumer.
*/
write(data: T): void;
write(data: T): void | Promise<void>;
/**
* Signals an error to the consumer of the stream via the
@ -118,8 +129,18 @@ export interface ITransformer<Original, Transformed> {
error?: IErrorTransformer;
}
export function newWriteableStream<T>(reducer: IReducer<T>): WriteableStream<T> {
return new WriteableStreamImpl<T>(reducer);
export function newWriteableStream<T>(reducer: IReducer<T>, options?: WriteableStreamOptions): WriteableStream<T> {
return new WriteableStreamImpl<T>(reducer, options);
}
export interface WriteableStreamOptions {
/**
* The number of objects to buffer before WriteableStream#write()
* signals back that the buffer is full. Can be used to reduce
* the memory pressure when the stream is not flowing.
*/
highWaterMark?: number;
}
class WriteableStreamImpl<T> implements WriteableStream<T> {
@ -141,7 +162,9 @@ class WriteableStreamImpl<T> implements WriteableStream<T> {
end: [] as { (): void }[]
};
constructor(private reducer: IReducer<T>) { }
private readonly pendingWritePromises: Function[] = [];
constructor(private reducer: IReducer<T>, private options?: WriteableStreamOptions) { }
pause(): void {
if (this.state.destroyed) {
@ -166,7 +189,7 @@ class WriteableStreamImpl<T> implements WriteableStream<T> {
}
}
write(data: T): void {
write(data: T): void | Promise<void> {
if (this.state.destroyed) {
return;
}
@ -179,6 +202,11 @@ class WriteableStreamImpl<T> implements WriteableStream<T> {
// not yet flowing: buffer data until flowing
else {
this.buffer.data.push(data);
// highWaterMark: if configured, signal back when buffer reached limits
if (typeof this.options?.highWaterMark === 'number' && this.buffer.data.length > this.options.highWaterMark) {
return new Promise(resolve => this.pendingWritePromises.push(resolve));
}
}
}
@ -267,6 +295,35 @@ class WriteableStreamImpl<T> implements WriteableStream<T> {
}
}
removeListener(event: string, callback: Function): void {
if (this.state.destroyed) {
return;
}
let listeners: unknown[] | undefined = undefined;
switch (event) {
case 'data':
listeners = this.listeners.data;
break;
case 'end':
listeners = this.listeners.end;
break;
case 'error':
listeners = this.listeners.error;
break;
}
if (listeners) {
const index = listeners.indexOf(callback);
if (index >= 0) {
listeners.splice(index, 1);
}
}
}
private flowData(): void {
if (this.buffer.data.length > 0) {
const fullDataBuffer = this.reducer(this.buffer.data);
@ -274,6 +331,11 @@ class WriteableStreamImpl<T> implements WriteableStream<T> {
this.listeners.data.forEach(listener => listener(fullDataBuffer));
this.buffer.data.length = 0;
// When the buffer is empty, resolve all pending writers
const pendingWritePromises = [...this.pendingWritePromises];
this.pendingWritePromises.length = 0;
pendingWritePromises.forEach(pendingWritePromise => pendingWritePromise());
}
}
@ -308,6 +370,8 @@ class WriteableStreamImpl<T> implements WriteableStream<T> {
this.listeners.data.length = 0;
this.listeners.error.length = 0;
this.listeners.end.length = 0;
this.pendingWritePromises.length = 0;
}
}
}

View file

@ -5,6 +5,7 @@
import * as assert from 'assert';
import { isReadableStream, newWriteableStream, Readable, consumeReadable, consumeReadableWithLimit, consumeStream, ReadableStream, toStream, toReadable, transform, consumeStreamWithLimit } from 'vs/base/common/stream';
import { timeout } from 'vs/base/common/async';
suite('Stream', () => {
@ -13,7 +14,7 @@ suite('Stream', () => {
assert.ok(isReadableStream(newWriteableStream(d => d)));
});
test('WriteableStream', () => {
test('WriteableStream - basics', () => {
const stream = newWriteableStream<string>(strings => strings.join());
let error = false;
@ -66,6 +67,87 @@ suite('Stream', () => {
assert.equal(chunks.length, 4);
});
test('WriteableStream - removeListener', () => {
const stream = newWriteableStream<string>(strings => strings.join());
let error = false;
const errorListener = (e: Error) => {
error = true;
};
stream.on('error', errorListener);
let end = false;
const endListener = () => {
end = true;
};
stream.on('end', endListener);
let data = false;
const dataListener = () => {
data = true;
};
stream.on('data', dataListener);
stream.write('Hello');
assert.equal(data, true);
data = false;
stream.removeListener('data', dataListener);
stream.write('World');
assert.equal(data, false);
stream.error(new Error());
assert.equal(error, true);
error = false;
stream.removeListener('error', errorListener);
stream.error(new Error());
assert.equal(error, false);
});
test('WriteableStream - highWaterMark', async () => {
const stream = newWriteableStream<string>(strings => strings.join(), { highWaterMark: 3 });
let res = stream.write('1');
assert.ok(!res);
res = stream.write('2');
assert.ok(!res);
res = stream.write('3');
assert.ok(!res);
let promise1 = stream.write('4');
assert.ok(promise1 instanceof Promise);
let promise2 = stream.write('5');
assert.ok(promise2 instanceof Promise);
let drained1 = false;
(async () => {
await promise1;
drained1 = true;
})();
let drained2 = false;
(async () => {
await promise2;
drained2 = true;
})();
let data: string | undefined = undefined;
stream.on('data', chunk => {
data = chunk;
});
assert.ok(data);
await timeout(0);
assert.equal(drained1, true);
assert.equal(drained2, true);
});
test('consumeReadable', () => {
const readable = arrayToReadable(['1', '2', '3', '4', '5']);
const consumed = consumeReadable(readable, strings => strings.join());

View file

@ -752,7 +752,7 @@ export class TestFileService implements IFileService {
this.lastReadFileUri = resource;
return Promise.resolve({
resource: resource,
resource,
value: {
on: (event: string, callback: Function): void => {
if (event === 'data') {
@ -762,6 +762,7 @@ export class TestFileService implements IFileService {
callback();
}
},
removeListener: () => { },
resume: () => { },
pause: () => { },
destroy: () => { }