forked from mirror/async-std
Merge branch 'master' into fs-stream-enumerate
This commit is contained in:
commit
9b3658244d
15 changed files with 267 additions and 71 deletions
|
@ -115,7 +115,8 @@ async fn broker(mut events: Receiver<Event>) -> Result<()> {
|
||||||
Event::Message { from, to, msg } => {
|
Event::Message { from, to, msg } => {
|
||||||
for addr in to {
|
for addr in to {
|
||||||
if let Some(peer) = peers.get_mut(&addr) {
|
if let Some(peer) = peers.get_mut(&addr) {
|
||||||
peer.send(format!("from {}: {}\n", from, msg)).await?
|
let msg = format!("from {}: {}\n", from, msg);
|
||||||
|
peer.send(msg).await?
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -115,7 +115,8 @@ Let's add waiting to the server:
|
||||||
# Event::Message { from, to, msg } => {
|
# Event::Message { from, to, msg } => {
|
||||||
# for addr in to {
|
# for addr in to {
|
||||||
# if let Some(peer) = peers.get_mut(&addr) {
|
# if let Some(peer) = peers.get_mut(&addr) {
|
||||||
# peer.send(format!("from {}: {}\n", from, msg)).await?
|
# let msg = format!("from {}: {}\n", from, msg);
|
||||||
|
# peer.send(msg).await?
|
||||||
# }
|
# }
|
||||||
# }
|
# }
|
||||||
# }
|
# }
|
||||||
|
@ -217,7 +218,8 @@ async fn broker(mut events: Receiver<Event>) -> Result<()> {
|
||||||
Event::Message { from, to, msg } => {
|
Event::Message { from, to, msg } => {
|
||||||
for addr in to {
|
for addr in to {
|
||||||
if let Some(peer) = peers.get_mut(&addr) {
|
if let Some(peer) = peers.get_mut(&addr) {
|
||||||
peer.send(format!("from {}: {}\n", from, msg)).await?
|
let msg = format!("from {}: {}\n", from, msg);
|
||||||
|
peer.send(msg).await?
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -73,7 +73,8 @@ async fn broker(mut events: Receiver<Event>) -> Result<()> {
|
||||||
Event::Message { from, to, msg } => { // 3
|
Event::Message { from, to, msg } => { // 3
|
||||||
for addr in to {
|
for addr in to {
|
||||||
if let Some(peer) = peers.get_mut(&addr) {
|
if let Some(peer) = peers.get_mut(&addr) {
|
||||||
peer.send(format!("from {}: {}\n", from, msg)).await?
|
let msg = format!("from {}: {}\n", from, msg);
|
||||||
|
peer.send(msg).await?
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -257,7 +257,8 @@ async fn broker(events: Receiver<Event>) {
|
||||||
Event::Message { from, to, msg } => {
|
Event::Message { from, to, msg } => {
|
||||||
for addr in to {
|
for addr in to {
|
||||||
if let Some(peer) = peers.get_mut(&addr) {
|
if let Some(peer) = peers.get_mut(&addr) {
|
||||||
peer.send(format!("from {}: {}\n", from, msg)).await
|
let msg = format!("from {}: {}\n", from, msg);
|
||||||
|
peer.send(msg).await
|
||||||
.unwrap() // 6
|
.unwrap() // 6
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -139,9 +139,8 @@ async fn broker_loop(mut events: Receiver<Event>) {
|
||||||
Event::Message { from, to, msg } => {
|
Event::Message { from, to, msg } => {
|
||||||
for addr in to {
|
for addr in to {
|
||||||
if let Some(peer) = peers.get_mut(&addr) {
|
if let Some(peer) = peers.get_mut(&addr) {
|
||||||
peer.send(format!("from {}: {}\n", from, msg))
|
let msg = format!("from {}: {}\n", from, msg);
|
||||||
.await
|
peer.send(msg).await.unwrap();
|
||||||
.unwrap()
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -10,7 +10,7 @@ async fn get() -> io::Result<Vec<u8>> {
|
||||||
|
|
||||||
let mut buf = vec![];
|
let mut buf = vec![];
|
||||||
|
|
||||||
io::timeout(Duration::from_secs(5), async {
|
io::timeout(Duration::from_secs(5), async move {
|
||||||
stream.read_to_end(&mut buf).await?;
|
stream.read_to_end(&mut buf).await?;
|
||||||
Ok(buf)
|
Ok(buf)
|
||||||
})
|
})
|
||||||
|
|
32
src/io/buf_read/fill_buf.rs
Normal file
32
src/io/buf_read/fill_buf.rs
Normal file
|
@ -0,0 +1,32 @@
|
||||||
|
use std::pin::Pin;
|
||||||
|
|
||||||
|
use futures_io::AsyncBufRead;
|
||||||
|
|
||||||
|
use crate::future::Future;
|
||||||
|
use crate::io;
|
||||||
|
use crate::task::{Context, Poll};
|
||||||
|
|
||||||
|
#[doc(hidden)]
|
||||||
|
#[allow(missing_debug_implementations)]
|
||||||
|
pub struct FillBufFuture<'a, R: ?Sized> {
|
||||||
|
reader: &'a mut R,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a, R: ?Sized> FillBufFuture<'a, R> {
|
||||||
|
pub(crate) fn new(reader: &'a mut R) -> Self {
|
||||||
|
Self { reader }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a, R: AsyncBufRead + Unpin + ?Sized> Future for FillBufFuture<'a, R> {
|
||||||
|
type Output = io::Result<&'a [u8]>;
|
||||||
|
|
||||||
|
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&'a [u8]>> {
|
||||||
|
let Self { reader } = &mut *self;
|
||||||
|
let result = Pin::new(reader).poll_fill_buf(cx);
|
||||||
|
// This is safe because:
|
||||||
|
// 1. The buffer is valid for the lifetime of the reader.
|
||||||
|
// 2. Output is unrelated to the wrapper (Self).
|
||||||
|
result.map_ok(|buf| unsafe { std::mem::transmute::<&'_ [u8], &'a [u8]>(buf) })
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,7 +1,9 @@
|
||||||
|
mod fill_buf;
|
||||||
mod lines;
|
mod lines;
|
||||||
mod read_line;
|
mod read_line;
|
||||||
mod read_until;
|
mod read_until;
|
||||||
|
|
||||||
|
use fill_buf::FillBufFuture;
|
||||||
pub use lines::Lines;
|
pub use lines::Lines;
|
||||||
use read_line::ReadLineFuture;
|
use read_line::ReadLineFuture;
|
||||||
use read_until::ReadUntilFuture;
|
use read_until::ReadUntilFuture;
|
||||||
|
@ -41,6 +43,26 @@ cfg_if! {
|
||||||
/// [`futures::io::AsyncBufRead`]:
|
/// [`futures::io::AsyncBufRead`]:
|
||||||
/// https://docs.rs/futures-preview/0.3.0-alpha.17/futures/io/trait.AsyncBufRead.html
|
/// https://docs.rs/futures-preview/0.3.0-alpha.17/futures/io/trait.AsyncBufRead.html
|
||||||
pub trait BufRead {
|
pub trait BufRead {
|
||||||
|
/// Returns the contents of the internal buffer, filling it with more data from the inner
|
||||||
|
/// reader if it is empty.
|
||||||
|
///
|
||||||
|
/// This function is a lower-level call. It needs to be paired with the [`consume`] method to
|
||||||
|
/// function properly. When calling this method, none of the contents will be "read" in the
|
||||||
|
/// sense that later calling `read` may return the same contents. As such, [`consume`] must be
|
||||||
|
/// called with the number of bytes that are consumed from this buffer to ensure that the bytes
|
||||||
|
/// are never returned twice.
|
||||||
|
///
|
||||||
|
/// [`consume`]: #tymethod.consume
|
||||||
|
///
|
||||||
|
/// An empty buffer returned indicates that the stream has reached EOF.
|
||||||
|
// TODO: write a proper doctest with `consume`
|
||||||
|
fn fill_buf<'a>(&'a mut self) -> ret!('a, FillBufFuture, io::Result<&'a [u8]>)
|
||||||
|
where
|
||||||
|
Self: Unpin,
|
||||||
|
{
|
||||||
|
FillBufFuture::new(self)
|
||||||
|
}
|
||||||
|
|
||||||
/// Reads all bytes into `buf` until the delimiter `byte` or EOF is reached.
|
/// Reads all bytes into `buf` until the delimiter `byte` or EOF is reached.
|
||||||
///
|
///
|
||||||
/// This function will read bytes from the underlying stream until the delimiter or EOF is
|
/// This function will read bytes from the underlying stream until the delimiter or EOF is
|
||||||
|
|
|
@ -1,43 +1,36 @@
|
||||||
use crate::future::Future;
|
|
||||||
use crate::task::{Context, Poll};
|
|
||||||
|
|
||||||
use std::marker::PhantomData;
|
use std::marker::PhantomData;
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
|
|
||||||
#[derive(Debug)]
|
use crate::future::Future;
|
||||||
pub struct AllFuture<'a, S, F, T>
|
use crate::stream::Stream;
|
||||||
where
|
use crate::task::{Context, Poll};
|
||||||
F: FnMut(T) -> bool,
|
|
||||||
{
|
#[doc(hidden)]
|
||||||
|
#[allow(missing_debug_implementations)]
|
||||||
|
pub struct AllFuture<'a, S, F, T> {
|
||||||
pub(crate) stream: &'a mut S,
|
pub(crate) stream: &'a mut S,
|
||||||
pub(crate) f: F,
|
pub(crate) f: F,
|
||||||
pub(crate) result: bool,
|
pub(crate) result: bool,
|
||||||
pub(crate) __item: PhantomData<T>,
|
pub(crate) _marker: PhantomData<T>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a, S, F, T> AllFuture<'a, S, F, T>
|
impl<S: Unpin, F, T> Unpin for AllFuture<'_, S, F, T> {}
|
||||||
where
|
|
||||||
F: FnMut(T) -> bool,
|
|
||||||
{
|
|
||||||
pin_utils::unsafe_pinned!(stream: &'a mut S);
|
|
||||||
pin_utils::unsafe_unpinned!(result: bool);
|
|
||||||
pin_utils::unsafe_unpinned!(f: F);
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<S, F> Future for AllFuture<'_, S, F, S::Item>
|
impl<S, F> Future for AllFuture<'_, S, F, S::Item>
|
||||||
where
|
where
|
||||||
S: futures_core::stream::Stream + Unpin + Sized,
|
S: Stream + Unpin + Sized,
|
||||||
F: FnMut(S::Item) -> bool,
|
F: FnMut(S::Item) -> bool,
|
||||||
{
|
{
|
||||||
type Output = bool;
|
type Output = bool;
|
||||||
|
|
||||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||||
use futures_core::stream::Stream;
|
let next = futures_core::ready!(Pin::new(&mut *self.stream).poll_next(cx));
|
||||||
let next = futures_core::ready!(self.as_mut().stream().poll_next(cx));
|
|
||||||
match next {
|
match next {
|
||||||
Some(v) => {
|
Some(v) => {
|
||||||
let result = (self.as_mut().f())(v);
|
let result = (&mut self.f)(v);
|
||||||
*self.as_mut().result() = result;
|
self.result = result;
|
||||||
|
|
||||||
if result {
|
if result {
|
||||||
// don't forget to wake this task again to pull the next item from stream
|
// don't forget to wake this task again to pull the next item from stream
|
||||||
cx.waker().wake_by_ref();
|
cx.waker().wake_by_ref();
|
||||||
|
|
|
@ -1,43 +1,36 @@
|
||||||
use crate::future::Future;
|
|
||||||
use crate::task::{Context, Poll};
|
|
||||||
|
|
||||||
use std::marker::PhantomData;
|
use std::marker::PhantomData;
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
|
|
||||||
#[derive(Debug)]
|
use crate::future::Future;
|
||||||
pub struct AnyFuture<'a, S, F, T>
|
use crate::stream::Stream;
|
||||||
where
|
use crate::task::{Context, Poll};
|
||||||
F: FnMut(T) -> bool,
|
|
||||||
{
|
#[doc(hidden)]
|
||||||
|
#[allow(missing_debug_implementations)]
|
||||||
|
pub struct AnyFuture<'a, S, F, T> {
|
||||||
pub(crate) stream: &'a mut S,
|
pub(crate) stream: &'a mut S,
|
||||||
pub(crate) f: F,
|
pub(crate) f: F,
|
||||||
pub(crate) result: bool,
|
pub(crate) result: bool,
|
||||||
pub(crate) __item: PhantomData<T>,
|
pub(crate) _marker: PhantomData<T>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a, S, F, T> AnyFuture<'a, S, F, T>
|
impl<S: Unpin, F, T> Unpin for AnyFuture<'_, S, F, T> {}
|
||||||
where
|
|
||||||
F: FnMut(T) -> bool,
|
|
||||||
{
|
|
||||||
pin_utils::unsafe_pinned!(stream: &'a mut S);
|
|
||||||
pin_utils::unsafe_unpinned!(result: bool);
|
|
||||||
pin_utils::unsafe_unpinned!(f: F);
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<S, F> Future for AnyFuture<'_, S, F, S::Item>
|
impl<S, F> Future for AnyFuture<'_, S, F, S::Item>
|
||||||
where
|
where
|
||||||
S: futures_core::stream::Stream + Unpin + Sized,
|
S: Stream + Unpin + Sized,
|
||||||
F: FnMut(S::Item) -> bool,
|
F: FnMut(S::Item) -> bool,
|
||||||
{
|
{
|
||||||
type Output = bool;
|
type Output = bool;
|
||||||
|
|
||||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||||
use futures_core::stream::Stream;
|
let next = futures_core::ready!(Pin::new(&mut *self.stream).poll_next(cx));
|
||||||
let next = futures_core::ready!(self.as_mut().stream().poll_next(cx));
|
|
||||||
match next {
|
match next {
|
||||||
Some(v) => {
|
Some(v) => {
|
||||||
let result = (self.as_mut().f())(v);
|
let result = (&mut self.f)(v);
|
||||||
*self.as_mut().result() = result;
|
self.result = result;
|
||||||
|
|
||||||
if result {
|
if result {
|
||||||
Poll::Ready(true)
|
Poll::Ready(true)
|
||||||
} else {
|
} else {
|
||||||
|
|
49
src/stream/stream/find.rs
Normal file
49
src/stream/stream/find.rs
Normal file
|
@ -0,0 +1,49 @@
|
||||||
|
use crate::task::{Context, Poll};
|
||||||
|
use std::marker::PhantomData;
|
||||||
|
use std::pin::Pin;
|
||||||
|
|
||||||
|
#[doc(hidden)]
|
||||||
|
#[allow(missing_debug_implementations)]
|
||||||
|
pub struct FindFuture<'a, S, P, T> {
|
||||||
|
stream: &'a mut S,
|
||||||
|
p: P,
|
||||||
|
__t: PhantomData<T>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a, S, P, T> FindFuture<'a, S, P, T> {
|
||||||
|
pin_utils::unsafe_pinned!(stream: &'a mut S);
|
||||||
|
pin_utils::unsafe_unpinned!(p: P);
|
||||||
|
|
||||||
|
pub(super) fn new(stream: &'a mut S, p: P) -> Self {
|
||||||
|
FindFuture {
|
||||||
|
stream,
|
||||||
|
p,
|
||||||
|
__t: PhantomData,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a, S, P> futures_core::future::Future for FindFuture<'a, S, P, S::Item>
|
||||||
|
where
|
||||||
|
S: futures_core::stream::Stream + Unpin + Sized,
|
||||||
|
P: FnMut(&S::Item) -> bool,
|
||||||
|
{
|
||||||
|
type Output = Option<S::Item>;
|
||||||
|
|
||||||
|
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||||
|
use futures_core::stream::Stream;
|
||||||
|
|
||||||
|
let item = futures_core::ready!(self.as_mut().stream().poll_next(cx));
|
||||||
|
|
||||||
|
match item {
|
||||||
|
Some(v) => match (self.as_mut().p())(&v) {
|
||||||
|
true => Poll::Ready(Some(v)),
|
||||||
|
false => {
|
||||||
|
cx.waker().wake_by_ref();
|
||||||
|
Poll::Pending
|
||||||
|
}
|
||||||
|
},
|
||||||
|
None => Poll::Ready(None),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -2,8 +2,10 @@ use std::cmp::Ordering;
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
|
|
||||||
use crate::future::Future;
|
use crate::future::Future;
|
||||||
|
use crate::stream::Stream;
|
||||||
use crate::task::{Context, Poll};
|
use crate::task::{Context, Poll};
|
||||||
|
|
||||||
|
#[doc(hidden)]
|
||||||
#[allow(missing_debug_implementations)]
|
#[allow(missing_debug_implementations)]
|
||||||
pub struct MinByFuture<S, F, T> {
|
pub struct MinByFuture<S, F, T> {
|
||||||
stream: S,
|
stream: S,
|
||||||
|
@ -27,7 +29,7 @@ impl<S, F, T> MinByFuture<S, F, T> {
|
||||||
|
|
||||||
impl<S, F> Future for MinByFuture<S, F, S::Item>
|
impl<S, F> Future for MinByFuture<S, F, S::Item>
|
||||||
where
|
where
|
||||||
S: futures_core::stream::Stream + Unpin + Sized,
|
S: Stream + Unpin + Sized,
|
||||||
S::Item: Copy,
|
S::Item: Copy,
|
||||||
F: FnMut(&S::Item, &S::Item) -> Ordering,
|
F: FnMut(&S::Item, &S::Item) -> Ordering,
|
||||||
{
|
{
|
||||||
|
|
|
@ -25,6 +25,7 @@ mod all;
|
||||||
mod any;
|
mod any;
|
||||||
mod enumerate;
|
mod enumerate;
|
||||||
mod filter_map;
|
mod filter_map;
|
||||||
|
mod find;
|
||||||
mod find_map;
|
mod find_map;
|
||||||
mod min_by;
|
mod min_by;
|
||||||
mod next;
|
mod next;
|
||||||
|
@ -37,6 +38,7 @@ use all::AllFuture;
|
||||||
use any::AnyFuture;
|
use any::AnyFuture;
|
||||||
use enumerate::Enumerate;
|
use enumerate::Enumerate;
|
||||||
use filter_map::FilterMap;
|
use filter_map::FilterMap;
|
||||||
|
use find::FindFuture;
|
||||||
use find_map::FindMapFuture;
|
use find_map::FindMapFuture;
|
||||||
use min_by::MinByFuture;
|
use min_by::MinByFuture;
|
||||||
use next::NextFuture;
|
use next::NextFuture;
|
||||||
|
@ -44,9 +46,12 @@ use nth::NthFuture;
|
||||||
|
|
||||||
use std::cmp::Ordering;
|
use std::cmp::Ordering;
|
||||||
use std::marker::PhantomData;
|
use std::marker::PhantomData;
|
||||||
|
use std::pin::Pin;
|
||||||
|
|
||||||
use cfg_if::cfg_if;
|
use cfg_if::cfg_if;
|
||||||
|
|
||||||
|
use crate::task::{Context, Poll};
|
||||||
|
|
||||||
cfg_if! {
|
cfg_if! {
|
||||||
if #[cfg(feature = "docs")] {
|
if #[cfg(feature = "docs")] {
|
||||||
#[doc(hidden)]
|
#[doc(hidden)]
|
||||||
|
@ -83,6 +88,55 @@ pub trait Stream {
|
||||||
/// The type of items yielded by this stream.
|
/// The type of items yielded by this stream.
|
||||||
type Item;
|
type Item;
|
||||||
|
|
||||||
|
/// Attempts to receive the next item from the stream.
|
||||||
|
///
|
||||||
|
/// There are several possible return values:
|
||||||
|
///
|
||||||
|
/// * `Poll::Pending` means this stream's next value is not ready yet.
|
||||||
|
/// * `Poll::Ready(None)` means this stream has been exhausted.
|
||||||
|
/// * `Poll::Ready(Some(item))` means `item` was received out of the stream.
|
||||||
|
///
|
||||||
|
/// # Examples
|
||||||
|
///
|
||||||
|
/// ```
|
||||||
|
/// # fn main() { async_std::task::block_on(async {
|
||||||
|
/// #
|
||||||
|
/// use std::pin::Pin;
|
||||||
|
///
|
||||||
|
/// use async_std::prelude::*;
|
||||||
|
/// use async_std::stream;
|
||||||
|
/// use async_std::task::{Context, Poll};
|
||||||
|
///
|
||||||
|
/// fn increment(s: impl Stream<Item = i32> + Unpin) -> impl Stream<Item = i32> + Unpin {
|
||||||
|
/// struct Increment<S>(S);
|
||||||
|
///
|
||||||
|
/// impl<S: Stream<Item = i32> + Unpin> Stream for Increment<S> {
|
||||||
|
/// type Item = S::Item;
|
||||||
|
///
|
||||||
|
/// fn poll_next(
|
||||||
|
/// mut self: Pin<&mut Self>,
|
||||||
|
/// cx: &mut Context<'_>,
|
||||||
|
/// ) -> Poll<Option<Self::Item>> {
|
||||||
|
/// match Pin::new(&mut self.0).poll_next(cx) {
|
||||||
|
/// Poll::Pending => Poll::Pending,
|
||||||
|
/// Poll::Ready(None) => Poll::Ready(None),
|
||||||
|
/// Poll::Ready(Some(item)) => Poll::Ready(Some(item + 1)),
|
||||||
|
/// }
|
||||||
|
/// }
|
||||||
|
/// }
|
||||||
|
///
|
||||||
|
/// Increment(s)
|
||||||
|
/// }
|
||||||
|
///
|
||||||
|
/// let mut s = increment(stream::once(7));
|
||||||
|
///
|
||||||
|
/// assert_eq!(s.next().await, Some(8));
|
||||||
|
/// assert_eq!(s.next().await, None);
|
||||||
|
/// #
|
||||||
|
/// # }) }
|
||||||
|
/// ```
|
||||||
|
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
|
||||||
|
|
||||||
/// Advances the stream and returns the next value.
|
/// Advances the stream and returns the next value.
|
||||||
///
|
///
|
||||||
/// Returns [`None`] when iteration is finished. Individual stream implementations may
|
/// Returns [`None`] when iteration is finished. Individual stream implementations may
|
||||||
|
@ -108,7 +162,10 @@ pub trait Stream {
|
||||||
/// ```
|
/// ```
|
||||||
fn next(&mut self) -> ret!('_, NextFuture, Option<Self::Item>)
|
fn next(&mut self) -> ret!('_, NextFuture, Option<Self::Item>)
|
||||||
where
|
where
|
||||||
Self: Unpin;
|
Self: Unpin,
|
||||||
|
{
|
||||||
|
NextFuture { stream: self }
|
||||||
|
}
|
||||||
|
|
||||||
/// Creates a stream that yields its first `n` elements.
|
/// Creates a stream that yields its first `n` elements.
|
||||||
///
|
///
|
||||||
|
@ -342,17 +399,61 @@ pub trait Stream {
|
||||||
#[inline]
|
#[inline]
|
||||||
fn all<F>(&mut self, f: F) -> ret!('_, AllFuture, bool, F, Self::Item)
|
fn all<F>(&mut self, f: F) -> ret!('_, AllFuture, bool, F, Self::Item)
|
||||||
where
|
where
|
||||||
Self: Sized,
|
Self: Unpin + Sized,
|
||||||
F: FnMut(Self::Item) -> bool,
|
F: FnMut(Self::Item) -> bool,
|
||||||
{
|
{
|
||||||
AllFuture {
|
AllFuture {
|
||||||
stream: self,
|
stream: self,
|
||||||
result: true, // the default if the empty stream
|
result: true, // the default if the empty stream
|
||||||
__item: PhantomData,
|
_marker: PhantomData,
|
||||||
f,
|
f,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Searches for an element in a stream that satisfies a predicate.
|
||||||
|
///
|
||||||
|
/// # Examples
|
||||||
|
///
|
||||||
|
/// Basic usage:
|
||||||
|
///
|
||||||
|
/// ```
|
||||||
|
/// # fn main() { async_std::task::block_on(async {
|
||||||
|
/// #
|
||||||
|
/// use async_std::prelude::*;
|
||||||
|
/// use std::collections::VecDeque;
|
||||||
|
///
|
||||||
|
/// let mut s: VecDeque<usize> = vec![1, 2, 3].into_iter().collect();
|
||||||
|
/// let res = s.find(|x| *x == 2).await;
|
||||||
|
/// assert_eq!(res, Some(2));
|
||||||
|
/// #
|
||||||
|
/// # }) }
|
||||||
|
/// ```
|
||||||
|
///
|
||||||
|
/// Resuming after a first find:
|
||||||
|
///
|
||||||
|
/// ```
|
||||||
|
/// # fn main() { async_std::task::block_on(async {
|
||||||
|
/// #
|
||||||
|
/// use async_std::prelude::*;
|
||||||
|
/// use std::collections::VecDeque;
|
||||||
|
///
|
||||||
|
/// let mut s: VecDeque<usize> = vec![1, 2, 3].into_iter().collect();
|
||||||
|
/// let res = s.find(|x| *x == 2).await;
|
||||||
|
/// assert_eq!(res, Some(2));
|
||||||
|
///
|
||||||
|
/// let next = s.next().await;
|
||||||
|
/// assert_eq!(next, Some(3));
|
||||||
|
/// #
|
||||||
|
/// # }) }
|
||||||
|
/// ```
|
||||||
|
fn find<P>(&mut self, p: P) -> ret!('_, FindFuture, Option<Self::Item>, P, Self::Item)
|
||||||
|
where
|
||||||
|
Self: Sized,
|
||||||
|
P: FnMut(&Self::Item) -> bool,
|
||||||
|
{
|
||||||
|
FindFuture::new(self, p)
|
||||||
|
}
|
||||||
|
|
||||||
/// Applies function to the elements of stream and returns the first non-none result.
|
/// Applies function to the elements of stream and returns the first non-none result.
|
||||||
///
|
///
|
||||||
/// ```
|
/// ```
|
||||||
|
@ -422,13 +523,13 @@ pub trait Stream {
|
||||||
#[inline]
|
#[inline]
|
||||||
fn any<F>(&mut self, f: F) -> ret!('_, AnyFuture, bool, F, Self::Item)
|
fn any<F>(&mut self, f: F) -> ret!('_, AnyFuture, bool, F, Self::Item)
|
||||||
where
|
where
|
||||||
Self: Sized,
|
Self: Unpin + Sized,
|
||||||
F: FnMut(Self::Item) -> bool,
|
F: FnMut(Self::Item) -> bool,
|
||||||
{
|
{
|
||||||
AnyFuture {
|
AnyFuture {
|
||||||
stream: self,
|
stream: self,
|
||||||
result: false, // the default if the empty stream
|
result: false, // the default if the empty stream
|
||||||
__item: PhantomData,
|
_marker: PhantomData,
|
||||||
f,
|
f,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -437,10 +538,7 @@ pub trait Stream {
|
||||||
impl<T: futures_core::stream::Stream + Unpin + ?Sized> Stream for T {
|
impl<T: futures_core::stream::Stream + Unpin + ?Sized> Stream for T {
|
||||||
type Item = <Self as futures_core::stream::Stream>::Item;
|
type Item = <Self as futures_core::stream::Stream>::Item;
|
||||||
|
|
||||||
fn next(&mut self) -> ret!('_, NextFuture, Option<Self::Item>)
|
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||||
where
|
futures_core::stream::Stream::poll_next(self, cx)
|
||||||
Self: Unpin,
|
|
||||||
{
|
|
||||||
NextFuture { stream: self }
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,14 +1,16 @@
|
||||||
use crate::future::Future;
|
|
||||||
use crate::task::{Context, Poll};
|
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
|
|
||||||
|
use crate::future::Future;
|
||||||
|
use crate::stream::Stream;
|
||||||
|
use crate::task::{Context, Poll};
|
||||||
|
|
||||||
#[doc(hidden)]
|
#[doc(hidden)]
|
||||||
#[allow(missing_debug_implementations)]
|
#[allow(missing_debug_implementations)]
|
||||||
pub struct NextFuture<'a, T: Unpin + ?Sized> {
|
pub struct NextFuture<'a, T: Unpin + ?Sized> {
|
||||||
pub(crate) stream: &'a mut T,
|
pub(crate) stream: &'a mut T,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T: futures_core::stream::Stream + Unpin + ?Sized> Future for NextFuture<'_, T> {
|
impl<T: Stream + Unpin + ?Sized> Future for NextFuture<'_, T> {
|
||||||
type Output = Option<T::Item>;
|
type Output = Option<T::Item>;
|
||||||
|
|
||||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||||
|
|
|
@ -1,7 +1,8 @@
|
||||||
use crate::task::{Context, Poll};
|
|
||||||
|
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
|
|
||||||
|
use crate::stream::Stream;
|
||||||
|
use crate::task::{Context, Poll};
|
||||||
|
|
||||||
/// A stream that yields the first `n` items of another stream.
|
/// A stream that yields the first `n` items of another stream.
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
pub struct Take<S> {
|
pub struct Take<S> {
|
||||||
|
@ -11,12 +12,12 @@ pub struct Take<S> {
|
||||||
|
|
||||||
impl<S: Unpin> Unpin for Take<S> {}
|
impl<S: Unpin> Unpin for Take<S> {}
|
||||||
|
|
||||||
impl<S: futures_core::stream::Stream> Take<S> {
|
impl<S: Stream> Take<S> {
|
||||||
pin_utils::unsafe_pinned!(stream: S);
|
pin_utils::unsafe_pinned!(stream: S);
|
||||||
pin_utils::unsafe_unpinned!(remaining: usize);
|
pin_utils::unsafe_unpinned!(remaining: usize);
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<S: futures_core::stream::Stream> futures_core::stream::Stream for Take<S> {
|
impl<S: Stream> futures_core::stream::Stream for Take<S> {
|
||||||
type Item = S::Item;
|
type Item = S::Item;
|
||||||
|
|
||||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
|
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
|
||||||
|
|
Loading…
Reference in a new issue