diff --git a/Cargo.toml b/Cargo.toml index e920739..7ffaae3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -50,33 +50,33 @@ std = [ ] [dependencies] -async-attributes = { version = "1.1.0", optional = true } +async-attributes = { version = "1.1.1", optional = true } async-macros = { version = "2.0.0", optional = true } async-task = { version = "1.0.0", optional = true } broadcaster = { version = "0.2.6", optional = true, default-features = false, features = ["default-channels"] } -crossbeam-channel = { version = "0.3.9", optional = true } -crossbeam-deque = { version = "0.7.1", optional = true } -crossbeam-utils = { version = "0.6.6", optional = true } -futures-core = { version = "0.3.0", optional = true } -futures-io = { version = "0.3.0", optional = true } -futures-timer = { version = "1.0.2", optional = true } +crossbeam-channel = { version = "0.4.0", optional = true } +crossbeam-deque = { version = "0.7.2", optional = true } +crossbeam-utils = { version = "0.7.0", optional = true } +futures-core = { version = "0.3.1", optional = true } +futures-io = { version = "0.3.1", optional = true } +futures-timer = { version = "2.0.2", optional = true } kv-log-macro = { version = "1.0.4", optional = true } log = { version = "0.4.8", features = ["kv_unstable"], optional = true } memchr = { version = "2.2.1", optional = true } mio = { version = "0.6.19", optional = true } mio-uds = { version = "0.6.7", optional = true } -num_cpus = { version = "1.10.1", optional = true } +num_cpus = { version = "1.11.1", optional = true } once_cell = { version = "1.2.0", optional = true } -pin-project-lite = { version = "0.1", optional = true } +pin-project-lite = { version = "0.1.1", optional = true } pin-utils = { version = "0.1.0-alpha.4", optional = true } slab = { version = "0.4.2", optional = true } [dev-dependencies] -femme = "1.2.0" +femme = "1.3.0" rand = "0.7.2" surf = "1.0.3" tempdir = "0.3.7" -futures = "0.3.0" +futures = "0.3.1" [[test]] name = "stream" diff --git a/src/stream/mod.rs b/src/stream/mod.rs index f782882..d8b96ec 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -326,6 +326,7 @@ cfg_unstable! { mod interval; mod into_stream; mod product; + mod successors; mod sum; pub use double_ended_stream::DoubleEndedStream; @@ -337,5 +338,6 @@ cfg_unstable! { pub use into_stream::IntoStream; pub use product::Product; pub use stream::Merge; + pub use successors::{successors, Successors}; pub use sum::Sum; } diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 8938583..f876576 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -1446,6 +1446,54 @@ extension_trait! { } } + #[doc = r#" + Borrows an stream, rather than consuming it. + + This is useful to allow applying stream adaptors while still retaining ownership of the original stream. + + # Examples + + ``` + # fn main() { async_std::task::block_on(async { + # + use async_std::prelude::*; + use async_std::stream; + + let a = vec![1isize, 2, 3]; + + let stream = stream::from_iter(a); + + let sum: isize = stream.take(5).sum().await; + + assert_eq!(sum, 6); + + // if we try to use stream again, it won't work. The following line + // gives "error: use of moved value: `stream` + // assert_eq!(stream.next(), None); + + // let's try that again + let a = vec![1isize, 2, 3]; + + let mut stream = stream::from_iter(a); + + // instead, we add in a .by_ref() + let sum: isize = stream.by_ref().take(2).sum().await; + + assert_eq!(sum, 3); + + // now this is just fine: + assert_eq!(stream.next().await, Some(3)); + assert_eq!(stream.next().await, None); + # + # }) } + ``` + "#] + #[cfg(all(feature = "default", feature = "unstable"))] + #[cfg_attr(feature = "docs", doc(cfg(unstable)))] + fn by_ref(&mut self) -> &mut Self { + self + } + #[doc = r#" A stream adaptor similar to [`fold`] that holds internal state and produces a new stream. diff --git a/src/stream/stream/partition.rs b/src/stream/stream/partition.rs index 74231ab..7e2caad 100644 --- a/src/stream/stream/partition.rs +++ b/src/stream/stream/partition.rs @@ -45,6 +45,7 @@ where match next { Some(v) => { let res = this.res.as_mut().unwrap(); + if (this.f)(&v) { res.0.extend(Some(v)) } else { diff --git a/src/stream/successors.rs b/src/stream/successors.rs new file mode 100644 index 0000000..4421564 --- /dev/null +++ b/src/stream/successors.rs @@ -0,0 +1,77 @@ +use std::mem; +use std::pin::Pin; + +use crate::stream::Stream; +use crate::task::{Context, Poll}; + +use pin_project_lite::pin_project; + +/// Creates a new stream where to produce each new element a closure is called with the previous +/// value. +/// +/// # Examples +/// +/// ``` +/// # fn main() { async_std::task::block_on(async { +/// # +/// use async_std::prelude::*; +/// use async_std::stream; +/// +/// let mut s = stream::successors(Some(22), |&val| Some(val + 1)); +/// +/// assert_eq!(s.next().await, Some(22)); +/// assert_eq!(s.next().await, Some(23)); +/// assert_eq!(s.next().await, Some(24)); +/// assert_eq!(s.next().await, Some(25)); +/// +/// # +/// # }) } +/// ``` +#[cfg(feature = "unstable")] +#[cfg_attr(feature = "docs", doc(cfg(unstable)))] +pub fn successors(first: Option, succ: F) -> Successors +where + F: FnMut(&T) -> Option, +{ + Successors { succ, slot: first } +} + +pin_project! { + /// A stream that yields elements by calling an async closure with the previous value as an + /// argument + /// + /// This stream is constructed by [`successors`] function + /// + /// [`successors`]: fn.succssors.html + #[cfg(feature = "unstable")] + #[cfg_attr(feature = "docs", doc(cfg(unstable)))] + #[derive(Debug)] + pub struct Successors + where + F: FnMut(&T) -> Option + { + succ: F, + slot: Option, + } +} + +impl Stream for Successors +where + F: FnMut(&T) -> Option, +{ + type Item = T; + + fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + let this = self.project(); + + if this.slot.is_none() { + return Poll::Ready(None); + } + + let mut next = (this.succ)(&this.slot.as_ref().unwrap()); + + // 'swapping' here means 'slot' will hold the next value and next will be th one from the previous iteration + mem::swap(this.slot, &mut next); + Poll::Ready(next) + } +}