diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 565134dc..d47eabce 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -31,7 +31,7 @@ jobs: uses: actions-rs/cargo@v1 with: command: check - args: --all --benches --bins --examples --tests + args: --all --bins --examples - name: check unstable uses: actions-rs/cargo@v1 diff --git a/Cargo.toml b/Cargo.toml index e353386d..7aaba687 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -47,6 +47,7 @@ pin-project-lite = "0.1" [dev-dependencies] femme = "1.2.0" +rand = "0.7.2" # surf = "1.0.2" tempdir = "0.3.7" futures-preview = { version = "=0.3.0-alpha.19", features = ["async-await"] } diff --git a/src/sync/channel.rs b/src/sync/channel.rs new file mode 100644 index 00000000..6751417a --- /dev/null +++ b/src/sync/channel.rs @@ -0,0 +1,1132 @@ +use std::cell::UnsafeCell; +use std::fmt; +use std::future::Future; +use std::isize; +use std::marker::PhantomData; +use std::mem; +use std::ops::{Deref, DerefMut}; +use std::pin::Pin; +use std::process; +use std::ptr; +use std::sync::atomic::{self, AtomicBool, AtomicUsize, Ordering}; +use std::sync::Arc; +use std::task::{Context, Poll, Waker}; + +use crossbeam_utils::{Backoff, CachePadded}; +use futures_core::stream::Stream; +use slab::Slab; + +/// Creates a bounded multi-producer multi-consumer channel. +/// +/// This channel has a buffer that can hold at most `cap` messages at a time. +/// +/// Senders and receivers can be cloned. When all senders associated with a channel get dropped, it +/// becomes closed. Receive operations on a closed and empty channel return `None` instead of +/// blocking. +/// +/// # Panics +/// +/// If `cap` is zero, this function will panic. +/// +/// # Examples +/// +/// ``` +/// # async_std::task::block_on(async { +/// # +/// use std::time::Duration; +/// +/// use async_std::sync::channel; +/// use async_std::task; +/// +/// let (s, r) = channel(1); +/// +/// // This call returns immediately because there is enough space in the channel. +/// s.send(1).await; +/// +/// task::spawn(async move { +/// // This call blocks the current task because the channel is full. +/// // It will be able to complete only after the first message is received. +/// s.send(2).await; +/// }); +/// +/// task::sleep(Duration::from_secs(1)).await; +/// assert_eq!(r.recv().await, Some(1)); +/// assert_eq!(r.recv().await, Some(2)); +/// # +/// # }) +/// ``` +#[cfg(feature = "unstable")] +#[cfg_attr(feature = "docs", doc(cfg(unstable)))] +pub fn channel(cap: usize) -> (Sender, Receiver) { + let channel = Arc::new(Channel::with_capacity(cap)); + let s = Sender { + channel: channel.clone(), + }; + let r = Receiver { + channel, + opt_key: None, + }; + (s, r) +} + +/// The sending side of a channel. +/// +/// # Examples +/// +/// ``` +/// # async_std::task::block_on(async { +/// # +/// use async_std::sync::channel; +/// use async_std::task; +/// +/// let (s1, r) = channel(100); +/// let s2 = s1.clone(); +/// +/// task::spawn(async move { s1.send(1).await }); +/// task::spawn(async move { s2.send(2).await }); +/// +/// let msg1 = r.recv().await.unwrap(); +/// let msg2 = r.recv().await.unwrap(); +/// +/// assert_eq!(msg1 + msg2, 3); +/// # +/// # }) +/// ``` +#[cfg(feature = "unstable")] +#[cfg_attr(feature = "docs", doc(cfg(unstable)))] +pub struct Sender { + /// The inner channel. + channel: Arc>, +} + +impl Sender { + /// Sends a message into the channel. + /// + /// If the channel is full, this method will block the current task until the send operation + /// can proceed. + /// + /// # Examples + /// + /// ``` + /// # async_std::task::block_on(async { + /// # + /// use async_std::sync::channel; + /// use async_std::task; + /// + /// let (s, r) = channel(1); + /// + /// task::spawn(async move { + /// s.send(1).await; + /// s.send(2).await; + /// }); + /// + /// assert_eq!(r.recv().await, Some(1)); + /// assert_eq!(r.recv().await, Some(2)); + /// assert_eq!(r.recv().await, None); + /// # + /// # }) + /// ``` + pub async fn send(&self, msg: T) { + struct SendFuture<'a, T> { + sender: &'a Sender, + msg: Option, + opt_key: Option, + } + + impl Unpin for SendFuture<'_, T> {} + + impl Future for SendFuture<'_, T> { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let msg = self.msg.take().unwrap(); + + // Try sending the message. + let poll = match self.sender.channel.push(msg) { + Ok(()) => Poll::Ready(()), + Err(PushError::Disconnected(msg)) => { + self.msg = Some(msg); + Poll::Pending + } + Err(PushError::Full(msg)) => { + // Register the current task. + match self.opt_key { + None => self.opt_key = Some(self.sender.channel.sends.register(cx)), + Some(key) => self.sender.channel.sends.reregister(key, cx), + } + + // Try sending the message again. + match self.sender.channel.push(msg) { + Ok(()) => Poll::Ready(()), + Err(PushError::Disconnected(msg)) | Err(PushError::Full(msg)) => { + self.msg = Some(msg); + Poll::Pending + } + } + } + }; + + if poll.is_ready() { + // If the current task was registered, unregister now. + if let Some(key) = self.opt_key.take() { + // `true` means the send operation is completed. + self.sender.channel.sends.unregister(key, true); + } + } + + poll + } + } + + impl Drop for SendFuture<'_, T> { + fn drop(&mut self) { + // If the current task was registered, unregister now. + if let Some(key) = self.opt_key { + // `false` means the send operation is cancelled. + self.sender.channel.sends.unregister(key, false); + } + } + } + + SendFuture { + sender: self, + msg: Some(msg), + opt_key: None, + } + .await + } + + /// Returns the channel capacity. + /// + /// # Examples + /// + /// ``` + /// use async_std::sync::channel; + /// + /// let (s, _) = channel::(5); + /// assert_eq!(s.capacity(), 5); + /// ``` + pub fn capacity(&self) -> usize { + self.channel.cap + } + + /// Returns `true` if the channel is empty. + /// + /// # Examples + /// + /// ``` + /// # async_std::task::block_on(async { + /// # + /// use async_std::sync::channel; + /// + /// let (s, r) = channel(1); + /// + /// assert!(s.is_empty()); + /// s.send(0).await; + /// assert!(!s.is_empty()); + /// # + /// # }) + /// ``` + pub fn is_empty(&self) -> bool { + self.channel.is_empty() + } + + /// Returns `true` if the channel is full. + /// + /// # Examples + /// + /// ``` + /// # async_std::task::block_on(async { + /// # + /// use async_std::sync::channel; + /// + /// let (s, r) = channel(1); + /// + /// assert!(!s.is_full()); + /// s.send(0).await; + /// assert!(s.is_full()); + /// # + /// # }) + /// ``` + pub fn is_full(&self) -> bool { + self.channel.is_full() + } + + /// Returns the number of messages in the channel. + /// + /// # Examples + /// + /// ``` + /// # async_std::task::block_on(async { + /// # + /// use async_std::sync::channel; + /// + /// let (s, r) = channel(2); + /// assert_eq!(s.len(), 0); + /// + /// s.send(1).await; + /// s.send(2).await; + /// assert_eq!(s.len(), 2); + /// # + /// # }) + /// ``` + pub fn len(&self) -> usize { + self.channel.len() + } +} + +impl Drop for Sender { + fn drop(&mut self) { + // Decrement the sender count and disconnect the channel if it drops down to zero. + if self.channel.sender_count.fetch_sub(1, Ordering::AcqRel) == 1 { + self.channel.disconnect(); + } + } +} + +impl Clone for Sender { + fn clone(&self) -> Sender { + let count = self.channel.sender_count.fetch_add(1, Ordering::Relaxed); + + // Make sure the count never overflows, even if lots of sender clones are leaked. + if count > isize::MAX as usize { + process::abort(); + } + + Sender { + channel: self.channel.clone(), + } + } +} + +impl fmt::Debug for Sender { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.pad("Sender { .. }") + } +} + +/// The receiving side of a channel. +/// +/// This type implements the [`Stream`] trait, which means it can act as an asynchronous iterator. +/// +/// [`Stream`]: ../stream/trait.Stream.html +/// +/// # Examples +/// +/// ``` +/// # async_std::task::block_on(async { +/// # +/// use std::time::Duration; +/// +/// use async_std::sync::channel; +/// use async_std::task; +/// +/// let (s, r) = channel(100); +/// +/// task::spawn(async move { +/// s.send(1).await; +/// task::sleep(Duration::from_secs(1)).await; +/// s.send(2).await; +/// }); +/// +/// assert_eq!(r.recv().await, Some(1)); // Received immediately. +/// assert_eq!(r.recv().await, Some(2)); // Received after 1 second. +/// # +/// # }) +/// ``` +#[cfg(feature = "unstable")] +#[cfg_attr(feature = "docs", doc(cfg(unstable)))] +pub struct Receiver { + /// The inner channel. + channel: Arc>, + + /// The registration key for this receiver in the `channel.streams` registry. + opt_key: Option, +} + +impl Receiver { + /// Receives a message from the channel. + /// + /// If the channel is empty and it still has senders, this method will block the current task + /// until the receive operation can proceed. If the channel is empty and there are no more + /// senders, this method returns `None`. + /// + /// # Examples + /// + /// ``` + /// # async_std::task::block_on(async { + /// # + /// use async_std::sync::channel; + /// use async_std::task; + /// + /// let (s, r) = channel(1); + /// + /// task::spawn(async move { + /// s.send(1).await; + /// s.send(2).await; + /// }); + /// + /// assert_eq!(r.recv().await, Some(1)); + /// assert_eq!(r.recv().await, Some(2)); + /// assert_eq!(r.recv().await, None); + /// # + /// # }) + /// ``` + pub async fn recv(&self) -> Option { + struct RecvFuture<'a, T> { + channel: &'a Channel, + opt_key: Option, + } + + impl Future for RecvFuture<'_, T> { + type Output = Option; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + poll_recv(&self.channel, &self.channel.recvs, &mut self.opt_key, cx) + } + } + + impl Drop for RecvFuture<'_, T> { + fn drop(&mut self) { + // If the current task was registered, unregister now. + if let Some(key) = self.opt_key { + // `false` means the receive operation is cancelled. + self.channel.recvs.unregister(key, false); + } + } + } + + RecvFuture { + channel: &self.channel, + opt_key: None, + } + .await + } + + /// Returns the channel capacity. + /// + /// # Examples + /// + /// ``` + /// use async_std::sync::channel; + /// + /// let (_, r) = channel::(5); + /// assert_eq!(r.capacity(), 5); + /// ``` + pub fn capacity(&self) -> usize { + self.channel.cap + } + + /// Returns `true` if the channel is empty. + /// + /// # Examples + /// + /// ``` + /// # async_std::task::block_on(async { + /// # + /// use async_std::sync::channel; + /// + /// let (s, r) = channel(1); + /// + /// assert!(r.is_empty()); + /// s.send(0).await; + /// assert!(!r.is_empty()); + /// # + /// # }) + /// ``` + pub fn is_empty(&self) -> bool { + self.channel.is_empty() + } + + /// Returns `true` if the channel is full. + /// + /// # Examples + /// + /// ``` + /// # async_std::task::block_on(async { + /// # + /// use async_std::sync::channel; + /// + /// let (s, r) = channel(1); + /// + /// assert!(!r.is_full()); + /// s.send(0).await; + /// assert!(r.is_full()); + /// # + /// # }) + /// ``` + pub fn is_full(&self) -> bool { + self.channel.is_full() + } + + /// Returns the number of messages in the channel. + /// + /// # Examples + /// + /// ``` + /// # async_std::task::block_on(async { + /// # + /// use async_std::sync::channel; + /// + /// let (s, r) = channel(2); + /// assert_eq!(r.len(), 0); + /// + /// s.send(1).await; + /// s.send(2).await; + /// assert_eq!(r.len(), 2); + /// # + /// # }) + /// ``` + pub fn len(&self) -> usize { + self.channel.len() + } +} + +impl Drop for Receiver { + fn drop(&mut self) { + // If the current task was registered as blocked on this stream, unregister now. + if let Some(key) = self.opt_key { + // `false` means the last request for a stream item is cancelled. + self.channel.streams.unregister(key, false); + } + + // Decrement the receiver count and disconnect the channel if it drops down to zero. + if self.channel.receiver_count.fetch_sub(1, Ordering::AcqRel) == 1 { + self.channel.disconnect(); + } + } +} + +impl Clone for Receiver { + fn clone(&self) -> Receiver { + let count = self.channel.receiver_count.fetch_add(1, Ordering::Relaxed); + + // Make sure the count never overflows, even if lots of receiver clones are leaked. + if count > isize::MAX as usize { + process::abort(); + } + + Receiver { + channel: self.channel.clone(), + opt_key: None, + } + } +} + +impl Stream for Receiver { + type Item = T; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = &mut *self; + poll_recv(&this.channel, &this.channel.streams, &mut this.opt_key, cx) + } +} + +impl fmt::Debug for Receiver { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.pad("Receiver { .. }") + } +} + +/// Polls a receive operation on a channel. +/// +/// If the receive operation is blocked, the current task will be registered in `registry` and its +/// registration key will then be stored in `opt_key`. +fn poll_recv( + channel: &Channel, + registry: &Registry, + opt_key: &mut Option, + cx: &mut Context<'_>, +) -> Poll> { + // Try receiving a message. + let poll = match channel.pop() { + Ok(msg) => Poll::Ready(Some(msg)), + Err(PopError::Disconnected) => Poll::Ready(None), + Err(PopError::Empty) => { + // Register the current task. + match *opt_key { + None => *opt_key = Some(registry.register(cx)), + Some(key) => registry.reregister(key, cx), + } + + // Try receiving a message again. + match channel.pop() { + Ok(msg) => Poll::Ready(Some(msg)), + Err(PopError::Disconnected) => Poll::Ready(None), + Err(PopError::Empty) => Poll::Pending, + } + } + }; + + if poll.is_ready() { + // If the current task was registered, unregister now. + if let Some(key) = opt_key.take() { + // `true` means the receive operation is completed. + registry.unregister(key, true); + } + } + + poll +} + +/// A slot in a channel. +struct Slot { + /// The current stamp. + stamp: AtomicUsize, + + /// The message in this slot. + msg: UnsafeCell, +} + +/// Bounded channel based on a preallocated array. +struct Channel { + /// The head of the channel. + /// + /// This value is a "stamp" consisting of an index into the buffer, a mark bit, and a lap, but + /// packed into a single `usize`. The lower bits represent the index, while the upper bits + /// represent the lap. The mark bit in the head is always zero. + /// + /// Messages are popped from the head of the channel. + head: CachePadded, + + /// The tail of the channel. + /// + /// This value is a "stamp" consisting of an index into the buffer, a mark bit, and a lap, but + /// packed into a single `usize`. The lower bits represent the index, while the upper bits + /// represent the lap. The mark bit indicates that the channel is disconnected. + /// + /// Messages are pushed into the tail of the channel. + tail: CachePadded, + + /// The buffer holding slots. + buffer: *mut Slot, + + /// The channel capacity. + cap: usize, + + /// A stamp with the value of `{ lap: 1, mark: 0, index: 0 }`. + one_lap: usize, + + /// If this bit is set in the tail, that means either all senders were dropped or all receivers + /// were dropped. + mark_bit: usize, + + /// Send operations waiting while the channel is full. + sends: Registry, + + /// Receive operations waiting while the channel is empty and not disconnected. + recvs: Registry, + + /// Streams waiting while the channel is empty and not disconnected. + streams: Registry, + + /// The number of currently active `Sender`s. + sender_count: AtomicUsize, + + /// The number of currently active `Receivers`s. + receiver_count: AtomicUsize, + + /// Indicates that dropping a `Channel` may drop values of type `T`. + _marker: PhantomData, +} + +unsafe impl Send for Channel {} +unsafe impl Sync for Channel {} +impl Unpin for Channel {} + +impl Channel { + /// Creates a bounded channel of capacity `cap`. + fn with_capacity(cap: usize) -> Self { + assert!(cap > 0, "capacity must be positive"); + + // Compute constants `mark_bit` and `one_lap`. + let mark_bit = (cap + 1).next_power_of_two(); + let one_lap = mark_bit * 2; + + // Head is initialized to `{ lap: 0, mark: 0, index: 0 }`. + let head = 0; + // Tail is initialized to `{ lap: 0, mark: 0, index: 0 }`. + let tail = 0; + + // Allocate a buffer of `cap` slots. + let buffer = { + let mut v = Vec::>::with_capacity(cap); + let ptr = v.as_mut_ptr(); + mem::forget(v); + ptr + }; + + // Initialize stamps in the slots. + for i in 0..cap { + unsafe { + // Set the stamp to `{ lap: 0, mark: 0, index: i }`. + let slot = buffer.add(i); + ptr::write(&mut (*slot).stamp, AtomicUsize::new(i)); + } + } + + Channel { + buffer, + cap, + one_lap, + mark_bit, + head: CachePadded::new(AtomicUsize::new(head)), + tail: CachePadded::new(AtomicUsize::new(tail)), + sends: Registry::new(), + recvs: Registry::new(), + streams: Registry::new(), + sender_count: AtomicUsize::new(1), + receiver_count: AtomicUsize::new(1), + _marker: PhantomData, + } + } + + /// Attempts to push a message. + fn push(&self, msg: T) -> Result<(), PushError> { + let backoff = Backoff::new(); + let mut tail = self.tail.load(Ordering::Relaxed); + + loop { + // Deconstruct the tail. + let index = tail & (self.mark_bit - 1); + let lap = tail & !(self.one_lap - 1); + + // Inspect the corresponding slot. + let slot = unsafe { &*self.buffer.add(index) }; + let stamp = slot.stamp.load(Ordering::Acquire); + + // If the tail and the stamp match, we may attempt to push. + if tail == stamp { + let new_tail = if index + 1 < self.cap { + // Same lap, incremented index. + // Set to `{ lap: lap, mark: 0, index: index + 1 }`. + tail + 1 + } else { + // One lap forward, index wraps around to zero. + // Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`. + lap.wrapping_add(self.one_lap) + }; + + // Try moving the tail. + match self.tail.compare_exchange_weak( + tail, + new_tail, + Ordering::SeqCst, + Ordering::Relaxed, + ) { + Ok(_) => { + // Write the message into the slot and update the stamp. + unsafe { slot.msg.get().write(msg) }; + let stamp = tail + 1; + slot.stamp.store(stamp, Ordering::Release); + + // Wake a blocked receive operation. + self.recvs.notify_one(); + + // Wake all blocked streams. + self.streams.notify_all(); + + return Ok(()); + } + Err(t) => { + tail = t; + backoff.spin(); + } + } + } else if stamp.wrapping_add(self.one_lap) == tail + 1 { + atomic::fence(Ordering::SeqCst); + let head = self.head.load(Ordering::Relaxed); + + // If the head lags one lap behind the tail as well... + if head.wrapping_add(self.one_lap) == tail { + // ...then the channel is full. + + // Check if the channel is disconnected. + if tail & self.mark_bit != 0 { + return Err(PushError::Disconnected(msg)); + } else { + return Err(PushError::Full(msg)); + } + } + + backoff.spin(); + tail = self.tail.load(Ordering::Relaxed); + } else { + // Snooze because we need to wait for the stamp to get updated. + backoff.snooze(); + tail = self.tail.load(Ordering::Relaxed); + } + } + } + + /// Attempts to pop a message. + fn pop(&self) -> Result { + let backoff = Backoff::new(); + let mut head = self.head.load(Ordering::Relaxed); + + loop { + // Deconstruct the head. + let index = head & (self.mark_bit - 1); + let lap = head & !(self.one_lap - 1); + + // Inspect the corresponding slot. + let slot = unsafe { &*self.buffer.add(index) }; + let stamp = slot.stamp.load(Ordering::Acquire); + + // If the the stamp is ahead of the head by 1, we may attempt to pop. + if head + 1 == stamp { + let new = if index + 1 < self.cap { + // Same lap, incremented index. + // Set to `{ lap: lap, mark: 0, index: index + 1 }`. + head + 1 + } else { + // One lap forward, index wraps around to zero. + // Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`. + lap.wrapping_add(self.one_lap) + }; + + // Try moving the head. + match self.head.compare_exchange_weak( + head, + new, + Ordering::SeqCst, + Ordering::Relaxed, + ) { + Ok(_) => { + // Read the message from the slot and update the stamp. + let msg = unsafe { slot.msg.get().read() }; + let stamp = head.wrapping_add(self.one_lap); + slot.stamp.store(stamp, Ordering::Release); + + // Wake a blocked send operation. + self.sends.notify_one(); + + return Ok(msg); + } + Err(h) => { + head = h; + backoff.spin(); + } + } + } else if stamp == head { + atomic::fence(Ordering::SeqCst); + let tail = self.tail.load(Ordering::Relaxed); + + // If the tail equals the head, that means the channel is empty. + if (tail & !self.mark_bit) == head { + // If the channel is disconnected... + if tail & self.mark_bit != 0 { + return Err(PopError::Disconnected); + } else { + // Otherwise, the receive operation is not ready. + return Err(PopError::Empty); + } + } + + backoff.spin(); + head = self.head.load(Ordering::Relaxed); + } else { + // Snooze because we need to wait for the stamp to get updated. + backoff.snooze(); + head = self.head.load(Ordering::Relaxed); + } + } + } + + /// Returns the current number of messages inside the channel. + fn len(&self) -> usize { + loop { + // Load the tail, then load the head. + let tail = self.tail.load(Ordering::SeqCst); + let head = self.head.load(Ordering::SeqCst); + + // If the tail didn't change, we've got consistent values to work with. + if self.tail.load(Ordering::SeqCst) == tail { + let hix = head & (self.mark_bit - 1); + let tix = tail & (self.mark_bit - 1); + + return if hix < tix { + tix - hix + } else if hix > tix { + self.cap - hix + tix + } else if (tail & !self.mark_bit) == head { + 0 + } else { + self.cap + }; + } + } + } + + /// Returns `true` if the channel is empty. + fn is_empty(&self) -> bool { + let head = self.head.load(Ordering::SeqCst); + let tail = self.tail.load(Ordering::SeqCst); + + // Is the tail equal to the head? + // + // Note: If the head changes just before we load the tail, that means there was a moment + // when the channel was not empty, so it is safe to just return `false`. + (tail & !self.mark_bit) == head + } + + /// Returns `true` if the channel is full. + fn is_full(&self) -> bool { + let tail = self.tail.load(Ordering::SeqCst); + let head = self.head.load(Ordering::SeqCst); + + // Is the head lagging one lap behind tail? + // + // Note: If the tail changes just before we load the head, that means there was a moment + // when the channel was not full, so it is safe to just return `false`. + head.wrapping_add(self.one_lap) == tail & !self.mark_bit + } + + /// Disconnects the channel and wakes up all blocked operations. + fn disconnect(&self) { + let tail = self.tail.fetch_or(self.mark_bit, Ordering::SeqCst); + + if tail & self.mark_bit == 0 { + // Notify everyone blocked on this channel. + self.sends.notify_all(); + self.recvs.notify_all(); + self.streams.notify_all(); + } + } +} + +impl Drop for Channel { + fn drop(&mut self) { + // Get the index of the head. + let hix = self.head.load(Ordering::Relaxed) & (self.mark_bit - 1); + + // Loop over all slots that hold a message and drop them. + for i in 0..self.len() { + // Compute the index of the next slot holding a message. + let index = if hix + i < self.cap { + hix + i + } else { + hix + i - self.cap + }; + + unsafe { + self.buffer.add(index).drop_in_place(); + } + } + + // Finally, deallocate the buffer, but don't run any destructors. + unsafe { + Vec::from_raw_parts(self.buffer, 0, self.cap); + } + } +} + +/// An error returned from the `push()` method. +enum PushError { + /// The channel is full but not disconnected. + Full(T), + + /// The channel is full and disconnected. + Disconnected(T), +} + +/// An error returned from the `pop()` method. +enum PopError { + /// The channel is empty but not disconnected. + Empty, + + /// The channel is empty and disconnected. + Disconnected, +} + +/// A list of blocked channel operations. +struct Blocked { + /// A list of registered channel operations. + /// + /// Each entry has a waker associated with the task that is executing the operation. If the + /// waker is set to `None`, that means the task has been woken up but hasn't removed itself + /// from the registry yet. + entries: Slab>, + + /// The number of wakers in the entry list. + waker_count: usize, +} + +/// A registry of blocked channel operations. +/// +/// Blocked operations register themselves in a registry. Successful operations on the opposite +/// side of the channel wake blocked operations in the registry. +struct Registry { + /// A list of blocked channel operations. + blocked: Spinlock, + + /// Set to `true` if there are no wakers in the registry. + /// + /// Note that this either means there are no entries in the registry, or that all entries have + /// been notified. + is_empty: AtomicBool, +} + +impl Registry { + /// Creates a new registry. + fn new() -> Registry { + Registry { + blocked: Spinlock::new(Blocked { + entries: Slab::new(), + waker_count: 0, + }), + is_empty: AtomicBool::new(true), + } + } + + /// Registers a blocked channel operation and returns a key associated with it. + fn register(&self, cx: &Context<'_>) -> usize { + let mut blocked = self.blocked.lock(); + + // Insert a new entry into the list of blocked tasks. + let w = cx.waker().clone(); + let key = blocked.entries.insert(Some(w)); + + blocked.waker_count += 1; + if blocked.waker_count == 1 { + self.is_empty.store(false, Ordering::SeqCst); + } + + key + } + + /// Re-registers a blocked channel operation by filling in its waker. + fn reregister(&self, key: usize, cx: &Context<'_>) { + let mut blocked = self.blocked.lock(); + + let was_none = blocked.entries[key].is_none(); + let w = cx.waker().clone(); + blocked.entries[key] = Some(w); + + if was_none { + blocked.waker_count += 1; + if blocked.waker_count == 1 { + self.is_empty.store(false, Ordering::SeqCst); + } + } + } + + /// Unregisters a channel operation. + /// + /// If `completed` is `true`, the operation will be removed from the registry. If `completed` + /// is `false`, that means the operation was cancelled so another one will be notified. + fn unregister(&self, key: usize, completed: bool) { + let mut blocked = self.blocked.lock(); + let mut removed = false; + + match blocked.entries.remove(key) { + Some(_) => removed = true, + None => { + if !completed { + // This operation was cancelled. Notify another one. + if let Some((_, opt_waker)) = blocked.entries.iter_mut().next() { + if let Some(w) = opt_waker.take() { + w.wake(); + removed = true; + } + } + } + } + } + + if removed { + blocked.waker_count -= 1; + if blocked.waker_count == 0 { + self.is_empty.store(true, Ordering::SeqCst); + } + } + } + + /// Notifies one blocked channel operation. + #[inline] + fn notify_one(&self) { + if !self.is_empty.load(Ordering::SeqCst) { + let mut blocked = self.blocked.lock(); + + if let Some((_, opt_waker)) = blocked.entries.iter_mut().next() { + // If there is no waker in this entry, that means it was already woken. + if let Some(w) = opt_waker.take() { + w.wake(); + + blocked.waker_count -= 1; + if blocked.waker_count == 0 { + self.is_empty.store(true, Ordering::SeqCst); + } + } + } + } + } + + /// Notifies all blocked channel operations. + #[inline] + fn notify_all(&self) { + if !self.is_empty.load(Ordering::SeqCst) { + let mut blocked = self.blocked.lock(); + + for (_, opt_waker) in blocked.entries.iter_mut() { + // If there is no waker in this entry, that means it was already woken. + if let Some(w) = opt_waker.take() { + w.wake(); + } + } + + blocked.waker_count = 0; + self.is_empty.store(true, Ordering::SeqCst); + } + } +} + +/// A simple spinlock. +struct Spinlock { + flag: AtomicBool, + value: UnsafeCell, +} + +impl Spinlock { + /// Returns a new spinlock initialized with `value`. + fn new(value: T) -> Spinlock { + Spinlock { + flag: AtomicBool::new(false), + value: UnsafeCell::new(value), + } + } + + /// Locks the spinlock. + fn lock(&self) -> SpinlockGuard<'_, T> { + let backoff = Backoff::new(); + while self.flag.swap(true, Ordering::Acquire) { + backoff.snooze(); + } + SpinlockGuard { parent: self } + } +} + +/// A guard holding a spinlock locked. +struct SpinlockGuard<'a, T> { + parent: &'a Spinlock, +} + +impl<'a, T> Drop for SpinlockGuard<'a, T> { + fn drop(&mut self) { + self.parent.flag.store(false, Ordering::Release); + } +} + +impl<'a, T> Deref for SpinlockGuard<'a, T> { + type Target = T; + + fn deref(&self) -> &T { + unsafe { &*self.parent.value.get() } + } +} + +impl<'a, T> DerefMut for SpinlockGuard<'a, T> { + fn deref_mut(&mut self) -> &mut T { + unsafe { &mut *self.parent.value.get() } + } +} diff --git a/src/sync/mod.rs b/src/sync/mod.rs index be74d8f7..3ad2776c 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -40,5 +40,8 @@ mod rwlock; cfg_unstable! { pub use barrier::{Barrier, BarrierWaitResult}; + pub use channel::{channel, Sender, Receiver}; + mod barrier; + mod channel; } diff --git a/tests/channel.rs b/tests/channel.rs new file mode 100644 index 00000000..91622b0d --- /dev/null +++ b/tests/channel.rs @@ -0,0 +1,350 @@ +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; +use std::time::Duration; + +use async_std::sync::channel; +use async_std::task; +use rand::{thread_rng, Rng}; + +fn ms(ms: u64) -> Duration { + Duration::from_millis(ms) +} + +#[test] +fn smoke() { + task::block_on(async { + let (s, r) = channel(1); + + s.send(7).await; + assert_eq!(r.recv().await, Some(7)); + + s.send(8).await; + assert_eq!(r.recv().await, Some(8)); + + drop(s); + assert_eq!(r.recv().await, None); + }) +} + +#[test] +fn capacity() { + for i in 1..10 { + let (s, r) = channel::<()>(i); + assert_eq!(s.capacity(), i); + assert_eq!(r.capacity(), i); + } +} + +#[test] +fn len_empty_full() { + task::block_on(async { + let (s, r) = channel(2); + + assert_eq!(s.len(), 0); + assert_eq!(s.is_empty(), true); + assert_eq!(s.is_full(), false); + assert_eq!(r.len(), 0); + assert_eq!(r.is_empty(), true); + assert_eq!(r.is_full(), false); + + s.send(()).await; + + assert_eq!(s.len(), 1); + assert_eq!(s.is_empty(), false); + assert_eq!(s.is_full(), false); + assert_eq!(r.len(), 1); + assert_eq!(r.is_empty(), false); + assert_eq!(r.is_full(), false); + + s.send(()).await; + + assert_eq!(s.len(), 2); + assert_eq!(s.is_empty(), false); + assert_eq!(s.is_full(), true); + assert_eq!(r.len(), 2); + assert_eq!(r.is_empty(), false); + assert_eq!(r.is_full(), true); + + r.recv().await; + + assert_eq!(s.len(), 1); + assert_eq!(s.is_empty(), false); + assert_eq!(s.is_full(), false); + assert_eq!(r.len(), 1); + assert_eq!(r.is_empty(), false); + assert_eq!(r.is_full(), false); + }) +} + +#[test] +fn recv() { + task::block_on(async { + let (s, r) = channel(100); + + task::spawn(async move { + assert_eq!(r.recv().await, Some(7)); + task::sleep(ms(1000)).await; + assert_eq!(r.recv().await, Some(8)); + task::sleep(ms(1000)).await; + assert_eq!(r.recv().await, Some(9)); + assert_eq!(r.recv().await, None); + }); + + task::sleep(ms(1500)).await; + s.send(7).await; + s.send(8).await; + s.send(9).await; + }) +} + +#[test] +fn send() { + task::block_on(async { + let (s, r) = channel(1); + + task::spawn(async move { + s.send(7).await; + task::sleep(ms(1000)).await; + s.send(8).await; + task::sleep(ms(1000)).await; + s.send(9).await; + task::sleep(ms(1000)).await; + s.send(10).await; + }); + + task::sleep(ms(1500)).await; + assert_eq!(r.recv().await, Some(7)); + assert_eq!(r.recv().await, Some(8)); + assert_eq!(r.recv().await, Some(9)); + }) +} + +#[test] +fn recv_after_disconnect() { + task::block_on(async { + let (s, r) = channel(100); + + s.send(1).await; + s.send(2).await; + s.send(3).await; + + drop(s); + + assert_eq!(r.recv().await, Some(1)); + assert_eq!(r.recv().await, Some(2)); + assert_eq!(r.recv().await, Some(3)); + assert_eq!(r.recv().await, None); + }) +} + +#[test] +fn len() { + const COUNT: usize = 25_000; + const CAP: usize = 1000; + + task::block_on(async { + let (s, r) = channel(CAP); + + assert_eq!(s.len(), 0); + assert_eq!(r.len(), 0); + + for _ in 0..CAP / 10 { + for i in 0..50 { + s.send(i).await; + assert_eq!(s.len(), i + 1); + } + + for i in 0..50 { + r.recv().await; + assert_eq!(r.len(), 50 - i - 1); + } + } + + assert_eq!(s.len(), 0); + assert_eq!(r.len(), 0); + + for i in 0..CAP { + s.send(i).await; + assert_eq!(s.len(), i + 1); + } + + for _ in 0..CAP { + r.recv().await.unwrap(); + } + + assert_eq!(s.len(), 0); + assert_eq!(r.len(), 0); + + let child = task::spawn({ + let r = r.clone(); + async move { + for i in 0..COUNT { + assert_eq!(r.recv().await, Some(i)); + let len = r.len(); + assert!(len <= CAP); + } + } + }); + + for i in 0..COUNT { + s.send(i).await; + let len = s.len(); + assert!(len <= CAP); + } + + child.await; + + assert_eq!(s.len(), 0); + assert_eq!(r.len(), 0); + }) +} + +#[test] +fn disconnect_wakes_receiver() { + task::block_on(async { + let (s, r) = channel::<()>(1); + + let child = task::spawn(async move { + assert_eq!(r.recv().await, None); + }); + + task::sleep(ms(1000)).await; + drop(s); + + child.await; + }) +} + +#[test] +fn spsc() { + const COUNT: usize = 100_000; + + task::block_on(async { + let (s, r) = channel(3); + + let child = task::spawn(async move { + for i in 0..COUNT { + assert_eq!(r.recv().await, Some(i)); + } + assert_eq!(r.recv().await, None); + }); + + for i in 0..COUNT { + s.send(i).await; + } + drop(s); + + child.await; + }) +} + +#[test] +fn mpmc() { + const COUNT: usize = 25_000; + const TASKS: usize = 4; + + task::block_on(async { + let (s, r) = channel::(3); + let v = (0..COUNT).map(|_| AtomicUsize::new(0)).collect::>(); + let v = Arc::new(v); + + let mut tasks = Vec::new(); + + for _ in 0..TASKS { + let r = r.clone(); + let v = v.clone(); + tasks.push(task::spawn(async move { + for _ in 0..COUNT { + let n = r.recv().await.unwrap(); + v[n].fetch_add(1, Ordering::SeqCst); + } + })); + } + + for _ in 0..TASKS { + let s = s.clone(); + tasks.push(task::spawn(async move { + for i in 0..COUNT { + s.send(i).await; + } + })); + } + + for t in tasks { + t.await; + } + + for c in v.iter() { + assert_eq!(c.load(Ordering::SeqCst), TASKS); + } + }); +} + +#[test] +fn oneshot() { + const COUNT: usize = 10_000; + + task::block_on(async { + for _ in 0..COUNT { + let (s, r) = channel(1); + + let c1 = task::spawn(async move { r.recv().await.unwrap() }); + let c2 = task::spawn(async move { s.send(0).await }); + + c1.await; + c2.await; + } + }) +} + +#[test] +fn drops() { + const RUNS: usize = 100; + + static DROPS: AtomicUsize = AtomicUsize::new(0); + + #[derive(Debug, PartialEq)] + struct DropCounter; + + impl Drop for DropCounter { + fn drop(&mut self) { + DROPS.fetch_add(1, Ordering::SeqCst); + } + } + + let mut rng = thread_rng(); + + for _ in 0..RUNS { + task::block_on(async { + let steps = rng.gen_range(0, 10_000); + let additional = rng.gen_range(0, 50); + + DROPS.store(0, Ordering::SeqCst); + let (s, r) = channel::(50); + + let child = task::spawn({ + let r = r.clone(); + async move { + for _ in 0..steps { + r.recv().await.unwrap(); + } + } + }); + + for _ in 0..steps { + s.send(DropCounter).await; + } + + child.await; + + for _ in 0..additional { + s.send(DropCounter).await; + } + + assert_eq!(DROPS.load(Ordering::SeqCst), steps); + drop(s); + drop(r); + assert_eq!(DROPS.load(Ordering::SeqCst), steps + additional); + }) + } +}