mirror of
https://github.com/SerenityOS/serenity
synced 2024-07-21 18:15:58 +00:00
LibCore: Introduce SharedSingleProducerCircularQueue
This new class with an admittedly long OOP-y name provides a circular queue in shared memory. The queue is a lock-free synchronous queue implemented with atomics, and its implementation is significantly simplified by only accounting for one producer (and multiple consumers). It is intended to be used as a producer-consumer communication datastructure across processes. The original motivation behind this class is efficient short-period transfer of audio data in userspace. This class includes formal proofs of several correctness properties of the main queue operations `enqueue` and `dequeue`. These proofs are not 100% complete in their existing form as the invariants they depend on are "handwaved". This seems fine to me right now, as any proof is better than no proof :^). Anyways, the proofs should build confidence that the implemented algorithms, which are only roughly based on existing work, operate correctly in even the worst-case concurrency scenarios.
This commit is contained in:
parent
b0a2572577
commit
6b13436ef6
|
@ -386,6 +386,10 @@
|
|||
#cmakedefine01 SH_LANGUAGE_SERVER_DEBUG
|
||||
#endif
|
||||
|
||||
#ifndef SHARED_QUEUE_DEBUG
|
||||
#cmakedefine01 SHARED_QUEUE_DEBUG
|
||||
#endif
|
||||
|
||||
#ifndef SHELL_JOB_DEBUG
|
||||
#cmakedefine01 SHELL_JOB_DEBUG
|
||||
#endif
|
||||
|
|
|
@ -164,6 +164,7 @@ set(SERVICE_DEBUG ON)
|
|||
set(SH_DEBUG ON)
|
||||
set(SHELL_JOB_DEBUG ON)
|
||||
set(SH_LANGUAGE_SERVER_DEBUG ON)
|
||||
set(SHARED_QUEUE_DEBUG ON)
|
||||
set(SIGNAL_DEBUG ON)
|
||||
set(SLAVEPTY_DEBUG ON)
|
||||
set(SMP_DEBUG ON)
|
||||
|
|
|
@ -5,6 +5,7 @@ set(TEST_SOURCES
|
|||
TestLibCoreDeferredInvoke.cpp
|
||||
TestLibCoreStream.cpp
|
||||
TestLibCoreFilePermissionsMask.cpp
|
||||
TestLibCoreSharedSingleProducerCircularQueue.cpp
|
||||
)
|
||||
|
||||
foreach(source IN LISTS TEST_SOURCES)
|
||||
|
@ -13,5 +14,6 @@ endforeach()
|
|||
|
||||
# NOTE: Required because of the LocalServer tests
|
||||
target_link_libraries(TestLibCoreStream LibThreading)
|
||||
target_link_libraries(TestLibCoreSharedSingleProducerCircularQueue LibThreading)
|
||||
|
||||
install(FILES long_lines.txt 10kb.txt small.txt DESTINATION usr/Tests/LibCore)
|
||||
|
|
203
Tests/LibCore/TestLibCoreSharedSingleProducerCircularQueue.cpp
Normal file
203
Tests/LibCore/TestLibCoreSharedSingleProducerCircularQueue.cpp
Normal file
|
@ -0,0 +1,203 @@
|
|||
/*
|
||||
* Copyright (c) 2022, kleines Filmröllchen <filmroellchen@serenityos.org>
|
||||
*
|
||||
* SPDX-License-Identifier: BSD-2-Clause
|
||||
*/
|
||||
|
||||
#include "sched.h"
|
||||
#include <LibCore/SharedCircularQueue.h>
|
||||
#include <LibTest/TestCase.h>
|
||||
#include <LibThreading/Thread.h>
|
||||
|
||||
using TestQueue = Core::SharedSingleProducerCircularQueue<int>;
|
||||
using QueueError = ErrorOr<int, TestQueue::QueueStatus>;
|
||||
|
||||
Function<intptr_t()> dequeuer(TestQueue& queue, Atomic<size_t>& dequeue_count, size_t test_count);
|
||||
|
||||
// These first two cases don't multithread at all.
|
||||
|
||||
TEST_CASE(simple_enqueue)
|
||||
{
|
||||
auto queue = MUST(TestQueue::try_create());
|
||||
for (size_t i = 0; i < queue.size() - 1; ++i)
|
||||
EXPECT(!queue.try_enqueue((int)i).is_error());
|
||||
|
||||
auto result = queue.try_enqueue(0);
|
||||
EXPECT(result.is_error());
|
||||
EXPECT_EQ(result.release_error(), TestQueue::QueueStatus::Full);
|
||||
}
|
||||
|
||||
TEST_CASE(simple_dequeue)
|
||||
{
|
||||
auto queue = MUST(TestQueue::try_create());
|
||||
auto const test_count = 10;
|
||||
for (int i = 0; i < test_count; ++i)
|
||||
(void)queue.try_enqueue(i);
|
||||
for (int i = 0; i < test_count; ++i) {
|
||||
auto const element = queue.try_dequeue();
|
||||
EXPECT(!element.is_error());
|
||||
EXPECT_EQ(element.value(), i);
|
||||
}
|
||||
}
|
||||
|
||||
// There is one parallel consumer, but nobody is producing at the same time.
|
||||
TEST_CASE(simple_multithread)
|
||||
{
|
||||
auto queue = MUST(TestQueue::try_create());
|
||||
auto const test_count = 10;
|
||||
|
||||
for (int i = 0; i < test_count; ++i)
|
||||
(void)queue.try_enqueue(i);
|
||||
|
||||
auto second_thread = Threading::Thread::construct([&queue]() {
|
||||
auto copied_queue = queue;
|
||||
for (int i = 0; i < test_count; ++i) {
|
||||
QueueError result = TestQueue::QueueStatus::Invalid;
|
||||
do {
|
||||
result = copied_queue.try_dequeue();
|
||||
if (!result.is_error())
|
||||
EXPECT_EQ(result.value(), i);
|
||||
} while (result.is_error() && result.error() == TestQueue::QueueStatus::Empty);
|
||||
|
||||
if (result.is_error())
|
||||
FAIL("Unexpected error while dequeueing.");
|
||||
}
|
||||
return 0;
|
||||
});
|
||||
second_thread->start();
|
||||
(void)second_thread->join();
|
||||
|
||||
EXPECT_EQ(queue.weak_used(), (size_t)0);
|
||||
}
|
||||
|
||||
// There is one parallel consumer and one parallel producer.
|
||||
TEST_CASE(producer_consumer_multithread)
|
||||
{
|
||||
auto queue = MUST(TestQueue::try_create());
|
||||
// Ensure that we have the possibility of filling the queue up.
|
||||
auto const test_count = queue.size() * 4;
|
||||
|
||||
Atomic<bool> other_thread_running { false };
|
||||
|
||||
auto second_thread = Threading::Thread::construct([&queue, &other_thread_running]() {
|
||||
auto copied_queue = queue;
|
||||
other_thread_running.store(true);
|
||||
for (size_t i = 0; i < test_count; ++i) {
|
||||
QueueError result = TestQueue::QueueStatus::Invalid;
|
||||
do {
|
||||
result = copied_queue.try_dequeue();
|
||||
if (!result.is_error())
|
||||
EXPECT_EQ(result.value(), (int)i);
|
||||
} while (result.is_error() && result.error() == TestQueue::QueueStatus::Empty);
|
||||
|
||||
if (result.is_error())
|
||||
FAIL("Unexpected error while dequeueing.");
|
||||
}
|
||||
return 0;
|
||||
});
|
||||
second_thread->start();
|
||||
|
||||
while (!other_thread_running.load())
|
||||
;
|
||||
|
||||
for (size_t i = 0; i < test_count; ++i) {
|
||||
ErrorOr<void, TestQueue::QueueStatus> result = TestQueue::QueueStatus::Invalid;
|
||||
do {
|
||||
result = queue.try_enqueue((int)i);
|
||||
} while (result.is_error() && result.error() == TestQueue::QueueStatus::Full);
|
||||
|
||||
if (result.is_error())
|
||||
FAIL("Unexpected error while enqueueing.");
|
||||
}
|
||||
|
||||
(void)second_thread->join();
|
||||
|
||||
EXPECT_EQ(queue.weak_used(), (size_t)0);
|
||||
}
|
||||
|
||||
// There are multiple parallel consumers, but nobody is producing at the same time.
|
||||
TEST_CASE(multi_consumer)
|
||||
{
|
||||
auto queue = MUST(TestQueue::try_create());
|
||||
// This needs to be divisible by 4!
|
||||
size_t const test_count = queue.size() - 4;
|
||||
Atomic<size_t> dequeue_count = 0;
|
||||
|
||||
auto threads = {
|
||||
Threading::Thread::construct(dequeuer(queue, dequeue_count, test_count)),
|
||||
Threading::Thread::construct(dequeuer(queue, dequeue_count, test_count)),
|
||||
Threading::Thread::construct(dequeuer(queue, dequeue_count, test_count)),
|
||||
Threading::Thread::construct(dequeuer(queue, dequeue_count, test_count)),
|
||||
};
|
||||
|
||||
for (size_t i = 0; i < test_count; ++i)
|
||||
(void)queue.try_enqueue((int)i);
|
||||
|
||||
for (auto thread : threads)
|
||||
thread->start();
|
||||
for (auto thread : threads)
|
||||
(void)thread->join();
|
||||
|
||||
EXPECT_EQ(queue.weak_used(), (size_t)0);
|
||||
EXPECT_EQ(dequeue_count.load(), (size_t)test_count);
|
||||
}
|
||||
|
||||
// There are multiple parallel consumers and one parallel producer.
|
||||
TEST_CASE(single_producer_multi_consumer)
|
||||
{
|
||||
auto queue = MUST(TestQueue::try_create());
|
||||
// Choose a higher number to provoke possible race conditions.
|
||||
size_t const test_count = queue.size() * 8;
|
||||
Atomic<size_t> dequeue_count = 0;
|
||||
|
||||
auto threads = {
|
||||
Threading::Thread::construct(dequeuer(queue, dequeue_count, test_count)),
|
||||
Threading::Thread::construct(dequeuer(queue, dequeue_count, test_count)),
|
||||
Threading::Thread::construct(dequeuer(queue, dequeue_count, test_count)),
|
||||
Threading::Thread::construct(dequeuer(queue, dequeue_count, test_count)),
|
||||
};
|
||||
for (auto thread : threads)
|
||||
thread->start();
|
||||
|
||||
for (size_t i = 0; i < test_count; ++i) {
|
||||
ErrorOr<void, TestQueue::QueueStatus> result = TestQueue::QueueStatus::Invalid;
|
||||
do {
|
||||
result = queue.try_enqueue((int)i);
|
||||
// After we put something in the first time, let's wait while nobody has dequeued yet.
|
||||
while (dequeue_count.load() == 0)
|
||||
;
|
||||
// Give others time to do something.
|
||||
sched_yield();
|
||||
} while (result.is_error() && result.error() == TestQueue::QueueStatus::Full);
|
||||
|
||||
if (result.is_error())
|
||||
FAIL("Unexpected error while enqueueing.");
|
||||
}
|
||||
|
||||
for (auto thread : threads)
|
||||
(void)thread->join();
|
||||
|
||||
EXPECT_EQ(queue.weak_used(), (size_t)0);
|
||||
EXPECT_EQ(dequeue_count.load(), (size_t)test_count);
|
||||
}
|
||||
|
||||
Function<intptr_t()> dequeuer(TestQueue& queue, Atomic<size_t>& dequeue_count, size_t const test_count)
|
||||
{
|
||||
return [&queue, &dequeue_count, test_count]() {
|
||||
auto copied_queue = queue;
|
||||
for (size_t i = 0; i < test_count / 4; ++i) {
|
||||
QueueError result = TestQueue::QueueStatus::Invalid;
|
||||
do {
|
||||
result = copied_queue.try_dequeue();
|
||||
if (!result.is_error())
|
||||
dequeue_count.fetch_add(1);
|
||||
// Give others time to do something.
|
||||
sched_yield();
|
||||
} while (result.is_error() && result.error() == TestQueue::QueueStatus::Empty);
|
||||
|
||||
if (result.is_error())
|
||||
FAIL("Unexpected error while dequeueing.");
|
||||
}
|
||||
return (intptr_t)0;
|
||||
};
|
||||
}
|
230
Userland/Libraries/LibCore/SharedCircularQueue.h
Normal file
230
Userland/Libraries/LibCore/SharedCircularQueue.h
Normal file
|
@ -0,0 +1,230 @@
|
|||
/*
|
||||
* Copyright (c) 2022, kleines Filmröllchen <filmroellchen@serenityos.org>
|
||||
*
|
||||
* SPDX-License-Identifier: BSD-2-Clause
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <AK/Assertions.h>
|
||||
#include <AK/Atomic.h>
|
||||
#include <AK/BuiltinWrappers.h>
|
||||
#include <AK/Debug.h>
|
||||
#include <AK/Error.h>
|
||||
#include <AK/Format.h>
|
||||
#include <AK/Function.h>
|
||||
#include <AK/NonnullRefPtr.h>
|
||||
#include <AK/NumericLimits.h>
|
||||
#include <AK/Platform.h>
|
||||
#include <AK/RefCounted.h>
|
||||
#include <AK/RefPtr.h>
|
||||
#include <AK/String.h>
|
||||
#include <AK/Types.h>
|
||||
#include <AK/Variant.h>
|
||||
#include <AK/Weakable.h>
|
||||
#include <LibCore/AnonymousBuffer.h>
|
||||
#include <LibCore/System.h>
|
||||
#include <errno.h>
|
||||
#include <fcntl.h>
|
||||
#include <sched.h>
|
||||
#include <sys/mman.h>
|
||||
|
||||
namespace Core {
|
||||
|
||||
// A circular lock-free queue (or a buffer) with a single producer,
|
||||
// residing in shared memory and designed to be accessible to multiple processes.
|
||||
// This implementation makes use of the fact that any producer-related code can be sure that
|
||||
// it's the only producer-related code that is running, which simplifies a bunch of the synchronization code.
|
||||
// The exclusivity and liveliness for critical sections in this class is proven to be correct
|
||||
// under the assumption of correct synchronization primitives, i.e. atomics.
|
||||
// In many circumstances, this is enough for cross-process queues.
|
||||
// This class is designed to be transferred over IPC and mmap()ed into multiple processes' memory.
|
||||
// It is a synthetic pointer to the actual shared memory, which is abstracted away from the user.
|
||||
// FIXME: Make this independent of shared memory, so that we can move it to AK.
|
||||
// clang-format off
|
||||
template<typename T, size_t Size = 32>
|
||||
// Size must be a power of two, which speeds up the modulus operations for indexing.
|
||||
requires(popcount(Size) == 1)
|
||||
class SharedSingleProducerCircularQueue final {
|
||||
// clang-format on
|
||||
|
||||
public:
|
||||
using ValueType = T;
|
||||
|
||||
enum class QueueStatus : u8 {
|
||||
Invalid = 0,
|
||||
Full,
|
||||
Empty,
|
||||
};
|
||||
|
||||
SharedSingleProducerCircularQueue() = default;
|
||||
SharedSingleProducerCircularQueue(SharedSingleProducerCircularQueue<ValueType, Size>& queue) = default;
|
||||
|
||||
SharedSingleProducerCircularQueue(SharedSingleProducerCircularQueue&& queue) = default;
|
||||
SharedSingleProducerCircularQueue& operator=(SharedSingleProducerCircularQueue&& queue) = default;
|
||||
|
||||
// Allocates a new circular queue in shared memory.
|
||||
static ErrorOr<SharedSingleProducerCircularQueue<T, Size>> try_create()
|
||||
{
|
||||
auto fd = TRY(System::anon_create(sizeof(SharedMemorySPCQ), O_CLOEXEC));
|
||||
return try_create_internal(fd, true);
|
||||
}
|
||||
|
||||
// Uses an existing circular queue from given shared memory.
|
||||
static ErrorOr<SharedSingleProducerCircularQueue<T, Size>> try_create(int fd)
|
||||
{
|
||||
return try_create_internal(fd, false);
|
||||
}
|
||||
|
||||
constexpr size_t size() const { return Size; }
|
||||
// These functions are provably inconsistent and should only be used as hints to the actual capacity and used count.
|
||||
ALWAYS_INLINE size_t weak_remaining_capacity() const { return Size - weak_used(); }
|
||||
ALWAYS_INLINE size_t weak_used() const
|
||||
{
|
||||
auto volatile head = m_queue->m_queue->m_tail.load(AK::MemoryOrder::memory_order_relaxed);
|
||||
auto volatile tail = m_queue->m_queue->m_head.load(AK::MemoryOrder::memory_order_relaxed);
|
||||
return head - tail;
|
||||
}
|
||||
|
||||
ALWAYS_INLINE constexpr int fd() const { return m_queue->m_fd; }
|
||||
ALWAYS_INLINE constexpr bool is_valid() const { return !m_queue.is_null(); }
|
||||
|
||||
ALWAYS_INLINE constexpr size_t weak_head() const { return m_queue->m_queue->m_head.load(AK::MemoryOrder::memory_order_relaxed); }
|
||||
ALWAYS_INLINE constexpr size_t weak_tail() const { return m_queue->m_queue->m_tail.load(AK::MemoryOrder::memory_order_relaxed); }
|
||||
|
||||
ErrorOr<void, QueueStatus> try_enqueue(ValueType to_insert)
|
||||
{
|
||||
VERIFY(!m_queue.is_null());
|
||||
if (!can_enqueue())
|
||||
return QueueStatus::Full;
|
||||
auto our_tail = m_queue->m_queue->m_tail.load() % Size;
|
||||
m_queue->m_queue->m_data[our_tail] = to_insert;
|
||||
++m_queue->m_queue->m_tail;
|
||||
|
||||
return {};
|
||||
}
|
||||
|
||||
ALWAYS_INLINE bool can_enqueue() const
|
||||
{
|
||||
return ((head() - 1) % Size) != (m_queue->m_queue->m_tail.load() % Size);
|
||||
}
|
||||
|
||||
// Repeatedly try to enqueue, using the wait_function to wait if it's not possible
|
||||
ErrorOr<void> try_blocking_enqueue(ValueType to_insert, Function<void()> wait_function)
|
||||
{
|
||||
ErrorOr<void, QueueStatus> result;
|
||||
while (true) {
|
||||
result = try_enqueue(to_insert);
|
||||
|
||||
if (result.is_error()) {
|
||||
if (result.error() == QueueStatus::Full)
|
||||
wait_function();
|
||||
else
|
||||
return Error::from_string_literal("Unexpected error while enqueuing"sv);
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
return {};
|
||||
}
|
||||
|
||||
ErrorOr<ValueType, QueueStatus> try_dequeue()
|
||||
{
|
||||
VERIFY(!m_queue.is_null());
|
||||
while (true) {
|
||||
// The >= is not strictly necessary, but it feels safer :^)
|
||||
if (head() >= m_queue->m_queue->m_tail.load())
|
||||
return QueueStatus::Empty;
|
||||
|
||||
// This CAS only succeeds if nobody is currently dequeuing.
|
||||
auto size_max = NumericLimits<size_t>::max();
|
||||
if (m_queue->m_queue->m_head_protector.compare_exchange_strong(size_max, m_queue->m_queue->m_head.load())) {
|
||||
auto old_head = m_queue->m_queue->m_head.load();
|
||||
auto data = move(m_queue->m_queue->m_data[old_head % Size]);
|
||||
m_queue->m_queue->m_head.fetch_add(1);
|
||||
m_queue->m_queue->m_head_protector.store(NumericLimits<size_t>::max(), AK::MemoryOrder::memory_order_release);
|
||||
return { move(data) };
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// The "real" head as seen by the outside world. Don't use m_head directly unless you know what you're doing.
|
||||
size_t head() const
|
||||
{
|
||||
return min(m_queue->m_queue->m_head.load(), m_queue->m_queue->m_head_protector.load());
|
||||
}
|
||||
|
||||
private:
|
||||
struct SharedMemorySPCQ {
|
||||
SharedMemorySPCQ() = default;
|
||||
SharedMemorySPCQ(SharedMemorySPCQ const&) = delete;
|
||||
SharedMemorySPCQ(SharedMemorySPCQ&&) = delete;
|
||||
~SharedMemorySPCQ() = default;
|
||||
|
||||
// Invariant: tail >= head
|
||||
// Invariant: head and tail are monotonically increasing
|
||||
// Invariant: tail always points to the next free location where an enqueue can happen.
|
||||
// Invariant: head always points to the element to be dequeued next.
|
||||
// Invariant: tail is only modified by enqueue functions.
|
||||
// Invariant: head is only modified by dequeue functions.
|
||||
// An empty queue is signalled with: tail = head
|
||||
// A full queue is signalled with: head - 1 mod size = tail mod size (i.e. head and tail point to the same index in the data array)
|
||||
// FIXME: These invariants aren't proven to be correct after each successful completion of each operation where it is relevant.
|
||||
// The work could be put in but for now I think the algorithmic correctness proofs of the functions are enough.
|
||||
CACHE_ALIGNED Atomic<size_t, AK::MemoryOrder::memory_order_seq_cst> m_tail { 0 };
|
||||
CACHE_ALIGNED Atomic<size_t, AK::MemoryOrder::memory_order_seq_cst> m_head { 0 };
|
||||
CACHE_ALIGNED Atomic<size_t, AK::MemoryOrder::memory_order_seq_cst> m_head_protector { NumericLimits<size_t>::max() };
|
||||
|
||||
alignas(ValueType) Array<ValueType, Size> m_data;
|
||||
};
|
||||
|
||||
class RefCountedSharedMemorySPCQ : public RefCounted<RefCountedSharedMemorySPCQ> {
|
||||
friend class SharedSingleProducerCircularQueue;
|
||||
|
||||
public:
|
||||
SharedMemorySPCQ* m_queue;
|
||||
void* m_raw;
|
||||
int m_fd;
|
||||
|
||||
~RefCountedSharedMemorySPCQ()
|
||||
{
|
||||
MUST(System::close(m_fd));
|
||||
MUST(System::munmap(m_raw, sizeof(SharedMemorySPCQ)));
|
||||
dbgln_if(SHARED_QUEUE_DEBUG, "destructed SSPCQ at {:p}, shared mem: {:p}", this, this->m_raw);
|
||||
}
|
||||
|
||||
private:
|
||||
RefCountedSharedMemorySPCQ(SharedMemorySPCQ* queue, int fd)
|
||||
: m_queue(queue)
|
||||
, m_raw(reinterpret_cast<void*>(queue))
|
||||
, m_fd(fd)
|
||||
{
|
||||
}
|
||||
};
|
||||
|
||||
static ErrorOr<SharedSingleProducerCircularQueue<T, Size>> try_create_internal(int fd, bool is_new)
|
||||
{
|
||||
auto name = String::formatted("SharedSingleProducerCircularQueue@{:x}", fd);
|
||||
auto* raw_mapping = TRY(System::mmap(nullptr, sizeof(SharedMemorySPCQ), PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0, 0, name));
|
||||
dbgln_if(SHARED_QUEUE_DEBUG, "successfully mmapped {} at {:p}", name, raw_mapping);
|
||||
|
||||
SharedMemorySPCQ* shared_queue = is_new ? new (raw_mapping) SharedMemorySPCQ() : reinterpret_cast<SharedMemorySPCQ*>(raw_mapping);
|
||||
|
||||
if (!shared_queue)
|
||||
return Error::from_string_literal("Unexpected error when creating shared queue from raw memory"sv);
|
||||
|
||||
return SharedSingleProducerCircularQueue<T, Size> { move(name), adopt_ref(*new (nothrow) RefCountedSharedMemorySPCQ(shared_queue, fd)) };
|
||||
}
|
||||
|
||||
SharedSingleProducerCircularQueue(String name, RefPtr<RefCountedSharedMemorySPCQ> queue)
|
||||
: m_queue(queue)
|
||||
, m_name(move(name))
|
||||
{
|
||||
}
|
||||
|
||||
RefPtr<RefCountedSharedMemorySPCQ> m_queue;
|
||||
|
||||
String m_name {};
|
||||
};
|
||||
|
||||
}
|
Loading…
Reference in a new issue