diff --git a/src/stream/stream/filter_map.rs b/src/stream/stream/filter_map.rs index 626a8ece..2f275156 100644 --- a/src/stream/stream/filter_map.rs +++ b/src/stream/stream/filter_map.rs @@ -2,8 +2,10 @@ use std::marker::PhantomData; use std::pin::Pin; use std::task::{Context, Poll}; -/// A stream that both filters and maps. -#[derive(Clone, Debug)] +use crate::stream::Stream; + +#[doc(hidden)] +#[allow(missing_debug_implementations)] pub struct FilterMap { stream: S, f: F, @@ -27,7 +29,7 @@ impl FilterMap { impl futures_core::stream::Stream for FilterMap where - S: futures_core::stream::Stream, + S: Stream, F: FnMut(S::Item) -> Option, { type Item = B; diff --git a/src/stream/stream/find.rs b/src/stream/stream/find.rs index dfab0893..014c8b78 100644 --- a/src/stream/stream/find.rs +++ b/src/stream/stream/find.rs @@ -1,7 +1,10 @@ -use crate::task::{Context, Poll}; use std::marker::PhantomData; use std::pin::Pin; +use crate::future::Future; +use crate::stream::Stream; +use crate::task::{Context, Poll}; + #[doc(hidden)] #[allow(missing_debug_implementations)] pub struct FindFuture<'a, S, P, T> { @@ -11,9 +14,6 @@ pub struct FindFuture<'a, S, P, T> { } impl<'a, S, P, T> FindFuture<'a, S, P, T> { - pin_utils::unsafe_pinned!(stream: &'a mut S); - pin_utils::unsafe_unpinned!(p: P); - pub(super) fn new(stream: &'a mut S, p: P) -> Self { FindFuture { stream, @@ -23,20 +23,20 @@ impl<'a, S, P, T> FindFuture<'a, S, P, T> { } } -impl<'a, S, P> futures_core::future::Future for FindFuture<'a, S, P, S::Item> +impl Unpin for FindFuture<'_, S, P, T> {} + +impl<'a, S, P> Future for FindFuture<'a, S, P, S::Item> where - S: futures_core::stream::Stream + Unpin + Sized, + S: Stream + Unpin + Sized, P: FnMut(&S::Item) -> bool, { type Output = Option; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - use futures_core::stream::Stream; - - let item = futures_core::ready!(self.as_mut().stream().poll_next(cx)); + let item = futures_core::ready!(Pin::new(&mut *self.stream).poll_next(cx)); match item { - Some(v) => match (self.as_mut().p())(&v) { + Some(v) => match (&mut self.p)(&v) { true => Poll::Ready(Some(v)), false => { cx.waker().wake_by_ref(); diff --git a/src/stream/stream/find_map.rs b/src/stream/stream/find_map.rs index dcc29d88..dfcf92d6 100644 --- a/src/stream/stream/find_map.rs +++ b/src/stream/stream/find_map.rs @@ -2,6 +2,10 @@ use std::marker::PhantomData; use std::pin::Pin; use std::task::{Context, Poll}; +use crate::future::Future; +use crate::stream::Stream; + +#[doc(hidden)] #[allow(missing_debug_implementations)] pub struct FindMapFuture<'a, S, F, T, B> { stream: &'a mut S, @@ -11,9 +15,6 @@ pub struct FindMapFuture<'a, S, F, T, B> { } impl<'a, S, B, F, T> FindMapFuture<'a, S, F, T, B> { - pin_utils::unsafe_pinned!(stream: &'a mut S); - pin_utils::unsafe_unpinned!(f: F); - pub(super) fn new(stream: &'a mut S, f: F) -> Self { FindMapFuture { stream, @@ -24,20 +25,20 @@ impl<'a, S, B, F, T> FindMapFuture<'a, S, F, T, B> { } } -impl<'a, S, B, F> futures_core::future::Future for FindMapFuture<'a, S, F, S::Item, B> +impl Unpin for FindMapFuture<'_, S, F, T, B> {} + +impl<'a, S, B, F> Future for FindMapFuture<'a, S, F, S::Item, B> where - S: futures_core::stream::Stream + Unpin + Sized, + S: Stream + Unpin + Sized, F: FnMut(S::Item) -> Option, { type Output = Option; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - use futures_core::stream::Stream; - - let item = futures_core::ready!(self.as_mut().stream().poll_next(cx)); + let item = futures_core::ready!(Pin::new(&mut *self.stream).poll_next(cx)); match item { - Some(v) => match (self.as_mut().f())(v) { + Some(v) => match (&mut self.f)(v) { Some(v) => Poll::Ready(Some(v)), None => { cx.waker().wake_by_ref(); diff --git a/src/stream/stream/nth.rs b/src/stream/stream/nth.rs index 169ded5a..e7e042a9 100644 --- a/src/stream/stream/nth.rs +++ b/src/stream/stream/nth.rs @@ -1,36 +1,37 @@ use std::pin::Pin; use std::task::{Context, Poll}; +use crate::future::Future; +use crate::stream::Stream; + +#[doc(hidden)] #[allow(missing_debug_implementations)] pub struct NthFuture<'a, S> { stream: &'a mut S, n: usize, } -impl<'a, S> NthFuture<'a, S> { - pin_utils::unsafe_pinned!(stream: &'a mut S); - pin_utils::unsafe_unpinned!(n: usize); +impl Unpin for NthFuture<'_, S> {} +impl<'a, S> NthFuture<'a, S> { pub(crate) fn new(stream: &'a mut S, n: usize) -> Self { NthFuture { stream, n } } } -impl<'a, S> futures_core::future::Future for NthFuture<'a, S> +impl<'a, S> Future for NthFuture<'a, S> where - S: futures_core::stream::Stream + Unpin + Sized, + S: Stream + Unpin + Sized, { type Output = Option; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - use futures_core::stream::Stream; - - let next = futures_core::ready!(self.as_mut().stream().poll_next(cx)); + let next = futures_core::ready!(Pin::new(&mut *self.stream).poll_next(cx)); match next { Some(v) => match self.n { 0 => Poll::Ready(Some(v)), _ => { - *self.as_mut().n() -= 1; + self.n -= 1; cx.waker().wake_by_ref(); Poll::Pending }