diff --git a/src/sync/channel.rs b/src/sync/channel.rs index a1f2183..ace18e7 100644 --- a/src/sync/channel.rs +++ b/src/sync/channel.rs @@ -1,6 +1,6 @@ use std::cell::UnsafeCell; use std::error::Error; -use std::fmt::{Debug, Display, self}; +use std::fmt::{self, Debug, Display}; use std::future::Future; use std::isize; use std::marker::PhantomData; @@ -388,22 +388,20 @@ impl Receiver { /// // Then we drop the sender /// }); /// - /// assert_eq!(r.recv().await, Some(1)); - /// assert_eq!(r.recv().await, Some(2)); - /// - /// // recv() returns `None` - /// assert_eq!(r.recv().await, None); + /// assert_eq!(r.recv().await, Ok(1)); + /// assert_eq!(r.recv().await, Ok(2)); + /// assert!(r.recv().await.is_err()); /// # /// # }) /// ``` - pub async fn recv(&self) -> Option { + pub async fn recv(&self) -> Result { struct RecvFuture<'a, T> { channel: &'a Channel, opt_key: Option, } impl Future for RecvFuture<'_, T> { - type Output = Option; + type Output = Result; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { poll_recv( @@ -569,12 +567,13 @@ impl Stream for Receiver { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = &mut *self; - poll_recv( + let res = futures_core::ready!(poll_recv( &this.channel, &this.channel.stream_wakers, &mut this.opt_key, cx, - ) + )); + Poll::Ready(res.ok()) } } @@ -593,7 +592,7 @@ fn poll_recv( wakers: &WakerSet, opt_key: &mut Option, cx: &mut Context<'_>, -) -> Poll> { +) -> Poll> { loop { // If the current task is in the set, remove it. if let Some(key) = opt_key.take() { @@ -602,8 +601,8 @@ fn poll_recv( // Try receiving a message. match channel.try_recv() { - Ok(msg) => return Poll::Ready(Some(msg)), - Err(TryRecvError::Disconnected) => return Poll::Ready(None), + Ok(msg) => return Poll::Ready(Ok(msg)), + Err(TryRecvError::Disconnected) => return Poll::Ready(Err(RecvError {})), Err(TryRecvError::Empty) => { // Insert this receive operation. *opt_key = Some(wakers.insert(cx)); @@ -1035,3 +1034,17 @@ impl Display for TryRecvError { } } } + +/// An error returned from the `recv` method. +#[cfg(feature = "unstable")] +#[cfg_attr(feature = "docs", doc(cfg(unstable)))] +#[derive(Debug)] +pub struct RecvError; + +impl Error for RecvError {} + +impl Display for RecvError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + Display::fmt("The channel is empty.", f) + } +} diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 1d6a93f..c221165 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -184,7 +184,7 @@ mod rwlock; cfg_unstable! { pub use barrier::{Barrier, BarrierWaitResult}; - pub use channel::{channel, Sender, Receiver, TryRecvError, TrySendError}; + pub use channel::{channel, Sender, Receiver, RecvError, TryRecvError, TrySendError}; mod barrier; mod channel;