184: housekeeping after 145 r=yoshuawuyts a=montekki

Now that #145 is merged combinators can follow.
1. All combinators taking `&mut self` should imply `Self: Pin`.
2. Trait bounds are to `Stream`
3. Cleans up docs and `Debug` derives.

Co-authored-by: Fedor Sakharov <fedor.sakharov@gmail.com>
pull/194/head
bors[bot] 5 years ago committed by GitHub
commit 3054509fd0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -2,8 +2,10 @@ use std::marker::PhantomData;
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
/// A stream that both filters and maps. use crate::stream::Stream;
#[derive(Clone, Debug)]
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct FilterMap<S, F, T, B> { pub struct FilterMap<S, F, T, B> {
stream: S, stream: S,
f: F, f: F,
@ -27,7 +29,7 @@ impl<S, F, T, B> FilterMap<S, F, T, B> {
impl<S, F, B> futures_core::stream::Stream for FilterMap<S, F, S::Item, B> impl<S, F, B> futures_core::stream::Stream for FilterMap<S, F, S::Item, B>
where where
S: futures_core::stream::Stream, S: Stream,
F: FnMut(S::Item) -> Option<B>, F: FnMut(S::Item) -> Option<B>,
{ {
type Item = B; type Item = B;

@ -1,7 +1,10 @@
use crate::task::{Context, Poll};
use std::marker::PhantomData; use std::marker::PhantomData;
use std::pin::Pin; use std::pin::Pin;
use crate::future::Future;
use crate::stream::Stream;
use crate::task::{Context, Poll};
#[doc(hidden)] #[doc(hidden)]
#[allow(missing_debug_implementations)] #[allow(missing_debug_implementations)]
pub struct FindFuture<'a, S, P, T> { pub struct FindFuture<'a, S, P, T> {
@ -11,9 +14,6 @@ pub struct FindFuture<'a, S, P, T> {
} }
impl<'a, S, P, T> FindFuture<'a, S, P, T> { impl<'a, S, P, T> FindFuture<'a, S, P, T> {
pin_utils::unsafe_pinned!(stream: &'a mut S);
pin_utils::unsafe_unpinned!(p: P);
pub(super) fn new(stream: &'a mut S, p: P) -> Self { pub(super) fn new(stream: &'a mut S, p: P) -> Self {
FindFuture { FindFuture {
stream, stream,
@ -23,20 +23,20 @@ impl<'a, S, P, T> FindFuture<'a, S, P, T> {
} }
} }
impl<'a, S, P> futures_core::future::Future for FindFuture<'a, S, P, S::Item> impl<S: Unpin, P, T> Unpin for FindFuture<'_, S, P, T> {}
impl<'a, S, P> Future for FindFuture<'a, S, P, S::Item>
where where
S: futures_core::stream::Stream + Unpin + Sized, S: Stream + Unpin + Sized,
P: FnMut(&S::Item) -> bool, P: FnMut(&S::Item) -> bool,
{ {
type Output = Option<S::Item>; type Output = Option<S::Item>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
use futures_core::stream::Stream; let item = futures_core::ready!(Pin::new(&mut *self.stream).poll_next(cx));
let item = futures_core::ready!(self.as_mut().stream().poll_next(cx));
match item { match item {
Some(v) => match (self.as_mut().p())(&v) { Some(v) => match (&mut self.p)(&v) {
true => Poll::Ready(Some(v)), true => Poll::Ready(Some(v)),
false => { false => {
cx.waker().wake_by_ref(); cx.waker().wake_by_ref();

@ -2,6 +2,10 @@ use std::marker::PhantomData;
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use crate::future::Future;
use crate::stream::Stream;
#[doc(hidden)]
#[allow(missing_debug_implementations)] #[allow(missing_debug_implementations)]
pub struct FindMapFuture<'a, S, F, T, B> { pub struct FindMapFuture<'a, S, F, T, B> {
stream: &'a mut S, stream: &'a mut S,
@ -11,9 +15,6 @@ pub struct FindMapFuture<'a, S, F, T, B> {
} }
impl<'a, S, B, F, T> FindMapFuture<'a, S, F, T, B> { impl<'a, S, B, F, T> FindMapFuture<'a, S, F, T, B> {
pin_utils::unsafe_pinned!(stream: &'a mut S);
pin_utils::unsafe_unpinned!(f: F);
pub(super) fn new(stream: &'a mut S, f: F) -> Self { pub(super) fn new(stream: &'a mut S, f: F) -> Self {
FindMapFuture { FindMapFuture {
stream, stream,
@ -24,20 +25,20 @@ impl<'a, S, B, F, T> FindMapFuture<'a, S, F, T, B> {
} }
} }
impl<'a, S, B, F> futures_core::future::Future for FindMapFuture<'a, S, F, S::Item, B> impl<S: Unpin, F, T, B> Unpin for FindMapFuture<'_, S, F, T, B> {}
impl<'a, S, B, F> Future for FindMapFuture<'a, S, F, S::Item, B>
where where
S: futures_core::stream::Stream + Unpin + Sized, S: Stream + Unpin + Sized,
F: FnMut(S::Item) -> Option<B>, F: FnMut(S::Item) -> Option<B>,
{ {
type Output = Option<B>; type Output = Option<B>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
use futures_core::stream::Stream; let item = futures_core::ready!(Pin::new(&mut *self.stream).poll_next(cx));
let item = futures_core::ready!(self.as_mut().stream().poll_next(cx));
match item { match item {
Some(v) => match (self.as_mut().f())(v) { Some(v) => match (&mut self.f)(v) {
Some(v) => Poll::Ready(Some(v)), Some(v) => Poll::Ready(Some(v)),
None => { None => {
cx.waker().wake_by_ref(); cx.waker().wake_by_ref();

@ -1,36 +1,37 @@
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use crate::future::Future;
use crate::stream::Stream;
#[doc(hidden)]
#[allow(missing_debug_implementations)] #[allow(missing_debug_implementations)]
pub struct NthFuture<'a, S> { pub struct NthFuture<'a, S> {
stream: &'a mut S, stream: &'a mut S,
n: usize, n: usize,
} }
impl<'a, S> NthFuture<'a, S> { impl<S: Unpin> Unpin for NthFuture<'_, S> {}
pin_utils::unsafe_pinned!(stream: &'a mut S);
pin_utils::unsafe_unpinned!(n: usize);
impl<'a, S> NthFuture<'a, S> {
pub(crate) fn new(stream: &'a mut S, n: usize) -> Self { pub(crate) fn new(stream: &'a mut S, n: usize) -> Self {
NthFuture { stream, n } NthFuture { stream, n }
} }
} }
impl<'a, S> futures_core::future::Future for NthFuture<'a, S> impl<'a, S> Future for NthFuture<'a, S>
where where
S: futures_core::stream::Stream + Unpin + Sized, S: Stream + Unpin + Sized,
{ {
type Output = Option<S::Item>; type Output = Option<S::Item>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
use futures_core::stream::Stream; let next = futures_core::ready!(Pin::new(&mut *self.stream).poll_next(cx));
let next = futures_core::ready!(self.as_mut().stream().poll_next(cx));
match next { match next {
Some(v) => match self.n { Some(v) => match self.n {
0 => Poll::Ready(Some(v)), 0 => Poll::Ready(Some(v)),
_ => { _ => {
*self.as_mut().n() -= 1; self.n -= 1;
cx.waker().wake_by_ref(); cx.waker().wake_by_ref();
Poll::Pending Poll::Pending
} }

Loading…
Cancel
Save