Merge pull request #548 from yjhmelody/fix-stream-code-style

fix stream code style
new-scheduler
Yoshua Wuyts 5 years ago committed by GitHub
commit d1189f9974
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -14,6 +14,17 @@ pub struct AllFuture<'a, S, F, T> {
pub(crate) _marker: PhantomData<T>, pub(crate) _marker: PhantomData<T>,
} }
impl<'a, S, F, T> AllFuture<'a, S, F, T> {
pub(crate) fn new(stream: &'a mut S, f: F) -> Self {
Self {
stream,
f,
result: true, // the default if the empty stream
_marker: PhantomData,
}
}
}
impl<S: Unpin, F, T> Unpin for AllFuture<'_, S, F, T> {} impl<S: Unpin, F, T> Unpin for AllFuture<'_, S, F, T> {}
impl<S, F> Future for AllFuture<'_, S, F, S::Item> impl<S, F> Future for AllFuture<'_, S, F, S::Item>

@ -14,6 +14,17 @@ pub struct AnyFuture<'a, S, F, T> {
pub(crate) _marker: PhantomData<T>, pub(crate) _marker: PhantomData<T>,
} }
impl<'a, S, F, T> AnyFuture<'a, S, F, T> {
pub(crate) fn new(stream: &'a mut S, f: F) -> Self {
Self {
stream,
f,
result: false, // the default if the empty stream
_marker: PhantomData,
}
}
}
impl<S: Unpin, F, T> Unpin for AnyFuture<'_, S, F, T> {} impl<S: Unpin, F, T> Unpin for AnyFuture<'_, S, F, T> {}
impl<S, F> Future for AnyFuture<'_, S, F, S::Item> impl<S, F> Future for AnyFuture<'_, S, F, S::Item>

@ -25,7 +25,7 @@ pin_project! {
impl<S: Stream, U: Stream> Chain<S, U> { impl<S: Stream, U: Stream> Chain<S, U> {
pub(super) fn new(first: S, second: U) -> Self { pub(super) fn new(first: S, second: U) -> Self {
Chain { Self {
first: first.fuse(), first: first.fuse(),
second: second.fuse(), second: second.fuse(),
} }

@ -26,7 +26,7 @@ pin_project! {
impl<L: Stream, R: Stream> CmpFuture<L, R> { impl<L: Stream, R: Stream> CmpFuture<L, R> {
pub(super) fn new(l: L, r: R) -> Self { pub(super) fn new(l: L, r: R) -> Self {
CmpFuture { Self {
l: l.fuse(), l: l.fuse(),
r: r.fuse(), r: r.fuse(),
l_cache: None, l_cache: None,

@ -14,7 +14,7 @@ pin_project! {
impl<S> Copied<S> { impl<S> Copied<S> {
pub(super) fn new(stream: S) -> Self { pub(super) fn new(stream: S) -> Self {
Copied { stream } Self { stream }
} }
} }

@ -20,7 +20,7 @@ pin_project! {
impl<S> CountFuture<S> { impl<S> CountFuture<S> {
pub(crate) fn new(stream: S) -> Self { pub(crate) fn new(stream: S) -> Self {
CountFuture { stream, count: 0 } Self { stream, count: 0 }
} }
} }

@ -15,8 +15,8 @@ impl<S> Cycle<S>
where where
S: Stream + Clone, S: Stream + Clone,
{ {
pub fn new(source: S) -> Cycle<S> { pub(crate) fn new(source: S) -> Self {
Cycle { Self {
orig: source.clone(), orig: source.clone(),
source: ManuallyDrop::new(source), source: ManuallyDrop::new(source),
} }

@ -16,7 +16,7 @@ pin_project! {
impl<S> Enumerate<S> { impl<S> Enumerate<S> {
pub(super) fn new(stream: S) -> Self { pub(super) fn new(stream: S) -> Self {
Enumerate { stream, i: 0 } Self { stream, i: 0 }
} }
} }

@ -26,7 +26,7 @@ where
L::Item: PartialEq<R::Item>, L::Item: PartialEq<R::Item>,
{ {
pub(super) fn new(l: L, r: R) -> Self { pub(super) fn new(l: L, r: R) -> Self {
EqFuture { Self {
l: l.fuse(), l: l.fuse(),
r: r.fuse(), r: r.fuse(),
} }

@ -23,7 +23,7 @@ pin_project! {
impl<S, P> Filter<S, P> { impl<S, P> Filter<S, P> {
pub(super) fn new(stream: S, predicate: P) -> Self { pub(super) fn new(stream: S, predicate: P) -> Self {
Filter { Self {
stream, stream,
predicate, predicate,
} }

@ -16,7 +16,7 @@ pin_project! {
impl<S, F> FilterMap<S, F> { impl<S, F> FilterMap<S, F> {
pub(crate) fn new(stream: S, f: F) -> Self { pub(crate) fn new(stream: S, f: F) -> Self {
FilterMap { stream, f } Self { stream, f }
} }
} }

@ -13,7 +13,7 @@ pub struct FindFuture<'a, S, P> {
impl<'a, S, P> FindFuture<'a, S, P> { impl<'a, S, P> FindFuture<'a, S, P> {
pub(super) fn new(stream: &'a mut S, p: P) -> Self { pub(super) fn new(stream: &'a mut S, p: P) -> Self {
FindFuture { stream, p } Self { stream, p }
} }
} }

@ -13,7 +13,7 @@ pub struct FindMapFuture<'a, S, F> {
impl<'a, S, F> FindMapFuture<'a, S, F> { impl<'a, S, F> FindMapFuture<'a, S, F> {
pub(super) fn new(stream: &'a mut S, f: F) -> Self { pub(super) fn new(stream: &'a mut S, f: F) -> Self {
FindMapFuture { stream, f } Self { stream, f }
} }
} }

@ -30,8 +30,8 @@ where
U: IntoStream, U: IntoStream,
F: FnMut(S::Item) -> U, F: FnMut(S::Item) -> U,
{ {
pub(super) fn new(stream: S, f: F) -> FlatMap<S, U, F> { pub(super) fn new(stream: S, f: F) -> Self {
FlatMap { Self {
stream: stream.map(f), stream: stream.map(f),
inner_stream: None, inner_stream: None,
} }

@ -32,8 +32,8 @@ where
S: Stream, S: Stream,
S::Item: IntoStream, S::Item: IntoStream,
{ {
pub(super) fn new(stream: S) -> Flatten<S> { pub(super) fn new(stream: S) -> Self {
Flatten { Self {
stream, stream,
inner_stream: None, inner_stream: None,
} }

@ -18,7 +18,7 @@ pin_project! {
impl<S, F, B> FoldFuture<S, F, B> { impl<S, F, B> FoldFuture<S, F, B> {
pub(super) fn new(stream: S, init: B, f: F) -> Self { pub(super) fn new(stream: S, init: B, f: F) -> Self {
FoldFuture { Self {
stream, stream,
f, f,
acc: Some(init), acc: Some(init),

@ -18,7 +18,7 @@ pin_project! {
impl<S, F> ForEachFuture<S, F> { impl<S, F> ForEachFuture<S, F> {
pub(super) fn new(stream: S, f: F) -> Self { pub(super) fn new(stream: S, f: F) -> Self {
ForEachFuture { Self {
stream, stream,
f, f,
} }

@ -21,6 +21,15 @@ pin_project! {
} }
} }
impl<S> Fuse<S> {
pub(super) fn new(stream: S) -> Self {
Self {
stream,
done: false,
}
}
}
impl<S: Stream> Stream for Fuse<S> { impl<S: Stream> Stream for Fuse<S> {
type Item = S::Item; type Item = S::Item;

@ -25,7 +25,7 @@ where
L::Item: PartialOrd<R::Item>, L::Item: PartialOrd<R::Item>,
{ {
pub(super) fn new(l: L, r: R) -> Self { pub(super) fn new(l: L, r: R) -> Self {
GeFuture { Self {
partial_cmp: l.partial_cmp(r), partial_cmp: l.partial_cmp(r),
} }
} }

@ -25,7 +25,7 @@ where
L::Item: PartialOrd<R::Item>, L::Item: PartialOrd<R::Item>,
{ {
pub(super) fn new(l: L, r: R) -> Self { pub(super) fn new(l: L, r: R) -> Self {
GtFuture { Self {
partial_cmp: l.partial_cmp(r), partial_cmp: l.partial_cmp(r),
} }
} }

@ -23,7 +23,7 @@ pin_project! {
impl<S, F> Inspect<S, F> { impl<S, F> Inspect<S, F> {
pub(super) fn new(stream: S, f: F) -> Self { pub(super) fn new(stream: S, f: F) -> Self {
Inspect { Self {
stream, stream,
f, f,
} }

@ -18,7 +18,7 @@ pin_project! {
impl<S, T> LastFuture<S, T> { impl<S, T> LastFuture<S, T> {
pub(crate) fn new(stream: S) -> Self { pub(crate) fn new(stream: S) -> Self {
LastFuture { stream, last: None } Self { stream, last: None }
} }
} }

@ -25,7 +25,7 @@ where
L::Item: PartialOrd<R::Item>, L::Item: PartialOrd<R::Item>,
{ {
pub(super) fn new(l: L, r: R) -> Self { pub(super) fn new(l: L, r: R) -> Self {
LeFuture { Self {
partial_cmp: l.partial_cmp(r), partial_cmp: l.partial_cmp(r),
} }
} }

@ -25,7 +25,7 @@ where
L::Item: PartialOrd<R::Item>, L::Item: PartialOrd<R::Item>,
{ {
pub(super) fn new(l: L, r: R) -> Self { pub(super) fn new(l: L, r: R) -> Self {
LtFuture { Self {
partial_cmp: l.partial_cmp(r), partial_cmp: l.partial_cmp(r),
} }
} }

@ -17,7 +17,7 @@ pin_project! {
impl<S, F> Map<S, F> { impl<S, F> Map<S, F> {
pub(crate) fn new(stream: S, f: F) -> Self { pub(crate) fn new(stream: S, f: F) -> Self {
Map { Self {
stream, stream,
f, f,
} }

@ -20,7 +20,7 @@ pin_project! {
impl<S, F, T> MaxByFuture<S, F, T> { impl<S, F, T> MaxByFuture<S, F, T> {
pub(super) fn new(stream: S, compare: F) -> Self { pub(super) fn new(stream: S, compare: F) -> Self {
MaxByFuture { Self {
stream, stream,
compare, compare,
max: None, max: None,

@ -20,7 +20,7 @@ pin_project! {
impl<S, F, T> MinByFuture<S, F, T> { impl<S, F, T> MinByFuture<S, F, T> {
pub(super) fn new(stream: S, compare: F) -> Self { pub(super) fn new(stream: S, compare: F) -> Self {
MinByFuture { Self {
stream, stream,
compare, compare,
min: None, min: None,

@ -20,7 +20,7 @@ pin_project! {
impl<S, T, K> MinByKeyFuture<S, T, K> { impl<S, T, K> MinByKeyFuture<S, T, K> {
pub(super) fn new(stream: S, key_by: K) -> Self { pub(super) fn new(stream: S, key_by: K) -> Self {
MinByKeyFuture { Self {
stream, stream,
min: None, min: None,
key_by, key_by,

@ -111,7 +111,6 @@ pub use take_while::TakeWhile;
pub use zip::Zip; pub use zip::Zip;
use std::cmp::Ordering; use std::cmp::Ordering;
use std::marker::PhantomData;
cfg_unstable! { cfg_unstable! {
use std::future::Future; use std::future::Future;
@ -288,10 +287,7 @@ extension_trait! {
where where
Self: Sized, Self: Sized,
{ {
Take { Take::new(self, n)
stream: self,
remaining: n,
}
} }
#[doc = r#" #[doc = r#"
@ -714,10 +710,7 @@ extension_trait! {
where where
Self: Sized, Self: Sized,
{ {
Fuse { Fuse::new(self)
stream: self,
done: false,
}
} }
#[doc = r#" #[doc = r#"
@ -1193,12 +1186,7 @@ extension_trait! {
Self: Unpin + Sized, Self: Unpin + Sized,
F: FnMut(Self::Item) -> bool, F: FnMut(Self::Item) -> bool,
{ {
AllFuture { AllFuture::new(self, f)
stream: self,
result: true, // the default if the empty stream
_marker: PhantomData,
f,
}
} }
#[doc = r#" #[doc = r#"
@ -1438,12 +1426,7 @@ extension_trait! {
Self: Unpin + Sized, Self: Unpin + Sized,
F: FnMut(Self::Item) -> bool, F: FnMut(Self::Item) -> bool,
{ {
AnyFuture { AnyFuture::new(self, f)
stream: self,
result: false, // the default if the empty stream
_marker: PhantomData,
f,
}
} }
#[doc = r#" #[doc = r#"
@ -1468,7 +1451,7 @@ extension_trait! {
assert_eq!(sum, 6); assert_eq!(sum, 6);
// if we try to use stream again, it won't work. The following line // if we try to use stream again, it won't work. The following line
// gives "error: use of moved value: `stream` // gives error: use of moved value: `stream`
// assert_eq!(stream.next(), None); // assert_eq!(stream.next(), None);
// let's try that again // let's try that again

@ -15,7 +15,7 @@ impl<S: Unpin> Unpin for NthFuture<'_, S> {}
impl<'a, S> NthFuture<'a, S> { impl<'a, S> NthFuture<'a, S> {
pub(crate) fn new(stream: &'a mut S, n: usize) -> Self { pub(crate) fn new(stream: &'a mut S, n: usize) -> Self {
NthFuture { stream, n } Self { stream, n }
} }
} }

@ -26,7 +26,7 @@ pin_project! {
impl<L: Stream, R: Stream> PartialCmpFuture<L, R> { impl<L: Stream, R: Stream> PartialCmpFuture<L, R> {
pub(super) fn new(l: L, r: R) -> Self { pub(super) fn new(l: L, r: R) -> Self {
PartialCmpFuture { Self {
l: l.fuse(), l: l.fuse(),
r: r.fuse(), r: r.fuse(),
l_cache: None, l_cache: None,

@ -16,7 +16,7 @@ impl<'a, S, P> Unpin for PositionFuture<'a, S, P> {}
impl<'a, S, P> PositionFuture<'a, S, P> { impl<'a, S, P> PositionFuture<'a, S, P> {
pub(super) fn new(stream: &'a mut S, predicate: P) -> Self { pub(super) fn new(stream: &'a mut S, predicate: P) -> Self {
PositionFuture { Self {
stream, stream,
predicate, predicate,
index: 0, index: 0,

@ -23,7 +23,7 @@ pin_project! {
impl<S> Skip<S> { impl<S> Skip<S> {
pub(crate) fn new(stream: S, n: usize) -> Self { pub(crate) fn new(stream: S, n: usize) -> Self {
Skip { stream, n } Self { stream, n }
} }
} }

@ -23,7 +23,7 @@ pin_project! {
impl<S, P> SkipWhile<S, P> { impl<S, P> SkipWhile<S, P> {
pub(crate) fn new(stream: S, predicate: P) -> Self { pub(crate) fn new(stream: S, predicate: P) -> Self {
SkipWhile { Self {
stream, stream,
predicate: Some(predicate), predicate: Some(predicate),
} }

@ -24,7 +24,7 @@ pin_project! {
impl<S> StepBy<S> { impl<S> StepBy<S> {
pub(crate) fn new(stream: S, step: usize) -> Self { pub(crate) fn new(stream: S, step: usize) -> Self {
StepBy { Self {
stream, stream,
step: step.checked_sub(1).unwrap(), step: step.checked_sub(1).unwrap(),
i: 0, i: 0,

@ -21,6 +21,15 @@ pin_project! {
} }
} }
impl<S> Take<S> {
pub(super) fn new(stream: S, remaining: usize) -> Self {
Self {
stream,
remaining,
}
}
}
impl<S: Stream> Stream for Take<S> { impl<S: Stream> Stream for Take<S> {
type Item = S::Item; type Item = S::Item;

@ -23,7 +23,7 @@ pin_project! {
impl<S, P> TakeWhile<S, P> { impl<S, P> TakeWhile<S, P> {
pub(super) fn new(stream: S, predicate: P) -> Self { pub(super) fn new(stream: S, predicate: P) -> Self {
TakeWhile { Self {
stream, stream,
predicate, predicate,
} }

@ -31,7 +31,7 @@ pin_project! {
impl<S: Stream> Throttle<S> { impl<S: Stream> Throttle<S> {
pub(super) fn new(stream: S, duration: Duration) -> Self { pub(super) fn new(stream: S, duration: Duration) -> Self {
Throttle { Self {
stream, stream,
duration, duration,
blocked: false, blocked: false,

@ -22,10 +22,10 @@ pin_project! {
} }
impl<S: Stream> Timeout<S> { impl<S: Stream> Timeout<S> {
pub(crate) fn new(stream: S, dur: Duration) -> Timeout<S> { pub(crate) fn new(stream: S, dur: Duration) -> Self {
let delay = Delay::new(dur); let delay = Delay::new(dur);
Timeout { stream, delay } Self { stream, delay }
} }
} }

@ -16,7 +16,7 @@ impl<'a, S, F, T> Unpin for TryFoldFuture<'a, S, F, T> {}
impl<'a, S, F, T> TryFoldFuture<'a, S, F, T> { impl<'a, S, F, T> TryFoldFuture<'a, S, F, T> {
pub(super) fn new(stream: &'a mut S, init: T, f: F) -> Self { pub(super) fn new(stream: &'a mut S, init: T, f: F) -> Self {
TryFoldFuture { Self {
stream, stream,
f, f,
acc: Some(init), acc: Some(init),

@ -15,7 +15,7 @@ impl<'a, S, F> Unpin for TryForEachFuture<'a, S, F> {}
impl<'a, S, F> TryForEachFuture<'a, S, F> { impl<'a, S, F> TryForEachFuture<'a, S, F> {
pub(crate) fn new(stream: &'a mut S, f: F) -> Self { pub(crate) fn new(stream: &'a mut S, f: F) -> Self {
TryForEachFuture { stream, f } Self { stream, f }
} }
} }

@ -34,7 +34,7 @@ impl<A: Stream + fmt::Debug, B: fmt::Debug> fmt::Debug for Zip<A, B> {
impl<A: Stream, B> Zip<A, B> { impl<A: Stream, B> Zip<A, B> {
pub(crate) fn new(first: A, second: B) -> Self { pub(crate) fn new(first: A, second: B) -> Self {
Zip { Self {
item_slot: None, item_slot: None,
first, first,
second, second,

Loading…
Cancel
Save