diff --git a/src/stream/stream/all.rs b/src/stream/stream/all.rs index 7b84abe..5adb68f 100644 --- a/src/stream/stream/all.rs +++ b/src/stream/stream/all.rs @@ -14,6 +14,17 @@ pub struct AllFuture<'a, S, F, T> { pub(crate) _marker: PhantomData, } +impl<'a, S, F, T> AllFuture<'a, S, F, T> { + pub(crate) fn new(stream: &'a mut S, f: F) -> Self { + Self { + stream, + f, + result: true, // the default if the empty stream + _marker: PhantomData, + } + } +} + impl Unpin for AllFuture<'_, S, F, T> {} impl Future for AllFuture<'_, S, F, S::Item> diff --git a/src/stream/stream/any.rs b/src/stream/stream/any.rs index c7fc766..d6853a1 100644 --- a/src/stream/stream/any.rs +++ b/src/stream/stream/any.rs @@ -14,6 +14,17 @@ pub struct AnyFuture<'a, S, F, T> { pub(crate) _marker: PhantomData, } +impl<'a, S, F, T> AnyFuture<'a, S, F, T> { + pub(crate) fn new(stream: &'a mut S, f: F) -> Self { + Self { + stream, + f, + result: false, // the default if the empty stream + _marker: PhantomData, + } + } +} + impl Unpin for AnyFuture<'_, S, F, T> {} impl Future for AnyFuture<'_, S, F, S::Item> diff --git a/src/stream/stream/chain.rs b/src/stream/stream/chain.rs index f6d9cf6..909fc19 100644 --- a/src/stream/stream/chain.rs +++ b/src/stream/stream/chain.rs @@ -25,7 +25,7 @@ pin_project! { impl Chain { pub(super) fn new(first: S, second: U) -> Self { - Chain { + Self { first: first.fuse(), second: second.fuse(), } diff --git a/src/stream/stream/cmp.rs b/src/stream/stream/cmp.rs index 19437e7..2be0c1a 100644 --- a/src/stream/stream/cmp.rs +++ b/src/stream/stream/cmp.rs @@ -26,7 +26,7 @@ pin_project! { impl CmpFuture { pub(super) fn new(l: L, r: R) -> Self { - CmpFuture { + Self { l: l.fuse(), r: r.fuse(), l_cache: None, diff --git a/src/stream/stream/copied.rs b/src/stream/stream/copied.rs index e3c8367..651c31b 100644 --- a/src/stream/stream/copied.rs +++ b/src/stream/stream/copied.rs @@ -14,7 +14,7 @@ pin_project! { impl Copied { pub(super) fn new(stream: S) -> Self { - Copied { stream } + Self { stream } } } diff --git a/src/stream/stream/count.rs b/src/stream/stream/count.rs index 09657cf..ebf2a2f 100644 --- a/src/stream/stream/count.rs +++ b/src/stream/stream/count.rs @@ -20,7 +20,7 @@ pin_project! { impl CountFuture { pub(crate) fn new(stream: S) -> Self { - CountFuture { stream, count: 0 } + Self { stream, count: 0 } } } diff --git a/src/stream/stream/cycle.rs b/src/stream/stream/cycle.rs index 7f01a61..5f8eaa2 100644 --- a/src/stream/stream/cycle.rs +++ b/src/stream/stream/cycle.rs @@ -15,8 +15,8 @@ impl Cycle where S: Stream + Clone, { - pub fn new(source: S) -> Cycle { - Cycle { + pub(crate) fn new(source: S) -> Self { + Self { orig: source.clone(), source: ManuallyDrop::new(source), } diff --git a/src/stream/stream/enumerate.rs b/src/stream/stream/enumerate.rs index a758010..c4a37d6 100644 --- a/src/stream/stream/enumerate.rs +++ b/src/stream/stream/enumerate.rs @@ -16,7 +16,7 @@ pin_project! { impl Enumerate { pub(super) fn new(stream: S) -> Self { - Enumerate { stream, i: 0 } + Self { stream, i: 0 } } } diff --git a/src/stream/stream/eq.rs b/src/stream/stream/eq.rs index addcfa2..58ccc90 100644 --- a/src/stream/stream/eq.rs +++ b/src/stream/stream/eq.rs @@ -26,7 +26,7 @@ where L::Item: PartialEq, { pub(super) fn new(l: L, r: R) -> Self { - EqFuture { + Self { l: l.fuse(), r: r.fuse(), } diff --git a/src/stream/stream/filter.rs b/src/stream/stream/filter.rs index 594b094..00344b0 100644 --- a/src/stream/stream/filter.rs +++ b/src/stream/stream/filter.rs @@ -23,7 +23,7 @@ pin_project! { impl Filter { pub(super) fn new(stream: S, predicate: P) -> Self { - Filter { + Self { stream, predicate, } diff --git a/src/stream/stream/filter_map.rs b/src/stream/stream/filter_map.rs index e110f51..3cd1e47 100644 --- a/src/stream/stream/filter_map.rs +++ b/src/stream/stream/filter_map.rs @@ -16,7 +16,7 @@ pin_project! { impl FilterMap { pub(crate) fn new(stream: S, f: F) -> Self { - FilterMap { stream, f } + Self { stream, f } } } diff --git a/src/stream/stream/find.rs b/src/stream/stream/find.rs index 0c5ad62..4a0749b 100644 --- a/src/stream/stream/find.rs +++ b/src/stream/stream/find.rs @@ -13,7 +13,7 @@ pub struct FindFuture<'a, S, P> { impl<'a, S, P> FindFuture<'a, S, P> { pub(super) fn new(stream: &'a mut S, p: P) -> Self { - FindFuture { stream, p } + Self { stream, p } } } diff --git a/src/stream/stream/find_map.rs b/src/stream/stream/find_map.rs index b10bd9c..c794943 100644 --- a/src/stream/stream/find_map.rs +++ b/src/stream/stream/find_map.rs @@ -13,7 +13,7 @@ pub struct FindMapFuture<'a, S, F> { impl<'a, S, F> FindMapFuture<'a, S, F> { pub(super) fn new(stream: &'a mut S, f: F) -> Self { - FindMapFuture { stream, f } + Self { stream, f } } } diff --git a/src/stream/stream/flat_map.rs b/src/stream/stream/flat_map.rs index ab45c9c..6c828c9 100644 --- a/src/stream/stream/flat_map.rs +++ b/src/stream/stream/flat_map.rs @@ -30,8 +30,8 @@ where U: IntoStream, F: FnMut(S::Item) -> U, { - pub(super) fn new(stream: S, f: F) -> FlatMap { - FlatMap { + pub(super) fn new(stream: S, f: F) -> Self { + Self { stream: stream.map(f), inner_stream: None, } diff --git a/src/stream/stream/flatten.rs b/src/stream/stream/flatten.rs index edaffd0..1d6fcae 100644 --- a/src/stream/stream/flatten.rs +++ b/src/stream/stream/flatten.rs @@ -32,8 +32,8 @@ where S: Stream, S::Item: IntoStream, { - pub(super) fn new(stream: S) -> Flatten { - Flatten { + pub(super) fn new(stream: S) -> Self { + Self { stream, inner_stream: None, } diff --git a/src/stream/stream/fold.rs b/src/stream/stream/fold.rs index c4da591..a346eb6 100644 --- a/src/stream/stream/fold.rs +++ b/src/stream/stream/fold.rs @@ -18,7 +18,7 @@ pin_project! { impl FoldFuture { pub(super) fn new(stream: S, init: B, f: F) -> Self { - FoldFuture { + Self { stream, f, acc: Some(init), diff --git a/src/stream/stream/for_each.rs b/src/stream/stream/for_each.rs index 01833fd..dce5cda 100644 --- a/src/stream/stream/for_each.rs +++ b/src/stream/stream/for_each.rs @@ -18,7 +18,7 @@ pin_project! { impl ForEachFuture { pub(super) fn new(stream: S, f: F) -> Self { - ForEachFuture { + Self { stream, f, } diff --git a/src/stream/stream/fuse.rs b/src/stream/stream/fuse.rs index 6297bef..c7449c2 100644 --- a/src/stream/stream/fuse.rs +++ b/src/stream/stream/fuse.rs @@ -21,6 +21,15 @@ pin_project! { } } +impl Fuse { + pub(super) fn new(stream: S) -> Self { + Self { + stream, + done: false, + } + } +} + impl Stream for Fuse { type Item = S::Item; diff --git a/src/stream/stream/ge.rs b/src/stream/stream/ge.rs index f901269..67b20be 100644 --- a/src/stream/stream/ge.rs +++ b/src/stream/stream/ge.rs @@ -25,7 +25,7 @@ where L::Item: PartialOrd, { pub(super) fn new(l: L, r: R) -> Self { - GeFuture { + Self { partial_cmp: l.partial_cmp(r), } } diff --git a/src/stream/stream/gt.rs b/src/stream/stream/gt.rs index 81e95a1..1c12189 100644 --- a/src/stream/stream/gt.rs +++ b/src/stream/stream/gt.rs @@ -25,7 +25,7 @@ where L::Item: PartialOrd, { pub(super) fn new(l: L, r: R) -> Self { - GtFuture { + Self { partial_cmp: l.partial_cmp(r), } } diff --git a/src/stream/stream/inspect.rs b/src/stream/stream/inspect.rs index acf2246..bb39662 100644 --- a/src/stream/stream/inspect.rs +++ b/src/stream/stream/inspect.rs @@ -23,7 +23,7 @@ pin_project! { impl Inspect { pub(super) fn new(stream: S, f: F) -> Self { - Inspect { + Self { stream, f, } diff --git a/src/stream/stream/last.rs b/src/stream/stream/last.rs index 188da3c..3e0a0b3 100644 --- a/src/stream/stream/last.rs +++ b/src/stream/stream/last.rs @@ -18,7 +18,7 @@ pin_project! { impl LastFuture { pub(crate) fn new(stream: S) -> Self { - LastFuture { stream, last: None } + Self { stream, last: None } } } diff --git a/src/stream/stream/le.rs b/src/stream/stream/le.rs index 35b04bf..7b86161 100644 --- a/src/stream/stream/le.rs +++ b/src/stream/stream/le.rs @@ -25,7 +25,7 @@ where L::Item: PartialOrd, { pub(super) fn new(l: L, r: R) -> Self { - LeFuture { + Self { partial_cmp: l.partial_cmp(r), } } diff --git a/src/stream/stream/lt.rs b/src/stream/stream/lt.rs index 86c3129..100a003 100644 --- a/src/stream/stream/lt.rs +++ b/src/stream/stream/lt.rs @@ -25,7 +25,7 @@ where L::Item: PartialOrd, { pub(super) fn new(l: L, r: R) -> Self { - LtFuture { + Self { partial_cmp: l.partial_cmp(r), } } diff --git a/src/stream/stream/map.rs b/src/stream/stream/map.rs index 7accb6f..8e074a7 100644 --- a/src/stream/stream/map.rs +++ b/src/stream/stream/map.rs @@ -17,7 +17,7 @@ pin_project! { impl Map { pub(crate) fn new(stream: S, f: F) -> Self { - Map { + Self { stream, f, } diff --git a/src/stream/stream/max_by.rs b/src/stream/stream/max_by.rs index cfba9b9..36b876b 100644 --- a/src/stream/stream/max_by.rs +++ b/src/stream/stream/max_by.rs @@ -20,7 +20,7 @@ pin_project! { impl MaxByFuture { pub(super) fn new(stream: S, compare: F) -> Self { - MaxByFuture { + Self { stream, compare, max: None, diff --git a/src/stream/stream/min_by.rs b/src/stream/stream/min_by.rs index fc332c2..e35719e 100644 --- a/src/stream/stream/min_by.rs +++ b/src/stream/stream/min_by.rs @@ -20,7 +20,7 @@ pin_project! { impl MinByFuture { pub(super) fn new(stream: S, compare: F) -> Self { - MinByFuture { + Self { stream, compare, min: None, diff --git a/src/stream/stream/min_by_key.rs b/src/stream/stream/min_by_key.rs index 142dfe1..07c3642 100644 --- a/src/stream/stream/min_by_key.rs +++ b/src/stream/stream/min_by_key.rs @@ -20,7 +20,7 @@ pin_project! { impl MinByKeyFuture { pub(super) fn new(stream: S, key_by: K) -> Self { - MinByKeyFuture { + Self { stream, min: None, key_by, diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index e3b90e4..d140228 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -111,7 +111,6 @@ pub use take_while::TakeWhile; pub use zip::Zip; use std::cmp::Ordering; -use std::marker::PhantomData; cfg_unstable! { use std::future::Future; @@ -288,10 +287,7 @@ extension_trait! { where Self: Sized, { - Take { - stream: self, - remaining: n, - } + Take::new(self, n) } #[doc = r#" @@ -714,10 +710,7 @@ extension_trait! { where Self: Sized, { - Fuse { - stream: self, - done: false, - } + Fuse::new(self) } #[doc = r#" @@ -1193,12 +1186,7 @@ extension_trait! { Self: Unpin + Sized, F: FnMut(Self::Item) -> bool, { - AllFuture { - stream: self, - result: true, // the default if the empty stream - _marker: PhantomData, - f, - } + AllFuture::new(self, f) } #[doc = r#" @@ -1438,12 +1426,7 @@ extension_trait! { Self: Unpin + Sized, F: FnMut(Self::Item) -> bool, { - AnyFuture { - stream: self, - result: false, // the default if the empty stream - _marker: PhantomData, - f, - } + AnyFuture::new(self, f) } #[doc = r#" @@ -1468,7 +1451,7 @@ extension_trait! { assert_eq!(sum, 6); // if we try to use stream again, it won't work. The following line - // gives "error: use of moved value: `stream` + // gives error: use of moved value: `stream` // assert_eq!(stream.next(), None); // let's try that again diff --git a/src/stream/stream/nth.rs b/src/stream/stream/nth.rs index 711287a..267bd40 100644 --- a/src/stream/stream/nth.rs +++ b/src/stream/stream/nth.rs @@ -15,7 +15,7 @@ impl Unpin for NthFuture<'_, S> {} impl<'a, S> NthFuture<'a, S> { pub(crate) fn new(stream: &'a mut S, n: usize) -> Self { - NthFuture { stream, n } + Self { stream, n } } } diff --git a/src/stream/stream/partial_cmp.rs b/src/stream/stream/partial_cmp.rs index 6bc28f7..85587c9 100644 --- a/src/stream/stream/partial_cmp.rs +++ b/src/stream/stream/partial_cmp.rs @@ -26,7 +26,7 @@ pin_project! { impl PartialCmpFuture { pub(super) fn new(l: L, r: R) -> Self { - PartialCmpFuture { + Self { l: l.fuse(), r: r.fuse(), l_cache: None, diff --git a/src/stream/stream/position.rs b/src/stream/stream/position.rs index 5a51d7a..df60eaa 100644 --- a/src/stream/stream/position.rs +++ b/src/stream/stream/position.rs @@ -16,7 +16,7 @@ impl<'a, S, P> Unpin for PositionFuture<'a, S, P> {} impl<'a, S, P> PositionFuture<'a, S, P> { pub(super) fn new(stream: &'a mut S, predicate: P) -> Self { - PositionFuture { + Self { stream, predicate, index: 0, diff --git a/src/stream/stream/skip.rs b/src/stream/stream/skip.rs index cc2ba90..bcff50d 100644 --- a/src/stream/stream/skip.rs +++ b/src/stream/stream/skip.rs @@ -23,7 +23,7 @@ pin_project! { impl Skip { pub(crate) fn new(stream: S, n: usize) -> Self { - Skip { stream, n } + Self { stream, n } } } diff --git a/src/stream/stream/skip_while.rs b/src/stream/stream/skip_while.rs index 5cb273e..2334713 100644 --- a/src/stream/stream/skip_while.rs +++ b/src/stream/stream/skip_while.rs @@ -23,7 +23,7 @@ pin_project! { impl SkipWhile { pub(crate) fn new(stream: S, predicate: P) -> Self { - SkipWhile { + Self { stream, predicate: Some(predicate), } diff --git a/src/stream/stream/step_by.rs b/src/stream/stream/step_by.rs index 1302098..2149cda 100644 --- a/src/stream/stream/step_by.rs +++ b/src/stream/stream/step_by.rs @@ -24,7 +24,7 @@ pin_project! { impl StepBy { pub(crate) fn new(stream: S, step: usize) -> Self { - StepBy { + Self { stream, step: step.checked_sub(1).unwrap(), i: 0, diff --git a/src/stream/stream/take.rs b/src/stream/stream/take.rs index e680b42..8c85227 100644 --- a/src/stream/stream/take.rs +++ b/src/stream/stream/take.rs @@ -21,6 +21,15 @@ pin_project! { } } +impl Take { + pub(super) fn new(stream: S, remaining: usize) -> Self { + Self { + stream, + remaining, + } + } +} + impl Stream for Take { type Item = S::Item; diff --git a/src/stream/stream/take_while.rs b/src/stream/stream/take_while.rs index 08b5a86..2ba8490 100644 --- a/src/stream/stream/take_while.rs +++ b/src/stream/stream/take_while.rs @@ -23,7 +23,7 @@ pin_project! { impl TakeWhile { pub(super) fn new(stream: S, predicate: P) -> Self { - TakeWhile { + Self { stream, predicate, } diff --git a/src/stream/stream/throttle.rs b/src/stream/stream/throttle.rs index 8896899..b2480bb 100644 --- a/src/stream/stream/throttle.rs +++ b/src/stream/stream/throttle.rs @@ -31,7 +31,7 @@ pin_project! { impl Throttle { pub(super) fn new(stream: S, duration: Duration) -> Self { - Throttle { + Self { stream, duration, blocked: false, diff --git a/src/stream/stream/timeout.rs b/src/stream/stream/timeout.rs index 560a0e4..f580360 100644 --- a/src/stream/stream/timeout.rs +++ b/src/stream/stream/timeout.rs @@ -22,10 +22,10 @@ pin_project! { } impl Timeout { - pub(crate) fn new(stream: S, dur: Duration) -> Timeout { + pub(crate) fn new(stream: S, dur: Duration) -> Self { let delay = Delay::new(dur); - Timeout { stream, delay } + Self { stream, delay } } } diff --git a/src/stream/stream/try_fold.rs b/src/stream/stream/try_fold.rs index efb9e33..3b92d95 100644 --- a/src/stream/stream/try_fold.rs +++ b/src/stream/stream/try_fold.rs @@ -16,7 +16,7 @@ impl<'a, S, F, T> Unpin for TryFoldFuture<'a, S, F, T> {} impl<'a, S, F, T> TryFoldFuture<'a, S, F, T> { pub(super) fn new(stream: &'a mut S, init: T, f: F) -> Self { - TryFoldFuture { + Self { stream, f, acc: Some(init), diff --git a/src/stream/stream/try_for_each.rs b/src/stream/stream/try_for_each.rs index 30e3185..86f1674 100644 --- a/src/stream/stream/try_for_each.rs +++ b/src/stream/stream/try_for_each.rs @@ -15,7 +15,7 @@ impl<'a, S, F> Unpin for TryForEachFuture<'a, S, F> {} impl<'a, S, F> TryForEachFuture<'a, S, F> { pub(crate) fn new(stream: &'a mut S, f: F) -> Self { - TryForEachFuture { stream, f } + Self { stream, f } } } diff --git a/src/stream/stream/zip.rs b/src/stream/stream/zip.rs index f57d735..597691b 100644 --- a/src/stream/stream/zip.rs +++ b/src/stream/stream/zip.rs @@ -34,7 +34,7 @@ impl fmt::Debug for Zip { impl Zip { pub(crate) fn new(first: A, second: B) -> Self { - Zip { + Self { item_slot: None, first, second,