/* * Copyright (c) 2024, Dan Klishch * * SPDX-License-Identifier: BSD-2-Clause */ #pragma once #include #include #include #include namespace AK { template class AsyncStreamTransform : public AsyncInputStream { public: AsyncStreamTransform(MaybeOwned&& stream, AK::Generator>&& generator) : m_stream(move(stream)) , m_generator(move(generator)) { } ~AsyncStreamTransform() { // 1. Assert that nobody is awaiting on the resource. VERIFY(!m_generator_has_awaiters); // 2. If resource is open, perform Reset AO. if (is_open()) reset(); } void reset() override { VERIFY(is_open()); m_stream->reset(); if (!m_generator_has_awaiters) m_generator.destroy(); m_is_open = false; } Coroutine> close() override { VERIFY(is_open()); TemporaryChange await_guard(m_generator_has_awaiters, true); if (!m_generator.is_done()) { Variant> chunk_or_eof = co_await m_generator.next(); if (chunk_or_eof.has()) { reset(); co_return Error::from_errno(EBUSY); } else { m_is_open = false; auto& error_or_eof = chunk_or_eof.get>(); if (error_or_eof.is_error()) { co_return error_or_eof.release_error(); } else { if (m_stream.is_owned()) CO_TRY(co_await m_stream->close()); co_return {}; } } } else { m_is_open = false; if (m_stream.is_owned()) CO_TRY(co_await m_stream->close()); co_return {}; } } bool is_open() const override { return m_is_open; } Coroutine> enqueue_some(Badge) override { VERIFY(is_open()); TemporaryChange await_guard(m_generator_has_awaiters, true); if (m_generator.is_done()) co_return false; Variant> chunk_or_eof = co_await m_generator.next(); if (chunk_or_eof.has()) { co_return true; } else { auto& error_or_eof = chunk_or_eof.get>(); if (error_or_eof.is_error()) { m_is_open = false; co_return error_or_eof.release_error(); } else { co_return false; } } } protected: using Generator = AK::Generator>; MaybeOwned m_stream; private: Generator m_generator; bool m_is_open { true }; bool m_generator_has_awaiters { false }; }; } #ifdef USING_AK_GLOBALLY using AK::AsyncStreamTransform; #endif