From 54daf496e2e957a9f69aa88bf6c42520a2dbfa02 Mon Sep 17 00:00:00 2001 From: joboet Date: Mon, 25 Apr 2022 15:19:50 +0200 Subject: [PATCH 1/3] std: directly use pthread in UNIX parker implementation Mutex and Condvar are being replaced by more efficient implementations, which need thread parking themselves (see #93740). Therefore use the pthread synchronization primitives directly. Also, avoid allocating because the Parker struct is being placed in an Arc anyways. --- library/std/src/sys/unix/mod.rs | 1 + library/std/src/sys/unix/thread_parker.rs | 303 ++++++++++++++++++ library/std/src/sys/windows/thread_parker.rs | 20 +- .../std/src/sys_common/thread_parker/futex.rs | 18 +- .../src/sys_common/thread_parker/generic.rs | 24 +- .../std/src/sys_common/thread_parker/mod.rs | 2 + library/std/src/thread/mod.rs | 32 +- 7 files changed, 372 insertions(+), 28 deletions(-) create mode 100644 library/std/src/sys/unix/thread_parker.rs diff --git a/library/std/src/sys/unix/mod.rs b/library/std/src/sys/unix/mod.rs index aedeb02e656..8e909aab7f0 100644 --- a/library/std/src/sys/unix/mod.rs +++ b/library/std/src/sys/unix/mod.rs @@ -39,6 +39,7 @@ pub mod thread; pub mod thread_local_dtor; pub mod thread_local_key; +pub mod thread_parker; pub mod time; #[cfg(target_os = "espidf")] diff --git a/library/std/src/sys/unix/thread_parker.rs b/library/std/src/sys/unix/thread_parker.rs new file mode 100644 index 00000000000..90bdb46c4b0 --- /dev/null +++ b/library/std/src/sys/unix/thread_parker.rs @@ -0,0 +1,303 @@ +//! Thread parking without `futex` using the `pthread` synchronization primitives. + +#![cfg(not(any( + target_os = "linux", + target_os = "android", + all(target_os = "emscripten", target_feature = "atomics") +)))] + +use crate::cell::UnsafeCell; +use crate::marker::PhantomPinned; +use crate::pin::Pin; +use crate::ptr::addr_of_mut; +use crate::sync::atomic::AtomicUsize; +use crate::sync::atomic::Ordering::SeqCst; +use crate::time::Duration; + +const EMPTY: usize = 0; +const PARKED: usize = 1; +const NOTIFIED: usize = 2; + +unsafe fn lock(lock: *mut libc::pthread_mutex_t) { + let r = libc::pthread_mutex_lock(lock); + debug_assert_eq!(r, 0); +} + +unsafe fn unlock(lock: *mut libc::pthread_mutex_t) { + let r = libc::pthread_mutex_unlock(lock); + debug_assert_eq!(r, 0); +} + +unsafe fn notify_one(cond: *mut libc::pthread_cond_t) { + let r = libc::pthread_cond_signal(cond); + debug_assert_eq!(r, 0); +} + +unsafe fn wait(cond: *mut libc::pthread_cond_t, lock: *mut libc::pthread_mutex_t) { + let r = libc::pthread_cond_wait(cond, lock); + debug_assert_eq!(r, 0); +} + +const TIMESPEC_MAX: libc::timespec = + libc::timespec { tv_sec: ::MAX, tv_nsec: 1_000_000_000 - 1 }; + +fn saturating_cast_to_time_t(value: u64) -> libc::time_t { + if value > ::MAX as u64 { ::MAX } else { value as libc::time_t } +} + +// This implementation is used on systems that support pthread_condattr_setclock +// where we configure the condition variable to use the monotonic clock (instead of +// the default system clock). This approach avoids all problems that result +// from changes made to the system time. +#[cfg(not(any(target_os = "macos", target_os = "ios", target_os = "espidf")))] +unsafe fn wait_timeout( + cond: *mut libc::pthread_cond_t, + lock: *mut libc::pthread_mutex_t, + dur: Duration, +) { + use crate::mem; + + let mut now: libc::timespec = mem::zeroed(); + let r = libc::clock_gettime(libc::CLOCK_MONOTONIC, &mut now); + assert_eq!(r, 0); + // Nanosecond calculations can't overflow because both values are below 1e9. + let nsec = dur.subsec_nanos() + now.tv_nsec as u32; + let sec = saturating_cast_to_time_t(dur.as_secs()) + .checked_add((nsec / 1_000_000_000) as libc::time_t) + .and_then(|s| s.checked_add(now.tv_sec)); + let nsec = nsec % 1_000_000_000; + let timeout = + sec.map(|s| libc::timespec { tv_sec: s, tv_nsec: nsec as _ }).unwrap_or(TIMESPEC_MAX); + let r = libc::pthread_cond_timedwait(cond, lock, &timeout); + assert!(r == libc::ETIMEDOUT || r == 0); +} + +// This implementation is modeled after libcxx's condition_variable +// https://github.com/llvm-mirror/libcxx/blob/release_35/src/condition_variable.cpp#L46 +// https://github.com/llvm-mirror/libcxx/blob/release_35/include/__mutex_base#L367 +#[cfg(any(target_os = "macos", target_os = "ios", target_os = "espidf"))] +unsafe fn wait_timeout( + cond: *mut libc::pthread_cond_t, + lock: *mut libc::pthread_mutex_t, + mut dur: Duration, +) { + use crate::ptr; + + // 1000 years + let max_dur = Duration::from_secs(1000 * 365 * 86400); + + if dur > max_dur { + // OSX implementation of `pthread_cond_timedwait` is buggy + // with super long durations. When duration is greater than + // 0x100_0000_0000_0000 seconds, `pthread_cond_timedwait` + // in macOS Sierra return error 316. + // + // This program demonstrates the issue: + // https://gist.github.com/stepancheg/198db4623a20aad2ad7cddb8fda4a63c + // + // To work around this issue, and possible bugs of other OSes, timeout + // is clamped to 1000 years, which is allowable per the API of `park_timeout` + // because of spurious wakeups. + dur = max_dur; + } + + let mut sys_now = libc::timeval { tv_sec: 0, tv_usec: 0 }; + let r = libc::gettimeofday(&mut sys_now, ptr::null_mut()); + debug_assert_eq!(r, 0); + let nsec = dur.subsec_nanos() as libc::c_long + (sys_now.tv_usec * 1000) as libc::c_long; + let extra = (nsec / 1_000_000_000) as libc::time_t; + let nsec = nsec % 1_000_000_000; + let seconds = saturating_cast_to_time_t(dur.as_secs()); + let timeout = sys_now + .tv_sec + .checked_add(extra) + .and_then(|s| s.checked_add(seconds)) + .map(|s| libc::timespec { tv_sec: s, tv_nsec: nsec }) + .unwrap_or(TIMESPEC_MAX); + // And wait! + let r = libc::pthread_cond_timedwait(cond, lock, &timeout); + debug_assert!(r == libc::ETIMEDOUT || r == 0); +} + +pub struct Parker { + state: AtomicUsize, + lock: UnsafeCell, + cvar: UnsafeCell, + // The `pthread` primitives require a stable address, so make this struct `!Unpin`. + _pinned: PhantomPinned, +} + +impl Parker { + /// Construct the UNIX parker in-place. + /// + /// # Safety + /// The constructed parker must never be moved. + pub unsafe fn new(parker: *mut Parker) { + // Use the default mutex implementation to allow for simpler initialization. + // This could lead to undefined behaviour when deadlocking. This is avoided + // by not deadlocking. Note in particular the unlocking operation before any + // panic, as code after the panic could try to park again. + addr_of_mut!((*parker).state).write(AtomicUsize::new(EMPTY)); + addr_of_mut!((*parker).lock).write(UnsafeCell::new(libc::PTHREAD_MUTEX_INITIALIZER)); + + cfg_if::cfg_if! { + if #[cfg(any( + target_os = "macos", + target_os = "ios", + target_os = "l4re", + target_os = "android", + target_os = "redox" + ))] { + addr_of_mut!((*parker).cvar).write(UnsafeCell::new(libc::PTHREAD_COND_INITIALIZER)); + } else if #[cfg(target_os = "espidf")] { + let r = libc::pthread_cond_init(addr_of_mut!((*parker).cvar).cast(), crate::ptr::null()); + assert_eq!(r, 0); + } else { + use crate::mem::MaybeUninit; + let mut attr = MaybeUninit::::uninit(); + let r = libc::pthread_condattr_init(attr.as_mut_ptr()); + assert_eq!(r, 0); + let r = libc::pthread_condattr_setclock(attr.as_mut_ptr(), libc::CLOCK_MONOTONIC); + assert_eq!(r, 0); + let r = libc::pthread_cond_init(addr_of_mut!((*parker).cvar).cast(), attr.as_ptr()); + assert_eq!(r, 0); + let r = libc::pthread_condattr_destroy(attr.as_mut_ptr()); + assert_eq!(r, 0); + } + } + } + + // This implementation doesn't require `unsafe`, but other implementations + // may assume this is only called by the thread that owns the Parker. + pub unsafe fn park(self: Pin<&Self>) { + // If we were previously notified then we consume this notification and + // return quickly. + if self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst).is_ok() { + return; + } + + // Otherwise we need to coordinate going to sleep + lock(self.lock.get()); + match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) { + Ok(_) => {} + Err(NOTIFIED) => { + // We must read here, even though we know it will be `NOTIFIED`. + // This is because `unpark` may have been called again since we read + // `NOTIFIED` in the `compare_exchange` above. We must perform an + // acquire operation that synchronizes with that `unpark` to observe + // any writes it made before the call to unpark. To do that we must + // read from the write it made to `state`. + let old = self.state.swap(EMPTY, SeqCst); + + unlock(self.lock.get()); + + assert_eq!(old, NOTIFIED, "park state changed unexpectedly"); + return; + } // should consume this notification, so prohibit spurious wakeups in next park. + Err(_) => { + unlock(self.lock.get()); + + panic!("inconsistent park state") + } + } + + loop { + wait(self.cvar.get(), self.lock.get()); + + match self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst) { + Ok(_) => break, // got a notification + Err(_) => {} // spurious wakeup, go back to sleep + } + } + + unlock(self.lock.get()); + } + + // This implementation doesn't require `unsafe`, but other implementations + // may assume this is only called by the thread that owns the Parker. Use + // `Pin` to guarantee a stable address for the mutex and condition variable. + pub unsafe fn park_timeout(self: Pin<&Self>, dur: Duration) { + // Like `park` above we have a fast path for an already-notified thread, and + // afterwards we start coordinating for a sleep. + // return quickly. + if self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst).is_ok() { + return; + } + + lock(self.lock.get()); + match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) { + Ok(_) => {} + Err(NOTIFIED) => { + // We must read again here, see `park`. + let old = self.state.swap(EMPTY, SeqCst); + unlock(self.lock.get()); + + assert_eq!(old, NOTIFIED, "park state changed unexpectedly"); + return; + } // should consume this notification, so prohibit spurious wakeups in next park. + Err(_) => { + unlock(self.lock.get()); + panic!("inconsistent park_timeout state") + } + } + + // Wait with a timeout, and if we spuriously wake up or otherwise wake up + // from a notification we just want to unconditionally set the state back to + // empty, either consuming a notification or un-flagging ourselves as + // parked. + wait_timeout(self.cvar.get(), self.lock.get(), dur); + + match self.state.swap(EMPTY, SeqCst) { + NOTIFIED => unlock(self.lock.get()), // got a notification, hurray! + PARKED => unlock(self.lock.get()), // no notification, alas + n => { + unlock(self.lock.get()); + panic!("inconsistent park_timeout state: {n}") + } + } + } + + pub fn unpark(self: Pin<&Self>) { + // To ensure the unparked thread will observe any writes we made + // before this call, we must perform a release operation that `park` + // can synchronize with. To do that we must write `NOTIFIED` even if + // `state` is already `NOTIFIED`. That is why this must be a swap + // rather than a compare-and-swap that returns if it reads `NOTIFIED` + // on failure. + match self.state.swap(NOTIFIED, SeqCst) { + EMPTY => return, // no one was waiting + NOTIFIED => return, // already unparked + PARKED => {} // gotta go wake someone up + _ => panic!("inconsistent state in unpark"), + } + + // There is a period between when the parked thread sets `state` to + // `PARKED` (or last checked `state` in the case of a spurious wake + // up) and when it actually waits on `cvar`. If we were to notify + // during this period it would be ignored and then when the parked + // thread went to sleep it would never wake up. Fortunately, it has + // `lock` locked at this stage so we can acquire `lock` to wait until + // it is ready to receive the notification. + // + // Releasing `lock` before the call to `notify_one` means that when the + // parked thread wakes it doesn't get woken only to have to wait for us + // to release `lock`. + unsafe { + lock(self.lock.get()); + unlock(self.lock.get()); + notify_one(self.cvar.get()); + } + } +} + +impl Drop for Parker { + fn drop(&mut self) { + unsafe { + libc::pthread_cond_destroy(self.cvar.get_mut()); + libc::pthread_mutex_destroy(self.lock.get_mut()); + } + } +} + +unsafe impl Sync for Parker {} +unsafe impl Send for Parker {} diff --git a/library/std/src/sys/windows/thread_parker.rs b/library/std/src/sys/windows/thread_parker.rs index 3497da51dee..51448344475 100644 --- a/library/std/src/sys/windows/thread_parker.rs +++ b/library/std/src/sys/windows/thread_parker.rs @@ -58,6 +58,7 @@ // [4]: Windows Internals, Part 1, ISBN 9780735671300 use crate::convert::TryFrom; +use crate::pin::Pin; use crate::ptr; use crate::sync::atomic::{ AtomicI8, AtomicPtr, @@ -95,13 +96,16 @@ pub struct Parker { // Ordering::Release when writing NOTIFIED (the 'token') in unpark(), and using // Ordering::Acquire when reading this state in park() after waking up. impl Parker { - pub fn new() -> Self { - Self { state: AtomicI8::new(EMPTY) } + /// Construct the Windows parker. The UNIX parker implementation + /// requires this to happen in-place. + pub unsafe fn new(parker: *mut Parker) { + parker.write(Self { state: AtomicI8::new(EMPTY) }); } // Assumes this is only called by the thread that owns the Parker, - // which means that `self.state != PARKED`. - pub unsafe fn park(&self) { + // which means that `self.state != PARKED`. This implementation doesn't require `Pin`, + // but other implementations do. + pub unsafe fn park(self: Pin<&Self>) { // Change NOTIFIED=>EMPTY or EMPTY=>PARKED, and directly return in the // first case. if self.state.fetch_sub(1, Acquire) == NOTIFIED { @@ -132,8 +136,9 @@ pub unsafe fn park(&self) { } // Assumes this is only called by the thread that owns the Parker, - // which means that `self.state != PARKED`. - pub unsafe fn park_timeout(&self, timeout: Duration) { + // which means that `self.state != PARKED`. This implementation doesn't require `Pin`, + // but other implementations do. + pub unsafe fn park_timeout(self: Pin<&Self>, timeout: Duration) { // Change NOTIFIED=>EMPTY or EMPTY=>PARKED, and directly return in the // first case. if self.state.fetch_sub(1, Acquire) == NOTIFIED { @@ -184,7 +189,8 @@ pub unsafe fn park_timeout(&self, timeout: Duration) { } } - pub fn unpark(&self) { + // This implementation doesn't require `Pin`, but other implementations do. + pub fn unpark(self: Pin<&Self>) { // Change PARKED=>NOTIFIED, EMPTY=>NOTIFIED, or NOTIFIED=>NOTIFIED, and // wake the thread in the first case. // diff --git a/library/std/src/sys_common/thread_parker/futex.rs b/library/std/src/sys_common/thread_parker/futex.rs index fbf6231ff4a..d9e2f39e345 100644 --- a/library/std/src/sys_common/thread_parker/futex.rs +++ b/library/std/src/sys_common/thread_parker/futex.rs @@ -1,3 +1,4 @@ +use crate::pin::Pin; use crate::sync::atomic::AtomicU32; use crate::sync::atomic::Ordering::{Acquire, Release}; use crate::sys::futex::{futex_wait, futex_wake}; @@ -32,14 +33,15 @@ pub struct Parker { // Ordering::Release when writing NOTIFIED (the 'token') in unpark(), and using // Ordering::Acquire when checking for this state in park(). impl Parker { - #[inline] - pub const fn new() -> Self { - Parker { state: AtomicU32::new(EMPTY) } + /// Construct the futex parker. The UNIX parker implementation + /// requires this to happen in-place. + pub unsafe fn new(parker: *mut Parker) { + parker.write(Self { state: AtomicU32::new(EMPTY) }); } // Assumes this is only called by the thread that owns the Parker, // which means that `self.state != PARKED`. - pub unsafe fn park(&self) { + pub unsafe fn park(self: Pin<&Self>) { // Change NOTIFIED=>EMPTY or EMPTY=>PARKED, and directly return in the // first case. if self.state.fetch_sub(1, Acquire) == NOTIFIED { @@ -58,8 +60,9 @@ pub unsafe fn park(&self) { } // Assumes this is only called by the thread that owns the Parker, - // which means that `self.state != PARKED`. - pub unsafe fn park_timeout(&self, timeout: Duration) { + // which means that `self.state != PARKED`. This implementation doesn't + // require `Pin`, but other implementations do. + pub unsafe fn park_timeout(self: Pin<&Self>, timeout: Duration) { // Change NOTIFIED=>EMPTY or EMPTY=>PARKED, and directly return in the // first case. if self.state.fetch_sub(1, Acquire) == NOTIFIED { @@ -78,8 +81,9 @@ pub unsafe fn park_timeout(&self, timeout: Duration) { } } + // This implementation doesn't require `Pin`, but other implementations do. #[inline] - pub fn unpark(&self) { + pub fn unpark(self: Pin<&Self>) { // Change PARKED=>NOTIFIED, EMPTY=>NOTIFIED, or NOTIFIED=>NOTIFIED, and // wake the thread in the first case. // diff --git a/library/std/src/sys_common/thread_parker/generic.rs b/library/std/src/sys_common/thread_parker/generic.rs index ffb61200e15..f3d8b34d3fd 100644 --- a/library/std/src/sys_common/thread_parker/generic.rs +++ b/library/std/src/sys_common/thread_parker/generic.rs @@ -1,5 +1,6 @@ //! Parker implementation based on a Mutex and Condvar. +use crate::pin::Pin; use crate::sync::atomic::AtomicUsize; use crate::sync::atomic::Ordering::SeqCst; use crate::sync::{Condvar, Mutex}; @@ -16,13 +17,18 @@ pub struct Parker { } impl Parker { - pub fn new() -> Self { - Parker { state: AtomicUsize::new(EMPTY), lock: Mutex::new(()), cvar: Condvar::new() } + /// Construct the generic parker. The UNIX parker implementation + /// requires this to happen in-place. + pub unsafe fn new(parker: *mut Parker) { + parker.write(Parker { + state: AtomicUsize::new(EMPTY), + lock: Mutex::new(()), + cvar: Condvar::new(), + }); } - // This implementation doesn't require `unsafe`, but other implementations - // may assume this is only called by the thread that owns the Parker. - pub unsafe fn park(&self) { + // This implementation doesn't require `unsafe` and `Pin`, but other implementations do. + pub unsafe fn park(self: Pin<&Self>) { // If we were previously notified then we consume this notification and // return quickly. if self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst).is_ok() { @@ -55,9 +61,8 @@ pub unsafe fn park(&self) { } } - // This implementation doesn't require `unsafe`, but other implementations - // may assume this is only called by the thread that owns the Parker. - pub unsafe fn park_timeout(&self, dur: Duration) { + // This implementation doesn't require `unsafe` and `Pin`, but other implementations do. + pub unsafe fn park_timeout(self: Pin<&Self>, dur: Duration) { // Like `park` above we have a fast path for an already-notified thread, and // afterwards we start coordinating for a sleep. // return quickly. @@ -88,7 +93,8 @@ pub unsafe fn park_timeout(&self, dur: Duration) { } } - pub fn unpark(&self) { + // This implementation doesn't require `Pin`, but other implementations do. + pub fn unpark(self: Pin<&Self>) { // To ensure the unparked thread will observe any writes we made // before this call, we must perform a release operation that `park` // can synchronize with. To do that we must write `NOTIFIED` even if diff --git a/library/std/src/sys_common/thread_parker/mod.rs b/library/std/src/sys_common/thread_parker/mod.rs index ba896bd676b..ea0204cd357 100644 --- a/library/std/src/sys_common/thread_parker/mod.rs +++ b/library/std/src/sys_common/thread_parker/mod.rs @@ -8,6 +8,8 @@ pub use futex::Parker; } else if #[cfg(windows)] { pub use crate::sys::thread_parker::Parker; + } else if #[cfg(target_family = "unix")] { + pub use crate::sys::thread_parker::Parker; } else { mod generic; pub use generic::Parker; diff --git a/library/std/src/thread/mod.rs b/library/std/src/thread/mod.rs index 5309dc47ac4..99da5f7a87e 100644 --- a/library/std/src/thread/mod.rs +++ b/library/std/src/thread/mod.rs @@ -163,6 +163,8 @@ use crate::num::NonZeroUsize; use crate::panic; use crate::panicking; +use crate::pin::Pin; +use crate::ptr::addr_of_mut; use crate::str; use crate::sync::Arc; use crate::sys::thread as imp; @@ -923,7 +925,7 @@ pub fn sleep(dur: Duration) { pub fn park() { // SAFETY: park_timeout is called on the parker owned by this thread. unsafe { - current().inner.parker.park(); + current().inner.as_ref().parker().park(); } } @@ -987,7 +989,7 @@ pub fn park_timeout_ms(ms: u32) { pub fn park_timeout(dur: Duration) { // SAFETY: park_timeout is called on the parker owned by this thread. unsafe { - current().inner.parker.park_timeout(dur); + current().inner.as_ref().parker().park_timeout(dur); } } @@ -1073,6 +1075,12 @@ struct Inner { parker: Parker, } +impl Inner { + fn parker(self: Pin<&Self>) -> Pin<&Parker> { + unsafe { Pin::map_unchecked(self, |inner| &inner.parker) } + } +} + #[derive(Clone)] #[stable(feature = "rust1", since = "1.0.0")] /// A handle to a thread. @@ -1094,14 +1102,28 @@ struct Inner { /// /// [`thread::current`]: current pub struct Thread { - inner: Arc, + inner: Pin>, } impl Thread { // Used only internally to construct a thread object without spawning // Panics if the name contains nuls. pub(crate) fn new(name: Option) -> Thread { - Thread { inner: Arc::new(Inner { name, id: ThreadId::new(), parker: Parker::new() }) } + // We have to use `unsafe` here to constuct the `Parker` in-place, + // which is required for the UNIX implementation. + // + // SAFETY: We pin the Arc immediately after creation, so its address never + // changes. + let inner = unsafe { + let mut arc = Arc::::new_uninit(); + let ptr = Arc::get_mut_unchecked(&mut arc).as_mut_ptr(); + addr_of_mut!((*ptr).name).write(name); + addr_of_mut!((*ptr).id).write(ThreadId::new()); + Parker::new(addr_of_mut!((*ptr).parker)); + Pin::new_unchecked(arc.assume_init()) + }; + + Thread { inner } } /// Atomically makes the handle's token available if it is not already. @@ -1137,7 +1159,7 @@ pub(crate) fn new(name: Option) -> Thread { #[stable(feature = "rust1", since = "1.0.0")] #[inline] pub fn unpark(&self) { - self.inner.parker.unpark(); + self.inner.as_ref().parker().unpark(); } /// Gets the thread's unique identifier. From 550273361d8c8fc9b2c4d63e0b515d37af9d63f8 Mon Sep 17 00:00:00 2001 From: joboet Date: Wed, 27 Apr 2022 19:28:27 +0200 Subject: [PATCH 2/3] std: simplify UNIX parker timeouts --- library/std/src/sys/unix/thread_parker.rs | 74 ++++++----------------- library/std/src/sys/unix/time.rs | 4 +- 2 files changed, 20 insertions(+), 58 deletions(-) diff --git a/library/std/src/sys/unix/thread_parker.rs b/library/std/src/sys/unix/thread_parker.rs index 90bdb46c4b0..fd83f2f73d6 100644 --- a/library/std/src/sys/unix/thread_parker.rs +++ b/library/std/src/sys/unix/thread_parker.rs @@ -41,52 +41,18 @@ unsafe fn wait(cond: *mut libc::pthread_cond_t, lock: *mut libc::pthread_mutex_t const TIMESPEC_MAX: libc::timespec = libc::timespec { tv_sec: ::MAX, tv_nsec: 1_000_000_000 - 1 }; -fn saturating_cast_to_time_t(value: u64) -> libc::time_t { - if value > ::MAX as u64 { ::MAX } else { value as libc::time_t } -} - -// This implementation is used on systems that support pthread_condattr_setclock -// where we configure the condition variable to use the monotonic clock (instead of -// the default system clock). This approach avoids all problems that result -// from changes made to the system time. -#[cfg(not(any(target_os = "macos", target_os = "ios", target_os = "espidf")))] unsafe fn wait_timeout( cond: *mut libc::pthread_cond_t, lock: *mut libc::pthread_mutex_t, dur: Duration, ) { - use crate::mem; + // Use the system clock on systems that do not support pthread_condattr_setclock. + // This unfortunately results in problems when the system time changes. + #[cfg(any(target_os = "macos", target_os = "ios", target_os = "espidf"))] + let (now, dur) = { + use super::time::SystemTime; + use crate::cmp::min; - let mut now: libc::timespec = mem::zeroed(); - let r = libc::clock_gettime(libc::CLOCK_MONOTONIC, &mut now); - assert_eq!(r, 0); - // Nanosecond calculations can't overflow because both values are below 1e9. - let nsec = dur.subsec_nanos() + now.tv_nsec as u32; - let sec = saturating_cast_to_time_t(dur.as_secs()) - .checked_add((nsec / 1_000_000_000) as libc::time_t) - .and_then(|s| s.checked_add(now.tv_sec)); - let nsec = nsec % 1_000_000_000; - let timeout = - sec.map(|s| libc::timespec { tv_sec: s, tv_nsec: nsec as _ }).unwrap_or(TIMESPEC_MAX); - let r = libc::pthread_cond_timedwait(cond, lock, &timeout); - assert!(r == libc::ETIMEDOUT || r == 0); -} - -// This implementation is modeled after libcxx's condition_variable -// https://github.com/llvm-mirror/libcxx/blob/release_35/src/condition_variable.cpp#L46 -// https://github.com/llvm-mirror/libcxx/blob/release_35/include/__mutex_base#L367 -#[cfg(any(target_os = "macos", target_os = "ios", target_os = "espidf"))] -unsafe fn wait_timeout( - cond: *mut libc::pthread_cond_t, - lock: *mut libc::pthread_mutex_t, - mut dur: Duration, -) { - use crate::ptr; - - // 1000 years - let max_dur = Duration::from_secs(1000 * 365 * 86400); - - if dur > max_dur { // OSX implementation of `pthread_cond_timedwait` is buggy // with super long durations. When duration is greater than // 0x100_0000_0000_0000 seconds, `pthread_cond_timedwait` @@ -98,23 +64,19 @@ unsafe fn wait_timeout( // To work around this issue, and possible bugs of other OSes, timeout // is clamped to 1000 years, which is allowable per the API of `park_timeout` // because of spurious wakeups. - dur = max_dur; - } + let dur = min(dur, Duration::from_secs(1000 * 365 * 86400)); + let now = SystemTime::now().t; + (now, dur) + }; + // Use the monotonic clock on other systems. + #[cfg(not(any(target_os = "macos", target_os = "ios", target_os = "espidf")))] + let (now, dur) = { + use super::time::Timespec; - let mut sys_now = libc::timeval { tv_sec: 0, tv_usec: 0 }; - let r = libc::gettimeofday(&mut sys_now, ptr::null_mut()); - debug_assert_eq!(r, 0); - let nsec = dur.subsec_nanos() as libc::c_long + (sys_now.tv_usec * 1000) as libc::c_long; - let extra = (nsec / 1_000_000_000) as libc::time_t; - let nsec = nsec % 1_000_000_000; - let seconds = saturating_cast_to_time_t(dur.as_secs()); - let timeout = sys_now - .tv_sec - .checked_add(extra) - .and_then(|s| s.checked_add(seconds)) - .map(|s| libc::timespec { tv_sec: s, tv_nsec: nsec }) - .unwrap_or(TIMESPEC_MAX); - // And wait! + (Timespec::now(libc::CLOCK_MONOTONIC), dur) + }; + + let timeout = now.checked_add_duration(&dur).map(|t| t.t).unwrap_or(TIMESPEC_MAX); let r = libc::pthread_cond_timedwait(cond, lock, &timeout); debug_assert!(r == libc::ETIMEDOUT || r == 0); } diff --git a/library/std/src/sys/unix/time.rs b/library/std/src/sys/unix/time.rs index 498c94d0cdc..d43ceec9c8a 100644 --- a/library/std/src/sys/unix/time.rs +++ b/library/std/src/sys/unix/time.rs @@ -132,7 +132,7 @@ pub struct Instant { #[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct SystemTime { - t: Timespec, + pub(in crate::sys::unix) t: Timespec, } pub const UNIX_EPOCH: SystemTime = SystemTime { t: Timespec::zero() }; @@ -279,7 +279,7 @@ pub struct Instant { #[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct SystemTime { - t: Timespec, + pub(in crate::sys::unix) t: Timespec, } pub const UNIX_EPOCH: SystemTime = SystemTime { t: Timespec::zero() }; From 1285fb746649dbad417733a4741cb98e88a497f3 Mon Sep 17 00:00:00 2001 From: joboet Date: Thu, 28 Apr 2022 15:34:43 +0200 Subject: [PATCH 3/3] std: update debuginfo check to match type definition --- src/test/debuginfo/thread.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/debuginfo/thread.rs b/src/test/debuginfo/thread.rs index 531c37a3421..388d50c5cdc 100644 --- a/src/test/debuginfo/thread.rs +++ b/src/test/debuginfo/thread.rs @@ -14,7 +14,7 @@ // // cdb-command:dx t,d // cdb-check:t,d : [...] [Type: std::thread::Thread *] -// cdb-check: [...] inner : {...} [Type: alloc::sync::Arc] +// cdb-check:[...] inner [...][Type: core::pin::Pin >] use std::thread;