housekeeping after 145

staging
Fedor Sakharov 5 years ago
parent 2ecaf1811b
commit 18428d6bfe
No known key found for this signature in database
GPG Key ID: 93D436E666BF0FEE

@ -2,8 +2,10 @@ use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{Context, Poll};
/// A stream that both filters and maps.
#[derive(Clone, Debug)]
use crate::stream::Stream;
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct FilterMap<S, F, T, B> {
stream: S,
f: F,
@ -27,7 +29,7 @@ impl<S, F, T, B> FilterMap<S, F, T, B> {
impl<S, F, B> futures_core::stream::Stream for FilterMap<S, F, S::Item, B>
where
S: futures_core::stream::Stream,
S: Stream,
F: FnMut(S::Item) -> Option<B>,
{
type Item = B;

@ -1,7 +1,10 @@
use crate::task::{Context, Poll};
use std::marker::PhantomData;
use std::pin::Pin;
use crate::future::Future;
use crate::stream::Stream;
use crate::task::{Context, Poll};
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct FindFuture<'a, S, P, T> {
@ -11,9 +14,6 @@ pub struct FindFuture<'a, S, P, T> {
}
impl<'a, S, P, T> FindFuture<'a, S, P, T> {
pin_utils::unsafe_pinned!(stream: &'a mut S);
pin_utils::unsafe_unpinned!(p: P);
pub(super) fn new(stream: &'a mut S, p: P) -> Self {
FindFuture {
stream,
@ -23,20 +23,20 @@ impl<'a, S, P, T> FindFuture<'a, S, P, T> {
}
}
impl<'a, S, P> futures_core::future::Future for FindFuture<'a, S, P, S::Item>
impl<S: Unpin, P, T> Unpin for FindFuture<'_, S, P, T> {}
impl<'a, S, P> Future for FindFuture<'a, S, P, S::Item>
where
S: futures_core::stream::Stream + Unpin + Sized,
S: Stream + Unpin + Sized,
P: FnMut(&S::Item) -> bool,
{
type Output = Option<S::Item>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
use futures_core::stream::Stream;
let item = futures_core::ready!(self.as_mut().stream().poll_next(cx));
let item = futures_core::ready!(Pin::new(&mut *self.stream).poll_next(cx));
match item {
Some(v) => match (self.as_mut().p())(&v) {
Some(v) => match (&mut self.p)(&v) {
true => Poll::Ready(Some(v)),
false => {
cx.waker().wake_by_ref();

@ -2,6 +2,10 @@ use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{Context, Poll};
use crate::future::Future;
use crate::stream::Stream;
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct FindMapFuture<'a, S, F, T, B> {
stream: &'a mut S,
@ -11,9 +15,6 @@ pub struct FindMapFuture<'a, S, F, T, B> {
}
impl<'a, S, B, F, T> FindMapFuture<'a, S, F, T, B> {
pin_utils::unsafe_pinned!(stream: &'a mut S);
pin_utils::unsafe_unpinned!(f: F);
pub(super) fn new(stream: &'a mut S, f: F) -> Self {
FindMapFuture {
stream,
@ -24,20 +25,20 @@ impl<'a, S, B, F, T> FindMapFuture<'a, S, F, T, B> {
}
}
impl<'a, S, B, F> futures_core::future::Future for FindMapFuture<'a, S, F, S::Item, B>
impl<S: Unpin, F, T, B> Unpin for FindMapFuture<'_, S, F, T, B> {}
impl<'a, S, B, F> Future for FindMapFuture<'a, S, F, S::Item, B>
where
S: futures_core::stream::Stream + Unpin + Sized,
S: Stream + Unpin + Sized,
F: FnMut(S::Item) -> Option<B>,
{
type Output = Option<B>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
use futures_core::stream::Stream;
let item = futures_core::ready!(self.as_mut().stream().poll_next(cx));
let item = futures_core::ready!(Pin::new(&mut *self.stream).poll_next(cx));
match item {
Some(v) => match (self.as_mut().f())(v) {
Some(v) => match (&mut self.f)(v) {
Some(v) => Poll::Ready(Some(v)),
None => {
cx.waker().wake_by_ref();

@ -1,36 +1,37 @@
use std::pin::Pin;
use std::task::{Context, Poll};
use crate::future::Future;
use crate::stream::Stream;
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct NthFuture<'a, S> {
stream: &'a mut S,
n: usize,
}
impl<'a, S> NthFuture<'a, S> {
pin_utils::unsafe_pinned!(stream: &'a mut S);
pin_utils::unsafe_unpinned!(n: usize);
impl<S: Unpin> Unpin for NthFuture<'_, S> {}
impl<'a, S> NthFuture<'a, S> {
pub(crate) fn new(stream: &'a mut S, n: usize) -> Self {
NthFuture { stream, n }
}
}
impl<'a, S> futures_core::future::Future for NthFuture<'a, S>
impl<'a, S> Future for NthFuture<'a, S>
where
S: futures_core::stream::Stream + Unpin + Sized,
S: Stream + Unpin + Sized,
{
type Output = Option<S::Item>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
use futures_core::stream::Stream;
let next = futures_core::ready!(self.as_mut().stream().poll_next(cx));
let next = futures_core::ready!(Pin::new(&mut *self.stream).poll_next(cx));
match next {
Some(v) => match self.n {
0 => Poll::Ready(Some(v)),
_ => {
*self.as_mut().n() -= 1;
self.n -= 1;
cx.waker().wake_by_ref();
Poll::Pending
}

Loading…
Cancel
Save