Implement anonymous pipes and the ability to transmit them between processes using Unix Domain Sockets.

Change-Id: I9c9f4ec0e99075a29c6f4d97c503e759134eb094
TESTED=Unit tests
Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/257804
Commit-Queue: Brian Quinlan <bquinlan@google.com>
Reviewed-by: Lasse Nielsen <lrn@google.com>
Reviewed-by: Alexander Aprelev <aam@google.com>
This commit is contained in:
Brian Quinlan 2022-09-13 21:16:28 +00:00 committed by Commit Bot
parent e324ada968
commit 2ead86ab9d
25 changed files with 777 additions and 41 deletions

View file

@ -550,6 +550,25 @@ void FUNCTION_NAME(File_CreateLink)(Dart_NativeArguments args) {
} }
} }
void FUNCTION_NAME(File_CreatePipe)(Dart_NativeArguments args) {
Namespace* namespc = Namespace::GetNamespace(args, 0);
File* readPipe;
File* writePipe;
if (File::CreatePipe(namespc, &readPipe, &writePipe)) {
Dart_Handle pipes = ThrowIfError(Dart_NewList(2));
Dart_Handle readHandle =
ThrowIfError(Dart_NewInteger(reinterpret_cast<intptr_t>(readPipe)));
Dart_Handle writeHandle =
ThrowIfError(Dart_NewInteger(reinterpret_cast<intptr_t>(writePipe)));
ThrowIfError(Dart_ListSetAt(pipes, 0, readHandle));
ThrowIfError(Dart_ListSetAt(pipes, 1, writeHandle));
Dart_SetReturnValue(args, pipes);
} else {
Dart_SetReturnValue(args, DartUtils::NewDartOSError());
}
}
void FUNCTION_NAME(File_LinkTarget)(Dart_NativeArguments args) { void FUNCTION_NAME(File_LinkTarget)(Dart_NativeArguments args) {
Namespace* namespc = Namespace::GetNamespace(args, 0); Namespace* namespc = Namespace::GetNamespace(args, 0);
Dart_Handle path_handle = Dart_GetNativeArgument(args, 1); Dart_Handle path_handle = Dart_GetNativeArgument(args, 1);
@ -915,6 +934,31 @@ CObject* File::CreateRequest(const CObjectArray& request) {
: CObject::NewOSError(); : CObject::NewOSError();
} }
CObject* File::CreatePipeRequest(const CObjectArray& request) {
if ((request.Length() < 1) || !request[0]->IsIntptr()) {
return CObject::IllegalArgumentError();
}
Namespace* namespc = CObjectToNamespacePointer(request[0]);
RefCntReleaseScope<Namespace> rs(namespc);
File* readPipe;
File* writePipe;
if (!CreatePipe(namespc, &readPipe, &writePipe)) {
return CObject::NewOSError();
}
CObjectArray* pipes = new CObjectArray(CObject::NewArray(2));
CObjectNativePointer* readHandle = new CObjectNativePointer(
CObject::NewNativePointer(reinterpret_cast<intptr_t>(readPipe),
sizeof(*readPipe), ReleaseFile));
CObjectNativePointer* writeHandle = new CObjectNativePointer(
CObject::NewNativePointer(reinterpret_cast<intptr_t>(writePipe),
sizeof(*writePipe), ReleaseFile));
pipes->SetAt(0, readHandle);
pipes->SetAt(1, writeHandle);
return pipes;
}
CObject* File::OpenRequest(const CObjectArray& request) { CObject* File::OpenRequest(const CObjectArray& request) {
if ((request.Length() < 1) || !request[0]->IsIntptr()) { if ((request.Length() < 1) || !request[0]->IsIntptr()) {
return CObject::IllegalArgumentError(); return CObject::IllegalArgumentError();

View file

@ -228,10 +228,7 @@ class File : public ReferenceCounted<File> {
// (stdin, stout or stderr). // (stdin, stout or stderr).
static File* OpenStdio(int fd); static File* OpenStdio(int fd);
#if defined(DART_HOST_OS_FUCHSIA) || defined(DART_HOST_OS_LINUX) || \
defined(DART_HOST_OS_ANDROID) || defined(DART_HOST_OS_MACOS)
static File* OpenFD(int fd); static File* OpenFD(int fd);
#endif
static bool Exists(Namespace* namespc, const char* path); static bool Exists(Namespace* namespc, const char* path);
static bool ExistsUri(Namespace* namespc, const char* uri); static bool ExistsUri(Namespace* namespc, const char* uri);
@ -239,6 +236,7 @@ class File : public ReferenceCounted<File> {
static bool CreateLink(Namespace* namespc, static bool CreateLink(Namespace* namespc,
const char* path, const char* path,
const char* target); const char* target);
static bool CreatePipe(Namespace* namespc, File** readPipe, File** writePipe);
static bool Delete(Namespace* namespc, const char* path); static bool Delete(Namespace* namespc, const char* path);
static bool DeleteLink(Namespace* namespc, const char* path); static bool DeleteLink(Namespace* namespc, const char* path);
static bool Rename(Namespace* namespc, static bool Rename(Namespace* namespc,
@ -297,6 +295,7 @@ class File : public ReferenceCounted<File> {
static CObject* ExistsRequest(const CObjectArray& request); static CObject* ExistsRequest(const CObjectArray& request);
static CObject* CreateRequest(const CObjectArray& request); static CObject* CreateRequest(const CObjectArray& request);
static CObject* CreatePipeRequest(const CObjectArray& request);
static CObject* DeleteRequest(const CObjectArray& request); static CObject* DeleteRequest(const CObjectArray& request);
static CObject* RenameRequest(const CObjectArray& request); static CObject* RenameRequest(const CObjectArray& request);
static CObject* CopyRequest(const CObjectArray& request); static CObject* CopyRequest(const CObjectArray& request);

View file

@ -334,6 +334,17 @@ bool File::CreateLink(Namespace* namespc,
return NO_RETRY_EXPECTED(SymlinkAt(target, ns.fd(), ns.path())) == 0; return NO_RETRY_EXPECTED(SymlinkAt(target, ns.fd(), ns.path())) == 0;
} }
bool File::CreatePipe(Namespace* namespc, File** readPipe, File** writePipe) {
int pipe_fds[2];
int status = NO_RETRY_EXPECTED(pipe(pipe_fds));
if (status != 0) {
return false;
}
*readPipe = OpenFD(pipe_fds[0]);
*writePipe = OpenFD(pipe_fds[1]);
return true;
}
File::Type File::GetType(Namespace* namespc, File::Type File::GetType(Namespace* namespc,
const char* name, const char* name,
bool follow_links) { bool follow_links) {

View file

@ -332,6 +332,17 @@ bool File::CreateLink(Namespace* namespc,
return NO_RETRY_EXPECTED(symlinkat(target, ns.fd(), ns.path())) == 0; return NO_RETRY_EXPECTED(symlinkat(target, ns.fd(), ns.path())) == 0;
} }
bool File::CreatePipe(Namespace* namespc, File** readPipe, File** writePipe) {
int pipe_fds[2];
int status = NO_RETRY_EXPECTED(pipe(pipe_fds));
if (status != 0) {
return false;
}
*readPipe = OpenFD(pipe_fds[0]);
*writePipe = OpenFD(pipe_fds[1]);
return true;
}
File::Type File::GetType(Namespace* namespc, File::Type File::GetType(Namespace* namespc,
const char* name, const char* name,
bool follow_links) { bool follow_links) {

View file

@ -328,6 +328,17 @@ bool File::CreateLink(Namespace* namespc,
return NO_RETRY_EXPECTED(symlinkat(target, ns.fd(), ns.path())) == 0; return NO_RETRY_EXPECTED(symlinkat(target, ns.fd(), ns.path())) == 0;
} }
bool File::CreatePipe(Namespace* namespc, File** readPipe, File** writePipe) {
int pipe_fds[2];
int status = NO_RETRY_EXPECTED(pipe(pipe_fds));
if (status != 0) {
return false;
}
*readPipe = OpenFD(pipe_fds[0]);
*writePipe = OpenFD(pipe_fds[1]);
return true;
}
File::Type File::GetType(Namespace* namespc, File::Type File::GetType(Namespace* namespc,
const char* name, const char* name,
bool follow_links) { bool follow_links) {

View file

@ -371,6 +371,17 @@ bool File::CreateLink(Namespace* namespc,
return (status == 0); return (status == 0);
} }
bool File::CreatePipe(Namespace* namespc, File** readPipe, File** writePipe) {
int pipe_fds[2];
int status = NO_RETRY_EXPECTED(pipe(pipe_fds));
if (status != 0) {
return false;
}
*readPipe = OpenFD(pipe_fds[0]);
*writePipe = OpenFD(pipe_fds[1]);
return true;
}
File::Type File::GetType(Namespace* namespc, File::Type File::GetType(Namespace* namespc,
const char* pathname, const char* pathname,
bool follow_links) { bool follow_links) {

View file

@ -314,14 +314,18 @@ File* File::FileOpenW(const wchar_t* system_name, FileOpenMode mode) {
return NULL; return NULL;
} }
} }
return OpenFD(fd);
}
File* File::OpenFD(int fd) {
return new File(new FileHandle(fd)); return new File(new FileHandle(fd));
} }
class StringRAII { class StringRAII {
public: public:
explicit StringRAII(StringRAII& origin) { explicit StringRAII(StringRAII* origin) {
own_ = origin.own_; own_ = origin->own_;
s_ = origin.release(); s_ = origin->release();
} }
explicit StringRAII(const char* s) : s_(s), own_(false) {} explicit StringRAII(const char* s) : s_(s), own_(false) {}
@ -616,6 +620,17 @@ bool File::CreateLink(Namespace* namespc,
return (create_status != 0); return (create_status != 0);
} }
bool File::CreatePipe(Namespace* namespc, File** readPipe, File** writePipe) {
int pipe_fds[2];
int status = _pipe(pipe_fds, 4096, _O_BINARY);
if (status != 0) {
return false;
}
*readPipe = OpenFD(pipe_fds[0]);
*writePipe = OpenFD(pipe_fds[1]);
return true;
}
bool File::Delete(Namespace* namespc, const char* name) { bool File::Delete(Namespace* namespc, const char* name) {
Utf8ToWideScope system_name(PrefixLongFilePath(name)); Utf8ToWideScope system_name(PrefixLongFilePath(name));
int status = _wremove(system_name.wide()); int status = _wremove(system_name.wide());

View file

@ -40,6 +40,7 @@ namespace bin {
V(File_Copy, 3) \ V(File_Copy, 3) \
V(File_Create, 3) \ V(File_Create, 3) \
V(File_CreateLink, 3) \ V(File_CreateLink, 3) \
V(File_CreatePipe, 1) \
V(File_Delete, 2) \ V(File_Delete, 2) \
V(File_DeleteLink, 2) \ V(File_DeleteLink, 2) \
V(File_Exists, 2) \ V(File_Exists, 2) \

View file

@ -48,18 +48,19 @@ namespace bin {
V(File, Identical, 28) \ V(File, Identical, 28) \
V(File, Stat, 29) \ V(File, Stat, 29) \
V(File, Lock, 30) \ V(File, Lock, 30) \
V(Socket, Lookup, 31) \ V(File, CreatePipe, 31) \
V(Socket, ListInterfaces, 32) \ V(Socket, Lookup, 32) \
V(Socket, ReverseLookup, 33) \ V(Socket, ListInterfaces, 33) \
V(Directory, Create, 34) \ V(Socket, ReverseLookup, 34) \
V(Directory, Delete, 35) \ V(Directory, Create, 35) \
V(Directory, Exists, 36) \ V(Directory, Delete, 36) \
V(Directory, CreateTemp, 37) \ V(Directory, Exists, 37) \
V(Directory, ListStart, 38) \ V(Directory, CreateTemp, 38) \
V(Directory, ListNext, 39) \ V(Directory, ListStart, 39) \
V(Directory, ListStop, 40) \ V(Directory, ListNext, 40) \
V(Directory, Rename, 41) \ V(Directory, ListStop, 41) \
V(SSLFilter, ProcessFilter, 42) V(Directory, Rename, 42) \
V(SSLFilter, ProcessFilter, 43)
#define DECLARE_REQUEST(type, method, id) k##type##method##Request = id, #define DECLARE_REQUEST(type, method, id) k##type##method##Request = id,

View file

@ -3258,6 +3258,7 @@
"../../../tests/standalone/io/file_non_ascii_sync_test.dart", "../../../tests/standalone/io/file_non_ascii_sync_test.dart",
"../../../tests/standalone/io/file_non_ascii_test.dart", "../../../tests/standalone/io/file_non_ascii_test.dart",
"../../../tests/standalone/io/file_output_stream_test.dart", "../../../tests/standalone/io/file_output_stream_test.dart",
"../../../tests/standalone/io/file_pipe_test.dart",
"../../../tests/standalone/io/file_read_encoded_test.dart", "../../../tests/standalone/io/file_read_encoded_test.dart",
"../../../tests/standalone/io/file_stat_test.dart", "../../../tests/standalone/io/file_stat_test.dart",
"../../../tests/standalone/io/file_stream_test.dart", "../../../tests/standalone/io/file_stream_test.dart",
@ -6580,6 +6581,7 @@
"../../../tests/standalone_2/io/file_non_ascii_sync_test.dart", "../../../tests/standalone_2/io/file_non_ascii_sync_test.dart",
"../../../tests/standalone_2/io/file_non_ascii_test.dart", "../../../tests/standalone_2/io/file_non_ascii_test.dart",
"../../../tests/standalone_2/io/file_output_stream_test.dart", "../../../tests/standalone_2/io/file_output_stream_test.dart",
"../../../tests/standalone_2/io/file_pipe_test.dart",
"../../../tests/standalone_2/io/file_read_encoded_test.dart", "../../../tests/standalone_2/io/file_read_encoded_test.dart",
"../../../tests/standalone_2/io/file_stat_test.dart", "../../../tests/standalone_2/io/file_stat_test.dart",
"../../../tests/standalone_2/io/file_stream_test.dart", "../../../tests/standalone_2/io/file_stream_test.dart",

View file

@ -122,6 +122,11 @@ class _File {
throw UnsupportedError("File._createLink"); throw UnsupportedError("File._createLink");
} }
@patch
static List<dynamic> _createPipe(_Namespace namespace) {
throw UnsupportedError("File._createPipe");
}
@patch @patch
static _linkTarget(_Namespace namespace, Uint8List rawPath) { static _linkTarget(_Namespace namespace, Uint8List rawPath) {
throw UnsupportedError("File._linkTarget"); throw UnsupportedError("File._linkTarget");
@ -541,6 +546,16 @@ class ResourceHandle {
factory ResourceHandle.fromStdout(Stdout stdout) { factory ResourceHandle.fromStdout(Stdout stdout) {
throw UnsupportedError("ResourceHandle.fromStdout constructor"); throw UnsupportedError("ResourceHandle.fromStdout constructor");
} }
@patch
factory ResourceHandle.fromReadPipe(ReadPipe pipe) {
throw UnsupportedError("ResourceHandle.fromReadPipe constructor");
}
@patch
factory ResourceHandle.fromWritePipe(WritePipe pipe) {
throw UnsupportedError("ResourceHandle.fromWritePipe constructor");
}
} }
@patch @patch

View file

@ -122,6 +122,11 @@ class _File {
throw new UnsupportedError("File._createLink"); throw new UnsupportedError("File._createLink");
} }
@patch
static List<dynamic> _createPipe(_Namespace namespace) {
throw UnsupportedError("File._createPipe");
}
@patch @patch
static _linkTarget(_Namespace namespace, Uint8List path) { static _linkTarget(_Namespace namespace, Uint8List path) {
throw new UnsupportedError("File._linkTarget"); throw new UnsupportedError("File._linkTarget");
@ -541,6 +546,16 @@ class ResourceHandle {
factory ResourceHandle.fromStdout(Stdout stdout) { factory ResourceHandle.fromStdout(Stdout stdout) {
throw UnsupportedError("ResourceHandle.fromStdout constructor"); throw UnsupportedError("ResourceHandle.fromStdout constructor");
} }
@patch
factory ResourceHandle.fromReadPipe(ReadPipe pipe) {
throw UnsupportedError("ResourceHandle.fromReadPipe constructor");
}
@patch
factory ResourceHandle.fromWritePipe(WritePipe pipe) {
throw UnsupportedError("ResourceHandle.fromWritePipe constructor");
}
} }
@patch @patch

View file

@ -18,6 +18,9 @@ class _File {
external static _createLink( external static _createLink(
_Namespace namespace, Uint8List rawPath, String target); _Namespace namespace, Uint8List rawPath, String target);
@patch @patch
@pragma("vm:external-name", "File_CreatePipe")
external static List<dynamic> _createPipe(_Namespace namespace);
@patch
@pragma("vm:external-name", "File_LinkTarget") @pragma("vm:external-name", "File_LinkTarget")
external static _linkTarget(_Namespace namespace, Uint8List rawPath); external static _linkTarget(_Namespace namespace, Uint8List rawPath);
@patch @patch

View file

@ -756,7 +756,8 @@ class _NativeSocket extends _NativeSocketNativeWrapper with _ServiceObject {
"Address family not supported by protocol family, " "Address family not supported by protocol family, "
// ...and then add some details. // ...and then add some details.
"sourceAddress.type must be ${InternetAddressType.unix} but was " "sourceAddress.type must be ${InternetAddressType.unix} but was "
"${source.type}", address: address); "${source.type}",
address: address);
} }
connectionResult = socket.nativeCreateUnixDomainBindConnect( connectionResult = socket.nativeCreateUnixDomainBindConnect(
address.address, source.address, _Namespace._namespace); address.address, source.address, _Namespace._namespace);
@ -2592,6 +2593,16 @@ class ResourceHandle {
factory ResourceHandle.fromStdout(Stdout stdout) { factory ResourceHandle.fromStdout(Stdout stdout) {
return _ResourceHandleImpl(stdout._fd); return _ResourceHandleImpl(stdout._fd);
} }
factory ResourceHandle.fromReadPipe(ReadPipe pipe) {
_ReadPipe rp = pipe as _ReadPipe;
return ResourceHandle.fromFile(rp._openedFile!);
}
factory ResourceHandle.fromWritePipe(WritePipe pipe) {
_WritePipe wp = pipe as _WritePipe;
return ResourceHandle.fromFile(wp._file);
}
} }
@pragma("vm:entry-point") @pragma("vm:entry-point")
@ -2622,6 +2633,14 @@ class _ResourceHandleImpl implements ResourceHandle {
return _RawSocket(nativeSocket); return _RawSocket(nativeSocket);
} }
_ReadPipe toReadPipe() {
return _ReadPipe(toFile());
}
_WritePipe toWritePipe() {
return _WritePipe(toFile());
}
@pragma("vm:external-name", "ResourceHandleImpl_toRawDatagramSocket") @pragma("vm:external-name", "ResourceHandleImpl_toRawDatagramSocket")
external RawDatagramSocket toRawDatagramSocket(); external RawDatagramSocket toRawDatagramSocket();

View file

@ -121,6 +121,11 @@ class _File {
throw new UnsupportedError("File._createLink"); throw new UnsupportedError("File._createLink");
} }
@patch
static List<dynamic> _createPipe(_Namespace namespace) {
throw UnsupportedError("File._createPipe");
}
@patch @patch
static _linkTarget(_Namespace namespace, Uint8List path) { static _linkTarget(_Namespace namespace, Uint8List path) {
throw new UnsupportedError("File._linkTarget"); throw new UnsupportedError("File._linkTarget");
@ -540,6 +545,16 @@ class ResourceHandle {
factory ResourceHandle.fromStdout(Stdout stdout) { factory ResourceHandle.fromStdout(Stdout stdout) {
throw UnsupportedError("ResourceHandle.fromStdout constructor"); throw UnsupportedError("ResourceHandle.fromStdout constructor");
} }
@patch
factory ResourceHandle.fromReadPipe(ReadPipe pipe) {
throw UnsupportedError("ResourceHandle.fromReadPipe constructor");
}
@patch
factory ResourceHandle.fromWritePipe(WritePipe pipe) {
throw UnsupportedError("ResourceHandle.fromWritePipe constructor");
}
} }
@patch @patch

View file

@ -442,6 +442,12 @@ abstract class File implements FileSystemEntity {
/// In order to make sure that system resources are freed, the stream /// In order to make sure that system resources are freed, the stream
/// must be read to completion or the subscription on the stream must /// must be read to completion or the subscription on the stream must
/// be cancelled. /// be cancelled.
///
/// If [File] is a [named pipe](https://en.wikipedia.org/wiki/Named_pipe)
/// then the returned [Stream] will wait until the write side of the pipe
/// is closed before signaling "done". If there are no writers attached
/// to the pipe when it is opened, then [Stream.listen] will wait until
/// a writer opens the pipe.
Stream<List<int>> openRead([int? start, int? end]); Stream<List<int>> openRead([int? start, int? end]);
/// Creates a new independent [IOSink] for the file. /// Creates a new independent [IOSink] for the file.
@ -909,3 +915,61 @@ class FileSystemException implements IOException {
return sb.toString(); return sb.toString();
} }
} }
/// The "read" end of an [Pipe] created by [Pipe.create].
///
/// The read stream will continue to listen until the "write" end of the
/// pipe (i.e. [Pipe.write]) is closed.
///
/// ```dart
/// final pipe = await Pipe.create();
/// pipe.read.transform(utf8.decoder).listen((data) {
/// print(data);
/// }, onDone: () => print('Done'));
/// ```
abstract class ReadPipe implements Stream<List<int>> {}
/// The "write" end of an [Pipe] created by [Pipe.create].
///
/// ```dart
/// final pipe = await Pipe.create();
/// pipe.write.add("Hello World!".codeUnits);
/// pipe.write.close();
/// ```
abstract class WritePipe implements IOSink {}
/// An anonymous pipe that can be used to send data in a single direction i.e.
/// data written to [write] can be read using [read].
///
/// On macOS and Linux (excluding Android), either the [read] or [write]
/// portion of the pipe can be transmitted to another process and used for
/// interprocess communication.
///
/// For example:
/// ```dart
/// final pipe = await Pipe.create();
/// final socket = await RawSocket.connect(address, 0);
/// socket.sendMessage(<SocketControlMessage>[
/// SocketControlMessage.fromHandles(
/// <ResourceHandle>[ResourceHandle.fromReadPipe(pipe.read)])
/// ], 'Hello'.codeUnits);
/// pipe.write.add('Hello over pipe!'.codeUnits);
/// pipe.write.close();
/// ```
abstract class Pipe {
/// The read end of the [Pipe].
ReadPipe get read;
/// The write end of the [Pipe].
WritePipe get write;
// Create an anonymous pipe.
static Future<Pipe> create() {
return _Pipe.create();
}
/// Synchronously creates an anonymous pipe.
factory Pipe.createSync() {
return _Pipe.createSync();
}
}

View file

@ -13,7 +13,7 @@ class _FileStream extends Stream<List<int>> {
// Information about the underlying file. // Information about the underlying file.
String? _path; String? _path;
late RandomAccessFile _openedFile; RandomAccessFile? _openedFile;
int _position; int _position;
int? _end; int? _end;
final Completer _closeCompleter = new Completer(); final Completer _closeCompleter = new Completer();
@ -31,6 +31,10 @@ class _FileStream extends Stream<List<int>> {
_FileStream.forStdin() : _position = 0; _FileStream.forStdin() : _position = 0;
_FileStream.forRandomAccessFile(RandomAccessFile f)
: _position = 0,
_openedFile = f;
StreamSubscription<Uint8List> listen(void onData(Uint8List event)?, StreamSubscription<Uint8List> listen(void onData(Uint8List event)?,
{Function? onError, void onDone()?, bool? cancelOnError}) { {Function? onError, void onDone()?, bool? cancelOnError}) {
_controller = new StreamController<Uint8List>( _controller = new StreamController<Uint8List>(
@ -56,7 +60,7 @@ class _FileStream extends Stream<List<int>> {
_controller.close(); _controller.close();
} }
_openedFile.close().catchError(_controller.addError).whenComplete(done); _openedFile!.close().catchError(_controller.addError).whenComplete(done);
return _closeCompleter.future; return _closeCompleter.future;
} }
@ -82,20 +86,27 @@ class _FileStream extends Stream<List<int>> {
return; return;
} }
} }
_openedFile.read(readBytes).then((block) { _openedFile!.read(readBytes).then((block) {
_readInProgress = false; _readInProgress = false;
if (_unsubscribed) { if (_unsubscribed) {
_closeFile(); _closeFile();
return; return;
} }
_position += block.length; _position += block.length;
if (block.length < readBytes || (_end != null && _position == _end)) { // read() may return less than `readBytes` if `_openFile` is a pipe or
// terminal or if a signal is received. Only a empty return indicates
// that the write side of the pipe is closed or that we are at the end
// of a file.
// See https://man7.org/linux/man-pages/man2/read.2.html
if (block.length == 0 || (_end != null && _position == _end)) {
_atEnd = true; _atEnd = true;
} }
if (!_atEnd && !_controller.isPaused) { if (!_atEnd && !_controller.isPaused) {
_readBlock(); _readBlock();
} }
_controller.add(block); if (block.length > 0) {
_controller.add(block);
}
if (_atEnd) { if (_atEnd) {
_closeFile(); _closeFile();
} }
@ -141,7 +152,10 @@ class _FileStream extends Stream<List<int>> {
} }
final path = _path; final path = _path;
if (path != null) { final openedFile = _openedFile;
if (openedFile != null) {
onOpenFile(openedFile);
} else if (path != null) {
new File(path) new File(path)
.open(mode: FileMode.read) .open(mode: FileMode.read)
.then(onOpenFile, onError: openFailed); .then(onOpenFile, onError: openFailed);
@ -166,6 +180,9 @@ class _FileStreamConsumer extends StreamConsumer<List<int>> {
_FileStreamConsumer.fromStdio(int fd) _FileStreamConsumer.fromStdio(int fd)
: _openFuture = new Future.value(_File._openStdioSync(fd)); : _openFuture = new Future.value(_File._openStdioSync(fd));
_FileStreamConsumer.fromRandomAccessFile(RandomAccessFile f)
: _openFuture = Future.value(f);
Future<File?> addStream(Stream<List<int>> stream) { Future<File?> addStream(Stream<List<int>> stream) {
Completer<File?> completer = new Completer<File?>.sync(); Completer<File?> completer = new Completer<File?>.sync();
_openFuture.then((openedFile) { _openFuture.then((openedFile) {
@ -260,6 +277,8 @@ class _File extends FileSystemEntity implements File {
external static _createLink( external static _createLink(
_Namespace namespace, Uint8List rawPath, String target); _Namespace namespace, Uint8List rawPath, String target);
external static List<dynamic> _createPipe(_Namespace namespace);
external static _linkTarget(_Namespace namespace, Uint8List rawPath); external static _linkTarget(_Namespace namespace, Uint8List rawPath);
void createSync({bool recursive = false, bool exclusive = false}) { void createSync({bool recursive = false, bool exclusive = false}) {
@ -1045,3 +1064,43 @@ class _RandomAccessFile implements RandomAccessFile {
} }
} }
} }
class _ReadPipe extends _FileStream implements ReadPipe {
_ReadPipe(RandomAccessFile file) : super.forRandomAccessFile(file);
}
class _WritePipe extends _IOSinkImpl implements WritePipe {
RandomAccessFile _file;
_WritePipe(file)
: this._file = file,
super(_FileStreamConsumer.fromRandomAccessFile(file), utf8);
}
class _Pipe implements Pipe {
final ReadPipe _readPipe;
final WritePipe _writePipe;
ReadPipe get read => _readPipe;
WritePipe get write => _writePipe;
_Pipe(this._readPipe, this._writePipe);
static Future<_Pipe> create() {
final completer = Completer<_Pipe>.sync();
_File._dispatchWithNamespace(_IOService.fileCreatePipe, [null])
.then((response) {
final filePointers = (response as List).cast<int>();
completer.complete(_Pipe(
_ReadPipe(_RandomAccessFile(filePointers[0], '')),
_WritePipe(_RandomAccessFile(filePointers[1], ''))));
});
return completer.future;
}
factory _Pipe.createSync() {
final filePointers = _File._createPipe(_Namespace._namespace);
return _Pipe(_ReadPipe(_RandomAccessFile(filePointers[0] as int, '')),
_WritePipe(_RandomAccessFile(filePointers[1] as int, '')));
}
}

View file

@ -37,18 +37,19 @@ class _IOService {
static const int fileIdentical = 28; static const int fileIdentical = 28;
static const int fileStat = 29; static const int fileStat = 29;
static const int fileLock = 30; static const int fileLock = 30;
static const int socketLookup = 31; static const int fileCreatePipe = 31;
static const int socketListInterfaces = 32; static const int socketLookup = 32;
static const int socketReverseLookup = 33; static const int socketListInterfaces = 33;
static const int directoryCreate = 34; static const int socketReverseLookup = 34;
static const int directoryDelete = 35; static const int directoryCreate = 35;
static const int directoryExists = 36; static const int directoryDelete = 36;
static const int directoryCreateTemp = 37; static const int directoryExists = 37;
static const int directoryListStart = 38; static const int directoryCreateTemp = 38;
static const int directoryListNext = 39; static const int directoryListStart = 39;
static const int directoryListStop = 40; static const int directoryListNext = 40;
static const int directoryRename = 41; static const int directoryListStop = 41;
static const int sslProcessFilter = 42; static const int directoryRename = 42;
static const int sslProcessFilter = 43;
external static Future<Object?> _dispatch(int request, List data); external static Future<Object?> _dispatch(int request, List data);
} }

View file

@ -869,9 +869,16 @@ abstract class ResourceHandle {
/// Creates wrapper around current stdout. /// Creates wrapper around current stdout.
external factory ResourceHandle.fromStdout(Stdout stdout); external factory ResourceHandle.fromStdout(Stdout stdout);
// Creates wrapper around a readable pipe.
external factory ResourceHandle.fromReadPipe(ReadPipe pipe);
// Creates wrapper around a writeable pipe.
external factory ResourceHandle.fromWritePipe(WritePipe pipe);
/// Extracts opened file from resource handle. /// Extracts opened file from resource handle.
/// ///
/// This can also be used when receiving stdin and stdout handles. /// This can also be used when receiving stdin and stdout handles and read
/// and write pipes.
/// ///
/// If this resource handle is not a file or stdio handle, the behavior of the /// If this resource handle is not a file or stdio handle, the behavior of the
/// returned [RandomAccessFile] is completely unspecified. /// returned [RandomAccessFile] is completely unspecified.
@ -898,6 +905,20 @@ abstract class ResourceHandle {
/// the returned [RawDatagramSocket] is completely unspecified. /// the returned [RawDatagramSocket] is completely unspecified.
/// Be very careful to avoid using a handle incorrectly. /// Be very careful to avoid using a handle incorrectly.
RawDatagramSocket toRawDatagramSocket(); RawDatagramSocket toRawDatagramSocket();
/// Extracts a read pipe from resource handle.
///
/// If this resource handle is not a readable pipe, the behavior of the
/// returned [ReadPipe] is completely unspecified.
/// Be very careful to avoid using a handle incorrectly.
ReadPipe toReadPipe();
/// Extracts a write pipe from resource handle.
///
/// If this resource handle is not a writeable pipe, the behavior of the
/// returned [ReadPipe] is completely unspecified.
/// Be very careful to avoid using a handle incorrectly.
WritePipe toWritePipe();
} }
/// Control message part of the [SocketMessage] received by a call to /// Control message part of the [SocketMessage] received by a call to

View file

@ -0,0 +1,48 @@
// Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
//
// Dart test program for testing the dart:io `Pipe` class
import 'dart:io';
import "package:async_helper/async_helper.dart";
import "package:expect/expect.dart";
testReadFromClosedPipe() async {
final pipe = await Pipe.create();
pipe.write.close();
Expect.isTrue(await pipe.read.isEmpty);
}
testCreateSync() async {
final pipe = Pipe.createSync();
pipe.write.close();
Expect.isTrue(await pipe.read.isEmpty);
}
testMultipleWritesAndReads() async {
final pipe = await Pipe.create();
int count = 0;
pipe.write.add([count]);
await pipe.read.listen((event) {
Expect.listEquals([count], event);
++count;
if (count < 10) {
pipe.write.add([count]);
} else {
pipe.write.close();
}
}, onDone: () => Expect.equals(10, count));
}
main() async {
asyncStart();
try {
await testReadFromClosedPipe();
await testCreateSync();
await testMultipleWritesAndReads();
} finally {
asyncEnd();
}
}

View file

@ -58,7 +58,27 @@ void testStreamIsEmpty() {
}); });
} }
void main() { Future<void> testStreamAppendedToAfterOpen() async {
asyncStart();
final pipe = Pipe.createSync();
pipe.write.add("Hello World".codeUnits);
int i = 0;
await pipe.read.listen((event) {
Expect.listEquals("Hello World".codeUnits, event);
if (i < 10) {
pipe.write.add("Hello World".codeUnits);
++i;
} else {
pipe.write.close();
}
}).asFuture();
asyncEnd();
}
void main() async {
testPauseResumeCancelStream(); testPauseResumeCancelStream();
testStreamIsEmpty(); testStreamIsEmpty();
await testStreamAppendedToAfterOpen();
} }

View file

@ -828,6 +828,140 @@ Future testStdioMessage(String tempDirPath, {bool caller = false}) async {
}); });
} }
Future testReadPipeMessage(String uniqueName) async {
if (!Platform.isMacOS && !Platform.isLinux && !Platform.isAndroid) {
return;
}
final address =
InternetAddress('$uniqueName/sock', type: InternetAddressType.unix);
final server = await RawServerSocket.bind(address, 0, shared: false);
server.listen((RawSocket socket) async {
socket.listen((e) async {
switch (e) {
case RawSocketEvent.read:
final SocketMessage? message = socket.readMessage();
if (message == null) {
return;
}
Expect.equals('Hello', String.fromCharCodes(message.data));
Expect.equals(1, message.controlMessages.length);
final SocketControlMessage controlMessage =
message.controlMessages[0];
final handles = controlMessage.extractHandles();
Expect.isNotNull(handles);
Expect.equals(1, handles.length);
final receivedPipe = handles[0].toReadPipe();
Expect.equals('Hello over pipe!',
await receivedPipe.transform(utf8.decoder).join());
socket.write('server replied'.codeUnits);
break;
case RawSocketEvent.readClosed:
socket.close();
server.close();
break;
}
});
});
final RawServerSocket testServer = await createTestServer();
final testPipe = await Pipe.create();
// Send a message containing an open pipe.
final socket = await RawSocket.connect(address, 0);
socket.listen((e) {
switch (e) {
case RawSocketEvent.write:
socket.sendMessage(<SocketControlMessage>[
SocketControlMessage.fromHandles(
<ResourceHandle>[ResourceHandle.fromReadPipe(testPipe.read)])
], 'Hello'.codeUnits);
testPipe.write.add('Hello over pipe!'.codeUnits);
testPipe.write.close();
break;
case RawSocketEvent.read:
final data = socket.read();
if (data == null) {
return;
}
final dataString = String.fromCharCodes(data);
Expect.equals('server replied', dataString);
socket.close();
testPipe.write.close();
testServer.close();
}
});
}
Future testWritePipeMessage(String uniqueName) async {
if (!Platform.isMacOS && !Platform.isLinux && !Platform.isAndroid) {
return;
}
final address =
InternetAddress('$uniqueName/sock', type: InternetAddressType.unix);
final server = await RawServerSocket.bind(address, 0, shared: false);
server.listen((RawSocket socket) async {
socket.listen((e) async {
switch (e) {
case RawSocketEvent.read:
final SocketMessage? message = socket.readMessage();
if (message == null) {
return;
}
Expect.equals('Hello', String.fromCharCodes(message.data));
Expect.equals(1, message.controlMessages.length);
final SocketControlMessage controlMessage =
message.controlMessages[0];
final handles = controlMessage.extractHandles();
Expect.isNotNull(handles);
Expect.equals(1, handles.length);
final receivedPipe = handles[0].toWritePipe();
receivedPipe.add('Hello over pipe!'.codeUnits);
receivedPipe.close();
socket.write('server replied'.codeUnits);
break;
case RawSocketEvent.readClosed:
socket.close();
server.close();
break;
}
});
});
final RawServerSocket testServer = await createTestServer();
final testPipe = await Pipe.create();
// Send a message containing an open pipe.
final socket = await RawSocket.connect(address, 0);
socket.listen((e) async {
switch (e) {
case RawSocketEvent.write:
socket.sendMessage(<SocketControlMessage>[
SocketControlMessage.fromHandles(
<ResourceHandle>[ResourceHandle.fromWritePipe(testPipe.write)])
], 'Hello'.codeUnits);
Expect.equals('Hello over pipe!',
await testPipe.read.transform(utf8.decoder).join());
break;
case RawSocketEvent.read:
final data = socket.read();
if (data == null) {
return;
}
final dataString = String.fromCharCodes(data);
Expect.equals('server replied', dataString);
socket.close();
testPipe.write.close();
testServer.close();
}
});
}
Future testDeleteFile(String tempDirPath) async { Future testDeleteFile(String tempDirPath) async {
if (!Platform.isMacOS && !Platform.isLinux && !Platform.isAndroid) { if (!Platform.isMacOS && !Platform.isLinux && !Platform.isAndroid) {
return; return;
@ -954,6 +1088,12 @@ void main(List<String> args) async {
await withTempDir('unix_socket_test', (Directory dir) async { await withTempDir('unix_socket_test', (Directory dir) async {
await testStdioMessage('${dir.path}', caller: true); await testStdioMessage('${dir.path}', caller: true);
}); });
await withTempDir('unix_socket_test', (Directory dir) async {
await testReadPipeMessage('${dir.path}');
});
await withTempDir('unix_socket_test', (Directory dir) async {
await testWritePipeMessage('${dir.path}');
});
await withTempDir('unix_socket_test', (Directory dir) async { await withTempDir('unix_socket_test', (Directory dir) async {
await testDeleteFile('${dir.path}'); await testDeleteFile('${dir.path}');
}); });

View file

@ -0,0 +1,50 @@
// Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
//
// Dart test program for testing the dart:io `Pipe` class
// @dart = 2.9
import 'dart:io';
import "package:async_helper/async_helper.dart";
import "package:expect/expect.dart";
testReadFromClosedPipe() async {
final pipe = await Pipe.create();
pipe.write.close();
Expect.isTrue(await pipe.read.isEmpty);
}
testCreateSync() async {
final pipe = Pipe.createSync();
pipe.write.close();
Expect.isTrue(await pipe.read.isEmpty);
}
testMultipleWritesAndReads() async {
final pipe = await Pipe.create();
int count = 0;
pipe.write.add([count]);
await pipe.read.listen((event) {
Expect.listEquals([count], event);
++count;
if (count < 10) {
pipe.write.add([count]);
} else {
pipe.write.close();
}
}, onDone: () => Expect.equals(10, count));
}
main() async {
asyncStart();
try {
await testReadFromClosedPipe();
await testCreateSync();
await testMultipleWritesAndReads();
} finally {
asyncEnd();
}
}

View file

@ -60,7 +60,27 @@ void testStreamIsEmpty() {
}); });
} }
void main() { Future<void> testStreamAppendedToAfterOpen() async {
asyncStart();
final pipe = Pipe.createSync();
pipe.write.add("Hello World".codeUnits);
int i = 0;
await pipe.read.listen((event) {
Expect.listEquals("Hello World".codeUnits, event);
if (i < 10) {
pipe.write.add("Hello World".codeUnits);
++i;
} else {
pipe.write.close();
}
}).asFuture();
asyncEnd();
}
void main() async {
testPauseResumeCancelStream(); testPauseResumeCancelStream();
testStreamIsEmpty(); testStreamIsEmpty();
await testStreamAppendedToAfterOpen();
} }

View file

@ -738,7 +738,7 @@ Future testSocketMessage(String uniqueName) async {
}); });
} }
Future testStdioMessage(String tempDirPath, {bool caller: false}) async { Future testStdioMessage(String tempDirPath, {bool caller = false}) async {
if (!Platform.isMacOS && !Platform.isLinux && !Platform.isAndroid) { if (!Platform.isMacOS && !Platform.isLinux && !Platform.isAndroid) {
return; return;
} }
@ -831,6 +831,140 @@ Future testStdioMessage(String tempDirPath, {bool caller: false}) async {
return completer.future; return completer.future;
} }
Future testReadPipeMessage(String uniqueName) async {
if (!Platform.isMacOS && !Platform.isLinux && !Platform.isAndroid) {
return;
}
final address =
InternetAddress('$uniqueName/sock', type: InternetAddressType.unix);
final server = await RawServerSocket.bind(address, 0, shared: false);
server.listen((RawSocket socket) async {
socket.listen((e) async {
switch (e) {
case RawSocketEvent.read:
final SocketMessage message = socket.readMessage();
if (message == null) {
return;
}
Expect.equals('Hello', String.fromCharCodes(message.data));
Expect.equals(1, message.controlMessages.length);
final SocketControlMessage controlMessage =
message.controlMessages[0];
final handles = controlMessage.extractHandles();
Expect.isNotNull(handles);
Expect.equals(1, handles.length);
final receivedPipe = handles[0].toReadPipe();
Expect.equals('Hello over pipe!',
await receivedPipe.transform(utf8.decoder).join());
socket.write('server replied'.codeUnits);
break;
case RawSocketEvent.readClosed:
socket.close();
server.close();
break;
}
});
});
final RawServerSocket testServer = await createTestServer();
final testPipe = await Pipe.create();
// Send a message containing an open pipe.
final socket = await RawSocket.connect(address, 0);
socket.listen((e) {
switch (e) {
case RawSocketEvent.write:
socket.sendMessage(<SocketControlMessage>[
SocketControlMessage.fromHandles(
<ResourceHandle>[ResourceHandle.fromReadPipe(testPipe.read)])
], 'Hello'.codeUnits);
testPipe.write.add('Hello over pipe!'.codeUnits);
testPipe.write.close();
break;
case RawSocketEvent.read:
final data = socket.read();
if (data == null) {
return;
}
final dataString = String.fromCharCodes(data);
Expect.equals('server replied', dataString);
socket.close();
testPipe.write.close();
testServer.close();
}
});
}
Future testWritePipeMessage(String uniqueName) async {
if (!Platform.isMacOS && !Platform.isLinux && !Platform.isAndroid) {
return;
}
final address =
InternetAddress('$uniqueName/sock', type: InternetAddressType.unix);
final server = await RawServerSocket.bind(address, 0, shared: false);
server.listen((RawSocket socket) async {
socket.listen((e) async {
switch (e) {
case RawSocketEvent.read:
final SocketMessage message = socket.readMessage();
if (message == null) {
return;
}
Expect.equals('Hello', String.fromCharCodes(message.data));
Expect.equals(1, message.controlMessages.length);
final SocketControlMessage controlMessage =
message.controlMessages[0];
final handles = controlMessage.extractHandles();
Expect.isNotNull(handles);
Expect.equals(1, handles.length);
final receivedPipe = handles[0].toWritePipe();
receivedPipe.add('Hello over pipe!'.codeUnits);
receivedPipe.close();
socket.write('server replied'.codeUnits);
break;
case RawSocketEvent.readClosed:
socket.close();
server.close();
break;
}
});
});
final RawServerSocket testServer = await createTestServer();
final testPipe = await Pipe.create();
// Send a message containing an open pipe.
final socket = await RawSocket.connect(address, 0);
socket.listen((e) async {
switch (e) {
case RawSocketEvent.write:
socket.sendMessage(<SocketControlMessage>[
SocketControlMessage.fromHandles(
<ResourceHandle>[ResourceHandle.fromWritePipe(testPipe.write)])
], 'Hello'.codeUnits);
Expect.equals('Hello over pipe!',
await testPipe.read.transform(utf8.decoder).join());
break;
case RawSocketEvent.read:
final data = socket.read();
if (data == null) {
return;
}
final dataString = String.fromCharCodes(data);
Expect.equals('server replied', dataString);
socket.close();
testPipe.write.close();
testServer.close();
}
});
}
Future testDeleteFile(String tempDirPath) async { Future testDeleteFile(String tempDirPath) async {
if (!Platform.isMacOS && !Platform.isLinux && !Platform.isAndroid) { if (!Platform.isMacOS && !Platform.isLinux && !Platform.isAndroid) {
return; return;
@ -955,6 +1089,12 @@ void main(List<String> args) async {
await withTempDir('unix_socket_test', (Directory dir) async { await withTempDir('unix_socket_test', (Directory dir) async {
await testSocketMessage('${dir.path}'); await testSocketMessage('${dir.path}');
}); });
await withTempDir('unix_socket_test', (Directory dir) async {
await testReadPipeMessage('${dir.path}');
});
await withTempDir('unix_socket_test', (Directory dir) async {
await testWritePipeMessage('${dir.path}');
});
await withTempDir('unix_socket_test', (Directory dir) async { await withTempDir('unix_socket_test', (Directory dir) async {
await testStdioMessage('${dir.path}', caller: true); await testStdioMessage('${dir.path}', caller: true);
}); });