use std::cell::UnsafeCell; use std::error::Error; use std::fmt::{self, Debug, Display}; use std::future::Future; use std::isize; use std::marker::PhantomData; use std::mem; use std::pin::Pin; use std::process; use std::ptr; use std::sync::atomic::{self, AtomicUsize, Ordering}; use std::sync::Arc; use std::task::{Context, Poll}; use crossbeam_utils::Backoff; use crate::stream::Stream; use crate::sync::WakerSet; /// Creates a bounded multi-producer multi-consumer channel. /// /// This channel has a buffer that can hold at most `cap` messages at a time. /// /// Senders and receivers can be cloned. When all senders associated with a channel get dropped, it /// becomes closed. Receive operations on a closed and empty channel return `None` instead of /// trying to await a message. /// /// # Panics /// /// If `cap` is zero, this function will panic. /// /// # Examples /// /// ``` /// # fn main() -> Result<(), async_std::sync::RecvError> { /// # async_std::task::block_on(async { /// # /// use std::time::Duration; /// /// use async_std::sync::channel; /// use async_std::task; /// /// let (s, r) = channel(1); /// /// // This call returns immediately because there is enough space in the channel. /// s.send(1usize).await; /// /// task::spawn(async move { /// // This call will have to wait because the channel is full. /// // It will be able to complete only after the first message is received. /// s.send(2).await; /// }); /// /// task::sleep(Duration::from_secs(1)).await; /// assert_eq!(r.recv().await?, 1); /// assert_eq!(r.recv().await?, 2); /// # Ok(()) /// # /// # }) } /// ``` #[cfg(feature = "unstable")] #[cfg_attr(feature = "docs", doc(cfg(unstable)))] pub fn channel(cap: usize) -> (Sender, Receiver) { let channel = Arc::new(Channel::with_capacity(cap)); let s = Sender { channel: channel.clone(), }; let r = Receiver { channel, opt_key: None, }; (s, r) } /// The sending side of a channel. /// /// This struct is created by the [`channel`] function. See its /// documentation for more. /// /// [`channel`]: fn.channel.html /// /// # Examples /// /// ``` /// # async_std::task::block_on(async { /// # /// use async_std::sync::channel; /// use async_std::task; /// /// let (s1, r) = channel(100); /// let s2 = s1.clone(); /// /// task::spawn(async move { s1.send(1).await }); /// task::spawn(async move { s2.send(2).await }); /// /// let msg1 = r.recv().await.unwrap(); /// let msg2 = r.recv().await.unwrap(); /// /// assert_eq!(msg1 + msg2, 3); /// # /// # }) /// ``` #[cfg(feature = "unstable")] #[cfg_attr(feature = "docs", doc(cfg(unstable)))] pub struct Sender { /// The inner channel. channel: Arc>, } impl Sender { /// Sends a message into the channel. /// /// If the channel is full, this method will wait until there is space in the channel. /// /// # Examples /// /// ``` /// # fn main() -> Result<(), async_std::sync::RecvError> { /// # async_std::task::block_on(async { /// # /// use async_std::sync::channel; /// use async_std::task; /// /// let (s, r) = channel(1); /// /// task::spawn(async move { /// s.send(1).await; /// s.send(2).await; /// }); /// /// assert_eq!(r.recv().await?, 1); /// assert_eq!(r.recv().await?, 2); /// assert!(r.recv().await.is_err()); /// # /// # Ok(()) /// # }) } /// ``` pub async fn send(&self, msg: T) { struct SendFuture<'a, T> { channel: &'a Channel, msg: Option, opt_key: Option, } impl Unpin for SendFuture<'_, T> {} impl Future for SendFuture<'_, T> { type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { loop { let msg = self.msg.take().unwrap(); // If the current task is in the set, remove it. if let Some(key) = self.opt_key.take() { self.channel.send_wakers.remove(key); } // Try sending the message. match self.channel.try_send(msg) { Ok(()) => return Poll::Ready(()), Err(TrySendError::Disconnected(msg)) => { self.msg = Some(msg); return Poll::Pending; } Err(TrySendError::Full(msg)) => { self.msg = Some(msg); // Insert this send operation. self.opt_key = Some(self.channel.send_wakers.insert(cx)); // If the channel is still full and not disconnected, return. if self.channel.is_full() && !self.channel.is_disconnected() { return Poll::Pending; } } } } } } impl Drop for SendFuture<'_, T> { fn drop(&mut self) { // If the current task is still in the set, that means it is being cancelled now. // Wake up another task instead. if let Some(key) = self.opt_key { self.channel.send_wakers.cancel(key); } } } SendFuture { channel: &self.channel, msg: Some(msg), opt_key: None, } .await } /// Attempts to send a message into the channel. /// /// If the channel is full, this method will return an error. /// /// # Examples /// /// ``` /// # async_std::task::block_on(async { /// # /// use async_std::sync::channel; /// /// let (s, r) = channel(1); /// assert!(s.try_send(1).is_ok()); /// assert!(s.try_send(2).is_err()); /// # /// # }) /// ``` pub fn try_send(&self, msg: T) -> Result<(), TrySendError> { self.channel.try_send(msg) } /// Returns the channel capacity. /// /// # Examples /// /// ``` /// use async_std::sync::channel; /// /// let (s, _) = channel::(5); /// assert_eq!(s.capacity(), 5); /// ``` pub fn capacity(&self) -> usize { self.channel.cap } /// Returns `true` if the channel is empty. /// /// # Examples /// /// ``` /// # async_std::task::block_on(async { /// # /// use async_std::sync::channel; /// /// let (s, r) = channel(1); /// /// assert!(s.is_empty()); /// s.send(0).await; /// assert!(!s.is_empty()); /// # /// # }) /// ``` pub fn is_empty(&self) -> bool { self.channel.is_empty() } /// Returns `true` if the channel is full. /// /// # Examples /// /// ``` /// # async_std::task::block_on(async { /// # /// use async_std::sync::channel; /// /// let (s, r) = channel(1); /// /// assert!(!s.is_full()); /// s.send(0).await; /// assert!(s.is_full()); /// # /// # }) /// ``` pub fn is_full(&self) -> bool { self.channel.is_full() } /// Returns the number of messages in the channel. /// /// # Examples /// /// ``` /// # async_std::task::block_on(async { /// # /// use async_std::sync::channel; /// /// let (s, r) = channel(2); /// assert_eq!(s.len(), 0); /// /// s.send(1).await; /// s.send(2).await; /// assert_eq!(s.len(), 2); /// # /// # }) /// ``` pub fn len(&self) -> usize { self.channel.len() } } impl Drop for Sender { fn drop(&mut self) { // Decrement the sender count and disconnect the channel if it drops down to zero. if self.channel.sender_count.fetch_sub(1, Ordering::AcqRel) == 1 { self.channel.disconnect(); } } } impl Clone for Sender { fn clone(&self) -> Sender { let count = self.channel.sender_count.fetch_add(1, Ordering::Relaxed); // Make sure the count never overflows, even if lots of sender clones are leaked. if count > isize::MAX as usize { process::abort(); } Sender { channel: self.channel.clone(), } } } impl fmt::Debug for Sender { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.pad("Sender { .. }") } } /// The receiving side of a channel. /// /// This type receives messages by calling `recv`. But it also implements the [`Stream`] trait, /// which means it can act as an asynchronous iterator. This struct is created by the [`channel`] /// function. See its documentation for more. /// /// [`channel`]: fn.channel.html /// [`Stream`]: ../stream/trait.Stream.html /// /// # Examples /// /// ``` /// # fn main() -> Result<(), async_std::sync::RecvError> { /// # async_std::task::block_on(async { /// # /// use std::time::Duration; /// /// use async_std::sync::channel; /// use async_std::task; /// /// let (s, r) = channel(100); /// /// task::spawn(async move { /// s.send(1usize).await; /// task::sleep(Duration::from_secs(1)).await; /// s.send(2).await; /// }); /// /// assert_eq!(r.recv().await?, 1); // Received immediately. /// assert_eq!(r.recv().await?, 2); // Received after 1 second. /// # /// # Ok(()) /// # }) } /// ``` #[cfg(feature = "unstable")] #[cfg_attr(feature = "docs", doc(cfg(unstable)))] pub struct Receiver { /// The inner channel. channel: Arc>, /// The key for this receiver in the `channel.stream_wakers` set. opt_key: Option, } impl Receiver { /// Receives a message from the channel. /// /// If the channel is empty and still has senders, this method /// will wait until a message is sent into it. Once all senders /// have been dropped it will return `None`. /// /// # Examples /// /// ``` /// # fn main() -> Result<(), async_std::sync::RecvError> { /// # async_std::task::block_on(async { /// # /// use async_std::sync::channel; /// use async_std::task; /// /// let (s, r) = channel(1); /// /// task::spawn(async move { /// s.send(1usize).await; /// s.send(2).await; /// // Then we drop the sender /// }); /// /// assert_eq!(r.recv().await?, 1); /// assert_eq!(r.recv().await?, 2); /// assert!(r.recv().await.is_err()); /// # /// # Ok(()) /// # }) } /// ``` pub async fn recv(&self) -> Result { struct RecvFuture<'a, T> { channel: &'a Channel, opt_key: Option, } impl Future for RecvFuture<'_, T> { type Output = Result; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { poll_recv( &self.channel, &self.channel.recv_wakers, &mut self.opt_key, cx, ) } } impl Drop for RecvFuture<'_, T> { fn drop(&mut self) { // If the current task is still in the set, that means it is being cancelled now. if let Some(key) = self.opt_key { self.channel.recv_wakers.cancel(key); } } } RecvFuture { channel: &self.channel, opt_key: None, } .await } /// Attempts to receive a message from the channel. /// /// If the channel is empty, this method will return an error. /// /// # Examples /// /// ``` /// # async_std::task::block_on(async { /// # /// use async_std::sync::channel; /// /// let (s, r) = channel(1); /// /// s.send(1u8).await; /// /// assert!(r.try_recv().is_ok()); /// assert!(r.try_recv().is_err()); /// # /// # }) /// ``` pub fn try_recv(&self) -> Result { self.channel.try_recv() } /// Returns the channel capacity. /// /// # Examples /// /// ``` /// use async_std::sync::channel; /// /// let (_, r) = channel::(5); /// assert_eq!(r.capacity(), 5); /// ``` pub fn capacity(&self) -> usize { self.channel.cap } /// Returns `true` if the channel is empty. /// /// # Examples /// /// ``` /// # async_std::task::block_on(async { /// # /// use async_std::sync::channel; /// /// let (s, r) = channel(1); /// /// assert!(r.is_empty()); /// s.send(0).await; /// assert!(!r.is_empty()); /// # /// # }) /// ``` pub fn is_empty(&self) -> bool { self.channel.is_empty() } /// Returns `true` if the channel is full. /// /// # Examples /// /// ``` /// # async_std::task::block_on(async { /// # /// use async_std::sync::channel; /// /// let (s, r) = channel(1); /// /// assert!(!r.is_full()); /// s.send(0).await; /// assert!(r.is_full()); /// # /// # }) /// ``` pub fn is_full(&self) -> bool { self.channel.is_full() } /// Returns the number of messages in the channel. /// /// # Examples /// /// ``` /// # async_std::task::block_on(async { /// # /// use async_std::sync::channel; /// /// let (s, r) = channel(2); /// assert_eq!(r.len(), 0); /// /// s.send(1).await; /// s.send(2).await; /// assert_eq!(r.len(), 2); /// # /// # }) /// ``` pub fn len(&self) -> usize { self.channel.len() } } impl Drop for Receiver { fn drop(&mut self) { // If the current task is still in the stream set, that means it is being cancelled now. if let Some(key) = self.opt_key { self.channel.stream_wakers.cancel(key); } // Decrement the receiver count and disconnect the channel if it drops down to zero. if self.channel.receiver_count.fetch_sub(1, Ordering::AcqRel) == 1 { self.channel.disconnect(); } } } impl Clone for Receiver { fn clone(&self) -> Receiver { let count = self.channel.receiver_count.fetch_add(1, Ordering::Relaxed); // Make sure the count never overflows, even if lots of receiver clones are leaked. if count > isize::MAX as usize { process::abort(); } Receiver { channel: self.channel.clone(), opt_key: None, } } } impl Stream for Receiver { type Item = T; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = &mut *self; let res = futures_core::ready!(poll_recv( &this.channel, &this.channel.stream_wakers, &mut this.opt_key, cx, )); Poll::Ready(res.ok()) } } impl fmt::Debug for Receiver { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.pad("Receiver { .. }") } } /// Polls a receive operation on a channel. /// /// If the receive operation is blocked, the current task will be inserted into `wakers` and its /// associated key will then be stored in `opt_key`. fn poll_recv( channel: &Channel, wakers: &WakerSet, opt_key: &mut Option, cx: &mut Context<'_>, ) -> Poll> { loop { // If the current task is in the set, remove it. if let Some(key) = opt_key.take() { wakers.remove(key); } // Try receiving a message. match channel.try_recv() { Ok(msg) => return Poll::Ready(Ok(msg)), Err(TryRecvError::Disconnected) => return Poll::Ready(Err(RecvError {})), Err(TryRecvError::Empty) => { // Insert this receive operation. *opt_key = Some(wakers.insert(cx)); // If the channel is still empty and not disconnected, return. if channel.is_empty() && !channel.is_disconnected() { return Poll::Pending; } } } } } /// A slot in a channel. struct Slot { /// The current stamp. stamp: AtomicUsize, /// The message in this slot. msg: UnsafeCell, } /// Bounded channel based on a preallocated array. struct Channel { /// The head of the channel. /// /// This value is a "stamp" consisting of an index into the buffer, a mark bit, and a lap, but /// packed into a single `usize`. The lower bits represent the index, while the upper bits /// represent the lap. The mark bit in the head is always zero. /// /// Messages are popped from the head of the channel. head: AtomicUsize, /// The tail of the channel. /// /// This value is a "stamp" consisting of an index into the buffer, a mark bit, and a lap, but /// packed into a single `usize`. The lower bits represent the index, while the upper bits /// represent the lap. The mark bit indicates that the channel is disconnected. /// /// Messages are pushed into the tail of the channel. tail: AtomicUsize, /// The buffer holding slots. buffer: *mut Slot, /// The channel capacity. cap: usize, /// A stamp with the value of `{ lap: 1, mark: 0, index: 0 }`. one_lap: usize, /// If this bit is set in the tail, that means either all senders were dropped or all receivers /// were dropped. mark_bit: usize, /// Send operations waiting while the channel is full. send_wakers: WakerSet, /// Receive operations waiting while the channel is empty and not disconnected. recv_wakers: WakerSet, /// Streams waiting while the channel is empty and not disconnected. stream_wakers: WakerSet, /// The number of currently active `Sender`s. sender_count: AtomicUsize, /// The number of currently active `Receivers`s. receiver_count: AtomicUsize, /// Indicates that dropping a `Channel` may drop values of type `T`. _marker: PhantomData, } unsafe impl Send for Channel {} unsafe impl Sync for Channel {} impl Unpin for Channel {} impl Channel { /// Creates a bounded channel of capacity `cap`. fn with_capacity(cap: usize) -> Self { assert!(cap > 0, "capacity must be positive"); // Compute constants `mark_bit` and `one_lap`. let mark_bit = (cap + 1).next_power_of_two(); let one_lap = mark_bit * 2; // Head is initialized to `{ lap: 0, mark: 0, index: 0 }`. let head = 0; // Tail is initialized to `{ lap: 0, mark: 0, index: 0 }`. let tail = 0; // Allocate a buffer of `cap` slots. let buffer = { let mut v = Vec::>::with_capacity(cap); let ptr = v.as_mut_ptr(); mem::forget(v); ptr }; // Initialize stamps in the slots. for i in 0..cap { unsafe { // Set the stamp to `{ lap: 0, mark: 0, index: i }`. let slot = buffer.add(i); ptr::write(&mut (*slot).stamp, AtomicUsize::new(i)); } } Channel { buffer, cap, one_lap, mark_bit, head: AtomicUsize::new(head), tail: AtomicUsize::new(tail), send_wakers: WakerSet::new(), recv_wakers: WakerSet::new(), stream_wakers: WakerSet::new(), sender_count: AtomicUsize::new(1), receiver_count: AtomicUsize::new(1), _marker: PhantomData, } } /// Attempts to send a message. fn try_send(&self, msg: T) -> Result<(), TrySendError> { let backoff = Backoff::new(); let mut tail = self.tail.load(Ordering::Relaxed); loop { // Extract mark bit from the tail and unset it. // // If the mark bit was set (which means all receivers have been dropped), we will still // send the message into the channel if there is enough capacity. The message will get // dropped when the channel is dropped (which means when all senders are also dropped). let mark_bit = tail & self.mark_bit; tail ^= mark_bit; // Deconstruct the tail. let index = tail & (self.mark_bit - 1); let lap = tail & !(self.one_lap - 1); // Inspect the corresponding slot. let slot = unsafe { &*self.buffer.add(index) }; let stamp = slot.stamp.load(Ordering::Acquire); // If the tail and the stamp match, we may attempt to push. if tail == stamp { let new_tail = if index + 1 < self.cap { // Same lap, incremented index. // Set to `{ lap: lap, mark: 0, index: index + 1 }`. tail + 1 } else { // One lap forward, index wraps around to zero. // Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`. lap.wrapping_add(self.one_lap) }; // Try moving the tail. match self.tail.compare_exchange_weak( tail | mark_bit, new_tail | mark_bit, Ordering::SeqCst, Ordering::Relaxed, ) { Ok(_) => { // Write the message into the slot and update the stamp. unsafe { slot.msg.get().write(msg) }; let stamp = tail + 1; slot.stamp.store(stamp, Ordering::Release); // Wake a blocked receive operation. self.recv_wakers.notify_one(); // Wake all blocked streams. self.stream_wakers.notify_all(); return Ok(()); } Err(t) => { tail = t; backoff.spin(); } } } else if stamp.wrapping_add(self.one_lap) == tail + 1 { atomic::fence(Ordering::SeqCst); let head = self.head.load(Ordering::Relaxed); // If the head lags one lap behind the tail as well... if head.wrapping_add(self.one_lap) == tail { // ...then the channel is full. // Check if the channel is disconnected. if mark_bit != 0 { return Err(TrySendError::Disconnected(msg)); } else { return Err(TrySendError::Full(msg)); } } backoff.spin(); tail = self.tail.load(Ordering::Relaxed); } else { // Snooze because we need to wait for the stamp to get updated. backoff.snooze(); tail = self.tail.load(Ordering::Relaxed); } } } /// Attempts to receive a message. fn try_recv(&self) -> Result { let backoff = Backoff::new(); let mut head = self.head.load(Ordering::Relaxed); loop { // Deconstruct the head. let index = head & (self.mark_bit - 1); let lap = head & !(self.one_lap - 1); // Inspect the corresponding slot. let slot = unsafe { &*self.buffer.add(index) }; let stamp = slot.stamp.load(Ordering::Acquire); // If the the stamp is ahead of the head by 1, we may attempt to pop. if head + 1 == stamp { let new = if index + 1 < self.cap { // Same lap, incremented index. // Set to `{ lap: lap, mark: 0, index: index + 1 }`. head + 1 } else { // One lap forward, index wraps around to zero. // Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`. lap.wrapping_add(self.one_lap) }; // Try moving the head. match self.head.compare_exchange_weak( head, new, Ordering::SeqCst, Ordering::Relaxed, ) { Ok(_) => { // Read the message from the slot and update the stamp. let msg = unsafe { slot.msg.get().read() }; let stamp = head.wrapping_add(self.one_lap); slot.stamp.store(stamp, Ordering::Release); // Wake a blocked send operation. self.send_wakers.notify_one(); return Ok(msg); } Err(h) => { head = h; backoff.spin(); } } } else if stamp == head { atomic::fence(Ordering::SeqCst); let tail = self.tail.load(Ordering::Relaxed); // If the tail equals the head, that means the channel is empty. if (tail & !self.mark_bit) == head { // If the channel is disconnected... if tail & self.mark_bit != 0 { return Err(TryRecvError::Disconnected); } else { // Otherwise, the receive operation is not ready. return Err(TryRecvError::Empty); } } backoff.spin(); head = self.head.load(Ordering::Relaxed); } else { // Snooze because we need to wait for the stamp to get updated. backoff.snooze(); head = self.head.load(Ordering::Relaxed); } } } /// Returns the current number of messages inside the channel. fn len(&self) -> usize { loop { // Load the tail, then load the head. let tail = self.tail.load(Ordering::SeqCst); let head = self.head.load(Ordering::SeqCst); // If the tail didn't change, we've got consistent values to work with. if self.tail.load(Ordering::SeqCst) == tail { let hix = head & (self.mark_bit - 1); let tix = tail & (self.mark_bit - 1); return if hix < tix { tix - hix } else if hix > tix { self.cap - hix + tix } else if (tail & !self.mark_bit) == head { 0 } else { self.cap }; } } } /// Returns `true` if the channel is disconnected. pub fn is_disconnected(&self) -> bool { self.tail.load(Ordering::SeqCst) & self.mark_bit != 0 } /// Returns `true` if the channel is empty. fn is_empty(&self) -> bool { let head = self.head.load(Ordering::SeqCst); let tail = self.tail.load(Ordering::SeqCst); // Is the tail equal to the head? // // Note: If the head changes just before we load the tail, that means there was a moment // when the channel was not empty, so it is safe to just return `false`. (tail & !self.mark_bit) == head } /// Returns `true` if the channel is full. fn is_full(&self) -> bool { let tail = self.tail.load(Ordering::SeqCst); let head = self.head.load(Ordering::SeqCst); // Is the head lagging one lap behind tail? // // Note: If the tail changes just before we load the head, that means there was a moment // when the channel was not full, so it is safe to just return `false`. head.wrapping_add(self.one_lap) == tail & !self.mark_bit } /// Disconnects the channel and wakes up all blocked operations. fn disconnect(&self) { let tail = self.tail.fetch_or(self.mark_bit, Ordering::SeqCst); if tail & self.mark_bit == 0 { // Notify everyone blocked on this channel. self.send_wakers.notify_all(); self.recv_wakers.notify_all(); self.stream_wakers.notify_all(); } } } impl Drop for Channel { fn drop(&mut self) { // Get the index of the head. let hix = self.head.load(Ordering::Relaxed) & (self.mark_bit - 1); // Loop over all slots that hold a message and drop them. for i in 0..self.len() { // Compute the index of the next slot holding a message. let index = if hix + i < self.cap { hix + i } else { hix + i - self.cap }; unsafe { self.buffer.add(index).drop_in_place(); } } // Finally, deallocate the buffer, but don't run any destructors. unsafe { Vec::from_raw_parts(self.buffer, 0, self.cap); } } } /// An error returned from the `try_send` method. #[cfg(feature = "unstable")] #[cfg_attr(feature = "docs", doc(cfg(unstable)))] pub enum TrySendError { /// The channel is full but not disconnected. Full(T), /// The channel is full and disconnected. Disconnected(T), } impl Error for TrySendError {} impl Debug for TrySendError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { Self::Full(_) => Debug::fmt("Full", f), Self::Disconnected(_) => Debug::fmt("Disconnected", f), } } } impl Display for TrySendError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { Self::Full(_) => Display::fmt("The channel is full.", f), Self::Disconnected(_) => Display::fmt("The channel is full and disconnected.", f), } } } /// An error returned from the `try_recv` method. #[cfg(feature = "unstable")] #[cfg_attr(feature = "docs", doc(cfg(unstable)))] #[derive(Debug)] pub enum TryRecvError { /// The channel is empty but not disconnected. Empty, /// The channel is empty and disconnected. Disconnected, } impl Error for TryRecvError {} impl Display for TryRecvError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { Self::Empty => Display::fmt("The channel is empty.", f), Self::Disconnected => Display::fmt("The channel is empty and disconnected.", f), } } } /// An error returned from the `recv` method. #[cfg(feature = "unstable")] #[cfg_attr(feature = "docs", doc(cfg(unstable)))] #[derive(Debug)] pub struct RecvError; impl Error for RecvError {} impl Display for RecvError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { Display::fmt("The channel is empty.", f) } }