diff --git a/src/stream/stream/all.rs b/src/stream/stream/all.rs index 7b84abe3..5adb68f3 100644 --- a/src/stream/stream/all.rs +++ b/src/stream/stream/all.rs @@ -14,6 +14,17 @@ pub struct AllFuture<'a, S, F, T> { pub(crate) _marker: PhantomData, } +impl<'a, S, F, T> AllFuture<'a, S, F, T> { + pub(crate) fn new(stream: &'a mut S, f: F) -> Self { + Self { + stream, + f, + result: true, // the default if the empty stream + _marker: PhantomData, + } + } +} + impl Unpin for AllFuture<'_, S, F, T> {} impl Future for AllFuture<'_, S, F, S::Item> diff --git a/src/stream/stream/any.rs b/src/stream/stream/any.rs index c7fc7665..d6853a1c 100644 --- a/src/stream/stream/any.rs +++ b/src/stream/stream/any.rs @@ -14,6 +14,17 @@ pub struct AnyFuture<'a, S, F, T> { pub(crate) _marker: PhantomData, } +impl<'a, S, F, T> AnyFuture<'a, S, F, T> { + pub(crate) fn new(stream: &'a mut S, f: F) -> Self { + Self { + stream, + f, + result: false, // the default if the empty stream + _marker: PhantomData, + } + } +} + impl Unpin for AnyFuture<'_, S, F, T> {} impl Future for AnyFuture<'_, S, F, S::Item> diff --git a/src/stream/stream/chain.rs b/src/stream/stream/chain.rs index f6d9cf64..909fc19b 100644 --- a/src/stream/stream/chain.rs +++ b/src/stream/stream/chain.rs @@ -25,7 +25,7 @@ pin_project! { impl Chain { pub(super) fn new(first: S, second: U) -> Self { - Chain { + Self { first: first.fuse(), second: second.fuse(), } diff --git a/src/stream/stream/cmp.rs b/src/stream/stream/cmp.rs index 19437e70..2be0c1a3 100644 --- a/src/stream/stream/cmp.rs +++ b/src/stream/stream/cmp.rs @@ -26,7 +26,7 @@ pin_project! { impl CmpFuture { pub(super) fn new(l: L, r: R) -> Self { - CmpFuture { + Self { l: l.fuse(), r: r.fuse(), l_cache: None, diff --git a/src/stream/stream/copied.rs b/src/stream/stream/copied.rs index e3c8367b..651c31b6 100644 --- a/src/stream/stream/copied.rs +++ b/src/stream/stream/copied.rs @@ -14,7 +14,7 @@ pin_project! { impl Copied { pub(super) fn new(stream: S) -> Self { - Copied { stream } + Self { stream } } } diff --git a/src/stream/stream/count.rs b/src/stream/stream/count.rs index 09657cff..ebf2a2f1 100644 --- a/src/stream/stream/count.rs +++ b/src/stream/stream/count.rs @@ -20,7 +20,7 @@ pin_project! { impl CountFuture { pub(crate) fn new(stream: S) -> Self { - CountFuture { stream, count: 0 } + Self { stream, count: 0 } } } diff --git a/src/stream/stream/cycle.rs b/src/stream/stream/cycle.rs index 7f01a61d..5f8eaa20 100644 --- a/src/stream/stream/cycle.rs +++ b/src/stream/stream/cycle.rs @@ -15,8 +15,8 @@ impl Cycle where S: Stream + Clone, { - pub fn new(source: S) -> Cycle { - Cycle { + pub(crate) fn new(source: S) -> Self { + Self { orig: source.clone(), source: ManuallyDrop::new(source), } diff --git a/src/stream/stream/enumerate.rs b/src/stream/stream/enumerate.rs index a758010e..c4a37d6e 100644 --- a/src/stream/stream/enumerate.rs +++ b/src/stream/stream/enumerate.rs @@ -16,7 +16,7 @@ pin_project! { impl Enumerate { pub(super) fn new(stream: S) -> Self { - Enumerate { stream, i: 0 } + Self { stream, i: 0 } } } diff --git a/src/stream/stream/eq.rs b/src/stream/stream/eq.rs index addcfa2e..58ccc90e 100644 --- a/src/stream/stream/eq.rs +++ b/src/stream/stream/eq.rs @@ -26,7 +26,7 @@ where L::Item: PartialEq, { pub(super) fn new(l: L, r: R) -> Self { - EqFuture { + Self { l: l.fuse(), r: r.fuse(), } diff --git a/src/stream/stream/filter.rs b/src/stream/stream/filter.rs index 594b0949..00344b0e 100644 --- a/src/stream/stream/filter.rs +++ b/src/stream/stream/filter.rs @@ -23,7 +23,7 @@ pin_project! { impl Filter { pub(super) fn new(stream: S, predicate: P) -> Self { - Filter { + Self { stream, predicate, } diff --git a/src/stream/stream/filter_map.rs b/src/stream/stream/filter_map.rs index e110f514..3cd1e47a 100644 --- a/src/stream/stream/filter_map.rs +++ b/src/stream/stream/filter_map.rs @@ -16,7 +16,7 @@ pin_project! { impl FilterMap { pub(crate) fn new(stream: S, f: F) -> Self { - FilterMap { stream, f } + Self { stream, f } } } diff --git a/src/stream/stream/find.rs b/src/stream/stream/find.rs index 0c5ad62f..4a0749b1 100644 --- a/src/stream/stream/find.rs +++ b/src/stream/stream/find.rs @@ -13,7 +13,7 @@ pub struct FindFuture<'a, S, P> { impl<'a, S, P> FindFuture<'a, S, P> { pub(super) fn new(stream: &'a mut S, p: P) -> Self { - FindFuture { stream, p } + Self { stream, p } } } diff --git a/src/stream/stream/find_map.rs b/src/stream/stream/find_map.rs index b10bd9ca..c7949439 100644 --- a/src/stream/stream/find_map.rs +++ b/src/stream/stream/find_map.rs @@ -13,7 +13,7 @@ pub struct FindMapFuture<'a, S, F> { impl<'a, S, F> FindMapFuture<'a, S, F> { pub(super) fn new(stream: &'a mut S, f: F) -> Self { - FindMapFuture { stream, f } + Self { stream, f } } } diff --git a/src/stream/stream/flat_map.rs b/src/stream/stream/flat_map.rs index ab45c9c7..6c828c92 100644 --- a/src/stream/stream/flat_map.rs +++ b/src/stream/stream/flat_map.rs @@ -30,8 +30,8 @@ where U: IntoStream, F: FnMut(S::Item) -> U, { - pub(super) fn new(stream: S, f: F) -> FlatMap { - FlatMap { + pub(super) fn new(stream: S, f: F) -> Self { + Self { stream: stream.map(f), inner_stream: None, } diff --git a/src/stream/stream/flatten.rs b/src/stream/stream/flatten.rs index edaffd04..1d6fcae6 100644 --- a/src/stream/stream/flatten.rs +++ b/src/stream/stream/flatten.rs @@ -32,8 +32,8 @@ where S: Stream, S::Item: IntoStream, { - pub(super) fn new(stream: S) -> Flatten { - Flatten { + pub(super) fn new(stream: S) -> Self { + Self { stream, inner_stream: None, } diff --git a/src/stream/stream/fold.rs b/src/stream/stream/fold.rs index c4da5915..a346eb67 100644 --- a/src/stream/stream/fold.rs +++ b/src/stream/stream/fold.rs @@ -18,7 +18,7 @@ pin_project! { impl FoldFuture { pub(super) fn new(stream: S, init: B, f: F) -> Self { - FoldFuture { + Self { stream, f, acc: Some(init), diff --git a/src/stream/stream/for_each.rs b/src/stream/stream/for_each.rs index 01833fd9..dce5cdae 100644 --- a/src/stream/stream/for_each.rs +++ b/src/stream/stream/for_each.rs @@ -18,7 +18,7 @@ pin_project! { impl ForEachFuture { pub(super) fn new(stream: S, f: F) -> Self { - ForEachFuture { + Self { stream, f, } diff --git a/src/stream/stream/fuse.rs b/src/stream/stream/fuse.rs index 6297bef7..c7449c27 100644 --- a/src/stream/stream/fuse.rs +++ b/src/stream/stream/fuse.rs @@ -21,6 +21,15 @@ pin_project! { } } +impl Fuse { + pub(super) fn new(stream: S) -> Self { + Self { + stream, + done: false, + } + } +} + impl Stream for Fuse { type Item = S::Item; diff --git a/src/stream/stream/ge.rs b/src/stream/stream/ge.rs index f9012697..67b20bed 100644 --- a/src/stream/stream/ge.rs +++ b/src/stream/stream/ge.rs @@ -25,7 +25,7 @@ where L::Item: PartialOrd, { pub(super) fn new(l: L, r: R) -> Self { - GeFuture { + Self { partial_cmp: l.partial_cmp(r), } } diff --git a/src/stream/stream/gt.rs b/src/stream/stream/gt.rs index 81e95a1a..1c121891 100644 --- a/src/stream/stream/gt.rs +++ b/src/stream/stream/gt.rs @@ -25,7 +25,7 @@ where L::Item: PartialOrd, { pub(super) fn new(l: L, r: R) -> Self { - GtFuture { + Self { partial_cmp: l.partial_cmp(r), } } diff --git a/src/stream/stream/inspect.rs b/src/stream/stream/inspect.rs index acf22465..bb39662b 100644 --- a/src/stream/stream/inspect.rs +++ b/src/stream/stream/inspect.rs @@ -23,7 +23,7 @@ pin_project! { impl Inspect { pub(super) fn new(stream: S, f: F) -> Self { - Inspect { + Self { stream, f, } diff --git a/src/stream/stream/last.rs b/src/stream/stream/last.rs index 188da3c8..3e0a0b38 100644 --- a/src/stream/stream/last.rs +++ b/src/stream/stream/last.rs @@ -18,7 +18,7 @@ pin_project! { impl LastFuture { pub(crate) fn new(stream: S) -> Self { - LastFuture { stream, last: None } + Self { stream, last: None } } } diff --git a/src/stream/stream/le.rs b/src/stream/stream/le.rs index 35b04bfb..7b86161c 100644 --- a/src/stream/stream/le.rs +++ b/src/stream/stream/le.rs @@ -25,7 +25,7 @@ where L::Item: PartialOrd, { pub(super) fn new(l: L, r: R) -> Self { - LeFuture { + Self { partial_cmp: l.partial_cmp(r), } } diff --git a/src/stream/stream/lt.rs b/src/stream/stream/lt.rs index 86c31295..100a0034 100644 --- a/src/stream/stream/lt.rs +++ b/src/stream/stream/lt.rs @@ -25,7 +25,7 @@ where L::Item: PartialOrd, { pub(super) fn new(l: L, r: R) -> Self { - LtFuture { + Self { partial_cmp: l.partial_cmp(r), } } diff --git a/src/stream/stream/map.rs b/src/stream/stream/map.rs index 7accb6fc..8e074a75 100644 --- a/src/stream/stream/map.rs +++ b/src/stream/stream/map.rs @@ -17,7 +17,7 @@ pin_project! { impl Map { pub(crate) fn new(stream: S, f: F) -> Self { - Map { + Self { stream, f, } diff --git a/src/stream/stream/max_by.rs b/src/stream/stream/max_by.rs index cfba9b93..36b876bb 100644 --- a/src/stream/stream/max_by.rs +++ b/src/stream/stream/max_by.rs @@ -20,7 +20,7 @@ pin_project! { impl MaxByFuture { pub(super) fn new(stream: S, compare: F) -> Self { - MaxByFuture { + Self { stream, compare, max: None, diff --git a/src/stream/stream/min_by.rs b/src/stream/stream/min_by.rs index fc332c26..e35719e6 100644 --- a/src/stream/stream/min_by.rs +++ b/src/stream/stream/min_by.rs @@ -20,7 +20,7 @@ pin_project! { impl MinByFuture { pub(super) fn new(stream: S, compare: F) -> Self { - MinByFuture { + Self { stream, compare, min: None, diff --git a/src/stream/stream/min_by_key.rs b/src/stream/stream/min_by_key.rs index 8179fb31..c515dad7 100644 --- a/src/stream/stream/min_by_key.rs +++ b/src/stream/stream/min_by_key.rs @@ -20,7 +20,7 @@ pin_project! { impl MinByKeyFuture { pub(super) fn new(stream: S, key_by: K) -> Self { - MinByKeyFuture { + Self { stream, min: None, key_by, diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index f8765762..220e791a 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -111,7 +111,6 @@ pub use take_while::TakeWhile; pub use zip::Zip; use std::cmp::Ordering; -use std::marker::PhantomData; cfg_unstable! { use std::future::Future; @@ -288,10 +287,7 @@ extension_trait! { where Self: Sized, { - Take { - stream: self, - remaining: n, - } + Take::new(self, n) } #[doc = r#" @@ -714,10 +710,7 @@ extension_trait! { where Self: Sized, { - Fuse { - stream: self, - done: false, - } + Fuse::new(self) } #[doc = r#" @@ -1193,12 +1186,7 @@ extension_trait! { Self: Unpin + Sized, F: FnMut(Self::Item) -> bool, { - AllFuture { - stream: self, - result: true, // the default if the empty stream - _marker: PhantomData, - f, - } + AllFuture::new(self, f) } #[doc = r#" @@ -1438,12 +1426,7 @@ extension_trait! { Self: Unpin + Sized, F: FnMut(Self::Item) -> bool, { - AnyFuture { - stream: self, - result: false, // the default if the empty stream - _marker: PhantomData, - f, - } + AnyFuture::new(self, f) } #[doc = r#" @@ -1468,7 +1451,7 @@ extension_trait! { assert_eq!(sum, 6); // if we try to use stream again, it won't work. The following line - // gives "error: use of moved value: `stream` + // gives error: use of moved value: `stream` // assert_eq!(stream.next(), None); // let's try that again diff --git a/src/stream/stream/nth.rs b/src/stream/stream/nth.rs index 711287a3..267bd40a 100644 --- a/src/stream/stream/nth.rs +++ b/src/stream/stream/nth.rs @@ -15,7 +15,7 @@ impl Unpin for NthFuture<'_, S> {} impl<'a, S> NthFuture<'a, S> { pub(crate) fn new(stream: &'a mut S, n: usize) -> Self { - NthFuture { stream, n } + Self { stream, n } } } diff --git a/src/stream/stream/partial_cmp.rs b/src/stream/stream/partial_cmp.rs index 6bc28f78..85587c99 100644 --- a/src/stream/stream/partial_cmp.rs +++ b/src/stream/stream/partial_cmp.rs @@ -26,7 +26,7 @@ pin_project! { impl PartialCmpFuture { pub(super) fn new(l: L, r: R) -> Self { - PartialCmpFuture { + Self { l: l.fuse(), r: r.fuse(), l_cache: None, diff --git a/src/stream/stream/position.rs b/src/stream/stream/position.rs index 5a51d7a7..df60eaae 100644 --- a/src/stream/stream/position.rs +++ b/src/stream/stream/position.rs @@ -16,7 +16,7 @@ impl<'a, S, P> Unpin for PositionFuture<'a, S, P> {} impl<'a, S, P> PositionFuture<'a, S, P> { pub(super) fn new(stream: &'a mut S, predicate: P) -> Self { - PositionFuture { + Self { stream, predicate, index: 0, diff --git a/src/stream/stream/skip.rs b/src/stream/stream/skip.rs index cc2ba905..bcff50d6 100644 --- a/src/stream/stream/skip.rs +++ b/src/stream/stream/skip.rs @@ -23,7 +23,7 @@ pin_project! { impl Skip { pub(crate) fn new(stream: S, n: usize) -> Self { - Skip { stream, n } + Self { stream, n } } } diff --git a/src/stream/stream/skip_while.rs b/src/stream/stream/skip_while.rs index 5cb273ee..23347132 100644 --- a/src/stream/stream/skip_while.rs +++ b/src/stream/stream/skip_while.rs @@ -23,7 +23,7 @@ pin_project! { impl SkipWhile { pub(crate) fn new(stream: S, predicate: P) -> Self { - SkipWhile { + Self { stream, predicate: Some(predicate), } diff --git a/src/stream/stream/step_by.rs b/src/stream/stream/step_by.rs index 13020982..2149cdad 100644 --- a/src/stream/stream/step_by.rs +++ b/src/stream/stream/step_by.rs @@ -24,7 +24,7 @@ pin_project! { impl StepBy { pub(crate) fn new(stream: S, step: usize) -> Self { - StepBy { + Self { stream, step: step.checked_sub(1).unwrap(), i: 0, diff --git a/src/stream/stream/take.rs b/src/stream/stream/take.rs index e680b42b..8c852276 100644 --- a/src/stream/stream/take.rs +++ b/src/stream/stream/take.rs @@ -21,6 +21,15 @@ pin_project! { } } +impl Take { + pub(super) fn new(stream: S, remaining: usize) -> Self { + Self { + stream, + remaining, + } + } +} + impl Stream for Take { type Item = S::Item; diff --git a/src/stream/stream/take_while.rs b/src/stream/stream/take_while.rs index 08b5a86c..2ba8490e 100644 --- a/src/stream/stream/take_while.rs +++ b/src/stream/stream/take_while.rs @@ -23,7 +23,7 @@ pin_project! { impl TakeWhile { pub(super) fn new(stream: S, predicate: P) -> Self { - TakeWhile { + Self { stream, predicate, } diff --git a/src/stream/stream/throttle.rs b/src/stream/stream/throttle.rs index 8896899e..b2480bbd 100644 --- a/src/stream/stream/throttle.rs +++ b/src/stream/stream/throttle.rs @@ -31,7 +31,7 @@ pin_project! { impl Throttle { pub(super) fn new(stream: S, duration: Duration) -> Self { - Throttle { + Self { stream, duration, blocked: false, diff --git a/src/stream/stream/timeout.rs b/src/stream/stream/timeout.rs index 560a0e41..f580360d 100644 --- a/src/stream/stream/timeout.rs +++ b/src/stream/stream/timeout.rs @@ -22,10 +22,10 @@ pin_project! { } impl Timeout { - pub(crate) fn new(stream: S, dur: Duration) -> Timeout { + pub(crate) fn new(stream: S, dur: Duration) -> Self { let delay = Delay::new(dur); - Timeout { stream, delay } + Self { stream, delay } } } diff --git a/src/stream/stream/try_fold.rs b/src/stream/stream/try_fold.rs index efb9e339..3b92d95a 100644 --- a/src/stream/stream/try_fold.rs +++ b/src/stream/stream/try_fold.rs @@ -16,7 +16,7 @@ impl<'a, S, F, T> Unpin for TryFoldFuture<'a, S, F, T> {} impl<'a, S, F, T> TryFoldFuture<'a, S, F, T> { pub(super) fn new(stream: &'a mut S, init: T, f: F) -> Self { - TryFoldFuture { + Self { stream, f, acc: Some(init), diff --git a/src/stream/stream/try_for_each.rs b/src/stream/stream/try_for_each.rs index 30e31850..86f1674a 100644 --- a/src/stream/stream/try_for_each.rs +++ b/src/stream/stream/try_for_each.rs @@ -15,7 +15,7 @@ impl<'a, S, F> Unpin for TryForEachFuture<'a, S, F> {} impl<'a, S, F> TryForEachFuture<'a, S, F> { pub(crate) fn new(stream: &'a mut S, f: F) -> Self { - TryForEachFuture { stream, f } + Self { stream, f } } } diff --git a/src/stream/stream/zip.rs b/src/stream/stream/zip.rs index f57d7359..597691b4 100644 --- a/src/stream/stream/zip.rs +++ b/src/stream/stream/zip.rs @@ -34,7 +34,7 @@ impl fmt::Debug for Zip { impl Zip { pub(crate) fn new(first: A, second: B) -> Self { - Zip { + Self { item_slot: None, first, second,