From 97a5f9b50c95e49639bf999227229094afeb37b0 Mon Sep 17 00:00:00 2001 From: Fedor Sakharov Date: Tue, 10 Sep 2019 23:38:11 +0300 Subject: [PATCH 1/5] adds stream::find combinator --- src/stream/stream/find.rs | 49 +++++++++++++++++++++++++++++++++++++++ src/stream/stream/mod.rs | 46 ++++++++++++++++++++++++++++++++++++ 2 files changed, 95 insertions(+) create mode 100644 src/stream/stream/find.rs diff --git a/src/stream/stream/find.rs b/src/stream/stream/find.rs new file mode 100644 index 00000000..dfab0893 --- /dev/null +++ b/src/stream/stream/find.rs @@ -0,0 +1,49 @@ +use crate::task::{Context, Poll}; +use std::marker::PhantomData; +use std::pin::Pin; + +#[doc(hidden)] +#[allow(missing_debug_implementations)] +pub struct FindFuture<'a, S, P, T> { + stream: &'a mut S, + p: P, + __t: PhantomData, +} + +impl<'a, S, P, T> FindFuture<'a, S, P, T> { + pin_utils::unsafe_pinned!(stream: &'a mut S); + pin_utils::unsafe_unpinned!(p: P); + + pub(super) fn new(stream: &'a mut S, p: P) -> Self { + FindFuture { + stream, + p, + __t: PhantomData, + } + } +} + +impl<'a, S, P> futures_core::future::Future for FindFuture<'a, S, P, S::Item> +where + S: futures_core::stream::Stream + Unpin + Sized, + P: FnMut(&S::Item) -> bool, +{ + type Output = Option; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + use futures_core::stream::Stream; + + let item = futures_core::ready!(self.as_mut().stream().poll_next(cx)); + + match item { + Some(v) => match (self.as_mut().p())(&v) { + true => Poll::Ready(Some(v)), + false => { + cx.waker().wake_by_ref(); + Poll::Pending + } + }, + None => Poll::Ready(None), + } + } +} diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index eddafe28..462aeaed 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -24,6 +24,7 @@ mod all; mod any; mod filter_map; +mod find; mod find_map; mod min_by; mod next; @@ -35,6 +36,7 @@ pub use take::Take; use all::AllFuture; use any::AnyFuture; use filter_map::FilterMap; +use find::FindFuture; use find_map::FindMapFuture; use min_by::MinByFuture; use next::NextFuture; @@ -321,6 +323,50 @@ pub trait Stream { } } + /// Searches for an element in a stream that satisfies a predicate. + /// + /// # Examples + /// + /// Basic usage: + /// + /// ``` + /// # fn main() { async_std::task::block_on(async { + /// # + /// use async_std::prelude::*; + /// use std::collections::VecDeque; + /// + /// let mut s: VecDeque = vec![1, 2, 3].into_iter().collect(); + /// let res = s.find(|x| *x == 2).await; + /// assert_eq!(res, Some(2)); + /// # + /// # }) } + /// ``` + /// + /// Resuming after a first find: + /// + /// ``` + /// # fn main() { async_std::task::block_on(async { + /// # + /// use async_std::prelude::*; + /// use std::collections::VecDeque; + /// + /// let mut s: VecDeque = vec![1, 2, 3].into_iter().collect(); + /// let res = s.find(|x| *x == 2).await; + /// assert_eq!(res, Some(2)); + /// + /// let next = s.next().await; + /// assert_eq!(next, Some(3)); + /// # + /// # }) } + /// ``` + fn find

(&mut self, p: P) -> ret!('_, FindFuture, Option, P, Self::Item) + where + Self: Sized, + P: FnMut(&Self::Item) -> bool, + { + FindFuture::new(self, p) + } + /// Applies function to the elements of stream and returns the first non-none result. /// /// ``` From 06f2569d23732f2bb48427770ae6ef1861e460cc Mon Sep 17 00:00:00 2001 From: Wonwoo Choi Date: Thu, 12 Sep 2019 00:02:57 +0900 Subject: [PATCH 2/5] Add BufRead::fill_buf (#176) * Add BufRead::fill_buf * Make FillBufFuture constructor pub(crate) * Give more information about the transmutation source type --- src/io/buf_read/fill_buf.rs | 32 ++++++++++++++++++++++++++++++++ src/io/buf_read/mod.rs | 22 ++++++++++++++++++++++ 2 files changed, 54 insertions(+) create mode 100644 src/io/buf_read/fill_buf.rs diff --git a/src/io/buf_read/fill_buf.rs b/src/io/buf_read/fill_buf.rs new file mode 100644 index 00000000..0ce58cfb --- /dev/null +++ b/src/io/buf_read/fill_buf.rs @@ -0,0 +1,32 @@ +use std::pin::Pin; + +use futures_io::AsyncBufRead; + +use crate::future::Future; +use crate::io; +use crate::task::{Context, Poll}; + +#[doc(hidden)] +#[allow(missing_debug_implementations)] +pub struct FillBufFuture<'a, R: ?Sized> { + reader: &'a mut R, +} + +impl<'a, R: ?Sized> FillBufFuture<'a, R> { + pub(crate) fn new(reader: &'a mut R) -> Self { + Self { reader } + } +} + +impl<'a, R: AsyncBufRead + Unpin + ?Sized> Future for FillBufFuture<'a, R> { + type Output = io::Result<&'a [u8]>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let Self { reader } = &mut *self; + let result = Pin::new(reader).poll_fill_buf(cx); + // This is safe because: + // 1. The buffer is valid for the lifetime of the reader. + // 2. Output is unrelated to the wrapper (Self). + result.map_ok(|buf| unsafe { std::mem::transmute::<&'_ [u8], &'a [u8]>(buf) }) + } +} diff --git a/src/io/buf_read/mod.rs b/src/io/buf_read/mod.rs index e320375f..6018439a 100644 --- a/src/io/buf_read/mod.rs +++ b/src/io/buf_read/mod.rs @@ -1,7 +1,9 @@ +mod fill_buf; mod lines; mod read_line; mod read_until; +use fill_buf::FillBufFuture; pub use lines::Lines; use read_line::ReadLineFuture; use read_until::ReadUntilFuture; @@ -41,6 +43,26 @@ cfg_if! { /// [`futures::io::AsyncBufRead`]: /// https://docs.rs/futures-preview/0.3.0-alpha.17/futures/io/trait.AsyncBufRead.html pub trait BufRead { + /// Returns the contents of the internal buffer, filling it with more data from the inner + /// reader if it is empty. + /// + /// This function is a lower-level call. It needs to be paired with the [`consume`] method to + /// function properly. When calling this method, none of the contents will be "read" in the + /// sense that later calling `read` may return the same contents. As such, [`consume`] must be + /// called with the number of bytes that are consumed from this buffer to ensure that the bytes + /// are never returned twice. + /// + /// [`consume`]: #tymethod.consume + /// + /// An empty buffer returned indicates that the stream has reached EOF. + // TODO: write a proper doctest with `consume` + fn fill_buf<'a>(&'a mut self) -> ret!('a, FillBufFuture, io::Result<&'a [u8]>) + where + Self: Unpin, + { + FillBufFuture::new(self) + } + /// Reads all bytes into `buf` until the delimiter `byte` or EOF is reached. /// /// This function will read bytes from the underlying stream until the delimiter or EOF is From 724a9f4eb03e59b4873954bde45949e6d71a92c6 Mon Sep 17 00:00:00 2001 From: Stjepan Glavina Date: Wed, 11 Sep 2019 17:06:02 +0200 Subject: [PATCH 3/5] Add Stream::poll_next --- src/stream/stream/all.rs | 37 ++++++++----------- src/stream/stream/any.rs | 37 ++++++++----------- src/stream/stream/min_by.rs | 5 +-- src/stream/stream/mod.rs | 72 +++++++++++++++++++++++++++++++------ src/stream/stream/next.rs | 6 ++-- src/stream/stream/take.rs | 9 ++--- 6 files changed, 104 insertions(+), 62 deletions(-) diff --git a/src/stream/stream/all.rs b/src/stream/stream/all.rs index 62710240..3b65fc76 100644 --- a/src/stream/stream/all.rs +++ b/src/stream/stream/all.rs @@ -1,43 +1,36 @@ -use crate::future::Future; -use crate::task::{Context, Poll}; - use std::marker::PhantomData; use std::pin::Pin; -#[derive(Debug)] -pub struct AllFuture<'a, S, F, T> -where - F: FnMut(T) -> bool, -{ +use crate::future::Future; +use crate::stream::Stream; +use crate::task::{Context, Poll}; + +#[doc(hidden)] +#[allow(missing_debug_implementations)] +pub struct AllFuture<'a, S, F, T> { pub(crate) stream: &'a mut S, pub(crate) f: F, pub(crate) result: bool, - pub(crate) __item: PhantomData, + pub(crate) _marker: PhantomData, } -impl<'a, S, F, T> AllFuture<'a, S, F, T> -where - F: FnMut(T) -> bool, -{ - pin_utils::unsafe_pinned!(stream: &'a mut S); - pin_utils::unsafe_unpinned!(result: bool); - pin_utils::unsafe_unpinned!(f: F); -} +impl Unpin for AllFuture<'_, S, F, T> {} impl Future for AllFuture<'_, S, F, S::Item> where - S: futures_core::stream::Stream + Unpin + Sized, + S: Stream + Unpin + Sized, F: FnMut(S::Item) -> bool, { type Output = bool; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - use futures_core::stream::Stream; - let next = futures_core::ready!(self.as_mut().stream().poll_next(cx)); + let next = futures_core::ready!(Pin::new(&mut *self.stream).poll_next(cx)); + match next { Some(v) => { - let result = (self.as_mut().f())(v); - *self.as_mut().result() = result; + let result = (&mut self.f)(v); + self.result = result; + if result { // don't forget to wake this task again to pull the next item from stream cx.waker().wake_by_ref(); diff --git a/src/stream/stream/any.rs b/src/stream/stream/any.rs index f1f551a1..a23adf4b 100644 --- a/src/stream/stream/any.rs +++ b/src/stream/stream/any.rs @@ -1,43 +1,36 @@ -use crate::future::Future; -use crate::task::{Context, Poll}; - use std::marker::PhantomData; use std::pin::Pin; -#[derive(Debug)] -pub struct AnyFuture<'a, S, F, T> -where - F: FnMut(T) -> bool, -{ +use crate::future::Future; +use crate::stream::Stream; +use crate::task::{Context, Poll}; + +#[doc(hidden)] +#[allow(missing_debug_implementations)] +pub struct AnyFuture<'a, S, F, T> { pub(crate) stream: &'a mut S, pub(crate) f: F, pub(crate) result: bool, - pub(crate) __item: PhantomData, + pub(crate) _marker: PhantomData, } -impl<'a, S, F, T> AnyFuture<'a, S, F, T> -where - F: FnMut(T) -> bool, -{ - pin_utils::unsafe_pinned!(stream: &'a mut S); - pin_utils::unsafe_unpinned!(result: bool); - pin_utils::unsafe_unpinned!(f: F); -} +impl Unpin for AnyFuture<'_, S, F, T> {} impl Future for AnyFuture<'_, S, F, S::Item> where - S: futures_core::stream::Stream + Unpin + Sized, + S: Stream + Unpin + Sized, F: FnMut(S::Item) -> bool, { type Output = bool; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - use futures_core::stream::Stream; - let next = futures_core::ready!(self.as_mut().stream().poll_next(cx)); + let next = futures_core::ready!(Pin::new(&mut *self.stream).poll_next(cx)); + match next { Some(v) => { - let result = (self.as_mut().f())(v); - *self.as_mut().result() = result; + let result = (&mut self.f)(v); + self.result = result; + if result { Poll::Ready(true) } else { diff --git a/src/stream/stream/min_by.rs b/src/stream/stream/min_by.rs index b65d88db..3223af92 100644 --- a/src/stream/stream/min_by.rs +++ b/src/stream/stream/min_by.rs @@ -6,7 +6,8 @@ use crate::stream::Stream; use crate::task::{Context, Poll}; /// A future that yields the minimum item in a stream by a given comparison function. -#[derive(Clone, Debug)] +#[doc(hidden)] +#[allow(missing_debug_implementations)] pub struct MinByFuture { stream: S, compare: F, @@ -27,7 +28,7 @@ impl MinByFuture { impl Future for MinByFuture where - S: futures_core::stream::Stream + Unpin, + S: Stream + Unpin, S::Item: Copy, F: FnMut(&S::Item, &S::Item) -> Ordering, { diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 91b111e2..17ba5246 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -36,9 +36,12 @@ use next::NextFuture; use std::cmp::Ordering; use std::marker::PhantomData; +use std::pin::Pin; use cfg_if::cfg_if; +use crate::task::{Context, Poll}; + cfg_if! { if #[cfg(feature = "docs")] { #[doc(hidden)] @@ -73,6 +76,55 @@ pub trait Stream { /// The type of items yielded by this stream. type Item; + /// Attempts to receive the next item from the stream. + /// + /// There are several possible return values: + /// + /// * `Poll::Pending` means this stream's next value is not ready yet. + /// * `Poll::Ready(None)` means this stream has been exhausted. + /// * `Poll::Ready(Some(item))` means `item` was received out of the stream. + /// + /// # Examples + /// + /// ``` + /// # fn main() { async_std::task::block_on(async { + /// # + /// use std::pin::Pin; + /// + /// use async_std::prelude::*; + /// use async_std::stream; + /// use async_std::task::{Context, Poll}; + /// + /// fn increment(s: impl Stream + Unpin) -> impl Stream + Unpin { + /// struct Increment(S); + /// + /// impl + Unpin> Stream for Increment { + /// type Item = S::Item; + /// + /// fn poll_next( + /// mut self: Pin<&mut Self>, + /// cx: &mut Context<'_>, + /// ) -> Poll> { + /// match Pin::new(&mut self.0).poll_next(cx) { + /// Poll::Pending => Poll::Pending, + /// Poll::Ready(None) => Poll::Ready(None), + /// Poll::Ready(Some(item)) => Poll::Ready(Some(item + 1)), + /// } + /// } + /// } + /// + /// Increment(s) + /// } + /// + /// let mut s = increment(stream::once(7)); + /// + /// assert_eq!(s.next().await, Some(8)); + /// assert_eq!(s.next().await, None); + /// # + /// # }) } + /// ``` + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>; + /// Advances the stream and returns the next value. /// /// Returns [`None`] when iteration is finished. Individual stream implementations may @@ -98,7 +150,10 @@ pub trait Stream { /// ``` fn next(&mut self) -> ret!('_, NextFuture, Option) where - Self: Unpin; + Self: Unpin, + { + NextFuture { stream: self } + } /// Creates a stream that yields its first `n` elements. /// @@ -207,13 +262,13 @@ pub trait Stream { #[inline] fn all(&mut self, f: F) -> ret!('_, AllFuture, bool, F, Self::Item) where - Self: Sized, + Self: Unpin + Sized, F: FnMut(Self::Item) -> bool, { AllFuture { stream: self, result: true, // the default if the empty stream - __item: PhantomData, + _marker: PhantomData, f, } } @@ -264,13 +319,13 @@ pub trait Stream { #[inline] fn any(&mut self, f: F) -> ret!('_, AnyFuture, bool, F, Self::Item) where - Self: Sized, + Self: Unpin + Sized, F: FnMut(Self::Item) -> bool, { AnyFuture { stream: self, result: false, // the default if the empty stream - __item: PhantomData, + _marker: PhantomData, f, } } @@ -279,10 +334,7 @@ pub trait Stream { impl Stream for T { type Item = ::Item; - fn next(&mut self) -> ret!('_, NextFuture, Option) - where - Self: Unpin, - { - NextFuture { stream: self } + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + futures_core::stream::Stream::poll_next(self, cx) } } diff --git a/src/stream/stream/next.rs b/src/stream/stream/next.rs index b64750d0..de75f5e9 100644 --- a/src/stream/stream/next.rs +++ b/src/stream/stream/next.rs @@ -1,6 +1,8 @@ +use std::pin::Pin; + use crate::future::Future; +use crate::stream::Stream; use crate::task::{Context, Poll}; -use std::pin::Pin; #[doc(hidden)] #[allow(missing_debug_implementations)] @@ -8,7 +10,7 @@ pub struct NextFuture<'a, T: Unpin + ?Sized> { pub(crate) stream: &'a mut T, } -impl Future for NextFuture<'_, T> { +impl Future for NextFuture<'_, T> { type Output = Option; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { diff --git a/src/stream/stream/take.rs b/src/stream/stream/take.rs index 0499a6ac..0dea1d0c 100644 --- a/src/stream/stream/take.rs +++ b/src/stream/stream/take.rs @@ -1,7 +1,8 @@ -use crate::task::{Context, Poll}; - use std::pin::Pin; +use crate::stream::Stream; +use crate::task::{Context, Poll}; + /// A stream that yields the first `n` items of another stream. #[derive(Clone, Debug)] pub struct Take { @@ -11,12 +12,12 @@ pub struct Take { impl Unpin for Take {} -impl Take { +impl Take { pin_utils::unsafe_pinned!(stream: S); pin_utils::unsafe_unpinned!(remaining: usize); } -impl futures_core::stream::Stream for Take { +impl futures_core::stream::Stream for Take { type Item = S::Item; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { From ab1e2b403a8a45a92d889aed4acc6d6493496c37 Mon Sep 17 00:00:00 2001 From: Stjepan Glavina Date: Wed, 11 Sep 2019 17:17:20 +0200 Subject: [PATCH 4/5] Fix compilation errors on latest nightly --- docs/src/tutorial/all_together.md | 3 ++- docs/src/tutorial/clean_shutdown.md | 6 ++++-- docs/src/tutorial/connecting_readers_and_writers.md | 3 ++- docs/src/tutorial/handling_disconnection.md | 3 ++- examples/a-chat/server.rs | 5 ++--- examples/socket-timeouts.rs | 2 +- 6 files changed, 13 insertions(+), 9 deletions(-) diff --git a/docs/src/tutorial/all_together.md b/docs/src/tutorial/all_together.md index a638e02c..641c7da7 100644 --- a/docs/src/tutorial/all_together.md +++ b/docs/src/tutorial/all_together.md @@ -115,7 +115,8 @@ async fn broker(mut events: Receiver) -> Result<()> { Event::Message { from, to, msg } => { for addr in to { if let Some(peer) = peers.get_mut(&addr) { - peer.send(format!("from {}: {}\n", from, msg)).await? + let msg = format!("from {}: {}\n", from, msg); + peer.send(msg).await? } } } diff --git a/docs/src/tutorial/clean_shutdown.md b/docs/src/tutorial/clean_shutdown.md index f61adf2e..992a35d9 100644 --- a/docs/src/tutorial/clean_shutdown.md +++ b/docs/src/tutorial/clean_shutdown.md @@ -115,7 +115,8 @@ Let's add waiting to the server: # Event::Message { from, to, msg } => { # for addr in to { # if let Some(peer) = peers.get_mut(&addr) { -# peer.send(format!("from {}: {}\n", from, msg)).await? +# let msg = format!("from {}: {}\n", from, msg); +# peer.send(msg).await? # } # } # } @@ -217,7 +218,8 @@ async fn broker(mut events: Receiver) -> Result<()> { Event::Message { from, to, msg } => { for addr in to { if let Some(peer) = peers.get_mut(&addr) { - peer.send(format!("from {}: {}\n", from, msg)).await? + let msg = format!("from {}: {}\n", from, msg); + peer.send(msg).await? } } } diff --git a/docs/src/tutorial/connecting_readers_and_writers.md b/docs/src/tutorial/connecting_readers_and_writers.md index 7399cec1..d5da471a 100644 --- a/docs/src/tutorial/connecting_readers_and_writers.md +++ b/docs/src/tutorial/connecting_readers_and_writers.md @@ -73,7 +73,8 @@ async fn broker(mut events: Receiver) -> Result<()> { Event::Message { from, to, msg } => { // 3 for addr in to { if let Some(peer) = peers.get_mut(&addr) { - peer.send(format!("from {}: {}\n", from, msg)).await? + let msg = format!("from {}: {}\n", from, msg); + peer.send(msg).await? } } } diff --git a/docs/src/tutorial/handling_disconnection.md b/docs/src/tutorial/handling_disconnection.md index 351f2533..82bbef5a 100644 --- a/docs/src/tutorial/handling_disconnection.md +++ b/docs/src/tutorial/handling_disconnection.md @@ -257,7 +257,8 @@ async fn broker(events: Receiver) { Event::Message { from, to, msg } => { for addr in to { if let Some(peer) = peers.get_mut(&addr) { - peer.send(format!("from {}: {}\n", from, msg)).await + let msg = format!("from {}: {}\n", from, msg); + peer.send(fmt).await .unwrap() // 6 } } diff --git a/examples/a-chat/server.rs b/examples/a-chat/server.rs index 911d1607..77ebfd1e 100644 --- a/examples/a-chat/server.rs +++ b/examples/a-chat/server.rs @@ -139,9 +139,8 @@ async fn broker_loop(mut events: Receiver) { Event::Message { from, to, msg } => { for addr in to { if let Some(peer) = peers.get_mut(&addr) { - peer.send(format!("from {}: {}\n", from, msg)) - .await - .unwrap() + let msg = format!("from {}: {}\n", from, msg); + peer.send(msg).await.unwrap(); } } } diff --git a/examples/socket-timeouts.rs b/examples/socket-timeouts.rs index b2f770e9..894206c6 100644 --- a/examples/socket-timeouts.rs +++ b/examples/socket-timeouts.rs @@ -10,7 +10,7 @@ async fn get() -> io::Result> { let mut buf = vec![]; - io::timeout(Duration::from_secs(5), async { + io::timeout(Duration::from_secs(5), async move { stream.read_to_end(&mut buf).await?; Ok(buf) }) From 0f4f0fb77e1bee5b51409f0d25312a1581cb6339 Mon Sep 17 00:00:00 2001 From: Stjepan Glavina Date: Wed, 11 Sep 2019 17:29:33 +0200 Subject: [PATCH 5/5] Fix a typo --- docs/src/tutorial/handling_disconnection.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/src/tutorial/handling_disconnection.md b/docs/src/tutorial/handling_disconnection.md index 82bbef5a..27c50523 100644 --- a/docs/src/tutorial/handling_disconnection.md +++ b/docs/src/tutorial/handling_disconnection.md @@ -258,7 +258,7 @@ async fn broker(events: Receiver) { for addr in to { if let Some(peer) = peers.get_mut(&addr) { let msg = format!("from {}: {}\n", from, msg); - peer.send(fmt).await + peer.send(msg).await .unwrap() // 6 } }