forked from mirror/async-std
Merge pull request #585 from async-rs/try_channels
expose try_recv and try_send on channels
This commit is contained in:
commit
3ff9e98f20
3 changed files with 152 additions and 49 deletions
|
@ -1,5 +1,6 @@
|
||||||
use std::cell::UnsafeCell;
|
use std::cell::UnsafeCell;
|
||||||
use std::fmt;
|
use std::error::Error;
|
||||||
|
use std::fmt::{self, Debug, Display};
|
||||||
use std::future::Future;
|
use std::future::Future;
|
||||||
use std::isize;
|
use std::isize;
|
||||||
use std::marker::PhantomData;
|
use std::marker::PhantomData;
|
||||||
|
@ -31,6 +32,7 @@ use crate::sync::WakerSet;
|
||||||
/// # Examples
|
/// # Examples
|
||||||
///
|
///
|
||||||
/// ```
|
/// ```
|
||||||
|
/// # fn main() -> Result<(), async_std::sync::RecvError> {
|
||||||
/// # async_std::task::block_on(async {
|
/// # async_std::task::block_on(async {
|
||||||
/// #
|
/// #
|
||||||
/// use std::time::Duration;
|
/// use std::time::Duration;
|
||||||
|
@ -50,10 +52,11 @@ use crate::sync::WakerSet;
|
||||||
/// });
|
/// });
|
||||||
///
|
///
|
||||||
/// task::sleep(Duration::from_secs(1)).await;
|
/// task::sleep(Duration::from_secs(1)).await;
|
||||||
/// assert_eq!(r.recv().await, Some(1));
|
/// assert_eq!(r.recv().await?, 1);
|
||||||
/// assert_eq!(r.recv().await, Some(2));
|
/// assert_eq!(r.recv().await?, 2);
|
||||||
|
/// # Ok(())
|
||||||
/// #
|
/// #
|
||||||
/// # })
|
/// # }) }
|
||||||
/// ```
|
/// ```
|
||||||
#[cfg(feature = "unstable")]
|
#[cfg(feature = "unstable")]
|
||||||
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
|
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
|
||||||
|
@ -112,6 +115,7 @@ impl<T> Sender<T> {
|
||||||
/// # Examples
|
/// # Examples
|
||||||
///
|
///
|
||||||
/// ```
|
/// ```
|
||||||
|
/// # fn main() -> Result<(), async_std::sync::RecvError> {
|
||||||
/// # async_std::task::block_on(async {
|
/// # async_std::task::block_on(async {
|
||||||
/// #
|
/// #
|
||||||
/// use async_std::sync::channel;
|
/// use async_std::sync::channel;
|
||||||
|
@ -124,11 +128,12 @@ impl<T> Sender<T> {
|
||||||
/// s.send(2).await;
|
/// s.send(2).await;
|
||||||
/// });
|
/// });
|
||||||
///
|
///
|
||||||
/// assert_eq!(r.recv().await, Some(1));
|
/// assert_eq!(r.recv().await?, 1);
|
||||||
/// assert_eq!(r.recv().await, Some(2));
|
/// assert_eq!(r.recv().await?, 2);
|
||||||
/// assert_eq!(r.recv().await, None);
|
/// assert!(r.recv().await.is_err());
|
||||||
/// #
|
/// #
|
||||||
/// # })
|
/// # Ok(())
|
||||||
|
/// # }) }
|
||||||
/// ```
|
/// ```
|
||||||
pub async fn send(&self, msg: T) {
|
pub async fn send(&self, msg: T) {
|
||||||
struct SendFuture<'a, T> {
|
struct SendFuture<'a, T> {
|
||||||
|
@ -192,6 +197,27 @@ impl<T> Sender<T> {
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Attempts to send a message into the channel.
|
||||||
|
///
|
||||||
|
/// If the channel is full, this method will return an error.
|
||||||
|
///
|
||||||
|
/// # Examples
|
||||||
|
///
|
||||||
|
/// ```
|
||||||
|
/// # async_std::task::block_on(async {
|
||||||
|
/// #
|
||||||
|
/// use async_std::sync::channel;
|
||||||
|
///
|
||||||
|
/// let (s, r) = channel(1);
|
||||||
|
/// assert!(s.try_send(1).is_ok());
|
||||||
|
/// assert!(s.try_send(2).is_err());
|
||||||
|
/// #
|
||||||
|
/// # })
|
||||||
|
/// ```
|
||||||
|
pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
|
||||||
|
self.channel.try_send(msg)
|
||||||
|
}
|
||||||
|
|
||||||
/// Returns the channel capacity.
|
/// Returns the channel capacity.
|
||||||
///
|
///
|
||||||
/// # Examples
|
/// # Examples
|
||||||
|
@ -313,6 +339,7 @@ impl<T> fmt::Debug for Sender<T> {
|
||||||
/// # Examples
|
/// # Examples
|
||||||
///
|
///
|
||||||
/// ```
|
/// ```
|
||||||
|
/// # fn main() -> Result<(), async_std::sync::RecvError> {
|
||||||
/// # async_std::task::block_on(async {
|
/// # async_std::task::block_on(async {
|
||||||
/// #
|
/// #
|
||||||
/// use std::time::Duration;
|
/// use std::time::Duration;
|
||||||
|
@ -328,10 +355,11 @@ impl<T> fmt::Debug for Sender<T> {
|
||||||
/// s.send(2).await;
|
/// s.send(2).await;
|
||||||
/// });
|
/// });
|
||||||
///
|
///
|
||||||
/// assert_eq!(r.recv().await, Some(1)); // Received immediately.
|
/// assert_eq!(r.recv().await?, 1); // Received immediately.
|
||||||
/// assert_eq!(r.recv().await, Some(2)); // Received after 1 second.
|
/// assert_eq!(r.recv().await?, 2); // Received after 1 second.
|
||||||
/// #
|
/// #
|
||||||
/// # })
|
/// # Ok(())
|
||||||
|
/// # }) }
|
||||||
/// ```
|
/// ```
|
||||||
#[cfg(feature = "unstable")]
|
#[cfg(feature = "unstable")]
|
||||||
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
|
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
|
||||||
|
@ -353,6 +381,7 @@ impl<T> Receiver<T> {
|
||||||
/// # Examples
|
/// # Examples
|
||||||
///
|
///
|
||||||
/// ```
|
/// ```
|
||||||
|
/// # fn main() -> Result<(), async_std::sync::RecvError> {
|
||||||
/// # async_std::task::block_on(async {
|
/// # async_std::task::block_on(async {
|
||||||
/// #
|
/// #
|
||||||
/// use async_std::sync::channel;
|
/// use async_std::sync::channel;
|
||||||
|
@ -366,22 +395,21 @@ impl<T> Receiver<T> {
|
||||||
/// // Then we drop the sender
|
/// // Then we drop the sender
|
||||||
/// });
|
/// });
|
||||||
///
|
///
|
||||||
/// assert_eq!(r.recv().await, Some(1));
|
/// assert_eq!(r.recv().await?, 1);
|
||||||
/// assert_eq!(r.recv().await, Some(2));
|
/// assert_eq!(r.recv().await?, 2);
|
||||||
///
|
/// assert!(r.recv().await.is_err());
|
||||||
/// // recv() returns `None`
|
|
||||||
/// assert_eq!(r.recv().await, None);
|
|
||||||
/// #
|
/// #
|
||||||
/// # })
|
/// # Ok(())
|
||||||
|
/// # }) }
|
||||||
/// ```
|
/// ```
|
||||||
pub async fn recv(&self) -> Option<T> {
|
pub async fn recv(&self) -> Result<T, RecvError> {
|
||||||
struct RecvFuture<'a, T> {
|
struct RecvFuture<'a, T> {
|
||||||
channel: &'a Channel<T>,
|
channel: &'a Channel<T>,
|
||||||
opt_key: Option<usize>,
|
opt_key: Option<usize>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T> Future for RecvFuture<'_, T> {
|
impl<T> Future for RecvFuture<'_, T> {
|
||||||
type Output = Option<T>;
|
type Output = Result<T, RecvError>;
|
||||||
|
|
||||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||||
poll_recv(
|
poll_recv(
|
||||||
|
@ -409,6 +437,30 @@ impl<T> Receiver<T> {
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Attempts to receive a message from the channel.
|
||||||
|
///
|
||||||
|
/// If the channel is empty, this method will return an error.
|
||||||
|
///
|
||||||
|
/// # Examples
|
||||||
|
///
|
||||||
|
/// ```
|
||||||
|
/// # async_std::task::block_on(async {
|
||||||
|
/// #
|
||||||
|
/// use async_std::sync::channel;
|
||||||
|
///
|
||||||
|
/// let (s, r) = channel(1);
|
||||||
|
///
|
||||||
|
/// s.send(1u8).await;
|
||||||
|
///
|
||||||
|
/// assert!(r.try_recv().is_ok());
|
||||||
|
/// assert!(r.try_recv().is_err());
|
||||||
|
/// #
|
||||||
|
/// # })
|
||||||
|
/// ```
|
||||||
|
pub fn try_recv(&self) -> Result<T, TryRecvError> {
|
||||||
|
self.channel.try_recv()
|
||||||
|
}
|
||||||
|
|
||||||
/// Returns the channel capacity.
|
/// Returns the channel capacity.
|
||||||
///
|
///
|
||||||
/// # Examples
|
/// # Examples
|
||||||
|
@ -523,12 +575,13 @@ impl<T> Stream for Receiver<T> {
|
||||||
|
|
||||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||||
let this = &mut *self;
|
let this = &mut *self;
|
||||||
poll_recv(
|
let res = futures_core::ready!(poll_recv(
|
||||||
&this.channel,
|
&this.channel,
|
||||||
&this.channel.stream_wakers,
|
&this.channel.stream_wakers,
|
||||||
&mut this.opt_key,
|
&mut this.opt_key,
|
||||||
cx,
|
cx,
|
||||||
)
|
));
|
||||||
|
Poll::Ready(res.ok())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -547,7 +600,7 @@ fn poll_recv<T>(
|
||||||
wakers: &WakerSet,
|
wakers: &WakerSet,
|
||||||
opt_key: &mut Option<usize>,
|
opt_key: &mut Option<usize>,
|
||||||
cx: &mut Context<'_>,
|
cx: &mut Context<'_>,
|
||||||
) -> Poll<Option<T>> {
|
) -> Poll<Result<T, RecvError>> {
|
||||||
loop {
|
loop {
|
||||||
// If the current task is in the set, remove it.
|
// If the current task is in the set, remove it.
|
||||||
if let Some(key) = opt_key.take() {
|
if let Some(key) = opt_key.take() {
|
||||||
|
@ -556,8 +609,8 @@ fn poll_recv<T>(
|
||||||
|
|
||||||
// Try receiving a message.
|
// Try receiving a message.
|
||||||
match channel.try_recv() {
|
match channel.try_recv() {
|
||||||
Ok(msg) => return Poll::Ready(Some(msg)),
|
Ok(msg) => return Poll::Ready(Ok(msg)),
|
||||||
Err(TryRecvError::Disconnected) => return Poll::Ready(None),
|
Err(TryRecvError::Disconnected) => return Poll::Ready(Err(RecvError {})),
|
||||||
Err(TryRecvError::Empty) => {
|
Err(TryRecvError::Empty) => {
|
||||||
// Insert this receive operation.
|
// Insert this receive operation.
|
||||||
*opt_key = Some(wakers.insert(cx));
|
*opt_key = Some(wakers.insert(cx));
|
||||||
|
@ -936,8 +989,10 @@ impl<T> Drop for Channel<T> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// An error returned from the `try_send()` method.
|
/// An error returned from the `try_send` method.
|
||||||
enum TrySendError<T> {
|
#[cfg(feature = "unstable")]
|
||||||
|
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
|
||||||
|
pub enum TrySendError<T> {
|
||||||
/// The channel is full but not disconnected.
|
/// The channel is full but not disconnected.
|
||||||
Full(T),
|
Full(T),
|
||||||
|
|
||||||
|
@ -945,11 +1000,59 @@ enum TrySendError<T> {
|
||||||
Disconnected(T),
|
Disconnected(T),
|
||||||
}
|
}
|
||||||
|
|
||||||
/// An error returned from the `try_recv()` method.
|
impl<T> Error for TrySendError<T> {}
|
||||||
enum TryRecvError {
|
|
||||||
|
impl<T> Debug for TrySendError<T> {
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
|
match self {
|
||||||
|
Self::Full(_) => Debug::fmt("Full<T>", f),
|
||||||
|
Self::Disconnected(_) => Debug::fmt("Disconnected<T>", f),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> Display for TrySendError<T> {
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
|
match self {
|
||||||
|
Self::Full(_) => Display::fmt("The channel is full.", f),
|
||||||
|
Self::Disconnected(_) => Display::fmt("The channel is full and disconnected.", f),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// An error returned from the `try_recv` method.
|
||||||
|
#[cfg(feature = "unstable")]
|
||||||
|
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub enum TryRecvError {
|
||||||
/// The channel is empty but not disconnected.
|
/// The channel is empty but not disconnected.
|
||||||
Empty,
|
Empty,
|
||||||
|
|
||||||
/// The channel is empty and disconnected.
|
/// The channel is empty and disconnected.
|
||||||
Disconnected,
|
Disconnected,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl Error for TryRecvError {}
|
||||||
|
|
||||||
|
impl Display for TryRecvError {
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
|
match self {
|
||||||
|
Self::Empty => Display::fmt("The channel is empty.", f),
|
||||||
|
Self::Disconnected => Display::fmt("The channel is empty and disconnected.", f),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// An error returned from the `recv` method.
|
||||||
|
#[cfg(feature = "unstable")]
|
||||||
|
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct RecvError;
|
||||||
|
|
||||||
|
impl Error for RecvError {}
|
||||||
|
|
||||||
|
impl Display for RecvError {
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
|
Display::fmt("The channel is empty.", f)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -184,7 +184,7 @@ mod rwlock;
|
||||||
|
|
||||||
cfg_unstable! {
|
cfg_unstable! {
|
||||||
pub use barrier::{Barrier, BarrierWaitResult};
|
pub use barrier::{Barrier, BarrierWaitResult};
|
||||||
pub use channel::{channel, Sender, Receiver};
|
pub use channel::{channel, Sender, Receiver, RecvError, TryRecvError, TrySendError};
|
||||||
|
|
||||||
mod barrier;
|
mod barrier;
|
||||||
mod channel;
|
mod channel;
|
||||||
|
|
|
@ -18,13 +18,13 @@ fn smoke() {
|
||||||
let (s, r) = channel(1);
|
let (s, r) = channel(1);
|
||||||
|
|
||||||
s.send(7).await;
|
s.send(7).await;
|
||||||
assert_eq!(r.recv().await, Some(7));
|
assert_eq!(r.recv().await.unwrap(), 7);
|
||||||
|
|
||||||
s.send(8).await;
|
s.send(8).await;
|
||||||
assert_eq!(r.recv().await, Some(8));
|
assert_eq!(r.recv().await.unwrap(), 8);
|
||||||
|
|
||||||
drop(s);
|
drop(s);
|
||||||
assert_eq!(r.recv().await, None);
|
assert!(r.recv().await.is_err());
|
||||||
});
|
});
|
||||||
|
|
||||||
task::block_on(async {
|
task::block_on(async {
|
||||||
|
@ -74,7 +74,7 @@ fn len_empty_full() {
|
||||||
assert_eq!(r.is_empty(), false);
|
assert_eq!(r.is_empty(), false);
|
||||||
assert_eq!(r.is_full(), true);
|
assert_eq!(r.is_full(), true);
|
||||||
|
|
||||||
r.recv().await;
|
let _ = r.recv().await;
|
||||||
|
|
||||||
assert_eq!(s.len(), 1);
|
assert_eq!(s.len(), 1);
|
||||||
assert_eq!(s.is_empty(), false);
|
assert_eq!(s.is_empty(), false);
|
||||||
|
@ -91,12 +91,12 @@ fn recv() {
|
||||||
let (s, r) = channel(100);
|
let (s, r) = channel(100);
|
||||||
|
|
||||||
task::spawn(async move {
|
task::spawn(async move {
|
||||||
assert_eq!(r.recv().await, Some(7));
|
assert_eq!(r.recv().await.unwrap(), 7);
|
||||||
task::sleep(ms(1000)).await;
|
task::sleep(ms(1000)).await;
|
||||||
assert_eq!(r.recv().await, Some(8));
|
assert_eq!(r.recv().await.unwrap(), 8);
|
||||||
task::sleep(ms(1000)).await;
|
task::sleep(ms(1000)).await;
|
||||||
assert_eq!(r.recv().await, Some(9));
|
assert_eq!(r.recv().await.unwrap(), 9);
|
||||||
assert_eq!(r.recv().await, None);
|
assert!(r.recv().await.is_err());
|
||||||
});
|
});
|
||||||
|
|
||||||
task::sleep(ms(1500)).await;
|
task::sleep(ms(1500)).await;
|
||||||
|
@ -122,9 +122,9 @@ fn send() {
|
||||||
});
|
});
|
||||||
|
|
||||||
task::sleep(ms(1500)).await;
|
task::sleep(ms(1500)).await;
|
||||||
assert_eq!(r.recv().await, Some(7));
|
assert_eq!(r.recv().await.unwrap(), 7);
|
||||||
assert_eq!(r.recv().await, Some(8));
|
assert_eq!(r.recv().await.unwrap(), 8);
|
||||||
assert_eq!(r.recv().await, Some(9));
|
assert_eq!(r.recv().await.unwrap(), 9);
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -139,10 +139,10 @@ fn recv_after_disconnect() {
|
||||||
|
|
||||||
drop(s);
|
drop(s);
|
||||||
|
|
||||||
assert_eq!(r.recv().await, Some(1));
|
assert_eq!(r.recv().await.unwrap(), 1);
|
||||||
assert_eq!(r.recv().await, Some(2));
|
assert_eq!(r.recv().await.unwrap(), 2);
|
||||||
assert_eq!(r.recv().await, Some(3));
|
assert_eq!(r.recv().await.unwrap(), 3);
|
||||||
assert_eq!(r.recv().await, None);
|
assert!(r.recv().await.is_err());
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -164,7 +164,7 @@ fn len() {
|
||||||
}
|
}
|
||||||
|
|
||||||
for i in 0..50 {
|
for i in 0..50 {
|
||||||
r.recv().await;
|
let _ = r.recv().await;
|
||||||
assert_eq!(r.len(), 50 - i - 1);
|
assert_eq!(r.len(), 50 - i - 1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -188,7 +188,7 @@ fn len() {
|
||||||
let r = r.clone();
|
let r = r.clone();
|
||||||
async move {
|
async move {
|
||||||
for i in 0..COUNT {
|
for i in 0..COUNT {
|
||||||
assert_eq!(r.recv().await, Some(i));
|
assert_eq!(r.recv().await.unwrap(), i);
|
||||||
let len = r.len();
|
let len = r.len();
|
||||||
assert!(len <= CAP);
|
assert!(len <= CAP);
|
||||||
}
|
}
|
||||||
|
@ -214,7 +214,7 @@ fn disconnect_wakes_receiver() {
|
||||||
let (s, r) = channel::<()>(1);
|
let (s, r) = channel::<()>(1);
|
||||||
|
|
||||||
let child = task::spawn(async move {
|
let child = task::spawn(async move {
|
||||||
assert_eq!(r.recv().await, None);
|
assert!(r.recv().await.is_err());
|
||||||
});
|
});
|
||||||
|
|
||||||
task::sleep(ms(1000)).await;
|
task::sleep(ms(1000)).await;
|
||||||
|
@ -233,9 +233,9 @@ fn spsc() {
|
||||||
|
|
||||||
let child = task::spawn(async move {
|
let child = task::spawn(async move {
|
||||||
for i in 0..COUNT {
|
for i in 0..COUNT {
|
||||||
assert_eq!(r.recv().await, Some(i));
|
assert_eq!(r.recv().await.unwrap(), i);
|
||||||
}
|
}
|
||||||
assert_eq!(r.recv().await, None);
|
assert!(r.recv().await.is_err());
|
||||||
});
|
});
|
||||||
|
|
||||||
for i in 0..COUNT {
|
for i in 0..COUNT {
|
||||||
|
|
Loading…
Reference in a new issue