diff --git a/src/fs/metadata.rs b/src/fs/metadata.rs index 6bb9993..2c9e41e 100644 --- a/src/fs/metadata.rs +++ b/src/fs/metadata.rs @@ -70,7 +70,7 @@ cfg_if! { /// ```no_run /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { /// # - /// use std::fs; + /// use async_std::fs; /// /// let metadata = fs::metadata("a.txt").await?; /// println!("{:?}", metadata.file_type()); @@ -90,7 +90,7 @@ cfg_if! { /// ```no_run /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { /// # - /// use std::fs; + /// use async_std::fs; /// /// let metadata = fs::metadata(".").await?; /// println!("{:?}", metadata.is_dir()); @@ -110,7 +110,7 @@ cfg_if! { /// ```no_run /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { /// # - /// use std::fs; + /// use async_std::fs; /// /// let metadata = fs::metadata("a.txt").await?; /// println!("{:?}", metadata.is_file()); @@ -128,7 +128,7 @@ cfg_if! { /// ```no_run /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { /// # - /// use std::fs; + /// use async_std::fs; /// /// let metadata = fs::metadata("a.txt").await?; /// println!("{}", metadata.len()); @@ -146,7 +146,7 @@ cfg_if! { /// ```no_run /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { /// # - /// use std::fs; + /// use async_std::fs; /// /// let metadata = fs::metadata("a.txt").await?; /// println!("{:?}", metadata.permissions()); @@ -169,7 +169,7 @@ cfg_if! { /// ```no_run /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { /// # - /// use std::fs; + /// use async_std::fs; /// /// let metadata = fs::metadata("a.txt").await?; /// println!("{:?}", metadata.modified()); @@ -192,7 +192,7 @@ cfg_if! { /// ```no_run /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { /// # - /// use std::fs; + /// use async_std::fs; /// /// let metadata = fs::metadata("a.txt").await?; /// println!("{:?}", metadata.accessed()); @@ -215,7 +215,7 @@ cfg_if! { /// ```no_run /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { /// # - /// use std::fs; + /// use async_std::fs; /// /// let metadata = fs::metadata("a.txt").await?; /// println!("{:?}", metadata.created()); diff --git a/src/stream/stream/filter.rs b/src/stream/stream/filter.rs new file mode 100644 index 0000000..68211f7 --- /dev/null +++ b/src/stream/stream/filter.rs @@ -0,0 +1,49 @@ +use std::marker::PhantomData; +use std::pin::Pin; + +use crate::stream::Stream; +use crate::task::{Context, Poll}; + +/// A stream to filter elements of another stream with a predicate. +#[derive(Debug)] +pub struct Filter { + stream: S, + predicate: P, + __t: PhantomData, +} + +impl Filter { + pin_utils::unsafe_pinned!(stream: S); + pin_utils::unsafe_unpinned!(predicate: P); + + pub(super) fn new(stream: S, predicate: P) -> Self { + Filter { + stream, + predicate, + __t: PhantomData, + } + } +} + +impl futures_core::stream::Stream for Filter +where + S: Stream, + P: FnMut(&S::Item) -> bool, +{ + type Item = S::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let next = futures_core::ready!(self.as_mut().stream().poll_next(cx)); + + match next { + Some(v) => match (self.as_mut().predicate())(&v) { + true => Poll::Ready(Some(v)), + false => { + cx.waker().wake_by_ref(); + Poll::Pending + } + }, + None => Poll::Ready(None), + } + } +} diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 6aa92c7..d79605e 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -24,6 +24,7 @@ mod all; mod any; mod enumerate; +mod filter; mod filter_map; mod find; mod find_map; @@ -33,12 +34,15 @@ mod min_by; mod next; mod nth; mod scan; +mod skip; mod skip_while; mod take; mod zip; +pub use filter::Filter; pub use fuse::Fuse; pub use scan::Scan; +pub use skip::Skip; pub use skip_while::SkipWhile; pub use take::Take; pub use zip::Zip; @@ -284,6 +288,34 @@ pub trait Stream { done: false, } } + /// Creates a stream that uses a predicate to determine if an element + /// should be yeilded. + /// + /// # Examples + /// + /// Basic usage: + /// + ///``` + /// # fn main() { async_std::task::block_on(async { + /// # + /// use std::collections::VecDeque; + /// use async_std::stream::Stream; + /// + /// let s: VecDeque = vec![1, 2, 3, 4].into_iter().collect(); + /// let mut s = s.filter(|i| i % 2 == 0); + /// + /// assert_eq!(s.next().await, Some(2)); + /// assert_eq!(s.next().await, Some(4)); + /// assert_eq!(s.next().await, None); + /// # + /// # }) } + fn filter

(self, predicate: P) -> Filter + where + Self: Sized, + P: FnMut(&Self::Item) -> bool, + { + Filter::new(self, predicate) + } /// Both filters and maps a stream. /// @@ -672,6 +704,7 @@ pub trait Stream { /// elements in the strem are yeilded. /// /// ## Examples + /// /// ``` /// # fn main() { async_std::task::block_on(async { /// # @@ -695,6 +728,25 @@ pub trait Stream { SkipWhile::new(self, predicate) } + /// Creates a combinator that skips the first `n` elements. + /// + /// ## Examples + /// + /// let s: VecDeque = vec![1, 2, 3].into_iter().collect(); + /// let mut skipped = s.skip(2); + /// + /// assert_eq!(skipped.next().await, Some(3)); + /// assert_eq!(skipped.next().await, None); + /// # + /// # }) } + /// ``` + fn skip(self, n: usize) -> Skip + where + Self: Sized, + { + Skip::new(self, n) + } + /// 'Zips up' two streams into a single stream of pairs. /// /// `zip()` returns a new stream that will iterate over two other streams, returning a tuple diff --git a/src/stream/stream/skip.rs b/src/stream/stream/skip.rs new file mode 100644 index 0000000..8a2d966 --- /dev/null +++ b/src/stream/stream/skip.rs @@ -0,0 +1,41 @@ +use std::pin::Pin; +use std::task::{Context, Poll}; + +use crate::stream::Stream; + +/// A stream to skip first n elements of another stream. +#[derive(Debug)] +pub struct Skip { + stream: S, + n: usize, +} + +impl Skip { + pin_utils::unsafe_pinned!(stream: S); + pin_utils::unsafe_unpinned!(n: usize); + + pub(crate) fn new(stream: S, n: usize) -> Self { + Skip { stream, n } + } +} + +impl Stream for Skip +where + S: Stream, +{ + type Item = S::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + loop { + let next = futures_core::ready!(self.as_mut().stream().poll_next(cx)); + + match next { + Some(v) => match self.n { + 0 => return Poll::Ready(Some(v)), + _ => *self.as_mut().n() -= 1, + }, + None => return Poll::Ready(None), + } + } + } +}