From ba43a05d01d997860e12616a0e36587a11fb4aee Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Sun, 8 Sep 2019 12:56:51 +0200 Subject: [PATCH] split stream into multiple files (#150) * split stream into multiple files Signed-off-by: Yoshua Wuyts * cargo fmt Signed-off-by: Yoshua Wuyts --- src/stream/mod.rs | 1 - src/stream/stream/all.rs | 52 ++++++++ src/stream/stream/any.rs | 52 ++++++++ src/stream/{ => stream}/min_by.rs | 12 +- src/stream/{stream.rs => stream/mod.rs} | 163 +++--------------------- src/stream/stream/next.rs | 17 +++ src/stream/stream/take.rs | 34 +++++ 7 files changed, 177 insertions(+), 154 deletions(-) create mode 100644 src/stream/stream/all.rs create mode 100644 src/stream/stream/any.rs rename src/stream/{ => stream}/min_by.rs (84%) rename src/stream/{stream.rs => stream/mod.rs} (65%) create mode 100644 src/stream/stream/next.rs create mode 100644 src/stream/stream/take.rs diff --git a/src/stream/mod.rs b/src/stream/mod.rs index 5eed9c2..8dcc6d5 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -27,7 +27,6 @@ pub use repeat::{repeat, Repeat}; pub use stream::{Stream, Take}; mod empty; -mod min_by; mod once; mod repeat; mod stream; diff --git a/src/stream/stream/all.rs b/src/stream/stream/all.rs new file mode 100644 index 0000000..6271024 --- /dev/null +++ b/src/stream/stream/all.rs @@ -0,0 +1,52 @@ +use crate::future::Future; +use crate::task::{Context, Poll}; + +use std::marker::PhantomData; +use std::pin::Pin; + +#[derive(Debug)] +pub struct AllFuture<'a, S, F, T> +where + F: FnMut(T) -> bool, +{ + pub(crate) stream: &'a mut S, + pub(crate) f: F, + pub(crate) result: bool, + pub(crate) __item: PhantomData, +} + +impl<'a, S, F, T> AllFuture<'a, S, F, T> +where + F: FnMut(T) -> bool, +{ + pin_utils::unsafe_pinned!(stream: &'a mut S); + pin_utils::unsafe_unpinned!(result: bool); + pin_utils::unsafe_unpinned!(f: F); +} + +impl Future for AllFuture<'_, S, F, S::Item> +where + S: futures_core::stream::Stream + Unpin + Sized, + F: FnMut(S::Item) -> bool, +{ + type Output = bool; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + use futures_core::stream::Stream; + let next = futures_core::ready!(self.as_mut().stream().poll_next(cx)); + match next { + Some(v) => { + let result = (self.as_mut().f())(v); + *self.as_mut().result() = result; + if result { + // don't forget to wake this task again to pull the next item from stream + cx.waker().wake_by_ref(); + Poll::Pending + } else { + Poll::Ready(false) + } + } + None => Poll::Ready(self.result), + } + } +} diff --git a/src/stream/stream/any.rs b/src/stream/stream/any.rs new file mode 100644 index 0000000..f1f551a --- /dev/null +++ b/src/stream/stream/any.rs @@ -0,0 +1,52 @@ +use crate::future::Future; +use crate::task::{Context, Poll}; + +use std::marker::PhantomData; +use std::pin::Pin; + +#[derive(Debug)] +pub struct AnyFuture<'a, S, F, T> +where + F: FnMut(T) -> bool, +{ + pub(crate) stream: &'a mut S, + pub(crate) f: F, + pub(crate) result: bool, + pub(crate) __item: PhantomData, +} + +impl<'a, S, F, T> AnyFuture<'a, S, F, T> +where + F: FnMut(T) -> bool, +{ + pin_utils::unsafe_pinned!(stream: &'a mut S); + pin_utils::unsafe_unpinned!(result: bool); + pin_utils::unsafe_unpinned!(f: F); +} + +impl Future for AnyFuture<'_, S, F, S::Item> +where + S: futures_core::stream::Stream + Unpin + Sized, + F: FnMut(S::Item) -> bool, +{ + type Output = bool; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + use futures_core::stream::Stream; + let next = futures_core::ready!(self.as_mut().stream().poll_next(cx)); + match next { + Some(v) => { + let result = (self.as_mut().f())(v); + *self.as_mut().result() = result; + if result { + Poll::Ready(true) + } else { + // don't forget to wake this task again to pull the next item from stream + cx.waker().wake_by_ref(); + Poll::Pending + } + } + None => Poll::Ready(self.result), + } + } +} diff --git a/src/stream/min_by.rs b/src/stream/stream/min_by.rs similarity index 84% rename from src/stream/min_by.rs rename to src/stream/stream/min_by.rs index b21de77..b65d88d 100644 --- a/src/stream/min_by.rs +++ b/src/stream/stream/min_by.rs @@ -1,23 +1,23 @@ use std::cmp::Ordering; use std::pin::Pin; -use super::stream::Stream; use crate::future::Future; +use crate::stream::Stream; use crate::task::{Context, Poll}; /// A future that yields the minimum item in a stream by a given comparison function. #[derive(Clone, Debug)] -pub struct MinBy { +pub struct MinByFuture { stream: S, compare: F, min: Option, } -impl Unpin for MinBy {} +impl Unpin for MinByFuture {} -impl MinBy { +impl MinByFuture { pub(super) fn new(stream: S, compare: F) -> Self { - MinBy { + MinByFuture { stream, compare, min: None, @@ -25,7 +25,7 @@ impl MinBy { } } -impl Future for MinBy +impl Future for MinByFuture where S: futures_core::stream::Stream + Unpin, S::Item: Copy, diff --git a/src/stream/stream.rs b/src/stream/stream/mod.rs similarity index 65% rename from src/stream/stream.rs rename to src/stream/stream/mod.rs index 95b4a61..91b111e 100644 --- a/src/stream/stream.rs +++ b/src/stream/stream/mod.rs @@ -21,16 +21,24 @@ //! # }) } //! ``` -use std::cmp::Ordering; -use std::pin::Pin; +mod all; +mod any; +mod min_by; +mod next; +mod take; -use cfg_if::cfg_if; +pub use take::Take; + +use all::AllFuture; +use any::AnyFuture; +use min_by::MinByFuture; +use next::NextFuture; -use super::min_by::MinBy; -use crate::future::Future; -use crate::task::{Context, Poll}; +use std::cmp::Ordering; use std::marker::PhantomData; +use cfg_if::cfg_if; + cfg_if! { if #[cfg(feature = "docs")] { #[doc(hidden)] @@ -145,12 +153,12 @@ pub trait Stream { /// # /// # }) } /// ``` - fn min_by(self, compare: F) -> MinBy + fn min_by(self, compare: F) -> MinByFuture where Self: Sized + Unpin, F: FnMut(&Self::Item, &Self::Item) -> Ordering, { - MinBy::new(self, compare) + MinByFuture::new(self, compare) } /// Tests if every element of the stream matches a predicate. @@ -278,142 +286,3 @@ impl Stream for T { NextFuture { stream: self } } } - -#[doc(hidden)] -#[allow(missing_debug_implementations)] -pub struct NextFuture<'a, T: Unpin + ?Sized> { - stream: &'a mut T, -} - -impl Future for NextFuture<'_, T> { - type Output = Option; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - Pin::new(&mut *self.stream).poll_next(cx) - } -} - -/// A stream that yields the first `n` items of another stream. -#[derive(Clone, Debug)] -pub struct Take { - stream: S, - remaining: usize, -} - -impl Unpin for Take {} - -impl Take { - pin_utils::unsafe_pinned!(stream: S); - pin_utils::unsafe_unpinned!(remaining: usize); -} - -impl futures_core::stream::Stream for Take { - type Item = S::Item; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - if self.remaining == 0 { - Poll::Ready(None) - } else { - let next = futures_core::ready!(self.as_mut().stream().poll_next(cx)); - match next { - Some(_) => *self.as_mut().remaining() -= 1, - None => *self.as_mut().remaining() = 0, - } - Poll::Ready(next) - } - } -} - -#[derive(Debug)] -pub struct AllFuture<'a, S, F, T> -where - F: FnMut(T) -> bool, -{ - stream: &'a mut S, - f: F, - result: bool, - __item: PhantomData, -} - -impl<'a, S, F, T> AllFuture<'a, S, F, T> -where - F: FnMut(T) -> bool, -{ - pin_utils::unsafe_pinned!(stream: &'a mut S); - pin_utils::unsafe_unpinned!(result: bool); - pin_utils::unsafe_unpinned!(f: F); -} - -impl Future for AllFuture<'_, S, F, S::Item> -where - S: futures_core::stream::Stream + Unpin + Sized, - F: FnMut(S::Item) -> bool, -{ - type Output = bool; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - use futures_core::stream::Stream; - let next = futures_core::ready!(self.as_mut().stream().poll_next(cx)); - match next { - Some(v) => { - let result = (self.as_mut().f())(v); - *self.as_mut().result() = result; - if result { - // don't forget to wake this task again to pull the next item from stream - cx.waker().wake_by_ref(); - Poll::Pending - } else { - Poll::Ready(false) - } - } - None => Poll::Ready(self.result), - } - } -} - -#[derive(Debug)] -pub struct AnyFuture<'a, S, F, T> -where - F: FnMut(T) -> bool, -{ - stream: &'a mut S, - f: F, - result: bool, - __item: PhantomData, -} - -impl<'a, S, F, T> AnyFuture<'a, S, F, T> -where - F: FnMut(T) -> bool, -{ - pin_utils::unsafe_pinned!(stream: &'a mut S); - pin_utils::unsafe_unpinned!(result: bool); - pin_utils::unsafe_unpinned!(f: F); -} - -impl Future for AnyFuture<'_, S, F, S::Item> -where - S: futures_core::stream::Stream + Unpin + Sized, - F: FnMut(S::Item) -> bool, -{ - type Output = bool; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - use futures_core::stream::Stream; - let next = futures_core::ready!(self.as_mut().stream().poll_next(cx)); - match next { - Some(v) => { - let result = (self.as_mut().f())(v); - *self.as_mut().result() = result; - if result { - Poll::Ready(true) - } else { - // don't forget to wake this task again to pull the next item from stream - cx.waker().wake_by_ref(); - Poll::Pending - } - } - None => Poll::Ready(self.result), - } - } -} diff --git a/src/stream/stream/next.rs b/src/stream/stream/next.rs new file mode 100644 index 0000000..b64750d --- /dev/null +++ b/src/stream/stream/next.rs @@ -0,0 +1,17 @@ +use crate::future::Future; +use crate::task::{Context, Poll}; +use std::pin::Pin; + +#[doc(hidden)] +#[allow(missing_debug_implementations)] +pub struct NextFuture<'a, T: Unpin + ?Sized> { + pub(crate) stream: &'a mut T, +} + +impl Future for NextFuture<'_, T> { + type Output = Option; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + Pin::new(&mut *self.stream).poll_next(cx) + } +} diff --git a/src/stream/stream/take.rs b/src/stream/stream/take.rs new file mode 100644 index 0000000..0499a6a --- /dev/null +++ b/src/stream/stream/take.rs @@ -0,0 +1,34 @@ +use crate::task::{Context, Poll}; + +use std::pin::Pin; + +/// A stream that yields the first `n` items of another stream. +#[derive(Clone, Debug)] +pub struct Take { + pub(crate) stream: S, + pub(crate) remaining: usize, +} + +impl Unpin for Take {} + +impl Take { + pin_utils::unsafe_pinned!(stream: S); + pin_utils::unsafe_unpinned!(remaining: usize); +} + +impl futures_core::stream::Stream for Take { + type Item = S::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if self.remaining == 0 { + Poll::Ready(None) + } else { + let next = futures_core::ready!(self.as_mut().stream().poll_next(cx)); + match next { + Some(_) => *self.as_mut().remaining() -= 1, + None => *self.as_mut().remaining() = 0, + } + Poll::Ready(next) + } + } +}