fix: Using pin_project!

yoshuawuyts-patch-1
k-nasa 5 years ago
parent 3297a0f327
commit 271b6f4a1c

@ -1,18 +1,22 @@
use std::pin::Pin;
use pin_project_lite::pin_project;
use crate::prelude::*;
use crate::stream::stream::map::Map;
use crate::stream::{IntoStream, Stream};
use crate::task::{Context, Poll};
/// This `struct` is created by the [`flat_map`] method on [`Stream`]. See its
/// documentation for more.
///
/// [`flat_map`]: trait.Stream.html#method.flat_map
/// [`Stream`]: trait.Stream.html
#[allow(missing_debug_implementations)]
pub struct FlatMap<S: Stream, U: IntoStream, F> {
inner: FlattenCompat<Map<S, F, S::Item, U>, U>,
pin_project! {
/// This `struct` is created by the [`flat_map`] method on [`Stream`]. See its
/// documentation for more.
///
/// [`flat_map`]: trait.Stream.html#method.flat_map
/// [`Stream`]: trait.Stream.html
#[allow(missing_debug_implementations)]
pub struct FlatMap<S: Stream, U: IntoStream, F> {
#[pin]
inner: FlattenCompat<Map<S, F, S::Item, U>, U>,
}
}
impl<S, U, F> FlatMap<S, U, F>
@ -21,7 +25,6 @@ where
U: IntoStream,
F: FnMut(S::Item) -> U,
{
pin_utils::unsafe_pinned!(inner: FlattenCompat<Map<S, F, S::Item, U>, U>);
pub fn new(stream: S, f: F) -> FlatMap<S, U, F> {
FlatMap {
@ -33,33 +36,33 @@ where
impl<S, U, F> Stream for FlatMap<S, U, F>
where
S: Stream<Item: IntoStream<IntoStream = U, Item = U::Item>> + std::marker::Unpin,
S::Item: std::marker::Unpin,
U: Stream + std::marker::Unpin,
F: FnMut(S::Item) -> U + std::marker::Unpin,
F: FnMut(S::Item) -> U,
{
type Item = U::Item;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.as_mut().inner().poll_next(cx)
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.project().inner.poll_next(cx)
}
}
/// This `struct` is created by the [`flatten`] method on [`Stream`]. See its
/// documentation for more.
///
/// [`flatten`]: trait.Stream.html#method.flatten
/// [`Stream`]: trait.Stream.html
#[allow(missing_debug_implementations)]
pub struct Flatten<S: Stream>
where
S::Item: IntoStream,
{
inner: FlattenCompat<S, <S::Item as IntoStream>::IntoStream>,
pin_project!{
/// This `struct` is created by the [`flatten`] method on [`Stream`]. See its
/// documentation for more.
///
/// [`flatten`]: trait.Stream.html#method.flatten
/// [`Stream`]: trait.Stream.html
#[allow(missing_debug_implementations)]
pub struct Flatten<S: Stream>
where
S::Item: IntoStream,
{
#[pin]
inner: FlattenCompat<S, <S::Item as IntoStream>::IntoStream>,
}
}
impl<S: Stream<Item: IntoStream>> Flatten<S> {
pin_utils::unsafe_pinned!(inner: FlattenCompat<S, <S::Item as IntoStream>::IntoStream>);
pub fn new(stream: S) -> Flatten<S> {
Flatten { inner: FlattenCompat::new(stream) }
}
@ -72,24 +75,23 @@ where
{
type Item = U::Item;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.as_mut().inner().poll_next(cx)
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.project().inner.poll_next(cx)
}
}
/// Real logic of both `Flatten` and `FlatMap` which simply delegate to
/// this type.
#[derive(Clone, Debug)]
struct FlattenCompat<S, U> {
stream: S,
frontiter: Option<U>,
pin_project! {
/// Real logic of both `Flatten` and `FlatMap` which simply delegate to
/// this type.
#[derive(Clone, Debug)]
struct FlattenCompat<S, U> {
stream: S,
frontiter: Option<U>,
}
}
impl<S, U> FlattenCompat<S, U> {
pin_utils::unsafe_unpinned!(stream: S);
pin_utils::unsafe_unpinned!(frontiter: Option<U>);
/// Adapts an iterator by flattening it, for use in `flatten()` and `flat_map()`.
pub fn new(stream: S) -> FlattenCompat<S, U> {
FlattenCompat {
@ -106,17 +108,18 @@ where
{
type Item = U::Item;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
loop {
if let Some(ref mut inner) = self.as_mut().frontiter() {
if let Some(inner) = this.frontiter {
if let item @ Some(_) = futures_core::ready!(Pin::new(inner).poll_next(cx)) {
return Poll::Ready(item);
}
}
match futures_core::ready!(Pin::new(&mut self.stream).poll_next(cx)) {
match futures_core::ready!(Pin::new(&mut this.stream).poll_next(cx)) {
None => return Poll::Ready(None),
Some(inner) => *self.as_mut().frontiter() = Some(inner.into_stream()),
Some(inner) => *this.frontiter = Some(inner.into_stream()),
}
}
}

Loading…
Cancel
Save