forked from mirror/async-std
Merge branch 'master' into fs-stream-repeat-with
commit
822e4bc220
@ -1,20 +0,0 @@
|
||||
on: pull_request
|
||||
|
||||
name: Clippy check
|
||||
jobs:
|
||||
clippy_check:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v1
|
||||
- id: component
|
||||
uses: actions-rs/components-nightly@v1
|
||||
with:
|
||||
component: clippy
|
||||
- uses: actions-rs/toolchain@v1
|
||||
with:
|
||||
toolchain: ${{ steps.component.outputs.toolchain }}
|
||||
override: true
|
||||
- run: rustup component add clippy
|
||||
- uses: actions-rs/clippy-check@v1
|
||||
with:
|
||||
token: ${{ secrets.GITHUB_TOKEN }}
|
@ -1,68 +0,0 @@
|
||||
language: rust
|
||||
|
||||
env:
|
||||
- RUSTFLAGS="-D warnings"
|
||||
|
||||
# Cache the whole `~/.cargo` directory to keep `~/cargo/.crates.toml`.
|
||||
cache:
|
||||
directories:
|
||||
- /home/travis/.cargo
|
||||
|
||||
# Don't cache the cargo registry because it's too big.
|
||||
before_cache:
|
||||
- rm -rf /home/travis/.cargo/registry
|
||||
|
||||
|
||||
branches:
|
||||
only:
|
||||
- master
|
||||
- staging
|
||||
- trying
|
||||
|
||||
matrix:
|
||||
fast_finish: true
|
||||
include:
|
||||
- rust: nightly
|
||||
os: linux
|
||||
|
||||
- rust: nightly
|
||||
os: osx
|
||||
osx_image: xcode9.2
|
||||
|
||||
- rust: nightly-x86_64-pc-windows-msvc
|
||||
os: windows
|
||||
|
||||
- name: fmt
|
||||
rust: nightly
|
||||
os: linux
|
||||
before_script: |
|
||||
if ! rustup component add rustfmt; then
|
||||
target=`curl https://rust-lang.github.io/rustup-components-history/x86_64-unknown-linux-gnu/rustfmt`;
|
||||
echo "'rustfmt' is unavailable on the toolchain 'nightly', use the toolchain 'nightly-$target' instead";
|
||||
rustup toolchain install nightly-$target;
|
||||
rustup default nightly-$target;
|
||||
rustup component add rustfmt;
|
||||
fi
|
||||
script:
|
||||
- cargo fmt --all -- --check
|
||||
|
||||
- name: docs
|
||||
rust: nightly
|
||||
os: linux
|
||||
script:
|
||||
- cargo doc --features docs,unstable
|
||||
|
||||
- name: book
|
||||
rust: nightly
|
||||
os: linux
|
||||
before_script:
|
||||
- test -x $HOME/.cargo/bin/mdbook || ./ci/install-mdbook.sh
|
||||
- cargo build # to find 'extern crate async_std' by `mdbook test`
|
||||
script:
|
||||
- mdbook build docs
|
||||
- mdbook test -L ./target/debug/deps docs
|
||||
|
||||
script:
|
||||
- cargo check --all --benches --bins --examples --tests
|
||||
- cargo check --features unstable --all --benches --bins --examples --tests
|
||||
- cargo test --all --doc --features unstable
|
@ -1 +1,7 @@
|
||||
status = ["continuous-integration/travis-ci/push"]
|
||||
status = [
|
||||
"Build and test (ubuntu-latest, nightly)",
|
||||
"Build and test (windows-latest, nightly)",
|
||||
"Build and test (macOS-latest, nightly)",
|
||||
"Checking fmt and docs",
|
||||
"Clippy check",
|
||||
]
|
||||
|
@ -0,0 +1,145 @@
|
||||
use crate::utils::extension_trait;
|
||||
|
||||
cfg_if::cfg_if! {
|
||||
if #[cfg(feature = "docs")] {
|
||||
use std::pin::Pin;
|
||||
use std::ops::{Deref, DerefMut};
|
||||
|
||||
use crate::task::{Context, Poll};
|
||||
}
|
||||
}
|
||||
|
||||
extension_trait! {
|
||||
#[doc = r#"
|
||||
A future represents an asynchronous computation.
|
||||
|
||||
A future is a value that may not have finished computing yet. This kind of
|
||||
"asynchronous value" makes it possible for a thread to continue doing useful
|
||||
work while it waits for the value to become available.
|
||||
|
||||
# The `poll` method
|
||||
|
||||
The core method of future, `poll`, *attempts* to resolve the future into a
|
||||
final value. This method does not block if the value is not ready. Instead,
|
||||
the current task is scheduled to be woken up when it's possible to make
|
||||
further progress by `poll`ing again. The `context` passed to the `poll`
|
||||
method can provide a [`Waker`], which is a handle for waking up the current
|
||||
task.
|
||||
|
||||
When using a future, you generally won't call `poll` directly, but instead
|
||||
`.await` the value.
|
||||
|
||||
[`Waker`]: ../task/struct.Waker.html
|
||||
"#]
|
||||
pub trait Future {
|
||||
#[doc = r#"
|
||||
The type of value produced on completion.
|
||||
"#]
|
||||
type Output;
|
||||
|
||||
#[doc = r#"
|
||||
Attempt to resolve the future to a final value, registering
|
||||
the current task for wakeup if the value is not yet available.
|
||||
|
||||
# Return value
|
||||
|
||||
This function returns:
|
||||
|
||||
- [`Poll::Pending`] if the future is not ready yet
|
||||
- [`Poll::Ready(val)`] with the result `val` of this future if it
|
||||
finished successfully.
|
||||
|
||||
Once a future has finished, clients should not `poll` it again.
|
||||
|
||||
When a future is not ready yet, `poll` returns `Poll::Pending` and
|
||||
stores a clone of the [`Waker`] copied from the current [`Context`].
|
||||
This [`Waker`] is then woken once the future can make progress.
|
||||
For example, a future waiting for a socket to become
|
||||
readable would call `.clone()` on the [`Waker`] and store it.
|
||||
When a signal arrives elsewhere indicating that the socket is readable,
|
||||
[`Waker::wake`] is called and the socket future's task is awoken.
|
||||
Once a task has been woken up, it should attempt to `poll` the future
|
||||
again, which may or may not produce a final value.
|
||||
|
||||
Note that on multiple calls to `poll`, only the [`Waker`] from the
|
||||
[`Context`] passed to the most recent call should be scheduled to
|
||||
receive a wakeup.
|
||||
|
||||
# Runtime characteristics
|
||||
|
||||
Futures alone are *inert*; they must be *actively* `poll`ed to make
|
||||
progress, meaning that each time the current task is woken up, it should
|
||||
actively re-`poll` pending futures that it still has an interest in.
|
||||
|
||||
The `poll` function is not called repeatedly in a tight loop -- instead,
|
||||
it should only be called when the future indicates that it is ready to
|
||||
make progress (by calling `wake()`). If you're familiar with the
|
||||
`poll(2)` or `select(2)` syscalls on Unix it's worth noting that futures
|
||||
typically do *not* suffer the same problems of "all wakeups must poll
|
||||
all events"; they are more like `epoll(4)`.
|
||||
|
||||
An implementation of `poll` should strive to return quickly, and should
|
||||
not block. Returning quickly prevents unnecessarily clogging up
|
||||
threads or event loops. If it is known ahead of time that a call to
|
||||
`poll` may end up taking awhile, the work should be offloaded to a
|
||||
thread pool (or something similar) to ensure that `poll` can return
|
||||
quickly.
|
||||
|
||||
# Panics
|
||||
|
||||
Once a future has completed (returned `Ready` from `poll`), calling its
|
||||
`poll` method again may panic, block forever, or cause other kinds of
|
||||
problems; the `Future` trait places no requirements on the effects of
|
||||
such a call. However, as the `poll` method is not marked `unsafe`,
|
||||
Rust's usual rules apply: calls must never cause undefined behavior
|
||||
(memory corruption, incorrect use of `unsafe` functions, or the like),
|
||||
regardless of the future's state.
|
||||
|
||||
[`Poll::Pending`]: ../task/enum.Poll.html#variant.Pending
|
||||
[`Poll::Ready(val)`]: ../task/enum.Poll.html#variant.Ready
|
||||
[`Context`]: ../task/struct.Context.html
|
||||
[`Waker`]: ../task/struct.Waker.html
|
||||
[`Waker::wake`]: ../task/struct.Waker.html#method.wake
|
||||
"#]
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output>;
|
||||
}
|
||||
|
||||
pub trait FutureExt: std::future::Future {
|
||||
}
|
||||
|
||||
impl<F: Future + Unpin + ?Sized> Future for Box<F> {
|
||||
type Output = F::Output;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
unreachable!("this impl only appears in the rendered docs")
|
||||
}
|
||||
}
|
||||
|
||||
impl<F: Future + Unpin + ?Sized> Future for &mut F {
|
||||
type Output = F::Output;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
unreachable!("this impl only appears in the rendered docs")
|
||||
}
|
||||
}
|
||||
|
||||
impl<P> Future for Pin<P>
|
||||
where
|
||||
P: DerefMut + Unpin,
|
||||
<P as Deref>::Target: Future,
|
||||
{
|
||||
type Output = <<P as Deref>::Target as Future>::Output;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
unreachable!("this impl only appears in the rendered docs")
|
||||
}
|
||||
}
|
||||
|
||||
impl<F: Future> Future for std::panic::AssertUnwindSafe<F> {
|
||||
type Output = F::Output;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
unreachable!("this impl only appears in the rendered docs")
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,54 @@
|
||||
use crate::future::Future;
|
||||
|
||||
/// Convert a type into a `Future`.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// use async_std::future::{Future, IntoFuture};
|
||||
/// use async_std::io;
|
||||
/// use async_std::pin::Pin;
|
||||
///
|
||||
/// struct Client;
|
||||
///
|
||||
/// impl Client {
|
||||
/// pub async fn send(self) -> io::Result<()> {
|
||||
/// // Send a request
|
||||
/// Ok(())
|
||||
/// }
|
||||
/// }
|
||||
///
|
||||
/// impl IntoFuture for Client {
|
||||
/// type Output = io::Result<()>;
|
||||
///
|
||||
/// type Future = Pin<Box<dyn Future<Output = Self::Output>>>;
|
||||
///
|
||||
/// fn into_future(self) -> Self::Future {
|
||||
/// Box::pin(async {
|
||||
/// self.send().await
|
||||
/// })
|
||||
/// }
|
||||
/// }
|
||||
/// ```
|
||||
#[cfg(any(feature = "unstable", feature = "docs"))]
|
||||
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
|
||||
pub trait IntoFuture {
|
||||
/// The type of value produced on completion.
|
||||
type Output;
|
||||
|
||||
/// Which kind of future are we turning this into?
|
||||
type Future: Future<Output = Self::Output>;
|
||||
|
||||
/// Create a future from a value
|
||||
fn into_future(self) -> Self::Future;
|
||||
}
|
||||
|
||||
impl<T: Future> IntoFuture for T {
|
||||
type Output = T::Output;
|
||||
|
||||
type Future = T;
|
||||
|
||||
fn into_future(self) -> Self::Future {
|
||||
self
|
||||
}
|
||||
}
|
@ -0,0 +1,46 @@
|
||||
use std::mem;
|
||||
use std::pin::Pin;
|
||||
|
||||
use super::read_until_internal;
|
||||
use crate::io::{self, BufRead};
|
||||
use crate::stream::Stream;
|
||||
use crate::task::{Context, Poll};
|
||||
|
||||
/// A stream over the contents of an instance of [`BufRead`] split on a particular byte.
|
||||
///
|
||||
/// This stream is created by the [`split`] method on types that implement [`BufRead`].
|
||||
///
|
||||
/// This type is an async version of [`std::io::Split`].
|
||||
///
|
||||
/// [`split`]: trait.BufRead.html#method.lines
|
||||
/// [`BufRead`]: trait.BufRead.html
|
||||
/// [`std::io::Split`]: https://doc.rust-lang.org/std/io/struct.Split.html
|
||||
#[derive(Debug)]
|
||||
pub struct Split<R> {
|
||||
pub(crate) reader: R,
|
||||
pub(crate) buf: Vec<u8>,
|
||||
pub(crate) read: usize,
|
||||
pub(crate) delim: u8,
|
||||
}
|
||||
|
||||
impl<R: BufRead> Stream for Split<R> {
|
||||
type Item = io::Result<Vec<u8>>;
|
||||
|
||||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
let Self {
|
||||
reader,
|
||||
buf,
|
||||
read,
|
||||
delim,
|
||||
} = unsafe { self.get_unchecked_mut() };
|
||||
let reader = unsafe { Pin::new_unchecked(reader) };
|
||||
let n = futures_core::ready!(read_until_internal(reader, cx, *delim, buf, read))?;
|
||||
if n == 0 && buf.is_empty() {
|
||||
return Poll::Ready(None);
|
||||
}
|
||||
if buf[buf.len() - 1] == *delim {
|
||||
buf.pop();
|
||||
}
|
||||
Poll::Ready(Some(Ok(mem::replace(buf, vec![]))))
|
||||
}
|
||||
}
|
@ -0,0 +1,21 @@
|
||||
//! Internal types for stdio.
|
||||
//!
|
||||
//! This module is a port of `libstd/io/stdio.rs`,and contains internal types for `print`/`eprint`.
|
||||
|
||||
use crate::io::{stderr, stdout};
|
||||
use crate::prelude::*;
|
||||
use std::fmt;
|
||||
|
||||
#[doc(hidden)]
|
||||
pub async fn _print(args: fmt::Arguments<'_>) {
|
||||
if let Err(e) = stdout().write_fmt(args).await {
|
||||
panic!("failed printing to stdout: {}", e);
|
||||
}
|
||||
}
|
||||
|
||||
#[doc(hidden)]
|
||||
pub async fn _eprint(args: fmt::Arguments<'_>) {
|
||||
if let Err(e) = stderr().write_fmt(args).await {
|
||||
panic!("failed printing to stderr: {}", e);
|
||||
}
|
||||
}
|
@ -0,0 +1,50 @@
|
||||
use std::pin::Pin;
|
||||
|
||||
use crate::future::Future;
|
||||
use crate::io::{self, Write};
|
||||
use crate::task::{Context, Poll};
|
||||
|
||||
#[doc(hidden)]
|
||||
#[allow(missing_debug_implementations)]
|
||||
pub struct WriteFmtFuture<'a, T: Unpin + ?Sized> {
|
||||
pub(crate) writer: &'a mut T,
|
||||
pub(crate) res: Option<io::Result<Vec<u8>>>,
|
||||
pub(crate) buffer: Option<Vec<u8>>,
|
||||
pub(crate) amt: u64,
|
||||
}
|
||||
|
||||
impl<T: Write + Unpin + ?Sized> Future for WriteFmtFuture<'_, T> {
|
||||
type Output = io::Result<()>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
// Process the interal Result the first time we run.
|
||||
if self.buffer.is_none() {
|
||||
match self.res.take().unwrap() {
|
||||
Err(err) => return Poll::Ready(Err(err)),
|
||||
Ok(buffer) => self.buffer = Some(buffer),
|
||||
};
|
||||
}
|
||||
|
||||
// Get the types from the future.
|
||||
let Self {
|
||||
writer,
|
||||
amt,
|
||||
buffer,
|
||||
..
|
||||
} = &mut *self;
|
||||
let mut buffer = buffer.as_mut().unwrap();
|
||||
|
||||
// Copy the data from the buffer into the writer until it's done.
|
||||
loop {
|
||||
if *amt == buffer.len() as u64 {
|
||||
futures_core::ready!(Pin::new(&mut **writer).poll_flush(cx))?;
|
||||
return Poll::Ready(Ok(()));
|
||||
}
|
||||
let i = futures_core::ready!(Pin::new(&mut **writer).poll_write(cx, &mut buffer))?;
|
||||
if i == 0 {
|
||||
return Poll::Ready(Err(io::ErrorKind::WriteZero.into()));
|
||||
}
|
||||
*amt += i as u64;
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,163 @@
|
||||
/// Prints to the standard output.
|
||||
///
|
||||
/// Equivalent to the [`println!`] macro except that a newline is not printed at
|
||||
/// the end of the message.
|
||||
///
|
||||
/// Note that stdout is frequently line-buffered by default so it may be
|
||||
/// necessary to use [`io::stdout().flush()`][flush] to ensure the output is emitted
|
||||
/// immediately.
|
||||
///
|
||||
/// Use `print!` only for the primary output of your program. Use
|
||||
/// [`eprint!`] instead to print error and progress messages.
|
||||
///
|
||||
/// [`println!`]: macro.println.html
|
||||
/// [flush]: io/trait.Write.html#tymethod.flush
|
||||
/// [`eprint!`]: macro.eprint.html
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// Panics if writing to `io::stdout()` fails.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// # async_std::task::block_on(async {
|
||||
/// #
|
||||
/// use async_std::prelude::*;
|
||||
/// use async_std::io;
|
||||
/// use async_std::print;
|
||||
///
|
||||
/// print!("this ").await;
|
||||
/// print!("will ").await;
|
||||
/// print!("be ").await;
|
||||
/// print!("on ").await;
|
||||
/// print!("the ").await;
|
||||
/// print!("same ").await;
|
||||
/// print!("line ").await;
|
||||
///
|
||||
/// io::stdout().flush().await.unwrap();
|
||||
///
|
||||
/// print!("this string has a newline, why not choose println! instead?\n").await;
|
||||
///
|
||||
/// io::stdout().flush().await.unwrap();
|
||||
/// #
|
||||
/// # })
|
||||
/// ```
|
||||
#[cfg(any(feature = "unstable", feature = "docs"))]
|
||||
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
|
||||
#[macro_export]
|
||||
macro_rules! print {
|
||||
($($arg:tt)*) => ($crate::io::_print(format_args!($($arg)*)))
|
||||
}
|
||||
|
||||
/// Prints to the standard output, with a newline.
|
||||
///
|
||||
/// On all platforms, the newline is the LINE FEED character (`\n`/`U+000A`) alone
|
||||
/// (no additional CARRIAGE RETURN (`\r`/`U+000D`)).
|
||||
///
|
||||
/// Use the [`format!`] syntax to write data to the standard output.
|
||||
/// See [`std::fmt`] for more information.
|
||||
///
|
||||
/// Use `println!` only for the primary output of your program. Use
|
||||
/// [`eprintln!`] instead to print error and progress messages.
|
||||
///
|
||||
/// [`format!`]: macro.format.html
|
||||
/// [`std::fmt`]: https://doc.rust-lang.org/std/fmt/index.html
|
||||
/// [`eprintln!`]: macro.eprintln.html
|
||||
/// # Panics
|
||||
///
|
||||
/// Panics if writing to `io::stdout` fails.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// # async_std::task::block_on(async {
|
||||
/// #
|
||||
/// use async_std::println;
|
||||
///
|
||||
/// println!().await; // prints just a newline
|
||||
/// println!("hello there!").await;
|
||||
/// println!("format {} arguments", "some").await;
|
||||
/// #
|
||||
/// # })
|
||||
/// ```
|
||||
#[cfg(any(feature = "unstable", feature = "docs"))]
|
||||
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
|
||||
#[macro_export]
|
||||
macro_rules! println {
|
||||
() => ($crate::print!("\n"));
|
||||
($($arg:tt)*) => ($crate::io::_print(format_args!($($arg)*)))
|
||||
}
|
||||
|
||||
/// Prints to the standard error.
|
||||
///
|
||||
/// Equivalent to the [`print!`] macro, except that output goes to
|
||||
/// [`io::stderr`] instead of `io::stdout`. See [`print!`] for
|
||||
/// example usage.
|
||||
///
|
||||
/// Use `eprint!` only for error and progress messages. Use `print!`
|
||||
/// instead for the primary output of your program.
|
||||
///
|
||||
/// [`io::stderr`]: io/struct.Stderr.html
|
||||
/// [`print!`]: macro.print.html
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// Panics if writing to `io::stderr` fails.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// # async_std::task::block_on(async {
|
||||
/// #
|
||||
/// use async_std::eprint;
|
||||
///
|
||||
/// eprint!("Error: Could not complete task").await;
|
||||
/// #
|
||||
/// # })
|
||||
/// ```
|
||||
#[cfg(any(feature = "unstable", feature = "docs"))]
|
||||
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
|
||||
#[macro_export]
|
||||
macro_rules! eprint {
|
||||
($($arg:tt)*) => ($crate::io::_eprint(format_args!($($arg)*)))
|
||||
}
|
||||
|
||||
/// Prints to the standard error, with a newline.
|
||||
///
|
||||
/// Equivalent to the [`println!`] macro, except that output goes to
|
||||
/// [`io::stderr`] instead of `io::stdout`. See [`println!`] for
|
||||
/// example usage.
|
||||
///
|
||||
/// Use `eprintln!` only for error and progress messages. Use `println!`
|
||||
/// instead for the primary output of your program.
|
||||
///
|
||||
/// [`io::stderr`]: io/struct.Stderr.html
|
||||
/// [`println!`]: macro.println.html
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// Panics if writing to `io::stderr` fails.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// # async_std::task::block_on(async {
|
||||
/// #
|
||||
/// use async_std::eprintln;
|
||||
///
|
||||
/// eprintln!("Error: Could not complete task").await;
|
||||
/// #
|
||||
/// # })
|
||||
/// ```
|
||||
#[cfg(any(feature = "unstable", feature = "docs"))]
|
||||
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
|
||||
#[macro_export]
|
||||
macro_rules! eprintln {
|
||||
() => (async { $crate::eprint!("\n").await; });
|
||||
($($arg:tt)*) => (
|
||||
async {
|
||||
$crate::io::_eprint(format_args!($($arg)*)).await;
|
||||
}
|
||||
);
|
||||
}
|
@ -0,0 +1,39 @@
|
||||
use std::iter::FusedIterator;
|
||||
|
||||
use crate::path::Path;
|
||||
|
||||
/// An iterator over [`Path`] and its ancestors.
|
||||
///
|
||||
/// This `struct` is created by the [`ancestors`] method on [`Path`].
|
||||
/// See its documentation for more.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// use async_std::path::Path;
|
||||
///
|
||||
/// let path = Path::new("/foo/bar");
|
||||
///
|
||||
/// for ancestor in path.ancestors() {
|
||||
/// println!("{}", ancestor.display());
|
||||
/// }
|
||||
/// ```
|
||||
///
|
||||
/// [`ancestors`]: struct.Path.html#method.ancestors
|
||||
/// [`Path`]: struct.Path.html
|
||||
#[derive(Copy, Clone, Debug)]
|
||||
pub struct Ancestors<'a> {
|
||||
pub(crate) next: Option<&'a Path>,
|
||||
}
|
||||
|
||||
impl<'a> Iterator for Ancestors<'a> {
|
||||
type Item = &'a Path;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
let next = self.next;
|
||||
self.next = next.and_then(Path::parent);
|
||||
next
|
||||
}
|
||||
}
|
||||
|
||||
impl FusedIterator for Ancestors<'_> {}
|
@ -0,0 +1,29 @@
|
||||
//! Cross-platform path manipulation.
|
||||
//!
|
||||
//! This module is an async version of [`std::path`].
|
||||
//!
|
||||
//! [`std::path`]: https://doc.rust-lang.org/std/path/index.html
|
||||
|
||||
mod ancestors;
|
||||
mod path;
|
||||
mod pathbuf;
|
||||
|
||||
// Structs re-export
|
||||
#[doc(inline)]
|
||||
pub use std::path::{Components, Display, Iter, PrefixComponent, StripPrefixError};
|
||||
|
||||
// Enums re-export
|
||||
#[doc(inline)]
|
||||
pub use std::path::{Component, Prefix};
|
||||
|
||||
// Constants re-export
|
||||
#[doc(inline)]
|
||||
pub use std::path::MAIN_SEPARATOR;
|
||||
|
||||
// Functions re-export
|
||||
#[doc(inline)]
|
||||
pub use std::path::is_separator;
|
||||
|
||||
use ancestors::Ancestors;
|
||||
pub use path::Path;
|
||||
pub use pathbuf::PathBuf;
|
@ -0,0 +1,235 @@
|
||||
use std::ffi::{OsStr, OsString};
|
||||
|
||||
use crate::path::Path;
|
||||
|
||||
/// This struct is an async version of [`std::path::PathBuf`].
|
||||
///
|
||||
/// [`std::path::Path`]: https://doc.rust-lang.org/std/path/struct.PathBuf.html
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub struct PathBuf {
|
||||
inner: std::path::PathBuf,
|
||||
}
|
||||
|
||||
impl PathBuf {
|
||||
/// Allocates an empty `PathBuf`.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// use async_std::path::PathBuf;
|
||||
///
|
||||
/// let path = PathBuf::new();
|
||||
/// ```
|
||||
pub fn new() -> PathBuf {
|
||||
std::path::PathBuf::new().into()
|
||||
}
|
||||
|
||||
/// Coerces to a [`Path`] slice.
|
||||
///
|
||||
/// [`Path`]: struct.Path.html
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// use async_std::path::{Path, PathBuf};
|
||||
///
|
||||
/// let p = PathBuf::from("/test");
|
||||
/// assert_eq!(Path::new("/test"), p.as_path());
|
||||
/// ```
|
||||
pub fn as_path(&self) -> &Path {
|
||||
self.inner.as_path().into()
|
||||
}
|
||||
|
||||
/// Extends `self` with `path`.
|
||||
///
|
||||
/// If `path` is absolute, it replaces the current path.
|
||||
///
|
||||
/// On Windows:
|
||||
///
|
||||
/// * if `path` has a root but no prefix (e.g., `\windows`), it
|
||||
/// replaces everything except for the prefix (if any) of `self`.
|
||||
/// * if `path` has a prefix but no root, it replaces `self`.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// Pushing a relative path extends the existing path:
|
||||
///
|
||||
/// ```
|
||||
/// use async_std::path::PathBuf;
|
||||
///
|
||||
/// let mut path = PathBuf::from("/tmp");
|
||||
/// path.push("file.bk");
|
||||
/// assert_eq!(path, PathBuf::from("/tmp/file.bk"));
|
||||
/// ```
|
||||
///
|
||||
/// Pushing an absolute path replaces the existing path:
|
||||
///
|
||||
/// ```
|
||||
/// use async_std::path::PathBuf;
|
||||
///
|
||||
/// let mut path = PathBuf::from("/tmp");
|
||||
/// path.push("/etc");
|
||||
/// assert_eq!(path, PathBuf::from("/etc"));
|
||||
/// ```
|
||||
pub fn push<P: AsRef<Path>>(&mut self, path: P) {
|
||||
self.inner.push(path.as_ref())
|
||||
}
|
||||
|
||||
/// Truncates `self` to [`self.parent`].
|
||||
///
|
||||
/// Returns `false` and does nothing if [`self.parent`] is [`None`].
|
||||
/// Otherwise, returns `true`.
|
||||
///
|
||||
/// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None
|
||||
/// [`self.parent`]: struct.PathBuf.html#method.parent
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// use async_std::path::{Path, PathBuf};
|
||||
///
|
||||
/// let mut p = PathBuf::from("/test/test.rs");
|
||||
///
|
||||
/// p.pop();
|
||||
/// assert_eq!(Path::new("/test"), p.as_ref());
|
||||
/// p.pop();
|
||||
/// assert_eq!(Path::new("/"), p.as_ref());
|
||||
/// ```
|
||||
pub fn pop(&mut self) -> bool {
|
||||
self.inner.pop()
|
||||
}
|
||||
|
||||
/// Updates [`self.file_name`] to `file_name`.
|
||||
///
|
||||
/// If [`self.file_name`] was [`None`], this is equivalent to pushing
|
||||
/// `file_name`.
|
||||
///
|
||||
/// Otherwise it is equivalent to calling [`pop`] and then pushing
|
||||
/// `file_name`. The new path will be a sibling of the original path.
|
||||
/// (That is, it will have the same parent.)
|
||||
///
|
||||
/// [`self.file_name`]: struct.PathBuf.html#method.file_name
|
||||
/// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None
|
||||
/// [`pop`]: struct.PathBuf.html#method.pop
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// use async_std::path::PathBuf;
|
||||
///
|
||||
/// let mut buf = PathBuf::from("/");
|
||||
/// assert!(buf.file_name() == None);
|
||||
/// buf.set_file_name("bar");
|
||||
/// assert!(buf == PathBuf::from("/bar"));
|
||||
/// assert!(buf.file_name().is_some());
|
||||
/// buf.set_file_name("baz.txt");
|
||||
/// assert!(buf == PathBuf::from("/baz.txt"));
|
||||
/// ```
|
||||
pub fn set_file_name<S: AsRef<OsStr>>(&mut self, file_name: S) {
|
||||
self.inner.set_file_name(file_name)
|
||||
}
|
||||
|
||||
/// Updates [`self.extension`] to `extension`.
|
||||
///
|
||||
/// Returns `false` and does nothing if [`self.file_name`] is [`None`],
|
||||
/// returns `true` and updates the extension otherwise.
|
||||
///
|
||||
/// If [`self.extension`] is [`None`], the extension is added; otherwise
|
||||
/// it is replaced.
|
||||
///
|
||||
/// [`self.file_name`]: struct.PathBuf.html#method.file_name
|
||||
/// [`self.extension`]: struct.PathBuf.html#method.extension
|
||||
/// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// use async_std::path::{Path, PathBuf};
|
||||
///
|
||||
/// let mut p = PathBuf::from("/feel/the");
|
||||
///
|
||||
/// p.set_extension("force");
|
||||
/// assert_eq!(Path::new("/feel/the.force"), p.as_path());
|
||||
///
|
||||
/// p.set_extension("dark_side");
|
||||
/// assert_eq!(Path::new("/feel/the.dark_side"), p.as_path());
|
||||
/// ```
|
||||
pub fn set_extension<S: AsRef<OsStr>>(&mut self, extension: S) -> bool {
|
||||
self.inner.set_extension(extension)
|
||||
}
|
||||
|
||||
/// Consumes the `PathBuf`, yielding its internal [`OsString`] storage.
|
||||
///
|
||||
/// [`OsString`]: https://doc.rust-lang.org/std/ffi/struct.OsString.html
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// use async_std::path::PathBuf;
|
||||
///
|
||||
/// let p = PathBuf::from("/the/head");
|
||||
/// let os_str = p.into_os_string();
|
||||
/// ```
|
||||
pub fn into_os_string(self) -> OsString {
|
||||
self.inner.into_os_string()
|
||||
}
|
||||
|
||||
/// Converts this `PathBuf` into a [boxed][`Box`] [`Path`].
|
||||
///
|
||||
/// [`Box`]: https://doc.rust-lang.org/std/boxed/struct.Box.html
|
||||
/// [`Path`]: struct.Path.html
|
||||
pub fn into_boxed_path(self) -> Box<Path> {
|
||||
let rw = Box::into_raw(self.inner.into_boxed_path()) as *mut Path;
|
||||
unsafe { Box::from_raw(rw) }
|
||||
}
|
||||
}
|
||||
|
||||
impl std::ops::Deref for PathBuf {
|
||||
type Target = Path;
|
||||
|
||||
fn deref(&self) -> &Path {
|
||||
self.as_ref()
|
||||
}
|
||||
}
|
||||
|
||||
impl std::borrow::Borrow<Path> for PathBuf {
|
||||
fn borrow(&self) -> &Path {
|
||||
&**self
|
||||
}
|
||||
}
|
||||
|
||||
impl From<std::path::PathBuf> for PathBuf {
|
||||
fn from(path: std::path::PathBuf) -> PathBuf {
|
||||
PathBuf { inner: path }
|
||||
}
|
||||
}
|
||||
|
||||
impl Into<std::path::PathBuf> for PathBuf {
|
||||
fn into(self) -> std::path::PathBuf {
|
||||
self.inner.into()
|
||||
}
|
||||
}
|
||||
|
||||
impl From<OsString> for PathBuf {
|
||||
fn from(path: OsString) -> PathBuf {
|
||||
std::path::PathBuf::from(path).into()
|
||||
}
|
||||
}
|
||||
|
||||
impl From<&str> for PathBuf {
|
||||
fn from(path: &str) -> PathBuf {
|
||||
std::path::PathBuf::from(path).into()
|
||||
}
|
||||
}
|
||||
|
||||
impl AsRef<Path> for PathBuf {
|
||||
fn as_ref(&self) -> &Path {
|
||||
Path::new(&self.inner)
|
||||
}
|
||||
}
|
||||
|
||||
impl AsRef<std::path::Path> for PathBuf {
|
||||
fn as_ref(&self) -> &std::path::Path {
|
||||
self.inner.as_ref()
|
||||
}
|
||||
}
|
@ -0,0 +1,14 @@
|
||||
//! A module for working with processes.
|
||||
//!
|
||||
//! This module is mostly concerned with spawning and interacting with child processes, but it also
|
||||
//! provides abort and exit for terminating the current process.
|
||||
//!
|
||||
//! This is an async version of [`std::process`].
|
||||
//!
|
||||
//! [`std::process`]: https://doc.rust-lang.org/std/process/index.html
|
||||
|
||||
// Re-export structs.
|
||||
pub use std::process::{ExitStatus, Output};
|
||||
|
||||
// Re-export functions.
|
||||
pub use std::process::{abort, exit, id};
|
@ -0,0 +1,120 @@
|
||||
pub use crate::stream::Stream;
|
||||
|
||||
/// A stream that knows its exact length.
|
||||
///
|
||||
/// Many [`Stream`]s don't know how many times they will iterate, but some do.
|
||||
/// If a stream knows how many times it can iterate, providing access to
|
||||
/// that information can be useful. For example, if you want to iterate
|
||||
/// backwards, a good start is to know where the end is.
|
||||
///
|
||||
/// When implementing an `ExactSizeStream`, you must also implement
|
||||
/// [`Stream`]. When doing so, the implementation of [`size_hint`] *must*
|
||||
/// return the exact size of the stream.
|
||||
///
|
||||
/// [`Stream`]: trait.Stream.html
|
||||
/// [`size_hint`]: trait.Stream.html#method.size_hint
|
||||
///
|
||||
/// The [`len`] method has a default implementation, so you usually shouldn't
|
||||
/// implement it. However, you may be able to provide a more performant
|
||||
/// implementation than the default, so overriding it in this case makes sense.
|
||||
///
|
||||
/// [`len`]: #method.len
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// Basic usage:
|
||||
///
|
||||
/// ```
|
||||
/// // a finite range knows exactly how many times it will iterate
|
||||
/// let five = 0..5;
|
||||
///
|
||||
/// assert_eq!(5, five.len());
|
||||
/// ```
|
||||
///
|
||||
/// In the [module level docs][moddocs], we implemented an [`Stream`],
|
||||
/// `Counter`. Let's implement `ExactSizeStream` for it as well:
|
||||
///
|
||||
/// [moddocs]: index.html
|
||||
///
|
||||
/// ```
|
||||
/// # use std::task::{Context, Poll};
|
||||
/// # use std::pin::Pin;
|
||||
/// # use async_std::prelude::*;
|
||||
/// # struct Counter {
|
||||
/// # count: usize,
|
||||
/// # }
|
||||
/// # impl Counter {
|
||||
/// # fn new() -> Counter {
|
||||
/// # Counter { count: 0 }
|
||||
/// # }
|
||||
/// # }
|
||||
/// # impl Stream for Counter {
|
||||
/// # type Item = usize;
|
||||
/// # fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
/// # self.count += 1;
|
||||
/// # if self.count < 6 {
|
||||
/// # Poll::Ready(Some(self.count))
|
||||
/// # } else {
|
||||
/// # Poll::Ready(None)
|
||||
/// # }
|
||||
/// # }
|
||||
/// # }
|
||||
/// # fn main() { async_std::task::block_on(async {
|
||||
/// #
|
||||
/// impl ExactSizeStream for Counter {
|
||||
/// // We can easily calculate the remaining number of iterations.
|
||||
/// fn len(&self) -> usize {
|
||||
/// 5 - self.count
|
||||
/// }
|
||||
/// }
|
||||
///
|
||||
/// // And now we can use it!
|
||||
///
|
||||
/// let counter = Counter::new();
|
||||
///
|
||||
/// assert_eq!(5, counter.len());
|
||||
/// # });
|
||||
/// # }
|
||||
/// ```
|
||||
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
|
||||
#[cfg(any(feature = "unstable", feature = "docs"))]
|
||||
pub trait ExactSizeStream: Stream {
|
||||
/// Returns the exact number of times the stream will iterate.
|
||||
///
|
||||
/// This method has a default implementation, so you usually should not
|
||||
/// implement it directly. However, if you can provide a more efficient
|
||||
/// implementation, you can do so. See the [trait-level] docs for an
|
||||
/// example.
|
||||
///
|
||||
/// This function has the same safety guarantees as the [`size_hint`]
|
||||
/// function.
|
||||
///
|
||||
/// [trait-level]: trait.ExactSizeStream.html
|
||||
/// [`size_hint`]: trait.Stream.html#method.size_hint
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// Basic usage:
|
||||
///
|
||||
/// ```
|
||||
/// // a finite range knows exactly how many times it will iterate
|
||||
/// let five = 0..5;
|
||||
///
|
||||
/// assert_eq!(5, five.len());
|
||||
/// ```
|
||||
fn len(&self) -> usize {
|
||||
let (lower, upper) = self.size_hint();
|
||||
// Note: This assertion is overly defensive, but it checks the invariant
|
||||
// guaranteed by the trait. If this trait were rust-internal,
|
||||
// we could use debug_assert!; assert_eq! will check all Rust user
|
||||
// implementations too.
|
||||
assert_eq!(upper, Some(lower));
|
||||
lower
|
||||
}
|
||||
}
|
||||
|
||||
impl<I: ExactSizeStream + ?Sized + Unpin> ExactSizeStream for &mut I {
|
||||
fn len(&self) -> usize {
|
||||
(**self).len()
|
||||
}
|
||||
}
|
@ -0,0 +1,100 @@
|
||||
use std::marker::PhantomData;
|
||||
use std::pin::Pin;
|
||||
|
||||
use crate::future::Future;
|
||||
use crate::stream::Stream;
|
||||
use crate::task::{Context, Poll};
|
||||
|
||||
/// A stream that yields elements by calling a closure.
|
||||
///
|
||||
/// This stream is constructed by [`from_fn`] function.
|
||||
///
|
||||
/// [`from_fn`]: fn.from_fn.html
|
||||
#[derive(Debug)]
|
||||
pub struct FromFn<F, Fut, T> {
|
||||
f: F,
|
||||
future: Option<Fut>,
|
||||
__t: PhantomData<T>,
|
||||
}
|
||||
|
||||
/// Creates a new stream where to produce each new element a provided closure is called.
|
||||
///
|
||||
/// This allows creating a custom stream with any behaviour without using the more verbose
|
||||
/// syntax of creating a dedicated type and implementing a `Stream` trait for it.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// # fn main() { async_std::task::block_on(async {
|
||||
/// #
|
||||
/// use async_std::prelude::*;
|
||||
/// use async_std::sync::Mutex;
|
||||
/// use std::sync::Arc;
|
||||
/// use async_std::stream;
|
||||
///
|
||||
/// let count = Arc::new(Mutex::new(0u8));
|
||||
/// let s = stream::from_fn(|| {
|
||||
/// let count = Arc::clone(&count);
|
||||
///
|
||||
/// async move {
|
||||
/// *count.lock().await += 1;
|
||||
///
|
||||
/// if *count.lock().await > 3 {
|
||||
/// None
|
||||
/// } else {
|
||||
/// Some(*count.lock().await)
|
||||
/// }
|
||||
/// }
|
||||
/// });
|
||||
///
|
||||
/// pin_utils::pin_mut!(s);
|
||||
/// assert_eq!(s.next().await, Some(1));
|
||||
/// assert_eq!(s.next().await, Some(2));
|
||||
/// assert_eq!(s.next().await, Some(3));
|
||||
/// assert_eq!(s.next().await, None);
|
||||
/// #
|
||||
/// # }) }
|
||||
///
|
||||
/// ```
|
||||
pub fn from_fn<T, F, Fut>(f: F) -> FromFn<F, Fut, T>
|
||||
where
|
||||
F: FnMut() -> Fut,
|
||||
Fut: Future<Output = Option<T>>,
|
||||
{
|
||||
FromFn {
|
||||
f,
|
||||
future: None,
|
||||
__t: PhantomData,
|
||||
}
|
||||
}
|
||||
|
||||
impl<F, Fut, T> FromFn<F, Fut, T> {
|
||||
pin_utils::unsafe_unpinned!(f: F);
|
||||
pin_utils::unsafe_pinned!(future: Option<Fut>);
|
||||
}
|
||||
|
||||
impl<F, Fut, T> Stream for FromFn<F, Fut, T>
|
||||
where
|
||||
F: FnMut() -> Fut,
|
||||
Fut: Future<Output = Option<T>>,
|
||||
{
|
||||
type Item = T;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
loop {
|
||||
match &self.future {
|
||||
Some(_) => {
|
||||
let next =
|
||||
futures_core::ready!(self.as_mut().future().as_pin_mut().unwrap().poll(cx));
|
||||
self.as_mut().future().set(None);
|
||||
|
||||
return Poll::Ready(next);
|
||||
}
|
||||
None => {
|
||||
let fut = (self.as_mut().f())();
|
||||
self.as_mut().future().set(Some(fut));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,21 @@
|
||||
use crate::stream::Stream;
|
||||
|
||||
/// A stream that always continues to yield `None` when exhausted.
|
||||
///
|
||||
/// Calling next on a fused stream that has returned `None` once is guaranteed
|
||||
/// to return [`None`] again. This trait should be implemented by all streams
|
||||
/// that behave this way because it allows optimizing [`Stream::fuse`].
|
||||
///
|
||||
/// Note: In general, you should not use `FusedStream` in generic bounds if
|
||||
/// you need a fused stream. Instead, you should just call [`Stream::fuse`]
|
||||
/// on the stream. If the stream is already fused, the additional [`Fuse`]
|
||||
/// wrapper will be a no-op with no performance penalty.
|
||||
///
|
||||
/// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None
|
||||
/// [`Stream::fuse`]: trait.Stream.html#method.fuse
|
||||
/// [`Fuse`]: struct.Fuse.html
|
||||
#[cfg(any(feature = "unstable", feature = "docs"))]
|
||||
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
|
||||
pub trait FusedStream: Stream {}
|
||||
|
||||
impl<S: FusedStream + ?Sized + Unpin> FusedStream for &mut S {}
|
@ -0,0 +1,199 @@
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use futures_core::future::Future;
|
||||
use futures_core::stream::Stream;
|
||||
use pin_utils::unsafe_pinned;
|
||||
|
||||
use futures_timer::Delay;
|
||||
|
||||
/// Creates a new stream that yields at a set interval.
|
||||
///
|
||||
/// The stream first yields after `dur`, and continues to yield every
|
||||
/// `dur` after that. The stream accounts for time elapsed between calls, and
|
||||
/// will adjust accordingly to prevent time skews.
|
||||
///
|
||||
/// Each interval may be slightly longer than the specified duration, but never
|
||||
/// less.
|
||||
///
|
||||
/// Note that intervals are not intended for high resolution timers, but rather
|
||||
/// they will likely fire some granularity after the exact instant that they're
|
||||
/// otherwise indicated to fire at.
|
||||
///
|
||||
/// See also: [`task::sleep`].
|
||||
///
|
||||
/// [`task::sleep`]: ../task/fn.sleep.html
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// Basic example:
|
||||
///
|
||||
/// ```no_run
|
||||
/// use async_std::prelude::*;
|
||||
/// use async_std::stream;
|
||||
/// use std::time::Duration;
|
||||
///
|
||||
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
|
||||
/// #
|
||||
/// let mut interval = stream::interval(Duration::from_secs(4));
|
||||
/// while let Some(_) = interval.next().await {
|
||||
/// println!("prints every four seconds");
|
||||
/// }
|
||||
/// #
|
||||
/// # Ok(()) }) }
|
||||
/// ```
|
||||
#[cfg(any(feature = "unstable", feature = "docs"))]
|
||||
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
|
||||
#[doc(inline)]
|
||||
pub fn interval(dur: Duration) -> Interval {
|
||||
Interval {
|
||||
delay: Delay::new(dur),
|
||||
interval: dur,
|
||||
}
|
||||
}
|
||||
|
||||
/// A stream representing notifications at fixed interval
|
||||
///
|
||||
#[derive(Debug)]
|
||||
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
|
||||
#[doc(inline)]
|
||||
pub struct Interval {
|
||||
delay: Delay,
|
||||
interval: Duration,
|
||||
}
|
||||
|
||||
impl Interval {
|
||||
unsafe_pinned!(delay: Delay);
|
||||
}
|
||||
|
||||
impl Stream for Interval {
|
||||
type Item = ();
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
if Pin::new(&mut *self).delay().poll(cx).is_pending() {
|
||||
return Poll::Pending;
|
||||
}
|
||||
let when = Instant::now();
|
||||
let next = next_interval(when, Instant::now(), self.interval);
|
||||
self.delay.reset(next);
|
||||
Poll::Ready(Some(()))
|
||||
}
|
||||
}
|
||||
|
||||
/// Converts Duration object to raw nanoseconds if possible
|
||||
///
|
||||
/// This is useful to divide intervals.
|
||||
///
|
||||
/// While technically for large duration it's impossible to represent any
|
||||
/// duration as nanoseconds, the largest duration we can represent is about
|
||||
/// 427_000 years. Large enough for any interval we would use or calculate in
|
||||
/// tokio.
|
||||
fn duration_to_nanos(dur: Duration) -> Option<u64> {
|
||||
dur.as_secs()
|
||||
.checked_mul(1_000_000_000)
|
||||
.and_then(|v| v.checked_add(u64::from(dur.subsec_nanos())))
|
||||
}
|
||||
|
||||
fn next_interval(prev: Instant, now: Instant, interval: Duration) -> Instant {
|
||||
let new = prev + interval;
|
||||
if new > now {
|
||||
return new;
|
||||
}
|
||||
|
||||
let spent_ns = duration_to_nanos(now.duration_since(prev)).expect("interval should be expired");
|
||||
let interval_ns =
|
||||
duration_to_nanos(interval).expect("interval is less that 427 thousand years");
|
||||
let mult = spent_ns / interval_ns + 1;
|
||||
assert!(
|
||||
mult < (1 << 32),
|
||||
"can't skip more than 4 billion intervals of {:?} \
|
||||
(trying to skip {})",
|
||||
interval,
|
||||
mult
|
||||
);
|
||||
prev + interval * (mult as u32)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::next_interval;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
struct Timeline(Instant);
|
||||
|
||||
impl Timeline {
|
||||
fn new() -> Timeline {
|
||||
Timeline(Instant::now())
|
||||
}
|
||||
fn at(&self, millis: u64) -> Instant {
|
||||
self.0 + Duration::from_millis(millis)
|
||||
}
|
||||
fn at_ns(&self, sec: u64, nanos: u32) -> Instant {
|
||||
self.0 + Duration::new(sec, nanos)
|
||||
}
|
||||
}
|
||||
|
||||
fn dur(millis: u64) -> Duration {
|
||||
Duration::from_millis(millis)
|
||||
}
|
||||
|
||||
// The math around Instant/Duration isn't 100% precise due to rounding
|
||||
// errors, see #249 for more info
|
||||
fn almost_eq(a: Instant, b: Instant) -> bool {
|
||||
if a == b {
|
||||
true
|
||||
} else if a > b {
|
||||
a - b < Duration::from_millis(1)
|
||||
} else {
|
||||
b - a < Duration::from_millis(1)
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn norm_next() {
|
||||
let tm = Timeline::new();
|
||||
assert!(almost_eq(
|
||||
next_interval(tm.at(1), tm.at(2), dur(10)),
|
||||
tm.at(11)
|
||||
));
|
||||
assert!(almost_eq(
|
||||
next_interval(tm.at(7777), tm.at(7788), dur(100)),
|
||||
tm.at(7877)
|
||||
));
|
||||
assert!(almost_eq(
|
||||
next_interval(tm.at(1), tm.at(1000), dur(2100)),
|
||||
tm.at(2101)
|
||||
));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn fast_forward() {
|
||||
let tm = Timeline::new();
|
||||
assert!(almost_eq(
|
||||
next_interval(tm.at(1), tm.at(1000), dur(10)),
|
||||
tm.at(1001)
|
||||
));
|
||||
assert!(almost_eq(
|
||||
next_interval(tm.at(7777), tm.at(8888), dur(100)),
|
||||
tm.at(8977)
|
||||
));
|
||||
assert!(almost_eq(
|
||||
next_interval(tm.at(1), tm.at(10000), dur(2100)),
|
||||
tm.at(10501)
|
||||
));
|
||||
}
|
||||
|
||||
/// TODO: this test actually should be successful, but since we can't
|
||||
/// multiply Duration on anything larger than u32 easily we decided
|
||||
/// to allow it to fail for now
|
||||
#[test]
|
||||
#[should_panic(expected = "can't skip more than 4 billion intervals")]
|
||||
fn large_skip() {
|
||||
let tm = Timeline::new();
|
||||
assert_eq!(
|
||||
next_interval(tm.at_ns(0, 1), tm.at_ns(25, 0), Duration::new(0, 2)),
|
||||
tm.at_ns(25, 1)
|
||||
);
|
||||
}
|
||||
}
|
@ -0,0 +1,23 @@
|
||||
use crate::future::Future;
|
||||
use crate::stream::Stream;
|
||||
|
||||
/// Trait to represent types that can be created by productming up a stream.
|
||||
///
|
||||
/// This trait is used to implement the [`product`] method on streams. Types which
|
||||
/// implement the trait can be generated by the [`product`] method. Like
|
||||
/// [`FromStream`] this trait should rarely be called directly and instead
|
||||
/// interacted with through [`Stream::product`].
|
||||
///
|
||||
/// [`product`]: trait.Product.html#tymethod.product
|
||||
/// [`FromStream`]: trait.FromStream.html
|
||||
/// [`Stream::product`]: trait.Stream.html#method.product
|
||||
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
|
||||
#[cfg(any(feature = "unstable", feature = "docs"))]
|
||||
pub trait Product<A = Self>: Sized {
|
||||
/// Method which takes a stream and generates `Self` from the elements by
|
||||
/// multiplying the items.
|
||||
fn product<S, F>(stream: S) -> F
|
||||
where
|
||||
S: Stream<Item = A>,
|
||||
F: Future<Output = Self>;
|
||||
}
|
@ -0,0 +1,91 @@
|
||||
use std::cmp::Ordering;
|
||||
use std::pin::Pin;
|
||||
|
||||
use super::fuse::Fuse;
|
||||
use crate::future::Future;
|
||||
use crate::prelude::*;
|
||||
use crate::stream::Stream;
|
||||
use crate::task::{Context, Poll};
|
||||
|
||||
// Lexicographically compares the elements of this `Stream` with those
|
||||
// of another using `Ord`.
|
||||
#[doc(hidden)]
|
||||
#[allow(missing_debug_implementations)]
|
||||
pub struct CmpFuture<L: Stream, R: Stream> {
|
||||
l: Fuse<L>,
|
||||
r: Fuse<R>,
|
||||
l_cache: Option<L::Item>,
|
||||
r_cache: Option<R::Item>,
|
||||
}
|
||||
|
||||
impl<L: Stream, R: Stream> CmpFuture<L, R> {
|
||||
pin_utils::unsafe_pinned!(l: Fuse<L>);
|
||||
pin_utils::unsafe_pinned!(r: Fuse<R>);
|
||||
pin_utils::unsafe_unpinned!(l_cache: Option<L::Item>);
|
||||
pin_utils::unsafe_unpinned!(r_cache: Option<R::Item>);
|
||||
|
||||
pub(super) fn new(l: L, r: R) -> Self {
|
||||
CmpFuture {
|
||||
l: l.fuse(),
|
||||
r: r.fuse(),
|
||||
l_cache: None,
|
||||
r_cache: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<L: Stream, R: Stream> Future for CmpFuture<L, R>
|
||||
where
|
||||
L: Stream + Sized,
|
||||
R: Stream<Item = L::Item> + Sized,
|
||||
L::Item: Ord,
|
||||
{
|
||||
type Output = Ordering;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
loop {
|
||||
// Stream that completes earliest can be considered Less, etc
|
||||
let l_complete = self.l.done && self.as_mut().l_cache.is_none();
|
||||
let r_complete = self.r.done && self.as_mut().r_cache.is_none();
|
||||
|
||||
if l_complete && r_complete {
|
||||
return Poll::Ready(Ordering::Equal);
|
||||
} else if l_complete {
|
||||
return Poll::Ready(Ordering::Less);
|
||||
} else if r_complete {
|
||||
return Poll::Ready(Ordering::Greater);
|
||||
}
|
||||
|
||||
// Get next value if possible and necesary
|
||||
if !self.l.done && self.as_mut().l_cache.is_none() {
|
||||
let l_next = futures_core::ready!(self.as_mut().l().poll_next(cx));
|
||||
if let Some(item) = l_next {
|
||||
*self.as_mut().l_cache() = Some(item);
|
||||
}
|
||||
}
|
||||
|
||||
if !self.r.done && self.as_mut().r_cache.is_none() {
|
||||
let r_next = futures_core::ready!(self.as_mut().r().poll_next(cx));
|
||||
if let Some(item) = r_next {
|
||||
*self.as_mut().r_cache() = Some(item);
|
||||
}
|
||||
}
|
||||
|
||||
// Compare if both values are available.
|
||||
if self.as_mut().l_cache.is_some() && self.as_mut().r_cache.is_some() {
|
||||
let l_value = self.as_mut().l_cache().take().unwrap();
|
||||
let r_value = self.as_mut().r_cache().take().unwrap();
|
||||
let result = l_value.cmp(&r_value);
|
||||
|
||||
if let Ordering::Equal = result {
|
||||
// Reset cache to prepare for next comparison
|
||||
*self.as_mut().l_cache() = None;
|
||||
*self.as_mut().r_cache() = None;
|
||||
} else {
|
||||
// Return non equal value
|
||||
return Poll::Ready(result);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,47 @@
|
||||
use std::cmp::Ordering;
|
||||
use std::pin::Pin;
|
||||
|
||||
use super::partial_cmp::PartialCmpFuture;
|
||||
use crate::future::Future;
|
||||
use crate::prelude::*;
|
||||
use crate::stream::Stream;
|
||||
use crate::task::{Context, Poll};
|
||||
|
||||
// Determines if the elements of this `Stream` are lexicographically
|
||||
// greater than or equal to those of another.
|
||||
#[doc(hidden)]
|
||||
#[allow(missing_debug_implementations)]
|
||||
pub struct GeFuture<L: Stream, R: Stream> {
|
||||
partial_cmp: PartialCmpFuture<L, R>,
|
||||
}
|
||||
|
||||
impl<L: Stream, R: Stream> GeFuture<L, R>
|
||||
where
|
||||
L::Item: PartialOrd<R::Item>,
|
||||
{
|
||||
pin_utils::unsafe_pinned!(partial_cmp: PartialCmpFuture<L, R>);
|
||||
|
||||
pub(super) fn new(l: L, r: R) -> Self {
|
||||
GeFuture {
|
||||
partial_cmp: l.partial_cmp(r),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<L: Stream, R: Stream> Future for GeFuture<L, R>
|
||||
where
|
||||
L: Stream + Sized,
|
||||
R: Stream + Sized,
|
||||
L::Item: PartialOrd<R::Item>,
|
||||
{
|
||||
type Output = bool;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let result = futures_core::ready!(self.as_mut().partial_cmp().poll(cx));
|
||||
|
||||
match result {
|
||||
Some(Ordering::Greater) | Some(Ordering::Equal) => Poll::Ready(true),
|
||||
_ => Poll::Ready(false),
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,47 @@
|
||||
use std::cmp::Ordering;
|
||||
use std::pin::Pin;
|
||||
|
||||
use super::partial_cmp::PartialCmpFuture;
|
||||
use crate::future::Future;
|
||||
use crate::prelude::*;
|
||||
use crate::stream::Stream;
|
||||
use crate::task::{Context, Poll};
|
||||
|
||||
// Determines if the elements of this `Stream` are lexicographically
|
||||
// greater than those of another.
|
||||
#[doc(hidden)]
|
||||
#[allow(missing_debug_implementations)]
|
||||
pub struct GtFuture<L: Stream, R: Stream> {
|
||||
partial_cmp: PartialCmpFuture<L, R>,
|
||||
}
|
||||
|
||||
impl<L: Stream, R: Stream> GtFuture<L, R>
|
||||
where
|
||||
L::Item: PartialOrd<R::Item>,
|
||||
{
|
||||
pin_utils::unsafe_pinned!(partial_cmp: PartialCmpFuture<L, R>);
|
||||
|
||||
pub(super) fn new(l: L, r: R) -> Self {
|
||||
GtFuture {
|
||||
partial_cmp: l.partial_cmp(r),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<L: Stream, R: Stream> Future for GtFuture<L, R>
|
||||
where
|
||||
L: Stream + Sized,
|
||||
R: Stream + Sized,
|
||||
L::Item: PartialOrd<R::Item>,
|
||||
{
|
||||
type Output = bool;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let result = futures_core::ready!(self.as_mut().partial_cmp().poll(cx));
|
||||
|
||||
match result {
|
||||
Some(Ordering::Greater) => Poll::Ready(true),
|
||||
_ => Poll::Ready(false),
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,42 @@
|
||||
use std::pin::Pin;
|
||||
|
||||
use crate::future::Future;
|
||||
use crate::stream::Stream;
|
||||
use crate::task::{Context, Poll};
|
||||
|
||||
#[doc(hidden)]
|
||||
#[allow(missing_debug_implementations)]
|
||||
pub struct LastFuture<S, T> {
|
||||
stream: S,
|
||||
last: Option<T>,
|
||||
}
|
||||
|
||||
impl<S, T> LastFuture<S, T> {
|
||||
pin_utils::unsafe_pinned!(stream: S);
|
||||
pin_utils::unsafe_unpinned!(last: Option<T>);
|
||||
|
||||
pub(crate) fn new(stream: S) -> Self {
|
||||
LastFuture { stream, last: None }
|
||||
}
|
||||
}
|
||||
|
||||
impl<S> Future for LastFuture<S, S::Item>
|
||||
where
|
||||
S: Stream + Unpin + Sized,
|
||||
S::Item: Copy,
|
||||
{
|
||||
type Output = Option<S::Item>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let next = futures_core::ready!(self.as_mut().stream().poll_next(cx));
|
||||
|
||||
match next {
|
||||
Some(new) => {
|
||||
cx.waker().wake_by_ref();
|
||||
*self.as_mut().last() = Some(new);
|
||||
Poll::Pending
|
||||
}
|
||||
None => Poll::Ready(self.last),
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,47 @@
|
||||
use std::cmp::Ordering;
|
||||
use std::pin::Pin;
|
||||
|
||||
use super::partial_cmp::PartialCmpFuture;
|
||||
use crate::future::Future;
|
||||
use crate::prelude::*;
|
||||
use crate::stream::Stream;
|
||||
use crate::task::{Context, Poll};
|
||||
|
||||
/// Determines if the elements of this `Stream` are lexicographically
|
||||
/// less or equal to those of another.
|
||||
#[doc(hidden)]
|
||||
#[allow(missing_debug_implementations)]
|
||||
pub struct LeFuture<L: Stream, R: Stream> {
|
||||
partial_cmp: PartialCmpFuture<L, R>,
|
||||
}
|
||||
|
||||
impl<L: Stream, R: Stream> LeFuture<L, R>
|
||||
where
|
||||
L::Item: PartialOrd<R::Item>,
|
||||
{
|
||||
pin_utils::unsafe_pinned!(partial_cmp: PartialCmpFuture<L, R>);
|
||||
|
||||
pub(super) fn new(l: L, r: R) -> Self {
|
||||
LeFuture {
|
||||
partial_cmp: l.partial_cmp(r),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<L: Stream, R: Stream> Future for LeFuture<L, R>
|
||||
where
|
||||
L: Stream + Sized,
|
||||
R: Stream + Sized,
|
||||
L::Item: PartialOrd<R::Item>,
|
||||
{
|
||||
type Output = bool;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let result = futures_core::ready!(self.as_mut().partial_cmp().poll(cx));
|
||||
|
||||
match result {
|
||||
Some(Ordering::Less) | Some(Ordering::Equal) => Poll::Ready(true),
|
||||
_ => Poll::Ready(false),
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,47 @@
|
||||
use std::cmp::Ordering;
|
||||
use std::pin::Pin;
|
||||
|
||||
use super::partial_cmp::PartialCmpFuture;
|
||||
use crate::future::Future;
|
||||
use crate::prelude::*;
|
||||
use crate::stream::Stream;
|
||||
use crate::task::{Context, Poll};
|
||||
|
||||
// Determines if the elements of this `Stream` are lexicographically
|
||||
// less than those of another.
|
||||
#[doc(hidden)]
|
||||
#[allow(missing_debug_implementations)]
|
||||
pub struct LtFuture<L: Stream, R: Stream> {
|
||||
partial_cmp: PartialCmpFuture<L, R>,
|
||||
}
|
||||
|
||||
impl<L: Stream, R: Stream> LtFuture<L, R>
|
||||
where
|
||||
L::Item: PartialOrd<R::Item>,
|
||||
{
|
||||
pin_utils::unsafe_pinned!(partial_cmp: PartialCmpFuture<L, R>);
|
||||
|
||||
pub(super) fn new(l: L, r: R) -> Self {
|
||||
LtFuture {
|
||||
partial_cmp: l.partial_cmp(r),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<L: Stream, R: Stream> Future for LtFuture<L, R>
|
||||
where
|
||||
L: Stream + Sized,
|
||||
R: Stream + Sized,
|
||||
L::Item: PartialOrd<R::Item>,
|
||||
{
|
||||
type Output = bool;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let result = futures_core::ready!(self.as_mut().partial_cmp().poll(cx));
|
||||
|
||||
match result {
|
||||
Some(Ordering::Less) => Poll::Ready(true),
|
||||
_ => Poll::Ready(false),
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,44 @@
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
use futures_core::Stream;
|
||||
|
||||
/// A stream that merges two other streams into a single stream.
|
||||
///
|
||||
/// This stream is returned by [`Stream::merge`].
|
||||
///
|
||||
/// [`Stream::merge`]: trait.Stream.html#method.merge
|
||||
#[cfg(any(feature = "unstable", feature = "docs"))]
|
||||
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
|
||||
#[derive(Debug)]
|
||||
pub struct Merge<L, R> {
|
||||
left: L,
|
||||
right: R,
|
||||
}
|
||||
|
||||
impl<L, R> Unpin for Merge<L, R> {}
|
||||
|
||||
impl<L, R> Merge<L, R> {
|
||||
pub(crate) fn new(left: L, right: R) -> Self {
|
||||
Self { left, right }
|
||||
}
|
||||
}
|
||||
|
||||
impl<L, R, T> Stream for Merge<L, R>
|
||||
where
|
||||
L: Stream<Item = T> + Unpin,
|
||||
R: Stream<Item = T> + Unpin,
|
||||
{
|
||||
type Item = T;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
if let Poll::Ready(Some(item)) = Pin::new(&mut self.left).poll_next(cx) {
|
||||
// The first stream made progress. The Merge needs to be polled
|
||||
// again to check the progress of the second stream.
|
||||
cx.waker().wake_by_ref();
|
||||
Poll::Ready(Some(item))
|
||||
} else {
|
||||
Pin::new(&mut self.right).poll_next(cx)
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,92 @@
|
||||
use std::cmp::Ordering;
|
||||
use std::pin::Pin;
|
||||
|
||||
use super::fuse::Fuse;
|
||||
use crate::future::Future;
|
||||
use crate::prelude::*;
|
||||
use crate::stream::Stream;
|
||||
use crate::task::{Context, Poll};
|
||||
|
||||
// Lexicographically compares the elements of this `Stream` with those
|
||||
// of another.
|
||||
#[doc(hidden)]
|
||||
#[allow(missing_debug_implementations)]
|
||||
pub struct PartialCmpFuture<L: Stream, R: Stream> {
|
||||
l: Fuse<L>,
|
||||
r: Fuse<R>,
|
||||
l_cache: Option<L::Item>,
|
||||
r_cache: Option<R::Item>,
|
||||
}
|
||||
|
||||
impl<L: Stream, R: Stream> PartialCmpFuture<L, R> {
|
||||
pin_utils::unsafe_pinned!(l: Fuse<L>);
|
||||
pin_utils::unsafe_pinned!(r: Fuse<R>);
|
||||
pin_utils::unsafe_unpinned!(l_cache: Option<L::Item>);
|
||||
pin_utils::unsafe_unpinned!(r_cache: Option<R::Item>);
|
||||
|
||||
pub(super) fn new(l: L, r: R) -> Self {
|
||||
PartialCmpFuture {
|
||||
l: l.fuse(),
|
||||
r: r.fuse(),
|
||||
l_cache: None,
|
||||
r_cache: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<L: Stream, R: Stream> Future for PartialCmpFuture<L, R>
|
||||
where
|
||||
L: Stream + Sized,
|
||||
R: Stream + Sized,
|
||||
L::Item: PartialOrd<R::Item>,
|
||||
{
|
||||
type Output = Option<Ordering>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
loop {
|
||||
// Short circuit logic
|
||||
// Stream that completes earliest can be considered Less, etc
|
||||
let l_complete = self.l.done && self.as_mut().l_cache.is_none();
|
||||
let r_complete = self.r.done && self.as_mut().r_cache.is_none();
|
||||
|
||||
if l_complete && r_complete {
|
||||
return Poll::Ready(Some(Ordering::Equal));
|
||||
} else if l_complete {
|
||||
return Poll::Ready(Some(Ordering::Less));
|
||||
} else if r_complete {
|
||||
return Poll::Ready(Some(Ordering::Greater));
|
||||
}
|
||||
|
||||
// Get next value if possible and necesary
|
||||
if !self.l.done && self.as_mut().l_cache.is_none() {
|
||||
let l_next = futures_core::ready!(self.as_mut().l().poll_next(cx));
|
||||
if let Some(item) = l_next {
|
||||
*self.as_mut().l_cache() = Some(item);
|
||||
}
|
||||
}
|
||||
|
||||
if !self.r.done && self.as_mut().r_cache.is_none() {
|
||||
let r_next = futures_core::ready!(self.as_mut().r().poll_next(cx));
|
||||
if let Some(item) = r_next {
|
||||
*self.as_mut().r_cache() = Some(item);
|
||||
}
|
||||
}
|
||||
|
||||
// Compare if both values are available.
|
||||
if self.as_mut().l_cache.is_some() && self.as_mut().r_cache.is_some() {
|
||||
let l_value = self.as_mut().l_cache().take().unwrap();
|
||||
let r_value = self.as_mut().r_cache().take().unwrap();
|
||||
let result = l_value.partial_cmp(&r_value);
|
||||
|
||||
if let Some(Ordering::Equal) = result {
|
||||
// Reset cache to prepare for next comparison
|
||||
*self.as_mut().l_cache() = None;
|
||||
*self.as_mut().r_cache() = None;
|
||||
} else {
|
||||
// Return non equal value
|
||||
return Poll::Ready(result);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,47 @@
|
||||
use std::marker::PhantomData;
|
||||
use std::pin::Pin;
|
||||
|
||||
use crate::stream::Stream;
|
||||
use crate::task::{Context, Poll};
|
||||
|
||||
/// A stream that yields elements based on a predicate.
|
||||
#[derive(Debug)]
|
||||
pub struct TakeWhile<S, P, T> {
|
||||
stream: S,
|
||||
predicate: P,
|
||||
__t: PhantomData<T>,
|
||||
}
|
||||
|
||||
impl<S, P, T> TakeWhile<S, P, T> {
|
||||
pin_utils::unsafe_pinned!(stream: S);
|
||||
pin_utils::unsafe_unpinned!(predicate: P);
|
||||
|
||||
pub(super) fn new(stream: S, predicate: P) -> Self {
|
||||
TakeWhile {
|
||||
stream,
|
||||
predicate,
|
||||
__t: PhantomData,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<S, P> Stream for TakeWhile<S, P, S::Item>
|
||||
where
|
||||
S: Stream,
|
||||
P: FnMut(&S::Item) -> bool,
|
||||
{
|
||||
type Item = S::Item;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
let next = futures_core::ready!(self.as_mut().stream().poll_next(cx));
|
||||
|
||||
match next {
|
||||
Some(v) if (self.as_mut().predicate())(&v) => Poll::Ready(Some(v)),
|
||||
Some(_) => {
|
||||
cx.waker().wake_by_ref();
|
||||
Poll::Pending
|
||||
}
|
||||
None => Poll::Ready(None),
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,59 @@
|
||||
use std::marker::PhantomData;
|
||||
use std::pin::Pin;
|
||||
|
||||
use crate::future::Future;
|
||||
use crate::stream::Stream;
|
||||
use crate::task::{Context, Poll};
|
||||
|
||||
#[doc(hidden)]
|
||||
#[allow(missing_debug_implementations)]
|
||||
pub struct TryFoldFuture<S, F, T> {
|
||||
stream: S,
|
||||
f: F,
|
||||
acc: Option<T>,
|
||||
__t: PhantomData<T>,
|
||||
}
|
||||
|
||||
impl<S, F, T> TryFoldFuture<S, F, T> {
|
||||
pin_utils::unsafe_pinned!(stream: S);
|
||||
pin_utils::unsafe_unpinned!(f: F);
|
||||
pin_utils::unsafe_unpinned!(acc: Option<T>);
|
||||
|
||||
pub(super) fn new(stream: S, init: T, f: F) -> Self {
|
||||
TryFoldFuture {
|
||||
stream,
|
||||
f,
|
||||
acc: Some(init),
|
||||
__t: PhantomData,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<S, F, T, E> Future for TryFoldFuture<S, F, T>
|
||||
where
|
||||
S: Stream + Sized,
|
||||
F: FnMut(T, S::Item) -> Result<T, E>,
|
||||
{
|
||||
type Output = Result<T, E>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
loop {
|
||||
let next = futures_core::ready!(self.as_mut().stream().poll_next(cx));
|
||||
|
||||
match next {
|
||||
Some(v) => {
|
||||
let old = self.as_mut().acc().take().unwrap();
|
||||
let new = (self.as_mut().f())(old, v);
|
||||
|
||||
match new {
|
||||
Ok(o) => {
|
||||
*self.as_mut().acc() = Some(o);
|
||||
}
|
||||
Err(e) => return Poll::Ready(Err(e)),
|
||||
}
|
||||
}
|
||||
None => return Poll::Ready(Ok(self.as_mut().acc().take().unwrap())),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,23 @@
|
||||
use crate::future::Future;
|
||||
use crate::stream::Stream;
|
||||
|
||||
/// Trait to represent types that can be created by summing up a stream.
|
||||
///
|
||||
/// This trait is used to implement the [`sum`] method on streams. Types which
|
||||
/// implement the trait can be generated by the [`sum`] method. Like
|
||||
/// [`FromStream`] this trait should rarely be called directly and instead
|
||||
/// interacted with through [`Stream::sum`].
|
||||
///
|
||||
/// [`sum`]: trait.Sum.html#tymethod.sum
|
||||
/// [`FromStream`]: trait.FromStream.html
|
||||
/// [`Stream::sum`]: trait.Stream.html#method.sum
|
||||
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
|
||||
#[cfg(any(feature = "unstable", feature = "docs"))]
|
||||
pub trait Sum<A = Self>: Sized {
|
||||
/// Method which takes a stream and generates `Self` from the elements by
|
||||
/// "summing up" the items.
|
||||
fn sum<S, F>(stream: S) -> F
|
||||
where
|
||||
S: Stream<Item = A>,
|
||||
F: Future<Output = Self>;
|
||||
}
|
Some files were not shown because too many files have changed in this diff Show More
Loading…
Reference in New Issue