Use futex locks on wasm+atomics.

Mara Bos 2022-04-19 09:21:11 +02:00
parent 65987ae8f5
commit 8f2913cc24
5 changed files with 6 additions and 354 deletions

@@ -1,102 +0,0 @@
use crate::arch::wasm32;
use crate::cmp;
use crate::mem;
use crate::sync::atomic::{AtomicUsize, Ordering::SeqCst};
use crate::sys::locks::Mutex;
use crate::time::Duration;

pub struct Condvar {
    cnt: AtomicUsize,
}

pub type MovableCondvar = Condvar;

// Condition variables are implemented with a simple counter internally that is
// likely to cause spurious wakeups. Blocking on a condition variable will first
// read the value of the internal counter, unlock the given mutex, and then
// block if and only if the counter's value is still the same. Notifying a
// condition variable will modify the counter (add one for now) and then wake up
// a thread waiting on the address of the counter.
//
// A thread waiting on the condition variable will as a result avoid going to
// sleep if it's notified after the lock is unlocked but before it fully goes to
// sleep. A sleeping thread is guaranteed to be woken up at some point as it can
// only be woken up with a call to `wake`.
//
// Note that it's possible for 2 or more threads to be woken up by a call to
// `notify_one` with this implementation. That can happen when the modification
// of `cnt` causes any threads in the middle of `wait` to avoid going to sleep,
// and the subsequent `wake` may wake up a thread that's actually blocking. We
// consider this a spurious wakeup, though, which all users of condition
// variables must already be prepared to handle. As a result, this source of
// spurious wakeups is currently thought to be ok, although it may be
// problematic later on if it causes too many spurious wakeups.

impl Condvar {
    pub const fn new() -> Condvar {
        Condvar { cnt: AtomicUsize::new(0) }
    }

    #[inline]
    pub unsafe fn init(&mut self) {
        // nothing to do
    }

    pub unsafe fn notify_one(&self) {
        self.cnt.fetch_add(1, SeqCst);
        // SAFETY: ptr() is always valid
        unsafe {
            wasm32::memory_atomic_notify(self.ptr(), 1);
        }
    }

    #[inline]
    pub unsafe fn notify_all(&self) {
        self.cnt.fetch_add(1, SeqCst);
        // SAFETY: ptr() is always valid
        unsafe {
            wasm32::memory_atomic_notify(self.ptr(), u32::MAX); // -1 == "wake everyone"
        }
    }

    pub unsafe fn wait(&self, mutex: &Mutex) {
        // "atomically block and unlock" implemented by loading our current
        // counter's value, unlocking the mutex, and blocking if the counter
        // still has the same value.
        //
        // Notifications happen by incrementing the counter and then waking a
        // thread. Incrementing the counter after we unlock the mutex will
        // prevent us from sleeping and otherwise the call to `wake` will
        // wake us up once we're asleep.
        let ticket = self.cnt.load(SeqCst) as i32;
        mutex.unlock();
        let val = wasm32::memory_atomic_wait32(self.ptr(), ticket, -1);
        // 0 == woken, 1 == not equal to `ticket`, 2 == timeout (shouldn't happen)
        debug_assert!(val == 0 || val == 1);
        mutex.lock();
    }

    pub unsafe fn wait_timeout(&self, mutex: &Mutex, dur: Duration) -> bool {
        let ticket = self.cnt.load(SeqCst) as i32;
        mutex.unlock();
        let nanos = dur.as_nanos();
        let nanos = cmp::min(i64::MAX as u128, nanos);

        // If the return value is 2 then a timeout happened, so we return
        // `false` as we weren't actually notified.
        let ret = wasm32::memory_atomic_wait32(self.ptr(), ticket, nanos as i64) != 2;
        mutex.lock();
        return ret;
    }

    #[inline]
    pub unsafe fn destroy(&self) {
        // nothing to do
    }

    #[inline]
    fn ptr(&self) -> *mut i32 {
        assert_eq!(mem::size_of::<usize>(), mem::size_of::<i32>());
        self.cnt.as_mut_ptr() as *mut i32
    }
}
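
For contrast, the futex-based condvar that takes over from this file keeps the same ticket protocol, but uses the counter itself as the futex word, so no separate wait address is needed. A simplified sketch, assuming `futex_wait`/`futex_wake` helpers like the shim sketched at the end of this commit and a `Mutex` with matching `unlock`/`lock` (illustrative, not the verbatim replacement code):

use std::sync::atomic::{AtomicU32, Ordering::Relaxed};

pub struct Condvar {
    // The notification counter doubles as the futex word.
    futex: AtomicU32,
}

impl Condvar {
    pub const fn new() -> Condvar {
        Condvar { futex: AtomicU32::new(0) }
    }

    pub fn notify_one(&self) {
        // Invalidate the ticket of any thread about to go to sleep...
        self.futex.fetch_add(1, Relaxed);
        // ...and wake one thread that is already asleep.
        futex_wake(&self.futex);
    }

    pub unsafe fn wait(&self, mutex: &Mutex) {
        let ticket = self.futex.load(Relaxed);
        mutex.unlock();
        // Sleeps only if no notification has arrived since `ticket` was read.
        futex_wait(&self.futex, ticket, None);
        mutex.lock();
    }
}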

@@ -1,64 +0,0 @@
use crate::arch::wasm32;
use crate::mem;
use crate::sync::atomic::{AtomicUsize, Ordering::SeqCst};

pub struct Mutex {
    locked: AtomicUsize,
}

pub type MovableMutex = Mutex;

// Mutexes have a pretty simple implementation where they contain an `i32`
// internally that is 0 when unlocked and 1 when the mutex is locked.
// Acquisition has a fast path where it attempts to cmpxchg the 0 to a 1, and
// if it fails it then waits for a notification. Releasing a lock is then done
// by swapping in 0 and then notifying any waiters, if present.

impl Mutex {
    pub const fn new() -> Mutex {
        Mutex { locked: AtomicUsize::new(0) }
    }

    #[inline]
    pub unsafe fn init(&mut self) {
        // nothing to do
    }

    pub unsafe fn lock(&self) {
        while !self.try_lock() {
            // SAFETY: the caller must uphold the safety contract for `memory_atomic_wait32`.
            let val = unsafe {
                wasm32::memory_atomic_wait32(
                    self.ptr(),
                    1,  // we expect our mutex is locked
                    -1, // wait infinitely
                )
            };
            // We should have either been woken up (0) or gotten a not-equal
            // result due to a race (1). We should never time out (2).
            debug_assert!(val == 0 || val == 1);
        }
    }

    pub unsafe fn unlock(&self) {
        let prev = self.locked.swap(0, SeqCst);
        debug_assert_eq!(prev, 1);
        wasm32::memory_atomic_notify(self.ptr(), 1); // wake up one waiter, if any
    }

    #[inline]
    pub unsafe fn try_lock(&self) -> bool {
        self.locked.compare_exchange(0, 1, SeqCst, SeqCst).is_ok()
    }

    #[inline]
    pub unsafe fn destroy(&self) {
        // nothing to do
    }

    #[inline]
    fn ptr(&self) -> *mut i32 {
        assert_eq!(mem::size_of::<usize>(), mem::size_of::<i32>());
        self.locked.as_mut_ptr() as *mut i32
    }
}
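
The futex-based mutex that replaces this file builds on the same wait/notify primitives but adds a third state, so an uncontended unlock can skip the notify call that this version pays for on every unlock. A simplified sketch of that classic three-state scheme, again assuming the same futex helpers (the real lock also spins briefly before sleeping):

use std::sync::atomic::{AtomicU32, Ordering::{Acquire, Relaxed, Release}};

pub struct Mutex {
    // 0: unlocked, 1: locked with no waiters, 2: locked with possible waiters.
    futex: AtomicU32,
}

impl Mutex {
    pub const fn new() -> Mutex {
        Mutex { futex: AtomicU32::new(0) }
    }

    pub fn try_lock(&self) -> bool {
        self.futex.compare_exchange(0, 1, Acquire, Relaxed).is_ok()
    }

    pub fn lock(&self) {
        if !self.try_lock() {
            // Contended: advertise a waiter by storing 2, then sleep until
            // the word changes and we manage to claim the lock ourselves.
            while self.futex.swap(2, Acquire) != 0 {
                futex_wait(&self.futex, 2, None);
            }
        }
    }

    pub fn unlock(&self) {
        // Only pay for a wake-up when someone may actually be asleep.
        if self.futex.swap(0, Release) == 2 {
            futex_wake(&self.futex);
        }
    }
}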

@@ -1,145 +0,0 @@
use crate::cell::UnsafeCell;
use crate::sys::locks::{Condvar, Mutex};

pub struct RwLock {
    lock: Mutex,
    cond: Condvar,
    state: UnsafeCell<State>,
}

pub type MovableRwLock = RwLock;

enum State {
    Unlocked,
    Reading(usize),
    Writing,
}

unsafe impl Send for RwLock {}
unsafe impl Sync for RwLock {}

// This rwlock implementation is a relatively simple implementation which has a
// condition variable for readers/writers as well as a mutex protecting the
// internal state of the lock. A current downside of the implementation is that
// unlocking the lock will notify *all* waiters rather than just readers or just
// writers. This can cause lots of "thundering herd" problems. While hopefully
// correct, this implementation is very likely to want to be changed in the
// future.

impl RwLock {
    pub const fn new() -> RwLock {
        RwLock { lock: Mutex::new(), cond: Condvar::new(), state: UnsafeCell::new(State::Unlocked) }
    }

    #[inline]
    pub unsafe fn read(&self) {
        self.lock.lock();
        while !(*self.state.get()).inc_readers() {
            self.cond.wait(&self.lock);
        }
        self.lock.unlock();
    }

    #[inline]
    pub unsafe fn try_read(&self) -> bool {
        self.lock.lock();
        let ok = (*self.state.get()).inc_readers();
        self.lock.unlock();
        return ok;
    }

    #[inline]
    pub unsafe fn write(&self) {
        self.lock.lock();
        while !(*self.state.get()).inc_writers() {
            self.cond.wait(&self.lock);
        }
        self.lock.unlock();
    }

    #[inline]
    pub unsafe fn try_write(&self) -> bool {
        self.lock.lock();
        let ok = (*self.state.get()).inc_writers();
        self.lock.unlock();
        return ok;
    }

    #[inline]
    pub unsafe fn read_unlock(&self) {
        self.lock.lock();
        let notify = (*self.state.get()).dec_readers();
        self.lock.unlock();
        if notify {
            // FIXME: should only wake up one of these some of the time
            self.cond.notify_all();
        }
    }

    #[inline]
    pub unsafe fn write_unlock(&self) {
        self.lock.lock();
        (*self.state.get()).dec_writers();
        self.lock.unlock();
        // FIXME: should only wake up one of these some of the time
        self.cond.notify_all();
    }

    #[inline]
    pub unsafe fn destroy(&self) {
        self.lock.destroy();
        self.cond.destroy();
    }
}

impl State {
    fn inc_readers(&mut self) -> bool {
        match *self {
            State::Unlocked => {
                *self = State::Reading(1);
                true
            }
            State::Reading(ref mut cnt) => {
                *cnt += 1;
                true
            }
            State::Writing => false,
        }
    }

    fn inc_writers(&mut self) -> bool {
        match *self {
            State::Unlocked => {
                *self = State::Writing;
                true
            }
            State::Reading(_) | State::Writing => false,
        }
    }

    fn dec_readers(&mut self) -> bool {
        let zero = match *self {
            State::Reading(ref mut cnt) => {
                *cnt -= 1;
                *cnt == 0
            }
            State::Unlocked | State::Writing => invalid(),
        };
        if zero {
            *self = State::Unlocked;
        }
        zero
    }

    fn dec_writers(&mut self) {
        match *self {
            State::Writing => {}
            State::Unlocked | State::Reading(_) => invalid(),
        }
        *self = State::Unlocked;
    }
}

fn invalid() -> ! {
    panic!("inconsistent rwlock");
}
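
The replacement `futex_rwlock` drops the mutex+condvar pair, and with it the notify_all thundering herd, by folding the state into atomic words that waiters block on directly. Schematically, and only as a sketch of the idea (the precise bit layout in std is an implementation detail):

use std::sync::atomic::AtomicU32;

pub struct RwLock {
    // A single word encodes the whole lock state, roughly:
    //   bits 0..30: active reader count (an all-ones value means write-locked),
    //   bit 30:     readers are waiting on `state`,
    //   bit 31:     writers are waiting on `writer_notify`.
    state: AtomicU32,
    // Writers sleep on this separate futex, which is bumped to hand the
    // lock to a single writer.
    writer_notify: AtomicU32,
}

Because readers and writers park on different words, an unlock can wake exactly one writer, or only the readers, instead of broadcasting to every waiter the way the condvar version above does.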

@@ -53,37 +53,3 @@ pub unsafe fn init() -> Option<Guard> {
         None
     }
 }
-
-// We currently just use our own thread-local to store our
-// current thread's ID, and then we lazily initialize it to something allocated
-// from a global counter.
-pub fn my_id() -> u32 {
-    use crate::sync::atomic::{AtomicU32, Ordering::SeqCst};
-
-    static NEXT_ID: AtomicU32 = AtomicU32::new(0);
-
-    #[thread_local]
-    static mut MY_ID: u32 = 0;
-
-    unsafe {
-        // If our thread ID isn't set yet then we need to allocate one. Do so
-        // with a simple "atomically add to a global counter" strategy.
-        // This strategy doesn't handle what happens when the counter
-        // overflows, however, so just abort everything once the counter
-        // overflows; eventually we could have some sort of recycling scheme
-        // (or maybe this is all totally irrelevant by that point!). In any
-        // case, though, we're using a CAS loop instead of a `fetch_add` to
-        // ensure that the global counter never overflows.
-        if MY_ID == 0 {
-            let mut cur = NEXT_ID.load(SeqCst);
-            MY_ID = loop {
-                let next = cur.checked_add(1).unwrap_or_else(|| crate::process::abort());
-                match NEXT_ID.compare_exchange(cur, next, SeqCst, SeqCst) {
-                    Ok(_) => break next,
-                    Err(i) => cur = i,
-                }
-            };
-        }
-        MY_ID
-    }
-}

@@ -49,16 +49,13 @@
 cfg_if::cfg_if! {
     if #[cfg(target_feature = "atomics")] {
-        #[path = "atomics/condvar.rs"]
-        mod condvar;
-        #[path = "atomics/mutex.rs"]
-        mod mutex;
-        #[path = "atomics/rwlock.rs"]
-        mod rwlock;
+        #[path = "../unix/locks"]
         pub mod locks {
-            pub use super::condvar::*;
-            pub use super::mutex::*;
-            pub use super::rwlock::*;
+            #![allow(unsafe_op_in_unsafe_fn)]
+            mod futex;
+            mod futex_rwlock;
+            pub use futex::{Mutex, MovableMutex, Condvar, MovableCondvar};
+            pub use futex_rwlock::{RwLock, MovableRwLock};
         }
         #[path = "atomics/futex.rs"]
         pub mod futex;
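
`atomics/futex.rs` is what lets wasm reuse those shared futex-based locks: it exposes the futex interface on top of the same `memory_atomic_wait32`/`memory_atomic_notify` intrinsics the deleted files called directly. A minimal sketch of such a shim (illustrative, not the verbatim file; only meaningful on wasm32 with the `atomics` target feature enabled):

use core::arch::wasm32;
use std::sync::atomic::AtomicU32;
use std::time::Duration;

/// Blocks until `futex` is woken or its value is no longer `expected`.
/// Returns false only if the wait timed out.
pub fn futex_wait(futex: &AtomicU32, expected: u32, timeout: Option<Duration>) -> bool {
    // A timeout of -1 nanoseconds means "wait forever".
    let timeout_ns = timeout
        .and_then(|t| i64::try_from(t.as_nanos()).ok())
        .unwrap_or(-1);
    unsafe {
        wasm32::memory_atomic_wait32(
            futex as *const AtomicU32 as *mut i32,
            expected as i32,
            timeout_ns,
        ) < 2 // 0 == woken, 1 == value mismatch, 2 == timed out
    }
}

/// Wakes one thread waiting on `futex`; returns true if one was actually woken.
pub fn futex_wake(futex: &AtomicU32) -> bool {
    unsafe { wasm32::memory_atomic_notify(futex as *const AtomicU32 as *mut i32, 1) > 0 }
}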