1
0
mirror of https://github.com/SerenityOS/serenity synced 2024-06-29 07:20:21 +00:00

AK: Introduce asynchronous streams

This commit is contained in:
Dan Klishch 2024-02-20 00:19:30 -05:00 committed by Ali Mohammad Pur
parent b15b51f9da
commit 6662d5de2b
3 changed files with 344 additions and 0 deletions

227
AK/AsyncStream.h Normal file
View File

@ -0,0 +1,227 @@
/*
* Copyright (c) 2024, Dan Klishch <danilklishch@gmail.com>
*
* SPDX-License-Identifier: BSD-2-Clause
*/
#pragma once
#include <AK/Badge.h>
#include <AK/ByteBuffer.h>
#include <AK/Coroutine.h>
#include <AK/Error.h>
#include <AK/ScopeGuard.h>
namespace AK {
// AsyncResource represents a generic resource (e. g. POSIX file descriptor, AsyncStream, HTTP
// response body) with a failible and/or asynchronous destructor. Refer to AsynchronousDesign.md
// documentation page for a description tailored for users of the asynchronous resources.
//
// In order to correctly implement methods of AsyncResource, you first have to define (not
// necessarily in code) two abstract operations: Close and Reset. They should have the following
// semantics:
//
// * Close AO:
// 1. Assert that nobody is awaiting on a resource.
// 2. Ensure that further attempts to wait on a resource will assert.
// 3. Shutdown (possibly asynchronously) the associated low-level resource. Shutdown must ensure
// that if the state of a resource is clean, it will remain so indefinitely. The "clean"
// state is resource-specific--for example, streams might define it as "no outstanding writes
// and no unread data".
// 4. Check if the state of the resource is clean. If it is not, call Reset AO and return an
// error (preferably, EBUSY).
// 5. Free (possibly asynchronously) the associated low-level resource.
// 6. Return success.
//
// * Reset AO:
// 1. Schedule returning an error (preferably, ECANCELED) from the current resource awaiters.
// 2. Ensure that further attempts to wait on a resource will assert.
// 3. Free synchronously the associated low-level resource. Preferably, this should be done in a
// way that cleanly indicates an error for the event producer.
// 4. Return synchronously.
class AsyncResource {
AK_MAKE_NONCOPYABLE(AsyncResource);
AK_MAKE_NONMOVABLE(AsyncResource);
public:
AsyncResource() = default;
// Destructor of an AsyncResource must perform the following steps when called:
// 1. Assert that nobody is awaiting on the resource.
// 2. If resource is open, perform Reset AO.
virtual ~AsyncResource() = default;
// reset() must perform the following steps when called:
// 1. Assert that the resource is open.
// 2. Perform Reset AO.
virtual void reset() = 0;
// close() must perform the following steps when called:
// 1. Assert that the object is fully constructed. For example, a socket might assert that it is
// connected.
// 2. Assert that the resource is open.
// 3. Perform Close AO, await and return its result.
virtual Coroutine<ErrorOr<void>> close() = 0;
// Resource is said to be in an error state if either Reset AO was invoked or if an operation on
// a resource has failed and an implementation deemed the error unrecoverable. If a resource is
// being transitioned to an error state because of an internal error, Reset AO (or its
// equivalent) must be executed by an implementation. Resource is said to be open if it is not
// in a error state and Close AO has never been called on it.
virtual bool is_open() const = 0;
};
// AsyncInputStream is a base class for all asynchronous input streams. Refer to
// AsynchronousDesign.md documentation page for a description tailored for users of the streams.
//
// In order to implement a brand new AsyncInputStream, you generally have to define a destructor and
// overload six virtual functions: 3 from AsyncResource and 3 from AsyncInputStream. When
// implementing the AsyncResource interface, please note that AsyncInputStream is considered clean
// if there's no data left to be read.
class AsyncInputStream : public virtual AsyncResource {
public:
struct PeekOrEofResult {
ReadonlyBytes data;
bool is_eof;
};
AsyncInputStream() = default;
ReadonlyBytes buffered_data() const
{
VERIFY(is_open());
return buffered_data_unchecked({});
}
Coroutine<ErrorOr<PeekOrEofResult>> peek_or_eof()
{
VERIFY(is_open());
if (!m_is_reading_peek) {
m_is_reading_peek = true;
auto data = buffered_data_unchecked({});
if (!data.is_empty())
co_return PeekOrEofResult { data, false };
}
bool is_not_eof = CO_TRY(co_await enqueue_some({}));
co_return PeekOrEofResult { buffered_data_unchecked({}), !is_not_eof };
}
Coroutine<ErrorOr<ReadonlyBytes>> peek()
{
auto [data, is_eof] = CO_TRY(co_await peek_or_eof());
if (is_eof) {
reset();
co_return Error::from_errno(EIO);
}
co_return data;
}
Coroutine<ErrorOr<ReadonlyBytes>> read(size_t bytes)
{
m_is_reading_peek = false;
if (bytes) {
auto buffer = buffered_data();
while (buffer.size() < bytes) {
if (!CO_TRY(co_await enqueue_some({}))) {
reset();
co_return Error::from_errno(EIO);
}
buffer = buffered_data_unchecked({});
}
dequeue({}, bytes);
co_return buffer.slice(0, bytes);
} else {
co_return Bytes {};
}
}
template<typename T>
Coroutine<ErrorOr<T>> read_object()
{
auto bytes = CO_TRY(co_await read(sizeof(T)));
union {
T object;
char representation[sizeof(T)];
} reinterpreter = {};
memcpy(&reinterpreter, bytes.data(), sizeof(T));
co_return reinterpreter.object;
}
// If EOF has not been reached, `enqueue_some` should read at least one byte from the underlying
// stream to the internal buffer and return true. Otherwise, it must not change the buffer and
// return false. If read fails and, consequently, `enqueue_some` returns Error, it must
// perform Reset AO (or an equivalent of it). Therefore, all reading errors are considered fatal
// for AsyncInputStream. Additionally, implementation must assert if `enqueue_some` is called
// concurrently. This is the only method that can be interrupted by `reset`.
virtual Coroutine<ErrorOr<bool>> enqueue_some(Badge<AsyncInputStream>) = 0;
// `buffered_data_unchecked` should just return a view of the buffer. It must not invalidate
// previously returned views of the buffer.
virtual ReadonlyBytes buffered_data_unchecked(Badge<AsyncInputStream>) const = 0;
// `dequeue` should remove `bytes` bytes from the buffer. It is guaranteed that this amount of
// bytes will be present in the buffer at the point of the call. `dequeue` must not invalidate
// previously returned views of the buffer. There are some restrictions on `bytes` parameter
// originating from the length condition (see documentation), so if you just use
// AsyncStreamBuffer as the stream buffer, `dequeue` and `enqueue_some` will have amortized
// O(stream_length) complexity.
virtual void dequeue(Badge<AsyncInputStream>, size_t bytes) = 0;
protected:
static Badge<AsyncInputStream> badge() { return {}; }
bool m_is_reading_peek { false };
};
class AsyncOutputStream : public virtual AsyncResource {
public:
AsyncOutputStream() = default;
virtual Coroutine<ErrorOr<size_t>> write_some(ReadonlyBytes buffer) = 0;
virtual Coroutine<ErrorOr<void>> write(ReadonlySpan<ReadonlyBytes> buffers)
{
for (auto buffer : buffers) {
while (!buffer.is_empty()) {
auto nwritten = CO_TRY(co_await write_some(buffer));
buffer = buffer.slice(nwritten);
}
}
co_return {};
}
};
class AsyncStream
: public AsyncInputStream
, public AsyncOutputStream {
public:
AsyncStream() = default;
};
template<typename T>
class StreamWrapper : public virtual AsyncResource {
public:
StreamWrapper(NonnullOwnPtr<T>&& stream)
: m_stream(move(stream))
{
}
void reset() override { return m_stream->reset(); }
Coroutine<ErrorOr<void>> close() override { return m_stream->close(); }
bool is_open() const override { return m_stream->is_open(); }
protected:
NonnullOwnPtr<T> m_stream;
};
}
#ifdef USING_AK_GLOBALLY
using AK::AsyncInputStream;
using AK::AsyncOutputStream;
using AK::AsyncResource;
using AK::AsyncStream;
using AK::StreamWrapper;
#endif

View File

@ -20,6 +20,9 @@ class ByteBuffer;
enum class TrailingCodePointTransformation : u8;
class AsyncInputStream;
class AsyncOutputStream;
class AsyncStream;
class BigEndianInputBitStream;
class BigEndianOutputBitStream;
class Bitmap;
@ -154,6 +157,9 @@ class [[nodiscard]] ErrorOr;
#if USING_AK_GLOBALLY
using AK::Array;
using AK::AsyncInputStream;
using AK::AsyncOutputStream;
using AK::AsyncStream;
using AK::Atomic;
using AK::Badge;
using AK::BigEndianInputBitStream;

View File

@ -0,0 +1,111 @@
# Coroutines and asynchronous streams
## Asynchronous resources
`AK::AsyncResource` class represents a generic resource (e. g. POSIX file descriptor, AsyncStream, HTTP response body) with a failible and/or asynchronous destructor. As an example of such destructor, an asynchronous socket might want to wait for all outstanding transfers to complete and want to notify the calling code if the server acknowledged all buffered writes.
When working with AsyncResource, the only thing you should pay attention to is not leaving the resource open when you are done with it. This is important both for consistency and for not signaling spurious end-of-data errors to the other party. This close/reset hygiene is not hard to achieve in practice: every AsyncResource is automatically reset during destruction and some resources reset on any error returned by any of their interfaces.
The above means that the following snippet is correct:
```cpp
Coroutine<ErrorOr<void>> do_very_meaningful_work()
{
Core::AsyncTCPSocket socket = make_me_a_socket();
// Core::AsyncTCPSocket is AK::AsyncStream (which is itself an AsyncResource, obviously), which
// exhibits reset-on-error behavior.
auto object = CO_TRY(co_await socket.read_object<VeryImportantObject>());
auto response = CO_TRY(process_object(object));
CO_TRY(co_await socket.write(response));
CO_TRY(co_await socket.close());
}
```
and will send a TCP RST if any of the functions (including `process_object`) fails and will close the socket gracefully if everything goes well.
Of course, automatic-reset-on-destruction does not always get rid of explicit resets. If asynchronous resource is not local to a fallible function, you will have to call reset manually in case of errors, i. e.
```cpp
Coroutine<ErrorOr<void>> do_very_meaningful_work_second_time(AsyncStream& stream)
{
auto object = CO_TRY(co_await stream.read_object<VeryImportantObject>());
auto response_or_error = process_object(object);
if (response_or_error.is_error()) {
stream.reset();
co_return response_or_error.release_error();
}
CO_TRY(co_await stream.write(response_or_error.release_value()));
}
```
It might be tempting to just do nothing with a stream in this situation. However, this will leave the stream in an unknown state if the function fails, which would necessitate `stream->is_open()` checks later on in the caller (because doing pretty much anything with a closed stream asserts).
As mentioned in the previous paragraph, in addition to `close` and `reset` methods, resources have an `is_open` method which checks if the resource has already been closed or reset (either explicitly or implicitly by a failed method). `is_open` state cannot change by itself with time (i. e. if a server sends RST when nobody is listening the socket)&mdash;one has to call a method on a resource for `open` state to change.
## Input streams
`AK::AsyncInputStream` is a base class for all asynchronous input streams.
Every AsyncInputStream is effectively buffered by design. This means that all input streams have an internal read buffer to which data is read and references to which are returned from `peek` and `read` in the form of ReadonlyBytes.
Typical workflow with AsyncInputStream consists of a number of peeks (until the caller finds what they are looking for in the stream) followed by a read that removes just-peeked object from the buffer.
The following example reads an object of an unknown size from the stream:
```cpp
Coroutine<ErrorOr<VeryImportantObject>> read_some_very_important_object(AsyncInputStream& stream)
{
size_t byte_size;
while (true) {
auto bytes = CO_TRY(co_await stream.peek());
Optional<size_t> maybe_size = figure_out_size_from_prefix(bytes);
if (maybe_size.has_value()) {
// Yay! We read enough to figure out how long the object is.
byte_size = maybe_size.value();
break;
}
}
auto bytes = CO_TRY(co_await stream.read(byte_size));
auto object_or_error = parse_object_from_data(bytes);
if (object_or_error.is_error()) {
stream.reset();
co_return object_or_error.release_error();
}
co_return object_or_error.release_value();
}
```
Of course, if we know size upfront, reading stuff from a stream is as easy as `stream->read(size)`.
> [!IMPORTANT]
> Note that you should never peek more than is absolutely necessary.
More formally, you should ensure the following: at the time of `read`, let $(s_1, s_2, ..., s_n)$ be a sequence of lengths of ReadonlyBytes views returned from `peek` or `peek_or_eof` since the last read (or since the creation of the stream, if there were no previous reads). If $n \le 1$, then any length is allowed. Otherwise, if $n > 1$, $s_{n - 1}$ MUST NOT be greater than `bytes` parameter. Moreover, if the stream data does not have sub-byte structure and EOF has not been reached, `bytes` SHOULD be greater than $s_{n - 1}$. While the said condition might seem arbitrary, violation of it almost always indicates a bug in the caller's code. The asynchronous streams framework doesn't guarantee linear asymptotic runtime complexity in the case of length condition violation.
It is not hard to fulfill the condition in practice. For example, in the `read_some_very_important_object` example, the condition just requires `figure_out_size_from_prefix` to figure out size if the object is already fully inside `bytes` parameter.
Note that `read` always returns exactly `bytes` bytes. If `bytes` is larger than $s_n$ or $n$ is 0, `read` will return more data than previously peeked (by reading the stream, obviously).
If your use-case doesn't require the knowledge of EOF location (i. e. you know how much to read upfront without relying on EOF), then just use `peek`, `read`, and `close` in the end (if you own the stream, of course). `peek` or `read` will reset the stream and error out if input ended prematurely because of EOF. Likewise, `close` will error out if the data is left after the stream supposedly should have been fully read. EIO is returned for the unexpected stream end and EBUSY&mdash;for not reading the whole stream.
For the EOF-aware applications, AsyncInputStream provides `peek_or_eof`. In contrast to `peek`, `peek_or_eof` returns an additional flag indicating if EOF has been reached. Note that the EOF detection is similar to POSIX streams: first, `peek_or_eof` returns data up to EOF without `is_eof` flag being set, and then, on the next call, it returns the same data with `is_eof` flag set.
As an example, let us abuse the said functionality of `peek_or_eof` to implement a completely reliable `is_eof` for an asynchronous stream:
```cpp
Coroutine<ErrorOr<bool>> accurate_is_eof(AsyncInputStream& stream) {
auto [_, is_eof] = CO_TRY(co_await stream.peek_or_eof());
must_sync(stream.read(0));
co_return is_eof;
}
```
You might notice a seemingly useless read of 0 bytes here. Why do we need it? Well, let us remove it and consider the case when we have a stream with an empty buffer and a single byte left overall. `peek_or_eof` will return that byte without `is_eof` flag set, so `accurate_is_eof` returns false. Next, someone tries to peek that byte with a plain `peek` that will in turn call `peek_or_eof`, which now _will_ set the `is_eof` flag. The flag will then be checked by `peek` and the stream will error out with EIO.
Therefore,
> [!IMPORTANT]
> If you peek something from a stream, you should read it.
What follows is a more formal specification of `peek`, `peek_or_eof`, and `read` behavior. The implementation of them might seem simple but it has a lot of conceptual complexity packed into it.
Each `peek_or_eof` call is classified as either reading or non-reading peek. Reading peeks always read new data, while non-reading peeks only do so when there's no data available in the buffer. In the former case, non-reading peek is called a no-op peek and, in the later case, peek is called a promoted peek. Peek occurring after another peek is always a reading peek while a peek occurring just after `read` call is non-reading. Note that this gives rise to the aforementioned length condition: peek, in some way, is always a request for new unseen data, and an unnecessary peek can, for example, unintentionally read EOF and error out.
Promoted and reading peeks read data from the underlying stream and check if EOF has been encountered (i. e. read returned 0 new bytes). If it was not, peek adds read data to the buffer. Finally, regardless of its type, peek returns a view to the buffer.
Calling `peek` when the EOF has been reached as well as calling `read` that tries to read past EOF is a protocol violation and results in EIO error. Note that EIO (just like any other error) causes the stream to reset, so if one attempts to continue reading from the stream, it will assert. Next, calling read operations concurrently asserts since it is a logic error. And finally, calling `peek`, `peek_or_eof`, or `read` on a stream that isn't open is a logic error as well and asserts too.