Merge branch 'master' into add_stdin_lock
commit
48b255897e
@ -1,68 +0,0 @@
|
||||
language: rust
|
||||
|
||||
env:
|
||||
- RUSTFLAGS="-D warnings"
|
||||
|
||||
# Cache the whole `~/.cargo` directory to keep `~/cargo/.crates.toml`.
|
||||
cache:
|
||||
directories:
|
||||
- /home/travis/.cargo
|
||||
|
||||
# Don't cache the cargo registry because it's too big.
|
||||
before_cache:
|
||||
- rm -rf /home/travis/.cargo/registry
|
||||
|
||||
|
||||
branches:
|
||||
only:
|
||||
- master
|
||||
- staging
|
||||
- trying
|
||||
|
||||
matrix:
|
||||
fast_finish: true
|
||||
include:
|
||||
- rust: nightly
|
||||
os: linux
|
||||
|
||||
- rust: nightly
|
||||
os: osx
|
||||
osx_image: xcode9.2
|
||||
|
||||
- rust: nightly-x86_64-pc-windows-msvc
|
||||
os: windows
|
||||
|
||||
- name: fmt
|
||||
rust: nightly
|
||||
os: linux
|
||||
before_script: |
|
||||
if ! rustup component add rustfmt; then
|
||||
target=`curl https://rust-lang.github.io/rustup-components-history/x86_64-unknown-linux-gnu/rustfmt`;
|
||||
echo "'rustfmt' is unavailable on the toolchain 'nightly', use the toolchain 'nightly-$target' instead";
|
||||
rustup toolchain install nightly-$target;
|
||||
rustup default nightly-$target;
|
||||
rustup component add rustfmt;
|
||||
fi
|
||||
script:
|
||||
- cargo fmt --all -- --check
|
||||
|
||||
- name: docs
|
||||
rust: nightly
|
||||
os: linux
|
||||
script:
|
||||
- cargo doc --features docs
|
||||
|
||||
- name: book
|
||||
rust: nightly
|
||||
os: linux
|
||||
before_script:
|
||||
- test -x $HOME/.cargo/bin/mdbook || ./ci/install-mdbook.sh
|
||||
- cargo build # to find 'extern crate async_std' by `mdbook test`
|
||||
script:
|
||||
- mdbook build docs
|
||||
- mdbook test -L ./target/debug/deps docs
|
||||
|
||||
script:
|
||||
- cargo check --all --benches --bins --examples --tests
|
||||
- cargo check --features unstable --all --benches --bins --examples --tests
|
||||
- cargo test --all --doc --features unstable
|
@ -0,0 +1,139 @@
|
||||
extension_trait! {
|
||||
use std::pin::Pin;
|
||||
use std::ops::{Deref, DerefMut};
|
||||
|
||||
use crate::task::{Context, Poll};
|
||||
|
||||
#[doc = r#"
|
||||
A future represents an asynchronous computation.
|
||||
|
||||
A future is a value that may not have finished computing yet. This kind of
|
||||
"asynchronous value" makes it possible for a thread to continue doing useful
|
||||
work while it waits for the value to become available.
|
||||
|
||||
# The `poll` method
|
||||
|
||||
The core method of future, `poll`, *attempts* to resolve the future into a
|
||||
final value. This method does not block if the value is not ready. Instead,
|
||||
the current task is scheduled to be woken up when it's possible to make
|
||||
further progress by `poll`ing again. The `context` passed to the `poll`
|
||||
method can provide a [`Waker`], which is a handle for waking up the current
|
||||
task.
|
||||
|
||||
When using a future, you generally won't call `poll` directly, but instead
|
||||
`.await` the value.
|
||||
|
||||
[`Waker`]: ../task/struct.Waker.html
|
||||
"#]
|
||||
pub trait Future {
|
||||
#[doc = r#"
|
||||
The type of value produced on completion.
|
||||
"#]
|
||||
type Output;
|
||||
|
||||
#[doc = r#"
|
||||
Attempt to resolve the future to a final value, registering
|
||||
the current task for wakeup if the value is not yet available.
|
||||
|
||||
# Return value
|
||||
|
||||
This function returns:
|
||||
|
||||
- [`Poll::Pending`] if the future is not ready yet
|
||||
- [`Poll::Ready(val)`] with the result `val` of this future if it
|
||||
finished successfully.
|
||||
|
||||
Once a future has finished, clients should not `poll` it again.
|
||||
|
||||
When a future is not ready yet, `poll` returns `Poll::Pending` and
|
||||
stores a clone of the [`Waker`] copied from the current [`Context`].
|
||||
This [`Waker`] is then woken once the future can make progress.
|
||||
For example, a future waiting for a socket to become
|
||||
readable would call `.clone()` on the [`Waker`] and store it.
|
||||
When a signal arrives elsewhere indicating that the socket is readable,
|
||||
[`Waker::wake`] is called and the socket future's task is awoken.
|
||||
Once a task has been woken up, it should attempt to `poll` the future
|
||||
again, which may or may not produce a final value.
|
||||
|
||||
Note that on multiple calls to `poll`, only the [`Waker`] from the
|
||||
[`Context`] passed to the most recent call should be scheduled to
|
||||
receive a wakeup.
|
||||
|
||||
# Runtime characteristics
|
||||
|
||||
Futures alone are *inert*; they must be *actively* `poll`ed to make
|
||||
progress, meaning that each time the current task is woken up, it should
|
||||
actively re-`poll` pending futures that it still has an interest in.
|
||||
|
||||
The `poll` function is not called repeatedly in a tight loop -- instead,
|
||||
it should only be called when the future indicates that it is ready to
|
||||
make progress (by calling `wake()`). If you're familiar with the
|
||||
`poll(2)` or `select(2)` syscalls on Unix it's worth noting that futures
|
||||
typically do *not* suffer the same problems of "all wakeups must poll
|
||||
all events"; they are more like `epoll(4)`.
|
||||
|
||||
An implementation of `poll` should strive to return quickly, and should
|
||||
not block. Returning quickly prevents unnecessarily clogging up
|
||||
threads or event loops. If it is known ahead of time that a call to
|
||||
`poll` may end up taking awhile, the work should be offloaded to a
|
||||
thread pool (or something similar) to ensure that `poll` can return
|
||||
quickly.
|
||||
|
||||
# Panics
|
||||
|
||||
Once a future has completed (returned `Ready` from `poll`), calling its
|
||||
`poll` method again may panic, block forever, or cause other kinds of
|
||||
problems; the `Future` trait places no requirements on the effects of
|
||||
such a call. However, as the `poll` method is not marked `unsafe`,
|
||||
Rust's usual rules apply: calls must never cause undefined behavior
|
||||
(memory corruption, incorrect use of `unsafe` functions, or the like),
|
||||
regardless of the future's state.
|
||||
|
||||
[`Poll::Pending`]: ../task/enum.Poll.html#variant.Pending
|
||||
[`Poll::Ready(val)`]: ../task/enum.Poll.html#variant.Ready
|
||||
[`Context`]: ../task/struct.Context.html
|
||||
[`Waker`]: ../task/struct.Waker.html
|
||||
[`Waker::wake`]: ../task/struct.Waker.html#method.wake
|
||||
"#]
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output>;
|
||||
}
|
||||
|
||||
pub trait FutureExt: std::future::Future {
|
||||
}
|
||||
|
||||
impl<F: Future + Unpin + ?Sized> Future for Box<F> {
|
||||
type Output = F::Output;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
unreachable!("this impl only appears in the rendered docs")
|
||||
}
|
||||
}
|
||||
|
||||
impl<F: Future + Unpin + ?Sized> Future for &mut F {
|
||||
type Output = F::Output;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
unreachable!("this impl only appears in the rendered docs")
|
||||
}
|
||||
}
|
||||
|
||||
impl<P> Future for Pin<P>
|
||||
where
|
||||
P: DerefMut + Unpin,
|
||||
<P as Deref>::Target: Future,
|
||||
{
|
||||
type Output = <<P as Deref>::Target as Future>::Output;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
unreachable!("this impl only appears in the rendered docs")
|
||||
}
|
||||
}
|
||||
|
||||
impl<F: Future> Future for std::panic::AssertUnwindSafe<F> {
|
||||
type Output = F::Output;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
unreachable!("this impl only appears in the rendered docs")
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,54 @@
|
||||
use crate::future::Future;
|
||||
|
||||
/// Convert a type into a `Future`.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// use async_std::future::{Future, IntoFuture};
|
||||
/// use async_std::io;
|
||||
/// use async_std::pin::Pin;
|
||||
///
|
||||
/// struct Client;
|
||||
///
|
||||
/// impl Client {
|
||||
/// pub async fn send(self) -> io::Result<()> {
|
||||
/// // Send a request
|
||||
/// Ok(())
|
||||
/// }
|
||||
/// }
|
||||
///
|
||||
/// impl IntoFuture for Client {
|
||||
/// type Output = io::Result<()>;
|
||||
///
|
||||
/// type Future = Pin<Box<dyn Future<Output = Self::Output>>>;
|
||||
///
|
||||
/// fn into_future(self) -> Self::Future {
|
||||
/// Box::pin(async {
|
||||
/// self.send().await
|
||||
/// })
|
||||
/// }
|
||||
/// }
|
||||
/// ```
|
||||
#[cfg(feature = "unstable")]
|
||||
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
|
||||
pub trait IntoFuture {
|
||||
/// The type of value produced on completion.
|
||||
type Output;
|
||||
|
||||
/// Which kind of future are we turning this into?
|
||||
type Future: Future<Output = Self::Output>;
|
||||
|
||||
/// Create a future from a value
|
||||
fn into_future(self) -> Self::Future;
|
||||
}
|
||||
|
||||
impl<T: Future> IntoFuture for T {
|
||||
type Output = T::Output;
|
||||
|
||||
type Future = T;
|
||||
|
||||
fn into_future(self) -> Self::Future {
|
||||
self
|
||||
}
|
||||
}
|
@ -0,0 +1,21 @@
|
||||
use std::pin::Pin;
|
||||
|
||||
use crate::future::Future;
|
||||
use crate::io::{self, Seek, SeekFrom};
|
||||
use crate::task::{Context, Poll};
|
||||
|
||||
#[doc(hidden)]
|
||||
#[allow(missing_debug_implementations)]
|
||||
pub struct SeekFuture<'a, T: Unpin + ?Sized> {
|
||||
pub(crate) seeker: &'a mut T,
|
||||
pub(crate) pos: SeekFrom,
|
||||
}
|
||||
|
||||
impl<T: Seek + Unpin + ?Sized> Future for SeekFuture<'_, T> {
|
||||
type Output = io::Result<u64>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let pos = self.pos;
|
||||
Pin::new(&mut *self.seeker).poll_seek(cx, pos)
|
||||
}
|
||||
}
|
@ -1,9 +1,9 @@
|
||||
//! OS-specific extensions.
|
||||
|
||||
#[cfg(any(unix, feature = "docs"))]
|
||||
#[cfg_attr(feature = "docs", doc(cfg(unix)))]
|
||||
cfg_unix! {
|
||||
pub mod unix;
|
||||
}
|
||||
|
||||
#[cfg(any(windows, feature = "docs"))]
|
||||
#[cfg_attr(feature = "docs", doc(cfg(windows)))]
|
||||
cfg_windows! {
|
||||
pub mod windows;
|
||||
}
|
||||
|
@ -0,0 +1,97 @@
|
||||
use std::marker::PhantomData;
|
||||
use std::pin::Pin;
|
||||
|
||||
use pin_project_lite::pin_project;
|
||||
|
||||
use crate::future::Future;
|
||||
use crate::stream::Stream;
|
||||
use crate::task::{Context, Poll};
|
||||
|
||||
pin_project! {
|
||||
/// A stream that yields elements by calling a closure.
|
||||
///
|
||||
/// This stream is constructed by [`from_fn`] function.
|
||||
///
|
||||
/// [`from_fn`]: fn.from_fn.html
|
||||
#[derive(Debug)]
|
||||
pub struct FromFn<F, Fut, T> {
|
||||
f: F,
|
||||
#[pin]
|
||||
future: Option<Fut>,
|
||||
__t: PhantomData<T>,
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates a new stream where to produce each new element a provided closure is called.
|
||||
///
|
||||
/// This allows creating a custom stream with any behaviour without using the more verbose
|
||||
/// syntax of creating a dedicated type and implementing a `Stream` trait for it.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// # async_std::task::block_on(async {
|
||||
/// #
|
||||
/// use async_std::prelude::*;
|
||||
/// use async_std::sync::Mutex;
|
||||
/// use std::sync::Arc;
|
||||
/// use async_std::stream;
|
||||
///
|
||||
/// let count = Arc::new(Mutex::new(0u8));
|
||||
/// let s = stream::from_fn(|| {
|
||||
/// let count = Arc::clone(&count);
|
||||
///
|
||||
/// async move {
|
||||
/// *count.lock().await += 1;
|
||||
///
|
||||
/// if *count.lock().await > 3 {
|
||||
/// None
|
||||
/// } else {
|
||||
/// Some(*count.lock().await)
|
||||
/// }
|
||||
/// }
|
||||
/// });
|
||||
///
|
||||
/// pin_utils::pin_mut!(s);
|
||||
/// assert_eq!(s.next().await, Some(1));
|
||||
/// assert_eq!(s.next().await, Some(2));
|
||||
/// assert_eq!(s.next().await, Some(3));
|
||||
/// assert_eq!(s.next().await, None);
|
||||
/// #
|
||||
/// # })
|
||||
/// ```
|
||||
pub fn from_fn<T, F, Fut>(f: F) -> FromFn<F, Fut, T>
|
||||
where
|
||||
F: FnMut() -> Fut,
|
||||
Fut: Future<Output = Option<T>>,
|
||||
{
|
||||
FromFn {
|
||||
f,
|
||||
future: None,
|
||||
__t: PhantomData,
|
||||
}
|
||||
}
|
||||
|
||||
impl<F, Fut, T> Stream for FromFn<F, Fut, T>
|
||||
where
|
||||
F: FnMut() -> Fut,
|
||||
Fut: Future<Output = Option<T>>,
|
||||
{
|
||||
type Item = T;
|
||||
|
||||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
let mut this = self.project();
|
||||
loop {
|
||||
if this.future.is_some() {
|
||||
let next =
|
||||
futures_core::ready!(this.future.as_mut().as_pin_mut().unwrap().poll(cx));
|
||||
this.future.set(None);
|
||||
|
||||
return Poll::Ready(next);
|
||||
} else {
|
||||
let fut = (this.f)();
|
||||
this.future.set(Some(fut));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,192 @@
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use futures_core::future::Future;
|
||||
use futures_core::stream::Stream;
|
||||
use futures_timer::Delay;
|
||||
|
||||
/// Creates a new stream that yields at a set interval.
|
||||
///
|
||||
/// The stream first yields after `dur`, and continues to yield every
|
||||
/// `dur` after that. The stream accounts for time elapsed between calls, and
|
||||
/// will adjust accordingly to prevent time skews.
|
||||
///
|
||||
/// Each interval may be slightly longer than the specified duration, but never
|
||||
/// less.
|
||||
///
|
||||
/// Note that intervals are not intended for high resolution timers, but rather
|
||||
/// they will likely fire some granularity after the exact instant that they're
|
||||
/// otherwise indicated to fire at.
|
||||
///
|
||||
/// See also: [`task::sleep`].
|
||||
///
|
||||
/// [`task::sleep`]: ../task/fn.sleep.html
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// Basic example:
|
||||
///
|
||||
/// ```no_run
|
||||
/// use async_std::prelude::*;
|
||||
/// use async_std::stream;
|
||||
/// use std::time::Duration;
|
||||
///
|
||||
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
|
||||
/// #
|
||||
/// let mut interval = stream::interval(Duration::from_secs(4));
|
||||
/// while let Some(_) = interval.next().await {
|
||||
/// println!("prints every four seconds");
|
||||
/// }
|
||||
/// #
|
||||
/// # Ok(()) }) }
|
||||
/// ```
|
||||
#[cfg(feature = "unstable")]
|
||||
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
|
||||
pub fn interval(dur: Duration) -> Interval {
|
||||
Interval {
|
||||
delay: Delay::new(dur),
|
||||
interval: dur,
|
||||
}
|
||||
}
|
||||
|
||||
/// A stream representing notifications at fixed interval
|
||||
///
|
||||
#[cfg(feature = "unstable")]
|
||||
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
|
||||
#[derive(Debug)]
|
||||
pub struct Interval {
|
||||
delay: Delay,
|
||||
interval: Duration,
|
||||
}
|
||||
|
||||
impl Stream for Interval {
|
||||
type Item = ();
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
if Pin::new(&mut self.delay).poll(cx).is_pending() {
|
||||
return Poll::Pending;
|
||||
}
|
||||
let when = Instant::now();
|
||||
let next = next_interval(when, Instant::now(), self.interval);
|
||||
self.delay.reset(next);
|
||||
Poll::Ready(Some(()))
|
||||
}
|
||||
}
|
||||
|
||||
/// Converts Duration object to raw nanoseconds if possible
|
||||
///
|
||||
/// This is useful to divide intervals.
|
||||
///
|
||||
/// While technically for large duration it's impossible to represent any
|
||||
/// duration as nanoseconds, the largest duration we can represent is about
|
||||
/// 427_000 years. Large enough for any interval we would use or calculate in
|
||||
/// tokio.
|
||||
fn duration_to_nanos(dur: Duration) -> Option<u64> {
|
||||
dur.as_secs()
|
||||
.checked_mul(1_000_000_000)
|
||||
.and_then(|v| v.checked_add(u64::from(dur.subsec_nanos())))
|
||||
}
|
||||
|
||||
fn next_interval(prev: Instant, now: Instant, interval: Duration) -> Instant {
|
||||
let new = prev + interval;
|
||||
if new > now {
|
||||
return new;
|
||||
}
|
||||
|
||||
let spent_ns = duration_to_nanos(now.duration_since(prev)).expect("interval should be expired");
|
||||
let interval_ns =
|
||||
duration_to_nanos(interval).expect("interval is less that 427 thousand years");
|
||||
let mult = spent_ns / interval_ns + 1;
|
||||
assert!(
|
||||
mult < (1 << 32),
|
||||
"can't skip more than 4 billion intervals of {:?} \
|
||||
(trying to skip {})",
|
||||
interval,
|
||||
mult
|
||||
);
|
||||
prev + interval * (mult as u32)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::next_interval;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
struct Timeline(Instant);
|
||||
|
||||
impl Timeline {
|
||||
fn new() -> Timeline {
|
||||
Timeline(Instant::now())
|
||||
}
|
||||
fn at(&self, millis: u64) -> Instant {
|
||||
self.0 + Duration::from_millis(millis)
|
||||
}
|
||||
fn at_ns(&self, sec: u64, nanos: u32) -> Instant {
|
||||
self.0 + Duration::new(sec, nanos)
|
||||
}
|
||||
}
|
||||
|
||||
fn dur(millis: u64) -> Duration {
|
||||
Duration::from_millis(millis)
|
||||
}
|
||||
|
||||
// The math around Instant/Duration isn't 100% precise due to rounding
|
||||
// errors, see #249 for more info
|
||||
fn almost_eq(a: Instant, b: Instant) -> bool {
|
||||
if a == b {
|
||||
true
|
||||
} else if a > b {
|
||||
a - b < Duration::from_millis(1)
|
||||
} else {
|
||||
b - a < Duration::from_millis(1)
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn norm_next() {
|
||||
let tm = Timeline::new();
|
||||
assert!(almost_eq(
|
||||
next_interval(tm.at(1), tm.at(2), dur(10)),
|
||||
tm.at(11)
|
||||
));
|
||||
assert!(almost_eq(
|
||||
next_interval(tm.at(7777), tm.at(7788), dur(100)),
|
||||
tm.at(7877)
|
||||
));
|
||||
assert!(almost_eq(
|
||||
next_interval(tm.at(1), tm.at(1000), dur(2100)),
|
||||
tm.at(2101)
|
||||
));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn fast_forward() {
|
||||
let tm = Timeline::new();
|
||||
assert!(almost_eq(
|
||||
next_interval(tm.at(1), tm.at(1000), dur(10)),
|
||||
tm.at(1001)
|
||||
));
|
||||
assert!(almost_eq(
|
||||
next_interval(tm.at(7777), tm.at(8888), dur(100)),
|
||||
tm.at(8977)
|
||||
));
|
||||
assert!(almost_eq(
|
||||
next_interval(tm.at(1), tm.at(10000), dur(2100)),
|
||||
tm.at(10501)
|
||||
));
|
||||
}
|
||||
|
||||
/// TODO: this test actually should be successful, but since we can't
|
||||
/// multiply Duration on anything larger than u32 easily we decided
|
||||
/// to allow it to fail for now
|
||||
#[test]
|
||||
#[should_panic(expected = "can't skip more than 4 billion intervals")]
|
||||
fn large_skip() {
|
||||
let tm = Timeline::new();
|
||||
assert_eq!(
|
||||
next_interval(tm.at_ns(0, 1), tm.at_ns(25, 0), Duration::new(0, 2)),
|
||||
tm.at_ns(25, 1)
|
||||
);
|
||||
}
|
||||
}
|
@ -0,0 +1,23 @@
|
||||
use crate::future::Future;
|
||||
use crate::stream::Stream;
|
||||
|
||||
/// Trait to represent types that can be created by productming up a stream.
|
||||
///
|
||||
/// This trait is used to implement the [`product`] method on streams. Types which
|
||||
/// implement the trait can be generated by the [`product`] method. Like
|
||||
/// [`FromStream`] this trait should rarely be called directly and instead
|
||||
/// interacted with through [`Stream::product`].
|
||||
///
|
||||
/// [`product`]: trait.Product.html#tymethod.product
|
||||
/// [`FromStream`]: trait.FromStream.html
|
||||
/// [`Stream::product`]: trait.Stream.html#method.product
|
||||
#[cfg(feature = "unstable")]
|
||||
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
|
||||
pub trait Product<A = Self>: Sized {
|
||||
/// Method which takes a stream and generates `Self` from the elements by
|
||||
/// multiplying the items.
|
||||
fn product<S, F>(stream: S) -> F
|
||||
where
|
||||
S: Stream<Item = A>,
|
||||
F: Future<Output = Self>;
|
||||
}
|
@ -0,0 +1,100 @@
|
||||
use std::marker::PhantomData;
|
||||
use std::pin::Pin;
|
||||
|
||||
use pin_project_lite::pin_project;
|
||||
|
||||
use crate::future::Future;
|
||||
use crate::stream::Stream;
|
||||
use crate::task::{Context, Poll};
|
||||
|
||||
pin_project! {
|
||||
/// A stream that repeats elements of type `T` endlessly by applying a provided closure.
|
||||
///
|
||||
/// This stream is constructed by the [`repeat_with`] function.
|
||||
///
|
||||
/// [`repeat_with`]: fn.repeat_with.html
|
||||
#[derive(Debug)]
|
||||
pub struct RepeatWith<F, Fut, A> {
|
||||
f: F,
|
||||
#[pin]
|
||||
future: Option<Fut>,
|
||||
__a: PhantomData<A>,
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates a new stream that repeats elements of type `A` endlessly by applying the provided closure.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// Basic usage:
|
||||
///
|
||||
/// ```
|
||||
/// # async_std::task::block_on(async {
|
||||
/// #
|
||||
/// use async_std::prelude::*;
|
||||
/// use async_std::stream;
|
||||
///
|
||||
/// let s = stream::repeat_with(|| async { 1 });
|
||||
///
|
||||
/// pin_utils::pin_mut!(s);
|
||||
///
|
||||
/// assert_eq!(s.next().await, Some(1));
|
||||
/// assert_eq!(s.next().await, Some(1));
|
||||
/// assert_eq!(s.next().await, Some(1));
|
||||
/// assert_eq!(s.next().await, Some(1));
|
||||
/// # })
|
||||
/// ```
|
||||
///
|
||||
/// Going finite:
|
||||
///
|
||||
/// ```
|
||||
/// # async_std::task::block_on(async {
|
||||
/// #
|
||||
/// use async_std::prelude::*;
|
||||
/// use async_std::stream;
|
||||
///
|
||||
/// let s = stream::repeat_with(|| async { 1u8 }).take(2);
|
||||
///
|
||||
/// pin_utils::pin_mut!(s);
|
||||
///
|
||||
/// assert_eq!(s.next().await, Some(1));
|
||||
/// assert_eq!(s.next().await, Some(1));
|
||||
/// assert_eq!(s.next().await, None);
|
||||
/// # })
|
||||
/// ```
|
||||
pub fn repeat_with<F, Fut, A>(repeater: F) -> RepeatWith<F, Fut, A>
|
||||
where
|
||||
F: FnMut() -> Fut,
|
||||
Fut: Future<Output = A>,
|
||||
{
|
||||
RepeatWith {
|
||||
f: repeater,
|
||||
future: None,
|
||||
__a: PhantomData,
|
||||
}
|
||||
}
|
||||
|
||||
impl<F, Fut, A> Stream for RepeatWith<F, Fut, A>
|
||||
where
|
||||
F: FnMut() -> Fut,
|
||||
Fut: Future<Output = A>,
|
||||
{
|
||||
type Item = A;
|
||||
|
||||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
let mut this = self.project();
|
||||
loop {
|
||||
if this.future.is_some() {
|
||||
let res = futures_core::ready!(this.future.as_mut().as_pin_mut().unwrap().poll(cx));
|
||||
|
||||
this.future.set(None);
|
||||
|
||||
return Poll::Ready(Some(res));
|
||||
} else {
|
||||
let fut = (this.f)();
|
||||
|
||||
this.future.set(Some(fut));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,93 @@
|
||||
use std::cmp::Ordering;
|
||||
use std::pin::Pin;
|
||||
|
||||
use pin_project_lite::pin_project;
|
||||
|
||||
use super::fuse::Fuse;
|
||||
use crate::future::Future;
|
||||
use crate::prelude::*;
|
||||
use crate::stream::Stream;
|
||||
use crate::task::{Context, Poll};
|
||||
|
||||
pin_project! {
|
||||
// Lexicographically compares the elements of this `Stream` with those
|
||||
// of another using `Ord`.
|
||||
#[doc(hidden)]
|
||||
#[allow(missing_debug_implementations)]
|
||||
pub struct CmpFuture<L: Stream, R: Stream> {
|
||||
#[pin]
|
||||
l: Fuse<L>,
|
||||
#[pin]
|
||||
r: Fuse<R>,
|
||||
l_cache: Option<L::Item>,
|
||||
r_cache: Option<R::Item>,
|
||||
}
|
||||
}
|
||||
|
||||
impl<L: Stream, R: Stream> CmpFuture<L, R> {
|
||||
pub(super) fn new(l: L, r: R) -> Self {
|
||||
CmpFuture {
|
||||
l: l.fuse(),
|
||||
r: r.fuse(),
|
||||
l_cache: None,
|
||||
r_cache: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<L: Stream, R: Stream> Future for CmpFuture<L, R>
|
||||
where
|
||||
L: Stream + Sized,
|
||||
R: Stream<Item = L::Item> + Sized,
|
||||
L::Item: Ord,
|
||||
{
|
||||
type Output = Ordering;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let mut this = self.project();
|
||||
loop {
|
||||
// Stream that completes earliest can be considered Less, etc
|
||||
let l_complete = this.l.done && this.l_cache.is_none();
|
||||
let r_complete = this.r.done && this.r_cache.is_none();
|
||||
|
||||
if l_complete && r_complete {
|
||||
return Poll::Ready(Ordering::Equal);
|
||||
} else if l_complete {
|
||||
return Poll::Ready(Ordering::Less);
|
||||
} else if r_complete {
|
||||
return Poll::Ready(Ordering::Greater);
|
||||
}
|
||||
|
||||
// Get next value if possible and necesary
|
||||
if !this.l.done && this.l_cache.is_none() {
|
||||
let l_next = futures_core::ready!(this.l.as_mut().poll_next(cx));
|
||||
if let Some(item) = l_next {
|
||||
*this.l_cache = Some(item);
|
||||
}
|
||||
}
|
||||
|
||||
if !this.r.done && this.r_cache.is_none() {
|
||||
let r_next = futures_core::ready!(this.r.as_mut().poll_next(cx));
|
||||
if let Some(item) = r_next {
|
||||
*this.r_cache = Some(item);
|
||||
}
|
||||
}
|
||||
|
||||
// Compare if both values are available.
|
||||
if this.l_cache.is_some() && this.r_cache.is_some() {
|
||||
let l_value = this.l_cache.take().unwrap();
|
||||
let r_value = this.r_cache.take().unwrap();
|
||||
let result = l_value.cmp(&r_value);
|
||||
|
||||
if let Ordering::Equal = result {
|
||||
// Reset cache to prepare for next comparison
|
||||
*this.l_cache = None;
|
||||
*this.r_cache = None;
|
||||
} else {
|
||||
// Return non equal value
|
||||
return Poll::Ready(result);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,50 @@
|
||||
use std::cmp::Ordering;
|
||||
use std::pin::Pin;
|
||||
|
||||
use pin_project_lite::pin_project;
|
||||
|
||||
use super::partial_cmp::PartialCmpFuture;
|
||||
use crate::future::Future;
|
||||
use crate::prelude::*;
|
||||
use crate::stream::Stream;
|
||||
use crate::task::{Context, Poll};
|
||||
|
||||
pin_project! {
|
||||
// Determines if the elements of this `Stream` are lexicographically
|
||||
// greater than or equal to those of another.
|
||||
#[doc(hidden)]
|
||||
#[allow(missing_debug_implementations)]
|
||||
pub struct GeFuture<L: Stream, R: Stream> {
|
||||
#[pin]
|
||||
partial_cmp: PartialCmpFuture<L, R>,
|
||||
}
|
||||
}
|
||||
|
||||
impl<L: Stream, R: Stream> GeFuture<L, R>
|
||||
where
|
||||
L::Item: PartialOrd<R::Item>,
|
||||
{
|
||||
pub(super) fn new(l: L, r: R) -> Self {
|
||||
GeFuture {
|
||||
partial_cmp: l.partial_cmp(r),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<L: Stream, R: Stream> Future for GeFuture<L, R>
|
||||
where
|
||||
L: Stream,
|
||||
R: Stream,
|
||||
L::Item: PartialOrd<R::Item>,
|
||||
{
|
||||
type Output = bool;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let result = futures_core::ready!(self.project().partial_cmp.poll(cx));
|
||||
|
||||
match result {
|
||||
Some(Ordering::Greater) | Some(Ordering::Equal) => Poll::Ready(true),
|
||||
_ => Poll::Ready(false),
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,50 @@
|
||||
use std::cmp::Ordering;
|
||||
use std::pin::Pin;
|
||||
|
||||
use pin_project_lite::pin_project;
|
||||
|
||||
use super::partial_cmp::PartialCmpFuture;
|
||||
use crate::future::Future;
|
||||
use crate::prelude::*;
|
||||
use crate::stream::Stream;
|
||||
use crate::task::{Context, Poll};
|
||||
|
||||
pin_project! {
|
||||
// Determines if the elements of this `Stream` are lexicographically
|
||||
// greater than those of another.
|
||||
#[doc(hidden)]
|
||||
#[allow(missing_debug_implementations)]
|
||||
pub struct GtFuture<L: Stream, R: Stream> {
|
||||
#[pin]
|
||||
partial_cmp: PartialCmpFuture<L, R>,
|
||||
}
|
||||
}
|
||||
|
||||
impl<L: Stream, R: Stream> GtFuture<L, R>
|
||||
where
|
||||
L::Item: PartialOrd<R::Item>,
|
||||
{
|
||||
pub(super) fn new(l: L, r: R) -> Self {
|
||||
GtFuture {
|
||||
partial_cmp: l.partial_cmp(r),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<L: Stream, R: Stream> Future for GtFuture<L, R>
|
||||
where
|
||||
L: Stream + Sized,
|
||||
R: Stream + Sized,
|
||||
L::Item: PartialOrd<R::Item>,
|
||||
{
|
||||
type Output = bool;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let result = futures_core::ready!(self.project().partial_cmp.poll(cx));
|
||||
|
||||
match result {
|
||||
Some(Ordering::Greater) => Poll::Ready(true),
|
||||
_ => Poll::Ready(false),
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,45 @@
|
||||
use std::pin::Pin;
|
||||
|
||||
use pin_project_lite::pin_project;
|
||||
|
||||
use crate::future::Future;
|
||||
use crate::stream::Stream;
|
||||
use crate::task::{Context, Poll};
|
||||
|
||||
pin_project! {
|
||||
#[doc(hidden)]
|
||||
#[allow(missing_debug_implementations)]
|
||||
pub struct LastFuture<S, T> {
|
||||
#[pin]
|
||||
stream: S,
|
||||
last: Option<T>,
|
||||
}
|
||||
}
|
||||
|
||||
impl<S, T> LastFuture<S, T> {
|
||||
pub(crate) fn new(stream: S) -> Self {
|
||||
LastFuture { stream, last: None }
|
||||
}
|
||||
}
|
||||
|
||||
impl<S> Future for LastFuture<S, S::Item>
|
||||
where
|
||||
S: Stream + Unpin + Sized,
|
||||
S::Item: Copy,
|
||||
{
|
||||
type Output = Option<S::Item>;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let this = self.project();
|
||||
let next = futures_core::ready!(this.stream.poll_next(cx));
|
||||
|
||||
match next {
|
||||
Some(new) => {
|
||||
cx.waker().wake_by_ref();
|
||||
*this.last = Some(new);
|
||||
Poll::Pending
|
||||
}
|
||||
None => Poll::Ready(*this.last),
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,50 @@
|
||||
use std::cmp::Ordering;
|
||||
use std::pin::Pin;
|
||||
|
||||
use pin_project_lite::pin_project;
|
||||
|
||||
use super::partial_cmp::PartialCmpFuture;
|
||||
use crate::future::Future;
|
||||
use crate::prelude::*;
|
||||
use crate::stream::Stream;
|
||||
use crate::task::{Context, Poll};
|
||||
|
||||
pin_project! {
|
||||
/// Determines if the elements of this `Stream` are lexicographically
|
||||
/// less or equal to those of another.
|
||||
#[doc(hidden)]
|
||||
#[allow(missing_debug_implementations)]
|
||||
pub struct LeFuture<L: Stream, R: Stream> {
|
||||
#[pin]
|
||||
partial_cmp: PartialCmpFuture<L, R>,
|
||||
}
|
||||
}
|
||||
|
||||
impl<L: Stream, R: Stream> LeFuture<L, R>
|
||||
where
|
||||
L::Item: PartialOrd<R::Item>,
|
||||
{
|
||||
pub(super) fn new(l: L, r: R) -> Self {
|
||||
LeFuture {
|
||||
partial_cmp: l.partial_cmp(r),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<L: Stream, R: Stream> Future for LeFuture<L, R>
|
||||
where
|
||||
L: Stream + Sized,
|
||||
R: Stream + Sized,
|
||||
L::Item: PartialOrd<R::Item>,
|
||||
{
|
||||
type Output = bool;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let result = futures_core::ready!(self.project().partial_cmp.poll(cx));
|
||||
|
||||
match result {
|
||||
Some(Ordering::Less) | Some(Ordering::Equal) => Poll::Ready(true),
|
||||
_ => Poll::Ready(false),
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,50 @@
|
||||
use std::cmp::Ordering;
|
||||
use std::pin::Pin;
|
||||
|
||||
use pin_project_lite::pin_project;
|
||||
|
||||
use super::partial_cmp::PartialCmpFuture;
|
||||
use crate::future::Future;
|
||||
use crate::prelude::*;
|
||||
use crate::stream::Stream;
|
||||
use crate::task::{Context, Poll};
|
||||
|
||||
pin_project! {
|
||||
// Determines if the elements of this `Stream` are lexicographically
|
||||
// less than those of another.
|
||||
#[doc(hidden)]
|
||||
#[allow(missing_debug_implementations)]
|
||||
pub struct LtFuture<L: Stream, R: Stream> {
|
||||
#[pin]
|
||||
partial_cmp: PartialCmpFuture<L, R>,
|
||||
}
|
||||
}
|
||||
|
||||
impl<L: Stream, R: Stream> LtFuture<L, R>
|
||||
where
|
||||
L::Item: PartialOrd<R::Item>,
|
||||
{
|
||||
pub(super) fn new(l: L, r: R) -> Self {
|
||||
LtFuture {
|
||||
partial_cmp: l.partial_cmp(r),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<L: Stream, R: Stream> Future for LtFuture<L, R>
|
||||
where
|
||||
L: Stream + Sized,
|
||||
R: Stream + Sized,
|
||||
L::Item: PartialOrd<R::Item>,
|
||||
{
|
||||
type Output = bool;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let result = futures_core::ready!(self.project().partial_cmp.poll(cx));
|
||||
|
||||
match result {
|
||||
Some(Ordering::Less) => Poll::Ready(true),
|
||||
_ => Poll::Ready(false),
|
||||
}
|
||||
}
|
||||
}
|
Some files were not shown because too many files have changed in this diff Show More
Loading…
Reference in New Issue