Merge pull request #506 from stjepang/cleanup-stream

Cleanup stream module
poc-serde-support
Yoshua Wuyts 5 years ago committed by GitHub
commit 76c5ffe9ab
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -19,8 +19,8 @@
//! [`File`]s: //! [`File`]s:
//! //!
//! ```no_run //! ```no_run
//! use async_std::prelude::*;
//! use async_std::fs::File; //! use async_std::fs::File;
//! use async_std::prelude::*;
//! //!
//! # fn main() -> std::io::Result<()> { async_std::task::block_on(async { //! # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
//! # //! #
@ -47,9 +47,9 @@
//! coming from: //! coming from:
//! //!
//! ```no_run //! ```no_run
//! use async_std::io::prelude::*;
//! use async_std::io::SeekFrom;
//! use async_std::fs::File; //! use async_std::fs::File;
//! use async_std::io::SeekFrom;
//! use async_std::prelude::*;
//! //!
//! # fn main() -> std::io::Result<()> { async_std::task::block_on(async { //! # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
//! # //! #
@ -82,9 +82,9 @@
//! methods to any reader: //! methods to any reader:
//! //!
//! ```no_run //! ```no_run
//! use async_std::io::prelude::*;
//! use async_std::io::BufReader;
//! use async_std::fs::File; //! use async_std::fs::File;
//! use async_std::io::BufReader;
//! use async_std::prelude::*;
//! //!
//! # fn main() -> std::io::Result<()> { async_std::task::block_on(async { //! # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
//! # //! #
@ -104,9 +104,9 @@
//! to [`write`][`Write::write`]: //! to [`write`][`Write::write`]:
//! //!
//! ```no_run //! ```no_run
//! use async_std::io::prelude::*;
//! use async_std::io::BufWriter;
//! use async_std::fs::File; //! use async_std::fs::File;
//! use async_std::io::BufWriter;
//! use async_std::io::prelude::*;
//! //!
//! # fn main() -> std::io::Result<()> { async_std::task::block_on(async { //! # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
//! # //! #
@ -179,9 +179,9 @@
//! lines: //! lines:
//! //!
//! ```no_run //! ```no_run
//! use async_std::prelude::*;
//! use async_std::io::BufReader;
//! use async_std::fs::File; //! use async_std::fs::File;
//! use async_std::io::BufReader;
//! use async_std::prelude::*;
//! //!
//! # fn main() -> std::io::Result<()> { async_std::task::block_on(async { //! # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
//! # //! #

@ -65,10 +65,10 @@ pub trait Extend<A> {
/// ``` /// ```
#[cfg(feature = "unstable")] #[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))] #[cfg_attr(feature = "docs", doc(cfg(unstable)))]
pub async fn extend<'a, C, A, T>(collection: &mut C, stream: T) pub async fn extend<'a, C, T, S>(collection: &mut C, stream: S)
where where
C: Extend<A>, C: Extend<T>,
T: IntoStream<Item = A> + 'a, S: IntoStream<Item = T> + 'a,
{ {
Extend::extend(collection, stream).await Extend::extend(collection, stream).await
} }

@ -1,28 +1,21 @@
use std::marker::PhantomData;
use std::pin::Pin; use std::pin::Pin;
use std::future::Future;
use pin_project_lite::pin_project;
use crate::stream::Stream; use crate::stream::Stream;
use crate::task::{Context, Poll}; use crate::task::{Context, Poll};
pin_project! { /// A stream that yields elements by calling a closure.
/// A stream that yields elements by calling a closure. ///
/// /// This stream is created by the [`from_fn`] function. See its
/// This stream is created by the [`from_fn`] function. See its /// documentation for more.
/// documentation for more. ///
/// /// [`from_fn`]: fn.from_fn.html
/// [`from_fn`]: fn.from_fn.html #[derive(Clone, Debug)]
#[derive(Debug)] pub struct FromFn<F> {
pub struct FromFn<F, Fut, T> { f: F,
f: F,
#[pin]
future: Option<Fut>,
__t: PhantomData<T>,
}
} }
impl<F> Unpin for FromFn<F> {}
/// Creates a new stream where to produce each new element a provided closure is called. /// Creates a new stream where to produce each new element a provided closure is called.
/// ///
/// This allows creating a custom stream with any behaviour without using the more verbose /// This allows creating a custom stream with any behaviour without using the more verbose
@ -34,21 +27,15 @@ pin_project! {
/// # async_std::task::block_on(async { /// # async_std::task::block_on(async {
/// # /// #
/// use async_std::prelude::*; /// use async_std::prelude::*;
/// use async_std::sync::{Arc, Mutex};
/// use async_std::stream; /// use async_std::stream;
/// ///
/// let count = Arc::new(Mutex::new(0u8)); /// let mut count = 0u8;
/// let s = stream::from_fn(|| { /// let s = stream::from_fn(|| {
/// let count = Arc::clone(&count); /// count += 1;
/// /// if count > 3 {
/// async move { /// None
/// *count.lock().await += 1; /// } else {
/// /// Some(count)
/// if *count.lock().await > 3 {
/// None
/// } else {
/// Some(*count.lock().await)
/// }
/// } /// }
/// }); /// });
/// ///
@ -60,38 +47,21 @@ pin_project! {
/// # /// #
/// # }) /// # })
/// ``` /// ```
pub fn from_fn<T, F, Fut>(f: F) -> FromFn<F, Fut, T> pub fn from_fn<T, F>(f: F) -> FromFn<F>
where where
F: FnMut() -> Fut, F: FnMut() -> Option<T>,
Fut: Future<Output = Option<T>>,
{ {
FromFn { FromFn { f }
f,
future: None,
__t: PhantomData,
}
} }
impl<F, Fut, T> Stream for FromFn<F, Fut, T> impl<T, F> Stream for FromFn<F>
where where
F: FnMut() -> Fut, F: FnMut() -> Option<T>,
Fut: Future<Output = Option<T>>,
{ {
type Item = T; type Item = T;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project(); let item = (&mut self.f)();
loop { Poll::Ready(item)
if this.future.is_some() {
let next =
futures_core::ready!(this.future.as_mut().as_pin_mut().unwrap().poll(cx));
this.future.set(None);
return Poll::Ready(next);
} else {
let fut = (this.f)();
this.future.set(Some(fut));
}
}
} }
} }

@ -6,7 +6,7 @@ use crate::stream::Stream;
use crate::task::{Context, Poll}; use crate::task::{Context, Poll};
pin_project! { pin_project! {
/// A stream that created from iterator /// A stream that was created from iterator.
/// ///
/// This stream is created by the [`from_iter`] function. /// This stream is created by the [`from_iter`] function.
/// See it documentation for more. /// See it documentation for more.

@ -306,9 +306,7 @@ pub use from_iter::{from_iter, FromIter};
pub use once::{once, Once}; pub use once::{once, Once};
pub use repeat::{repeat, Repeat}; pub use repeat::{repeat, Repeat};
pub use repeat_with::{repeat_with, RepeatWith}; pub use repeat_with::{repeat_with, RepeatWith};
pub use stream::{ pub use stream::*;
Chain, Filter, Fuse, Inspect, Scan, Skip, SkipWhile, StepBy, Stream, Take, TakeWhile, Zip,
};
pub(crate) mod stream; pub(crate) mod stream;

@ -33,7 +33,7 @@ pin_project! {
/// documentation for more. /// documentation for more.
/// ///
/// [`once`]: fn.once.html /// [`once`]: fn.once.html
#[derive(Debug)] #[derive(Clone, Debug)]
pub struct Once<T> { pub struct Once<T> {
value: Option<T>, value: Option<T>,
} }

@ -33,7 +33,7 @@ where
/// documentation for more. /// documentation for more.
/// ///
/// [`repeat`]: fn.repeat.html /// [`repeat`]: fn.repeat.html
#[derive(Debug)] #[derive(Clone, Debug)]
pub struct Repeat<T> { pub struct Repeat<T> {
item: T, item: T,
} }

@ -1,28 +1,21 @@
use std::marker::PhantomData;
use std::pin::Pin; use std::pin::Pin;
use std::future::Future;
use pin_project_lite::pin_project;
use crate::stream::Stream; use crate::stream::Stream;
use crate::task::{Context, Poll}; use crate::task::{Context, Poll};
pin_project! { /// A stream that repeats elements of type `T` endlessly by applying a provided closure.
/// A stream that repeats elements of type `T` endlessly by applying a provided closure. ///
/// /// This stream is created by the [`repeat_with`] function. See its
/// This stream is created by the [`repeat_with`] function. See its /// documentation for more.
/// documentation for more. ///
/// /// [`repeat_with`]: fn.repeat_with.html
/// [`repeat_with`]: fn.repeat_with.html #[derive(Clone, Debug)]
#[derive(Debug)] pub struct RepeatWith<F> {
pub struct RepeatWith<F, Fut, A> { f: F,
f: F,
#[pin]
future: Option<Fut>,
__a: PhantomData<A>,
}
} }
impl<F> Unpin for RepeatWith<F> {}
/// Creates a new stream that repeats elements of type `A` endlessly by applying the provided closure. /// Creates a new stream that repeats elements of type `A` endlessly by applying the provided closure.
/// ///
/// # Examples /// # Examples
@ -35,7 +28,7 @@ pin_project! {
/// use async_std::prelude::*; /// use async_std::prelude::*;
/// use async_std::stream; /// use async_std::stream;
/// ///
/// let s = stream::repeat_with(|| async { 1 }); /// let s = stream::repeat_with(|| 1);
/// ///
/// pin_utils::pin_mut!(s); /// pin_utils::pin_mut!(s);
/// ///
@ -54,48 +47,38 @@ pin_project! {
/// use async_std::prelude::*; /// use async_std::prelude::*;
/// use async_std::stream; /// use async_std::stream;
/// ///
/// let s = stream::repeat_with(|| async { 1u8 }).take(2); /// let mut n = 1;
/// let s = stream::repeat_with(|| {
/// let item = n;
/// n *= 2;
/// item
/// })
/// .take(4);
/// ///
/// pin_utils::pin_mut!(s); /// pin_utils::pin_mut!(s);
/// ///
/// assert_eq!(s.next().await, Some(1)); /// assert_eq!(s.next().await, Some(1));
/// assert_eq!(s.next().await, Some(1)); /// assert_eq!(s.next().await, Some(2));
/// assert_eq!(s.next().await, Some(4));
/// assert_eq!(s.next().await, Some(8));
/// assert_eq!(s.next().await, None); /// assert_eq!(s.next().await, None);
/// # }) /// # })
/// ``` /// ```
pub fn repeat_with<F, Fut, A>(repeater: F) -> RepeatWith<F, Fut, A> pub fn repeat_with<T, F>(repeater: F) -> RepeatWith<F>
where where
F: FnMut() -> Fut, F: FnMut() -> T,
Fut: Future<Output = A>,
{ {
RepeatWith { RepeatWith { f: repeater }
f: repeater,
future: None,
__a: PhantomData,
}
} }
impl<F, Fut, A> Stream for RepeatWith<F, Fut, A> impl<T, F> Stream for RepeatWith<F>
where where
F: FnMut() -> Fut, F: FnMut() -> T,
Fut: Future<Output = A>,
{ {
type Item = A; type Item = T;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
loop {
if this.future.is_some() {
let res = futures_core::ready!(this.future.as_mut().as_pin_mut().unwrap().poll(cx));
this.future.set(None);
return Poll::Ready(Some(res));
} else {
let fut = (this.f)();
this.future.set(Some(fut)); fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
} let item = (&mut self.f)();
} Poll::Ready(Some(item))
} }
} }

@ -7,7 +7,7 @@ use crate::prelude::*;
use crate::task::{Context, Poll}; use crate::task::{Context, Poll};
pin_project! { pin_project! {
/// Chains two streams one after another. /// A stream that chains two streams one after another.
/// ///
/// This `struct` is created by the [`chain`] method on [`Stream`]. See its /// This `struct` is created by the [`chain`] method on [`Stream`]. See its
/// documentation for more. /// documentation for more.

@ -4,6 +4,7 @@ use pin_project_lite::pin_project;
use std::pin::Pin; use std::pin::Pin;
pin_project! { pin_project! {
/// A stream that clones the elements of an underlying stream.
#[derive(Debug)] #[derive(Debug)]
pub struct Cloned<S> { pub struct Cloned<S> {
#[pin] #[pin]

@ -4,8 +4,8 @@ use pin_project_lite::pin_project;
use std::pin::Pin; use std::pin::Pin;
pin_project! { pin_project! {
#[doc(hidden)] /// A stream that copies the elements of an underlying stream.
#[allow(missing_debug_implementations)] #[derive(Debug)]
pub struct Copied<S> { pub struct Copied<S> {
#[pin] #[pin]
stream: S, stream: S,

@ -1,73 +1,54 @@
use std::mem::ManuallyDrop;
use std::pin::Pin; use std::pin::Pin;
use pin_project_lite::pin_project;
use crate::stream::Stream; use crate::stream::Stream;
use crate::task::{Context, Poll}; use crate::task::{Context, Poll};
pin_project! { /// A stream that will repeatedly yield the same list of elements.
/// A stream that will repeatedly yield the same list of elements #[derive(Debug)]
pub struct Cycle<S, T> { pub struct Cycle<S> {
#[pin] orig: S,
source: S, source: ManuallyDrop<S>,
index: usize,
buffer: Vec<T>,
state: CycleState,
}
}
#[derive(Eq, PartialEq)]
enum CycleState {
FromStream,
FromBuffer,
} }
impl<S> Cycle<S, S::Item> impl<S> Cycle<S>
where where
S: Stream, S: Stream + Clone,
S::Item: Clone,
{ {
pub fn new(source: S) -> Cycle<S, S::Item> { pub fn new(source: S) -> Cycle<S> {
Cycle { Cycle {
source, orig: source.clone(),
index: 0, source: ManuallyDrop::new(source),
buffer: Vec::new(),
state: CycleState::FromStream,
} }
} }
} }
impl<S> Stream for Cycle<S, S::Item> impl<S> Drop for Cycle<S> {
fn drop(&mut self) {
unsafe {
ManuallyDrop::drop(&mut self.source);
}
}
}
impl<S> Stream for Cycle<S>
where where
S: Stream, S: Stream + Clone,
S::Item: Clone,
{ {
type Item = S::Item; type Item = S::Item;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project(); unsafe {
let this = self.get_unchecked_mut();
let mut next;
if *this.state == CycleState::FromStream { match futures_core::ready!(Pin::new_unchecked(&mut *this.source).poll_next(cx)) {
next = futures_core::ready!(this.source.poll_next(cx)); Some(item) => Poll::Ready(Some(item)),
None => {
if let Some(val) = next { ManuallyDrop::drop(&mut this.source);
this.buffer.push(val.clone()); this.source = ManuallyDrop::new(this.orig.clone());
next = Some(val) Pin::new_unchecked(&mut *this.source).poll_next(cx)
} else { }
*this.state = CycleState::FromBuffer;
next = this.buffer.get(*this.index).cloned();
} }
} else {
let mut index = *this.index;
if index == this.buffer.len() {
index = 0
}
next = Some(this.buffer[index].clone());
*this.index = index + 1;
} }
Poll::Ready(next)
} }
} }

@ -6,8 +6,7 @@ use crate::stream::Stream;
use crate::task::{Context, Poll}; use crate::task::{Context, Poll};
pin_project! { pin_project! {
#[doc(hidden)] #[derive(Debug)]
#[allow(missing_debug_implementations)]
pub struct Enumerate<S> { pub struct Enumerate<S> {
#[pin] #[pin]
stream: S, stream: S,

@ -1,4 +1,3 @@
use std::marker::PhantomData;
use std::pin::Pin; use std::pin::Pin;
use pin_project_lite::pin_project; use pin_project_lite::pin_project;
@ -15,25 +14,23 @@ pin_project! {
/// [`filter`]: trait.Stream.html#method.filter /// [`filter`]: trait.Stream.html#method.filter
/// [`Stream`]: trait.Stream.html /// [`Stream`]: trait.Stream.html
#[derive(Debug)] #[derive(Debug)]
pub struct Filter<S, P, T> { pub struct Filter<S, P> {
#[pin] #[pin]
stream: S, stream: S,
predicate: P, predicate: P,
__t: PhantomData<T>,
} }
} }
impl<S, P, T> Filter<S, P, T> { impl<S, P> Filter<S, P> {
pub(super) fn new(stream: S, predicate: P) -> Self { pub(super) fn new(stream: S, predicate: P) -> Self {
Filter { Filter {
stream, stream,
predicate, predicate,
__t: PhantomData,
} }
} }
} }
impl<S, P> Stream for Filter<S, P, S::Item> impl<S, P> Stream for Filter<S, P>
where where
S: Stream, S: Stream,
P: FnMut(&S::Item) -> bool, P: FnMut(&S::Item) -> bool,

@ -1,4 +1,3 @@
use std::marker::PhantomData;
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
@ -7,29 +6,21 @@ use pin_project_lite::pin_project;
use crate::stream::Stream; use crate::stream::Stream;
pin_project! { pin_project! {
#[doc(hidden)] #[derive(Debug)]
#[allow(missing_debug_implementations)] pub struct FilterMap<S, F> {
pub struct FilterMap<S, F, T, B> {
#[pin] #[pin]
stream: S, stream: S,
f: F, f: F,
__from: PhantomData<T>,
__to: PhantomData<B>,
} }
} }
impl<S, F, T, B> FilterMap<S, F, T, B> { impl<S, F> FilterMap<S, F> {
pub(crate) fn new(stream: S, f: F) -> Self { pub(crate) fn new(stream: S, f: F) -> Self {
FilterMap { FilterMap { stream, f }
stream,
f,
__from: PhantomData,
__to: PhantomData,
}
} }
} }
impl<S, F, B> Stream for FilterMap<S, F, S::Item, B> impl<S, F, B> Stream for FilterMap<S, F>
where where
S: Stream, S: Stream,
F: FnMut(S::Item) -> Option<B>, F: FnMut(S::Item) -> Option<B>,

@ -1,31 +1,25 @@
use std::marker::PhantomData;
use std::pin::Pin;
use std::future::Future; use std::future::Future;
use std::pin::Pin;
use crate::stream::Stream; use crate::stream::Stream;
use crate::task::{Context, Poll}; use crate::task::{Context, Poll};
#[doc(hidden)] #[doc(hidden)]
#[allow(missing_debug_implementations)] #[allow(missing_debug_implementations)]
pub struct FindFuture<'a, S, P, T> { pub struct FindFuture<'a, S, P> {
stream: &'a mut S, stream: &'a mut S,
p: P, p: P,
__t: PhantomData<T>,
} }
impl<'a, S, P, T> FindFuture<'a, S, P, T> { impl<'a, S, P> FindFuture<'a, S, P> {
pub(super) fn new(stream: &'a mut S, p: P) -> Self { pub(super) fn new(stream: &'a mut S, p: P) -> Self {
FindFuture { FindFuture { stream, p }
stream,
p,
__t: PhantomData,
}
} }
} }
impl<S: Unpin, P, T> Unpin for FindFuture<'_, S, P, T> {} impl<S: Unpin, P> Unpin for FindFuture<'_, S, P> {}
impl<'a, S, P> Future for FindFuture<'a, S, P, S::Item> impl<'a, S, P> Future for FindFuture<'a, S, P>
where where
S: Stream + Unpin + Sized, S: Stream + Unpin + Sized,
P: FnMut(&S::Item) -> bool, P: FnMut(&S::Item) -> bool,

@ -1,33 +1,25 @@
use std::marker::PhantomData; use std::future::Future;
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use std::future::Future;
use crate::stream::Stream; use crate::stream::Stream;
#[doc(hidden)] #[doc(hidden)]
#[allow(missing_debug_implementations)] #[allow(missing_debug_implementations)]
pub struct FindMapFuture<'a, S, F, T, B> { pub struct FindMapFuture<'a, S, F> {
stream: &'a mut S, stream: &'a mut S,
f: F, f: F,
__b: PhantomData<B>,
__t: PhantomData<T>,
} }
impl<'a, S, B, F, T> FindMapFuture<'a, S, F, T, B> { impl<'a, S, F> FindMapFuture<'a, S, F> {
pub(super) fn new(stream: &'a mut S, f: F) -> Self { pub(super) fn new(stream: &'a mut S, f: F) -> Self {
FindMapFuture { FindMapFuture { stream, f }
stream,
f,
__b: PhantomData,
__t: PhantomData,
}
} }
} }
impl<S: Unpin, F, T, B> Unpin for FindMapFuture<'_, S, F, T, B> {} impl<S: Unpin, F> Unpin for FindMapFuture<'_, S, F> {}
impl<'a, S, B, F> Future for FindMapFuture<'a, S, F, S::Item, B> impl<'a, S, B, F> Future for FindMapFuture<'a, S, F>
where where
S: Stream + Unpin + Sized, S: Stream + Unpin + Sized,
F: FnMut(S::Item) -> Option<B>, F: FnMut(S::Item) -> Option<B>,

@ -1,33 +1,36 @@
use pin_project_lite::pin_project;
use std::pin::Pin; use std::pin::Pin;
use pin_project_lite::pin_project;
use crate::prelude::*; use crate::prelude::*;
use crate::stream::stream::map::Map; use crate::stream::stream::map::Map;
use crate::stream::{IntoStream, Stream}; use crate::stream::{IntoStream, Stream};
use crate::task::{Context, Poll}; use crate::task::{Context, Poll};
pin_project! { pin_project! {
/// A stream that maps each element to a stream, and yields the elements of the produced
/// streams.
///
/// This `struct` is created by the [`flat_map`] method on [`Stream`]. See its /// This `struct` is created by the [`flat_map`] method on [`Stream`]. See its
/// documentation for more. /// documentation for more.
/// ///
/// [`flat_map`]: trait.Stream.html#method.flat_map /// [`flat_map`]: trait.Stream.html#method.flat_map
/// [`Stream`]: trait.Stream.html /// [`Stream`]: trait.Stream.html
#[allow(missing_debug_implementations)] pub struct FlatMap<S, U, F> {
pub struct FlatMap<S, U, T, F> {
#[pin] #[pin]
stream: Map<S, F, T, U>, stream: Map<S, F>,
#[pin] #[pin]
inner_stream: Option<U>, inner_stream: Option<U>,
} }
} }
impl<S, U, F> FlatMap<S, U, S::Item, F> impl<S, U, F> FlatMap<S, U, F>
where where
S: Stream, S: Stream,
U: IntoStream, U: IntoStream,
F: FnMut(S::Item) -> U, F: FnMut(S::Item) -> U,
{ {
pub(super) fn new(stream: S, f: F) -> FlatMap<S, U, S::Item, F> { pub(super) fn new(stream: S, f: F) -> FlatMap<S, U, F> {
FlatMap { FlatMap {
stream: stream.map(f), stream: stream.map(f),
inner_stream: None, inner_stream: None,
@ -35,7 +38,7 @@ where
} }
} }
impl<S, U, F> Stream for FlatMap<S, U, S::Item, F> impl<S, U, F> Stream for FlatMap<S, U, F>
where where
S: Stream, S: Stream,
S::Item: IntoStream<IntoStream = U, Item = U::Item>, S::Item: IntoStream<IntoStream = U, Item = U::Item>,

@ -1,30 +1,38 @@
use pin_project_lite::pin_project; use std::fmt;
use std::pin::Pin; use std::pin::Pin;
use pin_project_lite::pin_project;
use crate::stream::{IntoStream, Stream}; use crate::stream::{IntoStream, Stream};
use crate::task::{Context, Poll}; use crate::task::{Context, Poll};
pin_project! { pin_project! {
/// A stream that flattens one level of nesting in an stream of things that can be turned into
/// streams.
///
/// This `struct` is created by the [`flatten`] method on [`Stream`]. See its /// This `struct` is created by the [`flatten`] method on [`Stream`]. See its
/// documentation for more. /// documentation for more.
/// ///
/// [`flatten`]: trait.Stream.html#method.flatten /// [`flatten`]: trait.Stream.html#method.flatten
/// [`Stream`]: trait.Stream.html /// [`Stream`]: trait.Stream.html
#[allow(missing_debug_implementations)] pub struct Flatten<S>
pub struct Flatten<S, U> { where
S: Stream,
S::Item: IntoStream,
{
#[pin] #[pin]
stream: S, stream: S,
#[pin] #[pin]
inner_stream: Option<U>, inner_stream: Option<<S::Item as IntoStream>::IntoStream>,
} }
} }
impl<S> Flatten<S, S::Item> impl<S> Flatten<S>
where where
S: Stream, S: Stream,
S::Item: IntoStream, S::Item: IntoStream,
{ {
pub(super) fn new(stream: S) -> Flatten<S, S::Item> { pub(super) fn new(stream: S) -> Flatten<S> {
Flatten { Flatten {
stream, stream,
inner_stream: None, inner_stream: None,
@ -32,7 +40,7 @@ where
} }
} }
impl<S, U> Stream for Flatten<S, <S::Item as IntoStream>::IntoStream> impl<S, U> Stream for Flatten<S>
where where
S: Stream, S: Stream,
S::Item: IntoStream<IntoStream = U, Item = U::Item>, S::Item: IntoStream<IntoStream = U, Item = U::Item>,
@ -56,3 +64,16 @@ where
} }
} }
} }
impl<S, U> fmt::Debug for Flatten<S>
where
S: fmt::Debug + Stream,
S::Item: IntoStream<IntoStream = U, Item = U::Item>,
U: fmt::Debug + Stream,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Flatten")
.field("inner", &self.stream)
.finish()
}
}

@ -1,6 +1,5 @@
use std::marker::PhantomData;
use std::pin::Pin;
use std::future::Future; use std::future::Future;
use std::pin::Pin;
use pin_project_lite::pin_project; use pin_project_lite::pin_project;
@ -8,29 +7,26 @@ use crate::stream::Stream;
use crate::task::{Context, Poll}; use crate::task::{Context, Poll};
pin_project! { pin_project! {
#[doc(hidden)] #[derive(Debug)]
#[allow(missing_debug_implementations)] pub struct FoldFuture<S, F, B> {
pub struct FoldFuture<S, F, T, B> {
#[pin] #[pin]
stream: S, stream: S,
f: F, f: F,
acc: Option<B>, acc: Option<B>,
__t: PhantomData<T>,
} }
} }
impl<S, F, T, B> FoldFuture<S, F, T, B> { impl<S, F, B> FoldFuture<S, F, B> {
pub(super) fn new(stream: S, init: B, f: F) -> Self { pub(super) fn new(stream: S, init: B, f: F) -> Self {
FoldFuture { FoldFuture {
stream, stream,
f, f,
acc: Some(init), acc: Some(init),
__t: PhantomData,
} }
} }
} }
impl<S, F, B> Future for FoldFuture<S, F, S::Item, B> impl<S, F, B> Future for FoldFuture<S, F, B>
where where
S: Stream + Sized, S: Stream + Sized,
F: FnMut(B, S::Item) -> B, F: FnMut(B, S::Item) -> B,

@ -1,4 +1,3 @@
use std::marker::PhantomData;
use std::pin::Pin; use std::pin::Pin;
use std::future::Future; use std::future::Future;
@ -10,25 +9,23 @@ use crate::task::{Context, Poll};
pin_project! { pin_project! {
#[doc(hidden)] #[doc(hidden)]
#[allow(missing_debug_implementations)] #[allow(missing_debug_implementations)]
pub struct ForEachFuture<S, F, T> { pub struct ForEachFuture<S, F> {
#[pin] #[pin]
stream: S, stream: S,
f: F, f: F,
__t: PhantomData<T>,
} }
} }
impl<S, F, T> ForEachFuture<S, F, T> { impl<S, F> ForEachFuture<S, F> {
pub(super) fn new(stream: S, f: F) -> Self { pub(super) fn new(stream: S, f: F) -> Self {
ForEachFuture { ForEachFuture {
stream, stream,
f, f,
__t: PhantomData,
} }
} }
} }
impl<S, F> Future for ForEachFuture<S, F, S::Item> impl<S, F> Future for ForEachFuture<S, F>
where where
S: Stream + Sized, S: Stream + Sized,
F: FnMut(S::Item), F: FnMut(S::Item),

@ -1,4 +1,3 @@
use std::marker::PhantomData;
use std::pin::Pin; use std::pin::Pin;
use pin_project_lite::pin_project; use pin_project_lite::pin_project;
@ -15,25 +14,23 @@ pin_project! {
/// [`inspect`]: trait.Stream.html#method.inspect /// [`inspect`]: trait.Stream.html#method.inspect
/// [`Stream`]: trait.Stream.html /// [`Stream`]: trait.Stream.html
#[derive(Debug)] #[derive(Debug)]
pub struct Inspect<S, F, T> { pub struct Inspect<S, F> {
#[pin] #[pin]
stream: S, stream: S,
f: F, f: F,
__t: PhantomData<T>,
} }
} }
impl<S, F, T> Inspect<S, F, T> { impl<S, F> Inspect<S, F> {
pub(super) fn new(stream: S, f: F) -> Self { pub(super) fn new(stream: S, f: F) -> Self {
Inspect { Inspect {
stream, stream,
f, f,
__t: PhantomData,
} }
} }
} }
impl<S, F> Stream for Inspect<S, F, S::Item> impl<S, F> Stream for Inspect<S, F>
where where
S: Stream, S: Stream,
F: FnMut(&S::Item), F: FnMut(&S::Item),

@ -1,4 +1,3 @@
use std::marker::PhantomData;
use std::pin::Pin; use std::pin::Pin;
use pin_project_lite::pin_project; use pin_project_lite::pin_project;
@ -7,29 +6,25 @@ use crate::stream::Stream;
use crate::task::{Context, Poll}; use crate::task::{Context, Poll};
pin_project! { pin_project! {
#[doc(hidden)] /// A stream that maps value of another stream with a function.
#[allow(missing_debug_implementations)] #[derive(Debug)]
pub struct Map<S, F, T, B> { pub struct Map<S, F> {
#[pin] #[pin]
stream: S, stream: S,
f: F, f: F,
__from: PhantomData<T>,
__to: PhantomData<B>,
} }
} }
impl<S, F, T, B> Map<S, F, T, B> { impl<S, F> Map<S, F> {
pub(crate) fn new(stream: S, f: F) -> Self { pub(crate) fn new(stream: S, f: F) -> Self {
Map { Map {
stream, stream,
f, f,
__from: PhantomData,
__to: PhantomData,
} }
} }
} }
impl<S, F, B> Stream for Map<S, F, S::Item, B> impl<S, F, B> Stream for Map<S, F>
where where
S: Stream, S: Stream,
F: FnMut(S::Item) -> B, F: FnMut(S::Item) -> B,

@ -288,6 +288,7 @@ extension_trait! {
Creates a stream that yields elements based on a predicate. Creates a stream that yields elements based on a predicate.
# Examples # Examples
``` ```
# fn main() { async_std::task::block_on(async { # fn main() { async_std::task::block_on(async {
# #
@ -300,12 +301,11 @@ extension_trait! {
assert_eq!(s.next().await, Some(1)); assert_eq!(s.next().await, Some(1));
assert_eq!(s.next().await, Some(2)); assert_eq!(s.next().await, Some(2));
assert_eq!(s.next().await, None); assert_eq!(s.next().await, None);
# #
# }) } # }) }
``` ```
"#] "#]
fn take_while<P>(self, predicate: P) -> TakeWhile<Self, P, Self::Item> fn take_while<P>(self, predicate: P) -> TakeWhile<Self, P>
where where
Self: Sized, Self: Sized,
P: FnMut(&Self::Item) -> bool, P: FnMut(&Self::Item) -> bool,
@ -410,10 +410,10 @@ extension_trait! {
# }) } # }) }
``` ```
"#] "#]
fn cloned<'a,T>(self) -> Cloned<Self> fn cloned<'a, T>(self) -> Cloned<Self>
where where
Self: Sized + Stream<Item = &'a T>, Self: Sized + Stream<Item = &'a T>,
T : 'a + Clone, T: Clone + 'a,
{ {
Cloned::new(self) Cloned::new(self)
} }
@ -443,10 +443,10 @@ extension_trait! {
# }) } # }) }
``` ```
"#] "#]
fn copied<'a,T>(self) -> Copied<Self> fn copied<'a, T>(self) -> Copied<Self>
where where
Self: Sized + Stream<Item = &'a T>, Self: Sized + Stream<Item = &'a T>,
T : 'a + Copy, T: Copy + 'a,
{ {
Copied::new(self) Copied::new(self)
} }
@ -475,10 +475,9 @@ extension_trait! {
# }) # })
``` ```
"#] "#]
fn cycle(self) -> Cycle<Self, Self::Item> fn cycle(self) -> Cycle<Self>
where where
Self: Sized, Self: Clone + Sized,
Self::Item: Clone,
{ {
Cycle::new(self) Cycle::new(self)
} }
@ -505,7 +504,6 @@ extension_trait! {
assert_eq!(s.next().await, Some((1, 'b'))); assert_eq!(s.next().await, Some((1, 'b')));
assert_eq!(s.next().await, Some((2, 'c'))); assert_eq!(s.next().await, Some((2, 'c')));
assert_eq!(s.next().await, None); assert_eq!(s.next().await, None);
# #
# }) } # }) }
``` ```
@ -540,7 +538,7 @@ extension_trait! {
# }) } # }) }
``` ```
"#] "#]
fn map<B, F>(self, f: F) -> Map<Self, F, Self::Item, B> fn map<B, F>(self, f: F) -> Map<Self, F>
where where
Self: Sized, Self: Sized,
F: FnMut(Self::Item) -> B, F: FnMut(Self::Item) -> B,
@ -565,17 +563,18 @@ extension_trait! {
let s = stream::from_iter(vec![1, 2, 3, 4, 5]); let s = stream::from_iter(vec![1, 2, 3, 4, 5]);
let sum = s let sum = s
.inspect(|x| println!("about to filter {}", x)) .inspect(|x| println!("about to filter {}", x))
.filter(|x| x % 2 == 0) .filter(|x| x % 2 == 0)
.inspect(|x| println!("made it through filter: {}", x)) .inspect(|x| println!("made it through filter: {}", x))
.fold(0, |sum, i| sum + i).await; .fold(0, |sum, i| sum + i)
.await;
assert_eq!(sum, 6); assert_eq!(sum, 6);
# #
# }) } # }) }
``` ```
"#] "#]
fn inspect<F>(self, f: F) -> Inspect<Self, F, Self::Item> fn inspect<F>(self, f: F) -> Inspect<Self, F>
where where
Self: Sized, Self: Sized,
F: FnMut(&Self::Item), F: FnMut(&Self::Item),
@ -618,7 +617,6 @@ extension_trait! {
# #
# }) } # }) }
``` ```
"#] "#]
fn last( fn last(
self, self,
@ -685,7 +683,7 @@ extension_trait! {
# }) } # }) }
``` ```
"#] "#]
fn filter<P>(self, predicate: P) -> Filter<Self, P, Self::Item> fn filter<P>(self, predicate: P) -> Filter<Self, P>
where where
Self: Sized, Self: Sized,
P: FnMut(&Self::Item) -> bool, P: FnMut(&Self::Item) -> bool,
@ -721,7 +719,7 @@ extension_trait! {
"#] "#]
#[cfg(feature = "unstable")] #[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))] #[cfg_attr(feature = "docs", doc(cfg(unstable)))]
fn flat_map<U, F>(self, f: F) -> FlatMap<Self, U, Self::Item, F> fn flat_map<U, F>(self, f: F) -> FlatMap<Self, U, F>
where where
Self: Sized, Self: Sized,
U: IntoStream, U: IntoStream,
@ -755,7 +753,7 @@ extension_trait! {
"#] "#]
#[cfg(feature = "unstable")] #[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))] #[cfg_attr(feature = "docs", doc(cfg(unstable)))]
fn flatten(self) -> Flatten<Self, Self::Item> fn flatten(self) -> Flatten<Self>
where where
Self: Sized, Self: Sized,
Self::Item: IntoStream, Self::Item: IntoStream,
@ -796,7 +794,7 @@ extension_trait! {
# }) } # }) }
``` ```
"#] "#]
fn filter_map<B, F>(self, f: F) -> FilterMap<Self, F, Self::Item, B> fn filter_map<B, F>(self, f: F) -> FilterMap<Self, F>
where where
Self: Sized, Self: Sized,
F: FnMut(Self::Item) -> Option<B>, F: FnMut(Self::Item) -> Option<B>,
@ -804,7 +802,7 @@ extension_trait! {
FilterMap::new(self, f) FilterMap::new(self, f)
} }
#[doc = r#" #[doc = r#"
Returns the element that gives the minimum value with respect to the Returns the element that gives the minimum value with respect to the
specified key function. If several elements are equally minimum, specified key function. If several elements are equally minimum,
the first element is returned. If the stream is empty, `None` is returned. the first element is returned. If the stream is empty, `None` is returned.
@ -828,19 +826,19 @@ extension_trait! {
# }) } # }) }
``` ```
"#] "#]
fn min_by_key<K>( fn min_by_key<B, F>(
self, self,
key_by: K, key_by: F,
) -> impl Future<Output = Option<Self::Item>> [MinByKeyFuture<Self, Self::Item, K>] ) -> impl Future<Output = Option<Self::Item>> [MinByKeyFuture<Self, Self::Item, F>]
where where
Self: Sized, Self: Sized,
Self::Item: Ord, B: Ord,
K: FnMut(&Self::Item) -> Self::Item, F: FnMut(&Self::Item) -> B,
{ {
MinByKeyFuture::new(self, key_by) MinByKeyFuture::new(self, key_by)
} }
#[doc = r#" #[doc = r#"
Returns the element that gives the maximum value with respect to the Returns the element that gives the maximum value with respect to the
specified key function. If several elements are equally maximum, specified key function. If several elements are equally maximum,
the first element is returned. If the stream is empty, `None` is returned. the first element is returned. If the stream is empty, `None` is returned.
@ -864,14 +862,14 @@ extension_trait! {
# }) } # }) }
``` ```
"#] "#]
fn max_by_key<K>( fn max_by_key<B, F>(
self, self,
key_by: K, key_by: F,
) -> impl Future<Output = Option<Self::Item>> [MaxByKeyFuture<Self, Self::Item, K>] ) -> impl Future<Output = Option<Self::Item>> [MaxByKeyFuture<Self, Self::Item, F>]
where where
Self: Sized, Self: Sized,
Self::Item: Ord, B: Ord,
K: FnMut(&Self::Item) -> Self::Item, F: FnMut(&Self::Item) -> B,
{ {
MaxByKeyFuture::new(self, key_by) MaxByKeyFuture::new(self, key_by)
} }
@ -1043,7 +1041,7 @@ extension_trait! {
n: usize, n: usize,
) -> impl Future<Output = Option<Self::Item>> + '_ [NthFuture<'_, Self>] ) -> impl Future<Output = Option<Self::Item>> + '_ [NthFuture<'_, Self>]
where where
Self: Sized, Self: Unpin + Sized,
{ {
NthFuture::new(self, n) NthFuture::new(self, n)
} }
@ -1151,9 +1149,9 @@ extension_trait! {
fn find<P>( fn find<P>(
&mut self, &mut self,
p: P, p: P,
) -> impl Future<Output = Option<Self::Item>> + '_ [FindFuture<'_, Self, P, Self::Item>] ) -> impl Future<Output = Option<Self::Item>> + '_ [FindFuture<'_, Self, P>]
where where
Self: Sized, Self: Unpin + Sized,
P: FnMut(&Self::Item) -> bool, P: FnMut(&Self::Item) -> bool,
{ {
FindFuture::new(self, p) FindFuture::new(self, p)
@ -1179,9 +1177,9 @@ extension_trait! {
fn find_map<F, B>( fn find_map<F, B>(
&mut self, &mut self,
f: F, f: F,
) -> impl Future<Output = Option<B>> + '_ [FindMapFuture<'_, Self, F, Self::Item, B>] ) -> impl Future<Output = Option<B>> + '_ [FindMapFuture<'_, Self, F>]
where where
Self: Sized, Self: Unpin + Sized,
F: FnMut(Self::Item) -> Option<B>, F: FnMut(Self::Item) -> Option<B>,
{ {
FindMapFuture::new(self, f) FindMapFuture::new(self, f)
@ -1213,7 +1211,7 @@ extension_trait! {
self, self,
init: B, init: B,
f: F, f: F,
) -> impl Future<Output = B> [FoldFuture<Self, F, Self::Item, B>] ) -> impl Future<Output = B> [FoldFuture<Self, F, B>]
where where
Self: Sized, Self: Sized,
F: FnMut(B, Self::Item) -> B, F: FnMut(B, Self::Item) -> B,
@ -1248,7 +1246,7 @@ extension_trait! {
fn for_each<F>( fn for_each<F>(
self, self,
f: F, f: F,
) -> impl Future<Output = ()> [ForEachFuture<Self, F, Self::Item>] ) -> impl Future<Output = ()> [ForEachFuture<Self, F>]
where where
Self: Sized, Self: Sized,
F: FnMut(Self::Item), F: FnMut(Self::Item),
@ -1389,7 +1387,7 @@ extension_trait! {
# }) } # }) }
``` ```
"#] "#]
fn skip_while<P>(self, predicate: P) -> SkipWhile<Self, P, Self::Item> fn skip_while<P>(self, predicate: P) -> SkipWhile<Self, P>
where where
Self: Sized, Self: Sized,
P: FnMut(&Self::Item) -> bool, P: FnMut(&Self::Item) -> bool,
@ -1472,7 +1470,7 @@ extension_trait! {
use async_std::prelude::*; use async_std::prelude::*;
use async_std::stream; use async_std::stream;
let s = stream::from_iter(vec![1usize, 2, 3]); let mut s = stream::from_iter(vec![1usize, 2, 3]);
let sum = s.try_fold(0, |acc, v| { let sum = s.try_fold(0, |acc, v| {
if (acc+v) % 2 == 1 { if (acc+v) % 2 == 1 {
Ok(v+3) Ok(v+3)
@ -1487,12 +1485,12 @@ extension_trait! {
``` ```
"#] "#]
fn try_fold<B, F, T, E>( fn try_fold<B, F, T, E>(
self, &mut self,
init: T, init: T,
f: F, f: F,
) -> impl Future<Output = Result<T, E>> [TryFoldFuture<Self, F, T>] ) -> impl Future<Output = Result<T, E>> + '_ [TryFoldFuture<'_, Self, F, T>]
where where
Self: Sized, Self: Unpin + Sized,
F: FnMut(B, Self::Item) -> Result<T, E>, F: FnMut(B, Self::Item) -> Result<T, E>,
{ {
TryFoldFuture::new(self, init, f) TryFoldFuture::new(self, init, f)
@ -1512,7 +1510,7 @@ extension_trait! {
let (tx, rx) = channel(); let (tx, rx) = channel();
let s = stream::from_iter(vec![1u8, 2, 3]); let mut s = stream::from_iter(vec![1u8, 2, 3]);
let s = s.try_for_each(|v| { let s = s.try_for_each(|v| {
if v % 2 == 1 { if v % 2 == 1 {
tx.clone().send(v).unwrap(); tx.clone().send(v).unwrap();
@ -1533,11 +1531,11 @@ extension_trait! {
``` ```
"#] "#]
fn try_for_each<F, E>( fn try_for_each<F, E>(
self, &mut self,
f: F, f: F,
) -> impl Future<Output = E> [TryForEachFuture<Self, F, Self::Item, E>] ) -> impl Future<Output = E> + 'a [TryForEachFuture<'_, Self, F>]
where where
Self: Sized, Self: Unpin + Sized,
F: FnMut(Self::Item) -> Result<(), E>, F: FnMut(Self::Item) -> Result<(), E>,
{ {
TryForEachFuture::new(self, f) TryForEachFuture::new(self, f)
@ -1582,7 +1580,7 @@ extension_trait! {
#[inline] #[inline]
fn zip<U>(self, other: U) -> Zip<Self, U> fn zip<U>(self, other: U) -> Zip<Self, U>
where where
Self: Sized + Stream, Self: Sized,
U: Stream, U: Stream,
{ {
Zip::new(self, other) Zip::new(self, other)
@ -1641,7 +1639,6 @@ extension_trait! {
"#] "#]
#[cfg(feature = "unstable")] #[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))] #[cfg_attr(feature = "docs", doc(cfg(unstable)))]
#[must_use = "if you really need to exhaust the iterator, consider `.for_each(drop)` instead (TODO)"]
fn collect<'a, B>( fn collect<'a, B>(
self, self,
) -> impl Future<Output = B> + 'a [Pin<Box<dyn Future<Output = B> + 'a>>] ) -> impl Future<Output = B> + 'a [Pin<Box<dyn Future<Output = B> + 'a>>]
@ -1740,28 +1737,28 @@ extension_trait! {
use async_std::stream; use async_std::stream;
let s = stream::from_iter(vec![1usize, 2, 3]); let s = stream::from_iter(vec![1usize, 2, 3]);
let res = s.clone().position(|x| *x == 1).await; let res = s.clone().position(|x| x == 1).await;
assert_eq!(res, Some(0)); assert_eq!(res, Some(0));
let res = s.clone().position(|x| *x == 2).await; let res = s.clone().position(|x| x == 2).await;
assert_eq!(res, Some(1)); assert_eq!(res, Some(1));
let res = s.clone().position(|x| *x == 3).await; let res = s.clone().position(|x| x == 3).await;
assert_eq!(res, Some(2)); assert_eq!(res, Some(2));
let res = s.clone().position(|x| *x == 4).await; let res = s.clone().position(|x| x == 4).await;
assert_eq!(res, None); assert_eq!(res, None);
# #
# }) } # }) }
``` ```
"#] "#]
fn position<P>( fn position<P>(
self, &mut self,
predicate: P predicate: P,
) -> impl Future<Output = Option<usize>> [PositionFuture<Self, P>] ) -> impl Future<Output = Option<usize>> + '_ [PositionFuture<'_, Self, P>]
where where
Self: Sized, Self: Unpin + Sized,
P: FnMut(&Self::Item) -> bool, P: FnMut(Self::Item) -> bool,
{ {
PositionFuture::new(self, predicate) PositionFuture::new(self, predicate)
} }
@ -1805,10 +1802,12 @@ extension_trait! {
CmpFuture::new(self, other) CmpFuture::new(self, other)
} }
#[doc = r#" #[doc = r#"
Determines if the elements of this `Stream` are lexicographically Determines if the elements of this `Stream` are lexicographically
not equal to those of another. not equal to those of another.
# Examples # Examples
``` ```
# fn main() { async_std::task::block_on(async { # fn main() { async_std::task::block_on(async {
# #
@ -1833,7 +1832,7 @@ extension_trait! {
other: S other: S
) -> impl Future<Output = bool> [NeFuture<Self, S>] ) -> impl Future<Output = bool> [NeFuture<Self, S>]
where where
Self: Sized + Stream, Self: Sized,
S: Sized + Stream, S: Sized + Stream,
<Self as Stream>::Item: PartialEq<S::Item>, <Self as Stream>::Item: PartialEq<S::Item>,
{ {
@ -2026,11 +2025,11 @@ extension_trait! {
} }
#[doc = r#" #[doc = r#"
Sums the elements of an iterator. Sums the elements of a stream.
Takes each element, adds them together, and returns the result. Takes each element, adds them together, and returns the result.
An empty iterator returns the zero value of the type. An empty streams returns the zero value of the type.
# Panics # Panics
@ -2063,15 +2062,15 @@ extension_trait! {
) -> impl Future<Output = S> + 'a [Pin<Box<dyn Future<Output = S> + 'a>>] ) -> impl Future<Output = S> + 'a [Pin<Box<dyn Future<Output = S> + 'a>>]
where where
Self: Sized + Stream<Item = S> + 'a, Self: Sized + Stream<Item = S> + 'a,
S: Sum, S: Sum<Self::Item>,
{ {
Sum::sum(self) Sum::sum(self)
} }
#[doc = r#" #[doc = r#"
Iterates over the entire iterator, multiplying all the elements Multiplies all elements of the stream.
An empty iterator returns the one value of the type. An empty stream returns the one value of the type.
# Panics # Panics

@ -1,24 +1,21 @@
use std::pin::Pin;
use std::future::Future; use std::future::Future;
use std::pin::Pin;
use pin_project_lite::pin_project;
use crate::stream::Stream; use crate::stream::Stream;
use crate::task::{Context, Poll}; use crate::task::{Context, Poll};
pin_project! { #[doc(hidden)]
#[doc(hidden)] #[allow(missing_debug_implementations)]
#[allow(missing_debug_implementations)] pub struct PositionFuture<'a, S, P> {
pub struct PositionFuture<S, P> { stream: &'a mut S,
#[pin] predicate: P,
stream: S, index: usize,
predicate: P,
index:usize,
}
} }
impl<S, P> PositionFuture<S, P> { impl<'a, S, P> Unpin for PositionFuture<'a, S, P> {}
pub(super) fn new(stream: S, predicate: P) -> Self {
impl<'a, S, P> PositionFuture<'a, S, P> {
pub(super) fn new(stream: &'a mut S, predicate: P) -> Self {
PositionFuture { PositionFuture {
stream, stream,
predicate, predicate,
@ -27,23 +24,25 @@ impl<S, P> PositionFuture<S, P> {
} }
} }
impl<S, P> Future for PositionFuture<S, P> impl<'a, S, P> Future for PositionFuture<'a, S, P>
where where
S: Stream, S: Stream + Unpin,
P: FnMut(&S::Item) -> bool, P: FnMut(S::Item) -> bool,
{ {
type Output = Option<usize>; type Output = Option<usize>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project(); let next = futures_core::ready!(Pin::new(&mut self.stream).poll_next(cx));
let next = futures_core::ready!(this.stream.poll_next(cx));
match next { match next {
Some(v) if (this.predicate)(&v) => Poll::Ready(Some(*this.index)), Some(v) => {
Some(_) => { if (&mut self.predicate)(v) {
cx.waker().wake_by_ref(); Poll::Ready(Some(self.index))
*this.index += 1; } else {
Poll::Pending cx.waker().wake_by_ref();
self.index += 1;
Poll::Pending
}
} }
None => Poll::Ready(None), None => Poll::Ready(None),
} }

@ -1,4 +1,3 @@
use std::marker::PhantomData;
use std::pin::Pin; use std::pin::Pin;
use pin_project_lite::pin_project; use pin_project_lite::pin_project;
@ -15,25 +14,23 @@ pin_project! {
/// [`skip_while`]: trait.Stream.html#method.skip_while /// [`skip_while`]: trait.Stream.html#method.skip_while
/// [`Stream`]: trait.Stream.html /// [`Stream`]: trait.Stream.html
#[derive(Debug)] #[derive(Debug)]
pub struct SkipWhile<S, P, T> { pub struct SkipWhile<S, P> {
#[pin] #[pin]
stream: S, stream: S,
predicate: Option<P>, predicate: Option<P>,
__t: PhantomData<T>,
} }
} }
impl<S, P, T> SkipWhile<S, P, T> { impl<S, P> SkipWhile<S, P> {
pub(crate) fn new(stream: S, predicate: P) -> Self { pub(crate) fn new(stream: S, predicate: P) -> Self {
SkipWhile { SkipWhile {
stream, stream,
predicate: Some(predicate), predicate: Some(predicate),
__t: PhantomData,
} }
} }
} }
impl<S, P> Stream for SkipWhile<S, P, S::Item> impl<S, P> Stream for SkipWhile<S, P>
where where
S: Stream, S: Stream,
P: FnMut(&S::Item) -> bool, P: FnMut(&S::Item) -> bool,

@ -1,4 +1,3 @@
use std::marker::PhantomData;
use std::pin::Pin; use std::pin::Pin;
use pin_project_lite::pin_project; use pin_project_lite::pin_project;
@ -15,25 +14,23 @@ pin_project! {
/// [`take_while`]: trait.Stream.html#method.take_while /// [`take_while`]: trait.Stream.html#method.take_while
/// [`Stream`]: trait.Stream.html /// [`Stream`]: trait.Stream.html
#[derive(Debug)] #[derive(Debug)]
pub struct TakeWhile<S, P, T> { pub struct TakeWhile<S, P> {
#[pin] #[pin]
stream: S, stream: S,
predicate: P, predicate: P,
__t: PhantomData<T>,
} }
} }
impl<S, P, T> TakeWhile<S, P, T> { impl<S, P> TakeWhile<S, P> {
pub(super) fn new(stream: S, predicate: P) -> Self { pub(super) fn new(stream: S, predicate: P) -> Self {
TakeWhile { TakeWhile {
stream, stream,
predicate, predicate,
__t: PhantomData,
} }
} }
} }
impl<S, P> Stream for TakeWhile<S, P, S::Item> impl<S, P> Stream for TakeWhile<S, P>
where where
S: Stream, S: Stream,
P: FnMut(&S::Item) -> bool, P: FnMut(&S::Item) -> bool,

@ -22,7 +22,7 @@ pin_project! {
} }
impl<S: Stream> Timeout<S> { impl<S: Stream> Timeout<S> {
pub fn new(stream: S, dur: Duration) -> Timeout<S> { pub(crate) fn new(stream: S, dur: Duration) -> Timeout<S> {
let delay = Delay::new(dur); let delay = Delay::new(dur);
Timeout { stream, delay } Timeout { stream, delay }

@ -1,58 +1,51 @@
use std::marker::PhantomData;
use std::pin::Pin; use std::pin::Pin;
use std::future::Future;
use pin_project_lite::pin_project;
use crate::future::Future;
use crate::stream::Stream; use crate::stream::Stream;
use crate::task::{Context, Poll}; use crate::task::{Context, Poll};
pin_project! { #[doc(hidden)]
#[doc(hidden)] #[allow(missing_debug_implementations)]
#[allow(missing_debug_implementations)] pub struct TryFoldFuture<'a, S, F, T> {
pub struct TryFoldFuture<S, F, T> { stream: &'a mut S,
#[pin] f: F,
stream: S, acc: Option<T>,
f: F,
acc: Option<T>,
__t: PhantomData<T>,
}
} }
impl<S, F, T> TryFoldFuture<S, F, T> { impl<'a, S, F, T> Unpin for TryFoldFuture<'a, S, F, T> {}
pub(super) fn new(stream: S, init: T, f: F) -> Self {
impl<'a, S, F, T> TryFoldFuture<'a, S, F, T> {
pub(super) fn new(stream: &'a mut S, init: T, f: F) -> Self {
TryFoldFuture { TryFoldFuture {
stream, stream,
f, f,
acc: Some(init), acc: Some(init),
__t: PhantomData,
} }
} }
} }
impl<S, F, T, E> Future for TryFoldFuture<S, F, T> impl<'a, S, F, T, E> Future for TryFoldFuture<'a, S, F, T>
where where
S: Stream + Sized, S: Stream + Unpin,
F: FnMut(T, S::Item) -> Result<T, E>, F: FnMut(T, S::Item) -> Result<T, E>,
{ {
type Output = Result<T, E>; type Output = Result<T, E>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
loop { loop {
let next = futures_core::ready!(this.stream.as_mut().poll_next(cx)); let next = futures_core::ready!(Pin::new(&mut self.stream).poll_next(cx));
match next { match next {
Some(v) => { Some(v) => {
let old = this.acc.take().unwrap(); let old = self.acc.take().unwrap();
let new = (this.f)(old, v); let new = (&mut self.f)(old, v);
match new { match new {
Ok(o) => *this.acc = Some(o), Ok(o) => self.acc = Some(o),
Err(e) => return Poll::Ready(Err(e)), Err(e) => return Poll::Ready(Err(e)),
} }
} }
None => return Poll::Ready(Ok(this.acc.take().unwrap())), None => return Poll::Ready(Ok(self.acc.take().unwrap())),
} }
} }
} }

@ -1,52 +1,39 @@
use std::future::Future; use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin; use std::pin::Pin;
use pin_project_lite::pin_project;
use crate::stream::Stream; use crate::stream::Stream;
use crate::task::{Context, Poll}; use crate::task::{Context, Poll};
pin_project! { #[doc(hidden)]
#[doc(hidden)] #[allow(missing_debug_implementations)]
#[allow(missing_debug_implementations)] pub struct TryForEachFuture<'a, S, F> {
pub struct TryForEachFuture<S, F, T, R> { stream: &'a mut S,
#[pin] f: F,
stream: S,
f: F,
__from: PhantomData<T>,
__to: PhantomData<R>,
}
} }
impl<S, F, T, R> TryForEachFuture<S, F, T, R> { impl<'a, S, F> Unpin for TryForEachFuture<'a, S, F> {}
pub(crate) fn new(stream: S, f: F) -> Self {
TryForEachFuture { impl<'a, S, F> TryForEachFuture<'a, S, F> {
stream, pub(crate) fn new(stream: &'a mut S, f: F) -> Self {
f, TryForEachFuture { stream, f }
__from: PhantomData,
__to: PhantomData,
}
} }
} }
impl<S, F, E> Future for TryForEachFuture<S, F, S::Item, E> impl<'a, S, F, E> Future for TryForEachFuture<'a, S, F>
where where
S: Stream, S: Stream + Unpin,
S::Item: std::fmt::Debug,
F: FnMut(S::Item) -> Result<(), E>, F: FnMut(S::Item) -> Result<(), E>,
{ {
type Output = Result<(), E>; type Output = Result<(), E>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
loop { loop {
let item = futures_core::ready!(this.stream.as_mut().poll_next(cx)); let item = futures_core::ready!(Pin::new(&mut self.stream).poll_next(cx));
match item { match item {
None => return Poll::Ready(Ok(())), None => return Poll::Ready(Ok(())),
Some(v) => { Some(v) => {
let res = (this.f)(v); let res = (&mut self.f)(v);
if let Err(e) = res { if let Err(e) = res {
return Poll::Ready(Err(e)); return Poll::Ready(Err(e));
} }

@ -7,7 +7,7 @@ use crate::stream::Stream;
use crate::task::{Context, Poll}; use crate::task::{Context, Poll};
pin_project! { pin_project! {
/// An iterator that iterates two other iterators simultaneously. /// A stream that takes items from two other streams simultaneously.
/// ///
/// This `struct` is created by the [`zip`] method on [`Stream`]. See its /// This `struct` is created by the [`zip`] method on [`Stream`]. See its
/// documentation for more. /// documentation for more.

Loading…
Cancel
Save