From 32dce319d33f4b5279e8b421134fbecd0c5b2120 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Mon, 25 Nov 2019 19:35:50 +0100 Subject: [PATCH 1/5] expose try_recv and try_send on channels Signed-off-by: Yoshua Wuyts --- src/sync/channel.rs | 88 ++++++++++++++++++++++++++++++++++++++++++--- src/sync/mod.rs | 2 +- 2 files changed, 84 insertions(+), 6 deletions(-) diff --git a/src/sync/channel.rs b/src/sync/channel.rs index a957bec..ff9f996 100644 --- a/src/sync/channel.rs +++ b/src/sync/channel.rs @@ -1,5 +1,6 @@ use std::cell::UnsafeCell; -use std::fmt; +use std::error::Error; +use std::fmt::{Debug, Display, self}; use std::future::Future; use std::isize; use std::marker::PhantomData; @@ -192,6 +193,27 @@ impl Sender { .await } + /// Attempts to send a message into the channel. + /// + /// If the channel is full, this method will return an error. + /// + /// # Examples + /// + /// ``` + /// # async_std::task::block_on(async { + /// # + /// use async_std::sync::channel; + /// + /// let (s, r) = channel(1); + /// assert!(s.try_send(1).is_ok()); + /// assert!(s.try_send(2).is_err()); + /// # + /// # }) + /// ``` + pub fn try_send(&self, msg: T) -> Result<(), TrySendError> { + self.channel.try_send(msg) + } + /// Returns the channel capacity. /// /// # Examples @@ -409,6 +431,30 @@ impl Receiver { .await } + /// Attempts to receive a message from the channel. + /// + /// If the channel is empty, this method will return an error. + /// + /// # Examples + /// + /// ``` + /// # async_std::task::block_on(async { + /// # + /// use async_std::sync::channel; + /// + /// let (s, r) = channel(1); + /// + /// s.send(1u8).await; + /// + /// assert!(r.try_recv().is_ok()); + /// assert!(r.try_recv().is_err()); + /// # + /// # }) + /// ``` + pub fn try_recv(&self) -> Result { + self.channel.try_recv() + } + /// Returns the channel capacity. /// /// # Examples @@ -936,8 +982,8 @@ impl Drop for Channel { } } -/// An error returned from the `try_send()` method. -enum TrySendError { +/// An error returned from the `try_send` method. +pub enum TrySendError { /// The channel is full but not disconnected. Full(T), @@ -945,11 +991,43 @@ enum TrySendError { Disconnected(T), } -/// An error returned from the `try_recv()` method. -enum TryRecvError { +impl Error for TrySendError {} + +impl Debug for TrySendError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Full(_) => Debug::fmt("Full", f), + Self::Disconnected(_) => Debug::fmt("Disconnected", f), + } + } +} + +impl Display for TrySendError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Full(_) => Display::fmt("The channel is full.", f), + Self::Disconnected(_) => Display::fmt("The channel is full and disconnected.", f), + } + } +} + +/// An error returned from the `try_recv` method. +#[derive(Debug)] +pub enum TryRecvError { /// The channel is empty but not disconnected. Empty, /// The channel is empty and disconnected. Disconnected, } + +impl Error for TryRecvError {} + +impl Display for TryRecvError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Empty => Display::fmt("The channel is empty.", f), + Self::Disconnected => Display::fmt("The channel is empty and disconnected.", f), + } + } +} diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 088c520..1d6a93f 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -184,7 +184,7 @@ mod rwlock; cfg_unstable! { pub use barrier::{Barrier, BarrierWaitResult}; - pub use channel::{channel, Sender, Receiver}; + pub use channel::{channel, Sender, Receiver, TryRecvError, TrySendError}; mod barrier; mod channel; From 7b7b959a6efa4732d7284a7b41cc3220e94b5694 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Tue, 26 Nov 2019 10:58:53 +0100 Subject: [PATCH 2/5] mark channel errs as unstable Signed-off-by: Yoshua Wuyts --- src/sync/channel.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/sync/channel.rs b/src/sync/channel.rs index ff9f996..a1f2183 100644 --- a/src/sync/channel.rs +++ b/src/sync/channel.rs @@ -983,6 +983,8 @@ impl Drop for Channel { } /// An error returned from the `try_send` method. +#[cfg(feature = "unstable")] +#[cfg_attr(feature = "docs", doc(cfg(unstable)))] pub enum TrySendError { /// The channel is full but not disconnected. Full(T), @@ -1012,6 +1014,8 @@ impl Display for TrySendError { } /// An error returned from the `try_recv` method. +#[cfg(feature = "unstable")] +#[cfg_attr(feature = "docs", doc(cfg(unstable)))] #[derive(Debug)] pub enum TryRecvError { /// The channel is empty but not disconnected. From 7885c245c54ebc7943b406624462fde8be1fc41c Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Thu, 12 Dec 2019 10:36:43 +0100 Subject: [PATCH 3/5] recverror Signed-off-by: Yoshua Wuyts --- src/sync/channel.rs | 39 ++++++++++++++++++++++++++------------- src/sync/mod.rs | 2 +- 2 files changed, 27 insertions(+), 14 deletions(-) diff --git a/src/sync/channel.rs b/src/sync/channel.rs index a1f2183..ace18e7 100644 --- a/src/sync/channel.rs +++ b/src/sync/channel.rs @@ -1,6 +1,6 @@ use std::cell::UnsafeCell; use std::error::Error; -use std::fmt::{Debug, Display, self}; +use std::fmt::{self, Debug, Display}; use std::future::Future; use std::isize; use std::marker::PhantomData; @@ -388,22 +388,20 @@ impl Receiver { /// // Then we drop the sender /// }); /// - /// assert_eq!(r.recv().await, Some(1)); - /// assert_eq!(r.recv().await, Some(2)); - /// - /// // recv() returns `None` - /// assert_eq!(r.recv().await, None); + /// assert_eq!(r.recv().await, Ok(1)); + /// assert_eq!(r.recv().await, Ok(2)); + /// assert!(r.recv().await.is_err()); /// # /// # }) /// ``` - pub async fn recv(&self) -> Option { + pub async fn recv(&self) -> Result { struct RecvFuture<'a, T> { channel: &'a Channel, opt_key: Option, } impl Future for RecvFuture<'_, T> { - type Output = Option; + type Output = Result; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { poll_recv( @@ -569,12 +567,13 @@ impl Stream for Receiver { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = &mut *self; - poll_recv( + let res = futures_core::ready!(poll_recv( &this.channel, &this.channel.stream_wakers, &mut this.opt_key, cx, - ) + )); + Poll::Ready(res.ok()) } } @@ -593,7 +592,7 @@ fn poll_recv( wakers: &WakerSet, opt_key: &mut Option, cx: &mut Context<'_>, -) -> Poll> { +) -> Poll> { loop { // If the current task is in the set, remove it. if let Some(key) = opt_key.take() { @@ -602,8 +601,8 @@ fn poll_recv( // Try receiving a message. match channel.try_recv() { - Ok(msg) => return Poll::Ready(Some(msg)), - Err(TryRecvError::Disconnected) => return Poll::Ready(None), + Ok(msg) => return Poll::Ready(Ok(msg)), + Err(TryRecvError::Disconnected) => return Poll::Ready(Err(RecvError {})), Err(TryRecvError::Empty) => { // Insert this receive operation. *opt_key = Some(wakers.insert(cx)); @@ -1035,3 +1034,17 @@ impl Display for TryRecvError { } } } + +/// An error returned from the `recv` method. +#[cfg(feature = "unstable")] +#[cfg_attr(feature = "docs", doc(cfg(unstable)))] +#[derive(Debug)] +pub struct RecvError; + +impl Error for RecvError {} + +impl Display for RecvError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + Display::fmt("The channel is empty.", f) + } +} diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 1d6a93f..c221165 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -184,7 +184,7 @@ mod rwlock; cfg_unstable! { pub use barrier::{Barrier, BarrierWaitResult}; - pub use channel::{channel, Sender, Receiver, TryRecvError, TrySendError}; + pub use channel::{channel, Sender, Receiver, RecvError, TryRecvError, TrySendError}; mod barrier; mod channel; From 19fd7a4084b937c1d3de04b044f73ec9e52d77e1 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Thu, 12 Dec 2019 11:24:52 +0100 Subject: [PATCH 4/5] fix channel tests Signed-off-by: Yoshua Wuyts --- tests/channel.rs | 40 ++++++++++++++++++++-------------------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/tests/channel.rs b/tests/channel.rs index 34bd888..f302906 100644 --- a/tests/channel.rs +++ b/tests/channel.rs @@ -18,13 +18,13 @@ fn smoke() { let (s, r) = channel(1); s.send(7).await; - assert_eq!(r.recv().await, Some(7)); + assert_eq!(r.recv().await.unwrap(), 7); s.send(8).await; - assert_eq!(r.recv().await, Some(8)); + assert_eq!(r.recv().await.unwrap(), 8); drop(s); - assert_eq!(r.recv().await, None); + assert!(r.recv().await.is_err()); }); task::block_on(async { @@ -74,7 +74,7 @@ fn len_empty_full() { assert_eq!(r.is_empty(), false); assert_eq!(r.is_full(), true); - r.recv().await; + let _ = r.recv().await; assert_eq!(s.len(), 1); assert_eq!(s.is_empty(), false); @@ -91,12 +91,12 @@ fn recv() { let (s, r) = channel(100); task::spawn(async move { - assert_eq!(r.recv().await, Some(7)); + assert_eq!(r.recv().await.unwrap(), 7); task::sleep(ms(1000)).await; - assert_eq!(r.recv().await, Some(8)); + assert_eq!(r.recv().await.unwrap(), 8); task::sleep(ms(1000)).await; - assert_eq!(r.recv().await, Some(9)); - assert_eq!(r.recv().await, None); + assert_eq!(r.recv().await.unwrap(), 9); + assert!(r.recv().await.is_err()); }); task::sleep(ms(1500)).await; @@ -122,9 +122,9 @@ fn send() { }); task::sleep(ms(1500)).await; - assert_eq!(r.recv().await, Some(7)); - assert_eq!(r.recv().await, Some(8)); - assert_eq!(r.recv().await, Some(9)); + assert_eq!(r.recv().await.unwrap(), 7); + assert_eq!(r.recv().await.unwrap(), 8); + assert_eq!(r.recv().await.unwrap(), 9); }) } @@ -139,10 +139,10 @@ fn recv_after_disconnect() { drop(s); - assert_eq!(r.recv().await, Some(1)); - assert_eq!(r.recv().await, Some(2)); - assert_eq!(r.recv().await, Some(3)); - assert_eq!(r.recv().await, None); + assert_eq!(r.recv().await.unwrap(), 1); + assert_eq!(r.recv().await.unwrap(), 2); + assert_eq!(r.recv().await.unwrap(), 3); + assert!(r.recv().await.is_err()); }) } @@ -164,7 +164,7 @@ fn len() { } for i in 0..50 { - r.recv().await; + let _ = r.recv().await; assert_eq!(r.len(), 50 - i - 1); } } @@ -188,7 +188,7 @@ fn len() { let r = r.clone(); async move { for i in 0..COUNT { - assert_eq!(r.recv().await, Some(i)); + assert_eq!(r.recv().await.unwrap(), i); let len = r.len(); assert!(len <= CAP); } @@ -214,7 +214,7 @@ fn disconnect_wakes_receiver() { let (s, r) = channel::<()>(1); let child = task::spawn(async move { - assert_eq!(r.recv().await, None); + assert!(r.recv().await.is_err()); }); task::sleep(ms(1000)).await; @@ -233,9 +233,9 @@ fn spsc() { let child = task::spawn(async move { for i in 0..COUNT { - assert_eq!(r.recv().await, Some(i)); + assert_eq!(r.recv().await.unwrap(), i); } - assert_eq!(r.recv().await, None); + assert!(r.recv().await.is_err()); }); for i in 0..COUNT { From b7c7efc797a0a5d4bce5e1d80bc95bb189d160f1 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Mon, 16 Mar 2020 00:05:39 +0100 Subject: [PATCH 5/5] Update try_channel doctests --- src/sync/channel.rs | 34 +++++++++++++++++++++------------- 1 file changed, 21 insertions(+), 13 deletions(-) diff --git a/src/sync/channel.rs b/src/sync/channel.rs index ace18e7..8ab1cc1 100644 --- a/src/sync/channel.rs +++ b/src/sync/channel.rs @@ -32,6 +32,7 @@ use crate::sync::WakerSet; /// # Examples /// /// ``` +/// # fn main() -> Result<(), async_std::sync::RecvError> { /// # async_std::task::block_on(async { /// # /// use std::time::Duration; @@ -51,10 +52,11 @@ use crate::sync::WakerSet; /// }); /// /// task::sleep(Duration::from_secs(1)).await; -/// assert_eq!(r.recv().await, Some(1)); -/// assert_eq!(r.recv().await, Some(2)); +/// assert_eq!(r.recv().await?, 1); +/// assert_eq!(r.recv().await?, 2); +/// # Ok(()) /// # -/// # }) +/// # }) } /// ``` #[cfg(feature = "unstable")] #[cfg_attr(feature = "docs", doc(cfg(unstable)))] @@ -113,6 +115,7 @@ impl Sender { /// # Examples /// /// ``` + /// # fn main() -> Result<(), async_std::sync::RecvError> { /// # async_std::task::block_on(async { /// # /// use async_std::sync::channel; @@ -125,11 +128,12 @@ impl Sender { /// s.send(2).await; /// }); /// - /// assert_eq!(r.recv().await, Some(1)); - /// assert_eq!(r.recv().await, Some(2)); - /// assert_eq!(r.recv().await, None); + /// assert_eq!(r.recv().await?, 1); + /// assert_eq!(r.recv().await?, 2); + /// assert!(r.recv().await.is_err()); /// # - /// # }) + /// # Ok(()) + /// # }) } /// ``` pub async fn send(&self, msg: T) { struct SendFuture<'a, T> { @@ -335,6 +339,7 @@ impl fmt::Debug for Sender { /// # Examples /// /// ``` +/// # fn main() -> Result<(), async_std::sync::RecvError> { /// # async_std::task::block_on(async { /// # /// use std::time::Duration; @@ -350,10 +355,11 @@ impl fmt::Debug for Sender { /// s.send(2).await; /// }); /// -/// assert_eq!(r.recv().await, Some(1)); // Received immediately. -/// assert_eq!(r.recv().await, Some(2)); // Received after 1 second. +/// assert_eq!(r.recv().await?, 1); // Received immediately. +/// assert_eq!(r.recv().await?, 2); // Received after 1 second. /// # -/// # }) +/// # Ok(()) +/// # }) } /// ``` #[cfg(feature = "unstable")] #[cfg_attr(feature = "docs", doc(cfg(unstable)))] @@ -375,6 +381,7 @@ impl Receiver { /// # Examples /// /// ``` + /// # fn main() -> Result<(), async_std::sync::RecvError> { /// # async_std::task::block_on(async { /// # /// use async_std::sync::channel; @@ -388,11 +395,12 @@ impl Receiver { /// // Then we drop the sender /// }); /// - /// assert_eq!(r.recv().await, Ok(1)); - /// assert_eq!(r.recv().await, Ok(2)); + /// assert_eq!(r.recv().await?, 1); + /// assert_eq!(r.recv().await?, 2); /// assert!(r.recv().await.is_err()); /// # - /// # }) + /// # Ok(()) + /// # }) } /// ``` pub async fn recv(&self) -> Result { struct RecvFuture<'a, T> {