diff --git a/src/stream/successors.rs b/src/stream/successors.rs index e7084165..4186c667 100644 --- a/src/stream/successors.rs +++ b/src/stream/successors.rs @@ -1,25 +1,31 @@ use std::marker::PhantomData; use std::pin::Pin; +use std::mem; use crate::future::Future; use crate::stream::Stream; -use crate::task::{Context, Poll}; +use crate::task::{Context, Poll, ready}; -/// A stream that yields elements by calling an async closure with the previous value as an -/// argument -/// -/// This stream is constructed by [`successor`] function -/// -/// [`successor`]: fn.successor.html -#[derive(Debug)] -pub struct Successors -where - Fut: Future>, -{ - successor: F, - future: Option, - next: Option, - _marker: PhantomData, + + +pin_project_lite::pin_project! { + /// A stream that yields elements by calling an async closure with the previous value as an + /// argument + /// + /// This stream is constructed by [`successor`] function + /// + /// [`successor`]: fn.successor.html + #[derive(Debug)] + pub struct Successors + where + Fut: Future>, + { + successor: F, + #[pin] + future: Option, + slot: Option, + _marker: PhantomData, + } } /// Creates a new stream where to produce each new element a closure is called with the previous @@ -40,6 +46,7 @@ where /// }); /// /// pin_utils::pin_mut!(s); +/// assert_eq!(s.next().await, Some(22)); /// assert_eq!(s.next().await, Some(23)); /// assert_eq!(s.next().await, Some(24)); /// assert_eq!(s.next().await, Some(25)); @@ -58,31 +65,20 @@ where /// # }) } /// /// ``` -pub fn successors(start: Option, func: F) -> Successors +pub fn successors(first: Option, succ: F) -> Successors where F: FnMut(T) -> Fut, Fut: Future>, T: Copy, { Successors { - successor: func, + successor: succ, future: None, - next: start, + slot: first, _marker: PhantomData, } } -impl Successors -where - F: FnMut(T) -> Fut, - Fut: Future>, - T: Copy, -{ - pin_utils::unsafe_unpinned!(successor: F); - pin_utils::unsafe_unpinned!(next: Option); - pin_utils::unsafe_pinned!(future: Option); -} - impl Stream for Successors where Fut: Future>, @@ -91,23 +87,23 @@ where { type Item = T; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - if self.next.is_none() { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = self.project(); + + if this.slot.is_none() { return Poll::Ready(None); } - match &self.future { - None => { - let x = self.next.unwrap(); - let fut = (self.as_mut().successor())(x); - self.as_mut().future().set(Some(fut)); - } - _ => {} + if this.future.is_none() { + let x = this.slot.unwrap(); + let fut = (this.successor)(x); + this.future.set(Some(fut)); } - let next = futures_core::ready!(self.as_mut().future().as_pin_mut().unwrap().poll(cx)); - *self.as_mut().next() = next; - self.as_mut().future().set(None); + let mut next = ready!(this.future.as_mut().as_pin_mut().unwrap().poll(cx)); + + this.future.set(None); + mem::swap(this.slot, &mut next); Poll::Ready(next) } }