New shared_mutex

Experimental sync utils
New semaphore<>
New cond_variable
New owned_mutex
This commit is contained in:
Nekotekina 2017-01-24 23:19:52 +03:00 committed by Ivan
parent 98fc131d47
commit 1c14d872a8
20 changed files with 940 additions and 543 deletions

View file

@ -300,21 +300,24 @@ struct atomic_storage<T, 2> : atomic_storage<T, 0>
static inline bool bts(T& dest, uint bit)
{
bool result;
__asm__("lock btsw %2, %0\n" "setc %1" : "+m" (dest), "=r" (result) : "Ir" (bit) : "cc");
ushort _bit = (ushort)bit;
__asm__("lock btsw %2, %0\n" "setc %1" : "+m" (dest), "=r" (result) : "Ir" (_bit) : "cc");
return result;
}
static inline bool btr(T& dest, uint bit)
{
bool result;
__asm__("lock btrw %2, %0\n" "setc %1" : "+m" (dest), "=r" (result) : "Ir" (bit) : "cc");
ushort _bit = (ushort)bit;
__asm__("lock btrw %2, %0\n" "setc %1" : "+m" (dest), "=r" (result) : "Ir" (_bit) : "cc");
return result;
}
static inline bool btc(T& dest, uint bit)
{
bool result;
__asm__("lock btcw %2, %0\n" "setc %1" : "+m" (dest), "=r" (result) : "Ir" (bit) : "cc");
ushort _bit = (ushort)bit;
__asm__("lock btcw %2, %0\n" "setc %1" : "+m" (dest), "=r" (result) : "Ir" (_bit) : "cc");
return result;
}
#endif
@ -498,21 +501,24 @@ struct atomic_storage<T, 8> : atomic_storage<T, 0>
static inline bool bts(T& dest, uint bit)
{
bool result;
__asm__("lock btsq %2, %0\n" "setc %1" : "+m" (dest), "=r" (result) : "Ir" (bit) : "cc");
ullong _bit = bit;
__asm__("lock btsq %2, %0\n" "setc %1" : "+m" (dest), "=r" (result) : "Ir" (_bit) : "cc");
return result;
}
static inline bool btr(T& dest, uint bit)
{
bool result;
__asm__("lock btrq %2, %0\n" "setc %1" : "+m" (dest), "=r" (result) : "Ir" (bit) : "cc");
ullong _bit = bit;
__asm__("lock btrq %2, %0\n" "setc %1" : "+m" (dest), "=r" (result) : "Ir" (_bit) : "cc");
return result;
}
static inline bool btc(T& dest, uint bit)
{
bool result;
__asm__("lock btcq %2, %0\n" "setc %1" : "+m" (dest), "=r" (result) : "Ir" (bit) : "cc");
ullong _bit = bit;
__asm__("lock btcq %2, %0\n" "setc %1" : "+m" (dest), "=r" (result) : "Ir" (_bit) : "cc");
return result;
}
#endif

View file

@ -1,6 +1,6 @@
#include "File.h"
#include "mutex.h"
#include "StrFmt.h"
#include "SharedMutex.h"
#include "BEType.h"
#include "Crypto/sha1.h"

View file

@ -1,90 +0,0 @@
#include "Utilities/Semaphore.h"
#include <mutex>
#include <condition_variable>
struct benaphore::internal
{
std::mutex mutex;
std::size_t acq_order{};
std::size_t rel_order{};
std::condition_variable cond;
};
void benaphore::wait_hard()
{
initialize_once();
std::unique_lock<std::mutex> lock(m_data->mutex);
// Notify non-zero waiter queue size
if (m_value.exchange(-1) == 1)
{
// Return immediately (acquired)
m_value = 0;
return;
}
// Remember the order
const std::size_t order = ++m_data->acq_order;
// Wait for the appropriate rel_order (TODO)
while (m_data->rel_order < order)
{
m_data->cond.wait(lock);
}
if (order == m_data->acq_order && m_data->acq_order == m_data->rel_order)
{
// Cleaup
m_data->acq_order = 0;
m_data->rel_order = 0;
m_value.compare_and_swap(-1, 0);
}
}
void benaphore::post_hard()
{
initialize_once();
std::unique_lock<std::mutex> lock(m_data->mutex);
if (m_value.compare_and_swap(0, 1) != -1)
{
// Do nothing (released)
return;
}
if (m_data->acq_order == m_data->rel_order)
{
m_value = 1;
return;
}
// Awake one thread
m_data->rel_order += 1;
// Unlock and notify
lock.unlock();
m_data->cond.notify_one();
}
void benaphore::initialize_once()
{
if (UNLIKELY(!m_data))
{
auto ptr = new benaphore::internal;
if (!m_data.compare_and_swap_test(nullptr, ptr))
{
delete ptr;
}
}
}
benaphore::~benaphore()
{
delete m_data;
}

View file

@ -1,47 +0,0 @@
#pragma once
#include "types.h"
#include "Atomic.h"
// Binary semaphore
class benaphore
{
struct internal;
// Reserved value (-1) enforces *_hard() calls
atomic_t<u32> m_value{};
atomic_t<internal*> m_data{};
void wait_hard();
void post_hard();
public:
constexpr benaphore() = default;
~benaphore();
// Initialize internal data
void initialize_once();
void wait()
{
if (UNLIKELY(!m_value.compare_and_swap_test(1, 0)))
{
wait_hard();
}
}
bool try_wait()
{
return m_value.compare_and_swap_test(1, 0);
}
void post()
{
if (UNLIKELY(!m_value.compare_and_swap_test(0, 1)))
{
post_hard();
}
}
};

View file

@ -1,187 +0,0 @@
#include "SharedMutex.h"
#include <mutex>
#include <condition_variable>
struct shared_mutex::internal
{
std::mutex mutex;
std::size_t rq_size{}; // Reader queue size (threads waiting on m_rcv)
std::size_t wq_size{}; // Writer queue size (threads waiting on m_wcv and m_ocv)
std::condition_variable rcv; // Reader queue
std::condition_variable wcv; // Writer queue
std::condition_variable ocv; // For current exclusive owner
};
void shared_mutex::lock_shared_hard()
{
initialize_once();
std::unique_lock<std::mutex> lock(m_data->mutex);
// Validate
if ((m_ctrl & SM_INVALID_BIT) != 0) throw std::runtime_error("shared_mutex::lock_shared(): Invalid bit");
if ((m_ctrl & SM_READER_MASK) == 0) throw std::runtime_error("shared_mutex::lock_shared(): No readers");
// Notify non-zero reader queue size
m_ctrl |= SM_WAITERS_BIT, m_data->rq_size++;
// Fix excess reader count
if ((--m_ctrl & SM_READER_MASK) == 0 && m_data->wq_size)
{
// Notify exclusive owner
m_data->ocv.notify_one();
}
// Obtain the reader lock
while (true)
{
const auto ctrl = m_ctrl.load();
// Check writers and reader limit
if (m_data->wq_size || (ctrl & ~SM_WAITERS_BIT) >= SM_READER_MAX)
{
m_data->rcv.wait(lock);
continue;
}
if (m_ctrl.compare_and_swap_test(ctrl, ctrl + 1))
{
break;
}
}
if (!--m_data->rq_size && !m_data->wq_size)
{
m_ctrl &= ~SM_WAITERS_BIT;
}
}
void shared_mutex::unlock_shared_notify()
{
initialize_once();
std::unique_lock<std::mutex> lock(m_data->mutex);
if ((m_ctrl & SM_READER_MASK) == 0 && m_data->wq_size)
{
// Notify exclusive owner
lock.unlock();
m_data->ocv.notify_one();
}
else if (m_data->rq_size)
{
// Notify other readers
lock.unlock();
m_data->rcv.notify_one();
}
}
void shared_mutex::lock_hard()
{
initialize_once();
std::unique_lock<std::mutex> lock(m_data->mutex);
// Validate
if ((m_ctrl & SM_INVALID_BIT) != 0) throw std::runtime_error("shared_mutex::lock(): Invalid bit");
// Notify non-zero writer queue size
m_ctrl |= SM_WAITERS_BIT, m_data->wq_size++;
// Obtain the writer lock
while (true)
{
const auto ctrl = m_ctrl.load();
if (ctrl & SM_WRITER_LOCK)
{
m_data->wcv.wait(lock);
continue;
}
if (m_ctrl.compare_and_swap_test(ctrl, ctrl | SM_WRITER_LOCK))
{
break;
}
}
// Wait for remaining readers
while ((m_ctrl & SM_READER_MASK) != 0)
{
m_data->ocv.wait(lock);
}
if (!--m_data->wq_size && !m_data->rq_size)
{
m_ctrl &= ~SM_WAITERS_BIT;
}
}
void shared_mutex::unlock_notify()
{
initialize_once();
std::unique_lock<std::mutex> lock(m_data->mutex);
if (m_data->wq_size)
{
// Notify next exclusive owner
lock.unlock();
m_data->wcv.notify_one();
}
else if (m_data->rq_size)
{
// Notify all readers
lock.unlock();
m_data->rcv.notify_all();
}
}
void shared_mutex::lock_upgrade_hard()
{
unlock_shared();
lock();
}
void shared_mutex::lock_degrade_hard()
{
initialize_once();
std::unique_lock<std::mutex> lock(m_data->mutex);
m_ctrl -= SM_WRITER_LOCK - 1;
if (m_data->rq_size)
{
// Notify all readers
lock.unlock();
m_data->rcv.notify_all();
}
else if (m_data->wq_size)
{
// Notify next exclusive owner
lock.unlock();
m_data->wcv.notify_one();
}
}
void shared_mutex::initialize_once()
{
if (UNLIKELY(!m_data))
{
auto ptr = new shared_mutex::internal;
if (!m_data.compare_and_swap_test(nullptr, ptr))
{
delete ptr;
}
}
}
shared_mutex::~shared_mutex()
{
delete m_data;
}

View file

@ -1,179 +0,0 @@
#pragma once
#include "types.h"
#include "Atomic.h"
//! An attempt to create effective implementation of "shared mutex", lock-free in optimistic case.
//! All locking and unlocking may be done by a single LOCK XADD or LOCK CMPXCHG instruction.
//! MSVC implementation of std::shared_timed_mutex seems suboptimal.
//! std::shared_mutex is not available until C++17.
class shared_mutex final
{
enum : u32
{
SM_WRITER_LOCK = 1u << 31, // Exclusive lock flag, must be MSB
SM_WAITERS_BIT = 1u << 30, // Flag set if m_wq_size or m_rq_size is non-zero
SM_INVALID_BIT = 1u << 29, // Unreachable reader count bit (may be set by incorrect unlock_shared() call)
SM_READER_MASK = SM_WAITERS_BIT - 1, // Valid reader count bit mask
SM_READER_MAX = 1u << 24, // Max reader count
};
atomic_t<u32> m_ctrl{}; // Control variable: reader count | SM_* flags
struct internal;
atomic_t<internal*> m_data{}; // Internal data
void lock_shared_hard();
void unlock_shared_notify();
void lock_hard();
void unlock_notify();
void lock_upgrade_hard();
void lock_degrade_hard();
public:
constexpr shared_mutex() = default;
// Initialize internal data
void initialize_once();
~shared_mutex();
bool try_lock_shared()
{
const u32 ctrl = m_ctrl.load();
return ctrl < SM_READER_MAX && m_ctrl.compare_and_swap_test(ctrl, ctrl + 1);
}
void lock_shared()
{
// Optimization: unconditional increment, compensated later
if (UNLIKELY(m_ctrl++ >= SM_READER_MAX))
{
lock_shared_hard();
}
}
void unlock_shared()
{
if (UNLIKELY(m_ctrl-- >= SM_READER_MAX))
{
unlock_shared_notify();
}
}
bool try_lock()
{
return !m_ctrl && m_ctrl.compare_and_swap_test(0, SM_WRITER_LOCK);
}
void lock()
{
if (UNLIKELY(!m_ctrl.compare_and_swap_test(0, SM_WRITER_LOCK)))
{
lock_hard();
}
}
void unlock()
{
m_ctrl &= ~SM_WRITER_LOCK;
if (UNLIKELY(m_ctrl))
{
unlock_notify();
}
}
bool try_lock_upgrade()
{
return m_ctrl == 1 && m_ctrl.compare_and_swap_test(1, SM_WRITER_LOCK);
}
bool try_lock_degrade()
{
return m_ctrl == SM_WRITER_LOCK && m_ctrl.compare_and_swap_test(SM_WRITER_LOCK, 1);
}
void lock_upgrade()
{
if (UNLIKELY(!m_ctrl.compare_and_swap_test(1, SM_WRITER_LOCK)))
{
lock_upgrade_hard();
}
}
void lock_degrade()
{
if (UNLIKELY(!m_ctrl.compare_and_swap_test(SM_WRITER_LOCK, 1)))
{
lock_degrade_hard();
}
}
};
//! Simplified shared (reader) lock implementation.
//! std::shared_lock may be used instead if necessary.
class reader_lock final
{
shared_mutex& m_mutex;
public:
reader_lock(const reader_lock&) = delete;
reader_lock(shared_mutex& mutex)
: m_mutex(mutex)
{
m_mutex.lock_shared();
}
~reader_lock()
{
m_mutex.unlock_shared();
}
};
//! Simplified exclusive (writer) lock implementation.
//! std::lock_guard may or std::unique_lock be used instead if necessary.
class writer_lock final
{
shared_mutex& m_mutex;
public:
writer_lock(const writer_lock&) = delete;
writer_lock(shared_mutex& mutex)
: m_mutex(mutex)
{
m_mutex.lock();
}
~writer_lock()
{
m_mutex.unlock();
}
};
// Exclusive (writer) lock in the scope of shared (reader) lock.
class upgraded_lock final
{
shared_mutex& m_mutex;
public:
upgraded_lock(const writer_lock&) = delete;
upgraded_lock(shared_mutex& mutex)
: m_mutex(mutex)
{
m_mutex.lock_upgrade();
}
~upgraded_lock()
{
m_mutex.lock_degrade();
}
};

View file

@ -2043,7 +2043,7 @@ void thread_ctrl::push_atexit(task_stack task)
thread_ctrl::thread_ctrl(std::string&& name)
: m_name(std::move(name))
{
CHECK_STORAGE(std::thread, m_thread);
static_assert(sizeof(std::thread) <= sizeof(m_thread), "Small storage");
#pragma push_macro("new")
#undef new

107
Utilities/cond.cpp Normal file
View file

@ -0,0 +1,107 @@
#include "cond.h"
#include "sync.h"
bool cond_variable::imp_wait(u32 _old, u64 _timeout) noexcept
{
verify(HERE), _old != -1; // Very unlikely: it requires 2^32 distinct threads to wait simultaneously
#ifdef _WIN32
LARGE_INTEGER timeout;
timeout.QuadPart = _timeout * -10;
if (HRESULT rc = NtWaitForKeyedEvent(nullptr, &m_value, false, _timeout == -1 ? nullptr : &timeout))
{
verify(HERE), rc == WAIT_TIMEOUT;
// Retire
if (!m_value.fetch_op([](u32& value) { if (value) value--; }))
{
NtWaitForKeyedEvent(nullptr, &m_value, false, nullptr);
return true;
}
return false;
}
return true;
#elif __linux__
timespec timeout;
timeout.tv_sec = _timeout / 1000000;
timeout.tv_nsec = (_timeout % 1000000) * 1000;
for (u32 value = _old + 1;; value = m_value)
{
const int err = futex((int*)&m_value.raw(), FUTEX_WAIT_PRIVATE, value, _timeout == -1 ? nullptr : &timeout, nullptr, 0) == 0
? 0
: errno;
// Normal or timeout wakeup
if (!err || (_timeout != -1 && err == ETIMEDOUT))
{
// Cleanup (remove waiter)
verify(HERE), m_value--;
return !err;
}
// Not a wakeup
verify(HERE), err == EAGAIN;
}
#else
// TODO
std::this_thread::sleep_for(std::chrono::microseconds(50));
verify(HERE), m_value--;
return true;
#endif
}
void cond_variable::imp_wake(u32 _count) noexcept
{
#ifdef _WIN32
// Try to subtract required amount of waiters
const u32 count = m_value.atomic_op([=](u32& value)
{
if (value > _count)
{
value -= _count;
return _count;
}
return std::exchange(value, 0);
});
for (u32 i = count; i > 0; i--)
{
NtReleaseKeyedEvent(nullptr, &m_value, false, nullptr);
}
#elif __linux__
for (u32 i = _count; i > 0; sched_yield())
{
const u32 value = m_value;
// Constrain remaining amount with imaginary waiter count
if (i > value)
{
i = value;
}
if (!value || i == 0)
{
// Nothing to do
return;
}
if (const int res = futex((int*)&m_value.raw(), FUTEX_WAKE_PRIVATE, i > INT_MAX ? INT_MAX : i, nullptr, nullptr, 0))
{
verify(HERE), res >= 0 && res <= i;
i -= res;
}
if (!m_value || i == 0)
{
// Escape
return;
}
}
#endif
}

50
Utilities/cond.h Normal file
View file

@ -0,0 +1,50 @@
#pragma once
#include "types.h"
#include "Atomic.h"
// Lightweight condition variable
class cond_variable
{
// Internal waiter counter
atomic_t<u32> m_value{0};
protected:
// Internal waiting function
bool imp_wait(u32 _old, u64 _timeout) noexcept;
// Try to notify up to _count threads
void imp_wake(u32 _count) noexcept;
public:
constexpr cond_variable() = default;
// Intrusive wait algorithm for lockable objects
template <typename T, void (T::*Unlock)() = &T::unlock, void (T::*Lock)() = &T::lock>
explicit_bool_t wait(T& object, u64 usec_timeout = -1)
{
const u32 _old = m_value.fetch_add(1); // Increment waiter counter
(object.*Unlock)();
const bool res = imp_wait(_old, usec_timeout);
(object.*Lock)();
return res;
}
// Wake one thread
void notify_one() noexcept
{
if (m_value)
{
imp_wake(1);
}
}
// Wake all threads
void notify_all() noexcept
{
if (m_value)
{
imp_wake(-1);
}
}
};

245
Utilities/mutex.cpp Normal file
View file

@ -0,0 +1,245 @@
#include "mutex.h"
#include "sync.h"
#ifdef _WIN32
thread_local const u32 owned_mutex::g_tid = GetCurrentThreadId();
#elif __linux__
#include <sys/types.h>
thread_local const u32 owned_mutex::g_tid = syscall(SYS_gettid) + 1;
static_assert(sizeof(pid_t) == sizeof(u32), "Unexpected sizeof(pid_t)");
#else
#include <vector>
thread_local const u32 owned_mutex::g_tid = []() -> u32
{
static std::mutex g_tid_mutex;
static std::vector<bool> g_tid_map(1);
thread_local const struct tid_alloc
{
u32 id = 0;
tid_alloc()
{
std::lock_guard<std::mutex> lock(g_tid_mutex);
// Allocate
while (++id < g_tid_map.size())
{
if (!g_tid_map[id])
{
g_tid_map[id] = true;
return;
}
}
g_tid_map.push_back(true);
}
~tid_alloc()
{
std::lock_guard<std::mutex> lock(g_tid_mutex);
// Erase
g_tid_map[id] = false;
}
} g_tid;
return g_tid.id;
}();
#endif
void shared_mutex::imp_lock_shared(s64 _old)
{
verify("shared_mutex overflow" HERE), _old <= c_max;
// 1) Wait as a writer, notify the next writer
// 2) Wait as a reader, until the value > 0
lock();
_old = m_value.fetch_add(c_one - c_min);
if (_old)
{
imp_unlock(_old);
}
#ifdef _WIN32
if (_old + c_one - c_min < 0)
{
NtWaitForKeyedEvent(nullptr, (int*)&m_value + 1, false, nullptr);
}
#else
for (s64 value = m_value; value < 0; value = m_value)
{
if (futex((int*)&m_value.raw() + IS_LE_MACHINE, FUTEX_WAIT_PRIVATE, value >> 32, nullptr, nullptr, 0) == -1)
{
verify(HERE), errno == EAGAIN;
}
}
#endif
}
void shared_mutex::imp_unlock_shared(s64 _old)
{
verify("shared_mutex overflow" HERE), _old + c_min <= c_max;
// Check reader count, notify the writer if necessary (set c_sig)
if ((_old + c_min) % c_one == 0) // TODO
{
verify(HERE), !atomic_storage<s64>::bts(m_value.raw(), 0);
#ifdef _WIN32
NtReleaseKeyedEvent(nullptr, &m_value, false, nullptr);
#else
verify(HERE), futex((int*)&m_value.raw() + IS_BE_MACHINE, FUTEX_WAKE_PRIVATE, 1, nullptr, nullptr, 0) >= 0;
#endif
}
}
void shared_mutex::imp_lock(s64 _old)
{
verify("shared_mutex overflow" HERE), _old <= c_max;
#ifdef _WIN32
NtWaitForKeyedEvent(nullptr, &m_value, false, nullptr);
verify(HERE), atomic_storage<s64>::btr(m_value.raw(), 0);
#else
for (s64 value = m_value; (m_value & c_sig) == 0 || !atomic_storage<s64>::btr(m_value.raw(), 0); value = m_value)
{
if (futex((int*)&m_value.raw() + IS_BE_MACHINE, FUTEX_WAIT_PRIVATE, value, nullptr, nullptr, 0) == -1)
{
verify(HERE), errno == EAGAIN;
}
}
#endif
}
void shared_mutex::imp_unlock(s64 _old)
{
verify("shared_mutex overflow" HERE), _old + c_one <= c_max;
// 1) Notify the next writer if necessary (set c_sig)
// 2) Notify all readers otherwise if necessary
if (_old + c_one <= 0)
{
verify(HERE), !atomic_storage<s64>::bts(m_value.raw(), 0);
#ifdef _WIN32
NtReleaseKeyedEvent(nullptr, &m_value, false, nullptr);
#else
verify(HERE), futex((int*)&m_value.raw() + IS_BE_MACHINE, FUTEX_WAKE_PRIVATE, 1, nullptr, nullptr, 0) >= 0;
#endif
}
else if (s64 count = -_old / c_min)
{
#ifdef _WIN32
while (count--)
{
NtReleaseKeyedEvent(nullptr, (int*)&m_value + 1, false, nullptr);
}
#else
verify(HERE), futex((int*)&m_value.raw() + IS_LE_MACHINE, FUTEX_WAKE_PRIVATE, INT_MAX, nullptr, nullptr, 0) >= 0;
#endif
}
}
void shared_mutex::imp_lock_upgrade()
{
unlock_shared();
lock();
}
void shared_mutex::imp_lock_degrade()
{
unlock();
lock_shared();
}
bool shared_mutex::try_lock_shared()
{
// Conditional decrement
return m_value.fetch_op([](s64& value) { if (value >= c_min) value -= c_min; }) >= c_min;
}
bool shared_mutex::try_lock()
{
// Conditional decrement (TODO: obtain c_sig)
return m_value.compare_and_swap_test(c_one, 0);
}
bool shared_mutex::try_lock_upgrade()
{
// TODO
return m_value.compare_and_swap_test(c_one - c_min, 0);
}
bool shared_mutex::try_lock_degrade()
{
// TODO
return m_value.compare_and_swap_test(0, c_one - c_min);
}
bool owned_mutex::lock() noexcept
{
if (m_value && m_owner == g_tid)
{
return false;
}
#ifdef _WIN32
if (m_value++)
{
NtWaitForKeyedEvent(nullptr, &m_value, false, nullptr);
}
m_owner.store(g_tid);
#else
u32 _last = ++m_value;
if (_last == 1 && m_owner.compare_and_swap_test(0, g_tid))
{
return true;
}
while (!m_owner.compare_and_swap_test(0, g_tid))
{
if (futex((int*)&m_value.raw(), FUTEX_WAIT_PRIVATE, _last, nullptr, nullptr, 0))
{
_last = m_value.load();
}
}
#endif
return true;
}
bool owned_mutex::try_lock() noexcept
{
if (m_value || !m_value.compare_and_swap_test(0, 1))
{
return false;
}
m_owner.store(g_tid);
return true;
}
bool owned_mutex::unlock() noexcept
{
if (UNLIKELY(m_owner != g_tid))
{
return false;
}
m_owner.store(0);
if (--m_value)
{
#ifdef _WIN32
NtReleaseKeyedEvent(nullptr, &m_value, false, nullptr);
#else
futex((int*)&m_value.raw(), FUTEX_WAKE_PRIVATE, 1, nullptr, nullptr, 0);
#endif
}
return true;
}

218
Utilities/mutex.h Normal file
View file

@ -0,0 +1,218 @@
#pragma once
#include "types.h"
#include "Atomic.h"
// Shared mutex.
class shared_mutex final
{
enum : s64
{
c_one = 1ull << 31, // Fixed-point 1.0 value (one writer)
c_min = 0x00000002, // Fixed-point 1.0/max_readers value
c_sig = 0x00000001,
c_max = c_one
};
atomic_t<s64> m_value{c_one}; // Semaphore-alike counter
void imp_lock_shared(s64 _old);
void imp_unlock_shared(s64 _old);
void imp_lock(s64 _old);
void imp_unlock(s64 _old);
void imp_lock_upgrade();
void imp_lock_degrade();
public:
constexpr shared_mutex() = default;
bool try_lock_shared();
void lock_shared()
{
const s64 value = m_value.load();
// Fast path: decrement if positive
if (UNLIKELY(value < c_min || value > c_one || !m_value.compare_and_swap_test(value, value - c_min)))
{
imp_lock_shared(value);
}
}
void unlock_shared()
{
// Unconditional increment
const s64 value = m_value.fetch_add(c_min);
if (value < 0 || value > c_one - c_min)
{
imp_unlock_shared(value);
}
}
bool try_lock();
void lock()
{
// Unconditional decrement
const s64 value = m_value.fetch_sub(c_one);
if (value != c_one)
{
imp_lock(value);
}
}
void unlock()
{
// Unconditional increment
const s64 value = m_value.fetch_add(c_one);
if (value != 0)
{
imp_unlock(value);
}
}
bool try_lock_upgrade();
void lock_upgrade()
{
// TODO
if (!m_value.compare_and_swap_test(c_one - c_min, 0))
{
imp_lock_upgrade();
}
}
bool try_lock_degrade();
void lock_degrade()
{
// TODO
if (!m_value.compare_and_swap_test(0, c_one - c_min))
{
imp_lock_degrade();
}
}
};
// Simplified shared (reader) lock implementation, std::shared_lock compatible.
class reader_lock final
{
shared_mutex& m_mutex;
public:
reader_lock(const reader_lock&) = delete;
explicit reader_lock(shared_mutex& mutex)
: m_mutex(mutex)
{
m_mutex.lock_shared();
}
~reader_lock()
{
m_mutex.unlock_shared();
}
};
// Simplified exclusive (writer) lock implementation, std::lock_guard compatible.
class writer_lock final
{
shared_mutex& m_mutex;
public:
writer_lock(const writer_lock&) = delete;
explicit writer_lock(shared_mutex& mutex)
: m_mutex(mutex)
{
m_mutex.lock();
}
~writer_lock()
{
m_mutex.unlock();
}
};
// Exclusive (writer) lock in the scope of shared (reader) lock (experimental).
class upgraded_lock final
{
shared_mutex& m_mutex;
public:
upgraded_lock(const writer_lock&) = delete;
explicit upgraded_lock(shared_mutex& mutex)
: m_mutex(mutex)
{
m_mutex.lock_upgrade();
}
~upgraded_lock()
{
m_mutex.lock_degrade();
}
};
// Normal mutex with owner registration.
class owned_mutex
{
atomic_t<u32> m_value{0};
atomic_t<u32> m_owner{0};
protected:
// Thread id
static thread_local const u32 g_tid;
public:
constexpr owned_mutex() = default;
// Returns false if current thread already owns the mutex.
bool lock() noexcept;
// Returns false if locked by any thread.
bool try_lock() noexcept;
// Returns false if current thread doesn't own the mutex.
bool unlock() noexcept;
// Check state.
bool is_locked() const { return m_value != 0; }
// Check owner.
bool is_owned() const { return m_owner == g_tid; }
};
// Recursive lock for owned_mutex (experimental).
class recursive_lock final
{
owned_mutex& m_mutex;
const bool m_first;
public:
recursive_lock(const recursive_lock&) = delete;
explicit recursive_lock(owned_mutex& mutex)
: m_mutex(mutex)
, m_first(mutex.lock())
{
}
// Check whether the lock "owns" the mutex
explicit operator bool() const
{
return m_first;
}
~recursive_lock()
{
if (m_first && !m_mutex.unlock())
{
}
}
};

162
Utilities/sema.cpp Normal file
View file

@ -0,0 +1,162 @@
#include "sema.h"
#include "sync.h"
void semaphore_base::imp_wait(bool lsb)
{
// 1. Obtain LSB, reset it
// 2. Notify other posters if necessary
#ifdef _WIN32
if (!lsb)
{
while ((m_value & 1) == 0 || !atomic_storage<s32>::btr(m_value.raw(), 0))
{
// Wait infinitely until signaled
verify(HERE), NtWaitForKeyedEvent(nullptr, &m_value, false, nullptr) == ERROR_SUCCESS;
}
}
// Notify instantly
LARGE_INTEGER timeout;
timeout.QuadPart = 0;
if (HRESULT rc = NtReleaseKeyedEvent(nullptr, (u8*)&m_value + 2, false, &timeout))
{
verify(HERE), rc == WAIT_TIMEOUT;
}
#elif __linux__
if (!lsb)
{
for (s32 value = m_value; (m_value & 1) == 0 || !atomic_storage<s32>::btr(m_value.raw(), 0); value = m_value)
{
if (futex(&m_value.raw(), FUTEX_WAIT_BITSET_PRIVATE, value, nullptr, nullptr, -2) == -1)
{
verify(HERE), errno == EAGAIN;
}
}
}
verify(HERE), futex(&m_value.raw(), FUTEX_WAKE_BITSET_PRIVATE, 1, nullptr, nullptr, 1) >= 0;
#else
if (lsb)
{
return;
}
while ((m_value & 1) == 0 || !atomic_storage<s32>::btr(m_value.raw(), 0))
{
std::this_thread::sleep_for(std::chrono::microseconds(50));
}
#endif
}
void semaphore_base::imp_post(s32 _old)
{
verify("semaphore_base: overflow" HERE), _old < 0;
// 1. Set LSB, waiting until it can be set if necessary
// 2. Notify waiter
#ifdef _WIN32
while ((_old & 1) == 0 && atomic_storage<s32>::bts(m_value.raw(), 0))
{
static_assert(ERROR_SUCCESS == 0, "Unexpected ERROR_SUCCESS value");
LARGE_INTEGER timeout;
timeout.QuadPart = -500; // ~50us
if (HRESULT rc = NtWaitForKeyedEvent(nullptr, (u8*)&m_value + 2, false, &timeout))
{
verify(HERE), rc == WAIT_TIMEOUT;
}
}
LARGE_INTEGER timeout;
timeout.QuadPart = 0; // Instant for the first attempt
while (HRESULT rc = NtReleaseKeyedEvent(nullptr, &m_value, false, &timeout))
{
verify(HERE), rc == WAIT_TIMEOUT;
if (!timeout.QuadPart)
{
timeout.QuadPart = -500; // ~50us
NtDelayExecution(false, &timeout);
}
if ((m_value & 1) == 0)
{
break;
}
}
#elif __linux__
for (s32 value = m_value; (_old & 1) == 0 && atomic_storage<s32>::bts(m_value.raw(), 0); value = m_value)
{
if (futex(&m_value.raw(), FUTEX_WAIT_BITSET_PRIVATE, value, nullptr, nullptr, 1) == -1)
{
verify(HERE), errno == EAGAIN;
}
}
if (1 + 0 == verify(HERE, 1 + futex(&m_value.raw(), FUTEX_WAKE_BITSET_PRIVATE, 1, nullptr, nullptr, -2)))
{
usleep(50);
}
#else
if (_old & 1)
{
return;
}
while (m_value & 1 || atomic_storage<s32>::bts(m_value.raw(), 0))
{
std::this_thread::sleep_for(std::chrono::microseconds(50));
}
#endif
}
bool semaphore_base::try_wait()
{
// Conditional decrement
const s32 value = m_value.fetch_op([](s32& value)
{
if (value > 0 || value & 1)
{
if (value <= 1)
{
value &= ~1;
}
value -= 1 << 1;
}
});
if (value & 1 && value <= 1)
{
imp_wait(true);
return true;
}
return value > 0 || value & 1;
}
bool semaphore_base::try_post(s32 _max)
{
// Conditional increment
const s32 value = m_value.fetch_op([&](s32& value)
{
if (value < _max)
{
if (value < 0)
{
value |= 1;
}
value += 1 << 1;
}
});
if (value < 0)
{
imp_post(value ^ 1);
}
return value < _max;
}

110
Utilities/sema.h Normal file
View file

@ -0,0 +1,110 @@
#pragma once
#include "types.h"
#include "Atomic.h"
// Lightweight semaphore helper class
class semaphore_base
{
// Semaphore value (shifted; negative values imply 0 with waiters, LSB is used to ping-pong signals between threads)
atomic_t<s32> m_value;
void imp_wait(bool lsb);
void imp_post(s32 _old);
protected:
explicit constexpr semaphore_base(s32 value)
: m_value{value}
{
}
void wait()
{
// Unconditional decrement
if (UNLIKELY(m_value.sub_fetch(1 << 1) < 0))
{
imp_wait(false);
}
}
bool try_wait();
void post(s32 _max)
{
// Unconditional increment
const s32 value = m_value.fetch_add(1 << 1);
if (UNLIKELY(value < 0 || value >= _max))
{
imp_post(value & ~1);
}
}
bool try_post(s32 _max);
public:
// Get current semaphore value
s32 get() const
{
// Load value
const s32 value = m_value;
// Return only positive value
return value < 0 ? 0 : value >> 1;
}
};
// Lightweight semaphore template (default arguments define binary semaphore and Def == Max)
template <s32 Max = 1, s32 Def = Max>
class semaphore final : public semaphore_base
{
static_assert(Max >= 0 && Max < (1 << 30), "semaphore<>: Max is out of bounds");
static_assert(Def >= 0 && Def < (1 << 30), "semaphore<>: Def is out of bounds");
static_assert(Def <= Max, "semaphore<>: Def is too big");
using base = semaphore_base;
public:
// Default constructor (recommended)
constexpr semaphore()
: base{Def << 1}
{
}
// Explicit value constructor (not recommended)
explicit constexpr semaphore(s32 value)
: base{value << 1}
{
}
// Obtain a semaphore
void wait()
{
return base::wait();
}
// Try to obtain a semaphore
explicit_bool_t try_wait()
{
return base::try_wait();
}
// Return a semaphore
void post()
{
return base::post(Max << 1);
}
// Try to return a semaphore
explicit_bool_t try_post()
{
return base::try_post(Max << 1);
}
// Get max semaphore value
static constexpr s32 size()
{
return Max;
}
};

View file

@ -30,16 +30,12 @@
#define SAFE_BUFFERS
#define NEVER_INLINE __attribute__((noinline))
#define FORCE_INLINE __attribute__((always_inline)) inline
// Some platforms don't support thread_local well yet.
#define thread_local __thread
#endif
#define CHECK_SIZE(type, size) static_assert(sizeof(type) == size, "Invalid " #type " type size")
#define CHECK_ALIGN(type, align) static_assert(alignof(type) == align, "Invalid " #type " type alignment")
#define CHECK_MAX_SIZE(type, size) static_assert(sizeof(type) <= size, #type " type size is too big")
#define CHECK_SIZE_ALIGN(type, size, align) CHECK_SIZE(type, size); CHECK_ALIGN(type, align)
#define CHECK_STORAGE(type, storage) static_assert(sizeof(type) <= sizeof(storage) && alignof(type) <= alignof(decltype(storage)), #type " is too small")
// Return 32 bit sizeof() to avoid widening/narrowing conversions with size_t
#define SIZE_32(...) static_cast<u32>(sizeof(__VA_ARGS__))
@ -533,7 +529,7 @@ struct verify_impl
// Verification (can be safely disabled)
if (!verify_func()(std::forward<T>(value)))
{
fmt::raw_verify_error(cause, fmt::get_type_info<uint>(), N);
fmt::raw_verify_error(cause, nullptr, N);
}
return verify_impl<N + 1>{cause};

View file

@ -1,6 +1,6 @@
#pragma once
#include "Utilities/SharedMutex.h"
#include "Utilities/mutex.h"
#include <set>

View file

@ -3,7 +3,7 @@
#include <memory>
#include <unordered_map>
#include "Utilities/SharedMutex.h"
#include "Utilities/mutex.h"
// IPC manager for objects of type T and IPC keys of type K.
// External declaration of g_ipc is required.

View file

@ -1,7 +1,7 @@
#pragma once
#include "Utilities/types.h"
#include "Utilities/SharedMutex.h"
#include "Utilities/mutex.h"
#include <memory>
#include <vector>

View file

@ -4,7 +4,7 @@
#include "wait_engine.h"
#include "Utilities/Thread.h"
#include "Utilities/SharedMutex.h"
#include "Utilities/mutex.h"
#include <unordered_set>

View file

@ -73,6 +73,9 @@
<PrecompiledHeader>NotUsing</PrecompiledHeader>
</ClCompile>
<ClCompile Include="..\Utilities\AutoPause.cpp" />
<ClCompile Include="..\Utilities\cond.cpp">
<PrecompiledHeader>NotUsing</PrecompiledHeader>
</ClCompile>
<ClCompile Include="..\Utilities\dynamic_library.cpp" />
<ClCompile Include="..\Utilities\JIT.cpp">
<PrecompiledHeader>NotUsing</PrecompiledHeader>
@ -84,11 +87,11 @@
<PrecompiledHeader>NotUsing</PrecompiledHeader>
</ClCompile>
<ClCompile Include="..\Utilities\Config.cpp" />
<ClCompile Include="..\Utilities\rXml.cpp" />
<ClCompile Include="..\Utilities\Semaphore.cpp">
<ClCompile Include="..\Utilities\mutex.cpp">
<PrecompiledHeader>NotUsing</PrecompiledHeader>
</ClCompile>
<ClCompile Include="..\Utilities\SharedMutex.cpp">
<ClCompile Include="..\Utilities\rXml.cpp" />
<ClCompile Include="..\Utilities\sema.cpp">
<PrecompiledHeader>NotUsing</PrecompiledHeader>
</ClCompile>
<ClCompile Include="..\Utilities\StrFmt.cpp">
@ -259,11 +262,7 @@
<ClCompile Include="Emu\RSX\Common\TextureUtils.cpp" />
<ClCompile Include="Emu\RSX\Common\VertexProgramDecompiler.cpp" />
<ClCompile Include="Emu\RSX\gcm_printing.cpp">
<PrecompiledHeader Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">NotUsing</PrecompiledHeader>
<PrecompiledHeader Condition="'$(Configuration)|$(Platform)'=='Debug - LLVM|x64'">NotUsing</PrecompiledHeader>
<PrecompiledHeader Condition="'$(Configuration)|$(Platform)'=='Release - LLVM|x64'">NotUsing</PrecompiledHeader>
<PrecompiledHeader Condition="'$(Configuration)|$(Platform)'=='Release|x64'">NotUsing</PrecompiledHeader>
<PrecompiledHeader Condition="'$(Configuration)|$(Platform)'=='Debug - MemLeak|x64'">NotUsing</PrecompiledHeader>
<PrecompiledHeader>NotUsing</PrecompiledHeader>
</ClCompile>
<ClCompile Include="Emu\RSX\gcm_enums.cpp">
<PrecompiledHeader>NotUsing</PrecompiledHeader>
@ -394,20 +393,21 @@
<ClInclude Include="..\Utilities\BitField.h" />
<ClInclude Include="..\Utilities\bit_set.h" />
<ClInclude Include="..\Utilities\cfmt.h" />
<ClInclude Include="..\Utilities\cond.h" />
<ClInclude Include="..\Utilities\dynamic_library.h" />
<ClInclude Include="..\Utilities\event.h" />
<ClInclude Include="..\Utilities\geometry.h" />
<ClInclude Include="..\Utilities\GSL.h" />
<ClInclude Include="..\Utilities\JIT.h" />
<ClInclude Include="..\Utilities\lockless.h" />
<ClInclude Include="..\Utilities\mutex.h" />
<ClInclude Include="..\Utilities\sema.h" />
<ClInclude Include="..\Utilities\SleepQueue.h" />
<ClInclude Include="..\Utilities\sync.h" />
<ClInclude Include="..\Utilities\Log.h" />
<ClInclude Include="..\Utilities\File.h" />
<ClInclude Include="..\Utilities\Config.h" />
<ClInclude Include="..\Utilities\rXml.h" />
<ClInclude Include="..\Utilities\Semaphore.h" />
<ClInclude Include="..\Utilities\SharedMutex.h" />
<ClInclude Include="..\Utilities\StrFmt.h" />
<ClInclude Include="..\Utilities\StrUtil.h" />
<ClInclude Include="..\Utilities\Thread.h" />
@ -689,4 +689,4 @@
<Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" />
<ImportGroup Label="ExtensionTargets">
</ImportGroup>
</Project>
</Project>

View file

@ -182,9 +182,6 @@
<ClCompile Include="Emu\RSX\Common\VertexProgramDecompiler.cpp">
<Filter>Emu\GPU\RSX\Common</Filter>
</ClCompile>
<ClCompile Include="..\Utilities\Semaphore.cpp">
<Filter>Utilities</Filter>
</ClCompile>
<ClCompile Include="..\Utilities\VirtualMemory.cpp">
<Filter>Utilities</Filter>
</ClCompile>
@ -650,9 +647,6 @@
<ClCompile Include="Emu\VFS.cpp">
<Filter>Emu</Filter>
</ClCompile>
<ClCompile Include="..\Utilities\SharedMutex.cpp">
<Filter>Utilities</Filter>
</ClCompile>
<ClCompile Include="Emu\Memory\wait_engine.cpp">
<Filter>Emu\Memory</Filter>
</ClCompile>
@ -887,6 +881,15 @@
<ClCompile Include="Emu\RSX\gcm_printing.cpp">
<Filter>Emu\GPU\RSX</Filter>
</ClCompile>
<ClCompile Include="..\Utilities\mutex.cpp">
<Filter>Utilities</Filter>
</ClCompile>
<ClCompile Include="..\Utilities\cond.cpp">
<Filter>Utilities</Filter>
</ClCompile>
<ClCompile Include="..\Utilities\sema.cpp">
<Filter>Utilities</Filter>
</ClCompile>
</ItemGroup>
<ItemGroup>
<ClInclude Include="Crypto\aes.h">
@ -1093,9 +1096,6 @@
<ClInclude Include="Emu\RSX\Common\VertexProgramDecompiler.h">
<Filter>Emu\GPU\RSX\Common</Filter>
</ClInclude>
<ClInclude Include="..\Utilities\Semaphore.h">
<Filter>Utilities</Filter>
</ClInclude>
<ClInclude Include="Emu\Cell\Common.h">
<Filter>Emu\Cell</Filter>
</ClInclude>
@ -1108,9 +1108,6 @@
<ClInclude Include="Emu\Cell\SPUAnalyser.h">
<Filter>Emu\Cell</Filter>
</ClInclude>
<ClInclude Include="..\Utilities\SharedMutex.h">
<Filter>Utilities</Filter>
</ClInclude>
<ClInclude Include="..\Utilities\Atomic.h">
<Filter>Utilities</Filter>
</ClInclude>
@ -1708,5 +1705,14 @@
<ClInclude Include="Emu\RSX\Common\TextGlyphs.h">
<Filter>Emu\GPU\RSX\Common</Filter>
</ClInclude>
<ClInclude Include="..\Utilities\mutex.h">
<Filter>Utilities</Filter>
</ClInclude>
<ClInclude Include="..\Utilities\cond.h">
<Filter>Utilities</Filter>
</ClInclude>
<ClInclude Include="..\Utilities\sema.h">
<Filter>Utilities</Filter>
</ClInclude>
</ItemGroup>
</Project>