2
0
Fork 1
mirror of https://github.com/async-rs/async-std.git synced 2025-01-16 02:39:55 +00:00

recverror

Signed-off-by: Yoshua Wuyts <yoshuawuyts@gmail.com>
This commit is contained in:
Yoshua Wuyts 2019-12-12 10:36:43 +01:00
parent 7b7b959a6e
commit 7885c245c5
2 changed files with 27 additions and 14 deletions

View file

@ -1,6 +1,6 @@
use std::cell::UnsafeCell; use std::cell::UnsafeCell;
use std::error::Error; use std::error::Error;
use std::fmt::{Debug, Display, self}; use std::fmt::{self, Debug, Display};
use std::future::Future; use std::future::Future;
use std::isize; use std::isize;
use std::marker::PhantomData; use std::marker::PhantomData;
@ -388,22 +388,20 @@ impl<T> Receiver<T> {
/// // Then we drop the sender /// // Then we drop the sender
/// }); /// });
/// ///
/// assert_eq!(r.recv().await, Some(1)); /// assert_eq!(r.recv().await, Ok(1));
/// assert_eq!(r.recv().await, Some(2)); /// assert_eq!(r.recv().await, Ok(2));
/// /// assert!(r.recv().await.is_err());
/// // recv() returns `None`
/// assert_eq!(r.recv().await, None);
/// # /// #
/// # }) /// # })
/// ``` /// ```
pub async fn recv(&self) -> Option<T> { pub async fn recv(&self) -> Result<T, RecvError> {
struct RecvFuture<'a, T> { struct RecvFuture<'a, T> {
channel: &'a Channel<T>, channel: &'a Channel<T>,
opt_key: Option<usize>, opt_key: Option<usize>,
} }
impl<T> Future for RecvFuture<'_, T> { impl<T> Future for RecvFuture<'_, T> {
type Output = Option<T>; type Output = Result<T, RecvError>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
poll_recv( poll_recv(
@ -569,12 +567,13 @@ impl<T> Stream for Receiver<T> {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = &mut *self; let this = &mut *self;
poll_recv( let res = futures_core::ready!(poll_recv(
&this.channel, &this.channel,
&this.channel.stream_wakers, &this.channel.stream_wakers,
&mut this.opt_key, &mut this.opt_key,
cx, cx,
) ));
Poll::Ready(res.ok())
} }
} }
@ -593,7 +592,7 @@ fn poll_recv<T>(
wakers: &WakerSet, wakers: &WakerSet,
opt_key: &mut Option<usize>, opt_key: &mut Option<usize>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
) -> Poll<Option<T>> { ) -> Poll<Result<T, RecvError>> {
loop { loop {
// If the current task is in the set, remove it. // If the current task is in the set, remove it.
if let Some(key) = opt_key.take() { if let Some(key) = opt_key.take() {
@ -602,8 +601,8 @@ fn poll_recv<T>(
// Try receiving a message. // Try receiving a message.
match channel.try_recv() { match channel.try_recv() {
Ok(msg) => return Poll::Ready(Some(msg)), Ok(msg) => return Poll::Ready(Ok(msg)),
Err(TryRecvError::Disconnected) => return Poll::Ready(None), Err(TryRecvError::Disconnected) => return Poll::Ready(Err(RecvError {})),
Err(TryRecvError::Empty) => { Err(TryRecvError::Empty) => {
// Insert this receive operation. // Insert this receive operation.
*opt_key = Some(wakers.insert(cx)); *opt_key = Some(wakers.insert(cx));
@ -1035,3 +1034,17 @@ impl Display for TryRecvError {
} }
} }
} }
/// An error returned from the `recv` method.
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
#[derive(Debug)]
pub struct RecvError;
impl Error for RecvError {}
impl Display for RecvError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
Display::fmt("The channel is empty.", f)
}
}

View file

@ -184,7 +184,7 @@ mod rwlock;
cfg_unstable! { cfg_unstable! {
pub use barrier::{Barrier, BarrierWaitResult}; pub use barrier::{Barrier, BarrierWaitResult};
pub use channel::{channel, Sender, Receiver, TryRecvError, TrySendError}; pub use channel::{channel, Sender, Receiver, RecvError, TryRecvError, TrySendError};
mod barrier; mod barrier;
mod channel; mod channel;