Revert "[io] Improve the performance of the IOSink returned by openWrite by writing eagerly and accumulating small writes."

This reverts commit c2bdda63f5.

Reason for revert: We are seeing some windows failures and also 
Bad state: /tmp/file_test_NOYTEV/foo is bound to a stream
** Note that stack traces are truncated. See http://go/dart-chain-stack-traces for information on how to view the full stack trace **
dart:io                                                          _RandomAccessFileIOSync.flush


Original change's description:
> [io] Improve the performance of the IOSink returned by `openWrite` by writing eagerly and accumulating small writes.
>
> Benchmarks (benchmarks/FileIOSink/dart/FileIOSink.dart):
>
>  @before:
>
>   FileIOSink.Add.ManySmall(RunTime): 2341597.0 us.
>   FileIOSink.Add.OneLarge(RunTime): 111.06925927974774 us.
>   FileIOSink.Add.AlternatingAddSize(RunTime): 105.65958788898234 us.
>
>  @after
>
>   FileIOSink.Add.ManySmall(RunTime): 5007.1125 us.
>   FileIOSink.Add.OneLarge(RunTime): 98.23492468475541 us.
>   FileIOSink.Add.AlternatingAddSize(RunTime): 91.77411527720551 us.
>
> So the relative performance changes are:
>
>  FileIOSink.Add.ManySmall(RunTime): 0.00213x (MUCH faster)
>  FileIOSink.Add.OneLarge(RunTime): 0.884x (slight faster - noise?)
>  FileIOSink.Add.AlternatingAddSize(RunTime): 0.868x (slightly faster - noise?)
>
> https://golem.corp.goog/Revision?repository=dart&revision=102771&patch=17842
>
> Change-Id: Ic73f33299a570096dd05f254982f556767559966
> Bug:https://github.com/dart-lang/sdk/issues/32874
> Tested: unit tests
> CoreLibraryReviewExempt: Performance-only fix for file writes in the VM.
> Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/285420
> Commit-Queue: Brian Quinlan <bquinlan@google.com>
> Reviewed-by: Alexander Aprelev <aam@google.com>

Bug: https://github.com/dart-lang/sdk/issues/32874
Change-Id: I6e16cfb460a9a6b16d7a63cb02d46fab9fc6244d
No-Presubmit: true
No-Tree-Checks: true
No-Try: true
Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/288606
Bot-Commit: Rubber Stamper <rubber-stamper@appspot.gserviceaccount.com>
Commit-Queue: Siva Annamalai <asiva@google.com>
Reviewed-by: Siva Annamalai <asiva@google.com>
This commit is contained in:
Siva Annamalai 2023-03-14 04:54:17 +00:00 committed by Commit Queue
parent 0edaaca2be
commit c17e55a1de
6 changed files with 29 additions and 1269 deletions

View file

@ -2622,7 +2622,7 @@ class ResourceHandle {
factory ResourceHandle.fromWritePipe(WritePipe pipe) {
_WritePipe wp = pipe as _WritePipe;
return ResourceHandle.fromFile(wp._randomAccessFile);
return ResourceHandle.fromFile(wp._file);
}
}

View file

@ -454,9 +454,6 @@ abstract class File implements FileSystemEntity {
/// Creates a new independent [IOSink] for the file.
///
/// The [IOSink] may accumulate added data in memory until it is closed or
/// flushed.
///
/// The [IOSink] must be closed when no longer used, to free
/// system resources.
///

View file

@ -186,173 +186,22 @@ class _FileStream extends Stream<List<int>> {
}
}
/// An adapter that transforms a [RandomAccessFile] into an [IOSink].
///
/// The adapter will eagerly write the data provided by [add] to the
/// [RandomAccessFile]. Since [add] is expected to return immediately and does
/// not return a [Future], any errors encountered during the eager write (e.g.
/// a file IO error) will be saved and thrown when [flush] or [close] are
/// called. If the event loop queue is not being serviced then only the first
/// call to [add] will result in a write.
///
/// The adapter coalesces small [add]s into a single write. This prevents the
/// pathological case where [RandomAccessFile.writeFrom] is called once per
/// byte [add]ed.
class _RandomAccessFileIOSync implements IOSink {
static const _writeBufferSize = 64 * 1024;
// A queue of write calls to make.
Queue<List<int>> _buffers = Queue();
class _FileStreamConsumer extends StreamConsumer<List<int>> {
File? _file;
Future<RandomAccessFile> _openFuture;
final Completer<File?> _doneCompleter = new Completer();
// Tracks whether [_writeBuffers] is currently running.
Completer<File?> _pendingWriteBuffers = Completer()..complete();
bool _isClosed = false;
bool _isBound = false;
Object? _error;
StackTrace? _stackTrace;
_RandomAccessFileIOSync(File file, FileMode mode, Encoding encoding)
_FileStreamConsumer(File file, FileMode mode)
: _file = file,
_openFuture = file.open(mode: mode),
encoding = encoding;
_openFuture = file.open(mode: mode);
_RandomAccessFileIOSync.fromStdio(int fd, Encoding encoding)
: _openFuture = new Future.value(_File._openStdioSync(fd)),
encoding = encoding;
_FileStreamConsumer.fromStdio(int fd)
: _openFuture = new Future.value(_File._openStdioSync(fd));
_RandomAccessFileIOSync.fromRandomAccessFile(
RandomAccessFile f, Encoding encoding)
: _openFuture = Future.value(f),
encoding = encoding;
_FileStreamConsumer.fromRandomAccessFile(RandomAccessFile f)
: _openFuture = Future.value(f);
Encoding encoding;
/// Returns whether the next item in [_buffers] will fit in the current write
/// buffer.
@pragma('vm:prefer-inline')
bool _willNextBufferFit(int bufferOffset) =>
_buffers.isNotEmpty &&
bufferOffset + _buffers.first.length < _writeBufferSize;
/// Accumulate as much data as possible into a single buffer and then make
/// a single [RandomAccessFile.writeFrom] call.
Future<RandomAccessFile> _smallWrite(
RandomAccessFile openFile, List<int> initialData) {
final buffer = Uint8List(_writeBufferSize);
var bufferOffset = initialData.length;
buffer.setRange(0, bufferOffset, initialData);
do {
final data = _buffers.removeFirst();
buffer.setRange(bufferOffset, bufferOffset + data.length, data);
bufferOffset += data.length;
} while (_willNextBufferFit(bufferOffset));
return openFile.writeFrom(buffer, 0, bufferOffset);
}
/// Write all of the data in [_buffers].
Future<File?> _writeBuffers() async {
assert(_pendingWriteBuffers.isCompleted);
assert(_error == null);
if (_buffers.isEmpty) {
return Future.value(_file);
}
_pendingWriteBuffers = Completer();
try {
final openFile = await _openFuture;
while (_buffers.isNotEmpty) {
final data = _buffers.removeFirst();
if (_willNextBufferFit(data.length)) {
await _smallWrite(openFile, data);
} else {
await openFile.writeFrom(data);
}
}
} on FileSystemException catch (e, st) {
if (_error == null) {
_error = e;
_stackTrace = st;
}
_pendingWriteBuffers.completeError(e, st);
return _pendingWriteBuffers.future;
}
_pendingWriteBuffers.complete(_file);
return _pendingWriteBuffers.future;
}
void _add(List<int> data) {
if (_error != null) {
return;
}
if (data.isEmpty) {
return;
}
_buffers.add(data);
if (_pendingWriteBuffers.isCompleted) {
// If [_writeBuffers] is not currently running then run it but don't
// wait for this results.
_writeBuffers();
}
}
void add(List<int> data) {
if (_isClosed) {
throw StateError("${_file?.path ?? "IOSink"} is closed");
}
if (_isBound) {
throw StateError("${_file?.path ?? "IOSink"} is bound to a stream");
}
_add(data);
}
void addError(Object error, [StackTrace? stackTrace]) {
if (_isClosed) {
throw StateError("${_file?.path ?? "IOSink"} is closed");
}
if (_isBound) {
throw StateError("${_file?.path ?? "IOSink"} is bound to a stream");
}
_error = error;
_stackTrace = stackTrace;
_doneCompleter.completeError(error, stackTrace);
}
Future<File?> addStream(Stream<List<int>> stream) async {
if (_isClosed) {
throw StateError("${_file?.path ?? "IOSink"} is closed");
}
if (_isBound) {
throw StateError("${_file?.path ?? "IOSink"} is bound to a stream");
}
if (_error != null) return done;
_isBound = true;
await _pendingWriteBuffers.future;
// Write each [stream] item as it appears to work with code that expects
// the data to be fully written when the Stream is done even if
// [addStream] has not completed.
// ```dart
// final sink = file.openWrite();
// final controller = StreamController<List<int>>();
// sink.addStream(controller.stream); // Not awaited!
// controller.add(<int>[1, 2, 3, 4, 5]);
// await controller.close();
// ```
Completer<File?> completer = new Completer<File?>();
Future<File?> addStream(Stream<List<int>> stream) {
Completer<File?> completer = new Completer<File?>.sync();
_openFuture.then((openedFile) {
late StreamSubscription<List<int>> _subscription;
void error(e, StackTrace stackTrace) {
@ -374,95 +223,11 @@ class _RandomAccessFileIOSync implements IOSink {
completer.complete(_file);
}, onError: error, cancelOnError: true);
}).catchError(completer.completeError);
return completer.future.whenComplete(() => _isBound = false);
return completer.future;
}
Future<File?> close() {
if (_isClosed) {
return _doneCompleter.future;
}
if (_isBound) {
throw StateError("${_file?.path ?? "IOSink"} is bound to a stream");
}
_isClosed = true;
_openFuture.then((file) {
_flush().then(
(_) => file.close().then((_) => _doneCompleter.complete(_file),
onError: _doneCompleter.completeError),
// Attempt to close the file even if _flush() fails.
onError: (e, st) => file.close().then(
(_) => _doneCompleter.completeError(e, st),
onError: (_) => _doneCompleter.completeError(e, st)));
}, onError: _doneCompleter.completeError);
// The [Future] returned by [close] is documented to be identical to the
// [Future] returned by [done].
return _doneCompleter.future;
}
Future<File?> get done => _doneCompleter.future;
Future<File?> _flush() {
if (_error != null) {
final error = _error!;
final stack = _stackTrace;
_error = null;
_stackTrace = null;
return Future.error(error, stack);
}
_isBound = true;
if (_pendingWriteBuffers.isCompleted) {
// If [_buffers] is not currently being written then just start a
// new write. If there is no data to write then [_pendingWriteBuffers] will
// complete immediately.
_writeBuffers();
}
return _pendingWriteBuffers.future.whenComplete(() => _isBound = false);
}
Future<File?> flush() {
if (_isClosed) {
throw StateError("${_file?.path ?? "IOSink"} is closed");
}
if (_isBound) {
throw StateError("${_file?.path ?? "IOSink"} is bound to a stream");
}
return _flush();
}
void write(Object? obj) {
String string = '$obj';
if (string.isEmpty) return;
add(encoding.encode(string));
}
void writeAll(Iterable objects, [String separator = ""]) {
Iterator iterator = objects.iterator;
if (!iterator.moveNext()) return;
if (separator.isEmpty) {
do {
write(iterator.current);
} while (iterator.moveNext());
} else {
write(iterator.current);
while (iterator.moveNext()) {
write(separator);
write(iterator.current);
}
}
}
void writeCharCode(int charCode) {
write(new String.fromCharCode(charCode));
}
void writeln([Object? object = ""]) {
write(object);
write("\n");
}
Future<File?> close() =>
_openFuture.then((openedFile) => openedFile.close()).then((_) => _file);
}
// Class for encapsulating the native implementation of files.
@ -747,7 +512,8 @@ class _File extends FileSystemEntity implements File {
mode != FileMode.writeOnlyAppend) {
throw new ArgumentError('Invalid file mode for this operation');
}
return new _RandomAccessFileIOSync(this, mode, encoding);
var consumer = new _FileStreamConsumer(this, mode);
return new IOSink(consumer, encoding: encoding);
}
Future<Uint8List> readAsBytes() {
@ -1349,12 +1115,11 @@ class _ReadPipe extends _FileStream implements ReadPipe {
_ReadPipe(RandomAccessFile file) : super.forRandomAccessFile(file);
}
class _WritePipe extends _RandomAccessFileIOSync implements WritePipe {
RandomAccessFile _randomAccessFile;
_WritePipe(RandomAccessFile file)
: _randomAccessFile = file,
super.fromRandomAccessFile(file, utf8);
class _WritePipe extends _IOSinkImpl implements WritePipe {
RandomAccessFile _file;
_WritePipe(file)
: this._file = file,
super(_FileStreamConsumer.fromRandomAccessFile(file), utf8);
}
class _Pipe implements Pipe {

View file

@ -262,7 +262,7 @@ class Stdout extends _StdSink implements IOSink {
/// A non-blocking `IOSink` for the same output.
IOSink get nonBlocking {
return _nonBlocking ??= new _RandomAccessFileIOSync.fromStdio(_fd, utf8);
return _nonBlocking ??= new IOSink(new _FileStreamConsumer.fromStdio(_fd));
}
}
@ -452,8 +452,14 @@ StdioType stdioType(object) {
return StdioType.file;
}
}
if (object is _RandomAccessFileIOSync) {
return StdioType.file;
if (object is _IOSinkImpl) {
try {
if (object._target is _FileStreamConsumer) {
return StdioType.file;
}
} catch (e) {
// Only the interface implemented, _sink not available.
}
}
return StdioType.other;
}

View file

@ -1,503 +0,0 @@
// Copyright (c) 2023, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
/// Test the [IOSink] returned by [File.openWrite].
import "dart:async";
import "dart:convert";
import 'dart:collection';
import "dart:io";
import 'dart:typed_data';
import "package:expect/expect.dart";
/// A list that throws when it's data is retrieved. Useful in simulating write
/// failures.
class BrokenList extends Object with ListMixin<int> implements List<int> {
String _message;
BrokenList(this._message, this.length);
int length = 10;
void operator []=(int index, int value) {
throw UnsupportedError('[$index] = $value');
}
operator [](int index) => throw FileSystemException(_message);
}
Future<void> testWriteAllEmptyIterator(Directory tmpDir) async {
final file = File('${tmpDir.path}/write_all_empty_iterator');
final sink = file.openWrite();
sink.writeAll([]);
await sink.close();
Expect.equals('', file.readAsStringSync());
}
Future<void> testWriteAllEmptyIteratorWithSep(Directory tmpDir) async {
final file = File('${tmpDir.path}/write_all_empty_iterator_with_sep');
final sink = file.openWrite();
sink.writeAll([], ',');
await sink.close();
Expect.equals('', file.readAsStringSync());
}
Future<void> testWriteAllOneElementIterator(Directory tmpDir) async {
final file = File('${tmpDir.path}/write_one_element_iterator');
final sink = file.openWrite();
sink.writeAll(['hello']);
await sink.close();
Expect.equals('hello', file.readAsStringSync());
}
Future<void> testWriteAllOneElementIteratorWithSep(Directory tmpDir) async {
final file = File('${tmpDir.path}/write_one_element_iterator_with_sep');
final sink = file.openWrite();
sink.writeAll(['hello'], ',');
await sink.close();
Expect.equals('hello', file.readAsStringSync());
}
Future<void> testWriteAllTwoElementIterator(Directory tmpDir) async {
final file = File('${tmpDir.path}/write_two_element_iterator');
final sink = file.openWrite();
sink.writeAll(['hello', 'world']);
await sink.close();
Expect.equals('helloworld', file.readAsStringSync());
}
Future<void> testWriteAllTwoElementIteratorWithSep(Directory tmpDir) async {
final file = File('${tmpDir.path}/write_two_element_iterator_with_sep');
final sink = file.openWrite();
sink.writeAll(['hello', 'world'], ',');
await sink.close();
Expect.equals('hello,world', file.readAsStringSync());
}
Future<void> testWriteln(Directory tmpDir) async {
final file = File('${tmpDir.path}/test_writeln');
final sink = file.openWrite();
sink.writeln();
await sink.close();
Expect.equals('\n', file.readAsStringSync());
}
Future<void> testWritelnWithArg(Directory tmpDir) async {
final file = File('${tmpDir.path}/test_writeln_with_arg');
final sink = file.openWrite();
sink.writeln('Hello World!');
await sink.close();
Expect.equals('Hello World!\n', file.readAsStringSync());
}
Future<void> testWriteEncoded(Directory tmpDir) async {
final file = File('${tmpDir.path}/test_writeln_with_arg');
final sink = file.openWrite();
sink.encoding = latin1;
sink.write('Allô');
await sink.close();
Expect.equals('Allô', file.readAsStringSync(encoding: latin1));
}
Future<void> testFlushWithoutWrite(Directory tmpDir) async {
final file = File('${tmpDir.path}/small_write');
final sink = file.openWrite();
await sink.close();
Expect.equals('', file.readAsStringSync());
}
Future<void> testSmallWrite(Directory tmpDir) async {
final file = File('${tmpDir.path}/small_write');
final sink = file.openWrite();
sink.writeln('Hello World!');
await sink.close();
Expect.equals('Hello World!\n', file.readAsStringSync());
}
Future<void> testSmallWriteAfterFlush(Directory tmpDir) async {
final file = File('${tmpDir.path}/small_write_after_flush');
final sink = file.openWrite();
sink.writeln('Hello World!');
await sink.flush();
sink.writeln('How are you?');
await sink.close();
Expect.listEquals(['Hello World!', 'How are you?'], file.readAsLinesSync());
}
Future<void> testSmallWriteAfterClose(Directory tmpDir) async {
final file = File('${tmpDir.path}/small_write_after_flush');
final sink = file.openWrite();
await sink.close();
Expect.throws(() => sink.writeln('Hello World!'), (e) => e is StateError);
}
Future<void> testManySmallWrites(Directory tmpDir) async {
final file = File('${tmpDir.path}/many_small_writes');
final sink = file.openWrite();
final data = List.generate(10000, (l) => '{l}');
data.forEach(sink.writeln);
await sink.close();
Expect.listEquals(data, file.readAsLinesSync());
}
Future<void> testLargeWriteAfterSmallWrite(Directory tmpDir) async {
final file = File('${tmpDir.path}/large_write_after_small_write');
final sink = file.openWrite();
sink.writeln('Hello');
sink.writeln('World' * 100000);
await sink.close();
Expect.listEquals(['Hello', 'World' * 100000], file.readAsLinesSync());
}
Future<void> testAddStream(Directory tmpDir) async {
final file = File('${tmpDir.path}/add_stream');
final sink = file.openWrite();
final data = [
[1],
[2, 3],
[4, 5, 6],
[7, 8, 9, 10]
];
await sink.addStream(Stream.fromIterable(data));
await sink.close();
Expect.listEquals([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], file.readAsBytesSync());
}
Future<void> testMultipleAddStream(Directory tmpDir) async {
final file = File('${tmpDir.path}/add_stream');
final sink = file.openWrite();
final data1 = [
[1],
[2, 3],
[4, 5, 6],
[7, 8, 9, 10]
];
final data2 = [
[11],
[12, 13],
[14, 15, 16],
[17, 18, 19, 20]
];
await sink.addStream(Stream.fromIterable(data1));
await sink.addStream(Stream.fromIterable(data2));
await sink.close();
Expect.listEquals(
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20],
file.readAsBytesSync());
}
Future<void> testAddDuringAddStream(Directory tmpDir) async {
final file = File('${tmpDir.path}/add_during_add_stream');
final sink = file.openWrite();
final data = [
[1],
[2, 3],
[4, 5, 6],
[7, 8, 9, 10]
];
final add = sink.addStream(Stream.fromIterable(data));
try {
sink.add([1]);
Expect.fail('expected exception');
} on StateError catch (e) {
Expect.contains('add_during_add_stream is bound to a stream', e.message);
}
await add;
await sink.close();
Expect.listEquals([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], file.readAsBytesSync());
}
Future<void> testAddErrorDuringAddStream(Directory tmpDir) async {
final file = File('${tmpDir.path}/add_error_during_add_stream');
final sink = file.openWrite();
final data = [
[1],
[2, 3],
[4, 5, 6],
[7, 8, 9, 10]
];
final add = sink.addStream(Stream.fromIterable(data));
try {
sink.addError(FormatException());
Expect.fail('expected exception');
} on StateError catch (e) {
Expect.contains(
'add_error_during_add_stream is bound to a stream', e.message);
}
await add;
await sink.close();
Expect.listEquals([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], file.readAsBytesSync());
}
Future<void> testAddStreamDuringAddStream(Directory tmpDir) async {
final file = File('${tmpDir.path}/add_stream_during_add_stream');
final sink = file.openWrite();
final data = [
[1],
[2, 3],
[4, 5, 6],
[7, 8, 9, 10]
];
final add = sink.addStream(Stream.fromIterable(data));
try {
await sink.addStream(Stream.fromIterable([
[11, 12]
]));
Expect.fail('expected exception');
} on StateError catch (e) {
Expect.contains(
'add_stream_during_add_stream is bound to a stream', e.message);
}
await add;
await sink.close();
Expect.listEquals([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], file.readAsBytesSync());
}
Future<void> testCloseDuringAddStream(Directory tmpDir) async {
final file = File('${tmpDir.path}/close_during_add_stream');
final sink = file.openWrite();
final data = [
[1],
[2, 3],
[4, 5, 6],
[7, 8, 9, 10]
];
final add = sink.addStream(Stream.fromIterable(data));
try {
await sink.close();
Expect.fail('expected exception');
} on StateError catch (e) {
Expect.contains('close_during_add_stream is bound to a stream', e.message);
}
await add;
await sink.close();
Expect.listEquals([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], file.readAsBytesSync());
}
Future<void> testFlushDuringAddStream(Directory tmpDir) async {
final file = File('${tmpDir.path}/flush_during_add_stream');
final sink = file.openWrite();
final data = [
[1],
[2, 3],
[4, 5, 6],
[7, 8, 9, 10]
];
final add = sink.addStream(Stream.fromIterable(data));
try {
await sink.flush();
Expect.fail('expected exception');
} on StateError catch (e) {
Expect.contains('flush_during_add_stream is bound to a stream', e.message);
}
await add;
await sink.close();
Expect.listEquals([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], file.readAsBytesSync());
}
Future<void> testAddDuringClose(Directory tmpDir) async {
final file = File('${tmpDir.path}/add_during_close');
final sink = file.openWrite();
final close = sink.close();
try {
sink.writeln("Hello");
} on StateError catch (e) {
Expect.contains('add_during_close is closed', e.message);
}
await close;
}
Future<void> testAddErrorDuringClose(Directory tmpDir) async {
final file = File('${tmpDir.path}/add_error_during_close');
final sink = file.openWrite();
final close = sink.close();
try {
sink.addError(FormatException());
} on StateError catch (e) {
Expect.contains('add_error_during_close is closed', e.message);
}
await close;
}
Future<void> testAddStreamDuringClose(Directory tmpDir) async {
final file = File('${tmpDir.path}/add_stream_during_close');
final sink = file.openWrite();
final close = sink.close();
try {
await sink.addStream(Stream.fromIterable([
[1]
]));
} on StateError catch (e) {
Expect.contains('add_stream_during_close is closed', e.message);
}
await close;
}
Future<void> testCloseDuringClose(Directory tmpDir) async {
final file = File('${tmpDir.path}/close_during_close');
final sink = file.openWrite();
final close = sink.close();
await sink.close();
await close;
}
Future<void> testFlushDuringClose(Directory tmpDir) async {
final file = File('${tmpDir.path}/flush_during_close');
final sink = file.openWrite();
final close = sink.close();
try {
await sink.flush();
} on StateError catch (e) {
Expect.contains('flush_during_close is closed', e.message);
}
await close;
}
Future<void> testFailedWrite(Directory tmpDir) async {
final file = File('${tmpDir.path}/failed_write');
final sink = file.openWrite();
sink.add(BrokenList('testFailedWrite', 100000));
try {
await sink.flush();
Expect.fail('expected exception');
} on FileSystemException catch (e) {
Expect.equals('testFailedWrite', e.message);
}
try {
await sink.close();
Expect.fail('expected exception');
} on FileSystemException catch (e) {}
Expect.listEquals([], file.readAsBytesSync());
}
Future<void> testAddAfterFailedWrite(Directory tmpDir) async {
final file = File('${tmpDir.path}/add_after_failed_write');
final sink = file.openWrite();
sink.add(BrokenList('testAddAfterFailedWrite', 100000));
try {
await sink.flush();
Expect.fail('expected exception');
} on FileSystemException catch (e) {}
sink.add([1, 2, 3]);
try {
await sink.flush();
Expect.fail('expected exception');
} on FileSystemException catch (e) {}
try {
await sink.close();
Expect.fail('expected exception');
} on FileSystemException catch (e) {}
Expect.listEquals([], file.readAsBytesSync());
}
Future<void> testUnawaitedAddStream(Directory tmpDir) async {
final file = File('${tmpDir.path}/unawaited_add_stream');
final sink = file.openWrite();
final controller = StreamController<List<int>>();
sink.addStream(controller.stream);
controller.add(<int>[1, 2, 3, 4, 5]);
controller.add(<int>[6, 7, 8, 9]);
await controller.close();
Expect.listEquals([1, 2, 3, 4, 5, 6, 7, 8, 9], file.readAsBytesSync());
}
void main() async {
final tmpDir = Directory.systemTemp.createTempSync('file_iosink_tests');
try {
await testWriteAllEmptyIterator(tmpDir);
await testWriteAllEmptyIteratorWithSep(tmpDir);
await testWriteAllOneElementIterator(tmpDir);
await testWriteAllOneElementIteratorWithSep(tmpDir);
await testWriteAllTwoElementIterator(tmpDir);
await testWriteAllTwoElementIteratorWithSep(tmpDir);
await testWriteln(tmpDir);
await testWritelnWithArg(tmpDir);
await testWriteEncoded(tmpDir);
await testFlushWithoutWrite(tmpDir);
await testSmallWrite(tmpDir);
await testSmallWriteAfterFlush(tmpDir);
await testSmallWriteAfterClose(tmpDir);
await testManySmallWrites(tmpDir);
await testLargeWriteAfterSmallWrite(tmpDir);
await testAddStream(tmpDir);
await testMultipleAddStream(tmpDir);
await testAddDuringAddStream(tmpDir);
await testAddErrorDuringAddStream(tmpDir);
await testAddStreamDuringAddStream(tmpDir);
await testCloseDuringAddStream(tmpDir);
await testFlushDuringAddStream(tmpDir);
await testAddDuringClose(tmpDir);
await testAddErrorDuringClose(tmpDir);
await testAddStreamDuringClose(tmpDir);
await testCloseDuringClose(tmpDir);
await testFlushDuringClose(tmpDir);
await testFailedWrite(tmpDir);
await testAddAfterFailedWrite(tmpDir);
await testUnawaitedAddStream(tmpDir);
} finally {
tmpDir.deleteSync(recursive: true);
}
}

View file

@ -1,505 +0,0 @@
// Copyright (c) 2023, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
// @dart = 2.9
/// Test the [IOSink] returned by [File.openWrite].
import "dart:async";
import "dart:convert";
import 'dart:collection';
import "dart:io";
import 'dart:typed_data';
import "package:expect/expect.dart";
/// A list that throws when it's data is retrieved. Useful in simulating write
/// failures.
class BrokenList extends Object with ListMixin<int> implements List<int> {
String _message;
BrokenList(this._message, this.length);
int length = 10;
void operator []=(int index, int value) {
throw UnsupportedError('[$index] = $value');
}
operator [](int index) => throw FileSystemException(_message);
}
Future<void> testWriteAllEmptyIterator(Directory tmpDir) async {
final file = File('${tmpDir.path}/write_all_empty_iterator');
final sink = file.openWrite();
sink.writeAll([]);
await sink.close();
Expect.equals('', file.readAsStringSync());
}
Future<void> testWriteAllEmptyIteratorWithSep(Directory tmpDir) async {
final file = File('${tmpDir.path}/write_all_empty_iterator_with_sep');
final sink = file.openWrite();
sink.writeAll([], ',');
await sink.close();
Expect.equals('', file.readAsStringSync());
}
Future<void> testWriteAllOneElementIterator(Directory tmpDir) async {
final file = File('${tmpDir.path}/write_one_element_iterator');
final sink = file.openWrite();
sink.writeAll(['hello']);
await sink.close();
Expect.equals('hello', file.readAsStringSync());
}
Future<void> testWriteAllOneElementIteratorWithSep(Directory tmpDir) async {
final file = File('${tmpDir.path}/write_one_element_iterator_with_sep');
final sink = file.openWrite();
sink.writeAll(['hello'], ',');
await sink.close();
Expect.equals('hello', file.readAsStringSync());
}
Future<void> testWriteAllTwoElementIterator(Directory tmpDir) async {
final file = File('${tmpDir.path}/write_two_element_iterator');
final sink = file.openWrite();
sink.writeAll(['hello', 'world']);
await sink.close();
Expect.equals('helloworld', file.readAsStringSync());
}
Future<void> testWriteAllTwoElementIteratorWithSep(Directory tmpDir) async {
final file = File('${tmpDir.path}/write_two_element_iterator_with_sep');
final sink = file.openWrite();
sink.writeAll(['hello', 'world'], ',');
await sink.close();
Expect.equals('hello,world', file.readAsStringSync());
}
Future<void> testWriteln(Directory tmpDir) async {
final file = File('${tmpDir.path}/test_writeln');
final sink = file.openWrite();
sink.writeln();
await sink.close();
Expect.equals('\n', file.readAsStringSync());
}
Future<void> testWritelnWithArg(Directory tmpDir) async {
final file = File('${tmpDir.path}/test_writeln_with_arg');
final sink = file.openWrite();
sink.writeln('Hello World!');
await sink.close();
Expect.equals('Hello World!\n', file.readAsStringSync());
}
Future<void> testWriteEncoded(Directory tmpDir) async {
final file = File('${tmpDir.path}/test_writeln_with_arg');
final sink = file.openWrite();
sink.encoding = latin1;
sink.write('Allô');
await sink.close();
Expect.equals('Allô', file.readAsStringSync(encoding: latin1));
}
Future<void> testFlushWithoutWrite(Directory tmpDir) async {
final file = File('${tmpDir.path}/small_write');
final sink = file.openWrite();
await sink.close();
Expect.equals('', file.readAsStringSync());
}
Future<void> testSmallWrite(Directory tmpDir) async {
final file = File('${tmpDir.path}/small_write');
final sink = file.openWrite();
sink.writeln('Hello World!');
await sink.close();
Expect.equals('Hello World!\n', file.readAsStringSync());
}
Future<void> testSmallWriteAfterFlush(Directory tmpDir) async {
final file = File('${tmpDir.path}/small_write_after_flush');
final sink = file.openWrite();
sink.writeln('Hello World!');
await sink.flush();
sink.writeln('How are you?');
await sink.close();
Expect.listEquals(['Hello World!', 'How are you?'], file.readAsLinesSync());
}
Future<void> testSmallWriteAfterClose(Directory tmpDir) async {
final file = File('${tmpDir.path}/small_write_after_flush');
final sink = file.openWrite();
await sink.close();
Expect.throws(() => sink.writeln('Hello World!'), (e) => e is StateError);
}
Future<void> testManySmallWrites(Directory tmpDir) async {
final file = File('${tmpDir.path}/many_small_writes');
final sink = file.openWrite();
final data = List.generate(10000, (l) => '{l}');
data.forEach(sink.writeln);
await sink.close();
Expect.listEquals(data, file.readAsLinesSync());
}
Future<void> testLargeWriteAfterSmallWrite(Directory tmpDir) async {
final file = File('${tmpDir.path}/large_write_after_small_write');
final sink = file.openWrite();
sink.writeln('Hello');
sink.writeln('World' * 100000);
await sink.close();
Expect.listEquals(['Hello', 'World' * 100000], file.readAsLinesSync());
}
Future<void> testAddStream(Directory tmpDir) async {
final file = File('${tmpDir.path}/add_stream');
final sink = file.openWrite();
final data = [
[1],
[2, 3],
[4, 5, 6],
[7, 8, 9, 10]
];
await sink.addStream(Stream.fromIterable(data));
await sink.close();
Expect.listEquals([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], file.readAsBytesSync());
}
Future<void> testMultipleAddStream(Directory tmpDir) async {
final file = File('${tmpDir.path}/add_stream');
final sink = file.openWrite();
final data1 = [
[1],
[2, 3],
[4, 5, 6],
[7, 8, 9, 10]
];
final data2 = [
[11],
[12, 13],
[14, 15, 16],
[17, 18, 19, 20]
];
await sink.addStream(Stream.fromIterable(data1));
await sink.addStream(Stream.fromIterable(data2));
await sink.close();
Expect.listEquals(
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20],
file.readAsBytesSync());
}
Future<void> testAddDuringAddStream(Directory tmpDir) async {
final file = File('${tmpDir.path}/add_during_add_stream');
final sink = file.openWrite();
final data = [
[1],
[2, 3],
[4, 5, 6],
[7, 8, 9, 10]
];
final add = sink.addStream(Stream.fromIterable(data));
try {
sink.add([1]);
Expect.fail('expected exception');
} on StateError catch (e) {
Expect.contains('add_during_add_stream is bound to a stream', e.message);
}
await add;
await sink.close();
Expect.listEquals([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], file.readAsBytesSync());
}
Future<void> testAddErrorDuringAddStream(Directory tmpDir) async {
final file = File('${tmpDir.path}/add_error_during_add_stream');
final sink = file.openWrite();
final data = [
[1],
[2, 3],
[4, 5, 6],
[7, 8, 9, 10]
];
final add = sink.addStream(Stream.fromIterable(data));
try {
sink.addError(FormatException());
Expect.fail('expected exception');
} on StateError catch (e) {
Expect.contains(
'add_error_during_add_stream is bound to a stream', e.message);
}
await add;
await sink.close();
Expect.listEquals([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], file.readAsBytesSync());
}
Future<void> testAddStreamDuringAddStream(Directory tmpDir) async {
final file = File('${tmpDir.path}/add_stream_during_add_stream');
final sink = file.openWrite();
final data = [
[1],
[2, 3],
[4, 5, 6],
[7, 8, 9, 10]
];
final add = sink.addStream(Stream.fromIterable(data));
try {
await sink.addStream(Stream.fromIterable([
[11, 12]
]));
Expect.fail('expected exception');
} on StateError catch (e) {
Expect.contains(
'add_stream_during_add_stream is bound to a stream', e.message);
}
await add;
await sink.close();
Expect.listEquals([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], file.readAsBytesSync());
}
Future<void> testCloseDuringAddStream(Directory tmpDir) async {
final file = File('${tmpDir.path}/close_during_add_stream');
final sink = file.openWrite();
final data = [
[1],
[2, 3],
[4, 5, 6],
[7, 8, 9, 10]
];
final add = sink.addStream(Stream.fromIterable(data));
try {
await sink.close();
Expect.fail('expected exception');
} on StateError catch (e) {
Expect.contains('close_during_add_stream is bound to a stream', e.message);
}
await add;
await sink.close();
Expect.listEquals([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], file.readAsBytesSync());
}
Future<void> testFlushDuringAddStream(Directory tmpDir) async {
final file = File('${tmpDir.path}/flush_during_add_stream');
final sink = file.openWrite();
final data = [
[1],
[2, 3],
[4, 5, 6],
[7, 8, 9, 10]
];
final add = sink.addStream(Stream.fromIterable(data));
try {
await sink.flush();
Expect.fail('expected exception');
} on StateError catch (e) {
Expect.contains('flush_during_add_stream is bound to a stream', e.message);
}
await add;
await sink.close();
Expect.listEquals([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], file.readAsBytesSync());
}
Future<void> testAddDuringClose(Directory tmpDir) async {
final file = File('${tmpDir.path}/add_during_close');
final sink = file.openWrite();
final close = sink.close();
try {
sink.writeln("Hello");
} on StateError catch (e) {
Expect.contains('add_during_close is closed', e.message);
}
await close;
}
Future<void> testAddErrorDuringClose(Directory tmpDir) async {
final file = File('${tmpDir.path}/add_error_during_close');
final sink = file.openWrite();
final close = sink.close();
try {
sink.addError(FormatException());
} on StateError catch (e) {
Expect.contains('add_error_during_close is closed', e.message);
}
await close;
}
Future<void> testAddStreamDuringClose(Directory tmpDir) async {
final file = File('${tmpDir.path}/add_stream_during_close');
final sink = file.openWrite();
final close = sink.close();
try {
await sink.addStream(Stream.fromIterable([
[1]
]));
} on StateError catch (e) {
Expect.contains('add_stream_during_close is closed', e.message);
}
await close;
}
Future<void> testCloseDuringClose(Directory tmpDir) async {
final file = File('${tmpDir.path}/close_during_close');
final sink = file.openWrite();
final close = sink.close();
await sink.close();
await close;
}
Future<void> testFlushDuringClose(Directory tmpDir) async {
final file = File('${tmpDir.path}/flush_during_close');
final sink = file.openWrite();
final close = sink.close();
try {
await sink.flush();
} on StateError catch (e) {
Expect.contains('flush_during_close is closed', e.message);
}
await close;
}
Future<void> testFailedWrite(Directory tmpDir) async {
final file = File('${tmpDir.path}/failed_write');
final sink = file.openWrite();
sink.add(BrokenList('testFailedWrite', 100000));
try {
await sink.flush();
Expect.fail('expected exception');
} on FileSystemException catch (e) {
Expect.equals('testFailedWrite', e.message);
}
try {
await sink.close();
Expect.fail('expected exception');
} on FileSystemException catch (e) {}
Expect.listEquals([], file.readAsBytesSync());
}
Future<void> testAddAfterFailedWrite(Directory tmpDir) async {
final file = File('${tmpDir.path}/add_after_failed_write');
final sink = file.openWrite();
sink.add(BrokenList('testAddAfterFailedWrite', 100000));
try {
await sink.flush();
Expect.fail('expected exception');
} on FileSystemException catch (e) {}
sink.add([1, 2, 3]);
try {
await sink.flush();
Expect.fail('expected exception');
} on FileSystemException catch (e) {}
try {
await sink.close();
Expect.fail('expected exception');
} on FileSystemException catch (e) {}
Expect.listEquals([], file.readAsBytesSync());
}
Future<void> testUnawaitedAddStream(Directory tmpDir) async {
final file = File('${tmpDir.path}/unawaited_add_stream');
final sink = file.openWrite();
final controller = StreamController<List<int>>();
sink.addStream(controller.stream);
controller.add(<int>[1, 2, 3, 4, 5]);
controller.add(<int>[6, 7, 8, 9]);
await controller.close();
Expect.listEquals([1, 2, 3, 4, 5, 6, 7, 8, 9], file.readAsBytesSync());
}
void main() async {
final tmpDir = Directory.systemTemp.createTempSync('file_iosink_tests');
try {
await testWriteAllEmptyIterator(tmpDir);
await testWriteAllEmptyIteratorWithSep(tmpDir);
await testWriteAllOneElementIterator(tmpDir);
await testWriteAllOneElementIteratorWithSep(tmpDir);
await testWriteAllTwoElementIterator(tmpDir);
await testWriteAllTwoElementIteratorWithSep(tmpDir);
await testWriteln(tmpDir);
await testWritelnWithArg(tmpDir);
await testWriteEncoded(tmpDir);
await testFlushWithoutWrite(tmpDir);
await testSmallWrite(tmpDir);
await testSmallWriteAfterFlush(tmpDir);
await testSmallWriteAfterClose(tmpDir);
await testManySmallWrites(tmpDir);
await testLargeWriteAfterSmallWrite(tmpDir);
await testAddStream(tmpDir);
await testMultipleAddStream(tmpDir);
await testAddDuringAddStream(tmpDir);
await testAddErrorDuringAddStream(tmpDir);
await testAddStreamDuringAddStream(tmpDir);
await testCloseDuringAddStream(tmpDir);
await testFlushDuringAddStream(tmpDir);
await testAddDuringClose(tmpDir);
await testAddErrorDuringClose(tmpDir);
await testAddStreamDuringClose(tmpDir);
await testCloseDuringClose(tmpDir);
await testFlushDuringClose(tmpDir);
await testFailedWrite(tmpDir);
await testAddAfterFailedWrite(tmpDir);
await testUnawaitedAddStream(tmpDir);
} finally {
tmpDir.deleteSync(recursive: true);
}
}