From 271b6f4a1c6a8096ba68682430a984001de19d56 Mon Sep 17 00:00:00 2001 From: k-nasa Date: Fri, 25 Oct 2019 23:58:08 +0900 Subject: [PATCH] fix: Using pin_project! --- src/stream/stream/flatten.rs | 85 +++++++++++++++++++----------------- 1 file changed, 44 insertions(+), 41 deletions(-) diff --git a/src/stream/stream/flatten.rs b/src/stream/stream/flatten.rs index b970087..d44fab0 100644 --- a/src/stream/stream/flatten.rs +++ b/src/stream/stream/flatten.rs @@ -1,18 +1,22 @@ use std::pin::Pin; +use pin_project_lite::pin_project; use crate::prelude::*; use crate::stream::stream::map::Map; use crate::stream::{IntoStream, Stream}; use crate::task::{Context, Poll}; -/// This `struct` is created by the [`flat_map`] method on [`Stream`]. See its -/// documentation for more. -/// -/// [`flat_map`]: trait.Stream.html#method.flat_map -/// [`Stream`]: trait.Stream.html -#[allow(missing_debug_implementations)] -pub struct FlatMap { - inner: FlattenCompat, U>, +pin_project! { + /// This `struct` is created by the [`flat_map`] method on [`Stream`]. See its + /// documentation for more. + /// + /// [`flat_map`]: trait.Stream.html#method.flat_map + /// [`Stream`]: trait.Stream.html + #[allow(missing_debug_implementations)] + pub struct FlatMap { + #[pin] + inner: FlattenCompat, U>, + } } impl FlatMap @@ -21,7 +25,6 @@ where U: IntoStream, F: FnMut(S::Item) -> U, { - pin_utils::unsafe_pinned!(inner: FlattenCompat, U>); pub fn new(stream: S, f: F) -> FlatMap { FlatMap { @@ -33,33 +36,33 @@ where impl Stream for FlatMap where S: Stream> + std::marker::Unpin, - S::Item: std::marker::Unpin, U: Stream + std::marker::Unpin, - F: FnMut(S::Item) -> U + std::marker::Unpin, + F: FnMut(S::Item) -> U, { type Item = U::Item; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.as_mut().inner().poll_next(cx) + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().inner.poll_next(cx) } } -/// This `struct` is created by the [`flatten`] method on [`Stream`]. See its -/// documentation for more. -/// -/// [`flatten`]: trait.Stream.html#method.flatten -/// [`Stream`]: trait.Stream.html -#[allow(missing_debug_implementations)] -pub struct Flatten -where - S::Item: IntoStream, -{ - inner: FlattenCompat::IntoStream>, +pin_project!{ + /// This `struct` is created by the [`flatten`] method on [`Stream`]. See its + /// documentation for more. + /// + /// [`flatten`]: trait.Stream.html#method.flatten + /// [`Stream`]: trait.Stream.html + #[allow(missing_debug_implementations)] + pub struct Flatten + where + S::Item: IntoStream, + { + #[pin] + inner: FlattenCompat::IntoStream>, + } } impl> Flatten { - pin_utils::unsafe_pinned!(inner: FlattenCompat::IntoStream>); - pub fn new(stream: S) -> Flatten { Flatten { inner: FlattenCompat::new(stream) } } @@ -72,24 +75,23 @@ where { type Item = U::Item; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.as_mut().inner().poll_next(cx) + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().inner.poll_next(cx) } } -/// Real logic of both `Flatten` and `FlatMap` which simply delegate to -/// this type. -#[derive(Clone, Debug)] -struct FlattenCompat { - stream: S, - frontiter: Option, +pin_project! { + /// Real logic of both `Flatten` and `FlatMap` which simply delegate to + /// this type. + #[derive(Clone, Debug)] + struct FlattenCompat { + stream: S, + frontiter: Option, + } } impl FlattenCompat { - pin_utils::unsafe_unpinned!(stream: S); - pin_utils::unsafe_unpinned!(frontiter: Option); - /// Adapts an iterator by flattening it, for use in `flatten()` and `flat_map()`. pub fn new(stream: S) -> FlattenCompat { FlattenCompat { @@ -106,17 +108,18 @@ where { type Item = U::Item; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = self.project(); loop { - if let Some(ref mut inner) = self.as_mut().frontiter() { + if let Some(inner) = this.frontiter { if let item @ Some(_) = futures_core::ready!(Pin::new(inner).poll_next(cx)) { return Poll::Ready(item); } } - match futures_core::ready!(Pin::new(&mut self.stream).poll_next(cx)) { + match futures_core::ready!(Pin::new(&mut this.stream).poll_next(cx)) { None => return Poll::Ready(None), - Some(inner) => *self.as_mut().frontiter() = Some(inner.into_stream()), + Some(inner) => *this.frontiter = Some(inner.into_stream()), } } }