forked from mirror/async-std
* Add channel behind unstable feature flag * Don't check tests without unstable feature flag * Fix typos * Remove useless attribute
1132 lines
33 KiB
Rust
1132 lines
33 KiB
Rust
use std::cell::UnsafeCell;
|
|
use std::fmt;
|
|
use std::future::Future;
|
|
use std::isize;
|
|
use std::marker::PhantomData;
|
|
use std::mem;
|
|
use std::ops::{Deref, DerefMut};
|
|
use std::pin::Pin;
|
|
use std::process;
|
|
use std::ptr;
|
|
use std::sync::atomic::{self, AtomicBool, AtomicUsize, Ordering};
|
|
use std::sync::Arc;
|
|
use std::task::{Context, Poll, Waker};
|
|
|
|
use crossbeam_utils::{Backoff, CachePadded};
|
|
use futures_core::stream::Stream;
|
|
use slab::Slab;
|
|
|
|
/// Creates a bounded multi-producer multi-consumer channel.
|
|
///
|
|
/// This channel has a buffer that can hold at most `cap` messages at a time.
|
|
///
|
|
/// Senders and receivers can be cloned. When all senders associated with a channel get dropped, it
|
|
/// becomes closed. Receive operations on a closed and empty channel return `None` instead of
|
|
/// blocking.
|
|
///
|
|
/// # Panics
|
|
///
|
|
/// If `cap` is zero, this function will panic.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```
|
|
/// # async_std::task::block_on(async {
|
|
/// #
|
|
/// use std::time::Duration;
|
|
///
|
|
/// use async_std::sync::channel;
|
|
/// use async_std::task;
|
|
///
|
|
/// let (s, r) = channel(1);
|
|
///
|
|
/// // This call returns immediately because there is enough space in the channel.
|
|
/// s.send(1).await;
|
|
///
|
|
/// task::spawn(async move {
|
|
/// // This call blocks the current task because the channel is full.
|
|
/// // It will be able to complete only after the first message is received.
|
|
/// s.send(2).await;
|
|
/// });
|
|
///
|
|
/// task::sleep(Duration::from_secs(1)).await;
|
|
/// assert_eq!(r.recv().await, Some(1));
|
|
/// assert_eq!(r.recv().await, Some(2));
|
|
/// #
|
|
/// # })
|
|
/// ```
|
|
#[cfg(feature = "unstable")]
|
|
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
|
|
pub fn channel<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
|
|
let channel = Arc::new(Channel::with_capacity(cap));
|
|
let s = Sender {
|
|
channel: channel.clone(),
|
|
};
|
|
let r = Receiver {
|
|
channel,
|
|
opt_key: None,
|
|
};
|
|
(s, r)
|
|
}
|
|
|
|
/// The sending side of a channel.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```
|
|
/// # async_std::task::block_on(async {
|
|
/// #
|
|
/// use async_std::sync::channel;
|
|
/// use async_std::task;
|
|
///
|
|
/// let (s1, r) = channel(100);
|
|
/// let s2 = s1.clone();
|
|
///
|
|
/// task::spawn(async move { s1.send(1).await });
|
|
/// task::spawn(async move { s2.send(2).await });
|
|
///
|
|
/// let msg1 = r.recv().await.unwrap();
|
|
/// let msg2 = r.recv().await.unwrap();
|
|
///
|
|
/// assert_eq!(msg1 + msg2, 3);
|
|
/// #
|
|
/// # })
|
|
/// ```
|
|
#[cfg(feature = "unstable")]
|
|
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
|
|
pub struct Sender<T> {
|
|
/// The inner channel.
|
|
channel: Arc<Channel<T>>,
|
|
}
|
|
|
|
impl<T> Sender<T> {
|
|
/// Sends a message into the channel.
|
|
///
|
|
/// If the channel is full, this method will block the current task until the send operation
|
|
/// can proceed.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```
|
|
/// # async_std::task::block_on(async {
|
|
/// #
|
|
/// use async_std::sync::channel;
|
|
/// use async_std::task;
|
|
///
|
|
/// let (s, r) = channel(1);
|
|
///
|
|
/// task::spawn(async move {
|
|
/// s.send(1).await;
|
|
/// s.send(2).await;
|
|
/// });
|
|
///
|
|
/// assert_eq!(r.recv().await, Some(1));
|
|
/// assert_eq!(r.recv().await, Some(2));
|
|
/// assert_eq!(r.recv().await, None);
|
|
/// #
|
|
/// # })
|
|
/// ```
|
|
pub async fn send(&self, msg: T) {
|
|
struct SendFuture<'a, T> {
|
|
sender: &'a Sender<T>,
|
|
msg: Option<T>,
|
|
opt_key: Option<usize>,
|
|
}
|
|
|
|
impl<T> Unpin for SendFuture<'_, T> {}
|
|
|
|
impl<T> Future for SendFuture<'_, T> {
|
|
type Output = ();
|
|
|
|
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
|
let msg = self.msg.take().unwrap();
|
|
|
|
// Try sending the message.
|
|
let poll = match self.sender.channel.push(msg) {
|
|
Ok(()) => Poll::Ready(()),
|
|
Err(PushError::Disconnected(msg)) => {
|
|
self.msg = Some(msg);
|
|
Poll::Pending
|
|
}
|
|
Err(PushError::Full(msg)) => {
|
|
// Register the current task.
|
|
match self.opt_key {
|
|
None => self.opt_key = Some(self.sender.channel.sends.register(cx)),
|
|
Some(key) => self.sender.channel.sends.reregister(key, cx),
|
|
}
|
|
|
|
// Try sending the message again.
|
|
match self.sender.channel.push(msg) {
|
|
Ok(()) => Poll::Ready(()),
|
|
Err(PushError::Disconnected(msg)) | Err(PushError::Full(msg)) => {
|
|
self.msg = Some(msg);
|
|
Poll::Pending
|
|
}
|
|
}
|
|
}
|
|
};
|
|
|
|
if poll.is_ready() {
|
|
// If the current task was registered, unregister now.
|
|
if let Some(key) = self.opt_key.take() {
|
|
// `true` means the send operation is completed.
|
|
self.sender.channel.sends.unregister(key, true);
|
|
}
|
|
}
|
|
|
|
poll
|
|
}
|
|
}
|
|
|
|
impl<T> Drop for SendFuture<'_, T> {
|
|
fn drop(&mut self) {
|
|
// If the current task was registered, unregister now.
|
|
if let Some(key) = self.opt_key {
|
|
// `false` means the send operation is cancelled.
|
|
self.sender.channel.sends.unregister(key, false);
|
|
}
|
|
}
|
|
}
|
|
|
|
SendFuture {
|
|
sender: self,
|
|
msg: Some(msg),
|
|
opt_key: None,
|
|
}
|
|
.await
|
|
}
|
|
|
|
/// Returns the channel capacity.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```
|
|
/// use async_std::sync::channel;
|
|
///
|
|
/// let (s, _) = channel::<i32>(5);
|
|
/// assert_eq!(s.capacity(), 5);
|
|
/// ```
|
|
pub fn capacity(&self) -> usize {
|
|
self.channel.cap
|
|
}
|
|
|
|
/// Returns `true` if the channel is empty.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```
|
|
/// # async_std::task::block_on(async {
|
|
/// #
|
|
/// use async_std::sync::channel;
|
|
///
|
|
/// let (s, r) = channel(1);
|
|
///
|
|
/// assert!(s.is_empty());
|
|
/// s.send(0).await;
|
|
/// assert!(!s.is_empty());
|
|
/// #
|
|
/// # })
|
|
/// ```
|
|
pub fn is_empty(&self) -> bool {
|
|
self.channel.is_empty()
|
|
}
|
|
|
|
/// Returns `true` if the channel is full.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```
|
|
/// # async_std::task::block_on(async {
|
|
/// #
|
|
/// use async_std::sync::channel;
|
|
///
|
|
/// let (s, r) = channel(1);
|
|
///
|
|
/// assert!(!s.is_full());
|
|
/// s.send(0).await;
|
|
/// assert!(s.is_full());
|
|
/// #
|
|
/// # })
|
|
/// ```
|
|
pub fn is_full(&self) -> bool {
|
|
self.channel.is_full()
|
|
}
|
|
|
|
/// Returns the number of messages in the channel.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```
|
|
/// # async_std::task::block_on(async {
|
|
/// #
|
|
/// use async_std::sync::channel;
|
|
///
|
|
/// let (s, r) = channel(2);
|
|
/// assert_eq!(s.len(), 0);
|
|
///
|
|
/// s.send(1).await;
|
|
/// s.send(2).await;
|
|
/// assert_eq!(s.len(), 2);
|
|
/// #
|
|
/// # })
|
|
/// ```
|
|
pub fn len(&self) -> usize {
|
|
self.channel.len()
|
|
}
|
|
}
|
|
|
|
impl<T> Drop for Sender<T> {
|
|
fn drop(&mut self) {
|
|
// Decrement the sender count and disconnect the channel if it drops down to zero.
|
|
if self.channel.sender_count.fetch_sub(1, Ordering::AcqRel) == 1 {
|
|
self.channel.disconnect();
|
|
}
|
|
}
|
|
}
|
|
|
|
impl<T> Clone for Sender<T> {
|
|
fn clone(&self) -> Sender<T> {
|
|
let count = self.channel.sender_count.fetch_add(1, Ordering::Relaxed);
|
|
|
|
// Make sure the count never overflows, even if lots of sender clones are leaked.
|
|
if count > isize::MAX as usize {
|
|
process::abort();
|
|
}
|
|
|
|
Sender {
|
|
channel: self.channel.clone(),
|
|
}
|
|
}
|
|
}
|
|
|
|
impl<T> fmt::Debug for Sender<T> {
|
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
|
f.pad("Sender { .. }")
|
|
}
|
|
}
|
|
|
|
/// The receiving side of a channel.
|
|
///
|
|
/// This type implements the [`Stream`] trait, which means it can act as an asynchronous iterator.
|
|
///
|
|
/// [`Stream`]: ../stream/trait.Stream.html
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```
|
|
/// # async_std::task::block_on(async {
|
|
/// #
|
|
/// use std::time::Duration;
|
|
///
|
|
/// use async_std::sync::channel;
|
|
/// use async_std::task;
|
|
///
|
|
/// let (s, r) = channel(100);
|
|
///
|
|
/// task::spawn(async move {
|
|
/// s.send(1).await;
|
|
/// task::sleep(Duration::from_secs(1)).await;
|
|
/// s.send(2).await;
|
|
/// });
|
|
///
|
|
/// assert_eq!(r.recv().await, Some(1)); // Received immediately.
|
|
/// assert_eq!(r.recv().await, Some(2)); // Received after 1 second.
|
|
/// #
|
|
/// # })
|
|
/// ```
|
|
#[cfg(feature = "unstable")]
|
|
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
|
|
pub struct Receiver<T> {
|
|
/// The inner channel.
|
|
channel: Arc<Channel<T>>,
|
|
|
|
/// The registration key for this receiver in the `channel.streams` registry.
|
|
opt_key: Option<usize>,
|
|
}
|
|
|
|
impl<T> Receiver<T> {
|
|
/// Receives a message from the channel.
|
|
///
|
|
/// If the channel is empty and it still has senders, this method will block the current task
|
|
/// until the receive operation can proceed. If the channel is empty and there are no more
|
|
/// senders, this method returns `None`.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```
|
|
/// # async_std::task::block_on(async {
|
|
/// #
|
|
/// use async_std::sync::channel;
|
|
/// use async_std::task;
|
|
///
|
|
/// let (s, r) = channel(1);
|
|
///
|
|
/// task::spawn(async move {
|
|
/// s.send(1).await;
|
|
/// s.send(2).await;
|
|
/// });
|
|
///
|
|
/// assert_eq!(r.recv().await, Some(1));
|
|
/// assert_eq!(r.recv().await, Some(2));
|
|
/// assert_eq!(r.recv().await, None);
|
|
/// #
|
|
/// # })
|
|
/// ```
|
|
pub async fn recv(&self) -> Option<T> {
|
|
struct RecvFuture<'a, T> {
|
|
channel: &'a Channel<T>,
|
|
opt_key: Option<usize>,
|
|
}
|
|
|
|
impl<T> Future for RecvFuture<'_, T> {
|
|
type Output = Option<T>;
|
|
|
|
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
|
poll_recv(&self.channel, &self.channel.recvs, &mut self.opt_key, cx)
|
|
}
|
|
}
|
|
|
|
impl<T> Drop for RecvFuture<'_, T> {
|
|
fn drop(&mut self) {
|
|
// If the current task was registered, unregister now.
|
|
if let Some(key) = self.opt_key {
|
|
// `false` means the receive operation is cancelled.
|
|
self.channel.recvs.unregister(key, false);
|
|
}
|
|
}
|
|
}
|
|
|
|
RecvFuture {
|
|
channel: &self.channel,
|
|
opt_key: None,
|
|
}
|
|
.await
|
|
}
|
|
|
|
/// Returns the channel capacity.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```
|
|
/// use async_std::sync::channel;
|
|
///
|
|
/// let (_, r) = channel::<i32>(5);
|
|
/// assert_eq!(r.capacity(), 5);
|
|
/// ```
|
|
pub fn capacity(&self) -> usize {
|
|
self.channel.cap
|
|
}
|
|
|
|
/// Returns `true` if the channel is empty.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```
|
|
/// # async_std::task::block_on(async {
|
|
/// #
|
|
/// use async_std::sync::channel;
|
|
///
|
|
/// let (s, r) = channel(1);
|
|
///
|
|
/// assert!(r.is_empty());
|
|
/// s.send(0).await;
|
|
/// assert!(!r.is_empty());
|
|
/// #
|
|
/// # })
|
|
/// ```
|
|
pub fn is_empty(&self) -> bool {
|
|
self.channel.is_empty()
|
|
}
|
|
|
|
/// Returns `true` if the channel is full.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```
|
|
/// # async_std::task::block_on(async {
|
|
/// #
|
|
/// use async_std::sync::channel;
|
|
///
|
|
/// let (s, r) = channel(1);
|
|
///
|
|
/// assert!(!r.is_full());
|
|
/// s.send(0).await;
|
|
/// assert!(r.is_full());
|
|
/// #
|
|
/// # })
|
|
/// ```
|
|
pub fn is_full(&self) -> bool {
|
|
self.channel.is_full()
|
|
}
|
|
|
|
/// Returns the number of messages in the channel.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```
|
|
/// # async_std::task::block_on(async {
|
|
/// #
|
|
/// use async_std::sync::channel;
|
|
///
|
|
/// let (s, r) = channel(2);
|
|
/// assert_eq!(r.len(), 0);
|
|
///
|
|
/// s.send(1).await;
|
|
/// s.send(2).await;
|
|
/// assert_eq!(r.len(), 2);
|
|
/// #
|
|
/// # })
|
|
/// ```
|
|
pub fn len(&self) -> usize {
|
|
self.channel.len()
|
|
}
|
|
}
|
|
|
|
impl<T> Drop for Receiver<T> {
|
|
fn drop(&mut self) {
|
|
// If the current task was registered as blocked on this stream, unregister now.
|
|
if let Some(key) = self.opt_key {
|
|
// `false` means the last request for a stream item is cancelled.
|
|
self.channel.streams.unregister(key, false);
|
|
}
|
|
|
|
// Decrement the receiver count and disconnect the channel if it drops down to zero.
|
|
if self.channel.receiver_count.fetch_sub(1, Ordering::AcqRel) == 1 {
|
|
self.channel.disconnect();
|
|
}
|
|
}
|
|
}
|
|
|
|
impl<T> Clone for Receiver<T> {
|
|
fn clone(&self) -> Receiver<T> {
|
|
let count = self.channel.receiver_count.fetch_add(1, Ordering::Relaxed);
|
|
|
|
// Make sure the count never overflows, even if lots of receiver clones are leaked.
|
|
if count > isize::MAX as usize {
|
|
process::abort();
|
|
}
|
|
|
|
Receiver {
|
|
channel: self.channel.clone(),
|
|
opt_key: None,
|
|
}
|
|
}
|
|
}
|
|
|
|
impl<T> Stream for Receiver<T> {
|
|
type Item = T;
|
|
|
|
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
|
let this = &mut *self;
|
|
poll_recv(&this.channel, &this.channel.streams, &mut this.opt_key, cx)
|
|
}
|
|
}
|
|
|
|
impl<T> fmt::Debug for Receiver<T> {
|
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
|
f.pad("Receiver { .. }")
|
|
}
|
|
}
|
|
|
|
/// Polls a receive operation on a channel.
|
|
///
|
|
/// If the receive operation is blocked, the current task will be registered in `registry` and its
|
|
/// registration key will then be stored in `opt_key`.
|
|
fn poll_recv<T>(
|
|
channel: &Channel<T>,
|
|
registry: &Registry,
|
|
opt_key: &mut Option<usize>,
|
|
cx: &mut Context<'_>,
|
|
) -> Poll<Option<T>> {
|
|
// Try receiving a message.
|
|
let poll = match channel.pop() {
|
|
Ok(msg) => Poll::Ready(Some(msg)),
|
|
Err(PopError::Disconnected) => Poll::Ready(None),
|
|
Err(PopError::Empty) => {
|
|
// Register the current task.
|
|
match *opt_key {
|
|
None => *opt_key = Some(registry.register(cx)),
|
|
Some(key) => registry.reregister(key, cx),
|
|
}
|
|
|
|
// Try receiving a message again.
|
|
match channel.pop() {
|
|
Ok(msg) => Poll::Ready(Some(msg)),
|
|
Err(PopError::Disconnected) => Poll::Ready(None),
|
|
Err(PopError::Empty) => Poll::Pending,
|
|
}
|
|
}
|
|
};
|
|
|
|
if poll.is_ready() {
|
|
// If the current task was registered, unregister now.
|
|
if let Some(key) = opt_key.take() {
|
|
// `true` means the receive operation is completed.
|
|
registry.unregister(key, true);
|
|
}
|
|
}
|
|
|
|
poll
|
|
}
|
|
|
|
/// A slot in a channel.
|
|
struct Slot<T> {
|
|
/// The current stamp.
|
|
stamp: AtomicUsize,
|
|
|
|
/// The message in this slot.
|
|
msg: UnsafeCell<T>,
|
|
}
|
|
|
|
/// Bounded channel based on a preallocated array.
|
|
struct Channel<T> {
|
|
/// The head of the channel.
|
|
///
|
|
/// This value is a "stamp" consisting of an index into the buffer, a mark bit, and a lap, but
|
|
/// packed into a single `usize`. The lower bits represent the index, while the upper bits
|
|
/// represent the lap. The mark bit in the head is always zero.
|
|
///
|
|
/// Messages are popped from the head of the channel.
|
|
head: CachePadded<AtomicUsize>,
|
|
|
|
/// The tail of the channel.
|
|
///
|
|
/// This value is a "stamp" consisting of an index into the buffer, a mark bit, and a lap, but
|
|
/// packed into a single `usize`. The lower bits represent the index, while the upper bits
|
|
/// represent the lap. The mark bit indicates that the channel is disconnected.
|
|
///
|
|
/// Messages are pushed into the tail of the channel.
|
|
tail: CachePadded<AtomicUsize>,
|
|
|
|
/// The buffer holding slots.
|
|
buffer: *mut Slot<T>,
|
|
|
|
/// The channel capacity.
|
|
cap: usize,
|
|
|
|
/// A stamp with the value of `{ lap: 1, mark: 0, index: 0 }`.
|
|
one_lap: usize,
|
|
|
|
/// If this bit is set in the tail, that means either all senders were dropped or all receivers
|
|
/// were dropped.
|
|
mark_bit: usize,
|
|
|
|
/// Send operations waiting while the channel is full.
|
|
sends: Registry,
|
|
|
|
/// Receive operations waiting while the channel is empty and not disconnected.
|
|
recvs: Registry,
|
|
|
|
/// Streams waiting while the channel is empty and not disconnected.
|
|
streams: Registry,
|
|
|
|
/// The number of currently active `Sender`s.
|
|
sender_count: AtomicUsize,
|
|
|
|
/// The number of currently active `Receivers`s.
|
|
receiver_count: AtomicUsize,
|
|
|
|
/// Indicates that dropping a `Channel<T>` may drop values of type `T`.
|
|
_marker: PhantomData<T>,
|
|
}
|
|
|
|
unsafe impl<T: Send> Send for Channel<T> {}
|
|
unsafe impl<T: Send> Sync for Channel<T> {}
|
|
impl<T> Unpin for Channel<T> {}
|
|
|
|
impl<T> Channel<T> {
|
|
/// Creates a bounded channel of capacity `cap`.
|
|
fn with_capacity(cap: usize) -> Self {
|
|
assert!(cap > 0, "capacity must be positive");
|
|
|
|
// Compute constants `mark_bit` and `one_lap`.
|
|
let mark_bit = (cap + 1).next_power_of_two();
|
|
let one_lap = mark_bit * 2;
|
|
|
|
// Head is initialized to `{ lap: 0, mark: 0, index: 0 }`.
|
|
let head = 0;
|
|
// Tail is initialized to `{ lap: 0, mark: 0, index: 0 }`.
|
|
let tail = 0;
|
|
|
|
// Allocate a buffer of `cap` slots.
|
|
let buffer = {
|
|
let mut v = Vec::<Slot<T>>::with_capacity(cap);
|
|
let ptr = v.as_mut_ptr();
|
|
mem::forget(v);
|
|
ptr
|
|
};
|
|
|
|
// Initialize stamps in the slots.
|
|
for i in 0..cap {
|
|
unsafe {
|
|
// Set the stamp to `{ lap: 0, mark: 0, index: i }`.
|
|
let slot = buffer.add(i);
|
|
ptr::write(&mut (*slot).stamp, AtomicUsize::new(i));
|
|
}
|
|
}
|
|
|
|
Channel {
|
|
buffer,
|
|
cap,
|
|
one_lap,
|
|
mark_bit,
|
|
head: CachePadded::new(AtomicUsize::new(head)),
|
|
tail: CachePadded::new(AtomicUsize::new(tail)),
|
|
sends: Registry::new(),
|
|
recvs: Registry::new(),
|
|
streams: Registry::new(),
|
|
sender_count: AtomicUsize::new(1),
|
|
receiver_count: AtomicUsize::new(1),
|
|
_marker: PhantomData,
|
|
}
|
|
}
|
|
|
|
/// Attempts to push a message.
|
|
fn push(&self, msg: T) -> Result<(), PushError<T>> {
|
|
let backoff = Backoff::new();
|
|
let mut tail = self.tail.load(Ordering::Relaxed);
|
|
|
|
loop {
|
|
// Deconstruct the tail.
|
|
let index = tail & (self.mark_bit - 1);
|
|
let lap = tail & !(self.one_lap - 1);
|
|
|
|
// Inspect the corresponding slot.
|
|
let slot = unsafe { &*self.buffer.add(index) };
|
|
let stamp = slot.stamp.load(Ordering::Acquire);
|
|
|
|
// If the tail and the stamp match, we may attempt to push.
|
|
if tail == stamp {
|
|
let new_tail = if index + 1 < self.cap {
|
|
// Same lap, incremented index.
|
|
// Set to `{ lap: lap, mark: 0, index: index + 1 }`.
|
|
tail + 1
|
|
} else {
|
|
// One lap forward, index wraps around to zero.
|
|
// Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`.
|
|
lap.wrapping_add(self.one_lap)
|
|
};
|
|
|
|
// Try moving the tail.
|
|
match self.tail.compare_exchange_weak(
|
|
tail,
|
|
new_tail,
|
|
Ordering::SeqCst,
|
|
Ordering::Relaxed,
|
|
) {
|
|
Ok(_) => {
|
|
// Write the message into the slot and update the stamp.
|
|
unsafe { slot.msg.get().write(msg) };
|
|
let stamp = tail + 1;
|
|
slot.stamp.store(stamp, Ordering::Release);
|
|
|
|
// Wake a blocked receive operation.
|
|
self.recvs.notify_one();
|
|
|
|
// Wake all blocked streams.
|
|
self.streams.notify_all();
|
|
|
|
return Ok(());
|
|
}
|
|
Err(t) => {
|
|
tail = t;
|
|
backoff.spin();
|
|
}
|
|
}
|
|
} else if stamp.wrapping_add(self.one_lap) == tail + 1 {
|
|
atomic::fence(Ordering::SeqCst);
|
|
let head = self.head.load(Ordering::Relaxed);
|
|
|
|
// If the head lags one lap behind the tail as well...
|
|
if head.wrapping_add(self.one_lap) == tail {
|
|
// ...then the channel is full.
|
|
|
|
// Check if the channel is disconnected.
|
|
if tail & self.mark_bit != 0 {
|
|
return Err(PushError::Disconnected(msg));
|
|
} else {
|
|
return Err(PushError::Full(msg));
|
|
}
|
|
}
|
|
|
|
backoff.spin();
|
|
tail = self.tail.load(Ordering::Relaxed);
|
|
} else {
|
|
// Snooze because we need to wait for the stamp to get updated.
|
|
backoff.snooze();
|
|
tail = self.tail.load(Ordering::Relaxed);
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Attempts to pop a message.
|
|
fn pop(&self) -> Result<T, PopError> {
|
|
let backoff = Backoff::new();
|
|
let mut head = self.head.load(Ordering::Relaxed);
|
|
|
|
loop {
|
|
// Deconstruct the head.
|
|
let index = head & (self.mark_bit - 1);
|
|
let lap = head & !(self.one_lap - 1);
|
|
|
|
// Inspect the corresponding slot.
|
|
let slot = unsafe { &*self.buffer.add(index) };
|
|
let stamp = slot.stamp.load(Ordering::Acquire);
|
|
|
|
// If the the stamp is ahead of the head by 1, we may attempt to pop.
|
|
if head + 1 == stamp {
|
|
let new = if index + 1 < self.cap {
|
|
// Same lap, incremented index.
|
|
// Set to `{ lap: lap, mark: 0, index: index + 1 }`.
|
|
head + 1
|
|
} else {
|
|
// One lap forward, index wraps around to zero.
|
|
// Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`.
|
|
lap.wrapping_add(self.one_lap)
|
|
};
|
|
|
|
// Try moving the head.
|
|
match self.head.compare_exchange_weak(
|
|
head,
|
|
new,
|
|
Ordering::SeqCst,
|
|
Ordering::Relaxed,
|
|
) {
|
|
Ok(_) => {
|
|
// Read the message from the slot and update the stamp.
|
|
let msg = unsafe { slot.msg.get().read() };
|
|
let stamp = head.wrapping_add(self.one_lap);
|
|
slot.stamp.store(stamp, Ordering::Release);
|
|
|
|
// Wake a blocked send operation.
|
|
self.sends.notify_one();
|
|
|
|
return Ok(msg);
|
|
}
|
|
Err(h) => {
|
|
head = h;
|
|
backoff.spin();
|
|
}
|
|
}
|
|
} else if stamp == head {
|
|
atomic::fence(Ordering::SeqCst);
|
|
let tail = self.tail.load(Ordering::Relaxed);
|
|
|
|
// If the tail equals the head, that means the channel is empty.
|
|
if (tail & !self.mark_bit) == head {
|
|
// If the channel is disconnected...
|
|
if tail & self.mark_bit != 0 {
|
|
return Err(PopError::Disconnected);
|
|
} else {
|
|
// Otherwise, the receive operation is not ready.
|
|
return Err(PopError::Empty);
|
|
}
|
|
}
|
|
|
|
backoff.spin();
|
|
head = self.head.load(Ordering::Relaxed);
|
|
} else {
|
|
// Snooze because we need to wait for the stamp to get updated.
|
|
backoff.snooze();
|
|
head = self.head.load(Ordering::Relaxed);
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Returns the current number of messages inside the channel.
|
|
fn len(&self) -> usize {
|
|
loop {
|
|
// Load the tail, then load the head.
|
|
let tail = self.tail.load(Ordering::SeqCst);
|
|
let head = self.head.load(Ordering::SeqCst);
|
|
|
|
// If the tail didn't change, we've got consistent values to work with.
|
|
if self.tail.load(Ordering::SeqCst) == tail {
|
|
let hix = head & (self.mark_bit - 1);
|
|
let tix = tail & (self.mark_bit - 1);
|
|
|
|
return if hix < tix {
|
|
tix - hix
|
|
} else if hix > tix {
|
|
self.cap - hix + tix
|
|
} else if (tail & !self.mark_bit) == head {
|
|
0
|
|
} else {
|
|
self.cap
|
|
};
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Returns `true` if the channel is empty.
|
|
fn is_empty(&self) -> bool {
|
|
let head = self.head.load(Ordering::SeqCst);
|
|
let tail = self.tail.load(Ordering::SeqCst);
|
|
|
|
// Is the tail equal to the head?
|
|
//
|
|
// Note: If the head changes just before we load the tail, that means there was a moment
|
|
// when the channel was not empty, so it is safe to just return `false`.
|
|
(tail & !self.mark_bit) == head
|
|
}
|
|
|
|
/// Returns `true` if the channel is full.
|
|
fn is_full(&self) -> bool {
|
|
let tail = self.tail.load(Ordering::SeqCst);
|
|
let head = self.head.load(Ordering::SeqCst);
|
|
|
|
// Is the head lagging one lap behind tail?
|
|
//
|
|
// Note: If the tail changes just before we load the head, that means there was a moment
|
|
// when the channel was not full, so it is safe to just return `false`.
|
|
head.wrapping_add(self.one_lap) == tail & !self.mark_bit
|
|
}
|
|
|
|
/// Disconnects the channel and wakes up all blocked operations.
|
|
fn disconnect(&self) {
|
|
let tail = self.tail.fetch_or(self.mark_bit, Ordering::SeqCst);
|
|
|
|
if tail & self.mark_bit == 0 {
|
|
// Notify everyone blocked on this channel.
|
|
self.sends.notify_all();
|
|
self.recvs.notify_all();
|
|
self.streams.notify_all();
|
|
}
|
|
}
|
|
}
|
|
|
|
impl<T> Drop for Channel<T> {
|
|
fn drop(&mut self) {
|
|
// Get the index of the head.
|
|
let hix = self.head.load(Ordering::Relaxed) & (self.mark_bit - 1);
|
|
|
|
// Loop over all slots that hold a message and drop them.
|
|
for i in 0..self.len() {
|
|
// Compute the index of the next slot holding a message.
|
|
let index = if hix + i < self.cap {
|
|
hix + i
|
|
} else {
|
|
hix + i - self.cap
|
|
};
|
|
|
|
unsafe {
|
|
self.buffer.add(index).drop_in_place();
|
|
}
|
|
}
|
|
|
|
// Finally, deallocate the buffer, but don't run any destructors.
|
|
unsafe {
|
|
Vec::from_raw_parts(self.buffer, 0, self.cap);
|
|
}
|
|
}
|
|
}
|
|
|
|
/// An error returned from the `push()` method.
|
|
enum PushError<T> {
|
|
/// The channel is full but not disconnected.
|
|
Full(T),
|
|
|
|
/// The channel is full and disconnected.
|
|
Disconnected(T),
|
|
}
|
|
|
|
/// An error returned from the `pop()` method.
|
|
enum PopError {
|
|
/// The channel is empty but not disconnected.
|
|
Empty,
|
|
|
|
/// The channel is empty and disconnected.
|
|
Disconnected,
|
|
}
|
|
|
|
/// A list of blocked channel operations.
|
|
struct Blocked {
|
|
/// A list of registered channel operations.
|
|
///
|
|
/// Each entry has a waker associated with the task that is executing the operation. If the
|
|
/// waker is set to `None`, that means the task has been woken up but hasn't removed itself
|
|
/// from the registry yet.
|
|
entries: Slab<Option<Waker>>,
|
|
|
|
/// The number of wakers in the entry list.
|
|
waker_count: usize,
|
|
}
|
|
|
|
/// A registry of blocked channel operations.
|
|
///
|
|
/// Blocked operations register themselves in a registry. Successful operations on the opposite
|
|
/// side of the channel wake blocked operations in the registry.
|
|
struct Registry {
|
|
/// A list of blocked channel operations.
|
|
blocked: Spinlock<Blocked>,
|
|
|
|
/// Set to `true` if there are no wakers in the registry.
|
|
///
|
|
/// Note that this either means there are no entries in the registry, or that all entries have
|
|
/// been notified.
|
|
is_empty: AtomicBool,
|
|
}
|
|
|
|
impl Registry {
|
|
/// Creates a new registry.
|
|
fn new() -> Registry {
|
|
Registry {
|
|
blocked: Spinlock::new(Blocked {
|
|
entries: Slab::new(),
|
|
waker_count: 0,
|
|
}),
|
|
is_empty: AtomicBool::new(true),
|
|
}
|
|
}
|
|
|
|
/// Registers a blocked channel operation and returns a key associated with it.
|
|
fn register(&self, cx: &Context<'_>) -> usize {
|
|
let mut blocked = self.blocked.lock();
|
|
|
|
// Insert a new entry into the list of blocked tasks.
|
|
let w = cx.waker().clone();
|
|
let key = blocked.entries.insert(Some(w));
|
|
|
|
blocked.waker_count += 1;
|
|
if blocked.waker_count == 1 {
|
|
self.is_empty.store(false, Ordering::SeqCst);
|
|
}
|
|
|
|
key
|
|
}
|
|
|
|
/// Re-registers a blocked channel operation by filling in its waker.
|
|
fn reregister(&self, key: usize, cx: &Context<'_>) {
|
|
let mut blocked = self.blocked.lock();
|
|
|
|
let was_none = blocked.entries[key].is_none();
|
|
let w = cx.waker().clone();
|
|
blocked.entries[key] = Some(w);
|
|
|
|
if was_none {
|
|
blocked.waker_count += 1;
|
|
if blocked.waker_count == 1 {
|
|
self.is_empty.store(false, Ordering::SeqCst);
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Unregisters a channel operation.
|
|
///
|
|
/// If `completed` is `true`, the operation will be removed from the registry. If `completed`
|
|
/// is `false`, that means the operation was cancelled so another one will be notified.
|
|
fn unregister(&self, key: usize, completed: bool) {
|
|
let mut blocked = self.blocked.lock();
|
|
let mut removed = false;
|
|
|
|
match blocked.entries.remove(key) {
|
|
Some(_) => removed = true,
|
|
None => {
|
|
if !completed {
|
|
// This operation was cancelled. Notify another one.
|
|
if let Some((_, opt_waker)) = blocked.entries.iter_mut().next() {
|
|
if let Some(w) = opt_waker.take() {
|
|
w.wake();
|
|
removed = true;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
if removed {
|
|
blocked.waker_count -= 1;
|
|
if blocked.waker_count == 0 {
|
|
self.is_empty.store(true, Ordering::SeqCst);
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Notifies one blocked channel operation.
|
|
#[inline]
|
|
fn notify_one(&self) {
|
|
if !self.is_empty.load(Ordering::SeqCst) {
|
|
let mut blocked = self.blocked.lock();
|
|
|
|
if let Some((_, opt_waker)) = blocked.entries.iter_mut().next() {
|
|
// If there is no waker in this entry, that means it was already woken.
|
|
if let Some(w) = opt_waker.take() {
|
|
w.wake();
|
|
|
|
blocked.waker_count -= 1;
|
|
if blocked.waker_count == 0 {
|
|
self.is_empty.store(true, Ordering::SeqCst);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Notifies all blocked channel operations.
|
|
#[inline]
|
|
fn notify_all(&self) {
|
|
if !self.is_empty.load(Ordering::SeqCst) {
|
|
let mut blocked = self.blocked.lock();
|
|
|
|
for (_, opt_waker) in blocked.entries.iter_mut() {
|
|
// If there is no waker in this entry, that means it was already woken.
|
|
if let Some(w) = opt_waker.take() {
|
|
w.wake();
|
|
}
|
|
}
|
|
|
|
blocked.waker_count = 0;
|
|
self.is_empty.store(true, Ordering::SeqCst);
|
|
}
|
|
}
|
|
}
|
|
|
|
/// A simple spinlock.
|
|
struct Spinlock<T> {
|
|
flag: AtomicBool,
|
|
value: UnsafeCell<T>,
|
|
}
|
|
|
|
impl<T> Spinlock<T> {
|
|
/// Returns a new spinlock initialized with `value`.
|
|
fn new(value: T) -> Spinlock<T> {
|
|
Spinlock {
|
|
flag: AtomicBool::new(false),
|
|
value: UnsafeCell::new(value),
|
|
}
|
|
}
|
|
|
|
/// Locks the spinlock.
|
|
fn lock(&self) -> SpinlockGuard<'_, T> {
|
|
let backoff = Backoff::new();
|
|
while self.flag.swap(true, Ordering::Acquire) {
|
|
backoff.snooze();
|
|
}
|
|
SpinlockGuard { parent: self }
|
|
}
|
|
}
|
|
|
|
/// A guard holding a spinlock locked.
|
|
struct SpinlockGuard<'a, T> {
|
|
parent: &'a Spinlock<T>,
|
|
}
|
|
|
|
impl<'a, T> Drop for SpinlockGuard<'a, T> {
|
|
fn drop(&mut self) {
|
|
self.parent.flag.store(false, Ordering::Release);
|
|
}
|
|
}
|
|
|
|
impl<'a, T> Deref for SpinlockGuard<'a, T> {
|
|
type Target = T;
|
|
|
|
fn deref(&self) -> &T {
|
|
unsafe { &*self.parent.value.get() }
|
|
}
|
|
}
|
|
|
|
impl<'a, T> DerefMut for SpinlockGuard<'a, T> {
|
|
fn deref_mut(&mut self) -> &mut T {
|
|
unsafe { &mut *self.parent.value.get() }
|
|
}
|
|
}
|