std: use futex-based locks and thread parker on Hermit

joboet 2022-09-06 10:44:05 +02:00
parent 4a09adf99f
commit 262193e044
8 changed files with 53 additions and 456 deletions

Cargo.lock

@@ -1656,12 +1656,13 @@ dependencies = [
 [[package]]
 name = "hermit-abi"
-version = "0.2.0"
+version = "0.2.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "1ab7905ea95c6d9af62940f9d7dd9596d54c334ae2c15300c482051292d5637f"
+checksum = "ee512640fe35acbfb4bb779db6f0d80704c2cacfa2e39b601ef3e3f47d1ae4c7"
 dependencies = [
  "compiler_builtins",
+ "libc",
  "rustc-std-workspace-alloc",
  "rustc-std-workspace-core",
 ]
@@ -4608,7 +4609,7 @@ dependencies = [
  "dlmalloc",
  "fortanix-sgx-abi",
  "hashbrown",
- "hermit-abi 0.2.0",
+ "hermit-abi 0.2.6",
  "libc",
  "miniz_oxide 0.4.0",
  "object 0.26.2",

library/std/Cargo.toml

@@ -42,7 +42,7 @@ dlmalloc = { version = "0.2.3", features = ['rustc-dep-of-std'] }
 fortanix-sgx-abi = { version = "0.5.0", features = ['rustc-dep-of-std'] }

 [target.'cfg(target_os = "hermit")'.dependencies]
-hermit-abi = { version = "0.2.0", features = ['rustc-dep-of-std'] }
+hermit-abi = { version = "0.2.6", features = ['rustc-dep-of-std'] }

 [target.wasm32-wasi.dependencies]
 wasi = { version = "0.11.0", features = ['rustc-dep-of-std'], default-features = false }

library/std/src/sys/hermit/condvar.rs

@@ -1,90 +0,0 @@
use crate::ffi::c_void;
use crate::ptr;
use crate::sync::atomic::{AtomicUsize, Ordering::SeqCst};
use crate::sys::hermit::abi;
use crate::sys::locks::Mutex;
use crate::sys_common::lazy_box::{LazyBox, LazyInit};
use crate::time::Duration;
// The implementation is inspired by Andrew D. Birrell's paper
// "Implementing Condition Variables with Semaphores"
pub struct Condvar {
counter: AtomicUsize,
sem1: *const c_void,
sem2: *const c_void,
}
pub(crate) type MovableCondvar = LazyBox<Condvar>;
impl LazyInit for Condvar {
fn init() -> Box<Self> {
Box::new(Self::new())
}
}
unsafe impl Send for Condvar {}
unsafe impl Sync for Condvar {}
impl Condvar {
pub fn new() -> Self {
let mut condvar =
Self { counter: AtomicUsize::new(0), sem1: ptr::null(), sem2: ptr::null() };
unsafe {
let _ = abi::sem_init(&mut condvar.sem1, 0);
let _ = abi::sem_init(&mut condvar.sem2, 0);
}
condvar
}
pub unsafe fn notify_one(&self) {
if self.counter.load(SeqCst) > 0 {
self.counter.fetch_sub(1, SeqCst);
abi::sem_post(self.sem1);
abi::sem_timedwait(self.sem2, 0);
}
}
pub unsafe fn notify_all(&self) {
let counter = self.counter.swap(0, SeqCst);
for _ in 0..counter {
abi::sem_post(self.sem1);
}
for _ in 0..counter {
abi::sem_timedwait(self.sem2, 0);
}
}
pub unsafe fn wait(&self, mutex: &Mutex) {
self.counter.fetch_add(1, SeqCst);
mutex.unlock();
abi::sem_timedwait(self.sem1, 0);
abi::sem_post(self.sem2);
mutex.lock();
}
pub unsafe fn wait_timeout(&self, mutex: &Mutex, dur: Duration) -> bool {
self.counter.fetch_add(1, SeqCst);
mutex.unlock();
let millis = dur.as_millis().min(u32::MAX as u128) as u32;
let res = if millis > 0 {
abi::sem_timedwait(self.sem1, millis)
} else {
abi::sem_trywait(self.sem1)
};
abi::sem_post(self.sem2);
mutex.lock();
res == 0
}
}
impl Drop for Condvar {
fn drop(&mut self) {
unsafe {
let _ = abi::sem_destroy(self.sem1);
let _ = abi::sem_destroy(self.sem2);
}
}
}
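The semaphore-based condvar deleted above is replaced by the futex-based condvar shared with the other futex platforms, pulled in through the `locks` module further down. As a minimal sketch of that technique — assuming the `futex_wait`/`futex_wake`/`futex_wake_all` wrappers introduced in the next file and a futex-style `Mutex` with `lock`/`unlock`, and not the verbatim std source — the condvar reduces to a single notification counter:

```rust
use crate::sync::atomic::{AtomicU32, Ordering::Relaxed};

pub struct Condvar {
    // Bumped on every notification; waiters sleep on its current value.
    futex: AtomicU32,
}

impl Condvar {
    pub const fn new() -> Self {
        Self { futex: AtomicU32::new(0) }
    }

    pub fn notify_one(&self) {
        self.futex.fetch_add(1, Relaxed);
        futex_wake(&self.futex);
    }

    pub fn notify_all(&self) {
        self.futex.fetch_add(1, Relaxed);
        futex_wake_all(&self.futex);
    }

    pub unsafe fn wait(&self, mutex: &Mutex) {
        // Read the counter *before* unlocking. If a notification arrives
        // between the unlock and the futex_wait, the counter no longer
        // matches and futex_wait returns immediately -- no lost wakeups.
        let value = self.futex.load(Relaxed);
        mutex.unlock();
        futex_wait(&self.futex, value, None);
        mutex.lock();
    }
}
```

This replaces the two kernel semaphores and the `counter` bookkeeping of Birrell's scheme with one atomic word and no per-condvar allocation.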

library/std/src/sys/hermit/futex.rs

@@ -0,0 +1,39 @@
use super::abi;
use crate::ptr::null;
use crate::sync::atomic::AtomicU32;
use crate::time::Duration;
pub fn futex_wait(futex: &AtomicU32, expected: u32, timeout: Option<Duration>) -> bool {
// Calculate the timeout as a relative timespec.
//
// Overflows are rounded up to an infinite timeout (None).
let timespec = timeout.and_then(|dur| {
Some(abi::timespec {
tv_sec: dur.as_secs().try_into().ok()?,
tv_nsec: dur.subsec_nanos().into(),
})
});
let r = unsafe {
abi::futex_wait(
futex.as_mut_ptr(),
expected,
timespec.as_ref().map_or(null(), |t| t as *const abi::timespec),
abi::FUTEX_RELATIVE_TIMEOUT,
)
};
r != -abi::errno::ETIMEDOUT
}
#[inline]
pub fn futex_wake(futex: &AtomicU32) -> bool {
unsafe { abi::futex_wake(futex.as_mut_ptr(), 1) > 0 }
}
#[inline]
pub fn futex_wake_all(futex: &AtomicU32) {
unsafe {
abi::futex_wake(futex.as_mut_ptr(), i32::MAX);
}
}
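These wrappers are the only Hermit-specific code the new locks need; the `Mutex`, `Condvar` and `RwLock` implementations themselves are reused from the unix backend via `#[path]` (see `mod.rs` below). For illustration, a heavily simplified mutex in the style of std's `futex_mutex` — the real one also spins briefly before sleeping, so treat this as a sketch of the design rather than the shipped code:

```rust
use crate::sync::atomic::{AtomicU32, Ordering::{Acquire, Relaxed, Release}};

pub struct Mutex {
    // 0 = unlocked, 1 = locked, 2 = locked and other threads may be waiting.
    futex: AtomicU32,
}

impl Mutex {
    pub const fn new() -> Self {
        Self { futex: AtomicU32::new(0) }
    }

    pub fn try_lock(&self) -> bool {
        self.futex.compare_exchange(0, 1, Acquire, Relaxed).is_ok()
    }

    pub fn lock(&self) {
        if self.futex.compare_exchange(0, 1, Acquire, Relaxed).is_err() {
            self.lock_contended();
        }
    }

    fn lock_contended(&self) {
        // Mark the lock as contended; if it was actually unlocked, the swap
        // acquired it for us. Otherwise sleep until the state changes.
        while self.futex.swap(2, Acquire) != 0 {
            futex_wait(&self.futex, 2, None);
        }
    }

    pub fn unlock(&self) {
        // Skip the wake call entirely in the uncontended (== 1) case.
        if self.futex.swap(0, Release) == 2 {
            futex_wake(&self.futex);
        }
    }
}
```

The three-state encoding is what lets `unlock` avoid a wake syscall unless a waiter may exist, which is the main win over the spinlock-plus-run-queue mutex deleted later in this commit.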

library/std/src/sys/hermit/mod.rs

@@ -25,6 +25,7 @@
 pub mod env;
 pub mod fd;
 pub mod fs;
+pub mod futex;
 #[path = "../unsupported/io.rs"]
 pub mod io;
 pub mod memchr;
@@ -45,14 +46,14 @@
 pub mod thread_local_key;
 pub mod time;

-mod condvar;
-mod mutex;
-mod rwlock;
-
+#[path = "../unix/locks"]
 pub mod locks {
-    pub use super::condvar::*;
-    pub use super::mutex::*;
-    pub use super::rwlock::*;
+    mod futex_condvar;
+    mod futex_mutex;
+    mod futex_rwlock;
+    pub(crate) use futex_condvar::MovableCondvar;
+    pub(crate) use futex_mutex::{MovableMutex, Mutex};
+    pub(crate) use futex_rwlock::{MovableRwLock, RwLock};
 }

 use crate::io::ErrorKind;

library/std/src/sys/hermit/mutex.rs

@@ -1,212 +0,0 @@
use crate::cell::UnsafeCell;
use crate::collections::VecDeque;
use crate::hint;
use crate::ops::{Deref, DerefMut, Drop};
use crate::sync::atomic::{AtomicUsize, Ordering};
use crate::sys::hermit::abi;
/// This type provides a lock based on busy waiting to realize mutual exclusion
///
/// # Description
///
/// This structure behaves a lot like a common mutex. There are some differences:
///
/// - By using busy waiting, it can be used outside the runtime.
/// - It is a so called ticket lock and is completely fair.
#[cfg_attr(target_arch = "x86_64", repr(align(128)))]
#[cfg_attr(not(target_arch = "x86_64"), repr(align(64)))]
struct Spinlock<T: ?Sized> {
queue: AtomicUsize,
dequeue: AtomicUsize,
data: UnsafeCell<T>,
}
unsafe impl<T: ?Sized + Send> Sync for Spinlock<T> {}
unsafe impl<T: ?Sized + Send> Send for Spinlock<T> {}
/// A guard to which the protected data can be accessed
///
/// When the guard falls out of scope it will release the lock.
struct SpinlockGuard<'a, T: ?Sized + 'a> {
dequeue: &'a AtomicUsize,
data: &'a mut T,
}
impl<T> Spinlock<T> {
pub const fn new(user_data: T) -> Spinlock<T> {
Spinlock {
queue: AtomicUsize::new(0),
dequeue: AtomicUsize::new(1),
data: UnsafeCell::new(user_data),
}
}
#[inline]
fn obtain_lock(&self) {
let ticket = self.queue.fetch_add(1, Ordering::SeqCst) + 1;
let mut counter: u16 = 0;
while self.dequeue.load(Ordering::SeqCst) != ticket {
counter += 1;
if counter < 100 {
hint::spin_loop();
} else {
counter = 0;
unsafe {
abi::yield_now();
}
}
}
}
#[inline]
pub unsafe fn lock(&self) -> SpinlockGuard<'_, T> {
self.obtain_lock();
SpinlockGuard { dequeue: &self.dequeue, data: &mut *self.data.get() }
}
}
impl<T: ?Sized + Default> Default for Spinlock<T> {
fn default() -> Spinlock<T> {
Spinlock::new(Default::default())
}
}
impl<'a, T: ?Sized> Deref for SpinlockGuard<'a, T> {
type Target = T;
fn deref(&self) -> &T {
&*self.data
}
}
impl<'a, T: ?Sized> DerefMut for SpinlockGuard<'a, T> {
fn deref_mut(&mut self) -> &mut T {
&mut *self.data
}
}
impl<'a, T: ?Sized> Drop for SpinlockGuard<'a, T> {
/// The dropping of the SpinlockGuard will release the lock it was created from.
fn drop(&mut self) {
self.dequeue.fetch_add(1, Ordering::SeqCst);
}
}
/// Realize a priority queue for tasks
struct PriorityQueue {
queues: [Option<VecDeque<abi::Tid>>; abi::NO_PRIORITIES],
prio_bitmap: u64,
}
impl PriorityQueue {
pub const fn new() -> PriorityQueue {
PriorityQueue {
queues: [
None, None, None, None, None, None, None, None, None, None, None, None, None, None,
None, None, None, None, None, None, None, None, None, None, None, None, None, None,
None, None, None,
],
prio_bitmap: 0,
}
}
/// Add a task id by its priority to the queue
pub fn push(&mut self, prio: abi::Priority, id: abi::Tid) {
let i: usize = prio.into().into();
self.prio_bitmap |= (1 << i) as u64;
if let Some(queue) = &mut self.queues[i] {
queue.push_back(id);
} else {
let mut queue = VecDeque::new();
queue.push_back(id);
self.queues[i] = Some(queue);
}
}
fn pop_from_queue(&mut self, queue_index: usize) -> Option<abi::Tid> {
if let Some(queue) = &mut self.queues[queue_index] {
let id = queue.pop_front();
if queue.is_empty() {
self.prio_bitmap &= !(1 << queue_index as u64);
}
id
} else {
None
}
}
/// Pop the task handle with the highest priority from the queue
pub fn pop(&mut self) -> Option<abi::Tid> {
for i in 0..abi::NO_PRIORITIES {
if self.prio_bitmap & (1 << i) != 0 {
return self.pop_from_queue(i);
}
}
None
}
}
struct MutexInner {
locked: bool,
blocked_task: PriorityQueue,
}
impl MutexInner {
pub const fn new() -> MutexInner {
MutexInner { locked: false, blocked_task: PriorityQueue::new() }
}
}
pub struct Mutex {
inner: Spinlock<MutexInner>,
}
pub type MovableMutex = Mutex;
unsafe impl Send for Mutex {}
unsafe impl Sync for Mutex {}
impl Mutex {
pub const fn new() -> Mutex {
Mutex { inner: Spinlock::new(MutexInner::new()) }
}
#[inline]
pub unsafe fn lock(&self) {
loop {
let mut guard = self.inner.lock();
if guard.locked == false {
guard.locked = true;
return;
} else {
let prio = abi::get_priority();
let id = abi::getpid();
guard.blocked_task.push(prio, id);
abi::block_current_task();
drop(guard);
abi::yield_now();
}
}
}
#[inline]
pub unsafe fn unlock(&self) {
let mut guard = self.inner.lock();
guard.locked = false;
if let Some(tid) = guard.blocked_task.pop() {
abi::wakeup_task(tid);
}
}
#[inline]
pub unsafe fn try_lock(&self) -> bool {
let mut guard = self.inner.lock();
if guard.locked == false {
guard.locked = true;
}
guard.locked
}
}

library/std/src/sys/hermit/rwlock.rs

@@ -1,143 +0,0 @@
use crate::cell::UnsafeCell;
use crate::sys::locks::{MovableCondvar, Mutex};
pub struct RwLock {
lock: Mutex,
cond: MovableCondvar,
state: UnsafeCell<State>,
}
pub type MovableRwLock = RwLock;
enum State {
Unlocked,
Reading(usize),
Writing,
}
unsafe impl Send for RwLock {}
unsafe impl Sync for RwLock {}
// This rwlock implementation is a relatively simple implementation which has a
// condition variable for readers/writers as well as a mutex protecting the
// internal state of the lock. A current downside of the implementation is that
// unlocking the lock will notify *all* waiters rather than just readers or just
// writers. This can cause lots of "thundering stampede" problems. While
// hopefully correct this implementation is very likely to want to be changed in
// the future.
impl RwLock {
pub const fn new() -> RwLock {
RwLock {
lock: Mutex::new(),
cond: MovableCondvar::new(),
state: UnsafeCell::new(State::Unlocked),
}
}
#[inline]
pub unsafe fn read(&self) {
self.lock.lock();
while !(*self.state.get()).inc_readers() {
self.cond.wait(&self.lock);
}
self.lock.unlock();
}
#[inline]
pub unsafe fn try_read(&self) -> bool {
self.lock.lock();
let ok = (*self.state.get()).inc_readers();
self.lock.unlock();
return ok;
}
#[inline]
pub unsafe fn write(&self) {
self.lock.lock();
while !(*self.state.get()).inc_writers() {
self.cond.wait(&self.lock);
}
self.lock.unlock();
}
#[inline]
pub unsafe fn try_write(&self) -> bool {
self.lock.lock();
let ok = (*self.state.get()).inc_writers();
self.lock.unlock();
return ok;
}
#[inline]
pub unsafe fn read_unlock(&self) {
self.lock.lock();
let notify = (*self.state.get()).dec_readers();
self.lock.unlock();
if notify {
// FIXME: should only wake up one of these some of the time
self.cond.notify_all();
}
}
#[inline]
pub unsafe fn write_unlock(&self) {
self.lock.lock();
(*self.state.get()).dec_writers();
self.lock.unlock();
// FIXME: should only wake up one of these some of the time
self.cond.notify_all();
}
}
impl State {
fn inc_readers(&mut self) -> bool {
match *self {
State::Unlocked => {
*self = State::Reading(1);
true
}
State::Reading(ref mut cnt) => {
*cnt += 1;
true
}
State::Writing => false,
}
}
fn inc_writers(&mut self) -> bool {
match *self {
State::Unlocked => {
*self = State::Writing;
true
}
State::Reading(_) | State::Writing => false,
}
}
fn dec_readers(&mut self) -> bool {
let zero = match *self {
State::Reading(ref mut cnt) => {
*cnt -= 1;
*cnt == 0
}
State::Unlocked | State::Writing => invalid(),
};
if zero {
*self = State::Unlocked;
}
zero
}
fn dec_writers(&mut self) {
match *self {
State::Writing => {}
State::Unlocked | State::Reading(_) => invalid(),
}
*self = State::Unlocked;
}
}
fn invalid() -> ! {
panic!("inconsistent rwlock");
}
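The `futex_rwlock` that replaces this file addresses the two FIXMEs above: it tracks the reader count plus waiting flags in a single `AtomicU32` and keeps a second futex on which `write_unlock` can wake exactly one writer instead of the whole herd. A much-reduced, single-futex sketch of the core idea — assuming the Hermit futex wrappers added earlier in this commit, with no writer preference, and still over-waking on `write_unlock` — looks like this:

```rust
use crate::sync::atomic::{AtomicU32, Ordering::{Acquire, Relaxed, Release}};

pub struct RwLock {
    // Number of active readers, or u32::MAX when write-locked.
    state: AtomicU32,
}

impl RwLock {
    pub const fn new() -> Self {
        Self { state: AtomicU32::new(0) }
    }

    pub fn read(&self) {
        loop {
            let s = self.state.load(Relaxed);
            if s < u32::MAX - 1 {
                // Not write-locked: try to register as one more reader.
                if self.state.compare_exchange_weak(s, s + 1, Acquire, Relaxed).is_ok() {
                    return;
                }
            } else {
                // Write-locked: sleep until the state changes.
                futex_wait(&self.state, s, None);
            }
        }
    }

    pub fn read_unlock(&self) {
        // The last reader leaving may have to wake a waiting writer.
        if self.state.fetch_sub(1, Release) == 1 {
            futex_wake(&self.state);
        }
    }

    pub fn write(&self) {
        while let Err(s) = self.state.compare_exchange(0, u32::MAX, Acquire, Relaxed) {
            futex_wait(&self.state, s, None);
        }
    }

    pub fn write_unlock(&self) {
        self.state.store(0, Release);
        // Unlike the real implementation, this wakes readers and writers alike.
        futex_wake_all(&self.state);
    }
}
```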

library/std/src/sys_common/thread_parker/mod.rs

@@ -7,6 +7,7 @@
         target_os = "openbsd",
         target_os = "dragonfly",
         target_os = "fuchsia",
+        target_os = "hermit",
     ))] {
         mod futex;
         pub use futex::Parker;
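Adding `target_os = "hermit"` to this `cfg_if` list switches Hermit from the generic mutex/condvar-based parker to the shared futex `Parker`, which fits the whole park/unpark protocol into one `AtomicU32`. In outline — simplified from the real implementation, which also deals with pinning, and again assuming Hermit's `futex_wait`/`futex_wake`:

```rust
use crate::sync::atomic::{AtomicU32, Ordering::{Acquire, Release}};

const PARKED: u32 = u32::MAX; // EMPTY - 1, wrapping
const EMPTY: u32 = 0;
const NOTIFIED: u32 = 1;

pub struct Parker {
    state: AtomicU32,
}

impl Parker {
    pub const fn new() -> Self {
        Self { state: AtomicU32::new(EMPTY) }
    }

    pub unsafe fn park(&self) {
        // NOTIFIED => EMPTY: consume a pending token and return.
        // EMPTY => PARKED: no token yet, go to sleep.
        if self.state.fetch_sub(1, Acquire) == NOTIFIED {
            return;
        }
        loop {
            futex_wait(&self.state, PARKED, None);
            // Either consume the token, or the wake was spurious: sleep again.
            if self.state.compare_exchange(NOTIFIED, EMPTY, Acquire, Acquire).is_ok() {
                return;
            }
        }
    }

    pub fn unpark(&self) {
        // Leave a token; wake only if a thread is actually parked.
        if self.state.swap(NOTIFIED, Release) == PARKED {
            futex_wake(&self.state);
        }
    }
}
```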