implement sync::mpsc as a wrapper around sync::mpmc

Ibraheem Ahmed 2022-10-17 19:11:56 -04:00
parent a43da5a097
commit 31dc5bba89
11 changed files with 23 additions and 2797 deletions


@@ -1,82 +0,0 @@
//! Generic support for building blocking abstractions.
use crate::sync::atomic::{AtomicBool, Ordering};
use crate::sync::Arc;
use crate::thread::{self, Thread};
use crate::time::Instant;
struct Inner {
thread: Thread,
woken: AtomicBool,
}
unsafe impl Send for Inner {}
unsafe impl Sync for Inner {}
#[derive(Clone)]
pub struct SignalToken {
inner: Arc<Inner>,
}
pub struct WaitToken {
inner: Arc<Inner>,
}
impl !Send for WaitToken {}
impl !Sync for WaitToken {}
pub fn tokens() -> (WaitToken, SignalToken) {
let inner = Arc::new(Inner { thread: thread::current(), woken: AtomicBool::new(false) });
let wait_token = WaitToken { inner: inner.clone() };
let signal_token = SignalToken { inner };
(wait_token, signal_token)
}
impl SignalToken {
pub fn signal(&self) -> bool {
let wake = self
.inner
.woken
.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
.is_ok();
if wake {
self.inner.thread.unpark();
}
wake
}
/// Converts to an unsafe raw pointer. Useful for storing in a pipe's state
/// flag.
#[inline]
pub unsafe fn to_raw(self) -> *mut u8 {
Arc::into_raw(self.inner) as *mut u8
}
/// Converts from an unsafe raw pointer. Useful for retrieving a pipe's state
/// flag.
#[inline]
pub unsafe fn from_raw(signal_ptr: *mut u8) -> SignalToken {
SignalToken { inner: Arc::from_raw(signal_ptr as *mut Inner) }
}
}
impl WaitToken {
pub fn wait(self) {
while !self.inner.woken.load(Ordering::SeqCst) {
thread::park()
}
}
/// Returns `true` if we wake up normally.
pub fn wait_max_until(self, end: Instant) -> bool {
while !self.inner.woken.load(Ordering::SeqCst) {
let now = Instant::now();
if now >= end {
return false;
}
thread::park_timeout(end - now)
}
true
}
}
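Taken together, these tokens form a one-shot park/unpark handshake: the receiver holds the WaitToken and parks until `woken` flips, and whichever signaller wins the `compare_exchange` unparks it exactly once. A self-contained sketch of the same handshake built directly on `std::thread::park`/`unpark` (an illustration of the protocol, not the module's own types):

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;

fn main() {
    // Mirrors `Inner`: the receiving thread's handle plus a "woken" flag.
    let woken = Arc::new(AtomicBool::new(false));
    let receiver = thread::current();

    let flag = Arc::clone(&woken);
    let signaller = thread::spawn(move || {
        // Like SignalToken::signal(): only the first signaller unparks.
        if flag.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst).is_ok() {
            receiver.unpark();
        }
    });

    // Like WaitToken::wait(): park() can wake spuriously, so loop on the flag.
    while !woken.load(Ordering::SeqCst) {
        thread::park();
    }

    signaller.join().unwrap();
}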


@@ -1,25 +0,0 @@
use crate::ops::{Deref, DerefMut};
#[derive(Copy, Clone, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[cfg_attr(target_arch = "aarch64", repr(align(128)))]
#[cfg_attr(not(target_arch = "aarch64"), repr(align(64)))]
pub(super) struct CacheAligned<T>(pub T);
impl<T> Deref for CacheAligned<T> {
type Target = T;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl<T> DerefMut for CacheAligned<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
impl<T> CacheAligned<T> {
pub(super) fn new(t: T) -> Self {
CacheAligned(t)
}
}
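The over-alignment exists only to keep producer-side and consumer-side state on separate cache lines so the two threads don't false-share. A small sketch of the pattern; the `Producer`/`Consumer` types here are hypothetical stand-ins, and the SPSC queue later in this diff uses `CacheAligned` in exactly this way:

use std::sync::atomic::AtomicUsize;

// Stand-ins for the producer-only and consumer-only state of a queue.
struct Producer { head: AtomicUsize }
struct Consumer { tail: AtomicUsize }

// 128-byte alignment covers the common 64-byte line as well as aarch64's
// larger granularity, matching the cfg_attr above.
#[repr(align(128))]
struct CacheAligned<T>(T);

// Each half lives on its own cache line, so the producer thread writing
// `head` never invalidates the line the consumer reads `tail` from.
struct Queue {
    producer: CacheAligned<Producer>,
    consumer: CacheAligned<Consumer>,
}

fn main() {
    let q = Queue {
        producer: CacheAligned(Producer { head: AtomicUsize::new(0) }),
        consumer: CacheAligned(Consumer { tail: AtomicUsize::new(0) }),
    };
    assert_eq!(std::mem::align_of_val(&q.producer), 128);
    assert_eq!(&q.producer as *const _ as usize % 128, 0);
    let _ = &q.consumer;
}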


@@ -143,175 +143,16 @@
#[cfg(all(test, not(target_os = "emscripten")))]
mod sync_tests;
// A description of how Rust's channel implementation works
//
// Channels are supposed to be the basic building block for all other
// concurrent primitives that are used in Rust. As a result, the channel type
// needs to be highly optimized, flexible, and broad enough for use everywhere.
//
// The choice of implementation of all channels is to be built on lock-free data
// structures. The channels themselves are then consequently also lock-free data
// structures. As always with lock-free code, this is a very "here be dragons"
// territory, especially because I'm unaware of any academic papers that have
// gone into great detail about channels of these flavors.
//
// ## Flavors of channels
//
// From the perspective of a consumer of this library, there is only one flavor
// of channel. This channel can be used as a stream and cloned to allow multiple
// senders. Under the hood, however, there are actually three flavors of
// channels in play.
//
// * Flavor::Oneshots - these channels are highly optimized for the one-send use
// case. They contain as few atomics as possible and
// involve one and exactly one allocation.
// * Streams - these channels are optimized for the non-shared use case. They
// use a different concurrent queue that is more tailored for this
// use case. The initial allocation of this flavor of channel is not
// optimized.
// * Shared - this is the most general form of channel that this module offers,
// a channel with multiple senders. This type is as optimized as it
// can be, but the previous two types mentioned are much faster for
// their use-cases.
//
// ## Concurrent queues
//
// The basic idea of Rust's Sender/Receiver types is that send() never blocks,
// but recv() obviously blocks. This means that under the hood there must be
// some shared and concurrent queue holding all of the actual data.
//
// With two flavors of channels, two flavors of queues are also used. We have
// chosen to use queues from a well-known author that are abbreviated as SPSC
// and MPSC (single producer, single consumer and multiple producer, single
// consumer). SPSC queues are used for streams while MPSC queues are used for
// shared channels.
//
// ### SPSC optimizations
//
// The SPSC queue found online is essentially a linked list of nodes where one
// half of the nodes are the "queue of data" and the other half of nodes are a
// cache of unused nodes. The unused nodes are used such that an allocation is
// not required on every push() and a free doesn't need to happen on every
// pop().
//
// As found online, however, the cache of nodes is of an infinite size. This
// means that if a channel at one point in its life had 50k items in the queue,
// then the queue will always have the capacity for 50k items. I believed that
// this was an unnecessary limitation of the implementation, so I have altered
// the queue to optionally have a bound on the cache size.
//
// By default, streams will have an unbounded SPSC queue with a small-ish cache
// size. The hope is that the cache is still large enough to have very fast
// send() operations while not too large such that millions of channels can
// coexist at once.
//
// ### MPSC optimizations
//
// Right now the MPSC queue has not been optimized. Like the SPSC queue, it uses
// a linked list under the hood to earn its unboundedness, but I have not put
// forth much effort into having a cache of nodes similar to the SPSC queue.
//
// For now, I believe that this is "ok" because shared channels are not the most
// common type, but soon we may wish to revisit this queue choice and determine
// another candidate for backend storage of shared channels.
//
// ## Overview of the Implementation
//
// Now that there's a little background on the concurrent queues used, it's
// worth going into much more detail about the channels themselves. The basic
// pseudocode for a send/recv are:
//
//
// send(t):
//     queue.push(t)
//     if increment() == -1
//         wakeup()
//
// recv():
//     return if queue.pop()
//     deschedule {
//         if decrement() > 0
//             cancel_deschedule()
//     }
//     queue.pop()
//
// As mentioned before, there are no locks in this implementation, only atomic
// instructions are used.
//
// ### The internal atomic counter
//
// Every channel has a shared counter with each half to keep track of the size
// of the queue. This counter is used to abort descheduling by the receiver and
// to know when to wake up on the sending side.
//
// As seen in the pseudocode, senders will increment this count and receivers
// will decrement the count. The theory behind this is that if a sender sees a
// -1 count, it will wake up the receiver, and if the receiver sees a 1+ count,
// then it doesn't need to block.
//
// The recv() method has a beginning call to pop(), and if successful, it needs
// to decrement the count. It is a crucial implementation detail that this
// decrement does *not* happen to the shared counter. If this were the case,
// then it would be possible for the counter to be very negative when there were
// no receivers waiting, in which case the senders would have to determine when
// it was actually appropriate to wake up a receiver.
//
// Instead, the "steal count" is kept track of separately (not atomically
// because it's only used by receivers), and then the decrement() call when
// descheduling will lump in all of the recent steals into one large decrement.
//
// The implication of this is that if a sender sees a -1 count, then there's
// guaranteed to be a waiter waiting!
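A deliberately simplified, single-threaded sketch of that counter discipline: senders increment, successful non-blocking receives are tallied locally as steals, and the decrement taken while descheduling lumps those steals in, so the shared counter only ever reads -1 when a receiver is really parked.

use std::sync::atomic::{AtomicIsize, Ordering};

fn main() {
    // Shared counter: positive means queued data, -1 means a parked receiver.
    let cnt = AtomicIsize::new(0);
    // Receiver-local "steal" tally; only the receiving side touches it.
    let mut steals: isize = 0;

    // Two sends: push, then increment. Neither observes -1, so no wakeup.
    for _ in 0..2 {
        assert_ne!(cnt.fetch_add(1, Ordering::SeqCst), -1);
    }

    // Two successful non-blocking receives: each pop is recorded locally
    // as a steal instead of decrementing the shared counter.
    for _ in 0..2 {
        steals += 1;
    }

    // The next receive finds the queue empty and deschedules: it lumps the
    // accumulated steals into one shared decrement.
    let taken = std::mem::replace(&mut steals, 0);
    let prev = cnt.fetch_sub(1 + taken, Ordering::SeqCst);
    assert_eq!(prev, 2);
    assert_eq!(cnt.load(Ordering::SeqCst), -1);

    // A sender arriving now sees -1 and knows a parked receiver must be woken.
    assert_eq!(cnt.fetch_add(1, Ordering::SeqCst), -1);
}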
//
// ## Native Implementation
//
// A major goal of these channels is to work seamlessly on and off the runtime.
// All of the previous race conditions have been worded in terms of
// scheduler-isms (which is obviously not available without the runtime).
//
// For now, native usage of channels (off the runtime) will fall back onto
// mutexes/cond vars for descheduling/atomic decisions. The no-contention path
// is still entirely lock-free, the "deschedule" blocks above are surrounded by
// a mutex and the "wakeup" blocks involve grabbing a mutex and signaling on a
// condition variable.
//
// ## Select
//
// Being able to support selection over channels has greatly influenced this
// design, and not only does selection need to work inside the runtime, but also
// outside the runtime.
//
// The implementation is fairly straightforward. The goal of select() is not to
// return some data, but only to return which channel can receive data without
// blocking. The implementation is essentially the entire blocking procedure
// followed by an increment as soon as it's woken up. The cancellation procedure
// involves an increment and swapping out to_wake to acquire ownership of the
// thread to unblock.
//
// Sadly this current implementation requires multiple allocations, so I have
// seen the throughput of select() be much worse than it should be. I do not
// believe that there is anything fundamental that needs to change about these
// channels, however, in order to support a more efficient select().
//
// FIXME: Select is now removed, so these factors are ready to be cleaned up!
//
// # Conclusion
//
// And now that you've seen all the races that I found and attempted to fix,
// here's the code for you to find some more!
// MPSC channels are built as a wrapper around MPMC channels, which
// were ported from the `crossbeam-channel` crate. MPMC channels are
// not exposed publicly, but if you are curious about the implementation,
// that's where everything is.
use crate::cell::UnsafeCell;
use crate::error;
use crate::fmt;
use crate::mem;
use crate::sync::Arc;
use crate::sync::mpmc;
use crate::time::{Duration, Instant};
mod blocking;
mod mpsc_queue;
mod oneshot;
mod shared;
mod spsc_queue;
mod stream;
mod sync;
mod cache_aligned;
/// The receiving half of Rust's [`channel`] (or [`sync_channel`]) type.
/// This half can only be owned by one thread.
///
@@ -341,7 +182,7 @@
#[stable(feature = "rust1", since = "1.0.0")]
#[cfg_attr(not(test), rustc_diagnostic_item = "Receiver")]
pub struct Receiver<T> {
inner: UnsafeCell<Flavor<T>>,
inner: mpmc::Receiver<T>,
}
// The receiver port can be sent from place to place, so long as it
@@ -498,7 +339,7 @@ pub struct IntoIter<T> {
/// ```
#[stable(feature = "rust1", since = "1.0.0")]
pub struct Sender<T> {
inner: UnsafeCell<Flavor<T>>,
inner: mpmc::Sender<T>,
}
// The send port can be sent from place to place, so long as it
@@ -557,7 +398,7 @@ impl<T> !Sync for Sender<T> {}
/// ```
#[stable(feature = "rust1", since = "1.0.0")]
pub struct SyncSender<T> {
inner: Arc<sync::Packet<T>>,
inner: mpmc::Sender<T>,
}
#[stable(feature = "rust1", since = "1.0.0")]
@@ -643,34 +484,6 @@ pub enum TrySendError<T> {
Disconnected(#[stable(feature = "rust1", since = "1.0.0")] T),
}
enum Flavor<T> {
Oneshot(Arc<oneshot::Packet<T>>),
Stream(Arc<stream::Packet<T>>),
Shared(Arc<shared::Packet<T>>),
Sync(Arc<sync::Packet<T>>),
}
#[doc(hidden)]
trait UnsafeFlavor<T> {
fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>>;
unsafe fn inner_mut(&self) -> &mut Flavor<T> {
&mut *self.inner_unsafe().get()
}
unsafe fn inner(&self) -> &Flavor<T> {
&*self.inner_unsafe().get()
}
}
impl<T> UnsafeFlavor<T> for Sender<T> {
fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>> {
&self.inner
}
}
impl<T> UnsafeFlavor<T> for Receiver<T> {
fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>> {
&self.inner
}
}
/// Creates a new asynchronous channel, returning the sender/receiver halves.
/// All data sent on the [`Sender`] will become available on the [`Receiver`] in
/// the same order as it was sent, and no [`send`] will block the calling thread
@@ -711,8 +524,8 @@ fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>> {
#[must_use]
#[stable(feature = "rust1", since = "1.0.0")]
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
let a = Arc::new(oneshot::Packet::new());
(Sender::new(Flavor::Oneshot(a.clone())), Receiver::new(Flavor::Oneshot(a)))
let (tx, rx) = mpmc::channel();
(Sender { inner: tx }, Receiver { inner: rx })
}
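The observable behaviour of channel() is unchanged by the rewrite; a minimal usage sketch:

use std::sync::mpsc::channel;
use std::thread;

fn main() {
    let (tx, rx) = channel();
    let producer = thread::spawn(move || {
        for i in 0..3 {
            // send() never blocks on the unbounded flavor.
            tx.send(i).expect("receiver is still alive");
        }
        // tx is dropped here, which lets the iterator below terminate.
    });

    // Messages arrive in the order they were sent.
    assert_eq!(rx.iter().collect::<Vec<_>>(), vec![0, 1, 2]);
    producer.join().unwrap();
}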
/// Creates a new synchronous, bounded channel.
@@ -760,8 +573,8 @@ pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
#[must_use]
#[stable(feature = "rust1", since = "1.0.0")]
pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>) {
let a = Arc::new(sync::Packet::new(bound));
(SyncSender::new(a.clone()), Receiver::new(Flavor::Sync(a)))
let (tx, rx) = mpmc::sync_channel(bound);
(SyncSender { inner: tx }, Receiver { inner: rx })
}
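Likewise for the bounded flavour, where a full buffer makes send block (and a bound of zero would make every send a rendezvous); a small sketch:

use std::sync::mpsc::{sync_channel, TrySendError};
use std::thread;

fn main() {
    // Buffer of one: the first send succeeds immediately,
    // the second would block, so try_send reports Full.
    let (tx, rx) = sync_channel(1);
    tx.send(1).unwrap();
    assert!(matches!(tx.try_send(2), Err(TrySendError::Full(2))));

    // A blocking send completes once the receiver drains a slot.
    let producer = thread::spawn(move || tx.send(2).unwrap());
    assert_eq!(rx.recv().unwrap(), 1);
    assert_eq!(rx.recv().unwrap(), 2);
    producer.join().unwrap();
}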
////////////////////////////////////////////////////////////////////////////////
@@ -769,10 +582,6 @@ pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>) {
////////////////////////////////////////////////////////////////////////////////
impl<T> Sender<T> {
fn new(inner: Flavor<T>) -> Sender<T> {
Sender { inner: UnsafeCell::new(inner) }
}
/// Attempts to send a value on this channel, returning it back if it could
/// not be sent.
///
@@ -802,40 +611,7 @@ fn new(inner: Flavor<T>) -> Sender<T> {
/// ```
#[stable(feature = "rust1", since = "1.0.0")]
pub fn send(&self, t: T) -> Result<(), SendError<T>> {
let (new_inner, ret) = match *unsafe { self.inner() } {
Flavor::Oneshot(ref p) => {
if !p.sent() {
return p.send(t).map_err(SendError);
} else {
let a = Arc::new(stream::Packet::new());
let rx = Receiver::new(Flavor::Stream(a.clone()));
match p.upgrade(rx) {
oneshot::UpSuccess => {
let ret = a.send(t);
(a, ret)
}
oneshot::UpDisconnected => (a, Err(t)),
oneshot::UpWoke(token) => {
// This send cannot panic because the thread is
// asleep (we're looking at it), so the receiver
// can't go away.
a.send(t).ok().unwrap();
token.signal();
(a, Ok(()))
}
}
}
}
Flavor::Stream(ref p) => return p.send(t).map_err(SendError),
Flavor::Shared(ref p) => return p.send(t).map_err(SendError),
Flavor::Sync(..) => unreachable!(),
};
unsafe {
let tmp = Sender::new(Flavor::Stream(new_inner));
mem::swap(self.inner_mut(), tmp.inner_mut());
}
ret.map_err(SendError)
self.inner.send(t)
}
}
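The error path is preserved as well: once every Receiver is gone, send hands the value back inside the SendError. A sketch:

use std::sync::mpsc::{channel, SendError};

fn main() {
    let (tx, rx) = channel();
    drop(rx);
    // With no receiver left, the value comes back in the error.
    assert_eq!(tx.send("hello"), Err(SendError("hello")));
}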
@@ -847,57 +623,14 @@ impl<T> Clone for Sender<T> {
/// (including the original) need to be dropped in order for
/// [`Receiver::recv`] to stop blocking.
fn clone(&self) -> Sender<T> {
let packet = match *unsafe { self.inner() } {
Flavor::Oneshot(ref p) => {
let a = Arc::new(shared::Packet::new());
{
let guard = a.postinit_lock();
let rx = Receiver::new(Flavor::Shared(a.clone()));
let sleeper = match p.upgrade(rx) {
oneshot::UpSuccess | oneshot::UpDisconnected => None,
oneshot::UpWoke(task) => Some(task),
};
a.inherit_blocker(sleeper, guard);
}
a
}
Flavor::Stream(ref p) => {
let a = Arc::new(shared::Packet::new());
{
let guard = a.postinit_lock();
let rx = Receiver::new(Flavor::Shared(a.clone()));
let sleeper = match p.upgrade(rx) {
stream::UpSuccess | stream::UpDisconnected => None,
stream::UpWoke(task) => Some(task),
};
a.inherit_blocker(sleeper, guard);
}
a
}
Flavor::Shared(ref p) => {
p.clone_chan();
return Sender::new(Flavor::Shared(p.clone()));
}
Flavor::Sync(..) => unreachable!(),
};
unsafe {
let tmp = Sender::new(Flavor::Shared(packet.clone()));
mem::swap(self.inner_mut(), tmp.inner_mut());
}
Sender::new(Flavor::Shared(packet))
Sender { inner: self.inner.clone() }
}
}
#[stable(feature = "rust1", since = "1.0.0")]
impl<T> Drop for Sender<T> {
fn drop(&mut self) {
match *unsafe { self.inner() } {
Flavor::Oneshot(ref p) => p.drop_chan(),
Flavor::Stream(ref p) => p.drop_chan(),
Flavor::Shared(ref p) => p.drop_chan(),
Flavor::Sync(..) => unreachable!(),
}
let _ = self.inner;
}
}
@@ -913,10 +646,6 @@ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
////////////////////////////////////////////////////////////////////////////////
impl<T> SyncSender<T> {
fn new(inner: Arc<sync::Packet<T>>) -> SyncSender<T> {
SyncSender { inner }
}
/// Sends a value on this synchronous channel.
///
/// This function will *block* until space in the internal buffer becomes
@@ -955,7 +684,7 @@ fn new(inner: Arc<sync::Packet<T>>) -> SyncSender<T> {
/// ```
#[stable(feature = "rust1", since = "1.0.0")]
pub fn send(&self, t: T) -> Result<(), SendError<T>> {
self.inner.send(t).map_err(SendError)
self.inner.send(t)
}
/// Attempts to send a value on this channel without blocking.
@@ -1016,15 +745,14 @@ pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
#[stable(feature = "rust1", since = "1.0.0")]
impl<T> Clone for SyncSender<T> {
fn clone(&self) -> SyncSender<T> {
self.inner.clone_chan();
SyncSender::new(self.inner.clone())
SyncSender { inner: self.inner.clone() }
}
}
#[stable(feature = "rust1", since = "1.0.0")]
impl<T> Drop for SyncSender<T> {
fn drop(&mut self) {
self.inner.drop_chan();
let _ = self.inner;
}
}
@@ -1040,10 +768,6 @@ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
////////////////////////////////////////////////////////////////////////////////
impl<T> Receiver<T> {
fn new(inner: Flavor<T>) -> Receiver<T> {
Receiver { inner: UnsafeCell::new(inner) }
}
/// Attempts to return a pending value on this receiver without blocking.
///
/// This method will never block the caller in order to wait for data to
@@ -1069,35 +793,7 @@ fn new(inner: Flavor<T>) -> Receiver<T> {
/// ```
#[stable(feature = "rust1", since = "1.0.0")]
pub fn try_recv(&self) -> Result<T, TryRecvError> {
loop {
let new_port = match *unsafe { self.inner() } {
Flavor::Oneshot(ref p) => match p.try_recv() {
Ok(t) => return Ok(t),
Err(oneshot::Empty) => return Err(TryRecvError::Empty),
Err(oneshot::Disconnected) => return Err(TryRecvError::Disconnected),
Err(oneshot::Upgraded(rx)) => rx,
},
Flavor::Stream(ref p) => match p.try_recv() {
Ok(t) => return Ok(t),
Err(stream::Empty) => return Err(TryRecvError::Empty),
Err(stream::Disconnected) => return Err(TryRecvError::Disconnected),
Err(stream::Upgraded(rx)) => rx,
},
Flavor::Shared(ref p) => match p.try_recv() {
Ok(t) => return Ok(t),
Err(shared::Empty) => return Err(TryRecvError::Empty),
Err(shared::Disconnected) => return Err(TryRecvError::Disconnected),
},
Flavor::Sync(ref p) => match p.try_recv() {
Ok(t) => return Ok(t),
Err(sync::Empty) => return Err(TryRecvError::Empty),
Err(sync::Disconnected) => return Err(TryRecvError::Disconnected),
},
};
unsafe {
mem::swap(self.inner_mut(), new_port.inner_mut());
}
}
self.inner.try_recv()
}
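A sketch of the non-blocking path, which distinguishes a merely empty channel from a disconnected one:

use std::sync::mpsc::{channel, TryRecvError};

fn main() {
    let (tx, rx) = channel();
    // Nothing sent yet: Empty, the channel is still usable.
    assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));

    tx.send(7).unwrap();
    assert_eq!(rx.try_recv(), Ok(7));

    // All senders gone and the buffer drained: Disconnected.
    drop(tx);
    assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
}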
/// Attempts to wait for a value on this receiver, returning an error if the
@@ -1156,31 +852,7 @@ pub fn try_recv(&self) -> Result<T, TryRecvError> {
/// ```
#[stable(feature = "rust1", since = "1.0.0")]
pub fn recv(&self) -> Result<T, RecvError> {
loop {
let new_port = match *unsafe { self.inner() } {
Flavor::Oneshot(ref p) => match p.recv(None) {
Ok(t) => return Ok(t),
Err(oneshot::Disconnected) => return Err(RecvError),
Err(oneshot::Upgraded(rx)) => rx,
Err(oneshot::Empty) => unreachable!(),
},
Flavor::Stream(ref p) => match p.recv(None) {
Ok(t) => return Ok(t),
Err(stream::Disconnected) => return Err(RecvError),
Err(stream::Upgraded(rx)) => rx,
Err(stream::Empty) => unreachable!(),
},
Flavor::Shared(ref p) => match p.recv(None) {
Ok(t) => return Ok(t),
Err(shared::Disconnected) => return Err(RecvError),
Err(shared::Empty) => unreachable!(),
},
Flavor::Sync(ref p) => return p.recv(None).map_err(|_| RecvError),
};
unsafe {
mem::swap(self.inner_mut(), new_port.inner_mut());
}
}
self.inner.recv()
}
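A sketch of the blocking receive across threads; recv only fails once every sender is dropped and the buffer is drained:

use std::sync::mpsc::{channel, RecvError};
use std::thread;

fn main() {
    let (tx, rx) = channel();
    thread::spawn(move || {
        tx.send("ping").unwrap();
        // tx is dropped here, disconnecting the channel.
    });

    // Blocks until the message arrives.
    assert_eq!(rx.recv().unwrap(), "ping");
    // The buffered message was consumed and the sender is gone.
    assert_eq!(rx.recv(), Err(RecvError));
}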
/// Attempts to wait for a value on this receiver, returning an error if the
@@ -1268,17 +940,7 @@ pub fn recv(&self) -> Result<T, RecvError> {
/// ```
#[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
// Do an optimistic try_recv to avoid the performance impact of
// Instant::now() in the full-channel case.
match self.try_recv() {
Ok(result) => Ok(result),
Err(TryRecvError::Disconnected) => Err(RecvTimeoutError::Disconnected),
Err(TryRecvError::Empty) => match Instant::now().checked_add(timeout) {
Some(deadline) => self.recv_deadline(deadline),
// So far in the future that it's practically the same as waiting indefinitely.
None => self.recv().map_err(RecvTimeoutError::from),
},
}
self.inner.recv_timeout(timeout)
}
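A sketch of the timeout variant: an empty but connected channel reports Timeout, a disconnected one reports Disconnected:

use std::sync::mpsc::{channel, RecvTimeoutError};
use std::time::Duration;

fn main() {
    let (tx, rx) = channel::<i32>();

    // Connected but empty: we wait out the timeout.
    assert_eq!(
        rx.recv_timeout(Duration::from_millis(10)),
        Err(RecvTimeoutError::Timeout)
    );

    tx.send(5).unwrap();
    assert_eq!(rx.recv_timeout(Duration::from_millis(10)), Ok(5));

    drop(tx);
    assert_eq!(
        rx.recv_timeout(Duration::from_millis(10)),
        Err(RecvTimeoutError::Disconnected)
    );
}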
/// Attempts to wait for a value on this receiver, returning an error if the
@@ -1339,46 +1001,7 @@ pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
/// ```
#[unstable(feature = "deadline_api", issue = "46316")]
pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> {
use self::RecvTimeoutError::*;
loop {
let port_or_empty = match *unsafe { self.inner() } {
Flavor::Oneshot(ref p) => match p.recv(Some(deadline)) {
Ok(t) => return Ok(t),
Err(oneshot::Disconnected) => return Err(Disconnected),
Err(oneshot::Upgraded(rx)) => Some(rx),
Err(oneshot::Empty) => None,
},
Flavor::Stream(ref p) => match p.recv(Some(deadline)) {
Ok(t) => return Ok(t),
Err(stream::Disconnected) => return Err(Disconnected),
Err(stream::Upgraded(rx)) => Some(rx),
Err(stream::Empty) => None,
},
Flavor::Shared(ref p) => match p.recv(Some(deadline)) {
Ok(t) => return Ok(t),
Err(shared::Disconnected) => return Err(Disconnected),
Err(shared::Empty) => None,
},
Flavor::Sync(ref p) => match p.recv(Some(deadline)) {
Ok(t) => return Ok(t),
Err(sync::Disconnected) => return Err(Disconnected),
Err(sync::Empty) => None,
},
};
if let Some(new_port) = port_or_empty {
unsafe {
mem::swap(self.inner_mut(), new_port.inner_mut());
}
}
// If we're already passed the deadline, and we're here without
// data, return a timeout, else try again.
if Instant::now() >= deadline {
return Err(Timeout);
}
}
self.inner.recv_deadline(deadline)
}
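recv_deadline is the same idea phrased with an absolute Instant; it is still unstable behind the deadline_api feature, so this sketch needs a nightly toolchain:

#![feature(deadline_api)]

use std::sync::mpsc::{channel, RecvTimeoutError};
use std::time::{Duration, Instant};

fn main() {
    let (tx, rx) = channel();
    tx.send(1).unwrap();

    let deadline = Instant::now() + Duration::from_millis(10);
    assert_eq!(rx.recv_deadline(deadline), Ok(1));
    // The channel is now empty, so waiting until the same deadline times out.
    assert_eq!(rx.recv_deadline(deadline), Err(RecvTimeoutError::Timeout));
}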
/// Returns an iterator that will block waiting for messages, but never
@@ -1500,12 +1123,7 @@ fn into_iter(self) -> IntoIter<T> {
#[stable(feature = "rust1", since = "1.0.0")]
impl<T> Drop for Receiver<T> {
fn drop(&mut self) {
match *unsafe { self.inner() } {
Flavor::Oneshot(ref p) => p.drop_port(),
Flavor::Stream(ref p) => p.drop_port(),
Flavor::Shared(ref p) => p.drop_port(),
Flavor::Sync(ref p) => p.drop_port(),
}
let _ = self.inner;
}
}


@@ -1,124 +0,0 @@
//! A mostly lock-free multi-producer, single consumer queue.
//!
//! This module contains an implementation of a concurrent MPSC queue. This
//! queue can be used to share data between threads, and is also used as the
//! building block of channels in Rust.
//!
//! Note that the current implementation of this queue has a caveat in its `pop`
//! method; see that method's documentation for more information. Due to this
//! caveat, this queue might not be appropriate for all use-cases.
// The original implementation is based off:
// https://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue
//
// Note that back when the code was imported, it was licensed under the BSD-2-Clause license:
// http://web.archive.org/web/20110411011612/https://www.1024cores.net/home/lock-free-algorithms/queues/unbounded-spsc-queue
//
// The original author of the code agreed to relicense it under `MIT OR Apache-2.0` in 2017, so as
// of today the license of this file is the same as the rest of the codebase:
// https://github.com/rust-lang/rust/pull/42149
#[cfg(all(test, not(target_os = "emscripten")))]
mod tests;
pub use self::PopResult::*;
use core::cell::UnsafeCell;
use core::ptr;
use crate::boxed::Box;
use crate::sync::atomic::{AtomicPtr, Ordering};
/// A result of the `pop` function.
pub enum PopResult<T> {
/// Some data has been popped
Data(T),
/// The queue is empty
Empty,
/// The queue is in an inconsistent state. Popping data should succeed, but
/// some pushers have yet to make enough progress in order to allow a pop to
/// succeed. It is recommended that a pop() occur "in the near future" in
/// order to see if the sender has made progress or not
Inconsistent,
}
struct Node<T> {
next: AtomicPtr<Node<T>>,
value: Option<T>,
}
/// The multi-producer single-consumer structure. This is not cloneable, but it
/// may be safely shared so long as it is guaranteed that there is only one
/// popper at a time (many pushers are allowed).
pub struct Queue<T> {
head: AtomicPtr<Node<T>>,
tail: UnsafeCell<*mut Node<T>>,
}
unsafe impl<T: Send> Send for Queue<T> {}
unsafe impl<T: Send> Sync for Queue<T> {}
impl<T> Node<T> {
unsafe fn new(v: Option<T>) -> *mut Node<T> {
Box::into_raw(box Node { next: AtomicPtr::new(ptr::null_mut()), value: v })
}
}
impl<T> Queue<T> {
/// Creates a new queue that is safe to share among multiple producers and
/// one consumer.
pub fn new() -> Queue<T> {
let stub = unsafe { Node::new(None) };
Queue { head: AtomicPtr::new(stub), tail: UnsafeCell::new(stub) }
}
/// Pushes a new value onto this queue.
pub fn push(&self, t: T) {
unsafe {
let n = Node::new(Some(t));
let prev = self.head.swap(n, Ordering::AcqRel);
(*prev).next.store(n, Ordering::Release);
}
}
/// Pops some data from this queue.
///
/// Note that the current implementation means that this function cannot
/// return `Option<T>`. It is possible for this queue to be in an
/// inconsistent state where many pushes have succeeded and completely
/// finished, but pops cannot return `Some(t)`. This inconsistent state
/// happens when a pusher is pre-empted at an inopportune moment.
///
/// This inconsistent state means that this queue does indeed have data, but
/// it does not currently have access to it at this time.
pub fn pop(&self) -> PopResult<T> {
unsafe {
let tail = *self.tail.get();
let next = (*tail).next.load(Ordering::Acquire);
if !next.is_null() {
*self.tail.get() = next;
assert!((*tail).value.is_none());
assert!((*next).value.is_some());
let ret = (*next).value.take().unwrap();
let _: Box<Node<T>> = Box::from_raw(tail);
return Data(ret);
}
if self.head.load(Ordering::Acquire) == tail { Empty } else { Inconsistent }
}
}
}
impl<T> Drop for Queue<T> {
fn drop(&mut self) {
unsafe {
let mut cur = *self.tail.get();
while !cur.is_null() {
let next = (*cur).next.load(Ordering::Relaxed);
let _: Box<Node<T>> = Box::from_raw(cur);
cur = next;
}
}
}
}
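Because pop can report Inconsistent while a concurrent push is mid-flight, the single consumer is expected to retry rather than treat the queue as empty. A self-contained sketch of that calling convention; PopResult here mirrors the enum above, and the scripted pop closure stands in for a real queue:

use std::thread;

// Mirror of the module's PopResult, for a self-contained sketch.
enum PopResult<T> {
    Data(T),
    Empty,
    Inconsistent,
}

// Drain a Vyukov-style queue through its pop function: Data is collected,
// Empty ends the drain, and Inconsistent (a pusher swapped `head` but has
// not linked `next` yet) is handled by yielding and retrying.
fn drain<T>(mut pop: impl FnMut() -> PopResult<T>) -> Vec<T> {
    let mut out = Vec::new();
    loop {
        match pop() {
            PopResult::Data(t) => out.push(t),
            PopResult::Empty => break,
            PopResult::Inconsistent => thread::yield_now(),
        }
    }
    out
}

fn main() {
    // Scripted pop results: pretend one push was pre-empted mid-flight once.
    let mut script = vec![
        PopResult::Data(1),
        PopResult::Inconsistent,
        PopResult::Data(2),
        PopResult::Empty,
    ]
    .into_iter();
    let drained = drain(|| script.next().unwrap());
    assert_eq!(drained, vec![1, 2]);
}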


@@ -1,47 +0,0 @@
use super::{Data, Empty, Inconsistent, Queue};
use crate::sync::mpsc::channel;
use crate::sync::Arc;
use crate::thread;
#[test]
fn test_full() {
let q: Queue<Box<_>> = Queue::new();
q.push(Box::new(1));
q.push(Box::new(2));
}
#[test]
fn test() {
let nthreads = 8;
let nmsgs = if cfg!(miri) { 100 } else { 1000 };
let q = Queue::new();
match q.pop() {
Empty => {}
Inconsistent | Data(..) => panic!(),
}
let (tx, rx) = channel();
let q = Arc::new(q);
for _ in 0..nthreads {
let tx = tx.clone();
let q = q.clone();
thread::spawn(move || {
for i in 0..nmsgs {
q.push(i);
}
tx.send(()).unwrap();
});
}
let mut i = 0;
while i < nthreads * nmsgs {
match q.pop() {
Empty | Inconsistent => {}
Data(_) => i += 1,
}
}
drop(tx);
for _ in 0..nthreads {
rx.recv().unwrap();
}
}


@@ -1,315 +0,0 @@
/// Oneshot channels/ports
///
/// This is the initial flavor of channels/ports used for comm module. This is
/// an optimization for the one-use case of a channel. The major optimization of
/// this type is to have one and exactly one allocation when the chan/port pair
/// is created.
///
/// Another possible optimization would be to not use an Arc box because
/// in theory we know when the shared packet can be deallocated (no real need
/// for the atomic reference counting), but I was having trouble figuring out
/// how to destroy the data early in a drop of a Port.
///
/// # Implementation
///
/// Oneshots are implemented around one atomic usize variable. This variable
/// indicates both the state of the port/chan but also contains any threads
/// blocked on the port. All atomic operations happen on this one word.
///
/// In order to upgrade a oneshot channel, an upgrade is considered a disconnect
/// on behalf of the channel side of things (it can be mentally thought of as
/// consuming the port). This upgrade is then also stored in the shared packet.
/// The one caveat to consider is that when a port sees a disconnected channel
/// it must check for data because there is no "data plus upgrade" state.
pub use self::Failure::*;
use self::MyUpgrade::*;
pub use self::UpgradeResult::*;
use crate::cell::UnsafeCell;
use crate::ptr;
use crate::sync::atomic::{AtomicPtr, Ordering};
use crate::sync::mpsc::blocking::{self, SignalToken};
use crate::sync::mpsc::Receiver;
use crate::time::Instant;
// Various states you can find a port in.
const EMPTY: *mut u8 = ptr::invalid_mut::<u8>(0); // initial state: no data, no blocked receiver
const DATA: *mut u8 = ptr::invalid_mut::<u8>(1); // data ready for receiver to take
const DISCONNECTED: *mut u8 = ptr::invalid_mut::<u8>(2); // channel is disconnected OR upgraded
// Any other value represents a pointer to a SignalToken value. The
// protocol ensures that when the state moves *to* a pointer,
// ownership of the token is given to the packet, and when the state
// moves *from* a pointer, ownership of the token is transferred to
// whoever changed the state.
pub struct Packet<T> {
// Internal state of the chan/port pair (stores the blocked thread as well)
state: AtomicPtr<u8>,
// One-shot data slot location
data: UnsafeCell<Option<T>>,
// when used for the second time, a oneshot channel must be upgraded, and
// this contains the slot for the upgrade
upgrade: UnsafeCell<MyUpgrade<T>>,
}
pub enum Failure<T> {
Empty,
Disconnected,
Upgraded(Receiver<T>),
}
pub enum UpgradeResult {
UpSuccess,
UpDisconnected,
UpWoke(SignalToken),
}
enum MyUpgrade<T> {
NothingSent,
SendUsed,
GoUp(Receiver<T>),
}
impl<T> Packet<T> {
pub fn new() -> Packet<T> {
Packet {
data: UnsafeCell::new(None),
upgrade: UnsafeCell::new(NothingSent),
state: AtomicPtr::new(EMPTY),
}
}
pub fn send(&self, t: T) -> Result<(), T> {
unsafe {
// Sanity check
match *self.upgrade.get() {
NothingSent => {}
_ => panic!("sending on a oneshot that's already sent on "),
}
assert!((*self.data.get()).is_none());
ptr::write(self.data.get(), Some(t));
ptr::write(self.upgrade.get(), SendUsed);
match self.state.swap(DATA, Ordering::SeqCst) {
// Sent the data, no one was waiting
EMPTY => Ok(()),
// Couldn't send the data, the port hung up first. Return the data
// back up the stack.
DISCONNECTED => {
self.state.swap(DISCONNECTED, Ordering::SeqCst);
ptr::write(self.upgrade.get(), NothingSent);
Err((&mut *self.data.get()).take().unwrap())
}
// Not possible, these are one-use channels
DATA => unreachable!(),
// There is a thread waiting on the other end. We leave the 'DATA'
// state inside so it'll pick it up on the other end.
ptr => {
SignalToken::from_raw(ptr).signal();
Ok(())
}
}
}
}
// Just tests whether this channel has been sent on or not; this is only
// safe to use from the sender.
pub fn sent(&self) -> bool {
unsafe { !matches!(*self.upgrade.get(), NothingSent) }
}
pub fn recv(&self, deadline: Option<Instant>) -> Result<T, Failure<T>> {
// Attempt to not block the thread (it's a little expensive). If it looks
// like we're not empty, then immediately go through to `try_recv`.
if self.state.load(Ordering::SeqCst) == EMPTY {
let (wait_token, signal_token) = blocking::tokens();
let ptr = unsafe { signal_token.to_raw() };
// race with senders to enter the blocking state
if self.state.compare_exchange(EMPTY, ptr, Ordering::SeqCst, Ordering::SeqCst).is_ok() {
if let Some(deadline) = deadline {
let timed_out = !wait_token.wait_max_until(deadline);
// Try to reset the state
if timed_out {
self.abort_selection().map_err(Upgraded)?;
}
} else {
wait_token.wait();
debug_assert!(self.state.load(Ordering::SeqCst) != EMPTY);
}
} else {
// drop the signal token, since we never blocked
drop(unsafe { SignalToken::from_raw(ptr) });
}
}
self.try_recv()
}
pub fn try_recv(&self) -> Result<T, Failure<T>> {
unsafe {
match self.state.load(Ordering::SeqCst) {
EMPTY => Err(Empty),
// We saw some data on the channel, but the channel can be used
// again to send us an upgrade. As a result, we need to re-insert
// into the channel that there's no data available (otherwise we'll
// just see DATA next time). This is done as a cmpxchg because if
// the state changes under our feet we'd rather just see that state
// change.
DATA => {
let _ = self.state.compare_exchange(
DATA,
EMPTY,
Ordering::SeqCst,
Ordering::SeqCst,
);
match (&mut *self.data.get()).take() {
Some(data) => Ok(data),
None => unreachable!(),
}
}
// There's no guarantee that we receive before an upgrade happens,
// and an upgrade flags the channel as disconnected, so when we see
// this we first need to check if there's data available and *then*
// we go through and process the upgrade.
DISCONNECTED => match (&mut *self.data.get()).take() {
Some(data) => Ok(data),
None => match ptr::replace(self.upgrade.get(), SendUsed) {
SendUsed | NothingSent => Err(Disconnected),
GoUp(upgrade) => Err(Upgraded(upgrade)),
},
},
// We are the sole receiver; there cannot be a blocking
// receiver already.
_ => unreachable!(),
}
}
}
// Returns whether the upgrade was completed. If the upgrade wasn't
// completed, then the port couldn't get sent to the other half (it will
// never receive it).
pub fn upgrade(&self, up: Receiver<T>) -> UpgradeResult {
unsafe {
let prev = match *self.upgrade.get() {
NothingSent => NothingSent,
SendUsed => SendUsed,
_ => panic!("upgrading again"),
};
ptr::write(self.upgrade.get(), GoUp(up));
match self.state.swap(DISCONNECTED, Ordering::SeqCst) {
// If the channel is empty or has data on it, then we're good to go.
// Senders will check the data before the upgrade (in case we
// plastered over the DATA state).
DATA | EMPTY => UpSuccess,
// If the other end is already disconnected, then we failed the
// upgrade. Be sure to trash the port we were given.
DISCONNECTED => {
ptr::replace(self.upgrade.get(), prev);
UpDisconnected
}
// If someone's waiting, we gotta wake them up
ptr => UpWoke(SignalToken::from_raw(ptr)),
}
}
}
pub fn drop_chan(&self) {
match self.state.swap(DISCONNECTED, Ordering::SeqCst) {
DATA | DISCONNECTED | EMPTY => {}
// If someone's waiting, we gotta wake them up
ptr => unsafe {
SignalToken::from_raw(ptr).signal();
},
}
}
pub fn drop_port(&self) {
match self.state.swap(DISCONNECTED, Ordering::SeqCst) {
// An empty channel has nothing to do, and a remotely disconnected
// channel also has nothing to do b/c we're about to run the drop
// glue
DISCONNECTED | EMPTY => {}
// There's data on the channel, so make sure we destroy it promptly.
// This is why not using an arc is a little difficult (need the box
// to stay valid while we take the data).
DATA => unsafe {
(&mut *self.data.get()).take().unwrap();
},
// We're the only ones that can block on this port
_ => unreachable!(),
}
}
////////////////////////////////////////////////////////////////////////////
// select implementation
////////////////////////////////////////////////////////////////////////////
// Remove a previous selecting thread from this port. This ensures that the
// blocked thread will no longer be visible to any other threads.
//
// The return value indicates whether there's data on this port.
pub fn abort_selection(&self) -> Result<bool, Receiver<T>> {
let state = match self.state.load(Ordering::SeqCst) {
// Each of these states means that no further activity will happen
// with regard to aborting the selection
s @ (EMPTY | DATA | DISCONNECTED) => s,
// If we've got a blocked thread, then use an atomic to gain ownership
// of it (may fail)
ptr => self
.state
.compare_exchange(ptr, EMPTY, Ordering::SeqCst, Ordering::SeqCst)
.unwrap_or_else(|x| x),
};
// Now that we've got ownership of our state, figure out what to do
// about it.
match state {
EMPTY => unreachable!(),
// our thread used for select was stolen
DATA => Ok(true),
// If the other end has hung up, then we have complete ownership
// of the port. First, check if there was data waiting for us. This
// is possible if the other end sent something and then hung up.
//
// We then need to check to see if there was an upgrade requested,
// and if so, the upgraded port needs to have its selection aborted.
DISCONNECTED => unsafe {
if (*self.data.get()).is_some() {
Ok(true)
} else {
match ptr::replace(self.upgrade.get(), SendUsed) {
GoUp(port) => Err(port),
_ => Ok(true),
}
}
},
// We woke ourselves up from select.
ptr => unsafe {
drop(SignalToken::from_raw(ptr));
Ok(false)
},
}
}
}
impl<T> Drop for Packet<T> {
fn drop(&mut self) {
assert_eq!(self.state.load(Ordering::SeqCst), DISCONNECTED);
}
}


@@ -1,501 +0,0 @@
/// Shared channels.
///
/// This is the flavor of channels which are not necessarily optimized for any
/// particular use case, but are the most general in how they are used. Shared
/// channels are cloneable allowing for multiple senders.
///
/// High level implementation details can be found in the comment of the parent
/// module. You'll also note that the implementation of the shared and stream
/// channels are quite similar, and this is no coincidence!
pub use self::Failure::*;
use self::StartResult::*;
use core::cmp;
use core::intrinsics::abort;
use crate::cell::UnsafeCell;
use crate::ptr;
use crate::sync::atomic::{AtomicBool, AtomicIsize, AtomicPtr, AtomicUsize, Ordering};
use crate::sync::mpsc::blocking::{self, SignalToken};
use crate::sync::mpsc::mpsc_queue as mpsc;
use crate::sync::{Mutex, MutexGuard};
use crate::thread;
use crate::time::Instant;
const DISCONNECTED: isize = isize::MIN;
const FUDGE: isize = 1024;
const MAX_REFCOUNT: usize = (isize::MAX) as usize;
#[cfg(test)]
const MAX_STEALS: isize = 5;
#[cfg(not(test))]
const MAX_STEALS: isize = 1 << 20;
const EMPTY: *mut u8 = ptr::null_mut(); // initial state: no data, no blocked receiver
pub struct Packet<T> {
queue: mpsc::Queue<T>,
cnt: AtomicIsize, // How many items are on this channel
steals: UnsafeCell<isize>, // How many times has a port received without blocking?
to_wake: AtomicPtr<u8>, // SignalToken for wake up
// The number of channels which are currently using this packet.
channels: AtomicUsize,
// See the discussion in Port::drop and the channel send methods for what
// these are used for
port_dropped: AtomicBool,
sender_drain: AtomicIsize,
// this lock protects various portions of this implementation during
// select()
select_lock: Mutex<()>,
}
pub enum Failure {
Empty,
Disconnected,
}
#[derive(PartialEq, Eq)]
enum StartResult {
Installed,
Abort,
}
impl<T> Packet<T> {
// Creation of a packet *must* be followed by a call to postinit_lock
// and later by inherit_blocker
pub fn new() -> Packet<T> {
Packet {
queue: mpsc::Queue::new(),
cnt: AtomicIsize::new(0),
steals: UnsafeCell::new(0),
to_wake: AtomicPtr::new(EMPTY),
channels: AtomicUsize::new(2),
port_dropped: AtomicBool::new(false),
sender_drain: AtomicIsize::new(0),
select_lock: Mutex::new(()),
}
}
// This function should be called after the newly created Packet has been
// wrapped in an Arc. Otherwise the mutex data would be duplicated while
// cloning, which could cause problems on platforms where a mutex is
// represented by an opaque data structure.
pub fn postinit_lock(&self) -> MutexGuard<'_, ()> {
self.select_lock.lock().unwrap()
}
// This function is used at the creation of a shared packet to inherit a
// previously blocked thread. This is done to prevent spurious wakeups of
// threads in select().
//
// This can only be called at channel-creation time
pub fn inherit_blocker(&self, token: Option<SignalToken>, guard: MutexGuard<'_, ()>) {
if let Some(token) = token {
assert_eq!(self.cnt.load(Ordering::SeqCst), 0);
assert_eq!(self.to_wake.load(Ordering::SeqCst), EMPTY);
self.to_wake.store(unsafe { token.to_raw() }, Ordering::SeqCst);
self.cnt.store(-1, Ordering::SeqCst);
// This store is a little sketchy. What's happening here is that
// we're transferring a blocker from a oneshot or stream channel to
// this shared channel. In doing so, we never spuriously wake them
// up and rather only wake them up at the appropriate time. This
// implementation of shared channels assumes that any blocking
// recv() will undo the increment of steals performed in try_recv()
// once the recv is complete. This thread that we're inheriting,
// however, is not in the middle of recv. Hence, the first time we
// wake them up, they're going to wake up from their old port, move
// on to the upgraded port, and then call the blocking recv() function.
//
// When calling this function, they'll find there's data immediately
// available, counting it as a steal. This in fact wasn't a steal
// because we appropriately blocked them waiting for data.
//
// To offset this bad increment, we initially set the steal count to
// -1. You'll find some special code in abort_selection() as well to
// ensure that this -1 steal count doesn't escape too far.
unsafe {
*self.steals.get() = -1;
}
}
// When the shared packet is constructed, we grabbed this lock. The
// purpose of this lock is to ensure that abort_selection() doesn't
// interfere with this method. After we unlock this lock, we're
// signifying that we're done modifying self.cnt and self.to_wake and
// the port is ready for the world to continue using it.
drop(guard);
}
pub fn send(&self, t: T) -> Result<(), T> {
// See Port::drop for what's going on
if self.port_dropped.load(Ordering::SeqCst) {
return Err(t);
}
// Note that the multiple sender case is a little trickier
// semantically than the single sender case. The logic for
// incrementing is "add and if disconnected store disconnected".
// This could end up leading some senders to believe that there
// wasn't a disconnect if in fact there was a disconnect. This means
// that while one thread is attempting to re-store the disconnected
// states, other threads could walk through merrily incrementing
// this very-negative disconnected count. To prevent senders from
// spuriously attempting to send when the channel is actually
// disconnected, the count has a ranged check here.
//
// This is also done for another reason. Remember that the return
// value of this function is:
//
// `Ok(())` == the data *may* be received, this essentially has no
// meaning
// `Err(t)` == the data will *never* be received, this has a lot of
// meaning
//
// In the SPSC case, we have a check of 'queue.is_empty()' to see
// whether the data was actually received, but this same condition
// means nothing in a multi-producer context. As a result, this
// preflight check serves as the definitive "this will never be
// received". Once we get beyond this check, we have permanently
// entered the realm of "this may be received"
if self.cnt.load(Ordering::SeqCst) < DISCONNECTED + FUDGE {
return Err(t);
}
self.queue.push(t);
match self.cnt.fetch_add(1, Ordering::SeqCst) {
-1 => {
self.take_to_wake().signal();
}
// In this case, we have possibly failed to send our data, and
// we need to consider re-popping the data in order to fully
// destroy it. We must arbitrate among the multiple senders,
// however, because the queues that we're using are
// single-consumer queues. In order to do this, all exiting
// pushers will use an atomic count in order to count those
// flowing through. Pushers who see 0 are required to drain as
// much as possible, and then can only exit when they are the
// only pusher (otherwise they must try again).
n if n < DISCONNECTED + FUDGE => {
// see the comment in 'try' for a shared channel for why this
// window of "not disconnected" is ok.
self.cnt.store(DISCONNECTED, Ordering::SeqCst);
if self.sender_drain.fetch_add(1, Ordering::SeqCst) == 0 {
loop {
// drain the queue, for info on the thread yield see the
// discussion in try_recv
loop {
match self.queue.pop() {
mpsc::Data(..) => {}
mpsc::Empty => break,
mpsc::Inconsistent => thread::yield_now(),
}
}
// maybe we're done, if we're not the last ones
// here, then we need to go try again.
if self.sender_drain.fetch_sub(1, Ordering::SeqCst) == 1 {
break;
}
}
// At this point, there may still be data on the queue,
// but only if the count hasn't been incremented and
// some other sender hasn't finished pushing data just
// yet. That sender in question will drain its own data.
}
}
// Can't make any assumptions about this case like in the SPSC case.
_ => {}
}
Ok(())
}
pub fn recv(&self, deadline: Option<Instant>) -> Result<T, Failure> {
// This code is essentially the exact same as that found in the stream
// case (see stream.rs)
match self.try_recv() {
Err(Empty) => {}
data => return data,
}
let (wait_token, signal_token) = blocking::tokens();
if self.decrement(signal_token) == Installed {
if let Some(deadline) = deadline {
let timed_out = !wait_token.wait_max_until(deadline);
if timed_out {
self.abort_selection(false);
}
} else {
wait_token.wait();
}
}
match self.try_recv() {
data @ Ok(..) => unsafe {
*self.steals.get() -= 1;
data
},
data => data,
}
}
// Essentially the exact same thing as the stream decrement function.
// Returns true if blocking should proceed.
fn decrement(&self, token: SignalToken) -> StartResult {
unsafe {
assert_eq!(
self.to_wake.load(Ordering::SeqCst),
EMPTY,
"This is a known bug in the Rust standard library. See https://github.com/rust-lang/rust/issues/39364"
);
let ptr = token.to_raw();
self.to_wake.store(ptr, Ordering::SeqCst);
let steals = ptr::replace(self.steals.get(), 0);
match self.cnt.fetch_sub(1 + steals, Ordering::SeqCst) {
DISCONNECTED => {
self.cnt.store(DISCONNECTED, Ordering::SeqCst);
}
// If we factor in our steals and notice that the channel has no
// data, we successfully sleep
n => {
assert!(n >= 0);
if n - steals <= 0 {
return Installed;
}
}
}
self.to_wake.store(EMPTY, Ordering::SeqCst);
drop(SignalToken::from_raw(ptr));
Abort
}
}
pub fn try_recv(&self) -> Result<T, Failure> {
let ret = match self.queue.pop() {
mpsc::Data(t) => Some(t),
mpsc::Empty => None,
// This is a bit of an interesting case. The channel is reported as
// having data available, but our pop() has failed due to the queue
// being in an inconsistent state. This means that there is some
// pusher somewhere which has yet to complete, but we are guaranteed
// that a pop will eventually succeed. In this case, we spin in a
// yield loop because the remote sender should finish their enqueue
// operation "very quickly".
//
// Avoiding this yield loop would require a different queue
// abstraction which provides the guarantee that after M pushes have
// succeeded, at least M pops will succeed. The current queues
// guarantee that if there are N active pushes, you can pop N times
// once all N have finished.
mpsc::Inconsistent => {
let data;
loop {
thread::yield_now();
match self.queue.pop() {
mpsc::Data(t) => {
data = t;
break;
}
mpsc::Empty => panic!("inconsistent => empty"),
mpsc::Inconsistent => {}
}
}
Some(data)
}
};
match ret {
// See the discussion in the stream implementation for why we
// might decrement steals.
Some(data) => unsafe {
if *self.steals.get() > MAX_STEALS {
match self.cnt.swap(0, Ordering::SeqCst) {
DISCONNECTED => {
self.cnt.store(DISCONNECTED, Ordering::SeqCst);
}
n => {
let m = cmp::min(n, *self.steals.get());
*self.steals.get() -= m;
self.bump(n - m);
}
}
assert!(*self.steals.get() >= 0);
}
*self.steals.get() += 1;
Ok(data)
},
// See the discussion in the stream implementation for why we try
// again.
None => {
match self.cnt.load(Ordering::SeqCst) {
n if n != DISCONNECTED => Err(Empty),
_ => {
match self.queue.pop() {
mpsc::Data(t) => Ok(t),
mpsc::Empty => Err(Disconnected),
// with no senders, an inconsistency is impossible.
mpsc::Inconsistent => unreachable!(),
}
}
}
}
}
}
// Prepares this shared packet for a channel clone, essentially just bumping
// a refcount.
pub fn clone_chan(&self) {
let old_count = self.channels.fetch_add(1, Ordering::SeqCst);
// See comments on Arc::clone() on why we do this (for `mem::forget`).
if old_count > MAX_REFCOUNT {
abort();
}
}
// Decrement the reference count on a channel. This is called whenever a
// Chan is dropped and may end up waking up a receiver. It's the receiver's
// responsibility on the other end to figure out that we've disconnected.
pub fn drop_chan(&self) {
match self.channels.fetch_sub(1, Ordering::SeqCst) {
1 => {}
n if n > 1 => return,
n => panic!("bad number of channels left {n}"),
}
match self.cnt.swap(DISCONNECTED, Ordering::SeqCst) {
-1 => {
self.take_to_wake().signal();
}
DISCONNECTED => {}
n => {
assert!(n >= 0);
}
}
}
// See the long discussion inside of stream.rs for why the queue is drained,
// and why it is done in this fashion.
pub fn drop_port(&self) {
self.port_dropped.store(true, Ordering::SeqCst);
let mut steals = unsafe { *self.steals.get() };
while {
match self.cnt.compare_exchange(
steals,
DISCONNECTED,
Ordering::SeqCst,
Ordering::SeqCst,
) {
Ok(_) => false,
Err(old) => old != DISCONNECTED,
}
} {
// See the discussion in 'try_recv' for why we yield
// control of this thread.
loop {
match self.queue.pop() {
mpsc::Data(..) => {
steals += 1;
}
mpsc::Empty | mpsc::Inconsistent => break,
}
}
}
}
// Consumes ownership of the 'to_wake' field.
fn take_to_wake(&self) -> SignalToken {
let ptr = self.to_wake.load(Ordering::SeqCst);
self.to_wake.store(EMPTY, Ordering::SeqCst);
assert!(ptr != EMPTY);
unsafe { SignalToken::from_raw(ptr) }
}
////////////////////////////////////////////////////////////////////////////
// select implementation
////////////////////////////////////////////////////////////////////////////
// increment the count on the channel (used for selection)
fn bump(&self, amt: isize) -> isize {
match self.cnt.fetch_add(amt, Ordering::SeqCst) {
DISCONNECTED => {
self.cnt.store(DISCONNECTED, Ordering::SeqCst);
DISCONNECTED
}
n => n,
}
}
// Cancels a previous thread waiting on this port, returning whether there's
// data on the port.
//
// This is similar to the stream implementation (hence fewer comments), but
// uses a different value for the "steals" variable.
pub fn abort_selection(&self, _was_upgrade: bool) -> bool {
// Before we do anything else, we bounce on this lock. The reason for
// doing this is to ensure that any upgrade-in-progress is gone and
// done with. Without this bounce, we can race with inherit_blocker
// about looking at and dealing with to_wake. Once we have acquired the
// lock, we are guaranteed that inherit_blocker is done.
{
let _guard = self.select_lock.lock().unwrap();
}
// Like the stream implementation, we want to make sure that the count
// on the channel goes non-negative. We don't know how negative the
// stream currently is, so instead of using a steal value of 1, we load
// the channel count and figure out what we should do to make it
// positive.
let steals = {
let cnt = self.cnt.load(Ordering::SeqCst);
if cnt < 0 && cnt != DISCONNECTED { -cnt } else { 0 }
};
let prev = self.bump(steals + 1);
if prev == DISCONNECTED {
assert_eq!(self.to_wake.load(Ordering::SeqCst), EMPTY);
true
} else {
let cur = prev + steals + 1;
assert!(cur >= 0);
if prev < 0 {
drop(self.take_to_wake());
} else {
while self.to_wake.load(Ordering::SeqCst) != EMPTY {
thread::yield_now();
}
}
unsafe {
// if the number of steals is -1, it was the pre-emptive -1 steal
// count from when we inherited a blocker. This is fine because
// we're just going to overwrite it with a real value.
let old = self.steals.get();
assert!(*old == 0 || *old == -1);
*old = steals;
prev >= 0
}
}
}
}
impl<T> Drop for Packet<T> {
fn drop(&mut self) {
// Note that this load is not only an assert for correctness about
// disconnection, but also a proper fence before the read of
// `to_wake`, so this assert cannot be removed with also removing
// the `to_wake` assert.
assert_eq!(self.cnt.load(Ordering::SeqCst), DISCONNECTED);
assert_eq!(self.to_wake.load(Ordering::SeqCst), EMPTY);
assert_eq!(self.channels.load(Ordering::SeqCst), 0);
}
}


@@ -1,244 +0,0 @@
//! A single-producer single-consumer concurrent queue
//!
//! This module contains the implementation of an SPSC queue which can be used
//! concurrently between two threads. This data structure is safe to use and
//! enforces the semantics that there is one pusher and one popper.
// The original implementation is based off:
// https://www.1024cores.net/home/lock-free-algorithms/queues/unbounded-spsc-queue
//
// Note that back when the code was imported, it was licensed under the BSD-2-Clause license:
// http://web.archive.org/web/20110411011612/https://www.1024cores.net/home/lock-free-algorithms/queues/unbounded-spsc-queue
//
// The original author of the code agreed to relicense it under `MIT OR Apache-2.0` in 2017, so as
// of today the license of this file is the same as the rest of the codebase:
// https://github.com/rust-lang/rust/pull/42149
#[cfg(all(test, not(target_os = "emscripten")))]
mod tests;
use core::cell::UnsafeCell;
use core::ptr;
use crate::boxed::Box;
use crate::sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
use super::cache_aligned::CacheAligned;
// Node within the linked list queue of messages to send
struct Node<T> {
// FIXME: this could be an uninitialized T if we're careful enough, and
// that would reduce memory usage (and be a bit faster).
// is it worth it?
value: Option<T>, // nullable for re-use of nodes
cached: bool, // This node goes into the node cache
next: AtomicPtr<Node<T>>, // next node in the queue
}
/// The single-producer single-consumer queue. This structure is not cloneable,
/// but it can be safely shared in an Arc if it is guaranteed that there
/// is only one popper and one pusher touching the queue at any one point in
/// time.
pub struct Queue<T, ProducerAddition = (), ConsumerAddition = ()> {
// consumer fields
consumer: CacheAligned<Consumer<T, ConsumerAddition>>,
// producer fields
producer: CacheAligned<Producer<T, ProducerAddition>>,
}
struct Consumer<T, Addition> {
tail: UnsafeCell<*mut Node<T>>, // where to pop from
tail_prev: AtomicPtr<Node<T>>, // where to pop from
cache_bound: usize, // maximum cache size
cached_nodes: AtomicUsize, // number of nodes marked as cacheable
addition: Addition,
}
struct Producer<T, Addition> {
head: UnsafeCell<*mut Node<T>>, // where to push to
first: UnsafeCell<*mut Node<T>>, // where to get new nodes from
tail_copy: UnsafeCell<*mut Node<T>>, // between first/tail
addition: Addition,
}
unsafe impl<T: Send, P: Send + Sync, C: Send + Sync> Send for Queue<T, P, C> {}
unsafe impl<T: Send, P: Send + Sync, C: Send + Sync> Sync for Queue<T, P, C> {}
impl<T> Node<T> {
fn new() -> *mut Node<T> {
Box::into_raw(box Node {
value: None,
cached: false,
next: AtomicPtr::new(ptr::null_mut::<Node<T>>()),
})
}
}
impl<T, ProducerAddition, ConsumerAddition> Queue<T, ProducerAddition, ConsumerAddition> {
/// Creates a new queue. With given additional elements in the producer and
/// consumer portions of the queue.
///
/// Due to the performance implications of cache-contention,
/// we wish to keep fields used mainly by the producer on a separate cache
/// line than those used by the consumer.
/// Since cache lines are usually 64 bytes, it is unreasonably expensive to
/// allocate one for small fields, so we allow users to insert additional
/// fields into the cache lines already allocated by this for the producer
/// and consumer.
///
/// This is unsafe as the type system doesn't enforce a single
/// consumer-producer relationship. It also allows the consumer to `pop`
/// items while there is a `peek` active due to all methods having a
/// non-mutable receiver.
///
/// # Arguments
///
/// * `bound` - This queue implementation is implemented with a linked
/// list, and this means that a push is always a malloc. In
/// order to amortize this cost, an internal cache of nodes is
/// maintained to prevent a malloc from always being
/// necessary. This bound is the limit on the size of the
/// cache (if desired). If the value is 0, then the cache has
/// no bound. Otherwise, the cache will never grow larger than
/// `bound` (although the queue itself could be much larger).
pub unsafe fn with_additions(
bound: usize,
producer_addition: ProducerAddition,
consumer_addition: ConsumerAddition,
) -> Self {
let n1 = Node::new();
let n2 = Node::new();
(*n1).next.store(n2, Ordering::Relaxed);
Queue {
consumer: CacheAligned::new(Consumer {
tail: UnsafeCell::new(n2),
tail_prev: AtomicPtr::new(n1),
cache_bound: bound,
cached_nodes: AtomicUsize::new(0),
addition: consumer_addition,
}),
producer: CacheAligned::new(Producer {
head: UnsafeCell::new(n2),
first: UnsafeCell::new(n1),
tail_copy: UnsafeCell::new(n1),
addition: producer_addition,
}),
}
}
/// Pushes a new value onto this queue. Note that to use this function
/// safely, it must be externally guaranteed that there is only one pusher.
pub fn push(&self, t: T) {
unsafe {
// Acquire a node (which either uses a cached one or allocates a new
// one), and then append this to the 'head' node.
let n = self.alloc();
assert!((*n).value.is_none());
(*n).value = Some(t);
(*n).next.store(ptr::null_mut(), Ordering::Relaxed);
(**self.producer.head.get()).next.store(n, Ordering::Release);
*(&self.producer.head).get() = n;
}
}
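// Added commentary: the `Release` store of `next` above pairs with the
// `Acquire` load of `next` in `pop`/`peek`, which is what guarantees that the
// freshly written `value` is visible to the consumer once it observes the new
// node.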
unsafe fn alloc(&self) -> *mut Node<T> {
// First try to see if we can consume the 'first' node for our uses.
if *self.producer.first.get() != *self.producer.tail_copy.get() {
let ret = *self.producer.first.get();
*self.producer.0.first.get() = (*ret).next.load(Ordering::Relaxed);
return ret;
}
// If the above fails, then update our copy of the tail and try
// again.
*self.producer.0.tail_copy.get() = self.consumer.tail_prev.load(Ordering::Acquire);
if *self.producer.first.get() != *self.producer.tail_copy.get() {
let ret = *self.producer.first.get();
*self.producer.0.first.get() = (*ret).next.load(Ordering::Relaxed);
return ret;
}
// If all of that fails, then we have to allocate a new node
// (there's nothing in the node cache).
Node::new()
}
/// Attempts to pop a value from this queue. Remember that to use this type
/// safely you must ensure that there is only one popper at a time.
pub fn pop(&self) -> Option<T> {
unsafe {
// The `tail` node is not actually a used node, but rather a
// sentinel from where we should start popping from. Hence, look at
// tail's next field and see if we can use it. If we do a pop, then
// the current tail node is a candidate for going into the cache.
let tail = *self.consumer.tail.get();
let next = (*tail).next.load(Ordering::Acquire);
if next.is_null() {
return None;
}
assert!((*next).value.is_some());
let ret = (*next).value.take();
*self.consumer.0.tail.get() = next;
if self.consumer.cache_bound == 0 {
self.consumer.tail_prev.store(tail, Ordering::Release);
} else {
let cached_nodes = self.consumer.cached_nodes.load(Ordering::Relaxed);
if cached_nodes < self.consumer.cache_bound && !(*tail).cached {
// Count this node against the cache bound; storing the old value here
// would let the cache grow past `cache_bound`.
self.consumer.cached_nodes.store(cached_nodes + 1, Ordering::Relaxed);
(*tail).cached = true;
}
if (*tail).cached {
self.consumer.tail_prev.store(tail, Ordering::Release);
} else {
(*self.consumer.tail_prev.load(Ordering::Relaxed))
.next
.store(next, Ordering::Relaxed);
// We have successfully erased all references to 'tail', so
// now we can safely drop it.
let _: Box<Node<T>> = Box::from_raw(tail);
}
}
ret
}
}
/// Attempts to peek at the head of the queue, returning `None` if the queue
/// has no data currently
///
/// # Warning
/// The reference returned is invalid if it is not used before the consumer
/// pops the value off the queue. If the producer then pushes another value
/// onto the queue, it will overwrite the value pointed to by the reference.
pub fn peek(&self) -> Option<&mut T> {
// This is essentially the same as above with all the popping bits
// stripped out.
unsafe {
let tail = *self.consumer.tail.get();
let next = (*tail).next.load(Ordering::Acquire);
if next.is_null() { None } else { (*next).value.as_mut() }
}
}
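// Hedged usage sketch (illustrative only; `queue` and its element type are
// hypothetical): the reference returned by `peek` must be dropped before the
// single consumer pops, e.g.
//
//     if let Some(v) = queue.peek() {
//         *v += 1;         // mutate in place while the node is still queued
//     }                    // the borrow ends here...
//     let _ = queue.pop(); // ...so popping afterwards is fine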
pub fn producer_addition(&self) -> &ProducerAddition {
&self.producer.addition
}
pub fn consumer_addition(&self) -> &ConsumerAddition {
&self.consumer.addition
}
}
impl<T, ProducerAddition, ConsumerAddition> Drop for Queue<T, ProducerAddition, ConsumerAddition> {
fn drop(&mut self) {
unsafe {
let mut cur = *self.producer.first.get();
while !cur.is_null() {
let next = (*cur).next.load(Ordering::Relaxed);
let _n: Box<Node<T>> = Box::from_raw(cur);
cur = next;
}
}
}
}

@ -1,102 +0,0 @@
use super::Queue;
use crate::sync::mpsc::channel;
use crate::sync::Arc;
use crate::thread;
#[test]
fn smoke() {
unsafe {
let queue = Queue::with_additions(0, (), ());
queue.push(1);
queue.push(2);
assert_eq!(queue.pop(), Some(1));
assert_eq!(queue.pop(), Some(2));
assert_eq!(queue.pop(), None);
queue.push(3);
queue.push(4);
assert_eq!(queue.pop(), Some(3));
assert_eq!(queue.pop(), Some(4));
assert_eq!(queue.pop(), None);
}
}
#[test]
fn peek() {
unsafe {
let queue = Queue::with_additions(0, (), ());
queue.push(vec![1]);
// Ensure the borrow checker works
match queue.peek() {
Some(vec) => {
assert_eq!(&*vec, &[1]);
}
None => unreachable!(),
}
match queue.pop() {
Some(vec) => {
assert_eq!(&*vec, &[1]);
}
None => unreachable!(),
}
}
}
#[test]
fn drop_full() {
unsafe {
let q: Queue<Box<_>> = Queue::with_additions(0, (), ());
q.push(Box::new(1));
q.push(Box::new(2));
}
}
#[test]
fn smoke_bound() {
unsafe {
let q = Queue::with_additions(0, (), ());
q.push(1);
q.push(2);
assert_eq!(q.pop(), Some(1));
assert_eq!(q.pop(), Some(2));
assert_eq!(q.pop(), None);
q.push(3);
q.push(4);
assert_eq!(q.pop(), Some(3));
assert_eq!(q.pop(), Some(4));
assert_eq!(q.pop(), None);
}
}
#[test]
fn stress() {
unsafe {
stress_bound(0);
stress_bound(1);
}
unsafe fn stress_bound(bound: usize) {
let count = if cfg!(miri) { 1000 } else { 100000 };
let q = Arc::new(Queue::with_additions(bound, (), ()));
let (tx, rx) = channel();
let q2 = q.clone();
let _t = thread::spawn(move || {
for _ in 0..count {
loop {
match q2.pop() {
Some(1) => break,
Some(_) => panic!(),
None => {}
}
}
}
tx.send(()).unwrap();
});
for _ in 0..count {
q.push(1);
}
rx.recv().unwrap();
}
}

@ -1,457 +0,0 @@
/// Stream channels
///
/// This is the flavor of channels which are optimized for one sender and one
/// receiver. The sender will be upgraded to a shared channel if the channel is
/// cloned.
///
/// High level implementation details can be found in the comment of the parent
/// module.
pub use self::Failure::*;
use self::Message::*;
pub use self::UpgradeResult::*;
use core::cmp;
use crate::cell::UnsafeCell;
use crate::ptr;
use crate::thread;
use crate::time::Instant;
use crate::sync::atomic::{AtomicBool, AtomicIsize, AtomicPtr, Ordering};
use crate::sync::mpsc::blocking::{self, SignalToken};
use crate::sync::mpsc::spsc_queue as spsc;
use crate::sync::mpsc::Receiver;
const DISCONNECTED: isize = isize::MIN;
#[cfg(test)]
const MAX_STEALS: isize = 5;
#[cfg(not(test))]
const MAX_STEALS: isize = 1 << 20;
const EMPTY: *mut u8 = ptr::null_mut(); // initial state: no data, no blocked receiver
pub struct Packet<T> {
// internal queue for all messages
queue: spsc::Queue<Message<T>, ProducerAddition, ConsumerAddition>,
}
struct ProducerAddition {
cnt: AtomicIsize, // How many items are on this channel
to_wake: AtomicPtr<u8>, // SignalToken for the blocked thread to wake up
port_dropped: AtomicBool, // flag if the channel has been destroyed.
}
struct ConsumerAddition {
steals: UnsafeCell<isize>, // How many times has a port received without blocking?
}
pub enum Failure<T> {
Empty,
Disconnected,
Upgraded(Receiver<T>),
}
pub enum UpgradeResult {
UpSuccess,
UpDisconnected,
UpWoke(SignalToken),
}
// Any message could contain an "upgrade request" to a new shared port, so the
// internal queue is not a queue of T, but rather one of Message<T>.
enum Message<T> {
Data(T),
GoUp(Receiver<T>),
}
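// Added commentary: a rough, non-authoritative sketch of how the producer's
// `cnt` field is used (the parent module's doc comment is the full story):
//
//   DISCONNECTED (isize::MIN)      the channel is closed; the queue is drained
//   -1 returned by a sender's
//     fetch_add                    a receiver is parked; its SignalToken lives
//                                  in `to_wake` and must be signalled
//   n >= 0                         roughly, messages queued that the receiver
//                                  has not yet accounted for via `steals`
//
// Transient values as low as -2 are possible, which is why `do_send` below
// also accepts -2.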
impl<T> Packet<T> {
pub fn new() -> Packet<T> {
Packet {
queue: unsafe {
spsc::Queue::with_additions(
128,
ProducerAddition {
cnt: AtomicIsize::new(0),
to_wake: AtomicPtr::new(EMPTY),
port_dropped: AtomicBool::new(false),
},
ConsumerAddition { steals: UnsafeCell::new(0) },
)
},
}
}
pub fn send(&self, t: T) -> Result<(), T> {
// If the other port has deterministically gone away, then we definitely
// must return the data back up the stack. Otherwise, the data is
// considered as being sent.
if self.queue.producer_addition().port_dropped.load(Ordering::SeqCst) {
return Err(t);
}
match self.do_send(Data(t)) {
UpSuccess | UpDisconnected => {}
UpWoke(token) => {
token.signal();
}
}
Ok(())
}
pub fn upgrade(&self, up: Receiver<T>) -> UpgradeResult {
// If the port has gone away, then there's no need to proceed any
// further.
if self.queue.producer_addition().port_dropped.load(Ordering::SeqCst) {
return UpDisconnected;
}
self.do_send(GoUp(up))
}
fn do_send(&self, t: Message<T>) -> UpgradeResult {
self.queue.push(t);
match self.queue.producer_addition().cnt.fetch_add(1, Ordering::SeqCst) {
// As described in the mod's doc comment, -1 == wakeup
-1 => UpWoke(self.take_to_wake()),
// As described before, SPSC queues must be >= -2
-2 => UpSuccess,
// Be sure to preserve the disconnected state, and the return value
// in this case is going to be whether our data was received or not.
// This manifests itself on whether we have an empty queue or not.
//
// Primarily, we are required to drain the queue here because the port
// will never remove this data. We can only have at most one item to
// drain (the port drains the rest).
DISCONNECTED => {
self.queue.producer_addition().cnt.store(DISCONNECTED, Ordering::SeqCst);
let first = self.queue.pop();
let second = self.queue.pop();
assert!(second.is_none());
match first {
Some(..) => UpSuccess, // we failed to send the data
None => UpDisconnected, // we successfully sent data
}
}
// Otherwise we just sent some data on a non-waiting queue, so just
// make sure the world is sane and carry on!
n => {
assert!(n >= 0);
UpSuccess
}
}
}
// Consumes ownership of the 'to_wake' field.
fn take_to_wake(&self) -> SignalToken {
let ptr = self.queue.producer_addition().to_wake.load(Ordering::SeqCst);
self.queue.producer_addition().to_wake.store(EMPTY, Ordering::SeqCst);
assert!(ptr != EMPTY);
unsafe { SignalToken::from_raw(ptr) }
}
// Decrements the count on the channel for a sleeper, returning the sleeper
// back if it shouldn't sleep. Note that this is the location where we take
// steals into account.
fn decrement(&self, token: SignalToken) -> Result<(), SignalToken> {
assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), EMPTY);
let ptr = unsafe { token.to_raw() };
self.queue.producer_addition().to_wake.store(ptr, Ordering::SeqCst);
let steals = unsafe { ptr::replace(self.queue.consumer_addition().steals.get(), 0) };
match self.queue.producer_addition().cnt.fetch_sub(1 + steals, Ordering::SeqCst) {
DISCONNECTED => {
self.queue.producer_addition().cnt.store(DISCONNECTED, Ordering::SeqCst);
}
// If we factor in our steals and notice that the channel has no
// data, we successfully sleep
n => {
assert!(n >= 0);
if n - steals <= 0 {
return Ok(());
}
}
}
self.queue.producer_addition().to_wake.store(EMPTY, Ordering::SeqCst);
Err(unsafe { SignalToken::from_raw(ptr) })
}
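// Added commentary, a rough timeline of the intended sleep/wake handshake
// (illustrative only):
//
//   receiver: try_recv() reports Empty
//   receiver: decrement() publishes its SignalToken in `to_wake` and
//             subtracts 1 + steals from `cnt` (leaving it at -1), then parks
//   sender:   push(data); fetch_add(1) on `cnt` returns -1
//   sender:   take_to_wake().signal() unparks the receiver
//   receiver: wakes and retries try_recv(), which now yields the data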
pub fn recv(&self, deadline: Option<Instant>) -> Result<T, Failure<T>> {
// Optimistic preflight check (scheduling is expensive).
match self.try_recv() {
Err(Empty) => {}
data => return data,
}
// Welp, our channel has no data. Deschedule the current thread and
// initiate the blocking protocol.
let (wait_token, signal_token) = blocking::tokens();
if self.decrement(signal_token).is_ok() {
if let Some(deadline) = deadline {
let timed_out = !wait_token.wait_max_until(deadline);
if timed_out {
self.abort_selection(/* was_upgrade = */ false).map_err(Upgraded)?;
}
} else {
wait_token.wait();
}
}
match self.try_recv() {
// Messages which actually popped from the queue shouldn't count as
// a steal, so offset the decrement here (we already have our
// "steal" factored into the channel count above).
data @ (Ok(..) | Err(Upgraded(..))) => unsafe {
*self.queue.consumer_addition().steals.get() -= 1;
data
},
data => data,
}
}
pub fn try_recv(&self) -> Result<T, Failure<T>> {
match self.queue.pop() {
// If we stole some data, record to that effect (this will be
// factored into cnt later on).
//
// Note that we don't allow steals to grow without bound in order to
// prevent eventual overflow of either steals or cnt as an overflow
// would have catastrophic results. Sometimes, steals > cnt, but
// other times cnt > steals, so we don't know the relation between
// steals and cnt. This code path is executed only rarely, so we do
// a pretty slow operation, of swapping 0 into cnt, taking steals
// down as much as possible (without going negative), and then
// adding back in whatever we couldn't factor into steals.
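// Added worked example (hypothetical numbers, using the test value
// MAX_STEALS == 5): suppose steals == 6 and the swap below reads n == 10
// out of `cnt`. Then m = min(10, 6) = 6, steals drops to 0, and
// bump(10 - 6) puts the remaining 4 messages back into `cnt`.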
Some(data) => unsafe {
if *self.queue.consumer_addition().steals.get() > MAX_STEALS {
match self.queue.producer_addition().cnt.swap(0, Ordering::SeqCst) {
DISCONNECTED => {
self.queue
.producer_addition()
.cnt
.store(DISCONNECTED, Ordering::SeqCst);
}
n => {
let m = cmp::min(n, *self.queue.consumer_addition().steals.get());
*self.queue.consumer_addition().steals.get() -= m;
self.bump(n - m);
}
}
assert!(*self.queue.consumer_addition().steals.get() >= 0);
}
*self.queue.consumer_addition().steals.get() += 1;
match data {
Data(t) => Ok(t),
GoUp(up) => Err(Upgraded(up)),
}
},
None => {
match self.queue.producer_addition().cnt.load(Ordering::SeqCst) {
n if n != DISCONNECTED => Err(Empty),
// This is a little bit of a tricky case. We failed to pop
// data above, and then we have viewed that the channel is
// disconnected. In this window more data could have been
// sent on the channel. It doesn't really make sense to
// return that the channel is disconnected when there's
// actually data on it, so be extra sure there's no data by
// popping one more time.
//
// We can ignore steals because the other end is
// disconnected and we'll never need to really factor in our
// steals again.
_ => match self.queue.pop() {
Some(Data(t)) => Ok(t),
Some(GoUp(up)) => Err(Upgraded(up)),
None => Err(Disconnected),
},
}
}
}
}
pub fn drop_chan(&self) {
// Dropping a channel is pretty simple, we just flag it as disconnected
// and then wakeup a blocker if there is one.
match self.queue.producer_addition().cnt.swap(DISCONNECTED, Ordering::SeqCst) {
-1 => {
self.take_to_wake().signal();
}
DISCONNECTED => {}
n => {
assert!(n >= 0);
}
}
}
pub fn drop_port(&self) {
// Dropping a port seems like a fairly trivial thing. In theory all we
// need to do is flag that we're disconnected and then everything else
// can take over (we don't have anyone to wake up).
//
// The catch for Ports is that we want to drop the entire contents of
// the queue. There are multiple reasons for having this property, the
// largest of which is that if another chan is waiting in this channel
// (but not received yet), then waiting on that port will cause a
// deadlock.
//
// So if we accept that we must now destroy the entire contents of the
// queue, this code may make a bit more sense. The tricky part is that
// we can't let any in-flight sends go un-dropped, we have to make sure
// *everything* is dropped and nothing new will come onto the channel.
// The first thing we do is set a flag saying that we're done for. All
// sends are gated on this flag, so we're immediately guaranteed that
// there are a bounded number of active sends that we'll have to deal
// with.
self.queue.producer_addition().port_dropped.store(true, Ordering::SeqCst);
// Now that we're guaranteed to deal with a bounded number of senders,
// we need to drain the queue. This draining process happens atomically
// with respect to the "count" of the channel. If the count is nonzero
// (with steals taken into account), then there must be data on the
// channel. In this case we drain everything and then try again. We will
// continue to fail while active senders send data while we're dropping
// data, but eventually we're guaranteed to break out of this loop
// (because there is a bounded number of senders).
let mut steals = unsafe { *self.queue.consumer_addition().steals.get() };
while {
match self.queue.producer_addition().cnt.compare_exchange(
steals,
DISCONNECTED,
Ordering::SeqCst,
Ordering::SeqCst,
) {
Ok(_) => false,
Err(old) => old != DISCONNECTED,
}
} {
while self.queue.pop().is_some() {
steals += 1;
}
}
// At this point in time, we have gated all future senders from sending,
// and we have flagged the channel as being disconnected. The senders
// still have some responsibility, however, because some sends might not
// complete until after we flag the disconnection. There are more
// details in the sending methods that see DISCONNECTED
}
////////////////////////////////////////////////////////////////////////////
// select implementation
////////////////////////////////////////////////////////////////////////////
// increment the count on the channel (used for selection)
fn bump(&self, amt: isize) -> isize {
match self.queue.producer_addition().cnt.fetch_add(amt, Ordering::SeqCst) {
DISCONNECTED => {
self.queue.producer_addition().cnt.store(DISCONNECTED, Ordering::SeqCst);
DISCONNECTED
}
n => n,
}
}
// Removes a previous thread from being blocked in this port
pub fn abort_selection(&self, was_upgrade: bool) -> Result<bool, Receiver<T>> {
// If we're aborting selection after upgrading from a oneshot, then
// we're guaranteed that no one is waiting. The only way that we could
// have seen the upgrade is if data was actually sent on the channel
// half again. For us, this means that there is guaranteed to be data on
// this channel. Furthermore, we're guaranteed that there was no
// start_selection previously, so there's no need to modify `self.cnt`
// at all.
//
// Hence, because of these invariants, we immediately return `Ok(true)`.
// Note that the data might not actually be sent on the channel just yet.
// The other end could have flagged the upgrade but not sent data to
// this end. This is fine because we know it's a small bounded window
// of time until the data is actually sent.
if was_upgrade {
assert_eq!(unsafe { *self.queue.consumer_addition().steals.get() }, 0);
assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), EMPTY);
return Ok(true);
}
// We want to make sure that the count on the channel goes non-negative,
// and in the stream case we can have at most one steal, so just assume
// that we had one steal.
let steals = 1;
let prev = self.bump(steals + 1);
// If we were previously disconnected, then we know for sure that there
// is no thread in to_wake, so just keep going
let has_data = if prev == DISCONNECTED {
assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), EMPTY);
true // there is data, that data is that we're disconnected
} else {
let cur = prev + steals + 1;
assert!(cur >= 0);
// If the previous count was negative, then we just made things go
// positive, hence we passed the -1 boundary and we're responsible
// for removing the to_wake() field and trashing it.
//
// If the previous count was positive then we're in a tougher
// situation. A possible race is that a sender just incremented
// through -1 (meaning it's going to try to wake a thread up), but it
// hasn't yet read the to_wake. In order to prevent a future recv()
// from waking up too early (this sender picking up the plastered
// over to_wake), we spin loop here waiting for to_wake to be 0.
// Note that this entire select() implementation needs an overhaul,
// and this is *not* the worst part of it, so this is not done as a
// final solution but rather out of necessity for now to get
// something working.
if prev < 0 {
drop(self.take_to_wake());
} else {
while self.queue.producer_addition().to_wake.load(Ordering::SeqCst) != EMPTY {
thread::yield_now();
}
}
unsafe {
assert_eq!(*self.queue.consumer_addition().steals.get(), 0);
*self.queue.consumer_addition().steals.get() = steals;
}
// if we were previously positive, then there's surely data to
// receive
prev >= 0
};
// Now that we've determined that this queue "has data", we peek at the
// queue to see if the data is an upgrade or not. If it's an upgrade,
// then we need to destroy this port and abort selection on the
// upgraded port.
if has_data {
match self.queue.peek() {
Some(&mut GoUp(..)) => match self.queue.pop() {
Some(GoUp(port)) => Err(port),
_ => unreachable!(),
},
_ => Ok(true),
}
} else {
Ok(false)
}
}
}
impl<T> Drop for Packet<T> {
fn drop(&mut self) {
// Note that this load is not only an assert for correctness about
// disconnection, but also a proper fence before the read of
// `to_wake`, so this assert cannot be removed with also removing
// the `to_wake` assert.
assert_eq!(self.queue.producer_addition().cnt.load(Ordering::SeqCst), DISCONNECTED);
assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), EMPTY);
}
}

@ -1,495 +0,0 @@
use self::Blocker::*;
/// Synchronous channels/ports
///
/// This channel implementation differs significantly from the asynchronous
/// implementations found next to it (oneshot/stream/share). This is an
/// implementation of a synchronous, bounded buffer channel.
///
/// Each channel is created with some amount of backing buffer, and sends will
/// *block* until buffer space becomes available. A buffer size of 0 is valid,
/// which means that every successful send is paired with a successful recv.
///
/// This flavor of channels defines a `send_opt`-style method: a message is
/// sent, but the sending thread does not panic if the message cannot be
/// delivered.
///
/// Another major difference is that send() will *always* return back the data
/// if it couldn't be sent. This is because it is deterministically known when
/// the data is received and when it is not received.
///
/// Implementation-wise, it can all be summed up with "use a mutex plus some
/// logic". The mutex used here is an OS native mutex, meaning that no user code
/// is run inside of the mutex (to prevent context switching). This
/// implementation shares almost all code for the buffered and unbuffered cases
/// of a synchronous channel. There are a few branches for the unbuffered case,
/// but they're mostly just relevant to blocking senders.
pub use self::Failure::*;
use core::intrinsics::abort;
use core::mem;
use core::ptr;
use crate::sync::atomic::{AtomicUsize, Ordering};
use crate::sync::mpsc::blocking::{self, SignalToken, WaitToken};
use crate::sync::{Mutex, MutexGuard};
use crate::time::Instant;
const MAX_REFCOUNT: usize = (isize::MAX) as usize;
pub struct Packet<T> {
/// Only field outside of the mutex. Just done for kicks, but mainly because
/// the other shared channel already had the code implemented
channels: AtomicUsize,
lock: Mutex<State<T>>,
}
unsafe impl<T: Send> Send for Packet<T> {}
unsafe impl<T: Send> Sync for Packet<T> {}
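// Added commentary (illustrative only): the zero-capacity rendezvous behaviour
// described above, sketched with the public `sync_channel` API.
//
//     use std::sync::mpsc::sync_channel;
//     use std::thread;
//
//     let (tx, rx) = sync_channel::<i32>(0);
//     let t = thread::spawn(move || {
//         // With capacity 0 this send blocks until the receiver arrives.
//         tx.send(42).unwrap();
//     });
//     assert_eq!(rx.recv().unwrap(), 42);
//     t.join().unwrap();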
struct State<T> {
disconnected: bool, // Is the channel disconnected yet?
queue: Queue, // queue of senders waiting to send data
blocker: Blocker, // currently blocked thread on this channel
buf: Buffer<T>, // storage for buffered messages
cap: usize, // capacity of this channel
/// A curious flag used to indicate whether a sender failed or succeeded in
/// blocking. This is used to transmit information back to the thread that it
/// must dequeue its message from the buffer because it was not received.
/// This is only relevant in the 0-buffer case. This obviously cannot be
/// safely constructed, but it's guaranteed to always have a valid pointer
/// value.
canceled: Option<&'static mut bool>,
}
unsafe impl<T: Send> Send for State<T> {}
/// Possible flavors of threads who can be blocked on this channel.
enum Blocker {
BlockedSender(SignalToken),
BlockedReceiver(SignalToken),
NoneBlocked,
}
/// Simple queue for threading threads together. Nodes are stack-allocated, so
/// this structure is not safe at all
struct Queue {
head: *mut Node,
tail: *mut Node,
}
struct Node {
token: Option<SignalToken>,
next: *mut Node,
}
unsafe impl Send for Node {}
/// A simple ring-buffer
struct Buffer<T> {
buf: Vec<Option<T>>,
start: usize,
size: usize,
}
#[derive(Debug)]
pub enum Failure {
Empty,
Disconnected,
}
/// Atomically blocks the current thread, placing it into `slot`, unlocking `lock`
/// in the meantime. This re-locks the mutex upon returning.
fn wait<'a, 'b, T>(
lock: &'a Mutex<State<T>>,
mut guard: MutexGuard<'b, State<T>>,
f: fn(SignalToken) -> Blocker,
) -> MutexGuard<'a, State<T>> {
let (wait_token, signal_token) = blocking::tokens();
match mem::replace(&mut guard.blocker, f(signal_token)) {
NoneBlocked => {}
_ => unreachable!(),
}
drop(guard); // unlock
wait_token.wait(); // block
lock.lock().unwrap() // relock
}
/// Same as wait, but waiting at most until `deadline`.
fn wait_timeout_receiver<'a, 'b, T>(
lock: &'a Mutex<State<T>>,
deadline: Instant,
mut guard: MutexGuard<'b, State<T>>,
success: &mut bool,
) -> MutexGuard<'a, State<T>> {
let (wait_token, signal_token) = blocking::tokens();
match mem::replace(&mut guard.blocker, BlockedReceiver(signal_token)) {
NoneBlocked => {}
_ => unreachable!(),
}
drop(guard); // unlock
*success = wait_token.wait_max_until(deadline); // block
let mut new_guard = lock.lock().unwrap(); // relock
if !*success {
abort_selection(&mut new_guard);
}
new_guard
}
fn abort_selection<T>(guard: &mut MutexGuard<'_, State<T>>) -> bool {
match mem::replace(&mut guard.blocker, NoneBlocked) {
NoneBlocked => true,
BlockedSender(token) => {
guard.blocker = BlockedSender(token);
true
}
BlockedReceiver(token) => {
drop(token);
false
}
}
}
/// Wakes up a thread, dropping the lock at the correct time
fn wakeup<T>(token: SignalToken, guard: MutexGuard<'_, State<T>>) {
// We need to be careful to wake up the waiting thread *outside* of the mutex
// in case it incurs a context switch.
drop(guard);
token.signal();
}
impl<T> Packet<T> {
pub fn new(capacity: usize) -> Packet<T> {
Packet {
channels: AtomicUsize::new(1),
lock: Mutex::new(State {
disconnected: false,
blocker: NoneBlocked,
cap: capacity,
canceled: None,
queue: Queue { head: ptr::null_mut(), tail: ptr::null_mut() },
buf: Buffer {
buf: (0..capacity + if capacity == 0 { 1 } else { 0 }).map(|_| None).collect(),
start: 0,
size: 0,
},
}),
}
}
// wait until a send slot is available, returning locked access to
// the channel state.
fn acquire_send_slot(&self) -> MutexGuard<'_, State<T>> {
let mut node = Node { token: None, next: ptr::null_mut() };
loop {
let mut guard = self.lock.lock().unwrap();
// are we ready to go?
if guard.disconnected || guard.buf.size() < guard.buf.capacity() {
return guard;
}
// no room; actually block
let wait_token = guard.queue.enqueue(&mut node);
drop(guard);
wait_token.wait();
}
}
pub fn send(&self, t: T) -> Result<(), T> {
let mut guard = self.acquire_send_slot();
if guard.disconnected {
return Err(t);
}
guard.buf.enqueue(t);
match mem::replace(&mut guard.blocker, NoneBlocked) {
// if our capacity is 0, then we need to wait for a receiver to be
// available to take our data. After waiting, we check again to make
// sure the port didn't go away in the meantime. If it did, we need
// to hand back our data.
NoneBlocked if guard.cap == 0 => {
let mut canceled = false;
assert!(guard.canceled.is_none());
guard.canceled = Some(unsafe { mem::transmute(&mut canceled) });
let mut guard = wait(&self.lock, guard, BlockedSender);
if canceled { Err(guard.buf.dequeue()) } else { Ok(()) }
}
// success, we buffered some data
NoneBlocked => Ok(()),
// success, someone's about to receive our buffered data.
BlockedReceiver(token) => {
wakeup(token, guard);
Ok(())
}
BlockedSender(..) => panic!("unexpected blocked sender while holding a send slot"),
}
}
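// Added commentary, a rough sketch of the zero-capacity path above
// (illustrative only): the sender stashes a pointer to its stack-local
// `canceled` flag in the shared state, buffers the value, and parks as
// BlockedSender. A receiver that takes the value clears the flag via
// `wakeup_senders`, so the sender returns Ok(()); `drop_port` instead sets
// the flag, so on wakeup the sender dequeues its own value and returns
// Err(t).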
pub fn try_send(&self, t: T) -> Result<(), super::TrySendError<T>> {
let mut guard = self.lock.lock().unwrap();
if guard.disconnected {
Err(super::TrySendError::Disconnected(t))
} else if guard.buf.size() == guard.buf.capacity() {
Err(super::TrySendError::Full(t))
} else if guard.cap == 0 {
// With capacity 0, even though we have buffer space we can't
// transfer the data unless there's a receiver waiting.
match mem::replace(&mut guard.blocker, NoneBlocked) {
NoneBlocked => Err(super::TrySendError::Full(t)),
BlockedSender(..) => unreachable!(),
BlockedReceiver(token) => {
guard.buf.enqueue(t);
wakeup(token, guard);
Ok(())
}
}
} else {
// If the buffer has some space and the capacity isn't 0, then we
// just enqueue the data for later retrieval, ensuring to wake up
// any blocked receiver if there is one.
assert!(guard.buf.size() < guard.buf.capacity());
guard.buf.enqueue(t);
match mem::replace(&mut guard.blocker, NoneBlocked) {
BlockedReceiver(token) => wakeup(token, guard),
NoneBlocked => {}
BlockedSender(..) => unreachable!(),
}
Ok(())
}
}
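// Added commentary (illustrative only): the user-visible behaviour of the
// branches above, sketched with the public API.
//
//     use std::sync::mpsc::{sync_channel, TrySendError};
//
//     let (tx, rx) = sync_channel::<i32>(1);
//     assert!(tx.try_send(1).is_ok());                               // buffered
//     assert!(matches!(tx.try_send(2), Err(TrySendError::Full(2)))); // buffer full
//     drop(rx);
//     assert!(matches!(tx.try_send(3), Err(TrySendError::Disconnected(3))));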
// Receives a message from this channel
//
// When reading this, remember that there can only ever be one receiver at a
// time.
pub fn recv(&self, deadline: Option<Instant>) -> Result<T, Failure> {
let mut guard = self.lock.lock().unwrap();
let mut woke_up_after_waiting = false;
// Wait for the buffer to have something in it. No need for a
// while loop because we're the only receiver.
if !guard.disconnected && guard.buf.size() == 0 {
if let Some(deadline) = deadline {
guard =
wait_timeout_receiver(&self.lock, deadline, guard, &mut woke_up_after_waiting);
} else {
guard = wait(&self.lock, guard, BlockedReceiver);
woke_up_after_waiting = true;
}
}
// N.B., channel could be disconnected while waiting, so the order of
// these conditionals is important.
if guard.disconnected && guard.buf.size() == 0 {
return Err(Disconnected);
}
// Pick up the data, wake up our neighbors, and carry on
assert!(guard.buf.size() > 0 || (deadline.is_some() && !woke_up_after_waiting));
if guard.buf.size() == 0 {
return Err(Empty);
}
let ret = guard.buf.dequeue();
self.wakeup_senders(woke_up_after_waiting, guard);
Ok(ret)
}
pub fn try_recv(&self) -> Result<T, Failure> {
let mut guard = self.lock.lock().unwrap();
// Easy cases first
if guard.disconnected && guard.buf.size() == 0 {
return Err(Disconnected);
}
if guard.buf.size() == 0 {
return Err(Empty);
}
// Be sure to wake up neighbors
let ret = Ok(guard.buf.dequeue());
self.wakeup_senders(false, guard);
ret
}
// Wake up pending senders after some data has been received
//
// * `waited` - flag if the receiver blocked to receive some data, or if it
// just picked up some data on the way out
// * `guard` - the lock guard that is held over this channel's lock
fn wakeup_senders(&self, waited: bool, mut guard: MutexGuard<'_, State<T>>) {
let pending_sender1: Option<SignalToken> = guard.queue.dequeue();
// If this is a no-buffer channel (cap == 0), then if we didn't wait we
// need to ACK the sender. If we waited, then the sender waking us up
// was already the ACK.
let pending_sender2 = if guard.cap == 0 && !waited {
match mem::replace(&mut guard.blocker, NoneBlocked) {
NoneBlocked => None,
BlockedReceiver(..) => unreachable!(),
BlockedSender(token) => {
guard.canceled.take();
Some(token)
}
}
} else {
None
};
mem::drop(guard);
// only outside of the lock do we wake up the pending threads
if let Some(token) = pending_sender1 {
token.signal();
}
if let Some(token) = pending_sender2 {
token.signal();
}
}
// Prepares this shared packet for a channel clone, essentially just bumping
// a refcount.
pub fn clone_chan(&self) {
let old_count = self.channels.fetch_add(1, Ordering::SeqCst);
// See comments on Arc::clone() on why we do this (for `mem::forget`).
if old_count > MAX_REFCOUNT {
abort();
}
}
pub fn drop_chan(&self) {
// Only flag the channel as disconnected if we're the last channel
match self.channels.fetch_sub(1, Ordering::SeqCst) {
1 => {}
_ => return,
}
// Not much to do other than wake up a receiver if one's there
let mut guard = self.lock.lock().unwrap();
if guard.disconnected {
return;
}
guard.disconnected = true;
match mem::replace(&mut guard.blocker, NoneBlocked) {
NoneBlocked => {}
BlockedSender(..) => unreachable!(),
BlockedReceiver(token) => wakeup(token, guard),
}
}
pub fn drop_port(&self) {
let mut guard = self.lock.lock().unwrap();
if guard.disconnected {
return;
}
guard.disconnected = true;
// If the capacity is 0, then the sender may want its data back after
// we're disconnected. Otherwise it's now our responsibility to destroy
// the buffered data. As with many other portions of this code, this
// needs to be careful to destroy the data *outside* of the lock to
// prevent deadlock.
let _data = if guard.cap != 0 { mem::take(&mut guard.buf.buf) } else { Vec::new() };
let mut queue =
mem::replace(&mut guard.queue, Queue { head: ptr::null_mut(), tail: ptr::null_mut() });
let waiter = match mem::replace(&mut guard.blocker, NoneBlocked) {
NoneBlocked => None,
BlockedSender(token) => {
*guard.canceled.take().unwrap() = true;
Some(token)
}
BlockedReceiver(..) => unreachable!(),
};
mem::drop(guard);
while let Some(token) = queue.dequeue() {
token.signal();
}
if let Some(token) = waiter {
token.signal();
}
}
}
impl<T> Drop for Packet<T> {
fn drop(&mut self) {
assert_eq!(self.channels.load(Ordering::SeqCst), 0);
let mut guard = self.lock.lock().unwrap();
assert!(guard.queue.dequeue().is_none());
assert!(guard.canceled.is_none());
}
}
////////////////////////////////////////////////////////////////////////////////
// Buffer, a simple ring buffer backed by Vec<T>
////////////////////////////////////////////////////////////////////////////////
impl<T> Buffer<T> {
fn enqueue(&mut self, t: T) {
let pos = (self.start + self.size) % self.buf.len();
self.size += 1;
let prev = mem::replace(&mut self.buf[pos], Some(t));
assert!(prev.is_none());
}
fn dequeue(&mut self) -> T {
let start = self.start;
self.size -= 1;
self.start = (self.start + 1) % self.buf.len();
let result = &mut self.buf[start];
result.take().unwrap()
}
fn size(&self) -> usize {
self.size
}
fn capacity(&self) -> usize {
self.buf.len()
}
}
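// Added worked example (hypothetical numbers): with `buf.len() == 3`,
// `start == 2` and `size == 2`, the next enqueue writes slot
// (2 + 2) % 3 == 1, and the next dequeue takes slot 2 and advances `start`
// to (2 + 1) % 3 == 0.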
////////////////////////////////////////////////////////////////////////////////
// Queue, a simple queue to enqueue threads with (stack-allocated nodes)
////////////////////////////////////////////////////////////////////////////////
impl Queue {
fn enqueue(&mut self, node: &mut Node) -> WaitToken {
let (wait_token, signal_token) = blocking::tokens();
node.token = Some(signal_token);
node.next = ptr::null_mut();
if self.tail.is_null() {
self.head = node as *mut Node;
self.tail = node as *mut Node;
} else {
unsafe {
(*self.tail).next = node as *mut Node;
self.tail = node as *mut Node;
}
}
wait_token
}
fn dequeue(&mut self) -> Option<SignalToken> {
if self.head.is_null() {
return None;
}
let node = self.head;
self.head = unsafe { (*node).next };
if self.head.is_null() {
self.tail = ptr::null_mut();
}
unsafe {
(*node).next = ptr::null_mut();
Some((*node).token.take().unwrap())
}
}
}