//! Composable asynchronous iteration. //! //! This module is an async version of [`std::iter`]. //! //! If you've found yourself with an asynchronous collection of some kind, //! and needed to perform an operation on the elements of said collection, //! you'll quickly run into 'streams'. Streams are heavily used in idiomatic //! asynchronous Rust code, so it's worth becoming familiar with them. //! //! Before explaining more, let's talk about how this module is structured: //! //! # Organization //! //! This module is largely organized by type: //! //! * [Traits] are the core portion: these traits define what kind of streams //! exist and what you can do with them. The methods of these traits are worth //! putting some extra study time into. //! * [Functions] provide some helpful ways to create some basic streams. //! * [Structs] are often the return types of the various methods on this //! module's traits. You'll usually want to look at the method that creates //! the `struct`, rather than the `struct` itself. For more detail about why, //! see '[Implementing Stream](#implementing-stream)'. //! //! [Traits]: #traits //! [Functions]: #functions //! [Structs]: #structs //! //! That's it! Let's dig into streams. //! //! # Stream //! //! The heart and soul of this module is the [`Stream`] trait. The core of //! [`Stream`] looks like this: //! //! ``` //! # use async_std::task::{Context, Poll}; //! # use std::pin::Pin; //! trait Stream { //! type Item; //! fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>; //! } //! ``` //! //! A stream has a method, [`next`], which when called, returns an //! [`Poll`]<[`Option`]`>`. [`next`] will return `Ready(Some(Item))` //! as long as there are elements, and once they've all been exhausted, will //! return `None` to indicate that iteration is finished. If we're waiting on //! something asynchronous to resolve `Pending` is returned. //! //! Individual streams may choose to resume iteration, and so calling //! [`next`] again may or may not eventually start returning `Ready(Some(Item))` //! again at some point. //! //! [`Stream`]'s full definition includes a number of other methods as well, //! but they are default methods, built on top of [`next`], and so you get //! them for free. //! //! Streams are also composable, and it's common to chain them together to do //! more complex forms of processing. See the [Adapters](#adapters) section //! below for more details. //! //! [`Poll`]: ../task/enum.Poll.html //! [`Stream`]: trait.Stream.html //! [`next`]: trait.Stream.html#tymethod.next //! [`Option`]: ../../std/option/enum.Option.html //! //! # The three forms of streaming //! //! There are three common methods which can create streams from a collection: //! //! * `stream()`, which iterates over `&T`. //! * `stream_mut()`, which iterates over `&mut T`. //! * `into_stream()`, which iterates over `T`. //! //! Various things in async-std may implement one or more of the //! three, where appropriate. //! //! # Implementing Stream //! //! Creating a stream of your own involves two steps: creating a `struct` to //! hold the stream's state, and then `impl`ementing [`Stream`] for that //! `struct`. This is why there are so many `struct`s in this module: there is //! one for each stream and iterator adapter. //! //! Let's make a stream named `Counter` which counts from `1` to `5`: //! //! ``` //! # use async_std::prelude::*; //! # use async_std::task::{Context, Poll}; //! # use std::pin::Pin; //! // First, the struct: //! //! /// A stream which counts from one to five //! struct Counter { //! count: usize, //! } //! //! // we want our count to start at one, so let's add a new() method to help. //! // This isn't strictly necessary, but is convenient. Note that we start //! // `count` at zero, we'll see why in `next()`'s implementation below. //! impl Counter { //! fn new() -> Counter { //! Counter { count: 0 } //! } //! } //! //! // Then, we implement `Stream` for our `Counter`: //! //! impl Stream for Counter { //! // we will be counting with usize //! type Item = usize; //! //! // poll_next() is the only required method //! fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { //! // Increment our count. This is why we started at zero. //! self.count += 1; //! //! // Check to see if we've finished counting or not. //! if self.count < 6 { //! Poll::Ready(Some(self.count)) //! } else { //! Poll::Ready(None) //! } //! } //! } //! //! // And now we can use it! //! # fn main() -> std::io::Result<()> { async_std::task::block_on(async { //! # //! let mut counter = Counter::new(); //! //! let x = counter.next().await.unwrap(); //! println!("{}", x); //! //! let x = counter.next().await.unwrap(); //! println!("{}", x); //! //! let x = counter.next().await.unwrap(); //! println!("{}", x); //! //! let x = counter.next().await.unwrap(); //! println!("{}", x); //! //! let x = counter.next().await.unwrap(); //! println!("{}", x); //! # //! # Ok(()) }) } //! ``` //! //! This will print `1` through `5`, each on their own line. //! //! Calling `next().await` this way gets repetitive. Rust has a construct which //! can call `next()` on your stream, until it reaches `None`. Let's go over //! that next. //! //! # while let Loops and IntoStream //! //! Rust's `while let` loop syntax is an idiomatic way to iterate over streams. Here's a basic //! example of `while let`: //! //! ``` //! # fn main() -> std::io::Result<()> { async_std::task::block_on(async { //! # //! # use async_std::prelude::*; //! # use async_std::stream; //! let mut values = stream::repeat(1u8).take(5); //! //! while let Some(x) = values.next().await { //! println!("{}", x); //! } //! # //! # Ok(()) }) } //! ``` //! //! This will print the numbers one through five, each on their own line. But //! you'll notice something here: we never called anything on our vector to //! produce a stream. What gives? //! //! There's a trait in the standard library for converting something into an //! stream: [`IntoStream`]. This trait has one method, [`into_stream`], //! which converts the thing implementing [`IntoStream`] into a stream. //! //! Unlike `std::iter::IntoIterator`, `IntoStream` does not have compiler //! support yet. This means that automatic conversions like with `for` loops //! doesn't occur yet, and `into_stream` will always have to be called manually. //! //! [`IntoStream`]: trait.IntoStream.html //! [`into_stream`]: trait.IntoStream.html#tymethod.into_stream //! //! # Adapters //! //! Functions which take an [`Stream`] and return another [`Stream`] are //! often called 'stream adapters', as they are a form of the 'adapter //! pattern'. //! //! Common stream adapters include [`map`], [`take`], and [`filter`]. //! For more, see their documentation. //! //! [`map`]: trait.Stream.html#method.map //! [`take`]: trait.Stream.html#method.take //! [`filter`]: trait.Stream.html#method.filter //! //! # Laziness //! //! Streams (and stream [adapters](#adapters)) are *lazy*. This means that //! just creating a stream doesn't _do_ a whole lot. Nothing really happens //! until you call [`next`]. This is sometimes a source of confusion when //! creating a stream solely for its side effects. For example, the [`map`] //! method calls a closure on each element it iterates over: //! //! ``` //! # #![allow(unused_must_use)] //! # fn main() -> std::io::Result<()> { async_std::task::block_on(async { //! # //! # use async_std::prelude::*; //! # use async_std::stream; //! let v = stream::repeat(1u8).take(5); //! v.map(|x| println!("{}", x)); //! # //! # Ok(()) }) } //! ``` //! //! This will not print any values, as we only created a stream, rather than //! using it. The compiler will warn us about this kind of behavior: //! //! ```text //! warning: unused result that must be used: streams are lazy and //! do nothing unless consumed //! ``` //! //! The idiomatic way to write a [`map`] for its side effects is to use a //! `while let` loop instead: //! //! ``` //! # fn main() -> std::io::Result<()> { async_std::task::block_on(async { //! # //! # use async_std::prelude::*; //! # use async_std::stream; //! let mut v = stream::repeat(1u8).take(5); //! //! while let Some(x) = &v.next().await { //! println!("{}", x); //! } //! # //! # Ok(()) }) } //! ``` //! //! [`map`]: trait.Stream.html#method.map //! //! The two most common ways to evaluate a stream are to use a `while let` loop //! like this, or using the [`collect`] method to produce a new collection. //! //! [`collect`]: trait.Stream.html#method.collect //! //! # Infinity //! //! Streams do not have to be finite. As an example, an repeat stream is //! an infinite stream: //! //! ``` //! # use async_std::stream; //! let numbers = stream::repeat(1u8); //! ``` //! //! It is common to use the [`take`] stream adapter to turn an infinite //! stream into a finite one: //! //! ``` //! # fn main() -> std::io::Result<()> { async_std::task::block_on(async { //! # //! # use async_std::prelude::*; //! # use async_std::stream; //! let numbers = stream::repeat(1u8); //! let mut five_numbers = numbers.take(5); //! //! while let Some(number) = five_numbers.next().await { //! println!("{}", number); //! } //! # //! # Ok(()) }) } //! ``` //! //! This will print the numbers `0` through `4`, each on their own line. //! //! Bear in mind that methods on infinite streams, even those for which a //! result can be determined mathematically in finite time, may not terminate. //! Specifically, methods such as [`min`], which in the general case require //! traversing every element in the stream, are likely not to return //! successfully for any infinite streams. //! //! ```ignore //! let ones = async_std::stream::repeat(1); //! let least = ones.min().await.unwrap(); // Oh no! An infinite loop! //! // `ones.min()` causes an infinite loop, so we won't reach this point! //! println!("The smallest number one is {}.", least); //! ``` //! //! [`std::iter`]: https://doc.rust-lang.org/std/iter/index.html //! [`take`]: trait.Stream.html#method.take //! [`min`]: trait.Stream.html#method.min pub use empty::{empty, Empty}; pub use from_fn::{from_fn, FromFn}; pub use from_iter::{from_iter, FromIter}; pub use once::{once, Once}; pub use repeat::{repeat, Repeat}; pub use repeat_with::{repeat_with, RepeatWith}; pub use stream::*; pub(crate) mod stream; mod empty; mod from_fn; mod from_iter; mod once; mod repeat; mod repeat_with; cfg_unstable! { mod double_ended_stream; mod exact_size_stream; mod extend; mod from_stream; mod fused_stream; mod interval; mod into_stream; mod product; mod successors; mod sum; pub use double_ended_stream::DoubleEndedStream; pub use exact_size_stream::ExactSizeStream; pub use extend::{extend, Extend}; pub use from_stream::FromStream; pub use fused_stream::FusedStream; pub use interval::{interval, Interval}; pub use into_stream::IntoStream; pub use product::Product; pub use stream::Merge; pub use successors::{successors, Successors}; pub use sum::Sum; }