Busy waiting added

This commit is contained in:
Nekotekina 2017-02-15 18:07:42 +03:00
parent b637bd3866
commit b1aa87b515
7 changed files with 196 additions and 373 deletions

View file

@ -153,7 +153,7 @@ struct fmt_class_string
out += prefix;
for (u64 i = 0; i < 64; i++)
for (u64 i = 0; i < 63; i++)
{
const u64 mask = 1ull << i;
@ -161,13 +161,18 @@ struct fmt_class_string
{
fmt(out, i);
if (arg > mask)
if (arg >> (i + 1))
{
out += delim;
}
}
}
if (arg & (1ull << 63))
{
fmt(out, 63);
}
out += suffix;
}

View file

@ -24,7 +24,7 @@ bool cond_variable::imp_wait(u32 _old, u64 _timeout) noexcept
}
return true;
#elif __linux__
#else
timespec timeout;
timeout.tv_sec = _timeout / 1000000;
timeout.tv_nsec = (_timeout % 1000000) * 1000;
@ -46,11 +46,6 @@ bool cond_variable::imp_wait(u32 _old, u64 _timeout) noexcept
// Not a wakeup
verify(HERE), err == EAGAIN;
}
#else
// TODO
std::this_thread::sleep_for(std::chrono::microseconds(50));
verify(HERE), m_value--;
return true;
#endif
}
@ -73,7 +68,7 @@ void cond_variable::imp_wake(u32 _count) noexcept
{
NtReleaseKeyedEvent(nullptr, &m_value, false, nullptr);
}
#elif __linux__
#else
for (u32 i = _count; i > 0; sched_yield())
{
const u32 value = m_value;
@ -104,4 +99,3 @@ void cond_variable::imp_wake(u32 _count) noexcept
}
#endif
}

View file

@ -1,81 +1,47 @@
#include "mutex.h"
#include "sync.h"
#ifdef _WIN32
thread_local const u32 owned_mutex::g_tid = GetCurrentThreadId();
#elif __linux__
#include <sys/types.h>
thread_local const u32 owned_mutex::g_tid = syscall(SYS_gettid) + 1;
static_assert(sizeof(pid_t) == sizeof(u32), "Unexpected sizeof(pid_t)");
#else
#include <vector>
thread_local const u32 owned_mutex::g_tid = []() -> u32
{
static std::mutex g_tid_mutex;
static std::vector<bool> g_tid_map(1);
thread_local const struct tid_alloc
{
u32 id = 0;
tid_alloc()
{
std::lock_guard<std::mutex> lock(g_tid_mutex);
// Allocate
while (++id < g_tid_map.size())
{
if (!g_tid_map[id])
{
g_tid_map[id] = true;
return;
}
}
g_tid_map.push_back(true);
}
~tid_alloc()
{
std::lock_guard<std::mutex> lock(g_tid_mutex);
// Erase
g_tid_map[id] = false;
}
} g_tid;
return g_tid.id;
}();
#endif
void shared_mutex::imp_lock_shared(s64 _old)
{
verify("shared_mutex overflow" HERE), _old <= c_max;
// 1) Wait as a writer, notify the next writer
// 2) Wait as a reader, until the value > 0
lock();
_old = m_value.fetch_add(c_one - c_min);
if (_old)
for (int i = 0; i < 10; i++)
{
imp_unlock(_old);
busy_wait();
const s64 value = m_value.load();
if (value >= c_min && m_value.compare_and_swap_test(value, value - c_min))
{
return;
}
}
// Acquire writer lock
imp_wait(m_value.load());
// Convert value
s64 value = m_value.fetch_add(c_one - c_min);
if (value != 0)
{
imp_unlock(value);
}
#ifdef _WIN32
if (_old + c_one - c_min < 0)
// Wait as a reader if necessary
if (value + c_one - c_min < 0)
{
NtWaitForKeyedEvent(nullptr, (int*)&m_value + 1, false, nullptr);
}
#else
for (s64 value = m_value; value < 0; value = m_value)
// Use resulting value
value += c_one - c_min;
while (value < 0)
{
if (futex((int*)&m_value.raw() + IS_LE_MACHINE, FUTEX_WAIT_PRIVATE, value >> 32, nullptr, nullptr, 0) == -1)
{
verify(HERE), errno == EAGAIN;
}
futex((int*)&m_value.raw() + IS_LE_MACHINE, FUTEX_WAIT_PRIVATE, int(value >> 32), nullptr, nullptr, 0);
value = m_value.load();
}
#endif
}
@ -84,62 +50,94 @@ void shared_mutex::imp_unlock_shared(s64 _old)
{
verify("shared_mutex overflow" HERE), _old + c_min <= c_max;
// Check reader count, notify the writer if necessary (set c_sig)
if ((_old + c_min) % c_one == 0) // TODO
// Check reader count, notify the writer if necessary
if ((_old + c_min) % c_one == 0)
{
verify(HERE), !atomic_storage<s64>::bts(m_value.raw(), 0);
#ifdef _WIN32
NtReleaseKeyedEvent(nullptr, &m_value, false, nullptr);
#else
verify(HERE), futex((int*)&m_value.raw() + IS_BE_MACHINE, FUTEX_WAKE_PRIVATE, 1, nullptr, nullptr, 0) >= 0;
futex((int*)&m_value.raw() + IS_BE_MACHINE, FUTEX_WAKE_PRIVATE, 1, nullptr, nullptr, 0);
#endif
}
}
void shared_mutex::imp_wait(s64 _old)
{
#ifdef _WIN32
if (m_value.sub_fetch(c_one))
{
NtWaitForKeyedEvent(nullptr, &m_value, false, nullptr);
}
#else
_old = m_value.fetch_sub(c_one);
// Return immediately if locked
while (_old != c_one)
{
// Load new value
const s64 value = m_value.load();
// Detect addition (unlock op)
if (value / c_one > _old / c_one)
{
return;
}
futex((int*)&m_value.raw() + IS_BE_MACHINE, FUTEX_WAIT_PRIVATE, value, nullptr, nullptr, 0);
// Update old value
_old = value;
}
#endif
}
void shared_mutex::imp_lock(s64 _old)
{
verify("shared_mutex overflow" HERE), _old <= c_max;
#ifdef _WIN32
NtWaitForKeyedEvent(nullptr, &m_value, false, nullptr);
verify(HERE), atomic_storage<s64>::btr(m_value.raw(), 0);
#else
for (s64 value = m_value; (m_value & c_sig) == 0 || !atomic_storage<s64>::btr(m_value.raw(), 0); value = m_value)
for (int i = 0; i < 10; i++)
{
if (futex((int*)&m_value.raw() + IS_BE_MACHINE, FUTEX_WAIT_PRIVATE, value, nullptr, nullptr, 0) == -1)
busy_wait();
const s64 value = m_value.load();
if (value == c_one && m_value.compare_and_swap_test(c_one, 0))
{
verify(HERE), errno == EAGAIN;
return;
}
}
#endif
imp_wait(m_value.load());
}
void shared_mutex::imp_unlock(s64 _old)
{
verify("shared_mutex overflow" HERE), _old + c_one <= c_max;
// 1) Notify the next writer if necessary (set c_sig)
// 1) Notify the next writer if necessary
// 2) Notify all readers otherwise if necessary
#ifdef _WIN32
if (_old + c_one <= 0)
{
verify(HERE), !atomic_storage<s64>::bts(m_value.raw(), 0);
#ifdef _WIN32
NtReleaseKeyedEvent(nullptr, &m_value, false, nullptr);
#else
verify(HERE), futex((int*)&m_value.raw() + IS_BE_MACHINE, FUTEX_WAKE_PRIVATE, 1, nullptr, nullptr, 0) >= 0;
#endif
}
else if (s64 count = -_old / c_min)
{
#ifdef _WIN32
while (count--)
{
NtReleaseKeyedEvent(nullptr, (int*)&m_value + 1, false, nullptr);
}
#else
verify(HERE), futex((int*)&m_value.raw() + IS_LE_MACHINE, FUTEX_WAKE_PRIVATE, INT_MAX, nullptr, nullptr, 0) >= 0;
#endif
}
#else
if (_old + c_one <= 0)
{
futex((int*)&m_value.raw() + IS_BE_MACHINE, FUTEX_WAKE_PRIVATE, 1, nullptr, nullptr, 0);
}
else if (s64 count = -_old / c_min)
{
futex((int*)&m_value.raw() + IS_LE_MACHINE, FUTEX_WAKE_PRIVATE, INT_MAX, nullptr, nullptr, 0);
}
#endif
}
void shared_mutex::imp_lock_upgrade()
@ -177,69 +175,3 @@ bool shared_mutex::try_lock_degrade()
// TODO
return m_value.compare_and_swap_test(0, c_one - c_min);
}
bool owned_mutex::lock() noexcept
{
if (m_value && m_owner == g_tid)
{
return false;
}
#ifdef _WIN32
if (m_value++)
{
NtWaitForKeyedEvent(nullptr, &m_value, false, nullptr);
}
m_owner.store(g_tid);
#else
u32 _last = ++m_value;
if (_last == 1 && m_owner.compare_and_swap_test(0, g_tid))
{
return true;
}
while (!m_owner.compare_and_swap_test(0, g_tid))
{
if (futex((int*)&m_value.raw(), FUTEX_WAIT_PRIVATE, _last, nullptr, nullptr, 0))
{
_last = m_value.load();
}
}
#endif
return true;
}
bool owned_mutex::try_lock() noexcept
{
if (m_value || !m_value.compare_and_swap_test(0, 1))
{
return false;
}
m_owner.store(g_tid);
return true;
}
bool owned_mutex::unlock() noexcept
{
if (UNLIKELY(m_owner != g_tid))
{
return false;
}
m_owner.store(0);
if (--m_value)
{
#ifdef _WIN32
NtReleaseKeyedEvent(nullptr, &m_value, false, nullptr);
#else
futex((int*)&m_value.raw(), FUTEX_WAKE_PRIVATE, 1, nullptr, nullptr, 0);
#endif
}
return true;
}

View file

@ -9,8 +9,7 @@ class shared_mutex final
enum : s64
{
c_one = 1ull << 31, // Fixed-point 1.0 value (one writer)
c_min = 0x00000002, // Fixed-point 1.0/max_readers value
c_sig = 0x00000001,
c_min = 0x00000001, // Fixed-point 1.0/max_readers value
c_max = c_one
};
@ -18,7 +17,7 @@ class shared_mutex final
void imp_lock_shared(s64 _old);
void imp_unlock_shared(s64 _old);
void imp_wait(s64 _old);
void imp_lock(s64 _old);
void imp_unlock(s64 _old);
@ -56,8 +55,8 @@ public:
void lock()
{
// Unconditional decrement
const s64 value = m_value.fetch_sub(c_one);
// Try to lock
const s64 value = m_value.compare_and_swap(c_one, 0);
if (value != c_one)
{
@ -80,7 +79,6 @@ public:
void lock_upgrade()
{
// TODO
if (!m_value.compare_and_swap_test(c_one - c_min, 0))
{
imp_lock_upgrade();
@ -91,7 +89,6 @@ public:
void lock_degrade()
{
// TODO
if (!m_value.compare_and_swap_test(0, c_one - c_min))
{
imp_lock_degrade();
@ -103,15 +100,16 @@ public:
class reader_lock final
{
shared_mutex& m_mutex;
bool m_upgraded = false;
void lock()
{
m_mutex.lock_shared();
m_upgraded ? m_mutex.lock() : m_mutex.lock_shared();
}
void unlock()
{
m_mutex.unlock_shared();
m_upgraded ? m_mutex.unlock() : m_mutex.unlock_shared();
}
friend class cond_variable;
@ -125,6 +123,16 @@ public:
lock();
}
// One-way lock upgrade
void upgrade()
{
if (!m_upgraded)
{
m_mutex.lock_upgrade();
m_upgraded = true;
}
}
~reader_lock()
{
unlock();
@ -136,95 +144,29 @@ class writer_lock final
{
shared_mutex& m_mutex;
void lock()
{
m_mutex.lock();
}
void unlock()
{
m_mutex.unlock();
}
friend class cond_variable;
public:
writer_lock(const writer_lock&) = delete;
explicit writer_lock(shared_mutex& mutex)
: m_mutex(mutex)
{
m_mutex.lock();
lock();
}
~writer_lock()
{
m_mutex.unlock();
}
};
// Exclusive (writer) lock in the scope of shared (reader) lock (experimental).
class upgraded_lock final
{
shared_mutex& m_mutex;
public:
upgraded_lock(const writer_lock&) = delete;
explicit upgraded_lock(shared_mutex& mutex)
: m_mutex(mutex)
{
m_mutex.lock_upgrade();
}
~upgraded_lock()
{
m_mutex.lock_degrade();
}
};
// Normal mutex with owner registration.
class owned_mutex
{
atomic_t<u32> m_value{0};
atomic_t<u32> m_owner{0};
protected:
// Thread id
static thread_local const u32 g_tid;
public:
constexpr owned_mutex() = default;
// Returns false if current thread already owns the mutex.
bool lock() noexcept;
// Returns false if locked by any thread.
bool try_lock() noexcept;
// Returns false if current thread doesn't own the mutex.
bool unlock() noexcept;
// Check state.
bool is_locked() const { return m_value != 0; }
// Check owner.
bool is_owned() const { return m_owner == g_tid; }
};
// Recursive lock for owned_mutex (experimental).
class recursive_lock final
{
owned_mutex& m_mutex;
const bool m_first;
public:
recursive_lock(const recursive_lock&) = delete;
explicit recursive_lock(owned_mutex& mutex)
: m_mutex(mutex)
, m_first(mutex.lock())
{
}
// Check whether the lock "owns" the mutex
explicit operator bool() const
{
return m_first;
}
~recursive_lock()
{
if (m_first && !m_mutex.unlock())
{
}
unlock();
}
};

View file

@ -1,50 +1,59 @@
#include "sema.h"
#include "sync.h"
void semaphore_base::imp_wait(bool lsb)
void semaphore_base::imp_wait()
{
// 1. Obtain LSB, reset it
// 2. Notify other posters if necessary
for (int i = 0; i < 10; i++)
{
busy_wait();
const s32 value = m_value.load();
if (value > 0 && m_value.compare_and_swap_test(value, value - 1))
{
return;
}
}
#ifdef _WIN32
if (!lsb)
{
while ((m_value & 1) == 0 || !atomic_storage<s32>::btr(m_value.raw(), 0))
{
// Wait infinitely until signaled
verify(HERE), NtWaitForKeyedEvent(nullptr, &m_value, false, nullptr) == ERROR_SUCCESS;
}
}
const s32 value = m_value.fetch_sub(1);
// Notify instantly
LARGE_INTEGER timeout;
timeout.QuadPart = 0;
if (HRESULT rc = NtReleaseKeyedEvent(nullptr, (u8*)&m_value + 2, false, &timeout))
if (value <= 0)
{
verify(HERE), rc == WAIT_TIMEOUT;
NtWaitForKeyedEvent(nullptr, &m_value, false, nullptr);
}
#elif __linux__
if (!lsb)
{
for (s32 value = m_value; (m_value & 1) == 0 || !atomic_storage<s32>::btr(m_value.raw(), 0); value = m_value)
{
if (futex(&m_value.raw(), FUTEX_WAIT_BITSET_PRIVATE, value, nullptr, nullptr, -2) == -1)
{
verify(HERE), errno == EAGAIN;
}
}
}
verify(HERE), futex(&m_value.raw(), FUTEX_WAKE_BITSET_PRIVATE, 1, nullptr, nullptr, 1) >= 0;
#else
if (lsb)
while (true)
{
return;
}
// Try hard way
const s32 value = m_value.op_fetch([](s32& value)
{
// Use sign bit to acknowledge waiter presence
if (value && value > INT32_MIN)
{
value--;
while ((m_value & 1) == 0 || !atomic_storage<s32>::btr(m_value.raw(), 0))
{
std::this_thread::sleep_for(std::chrono::microseconds(50));
if (value < 0)
{
// Remove sign bit
value -= INT32_MIN;
}
}
else
{
// Set sign bit
value = INT32_MIN;
}
});
if (value >= 0)
{
// Signal other waiter to wake up or to restore sign bit
futex(&m_value.raw(), FUTEX_WAKE_PRIVATE, 1, nullptr, nullptr, 0);
return;
}
futex(&m_value.raw(), FUTEX_WAIT_PRIVATE, value, nullptr, nullptr, 0);
}
#endif
}
@ -53,62 +62,10 @@ void semaphore_base::imp_post(s32 _old)
{
verify("semaphore_base: overflow" HERE), _old < 0;
// 1. Set LSB, waiting until it can be set if necessary
// 2. Notify waiter
#ifdef _WIN32
while ((_old & 1) == 0 && atomic_storage<s32>::bts(m_value.raw(), 0))
{
static_assert(ERROR_SUCCESS == 0, "Unexpected ERROR_SUCCESS value");
LARGE_INTEGER timeout;
timeout.QuadPart = -500; // ~50us
if (HRESULT rc = NtWaitForKeyedEvent(nullptr, (u8*)&m_value + 2, false, &timeout))
{
verify(HERE), rc == WAIT_TIMEOUT;
}
}
LARGE_INTEGER timeout;
timeout.QuadPart = 0; // Instant for the first attempt
while (HRESULT rc = NtReleaseKeyedEvent(nullptr, &m_value, false, &timeout))
{
verify(HERE), rc == WAIT_TIMEOUT;
if (!timeout.QuadPart)
{
timeout.QuadPart = -500; // ~50us
NtDelayExecution(false, &timeout);
}
if ((m_value & 1) == 0)
{
break;
}
}
#elif __linux__
for (s32 value = m_value; (_old & 1) == 0 && atomic_storage<s32>::bts(m_value.raw(), 0); value = m_value)
{
if (futex(&m_value.raw(), FUTEX_WAIT_BITSET_PRIVATE, value, nullptr, nullptr, 1) == -1)
{
verify(HERE), errno == EAGAIN;
}
}
if (1 + 0 == verify(HERE, 1 + futex(&m_value.raw(), FUTEX_WAKE_BITSET_PRIVATE, 1, nullptr, nullptr, -2)))
{
usleep(50);
}
NtReleaseKeyedEvent(nullptr, &m_value, false, nullptr);
#else
if (_old & 1)
{
return;
}
while (m_value & 1 || atomic_storage<s32>::bts(m_value.raw(), 0))
{
std::this_thread::sleep_for(std::chrono::microseconds(50));
}
futex(&m_value.raw(), FUTEX_WAKE_PRIVATE, 1, nullptr, nullptr, 0);
#endif
}
@ -117,24 +74,13 @@ bool semaphore_base::try_wait()
// Conditional decrement
const s32 value = m_value.fetch_op([](s32& value)
{
if (value > 0 || value & 1)
if (value > 0)
{
if (value <= 1)
{
value &= ~1;
}
value -= 1 << 1;
value -= 1;
}
});
if (value & 1 && value <= 1)
{
imp_wait(true);
return true;
}
return value > 0 || value & 1;
return value > 0;
}
bool semaphore_base::try_post(s32 _max)
@ -144,18 +90,13 @@ bool semaphore_base::try_post(s32 _max)
{
if (value < _max)
{
if (value < 0)
{
value |= 1;
}
value += 1 << 1;
value += 1;
}
});
if (value < 0)
{
imp_post(value ^ 1);
imp_post(value);
}
return value < _max;

View file

@ -6,10 +6,10 @@
// Lightweight semaphore helper class
class semaphore_base
{
// Semaphore value (shifted; negative values imply 0 with waiters, LSB is used to ping-pong signals between threads)
// Semaphore value
atomic_t<s32> m_value;
void imp_wait(bool lsb);
void imp_wait();
void imp_post(s32 _old);
@ -23,10 +23,13 @@ protected:
void wait()
{
// Unconditional decrement
if (UNLIKELY(m_value.sub_fetch(1 << 1) < 0))
// Load value
const s32 value = m_value.load();
// Conditional decrement
if (UNLIKELY(value <= 0 || !m_value.compare_and_swap_test(value, value - 1)))
{
imp_wait(false);
imp_wait();
}
}
@ -35,11 +38,11 @@ protected:
void post(s32 _max)
{
// Unconditional increment
const s32 value = m_value.fetch_add(1 << 1);
const s32 value = m_value.fetch_add(1);
if (UNLIKELY(value < 0 || value >= _max))
{
imp_post(value & ~1);
imp_post(value);
}
}
@ -53,7 +56,7 @@ public:
const s32 value = m_value;
// Return only positive value
return value < 0 ? 0 : value >> 1;
return value < 0 ? 0 : value;
}
};
@ -61,8 +64,8 @@ public:
template <s32 Max = 1, s32 Def = Max>
class semaphore final : public semaphore_base
{
static_assert(Max >= 0 && Max < (1 << 30), "semaphore<>: Max is out of bounds");
static_assert(Def >= 0 && Def < (1 << 30), "semaphore<>: Def is out of bounds");
static_assert(Max >= 0, "semaphore<>: Max is out of bounds");
static_assert(Def >= 0, "semaphore<>: Def is out of bounds");
static_assert(Def <= Max, "semaphore<>: Def is too big");
using base = semaphore_base;
@ -70,13 +73,13 @@ class semaphore final : public semaphore_base
public:
// Default constructor (recommended)
constexpr semaphore()
: base{Def << 1}
: base{Def}
{
}
// Explicit value constructor (not recommended)
explicit constexpr semaphore(s32 value)
: base{value << 1}
: base{value}
{
}
@ -95,13 +98,13 @@ public:
// Return a semaphore
void post()
{
return base::post(Max << 1);
return base::post(Max);
}
// Try to return a semaphore
explicit_bool_t try_post()
{
return base::try_post(Max << 1);
return base::try_post(Max);
}
// Get max semaphore value

View file

@ -898,3 +898,9 @@ constexpr FORCE_INLINE error_code::not_an_error not_an_error(const T& value)
{
return static_cast<error_code::not_an_error>(static_cast<s32>(value));
}
// Synchronization helper (cache-friendly busy waiting)
inline void busy_wait(std::size_t count = 100)
{
while (count--) _mm_pause();
}