diff --git a/src/stream/stream/min_by.rs b/src/stream/stream/min_by.rs index b65d88db..f63d52b8 100644 --- a/src/stream/stream/min_by.rs +++ b/src/stream/stream/min_by.rs @@ -2,20 +2,20 @@ use std::cmp::Ordering; use std::pin::Pin; use crate::future::Future; -use crate::stream::Stream; use crate::task::{Context, Poll}; -/// A future that yields the minimum item in a stream by a given comparison function. -#[derive(Clone, Debug)] -pub struct MinByFuture { +#[allow(missing_debug_implementations)] +pub struct MinByFuture { stream: S, compare: F, - min: Option, + min: Option, } -impl Unpin for MinByFuture {} +impl MinByFuture { + pin_utils::unsafe_pinned!(stream: S); + pin_utils::unsafe_unpinned!(compare: F); + pin_utils::unsafe_unpinned!(min: Option); -impl MinByFuture { pub(super) fn new(stream: S, compare: F) -> Self { MinByFuture { stream, @@ -25,25 +25,25 @@ impl MinByFuture { } } -impl Future for MinByFuture +impl Future for MinByFuture where - S: futures_core::stream::Stream + Unpin, + S: futures_core::stream::Stream + Unpin + Sized, S::Item: Copy, F: FnMut(&S::Item, &S::Item) -> Ordering, { type Output = Option; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let next = futures_core::ready!(Pin::new(&mut self.stream).poll_next(cx)); + let next = futures_core::ready!(self.as_mut().stream().poll_next(cx)); match next { Some(new) => { cx.waker().wake_by_ref(); - match self.as_mut().min.take() { - None => self.as_mut().min = Some(new), - Some(old) => match (&mut self.as_mut().compare)(&new, &old) { - Ordering::Less => self.as_mut().min = Some(new), - _ => self.as_mut().min = Some(old), + match self.as_mut().min().take() { + None => *self.as_mut().min() = Some(new), + Some(old) => match (&mut self.as_mut().compare())(&new, &old) { + Ordering::Less => *self.as_mut().min() = Some(new), + _ => *self.as_mut().min() = Some(old), }, } Poll::Pending diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 91b111e2..4e1e4168 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -153,9 +153,9 @@ pub trait Stream { /// # /// # }) } /// ``` - fn min_by(self, compare: F) -> MinByFuture + fn min_by(self, compare: F) -> MinByFuture where - Self: Sized + Unpin, + Self: Sized, F: FnMut(&Self::Item, &Self::Item) -> Ordering, { MinByFuture::new(self, compare)