AK: Add DuplexMemoryStream class.

This class is similar to BufferStream because it is possible to both
read and write to it. However, it differs in the following ways:

  - DuplexMemoryStream keeps a history of 64KiB and discards the rest,
    BufferStream always keeps everything around.

  - DuplexMemoryStream tracks reading and writing seperately, the
    following is valid:

        DuplexMemoryStream stream;
        stream << 42;
        int value;
        stream >> value;

    For BufferStream it would read:

        BufferStream stream;
        stream << 42;
        int value;
        stream.seek(0);
        stream >> value;

In the future I would like to replace all usages of BufferStream with
InputMemoryStream, OutputMemoryStream (doesn't exist yet) and
DuplexMemoryStream. For now I just add DuplexMemoryStream though.
This commit is contained in:
asynts 2020-08-18 18:04:17 +02:00 committed by Andreas Kling
parent a15556638d
commit 30abadcff9
3 changed files with 222 additions and 1 deletions

View file

@ -50,6 +50,7 @@ class Utf32View;
class Utf8View;
class InputStream;
class InputMemoryStream;
class DuplexMemoryStream;
template<typename T>
class Span;
@ -125,6 +126,7 @@ using AK::Bytes;
using AK::CircularQueue;
using AK::DebugLogStream;
using AK::DoublyLinkedList;
using AK::DuplexMemoryStream;
using AK::FixedArray;
using AK::FlyString;
using AK::Function;

View file

@ -26,10 +26,12 @@
#pragma once
#include <AK/ByteBuffer.h>
#include <AK/Concepts.h>
#include <AK/Forward.h>
#include <AK/Span.h>
#include <AK/StdLibExtras.h>
#include <AK/Vector.h>
namespace AK::Detail {
@ -52,7 +54,7 @@ protected:
namespace AK {
class InputStream : public AK::Detail::Stream {
class InputStream : public virtual AK::Detail::Stream {
public:
virtual size_t read(Bytes) = 0;
virtual bool read_or_error(Bytes) = 0;
@ -60,6 +62,17 @@ public:
virtual bool discard_or_error(size_t count) = 0;
};
class OutputStream : public virtual AK::Detail::Stream {
public:
virtual size_t write(ReadonlyBytes) = 0;
virtual bool write_or_error(ReadonlyBytes) = 0;
};
class DuplexStream
: public InputStream
, public OutputStream {
};
#if defined(__cpp_concepts) && !defined(__COVERITY__)
template<Concepts::Integral Integral>
#else
@ -71,8 +84,21 @@ InputStream& operator>>(InputStream& stream, Integral& value)
return stream;
}
#if defined(__cpp_concepts) && !defined(__COVERITY__)
template<Concepts::Integral Integral>
#else
template<typename Integral, typename EnableIf<IsIntegral<Integral>::value, int>::Type = 0>
#endif
OutputStream& operator<<(OutputStream& stream, Integral value)
{
stream.write_or_error({ &value, sizeof(value) });
return stream;
}
#ifndef KERNEL
// FIXME: clang-format adds spaces before the #if for some reason.
// clang-format off
#if defined(__cpp_concepts) && !defined(__COVERITY__)
template<Concepts::FloatingPoint FloatingPoint>
#else
@ -84,7 +110,19 @@ InputStream& operator>>(InputStream& stream, FloatingPoint& value)
return stream;
}
#if defined(__cpp_concepts) && !defined(__COVERITY__)
template<Concepts::FloatingPoint FloatingPoint>
#else
template<typename FloatingPoint, typename EnableIf<IsFloatingPoint<FloatingPoint>::value, int>::Type = 0>
#endif
OutputStream& operator<<(OutputStream& stream, FloatingPoint value)
{
stream.write_or_error({ &value, sizeof(value) });
return stream;
}
#endif
// clang-format on
inline InputStream& operator>>(InputStream& stream, bool& value)
{
@ -92,12 +130,24 @@ inline InputStream& operator>>(InputStream& stream, bool& value)
return stream;
}
inline OutputStream& operator<<(OutputStream& stream, bool value)
{
stream.write_or_error({ &value, sizeof(value) });
return stream;
}
inline InputStream& operator>>(InputStream& stream, Bytes bytes)
{
stream.read_or_error(bytes);
return stream;
}
inline OutputStream& operator<<(OutputStream& stream, ReadonlyBytes bytes)
{
stream.write_or_error(bytes);
return stream;
}
class InputMemoryStream final : public InputStream {
friend InputMemoryStream& operator>>(InputMemoryStream& stream, String& string);
@ -224,7 +274,103 @@ private:
size_t m_offset { 0 };
};
// All data written to this stream can be read from it. Reading and writing is done
// using different offsets, meaning that it is not necessary to seek to the start
// before reading; this behaviour differs from BufferStream.
//
// The stream keeps a history of 64KiB which means that seeking backwards is well
// defined. Data past that point will be discarded.
class DuplexMemoryStream final : public DuplexStream {
public:
static constexpr size_t chunk_size = 4 * 1024;
static constexpr size_t history_size = 64 * 1024;
bool eof() const override { return m_write_offset == m_read_offset; }
bool discard_or_error(size_t count) override
{
if (m_write_offset - m_read_offset < count) {
m_error = true;
return false;
}
m_read_offset += count;
try_discard_chunks();
return true;
}
size_t read(Bytes bytes) override
{
size_t nread = 0;
while (bytes.size() - nread > 0 && m_write_offset - m_read_offset - nread > 0) {
const auto chunk_index = (m_read_offset - m_base_offset) / chunk_size;
const auto chunk_bytes = m_chunks[chunk_index].bytes().slice(m_read_offset % chunk_size).trim(m_write_offset - m_read_offset - nread);
nread += chunk_bytes.copy_trimmed_to(bytes.slice(nread));
}
m_read_offset += nread;
try_discard_chunks();
return nread;
}
bool read_or_error(Bytes bytes) override
{
if (m_write_offset - m_read_offset < bytes.size()) {
m_error = true;
return false;
}
read(bytes);
return true;
}
size_t write(ReadonlyBytes bytes) override
{
size_t nwritten = 0;
while (bytes.size() - nwritten > 0) {
if ((m_write_offset + nwritten) % chunk_size == 0)
m_chunks.append(ByteBuffer::create_uninitialized(chunk_size));
nwritten += bytes.copy_trimmed_to(m_chunks.last().bytes().slice(m_write_offset % chunk_size));
}
m_write_offset += nwritten;
return nwritten;
}
bool write_or_error(ReadonlyBytes bytes) override
{
write(bytes);
return true;
}
void seek(size_t offset)
{
ASSERT(offset >= m_base_offset);
ASSERT(offset <= m_write_offset);
m_read_offset = offset;
}
size_t offset() const { return m_read_offset; }
size_t remaining() const { return m_write_offset - m_read_offset; }
private:
void try_discard_chunks()
{
while (m_read_offset - m_base_offset >= history_size + chunk_size) {
m_chunks.take_first();
m_base_offset += chunk_size;
}
}
Vector<ByteBuffer> m_chunks;
size_t m_write_offset { 0 };
size_t m_read_offset { 0 };
size_t m_base_offset { 0 };
};
}
using AK::DuplexMemoryStream;
using AK::InputMemoryStream;
using AK::InputStream;

View file

@ -26,6 +26,7 @@
#include <AK/TestSuite.h>
#include <AK/FixedArray.h>
#include <AK/Stream.h>
static bool compare(ReadonlyBytes lhs, ReadonlyBytes rhs)
@ -108,4 +109,76 @@ TEST_CASE(seeking_slicing_offset)
EXPECT(compare({ expected2, sizeof(expected2) }, { actual2, sizeof(actual2) }));
}
TEST_CASE(duplex_simple)
{
DuplexMemoryStream stream;
EXPECT(stream.eof());
stream << 42;
EXPECT(!stream.eof());
int value;
stream >> value;
EXPECT_EQ(value, 42);
EXPECT(stream.eof());
}
TEST_CASE(duplex_seek_into_history)
{
DuplexMemoryStream stream;
FixedArray<u8> one_kibibyte { 1024 };
EXPECT_EQ(stream.remaining(), 0ul);
for (size_t idx = 0; idx < 256; ++idx) {
stream << one_kibibyte;
}
EXPECT_EQ(stream.remaining(), 256 * 1024ul);
for (size_t idx = 0; idx < 128; ++idx) {
stream >> one_kibibyte;
}
EXPECT_EQ(stream.remaining(), 128 * 1024ul);
// We now have 128KiB on the stream. Because the stream has a
// history size of 64KiB, we should be able to seek to 64KiB.
static_assert(DuplexMemoryStream::history_size == 64 * 1024);
stream.seek(64 * 1024);
EXPECT_EQ(stream.remaining(), 192 * 1024ul);
for (size_t idx = 0; idx < 192; ++idx) {
stream >> one_kibibyte;
}
EXPECT(stream.eof());
}
TEST_CASE(duplex_wild_seeking)
{
DuplexMemoryStream stream;
int input0 = 42, input1 = 13, input2 = -12;
int output0, output1, output2;
stream << input2;
stream << input0 << input1;
stream.seek(0);
stream << input2 << input0;
stream.seek(4);
stream >> output0 >> output1 >> output2;
EXPECT(!stream.eof());
EXPECT_EQ(input0, output0);
EXPECT_EQ(input1, output1);
EXPECT_EQ(input2, output2);
stream.discard_or_error(4);
EXPECT(stream.eof());
}
TEST_MAIN(Stream)