diff --git a/src/sync/channel.rs b/src/sync/channel.rs index a957bec..8ab1cc1 100644 --- a/src/sync/channel.rs +++ b/src/sync/channel.rs @@ -1,5 +1,6 @@ use std::cell::UnsafeCell; -use std::fmt; +use std::error::Error; +use std::fmt::{self, Debug, Display}; use std::future::Future; use std::isize; use std::marker::PhantomData; @@ -31,6 +32,7 @@ use crate::sync::WakerSet; /// # Examples /// /// ``` +/// # fn main() -> Result<(), async_std::sync::RecvError> { /// # async_std::task::block_on(async { /// # /// use std::time::Duration; @@ -50,10 +52,11 @@ use crate::sync::WakerSet; /// }); /// /// task::sleep(Duration::from_secs(1)).await; -/// assert_eq!(r.recv().await, Some(1)); -/// assert_eq!(r.recv().await, Some(2)); +/// assert_eq!(r.recv().await?, 1); +/// assert_eq!(r.recv().await?, 2); +/// # Ok(()) /// # -/// # }) +/// # }) } /// ``` #[cfg(feature = "unstable")] #[cfg_attr(feature = "docs", doc(cfg(unstable)))] @@ -112,6 +115,7 @@ impl Sender { /// # Examples /// /// ``` + /// # fn main() -> Result<(), async_std::sync::RecvError> { /// # async_std::task::block_on(async { /// # /// use async_std::sync::channel; @@ -124,11 +128,12 @@ impl Sender { /// s.send(2).await; /// }); /// - /// assert_eq!(r.recv().await, Some(1)); - /// assert_eq!(r.recv().await, Some(2)); - /// assert_eq!(r.recv().await, None); + /// assert_eq!(r.recv().await?, 1); + /// assert_eq!(r.recv().await?, 2); + /// assert!(r.recv().await.is_err()); /// # - /// # }) + /// # Ok(()) + /// # }) } /// ``` pub async fn send(&self, msg: T) { struct SendFuture<'a, T> { @@ -192,6 +197,27 @@ impl Sender { .await } + /// Attempts to send a message into the channel. + /// + /// If the channel is full, this method will return an error. + /// + /// # Examples + /// + /// ``` + /// # async_std::task::block_on(async { + /// # + /// use async_std::sync::channel; + /// + /// let (s, r) = channel(1); + /// assert!(s.try_send(1).is_ok()); + /// assert!(s.try_send(2).is_err()); + /// # + /// # }) + /// ``` + pub fn try_send(&self, msg: T) -> Result<(), TrySendError> { + self.channel.try_send(msg) + } + /// Returns the channel capacity. /// /// # Examples @@ -313,6 +339,7 @@ impl fmt::Debug for Sender { /// # Examples /// /// ``` +/// # fn main() -> Result<(), async_std::sync::RecvError> { /// # async_std::task::block_on(async { /// # /// use std::time::Duration; @@ -328,10 +355,11 @@ impl fmt::Debug for Sender { /// s.send(2).await; /// }); /// -/// assert_eq!(r.recv().await, Some(1)); // Received immediately. -/// assert_eq!(r.recv().await, Some(2)); // Received after 1 second. +/// assert_eq!(r.recv().await?, 1); // Received immediately. +/// assert_eq!(r.recv().await?, 2); // Received after 1 second. /// # -/// # }) +/// # Ok(()) +/// # }) } /// ``` #[cfg(feature = "unstable")] #[cfg_attr(feature = "docs", doc(cfg(unstable)))] @@ -353,6 +381,7 @@ impl Receiver { /// # Examples /// /// ``` + /// # fn main() -> Result<(), async_std::sync::RecvError> { /// # async_std::task::block_on(async { /// # /// use async_std::sync::channel; @@ -366,22 +395,21 @@ impl Receiver { /// // Then we drop the sender /// }); /// - /// assert_eq!(r.recv().await, Some(1)); - /// assert_eq!(r.recv().await, Some(2)); - /// - /// // recv() returns `None` - /// assert_eq!(r.recv().await, None); + /// assert_eq!(r.recv().await?, 1); + /// assert_eq!(r.recv().await?, 2); + /// assert!(r.recv().await.is_err()); /// # - /// # }) + /// # Ok(()) + /// # }) } /// ``` - pub async fn recv(&self) -> Option { + pub async fn recv(&self) -> Result { struct RecvFuture<'a, T> { channel: &'a Channel, opt_key: Option, } impl Future for RecvFuture<'_, T> { - type Output = Option; + type Output = Result; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { poll_recv( @@ -409,6 +437,30 @@ impl Receiver { .await } + /// Attempts to receive a message from the channel. + /// + /// If the channel is empty, this method will return an error. + /// + /// # Examples + /// + /// ``` + /// # async_std::task::block_on(async { + /// # + /// use async_std::sync::channel; + /// + /// let (s, r) = channel(1); + /// + /// s.send(1u8).await; + /// + /// assert!(r.try_recv().is_ok()); + /// assert!(r.try_recv().is_err()); + /// # + /// # }) + /// ``` + pub fn try_recv(&self) -> Result { + self.channel.try_recv() + } + /// Returns the channel capacity. /// /// # Examples @@ -523,12 +575,13 @@ impl Stream for Receiver { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = &mut *self; - poll_recv( + let res = futures_core::ready!(poll_recv( &this.channel, &this.channel.stream_wakers, &mut this.opt_key, cx, - ) + )); + Poll::Ready(res.ok()) } } @@ -547,7 +600,7 @@ fn poll_recv( wakers: &WakerSet, opt_key: &mut Option, cx: &mut Context<'_>, -) -> Poll> { +) -> Poll> { loop { // If the current task is in the set, remove it. if let Some(key) = opt_key.take() { @@ -556,8 +609,8 @@ fn poll_recv( // Try receiving a message. match channel.try_recv() { - Ok(msg) => return Poll::Ready(Some(msg)), - Err(TryRecvError::Disconnected) => return Poll::Ready(None), + Ok(msg) => return Poll::Ready(Ok(msg)), + Err(TryRecvError::Disconnected) => return Poll::Ready(Err(RecvError {})), Err(TryRecvError::Empty) => { // Insert this receive operation. *opt_key = Some(wakers.insert(cx)); @@ -936,8 +989,10 @@ impl Drop for Channel { } } -/// An error returned from the `try_send()` method. -enum TrySendError { +/// An error returned from the `try_send` method. +#[cfg(feature = "unstable")] +#[cfg_attr(feature = "docs", doc(cfg(unstable)))] +pub enum TrySendError { /// The channel is full but not disconnected. Full(T), @@ -945,11 +1000,59 @@ enum TrySendError { Disconnected(T), } -/// An error returned from the `try_recv()` method. -enum TryRecvError { +impl Error for TrySendError {} + +impl Debug for TrySendError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Full(_) => Debug::fmt("Full", f), + Self::Disconnected(_) => Debug::fmt("Disconnected", f), + } + } +} + +impl Display for TrySendError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Full(_) => Display::fmt("The channel is full.", f), + Self::Disconnected(_) => Display::fmt("The channel is full and disconnected.", f), + } + } +} + +/// An error returned from the `try_recv` method. +#[cfg(feature = "unstable")] +#[cfg_attr(feature = "docs", doc(cfg(unstable)))] +#[derive(Debug)] +pub enum TryRecvError { /// The channel is empty but not disconnected. Empty, /// The channel is empty and disconnected. Disconnected, } + +impl Error for TryRecvError {} + +impl Display for TryRecvError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Empty => Display::fmt("The channel is empty.", f), + Self::Disconnected => Display::fmt("The channel is empty and disconnected.", f), + } + } +} + +/// An error returned from the `recv` method. +#[cfg(feature = "unstable")] +#[cfg_attr(feature = "docs", doc(cfg(unstable)))] +#[derive(Debug)] +pub struct RecvError; + +impl Error for RecvError {} + +impl Display for RecvError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + Display::fmt("The channel is empty.", f) + } +} diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 088c520..c221165 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -184,7 +184,7 @@ mod rwlock; cfg_unstable! { pub use barrier::{Barrier, BarrierWaitResult}; - pub use channel::{channel, Sender, Receiver}; + pub use channel::{channel, Sender, Receiver, RecvError, TryRecvError, TrySendError}; mod barrier; mod channel; diff --git a/tests/channel.rs b/tests/channel.rs index 34bd888..f302906 100644 --- a/tests/channel.rs +++ b/tests/channel.rs @@ -18,13 +18,13 @@ fn smoke() { let (s, r) = channel(1); s.send(7).await; - assert_eq!(r.recv().await, Some(7)); + assert_eq!(r.recv().await.unwrap(), 7); s.send(8).await; - assert_eq!(r.recv().await, Some(8)); + assert_eq!(r.recv().await.unwrap(), 8); drop(s); - assert_eq!(r.recv().await, None); + assert!(r.recv().await.is_err()); }); task::block_on(async { @@ -74,7 +74,7 @@ fn len_empty_full() { assert_eq!(r.is_empty(), false); assert_eq!(r.is_full(), true); - r.recv().await; + let _ = r.recv().await; assert_eq!(s.len(), 1); assert_eq!(s.is_empty(), false); @@ -91,12 +91,12 @@ fn recv() { let (s, r) = channel(100); task::spawn(async move { - assert_eq!(r.recv().await, Some(7)); + assert_eq!(r.recv().await.unwrap(), 7); task::sleep(ms(1000)).await; - assert_eq!(r.recv().await, Some(8)); + assert_eq!(r.recv().await.unwrap(), 8); task::sleep(ms(1000)).await; - assert_eq!(r.recv().await, Some(9)); - assert_eq!(r.recv().await, None); + assert_eq!(r.recv().await.unwrap(), 9); + assert!(r.recv().await.is_err()); }); task::sleep(ms(1500)).await; @@ -122,9 +122,9 @@ fn send() { }); task::sleep(ms(1500)).await; - assert_eq!(r.recv().await, Some(7)); - assert_eq!(r.recv().await, Some(8)); - assert_eq!(r.recv().await, Some(9)); + assert_eq!(r.recv().await.unwrap(), 7); + assert_eq!(r.recv().await.unwrap(), 8); + assert_eq!(r.recv().await.unwrap(), 9); }) } @@ -139,10 +139,10 @@ fn recv_after_disconnect() { drop(s); - assert_eq!(r.recv().await, Some(1)); - assert_eq!(r.recv().await, Some(2)); - assert_eq!(r.recv().await, Some(3)); - assert_eq!(r.recv().await, None); + assert_eq!(r.recv().await.unwrap(), 1); + assert_eq!(r.recv().await.unwrap(), 2); + assert_eq!(r.recv().await.unwrap(), 3); + assert!(r.recv().await.is_err()); }) } @@ -164,7 +164,7 @@ fn len() { } for i in 0..50 { - r.recv().await; + let _ = r.recv().await; assert_eq!(r.len(), 50 - i - 1); } } @@ -188,7 +188,7 @@ fn len() { let r = r.clone(); async move { for i in 0..COUNT { - assert_eq!(r.recv().await, Some(i)); + assert_eq!(r.recv().await.unwrap(), i); let len = r.len(); assert!(len <= CAP); } @@ -214,7 +214,7 @@ fn disconnect_wakes_receiver() { let (s, r) = channel::<()>(1); let child = task::spawn(async move { - assert_eq!(r.recv().await, None); + assert!(r.recv().await.is_err()); }); task::sleep(ms(1000)).await; @@ -233,9 +233,9 @@ fn spsc() { let child = task::spawn(async move { for i in 0..COUNT { - assert_eq!(r.recv().await, Some(i)); + assert_eq!(r.recv().await.unwrap(), i); } - assert_eq!(r.recv().await, None); + assert!(r.recv().await.is_err()); }); for i in 0..COUNT {