mirror of
https://github.com/async-rs/async-std.git
synced 2025-04-03 23:16:40 +00:00
update stream::fuse
Signed-off-by: Yoshua Wuyts <yoshuawuyts@gmail.com>
This commit is contained in:
parent
7b4bb26c5c
commit
aa94d450d6
2 changed files with 19 additions and 41 deletions
|
@ -1,51 +1,29 @@
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
|
use std::task::{Context, Poll};
|
||||||
|
|
||||||
use crate::task::{Context, Poll};
|
/// A `Stream` that is permanently closed once a single call to `poll` results in
|
||||||
|
/// `Poll::Ready(None)`, returning `Poll::Ready(None)` for all future calls to `poll`.
|
||||||
/// A `Stream` that is permanently closed
|
|
||||||
/// once a single call to `poll` results in
|
|
||||||
/// `Poll::Ready(None)`, returning `Poll::Ready(None)`
|
|
||||||
/// for all future calls to `poll`.
|
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
pub struct Fuse<S> {
|
pub struct Fuse<S> {
|
||||||
stream: S,
|
pub(crate) stream: S,
|
||||||
done: bool,
|
pub(crate) done: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<S: Unpin> Unpin for Fuse<S> {}
|
impl<S: Unpin> Unpin for Fuse<S> {}
|
||||||
|
|
||||||
impl<S: futures::Stream> Fuse<S> {
|
impl<S: futures_core::Stream> Fuse<S> {
|
||||||
pin_utils::unsafe_pinned!(stream: S);
|
pin_utils::unsafe_pinned!(stream: S);
|
||||||
pin_utils::unsafe_unpinned!(done: bool);
|
pin_utils::unsafe_unpinned!(done: bool);
|
||||||
|
|
||||||
/// Returns `true` if the underlying stream is fused.
|
|
||||||
///
|
|
||||||
/// If this `Stream` is fused, all future calls to
|
|
||||||
/// `poll` will return `Poll::Ready(None)`.
|
|
||||||
pub fn is_done(&self) -> bool {
|
|
||||||
self.done
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Consumes this `Fuse` and returns the inner
|
|
||||||
/// `Stream`, unfusing it if it had become
|
|
||||||
/// fused.
|
|
||||||
pub fn into_inner(self) -> S
|
|
||||||
where
|
|
||||||
S: Sized,
|
|
||||||
{
|
|
||||||
self.stream
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl<S: futures_core::Stream> futures_core::Stream for Fuse<S> {
|
||||||
impl<S: futures::Stream> futures::Stream for Fuse<S> {
|
|
||||||
type Item = S::Item;
|
type Item = S::Item;
|
||||||
|
|
||||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
|
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
|
||||||
if self.done {
|
if self.done {
|
||||||
Poll::Ready(None)
|
Poll::Ready(None)
|
||||||
} else {
|
} else {
|
||||||
let next = futures::ready!(self.as_mut().stream().poll_next(cx));
|
let next = futures_core::ready!(self.as_mut().stream().poll_next(cx));
|
||||||
if next.is_none() {
|
if next.is_none() {
|
||||||
*self.as_mut().done() = true;
|
*self.as_mut().done() = true;
|
||||||
}
|
}
|
||||||
|
|
|
@ -248,29 +248,29 @@ pub trait Stream {
|
||||||
Enumerate::new(self)
|
Enumerate::new(self)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Transforms this `Stream` into a "fused" `Stream`
|
/// Transforms this `Stream` into a "fused" `Stream` such that after the first time `poll`
|
||||||
/// such that after the first time `poll` returns
|
/// returns `Poll::Ready(None)`, all future calls to `poll` will also return
|
||||||
/// `Poll::Ready(None)`, all future calls to
|
/// `Poll::Ready(None)`.
|
||||||
/// `poll` will also return `Poll::Ready(None)`.
|
|
||||||
///
|
///
|
||||||
/// # Examples
|
/// # Examples
|
||||||
///
|
///
|
||||||
/// ```
|
/// ```
|
||||||
/// # #![feature(async_await)]
|
|
||||||
/// # fn main() { async_std::task::block_on(async {
|
/// # fn main() { async_std::task::block_on(async {
|
||||||
/// #
|
/// #
|
||||||
/// use async_std::prelude::*;
|
/// use async_std::prelude::*;
|
||||||
/// use async_std::stream;
|
/// use async_std::stream;
|
||||||
///
|
///
|
||||||
/// let mut s = stream::repeat(9).take(3);
|
/// let mut s = stream::once(1).fuse();
|
||||||
///
|
/// assert_eq!(s.next().await, Some(1));
|
||||||
/// while let Some(v) = s.next().await {
|
/// assert_eq!(s.next().await, None);
|
||||||
/// assert_eq!(v, 9);
|
/// assert_eq!(s.next().await, None);
|
||||||
/// }
|
|
||||||
/// #
|
/// #
|
||||||
/// # }) }
|
/// # }) }
|
||||||
/// ```
|
/// ```
|
||||||
fn fuse(self) -> Fuse<Self> {
|
fn fuse(self) -> Fuse<Self>
|
||||||
|
where
|
||||||
|
Self: Sized,
|
||||||
|
{
|
||||||
Fuse {
|
Fuse {
|
||||||
stream: self,
|
stream: self,
|
||||||
done: false,
|
done: false,
|
||||||
|
|
Loading…
Reference in a new issue