AK: Add CircularDuplexStream class.

This commit is contained in:
asynts 2020-08-25 14:57:02 +02:00 committed by Andreas Kling
parent 8e08d9f70a
commit a82fead38a
4 changed files with 236 additions and 1 deletions

140
AK/CircularDuplexStream.h Normal file
View file

@ -0,0 +1,140 @@
/*
* Copyright (c) 2020, the SerenityOS developers.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice, this
* list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
* OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#pragma once
#include <AK/CircularQueue.h>
#include <AK/Stream.h>
namespace AK {
// FIXME: There are a lot of raw loops here, that's not necessary an issue but it
// has to be verified that the optimizer is able to insert memcpy instead.
template<size_t Capacity>
class CircularDuplexStream : public AK::DuplexStream {
public:
size_t write(ReadonlyBytes bytes) override
{
const auto nwritten = min(bytes.size(), Capacity - m_queue.size());
for (size_t idx = 0; idx < nwritten; ++idx)
m_queue.enqueue(bytes[idx]);
m_total_written += nwritten;
return nwritten;
}
bool write_or_error(ReadonlyBytes bytes) override
{
if (Capacity - m_queue.size() < bytes.size()) {
m_error = true;
return false;
}
write(bytes);
return true;
}
size_t read(Bytes bytes) override
{
const auto nread = min(bytes.size(), m_queue.size());
for (size_t idx = 0; idx < nread; ++idx)
bytes[idx] = m_queue.dequeue();
return nread;
}
size_t read(Bytes bytes, size_t seekback)
{
ASSERT(seekback <= Capacity);
if (seekback > m_total_written) {
m_error = true;
return 0;
}
const auto nread = min(bytes.size(), seekback);
for (size_t idx = 0; idx < nread; ++idx) {
const auto index = (m_total_written - seekback + idx) % Capacity;
bytes[idx] = m_queue.m_storage[index];
}
return nread;
}
bool read_or_error(Bytes bytes) override
{
if (m_queue.size() < bytes.size()) {
m_error = true;
return false;
}
read(bytes);
return true;
}
bool discard_or_error(size_t count) override
{
if (m_queue.size() < count) {
m_error = true;
return false;
}
for (size_t idx = 0; idx < count; ++idx)
m_queue.dequeue();
return true;
}
bool eof() const override { return m_queue.size() == 0; }
size_t remaining_contigous_space() const
{
return min(Capacity - m_queue.size(), m_queue.capacity() - (m_queue.head_index() + m_queue.size()) % Capacity);
}
Bytes reserve_contigous_space(size_t count)
{
ASSERT(count <= remaining_contigous_space());
Bytes bytes { m_queue.m_storage + (m_queue.head_index() + m_queue.size()) % Capacity, count };
m_queue.m_size += count;
m_total_written += count;
return bytes;
}
private:
CircularQueue<u8, Capacity> m_queue;
size_t m_total_written { 0 };
};
}
using AK::CircularDuplexStream;

View file

@ -27,13 +27,15 @@
#pragma once
#include <AK/Assertions.h>
#include <AK/Forward.h>
#include <AK/StdLibExtras.h>
#include <AK/Types.h>
namespace AK {
template<typename T, size_t Capacity>
class CircularQueue {
friend CircularDuplexStream<Capacity>;
public:
CircularQueue()
{

View file

@ -51,6 +51,10 @@ class Utf8View;
class InputStream;
class InputMemoryStream;
class DuplexMemoryStream;
class OutputStream;
template<size_t Capacity>
class CircularDuplexStream;
template<typename T>
class Span;
@ -123,6 +127,7 @@ using AK::Bitmap;
using AK::BufferStream;
using AK::ByteBuffer;
using AK::Bytes;
using AK::CircularDuplexStream;
using AK::CircularQueue;
using AK::DebugLogStream;
using AK::DoublyLinkedList;
@ -143,6 +148,7 @@ using AK::LogStream;
using AK::NonnullOwnPtr;
using AK::NonnullRefPtr;
using AK::Optional;
using AK::OutputStream;
using AK::OwnPtr;
using AK::ReadonlyBytes;
using AK::RefPtr;

View file

@ -0,0 +1,87 @@
/*
* Copyright (c) 2020, the SerenityOS developers
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice, this
* list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
* OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#include <AK/TestSuite.h>
#include <AK/CircularDuplexStream.h>
TEST_CASE(works_like_a_queue)
{
constexpr size_t capacity = 32;
CircularQueue<u8, capacity> queue;
CircularDuplexStream<capacity> stream;
for (size_t idx = 0; idx < capacity; ++idx) {
queue.enqueue(static_cast<u8>(idx % 256));
stream << static_cast<u8>(idx % 256);
}
for (size_t idx = 0; idx < capacity; ++idx) {
u8 byte;
stream >> byte;
EXPECT_EQ(queue.dequeue(), byte);
}
EXPECT(stream.eof());
}
TEST_CASE(overwritting_is_well_defined)
{
constexpr size_t half_capacity = 16;
constexpr size_t capacity = 2 * half_capacity;
CircularDuplexStream<capacity> stream;
for (size_t idx = 0; idx < capacity; ++idx)
stream << static_cast<u8>(idx % 256);
u8 bytes[half_capacity];
stream >> Bytes { bytes, sizeof(bytes) };
for (size_t idx = 0; idx < half_capacity; ++idx) {
EXPECT_EQ(bytes[idx], idx % 256);
}
for (size_t idx = 0; idx < half_capacity; ++idx)
stream << static_cast<u8>(idx % 256);
for (size_t idx = 0; idx < capacity; ++idx) {
u8 byte;
stream >> byte;
if (idx < half_capacity)
EXPECT_EQ(byte, half_capacity + idx % 256);
else
EXPECT_EQ(byte, idx % 256 - half_capacity);
}
EXPECT(stream.eof());
}
TEST_MAIN(CircularDuplexStream)