Merge remote-tracking branch 'upstream/master' into 342-stream-throttle

This commit is contained in:
Wouter Geraedts 2019-11-12 14:51:24 +01:00
commit a722de1a10
40 changed files with 407 additions and 465 deletions

View file

@ -7,6 +7,60 @@ and this project adheres to [Semantic Versioning](https://book.async.rs/overview
## [Unreleased]
# [1.0.1] - 2019-11-12
[API Documentation](https://docs.rs/async-std/1.0.1/async-std)
We were seeing a regression in our fs performance, caused by too many
long-running tasks. This patch fixes that regression by being more proactive
about closing down idle threads.
## Changes
- Improved thread startup/shutdown algorithm in `task::spawn_blocking`.
- Fixed a typo in the tutorial.
# [1.0.0] - 2019-11-11
[API Documentation](https://docs.rs/async-std/1.0.0/async-std)
This release marks the `1.0.0` release of async-std; a major milestone for our
development. This release itself mostly includes quality of life improvements
for all of modules, including more consistent API bounds for a lot of our
submodules.
The biggest change is that we're now using the full semver range,
`major.minor.patch`, and any breaking changes to our "stable" APIs will require
an update of the `major` number.
We're excited we've hit this milestone together with you all. Thank you!
## Added
- Added `Future::join` as "unstable", replacing `future::join!`.
- Added `Future::try_join` as "unstable", replacing `future::try_join!`.
- Enabled `stable` and `beta` channel testing on CI.
- Implemented `FromIterator` and `Extend` for `PathBuf`.
- Implemented `FromStream` for `PathBuf`.
- Loosened the trait bounds of `io::copy` on "unstable".
## Changed
- Added a `Sync` bound to `RwLock`, resolving a memory safety issue.
- Fixed a bug in `Stream::take_while` where it could continue after it should've
ended.
- Fixed a bug where our `attributes` Cargo feature wasn't working as intended.
- Improved documentation of `Stream::merge`, documenting ordering guarantees.
- Update doc imports in examples to prefer async-std's types.
- Various quality of life improvements to the `future` submodule.
- Various quality of life improvements to the `path` submodule.
- Various quality of life improvements to the `stream` submodule.
## Removed
- Removed `future::join!` in favor of `Future::join`.
- Removed `future::try_join!` in favor of `Future::try_join`.
# [0.99.12] - 2019-11-07
[API Documentation](https://docs.rs/async-std/0.99.12/async-std)
@ -388,8 +442,11 @@ task::blocking(async {
- Initial beta release
[Unreleased]: https://github.com/async-rs/async-std/compare/v0.99.11...HEAD
[0.99.10]: https://github.com/async-rs/async-std/compare/v0.99.10...v0.99.11
[Unreleased]: https://github.com/async-rs/async-std/compare/v1.0.1...HEAD
[1.0.1]: https://github.com/async-rs/async-std/compare/v1.0.0...v1.0.1
[1.0.0]: https://github.com/async-rs/async-std/compare/v0.99.12...v1.0.0
[0.99.12]: https://github.com/async-rs/async-std/compare/v0.99.11...v0.99.12
[0.99.11]: https://github.com/async-rs/async-std/compare/v0.99.10...v0.99.11
[0.99.10]: https://github.com/async-rs/async-std/compare/v0.99.9...v0.99.10
[0.99.9]: https://github.com/async-rs/async-std/compare/v0.99.8...v0.99.9
[0.99.8]: https://github.com/async-rs/async-std/compare/v0.99.7...v0.99.8

View file

@ -1,6 +1,6 @@
[package]
name = "async-std"
version = "0.99.12"
version = "1.0.1"
authors = [
"Stjepan Glavina <stjepang@gmail.com>",
"Yoshua Wuyts <yoshuawuyts@gmail.com>",

View file

@ -50,6 +50,6 @@ Add the following lines to `Cargo.toml`:
```toml
[dependencies]
futures-preview = { version = "0.3.0-alpha.19", features = [ "async-await" ] }
async-std = "0.99"
futures = "0.3.0"
async-std = "1.0.0"
```

View file

@ -19,8 +19,8 @@
//! [`File`]s:
//!
//! ```no_run
//! use async_std::prelude::*;
//! use async_std::fs::File;
//! use async_std::prelude::*;
//!
//! # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
//! #
@ -47,9 +47,9 @@
//! coming from:
//!
//! ```no_run
//! use async_std::io::prelude::*;
//! use async_std::io::SeekFrom;
//! use async_std::fs::File;
//! use async_std::io::SeekFrom;
//! use async_std::prelude::*;
//!
//! # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
//! #
@ -82,9 +82,9 @@
//! methods to any reader:
//!
//! ```no_run
//! use async_std::io::prelude::*;
//! use async_std::io::BufReader;
//! use async_std::fs::File;
//! use async_std::io::BufReader;
//! use async_std::prelude::*;
//!
//! # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
//! #
@ -104,9 +104,9 @@
//! to [`write`][`Write::write`]:
//!
//! ```no_run
//! use async_std::io::prelude::*;
//! use async_std::io::BufWriter;
//! use async_std::fs::File;
//! use async_std::io::BufWriter;
//! use async_std::io::prelude::*;
//!
//! # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
//! #
@ -179,9 +179,9 @@
//! lines:
//!
//! ```no_run
//! use async_std::prelude::*;
//! use async_std::io::BufReader;
//! use async_std::fs::File;
//! use async_std::io::BufReader;
//! use async_std::prelude::*;
//!
//! # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
//! #

View file

@ -154,7 +154,7 @@
//!
//! ```toml
//! [dependencies.async-std]
//! version = "0.99"
//! version = "1.0.0"
//! features = ["unstable"]
//! ```
//!
@ -167,7 +167,7 @@
//!
//! ```toml
//! [dependencies.async-std]
//! version = "0.99"
//! version = "1.0.0"
//! features = ["attributes"]
//! ```
//!
@ -176,7 +176,7 @@
//!
//! ```toml
//! [dependencies.async-std]
//! version = "0.99"
//! version = "1.0.0"
//! default-features = false
//! features = ["std"]
//! ```

View file

@ -65,10 +65,10 @@ pub trait Extend<A> {
/// ```
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
pub async fn extend<'a, C, A, T>(collection: &mut C, stream: T)
pub async fn extend<'a, C, T, S>(collection: &mut C, stream: S)
where
C: Extend<A>,
T: IntoStream<Item = A> + 'a,
C: Extend<T>,
S: IntoStream<Item = T> + 'a,
{
Extend::extend(collection, stream).await
}

View file

@ -1,28 +1,21 @@
use std::marker::PhantomData;
use std::pin::Pin;
use std::future::Future;
use pin_project_lite::pin_project;
use crate::stream::Stream;
use crate::task::{Context, Poll};
pin_project! {
/// A stream that yields elements by calling a closure.
///
/// This stream is created by the [`from_fn`] function. See its
/// documentation for more.
///
/// [`from_fn`]: fn.from_fn.html
#[derive(Debug)]
pub struct FromFn<F, Fut, T> {
f: F,
#[pin]
future: Option<Fut>,
__t: PhantomData<T>,
}
/// A stream that yields elements by calling a closure.
///
/// This stream is created by the [`from_fn`] function. See its
/// documentation for more.
///
/// [`from_fn`]: fn.from_fn.html
#[derive(Clone, Debug)]
pub struct FromFn<F> {
f: F,
}
impl<F> Unpin for FromFn<F> {}
/// Creates a new stream where to produce each new element a provided closure is called.
///
/// This allows creating a custom stream with any behaviour without using the more verbose
@ -34,22 +27,15 @@ pin_project! {
/// # async_std::task::block_on(async {
/// #
/// use async_std::prelude::*;
/// use async_std::sync::Mutex;
/// use std::sync::Arc;
/// use async_std::stream;
///
/// let count = Arc::new(Mutex::new(0u8));
/// let mut count = 0u8;
/// let s = stream::from_fn(|| {
/// let count = Arc::clone(&count);
///
/// async move {
/// *count.lock().await += 1;
///
/// if *count.lock().await > 3 {
/// None
/// } else {
/// Some(*count.lock().await)
/// }
/// count += 1;
/// if count > 3 {
/// None
/// } else {
/// Some(count)
/// }
/// });
///
@ -61,38 +47,21 @@ pin_project! {
/// #
/// # })
/// ```
pub fn from_fn<T, F, Fut>(f: F) -> FromFn<F, Fut, T>
pub fn from_fn<T, F>(f: F) -> FromFn<F>
where
F: FnMut() -> Fut,
Fut: Future<Output = Option<T>>,
F: FnMut() -> Option<T>,
{
FromFn {
f,
future: None,
__t: PhantomData,
}
FromFn { f }
}
impl<F, Fut, T> Stream for FromFn<F, Fut, T>
impl<T, F> Stream for FromFn<F>
where
F: FnMut() -> Fut,
Fut: Future<Output = Option<T>>,
F: FnMut() -> Option<T>,
{
type Item = T;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
loop {
if this.future.is_some() {
let next =
futures_core::ready!(this.future.as_mut().as_pin_mut().unwrap().poll(cx));
this.future.set(None);
return Poll::Ready(next);
} else {
let fut = (this.f)();
this.future.set(Some(fut));
}
}
fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let item = (&mut self.f)();
Poll::Ready(item)
}
}

View file

@ -6,7 +6,7 @@ use crate::stream::Stream;
use crate::task::{Context, Poll};
pin_project! {
/// A stream that created from iterator
/// A stream that was created from iterator.
///
/// This stream is created by the [`from_iter`] function.
/// See it documentation for more.

View file

@ -306,9 +306,7 @@ pub use from_iter::{from_iter, FromIter};
pub use once::{once, Once};
pub use repeat::{repeat, Repeat};
pub use repeat_with::{repeat_with, RepeatWith};
pub use stream::{
Chain, Filter, Fuse, Inspect, Scan, Skip, SkipWhile, StepBy, Stream, Take, TakeWhile, Zip,
};
pub use stream::*;
pub(crate) mod stream;

View file

@ -33,7 +33,7 @@ pin_project! {
/// documentation for more.
///
/// [`once`]: fn.once.html
#[derive(Debug)]
#[derive(Clone, Debug)]
pub struct Once<T> {
value: Option<T>,
}

View file

@ -33,7 +33,7 @@ where
/// documentation for more.
///
/// [`repeat`]: fn.repeat.html
#[derive(Debug)]
#[derive(Clone, Debug)]
pub struct Repeat<T> {
item: T,
}

View file

@ -1,28 +1,21 @@
use std::marker::PhantomData;
use std::pin::Pin;
use std::future::Future;
use pin_project_lite::pin_project;
use crate::stream::Stream;
use crate::task::{Context, Poll};
pin_project! {
/// A stream that repeats elements of type `T` endlessly by applying a provided closure.
///
/// This stream is created by the [`repeat_with`] function. See its
/// documentation for more.
///
/// [`repeat_with`]: fn.repeat_with.html
#[derive(Debug)]
pub struct RepeatWith<F, Fut, A> {
f: F,
#[pin]
future: Option<Fut>,
__a: PhantomData<A>,
}
/// A stream that repeats elements of type `T` endlessly by applying a provided closure.
///
/// This stream is created by the [`repeat_with`] function. See its
/// documentation for more.
///
/// [`repeat_with`]: fn.repeat_with.html
#[derive(Clone, Debug)]
pub struct RepeatWith<F> {
f: F,
}
impl<F> Unpin for RepeatWith<F> {}
/// Creates a new stream that repeats elements of type `A` endlessly by applying the provided closure.
///
/// # Examples
@ -35,7 +28,7 @@ pin_project! {
/// use async_std::prelude::*;
/// use async_std::stream;
///
/// let s = stream::repeat_with(|| async { 1 });
/// let s = stream::repeat_with(|| 1);
///
/// pin_utils::pin_mut!(s);
///
@ -54,48 +47,38 @@ pin_project! {
/// use async_std::prelude::*;
/// use async_std::stream;
///
/// let s = stream::repeat_with(|| async { 1u8 }).take(2);
/// let mut n = 1;
/// let s = stream::repeat_with(|| {
/// let item = n;
/// n *= 2;
/// item
/// })
/// .take(4);
///
/// pin_utils::pin_mut!(s);
///
/// assert_eq!(s.next().await, Some(1));
/// assert_eq!(s.next().await, Some(1));
/// assert_eq!(s.next().await, Some(2));
/// assert_eq!(s.next().await, Some(4));
/// assert_eq!(s.next().await, Some(8));
/// assert_eq!(s.next().await, None);
/// # })
/// ```
pub fn repeat_with<F, Fut, A>(repeater: F) -> RepeatWith<F, Fut, A>
pub fn repeat_with<T, F>(repeater: F) -> RepeatWith<F>
where
F: FnMut() -> Fut,
Fut: Future<Output = A>,
F: FnMut() -> T,
{
RepeatWith {
f: repeater,
future: None,
__a: PhantomData,
}
RepeatWith { f: repeater }
}
impl<F, Fut, A> Stream for RepeatWith<F, Fut, A>
impl<T, F> Stream for RepeatWith<F>
where
F: FnMut() -> Fut,
Fut: Future<Output = A>,
F: FnMut() -> T,
{
type Item = A;
type Item = T;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
loop {
if this.future.is_some() {
let res = futures_core::ready!(this.future.as_mut().as_pin_mut().unwrap().poll(cx));
this.future.set(None);
return Poll::Ready(Some(res));
} else {
let fut = (this.f)();
this.future.set(Some(fut));
}
}
fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let item = (&mut self.f)();
Poll::Ready(Some(item))
}
}

View file

@ -7,7 +7,7 @@ use crate::prelude::*;
use crate::task::{Context, Poll};
pin_project! {
/// Chains two streams one after another.
/// A stream that chains two streams one after another.
///
/// This `struct` is created by the [`chain`] method on [`Stream`]. See its
/// documentation for more.

View file

@ -4,6 +4,7 @@ use pin_project_lite::pin_project;
use std::pin::Pin;
pin_project! {
/// A stream that clones the elements of an underlying stream.
#[derive(Debug)]
pub struct Cloned<S> {
#[pin]

View file

@ -4,8 +4,8 @@ use pin_project_lite::pin_project;
use std::pin::Pin;
pin_project! {
#[doc(hidden)]
#[allow(missing_debug_implementations)]
/// A stream that copies the elements of an underlying stream.
#[derive(Debug)]
pub struct Copied<S> {
#[pin]
stream: S,

View file

@ -1,73 +1,54 @@
use std::mem::ManuallyDrop;
use std::pin::Pin;
use pin_project_lite::pin_project;
use crate::stream::Stream;
use crate::task::{Context, Poll};
pin_project! {
/// A stream that will repeatedly yield the same list of elements
pub struct Cycle<S, T> {
#[pin]
source: S,
index: usize,
buffer: Vec<T>,
state: CycleState,
}
/// A stream that will repeatedly yield the same list of elements.
#[derive(Debug)]
pub struct Cycle<S> {
orig: S,
source: ManuallyDrop<S>,
}
#[derive(Eq, PartialEq)]
enum CycleState {
FromStream,
FromBuffer,
}
impl<S> Cycle<S, S::Item>
impl<S> Cycle<S>
where
S: Stream,
S::Item: Clone,
S: Stream + Clone,
{
pub fn new(source: S) -> Cycle<S, S::Item> {
pub fn new(source: S) -> Cycle<S> {
Cycle {
source,
index: 0,
buffer: Vec::new(),
state: CycleState::FromStream,
orig: source.clone(),
source: ManuallyDrop::new(source),
}
}
}
impl<S> Stream for Cycle<S, S::Item>
impl<S> Drop for Cycle<S> {
fn drop(&mut self) {
unsafe {
ManuallyDrop::drop(&mut self.source);
}
}
}
impl<S> Stream for Cycle<S>
where
S: Stream,
S::Item: Clone,
S: Stream + Clone,
{
type Item = S::Item;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
unsafe {
let this = self.get_unchecked_mut();
let mut next;
if *this.state == CycleState::FromStream {
next = futures_core::ready!(this.source.poll_next(cx));
if let Some(val) = next {
this.buffer.push(val.clone());
next = Some(val)
} else {
*this.state = CycleState::FromBuffer;
next = this.buffer.get(*this.index).cloned();
match futures_core::ready!(Pin::new_unchecked(&mut *this.source).poll_next(cx)) {
Some(item) => Poll::Ready(Some(item)),
None => {
ManuallyDrop::drop(&mut this.source);
this.source = ManuallyDrop::new(this.orig.clone());
Pin::new_unchecked(&mut *this.source).poll_next(cx)
}
}
} else {
let mut index = *this.index;
if index == this.buffer.len() {
index = 0
}
next = Some(this.buffer[index].clone());
*this.index = index + 1;
}
Poll::Ready(next)
}
}

View file

@ -6,8 +6,7 @@ use crate::stream::Stream;
use crate::task::{Context, Poll};
pin_project! {
#[doc(hidden)]
#[allow(missing_debug_implementations)]
#[derive(Debug)]
pub struct Enumerate<S> {
#[pin]
stream: S,

View file

@ -1,4 +1,3 @@
use std::marker::PhantomData;
use std::pin::Pin;
use pin_project_lite::pin_project;
@ -15,25 +14,23 @@ pin_project! {
/// [`filter`]: trait.Stream.html#method.filter
/// [`Stream`]: trait.Stream.html
#[derive(Debug)]
pub struct Filter<S, P, T> {
pub struct Filter<S, P> {
#[pin]
stream: S,
predicate: P,
__t: PhantomData<T>,
}
}
impl<S, P, T> Filter<S, P, T> {
impl<S, P> Filter<S, P> {
pub(super) fn new(stream: S, predicate: P) -> Self {
Filter {
stream,
predicate,
__t: PhantomData,
}
}
}
impl<S, P> Stream for Filter<S, P, S::Item>
impl<S, P> Stream for Filter<S, P>
where
S: Stream,
P: FnMut(&S::Item) -> bool,

View file

@ -1,4 +1,3 @@
use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{Context, Poll};
@ -7,29 +6,21 @@ use pin_project_lite::pin_project;
use crate::stream::Stream;
pin_project! {
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct FilterMap<S, F, T, B> {
#[derive(Debug)]
pub struct FilterMap<S, F> {
#[pin]
stream: S,
f: F,
__from: PhantomData<T>,
__to: PhantomData<B>,
}
}
impl<S, F, T, B> FilterMap<S, F, T, B> {
impl<S, F> FilterMap<S, F> {
pub(crate) fn new(stream: S, f: F) -> Self {
FilterMap {
stream,
f,
__from: PhantomData,
__to: PhantomData,
}
FilterMap { stream, f }
}
}
impl<S, F, B> Stream for FilterMap<S, F, S::Item, B>
impl<S, F, B> Stream for FilterMap<S, F>
where
S: Stream,
F: FnMut(S::Item) -> Option<B>,

View file

@ -1,31 +1,25 @@
use std::marker::PhantomData;
use std::pin::Pin;
use std::future::Future;
use std::pin::Pin;
use crate::stream::Stream;
use crate::task::{Context, Poll};
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct FindFuture<'a, S, P, T> {
pub struct FindFuture<'a, S, P> {
stream: &'a mut S,
p: P,
__t: PhantomData<T>,
}
impl<'a, S, P, T> FindFuture<'a, S, P, T> {
impl<'a, S, P> FindFuture<'a, S, P> {
pub(super) fn new(stream: &'a mut S, p: P) -> Self {
FindFuture {
stream,
p,
__t: PhantomData,
}
FindFuture { stream, p }
}
}
impl<S: Unpin, P, T> Unpin for FindFuture<'_, S, P, T> {}
impl<S: Unpin, P> Unpin for FindFuture<'_, S, P> {}
impl<'a, S, P> Future for FindFuture<'a, S, P, S::Item>
impl<'a, S, P> Future for FindFuture<'a, S, P>
where
S: Stream + Unpin + Sized,
P: FnMut(&S::Item) -> bool,

View file

@ -1,33 +1,25 @@
use std::marker::PhantomData;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::future::Future;
use crate::stream::Stream;
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct FindMapFuture<'a, S, F, T, B> {
pub struct FindMapFuture<'a, S, F> {
stream: &'a mut S,
f: F,
__b: PhantomData<B>,
__t: PhantomData<T>,
}
impl<'a, S, B, F, T> FindMapFuture<'a, S, F, T, B> {
impl<'a, S, F> FindMapFuture<'a, S, F> {
pub(super) fn new(stream: &'a mut S, f: F) -> Self {
FindMapFuture {
stream,
f,
__b: PhantomData,
__t: PhantomData,
}
FindMapFuture { stream, f }
}
}
impl<S: Unpin, F, T, B> Unpin for FindMapFuture<'_, S, F, T, B> {}
impl<S: Unpin, F> Unpin for FindMapFuture<'_, S, F> {}
impl<'a, S, B, F> Future for FindMapFuture<'a, S, F, S::Item, B>
impl<'a, S, B, F> Future for FindMapFuture<'a, S, F>
where
S: Stream + Unpin + Sized,
F: FnMut(S::Item) -> Option<B>,

View file

@ -1,33 +1,36 @@
use pin_project_lite::pin_project;
use std::pin::Pin;
use pin_project_lite::pin_project;
use crate::prelude::*;
use crate::stream::stream::map::Map;
use crate::stream::{IntoStream, Stream};
use crate::task::{Context, Poll};
pin_project! {
/// A stream that maps each element to a stream, and yields the elements of the produced
/// streams.
///
/// This `struct` is created by the [`flat_map`] method on [`Stream`]. See its
/// documentation for more.
///
/// [`flat_map`]: trait.Stream.html#method.flat_map
/// [`Stream`]: trait.Stream.html
#[allow(missing_debug_implementations)]
pub struct FlatMap<S, U, T, F> {
pub struct FlatMap<S, U, F> {
#[pin]
stream: Map<S, F, T, U>,
stream: Map<S, F>,
#[pin]
inner_stream: Option<U>,
}
}
impl<S, U, F> FlatMap<S, U, S::Item, F>
impl<S, U, F> FlatMap<S, U, F>
where
S: Stream,
U: IntoStream,
F: FnMut(S::Item) -> U,
{
pub(super) fn new(stream: S, f: F) -> FlatMap<S, U, S::Item, F> {
pub(super) fn new(stream: S, f: F) -> FlatMap<S, U, F> {
FlatMap {
stream: stream.map(f),
inner_stream: None,
@ -35,7 +38,7 @@ where
}
}
impl<S, U, F> Stream for FlatMap<S, U, S::Item, F>
impl<S, U, F> Stream for FlatMap<S, U, F>
where
S: Stream,
S::Item: IntoStream<IntoStream = U, Item = U::Item>,

View file

@ -1,30 +1,38 @@
use pin_project_lite::pin_project;
use std::fmt;
use std::pin::Pin;
use pin_project_lite::pin_project;
use crate::stream::{IntoStream, Stream};
use crate::task::{Context, Poll};
pin_project! {
/// A stream that flattens one level of nesting in an stream of things that can be turned into
/// streams.
///
/// This `struct` is created by the [`flatten`] method on [`Stream`]. See its
/// documentation for more.
///
/// [`flatten`]: trait.Stream.html#method.flatten
/// [`Stream`]: trait.Stream.html
#[allow(missing_debug_implementations)]
pub struct Flatten<S, U> {
pub struct Flatten<S>
where
S: Stream,
S::Item: IntoStream,
{
#[pin]
stream: S,
#[pin]
inner_stream: Option<U>,
inner_stream: Option<<S::Item as IntoStream>::IntoStream>,
}
}
impl<S> Flatten<S, S::Item>
impl<S> Flatten<S>
where
S: Stream,
S::Item: IntoStream,
{
pub(super) fn new(stream: S) -> Flatten<S, S::Item> {
pub(super) fn new(stream: S) -> Flatten<S> {
Flatten {
stream,
inner_stream: None,
@ -32,7 +40,7 @@ where
}
}
impl<S, U> Stream for Flatten<S, <S::Item as IntoStream>::IntoStream>
impl<S, U> Stream for Flatten<S>
where
S: Stream,
S::Item: IntoStream<IntoStream = U, Item = U::Item>,
@ -56,3 +64,16 @@ where
}
}
}
impl<S, U> fmt::Debug for Flatten<S>
where
S: fmt::Debug + Stream,
S::Item: IntoStream<IntoStream = U, Item = U::Item>,
U: fmt::Debug + Stream,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Flatten")
.field("inner", &self.stream)
.finish()
}
}

View file

@ -1,6 +1,5 @@
use std::marker::PhantomData;
use std::pin::Pin;
use std::future::Future;
use std::pin::Pin;
use pin_project_lite::pin_project;
@ -8,29 +7,26 @@ use crate::stream::Stream;
use crate::task::{Context, Poll};
pin_project! {
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct FoldFuture<S, F, T, B> {
#[derive(Debug)]
pub struct FoldFuture<S, F, B> {
#[pin]
stream: S,
f: F,
acc: Option<B>,
__t: PhantomData<T>,
}
}
impl<S, F, T, B> FoldFuture<S, F, T, B> {
impl<S, F, B> FoldFuture<S, F, B> {
pub(super) fn new(stream: S, init: B, f: F) -> Self {
FoldFuture {
stream,
f,
acc: Some(init),
__t: PhantomData,
}
}
}
impl<S, F, B> Future for FoldFuture<S, F, S::Item, B>
impl<S, F, B> Future for FoldFuture<S, F, B>
where
S: Stream + Sized,
F: FnMut(B, S::Item) -> B,

View file

@ -1,4 +1,3 @@
use std::marker::PhantomData;
use std::pin::Pin;
use std::future::Future;
@ -10,25 +9,23 @@ use crate::task::{Context, Poll};
pin_project! {
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct ForEachFuture<S, F, T> {
pub struct ForEachFuture<S, F> {
#[pin]
stream: S,
f: F,
__t: PhantomData<T>,
}
}
impl<S, F, T> ForEachFuture<S, F, T> {
impl<S, F> ForEachFuture<S, F> {
pub(super) fn new(stream: S, f: F) -> Self {
ForEachFuture {
stream,
f,
__t: PhantomData,
}
}
}
impl<S, F> Future for ForEachFuture<S, F, S::Item>
impl<S, F> Future for ForEachFuture<S, F>
where
S: Stream + Sized,
F: FnMut(S::Item),

View file

@ -1,4 +1,3 @@
use std::marker::PhantomData;
use std::pin::Pin;
use pin_project_lite::pin_project;
@ -15,25 +14,23 @@ pin_project! {
/// [`inspect`]: trait.Stream.html#method.inspect
/// [`Stream`]: trait.Stream.html
#[derive(Debug)]
pub struct Inspect<S, F, T> {
pub struct Inspect<S, F> {
#[pin]
stream: S,
f: F,
__t: PhantomData<T>,
}
}
impl<S, F, T> Inspect<S, F, T> {
impl<S, F> Inspect<S, F> {
pub(super) fn new(stream: S, f: F) -> Self {
Inspect {
stream,
f,
__t: PhantomData,
}
}
}
impl<S, F> Stream for Inspect<S, F, S::Item>
impl<S, F> Stream for Inspect<S, F>
where
S: Stream,
F: FnMut(&S::Item),

View file

@ -1,4 +1,3 @@
use std::marker::PhantomData;
use std::pin::Pin;
use pin_project_lite::pin_project;
@ -7,29 +6,25 @@ use crate::stream::Stream;
use crate::task::{Context, Poll};
pin_project! {
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct Map<S, F, T, B> {
/// A stream that maps value of another stream with a function.
#[derive(Debug)]
pub struct Map<S, F> {
#[pin]
stream: S,
f: F,
__from: PhantomData<T>,
__to: PhantomData<B>,
}
}
impl<S, F, T, B> Map<S, F, T, B> {
impl<S, F> Map<S, F> {
pub(crate) fn new(stream: S, f: F) -> Self {
Map {
stream,
f,
__from: PhantomData,
__to: PhantomData,
}
}
}
impl<S, F, B> Stream for Map<S, F, S::Item, B>
impl<S, F, B> Stream for Map<S, F>
where
S: Stream,
F: FnMut(S::Item) -> B,

View file

@ -290,6 +290,7 @@ extension_trait! {
Creates a stream that yields elements based on a predicate.
# Examples
```
# fn main() { async_std::task::block_on(async {
#
@ -302,12 +303,11 @@ extension_trait! {
assert_eq!(s.next().await, Some(1));
assert_eq!(s.next().await, Some(2));
assert_eq!(s.next().await, None);
#
# }) }
```
"#]
fn take_while<P>(self, predicate: P) -> TakeWhile<Self, P, Self::Item>
fn take_while<P>(self, predicate: P) -> TakeWhile<Self, P>
where
Self: Sized,
P: FnMut(&Self::Item) -> bool,
@ -447,10 +447,10 @@ extension_trait! {
# }) }
```
"#]
fn cloned<'a,T>(self) -> Cloned<Self>
fn cloned<'a, T>(self) -> Cloned<Self>
where
Self: Sized + Stream<Item = &'a T>,
T : 'a + Clone,
T: Clone + 'a,
{
Cloned::new(self)
}
@ -480,10 +480,10 @@ extension_trait! {
# }) }
```
"#]
fn copied<'a,T>(self) -> Copied<Self>
fn copied<'a, T>(self) -> Copied<Self>
where
Self: Sized + Stream<Item = &'a T>,
T : 'a + Copy,
T: Copy + 'a,
{
Copied::new(self)
}
@ -512,10 +512,9 @@ extension_trait! {
# })
```
"#]
fn cycle(self) -> Cycle<Self, Self::Item>
where
Self: Sized,
Self::Item: Clone,
fn cycle(self) -> Cycle<Self>
where
Self: Clone + Sized,
{
Cycle::new(self)
}
@ -542,7 +541,6 @@ extension_trait! {
assert_eq!(s.next().await, Some((1, 'b')));
assert_eq!(s.next().await, Some((2, 'c')));
assert_eq!(s.next().await, None);
#
# }) }
```
@ -577,7 +575,7 @@ extension_trait! {
# }) }
```
"#]
fn map<B, F>(self, f: F) -> Map<Self, F, Self::Item, B>
fn map<B, F>(self, f: F) -> Map<Self, F>
where
Self: Sized,
F: FnMut(Self::Item) -> B,
@ -602,17 +600,18 @@ extension_trait! {
let s = stream::from_iter(vec![1, 2, 3, 4, 5]);
let sum = s
.inspect(|x| println!("about to filter {}", x))
.filter(|x| x % 2 == 0)
.inspect(|x| println!("made it through filter: {}", x))
.fold(0, |sum, i| sum + i).await;
.inspect(|x| println!("about to filter {}", x))
.filter(|x| x % 2 == 0)
.inspect(|x| println!("made it through filter: {}", x))
.fold(0, |sum, i| sum + i)
.await;
assert_eq!(sum, 6);
#
# }) }
```
"#]
fn inspect<F>(self, f: F) -> Inspect<Self, F, Self::Item>
fn inspect<F>(self, f: F) -> Inspect<Self, F>
where
Self: Sized,
F: FnMut(&Self::Item),
@ -655,7 +654,6 @@ extension_trait! {
#
# }) }
```
"#]
fn last(
self,
@ -722,7 +720,7 @@ extension_trait! {
# }) }
```
"#]
fn filter<P>(self, predicate: P) -> Filter<Self, P, Self::Item>
fn filter<P>(self, predicate: P) -> Filter<Self, P>
where
Self: Sized,
P: FnMut(&Self::Item) -> bool,
@ -758,7 +756,7 @@ extension_trait! {
"#]
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
fn flat_map<U, F>(self, f: F) -> FlatMap<Self, U, Self::Item, F>
fn flat_map<U, F>(self, f: F) -> FlatMap<Self, U, F>
where
Self: Sized,
U: IntoStream,
@ -792,7 +790,7 @@ extension_trait! {
"#]
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
fn flatten(self) -> Flatten<Self, Self::Item>
fn flatten(self) -> Flatten<Self>
where
Self: Sized,
Self::Item: IntoStream,
@ -833,7 +831,7 @@ extension_trait! {
# }) }
```
"#]
fn filter_map<B, F>(self, f: F) -> FilterMap<Self, F, Self::Item, B>
fn filter_map<B, F>(self, f: F) -> FilterMap<Self, F>
where
Self: Sized,
F: FnMut(Self::Item) -> Option<B>,
@ -841,7 +839,7 @@ extension_trait! {
FilterMap::new(self, f)
}
#[doc = r#"
#[doc = r#"
Returns the element that gives the minimum value with respect to the
specified key function. If several elements are equally minimum,
the first element is returned. If the stream is empty, `None` is returned.
@ -865,19 +863,19 @@ extension_trait! {
# }) }
```
"#]
fn min_by_key<K>(
fn min_by_key<B, F>(
self,
key_by: K,
) -> impl Future<Output = Option<Self::Item>> [MinByKeyFuture<Self, Self::Item, K>]
key_by: F,
) -> impl Future<Output = Option<Self::Item>> [MinByKeyFuture<Self, Self::Item, F>]
where
Self: Sized,
Self::Item: Ord,
K: FnMut(&Self::Item) -> Self::Item,
B: Ord,
F: FnMut(&Self::Item) -> B,
{
MinByKeyFuture::new(self, key_by)
}
#[doc = r#"
#[doc = r#"
Returns the element that gives the maximum value with respect to the
specified key function. If several elements are equally maximum,
the first element is returned. If the stream is empty, `None` is returned.
@ -901,14 +899,14 @@ extension_trait! {
# }) }
```
"#]
fn max_by_key<K>(
fn max_by_key<B, F>(
self,
key_by: K,
) -> impl Future<Output = Option<Self::Item>> [MaxByKeyFuture<Self, Self::Item, K>]
key_by: F,
) -> impl Future<Output = Option<Self::Item>> [MaxByKeyFuture<Self, Self::Item, F>]
where
Self: Sized,
Self::Item: Ord,
K: FnMut(&Self::Item) -> Self::Item,
B: Ord,
F: FnMut(&Self::Item) -> B,
{
MaxByKeyFuture::new(self, key_by)
}
@ -1080,7 +1078,7 @@ extension_trait! {
n: usize,
) -> impl Future<Output = Option<Self::Item>> + '_ [NthFuture<'_, Self>]
where
Self: Sized,
Self: Unpin + Sized,
{
NthFuture::new(self, n)
}
@ -1188,9 +1186,9 @@ extension_trait! {
fn find<P>(
&mut self,
p: P,
) -> impl Future<Output = Option<Self::Item>> + '_ [FindFuture<'_, Self, P, Self::Item>]
) -> impl Future<Output = Option<Self::Item>> + '_ [FindFuture<'_, Self, P>]
where
Self: Sized,
Self: Unpin + Sized,
P: FnMut(&Self::Item) -> bool,
{
FindFuture::new(self, p)
@ -1216,9 +1214,9 @@ extension_trait! {
fn find_map<F, B>(
&mut self,
f: F,
) -> impl Future<Output = Option<B>> + '_ [FindMapFuture<'_, Self, F, Self::Item, B>]
) -> impl Future<Output = Option<B>> + '_ [FindMapFuture<'_, Self, F>]
where
Self: Sized,
Self: Unpin + Sized,
F: FnMut(Self::Item) -> Option<B>,
{
FindMapFuture::new(self, f)
@ -1250,7 +1248,7 @@ extension_trait! {
self,
init: B,
f: F,
) -> impl Future<Output = B> [FoldFuture<Self, F, Self::Item, B>]
) -> impl Future<Output = B> [FoldFuture<Self, F, B>]
where
Self: Sized,
F: FnMut(B, Self::Item) -> B,
@ -1285,7 +1283,7 @@ extension_trait! {
fn for_each<F>(
self,
f: F,
) -> impl Future<Output = ()> [ForEachFuture<Self, F, Self::Item>]
) -> impl Future<Output = ()> [ForEachFuture<Self, F>]
where
Self: Sized,
F: FnMut(Self::Item),
@ -1426,7 +1424,7 @@ extension_trait! {
# }) }
```
"#]
fn skip_while<P>(self, predicate: P) -> SkipWhile<Self, P, Self::Item>
fn skip_while<P>(self, predicate: P) -> SkipWhile<Self, P>
where
Self: Sized,
P: FnMut(&Self::Item) -> bool,
@ -1509,7 +1507,7 @@ extension_trait! {
use async_std::prelude::*;
use async_std::stream;
let s = stream::from_iter(vec![1usize, 2, 3]);
let mut s = stream::from_iter(vec![1usize, 2, 3]);
let sum = s.try_fold(0, |acc, v| {
if (acc+v) % 2 == 1 {
Ok(v+3)
@ -1524,12 +1522,12 @@ extension_trait! {
```
"#]
fn try_fold<B, F, T, E>(
self,
&mut self,
init: T,
f: F,
) -> impl Future<Output = Result<T, E>> [TryFoldFuture<Self, F, T>]
) -> impl Future<Output = Result<T, E>> + '_ [TryFoldFuture<'_, Self, F, T>]
where
Self: Sized,
Self: Unpin + Sized,
F: FnMut(B, Self::Item) -> Result<T, E>,
{
TryFoldFuture::new(self, init, f)
@ -1549,7 +1547,7 @@ extension_trait! {
let (tx, rx) = channel();
let s = stream::from_iter(vec![1u8, 2, 3]);
let mut s = stream::from_iter(vec![1u8, 2, 3]);
let s = s.try_for_each(|v| {
if v % 2 == 1 {
tx.clone().send(v).unwrap();
@ -1570,11 +1568,11 @@ extension_trait! {
```
"#]
fn try_for_each<F, E>(
self,
&mut self,
f: F,
) -> impl Future<Output = E> [TryForEachFuture<Self, F, Self::Item, E>]
) -> impl Future<Output = E> + 'a [TryForEachFuture<'_, Self, F>]
where
Self: Sized,
Self: Unpin + Sized,
F: FnMut(Self::Item) -> Result<(), E>,
{
TryForEachFuture::new(self, f)
@ -1619,7 +1617,7 @@ extension_trait! {
#[inline]
fn zip<U>(self, other: U) -> Zip<Self, U>
where
Self: Sized + Stream,
Self: Sized,
U: Stream,
{
Zip::new(self, other)
@ -1678,7 +1676,6 @@ extension_trait! {
"#]
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
#[must_use = "if you really need to exhaust the iterator, consider `.for_each(drop)` instead (TODO)"]
fn collect<'a, B>(
self,
) -> impl Future<Output = B> + 'a [Pin<Box<dyn Future<Output = B> + 'a>>]
@ -1777,28 +1774,28 @@ extension_trait! {
use async_std::stream;
let s = stream::from_iter(vec![1usize, 2, 3]);
let res = s.clone().position(|x| *x == 1).await;
let res = s.clone().position(|x| x == 1).await;
assert_eq!(res, Some(0));
let res = s.clone().position(|x| *x == 2).await;
let res = s.clone().position(|x| x == 2).await;
assert_eq!(res, Some(1));
let res = s.clone().position(|x| *x == 3).await;
let res = s.clone().position(|x| x == 3).await;
assert_eq!(res, Some(2));
let res = s.clone().position(|x| *x == 4).await;
let res = s.clone().position(|x| x == 4).await;
assert_eq!(res, None);
#
# }) }
```
"#]
fn position<P>(
self,
predicate: P
) -> impl Future<Output = Option<usize>> [PositionFuture<Self, P>]
&mut self,
predicate: P,
) -> impl Future<Output = Option<usize>> + '_ [PositionFuture<'_, Self, P>]
where
Self: Sized,
P: FnMut(&Self::Item) -> bool,
Self: Unpin + Sized,
P: FnMut(Self::Item) -> bool,
{
PositionFuture::new(self, predicate)
}
@ -1842,10 +1839,12 @@ extension_trait! {
CmpFuture::new(self, other)
}
#[doc = r#"
#[doc = r#"
Determines if the elements of this `Stream` are lexicographically
not equal to those of another.
# Examples
```
# fn main() { async_std::task::block_on(async {
#
@ -1870,7 +1869,7 @@ extension_trait! {
other: S
) -> impl Future<Output = bool> [NeFuture<Self, S>]
where
Self: Sized + Stream,
Self: Sized,
S: Sized + Stream,
<Self as Stream>::Item: PartialEq<S::Item>,
{
@ -2063,11 +2062,11 @@ extension_trait! {
}
#[doc = r#"
Sums the elements of an iterator.
Sums the elements of a stream.
Takes each element, adds them together, and returns the result.
An empty iterator returns the zero value of the type.
An empty streams returns the zero value of the type.
# Panics
@ -2100,15 +2099,15 @@ extension_trait! {
) -> impl Future<Output = S> + 'a [Pin<Box<dyn Future<Output = S> + 'a>>]
where
Self: Sized + Stream<Item = S> + 'a,
S: Sum,
S: Sum<Self::Item>,
{
Sum::sum(self)
}
#[doc = r#"
Iterates over the entire iterator, multiplying all the elements
Multiplies all elements of the stream.
An empty iterator returns the one value of the type.
An empty stream returns the one value of the type.
# Panics

View file

@ -1,24 +1,21 @@
use std::pin::Pin;
use std::future::Future;
use pin_project_lite::pin_project;
use std::pin::Pin;
use crate::stream::Stream;
use crate::task::{Context, Poll};
pin_project! {
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct PositionFuture<S, P> {
#[pin]
stream: S,
predicate: P,
index:usize,
}
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct PositionFuture<'a, S, P> {
stream: &'a mut S,
predicate: P,
index: usize,
}
impl<S, P> PositionFuture<S, P> {
pub(super) fn new(stream: S, predicate: P) -> Self {
impl<'a, S, P> Unpin for PositionFuture<'a, S, P> {}
impl<'a, S, P> PositionFuture<'a, S, P> {
pub(super) fn new(stream: &'a mut S, predicate: P) -> Self {
PositionFuture {
stream,
predicate,
@ -27,23 +24,25 @@ impl<S, P> PositionFuture<S, P> {
}
}
impl<S, P> Future for PositionFuture<S, P>
impl<'a, S, P> Future for PositionFuture<'a, S, P>
where
S: Stream,
P: FnMut(&S::Item) -> bool,
S: Stream + Unpin,
P: FnMut(S::Item) -> bool,
{
type Output = Option<usize>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let next = futures_core::ready!(this.stream.poll_next(cx));
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let next = futures_core::ready!(Pin::new(&mut self.stream).poll_next(cx));
match next {
Some(v) if (this.predicate)(&v) => Poll::Ready(Some(*this.index)),
Some(_) => {
cx.waker().wake_by_ref();
*this.index += 1;
Poll::Pending
Some(v) => {
if (&mut self.predicate)(v) {
Poll::Ready(Some(self.index))
} else {
cx.waker().wake_by_ref();
self.index += 1;
Poll::Pending
}
}
None => Poll::Ready(None),
}

View file

@ -1,4 +1,3 @@
use std::marker::PhantomData;
use std::pin::Pin;
use pin_project_lite::pin_project;
@ -15,25 +14,23 @@ pin_project! {
/// [`skip_while`]: trait.Stream.html#method.skip_while
/// [`Stream`]: trait.Stream.html
#[derive(Debug)]
pub struct SkipWhile<S, P, T> {
pub struct SkipWhile<S, P> {
#[pin]
stream: S,
predicate: Option<P>,
__t: PhantomData<T>,
}
}
impl<S, P, T> SkipWhile<S, P, T> {
impl<S, P> SkipWhile<S, P> {
pub(crate) fn new(stream: S, predicate: P) -> Self {
SkipWhile {
stream,
predicate: Some(predicate),
__t: PhantomData,
}
}
}
impl<S, P> Stream for SkipWhile<S, P, S::Item>
impl<S, P> Stream for SkipWhile<S, P>
where
S: Stream,
P: FnMut(&S::Item) -> bool,

View file

@ -1,4 +1,3 @@
use std::marker::PhantomData;
use std::pin::Pin;
use pin_project_lite::pin_project;
@ -15,25 +14,23 @@ pin_project! {
/// [`take_while`]: trait.Stream.html#method.take_while
/// [`Stream`]: trait.Stream.html
#[derive(Debug)]
pub struct TakeWhile<S, P, T> {
pub struct TakeWhile<S, P> {
#[pin]
stream: S,
predicate: P,
__t: PhantomData<T>,
}
}
impl<S, P, T> TakeWhile<S, P, T> {
impl<S, P> TakeWhile<S, P> {
pub(super) fn new(stream: S, predicate: P) -> Self {
TakeWhile {
stream,
predicate,
__t: PhantomData,
}
}
}
impl<S, P> Stream for TakeWhile<S, P, S::Item>
impl<S, P> Stream for TakeWhile<S, P>
where
S: Stream,
P: FnMut(&S::Item) -> bool,

View file

@ -22,7 +22,7 @@ pin_project! {
}
impl<S: Stream> Timeout<S> {
pub fn new(stream: S, dur: Duration) -> Timeout<S> {
pub(crate) fn new(stream: S, dur: Duration) -> Timeout<S> {
let delay = Delay::new(dur);
Timeout { stream, delay }

View file

@ -1,58 +1,51 @@
use std::marker::PhantomData;
use std::pin::Pin;
use std::future::Future;
use pin_project_lite::pin_project;
use crate::future::Future;
use crate::stream::Stream;
use crate::task::{Context, Poll};
pin_project! {
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct TryFoldFuture<S, F, T> {
#[pin]
stream: S,
f: F,
acc: Option<T>,
__t: PhantomData<T>,
}
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct TryFoldFuture<'a, S, F, T> {
stream: &'a mut S,
f: F,
acc: Option<T>,
}
impl<S, F, T> TryFoldFuture<S, F, T> {
pub(super) fn new(stream: S, init: T, f: F) -> Self {
impl<'a, S, F, T> Unpin for TryFoldFuture<'a, S, F, T> {}
impl<'a, S, F, T> TryFoldFuture<'a, S, F, T> {
pub(super) fn new(stream: &'a mut S, init: T, f: F) -> Self {
TryFoldFuture {
stream,
f,
acc: Some(init),
__t: PhantomData,
}
}
}
impl<S, F, T, E> Future for TryFoldFuture<S, F, T>
impl<'a, S, F, T, E> Future for TryFoldFuture<'a, S, F, T>
where
S: Stream + Sized,
S: Stream + Unpin,
F: FnMut(T, S::Item) -> Result<T, E>,
{
type Output = Result<T, E>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
loop {
let next = futures_core::ready!(this.stream.as_mut().poll_next(cx));
let next = futures_core::ready!(Pin::new(&mut self.stream).poll_next(cx));
match next {
Some(v) => {
let old = this.acc.take().unwrap();
let new = (this.f)(old, v);
let old = self.acc.take().unwrap();
let new = (&mut self.f)(old, v);
match new {
Ok(o) => *this.acc = Some(o),
Ok(o) => self.acc = Some(o),
Err(e) => return Poll::Ready(Err(e)),
}
}
None => return Poll::Ready(Ok(this.acc.take().unwrap())),
None => return Poll::Ready(Ok(self.acc.take().unwrap())),
}
}
}

View file

@ -1,52 +1,39 @@
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use pin_project_lite::pin_project;
use crate::stream::Stream;
use crate::task::{Context, Poll};
pin_project! {
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct TryForEachFuture<S, F, T, R> {
#[pin]
stream: S,
f: F,
__from: PhantomData<T>,
__to: PhantomData<R>,
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct TryForEachFuture<'a, S, F> {
stream: &'a mut S,
f: F,
}
impl<'a, S, F> Unpin for TryForEachFuture<'a, S, F> {}
impl<'a, S, F> TryForEachFuture<'a, S, F> {
pub(crate) fn new(stream: &'a mut S, f: F) -> Self {
TryForEachFuture { stream, f }
}
}
impl<S, F, T, R> TryForEachFuture<S, F, T, R> {
pub(crate) fn new(stream: S, f: F) -> Self {
TryForEachFuture {
stream,
f,
__from: PhantomData,
__to: PhantomData,
}
}
}
impl<S, F, E> Future for TryForEachFuture<S, F, S::Item, E>
impl<'a, S, F, E> Future for TryForEachFuture<'a, S, F>
where
S: Stream,
S::Item: std::fmt::Debug,
S: Stream + Unpin,
F: FnMut(S::Item) -> Result<(), E>,
{
type Output = Result<(), E>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
loop {
let item = futures_core::ready!(this.stream.as_mut().poll_next(cx));
let item = futures_core::ready!(Pin::new(&mut self.stream).poll_next(cx));
match item {
None => return Poll::Ready(Ok(())),
Some(v) => {
let res = (this.f)(v);
let res = (&mut self.f)(v);
if let Err(e) = res {
return Poll::Ready(Err(e));
}

View file

@ -7,7 +7,7 @@ use crate::stream::Stream;
use crate::task::{Context, Poll};
pin_project! {
/// An iterator that iterates two other iterators simultaneously.
/// A stream that takes items from two other streams simultaneously.
///
/// This `struct` is created by the [`zip`] method on [`Stream`]. See its
/// documentation for more.

View file

@ -155,9 +155,7 @@
//! ```
//! # async_std::task::block_on(async {
//! #
//! use std::sync::Arc;
//!
//! use async_std::sync::Mutex;
//! use async_std::sync::{Arc, Mutex};
//! use async_std::task;
//!
//! let m1 = Arc::new(Mutex::new(0));

View file

@ -19,9 +19,7 @@ use crate::task::{Context, Poll};
/// ```
/// # async_std::task::block_on(async {
/// #
/// use std::sync::Arc;
///
/// use async_std::sync::Mutex;
/// use async_std::sync::{Arc, Mutex};
/// use async_std::task;
///
/// let m = Arc::new(Mutex::new(0));
@ -77,9 +75,7 @@ impl<T> Mutex<T> {
/// ```
/// # async_std::task::block_on(async {
/// #
/// use std::sync::Arc;
///
/// use async_std::sync::Mutex;
/// use async_std::sync::{Arc, Mutex};
/// use async_std::task;
///
/// let m1 = Arc::new(Mutex::new(10));
@ -155,9 +151,7 @@ impl<T> Mutex<T> {
/// ```
/// # async_std::task::block_on(async {
/// #
/// use std::sync::Arc;
///
/// use async_std::sync::Mutex;
/// use async_std::sync::{Arc, Mutex};
/// use async_std::task;
///
/// let m1 = Arc::new(Mutex::new(10));

View file

@ -124,6 +124,9 @@ cfg_std! {
#[doc(inline)]
pub use async_macros::ready;
pub use yield_now::yield_now;
mod yield_now;
}
cfg_default! {
@ -157,8 +160,3 @@ cfg_default! {
#[cfg(not(any(feature = "unstable", test)))]
pub(crate) use spawn_blocking::spawn_blocking;
}
cfg_unstable! {
pub use yield_now::yield_now;
mod yield_now;
}

View file

@ -6,7 +6,7 @@ use crossbeam_channel::{unbounded, Receiver, Sender};
use once_cell::sync::Lazy;
use crate::task::{JoinHandle, Task};
use crate::utils::{abort_on_panic, random};
use crate::utils::abort_on_panic;
/// Spawns a blocking task.
///
@ -68,16 +68,13 @@ static POOL: Lazy<Pool> = Lazy::new(|| {
fn start_thread() {
SLEEPING.fetch_add(1, Ordering::SeqCst);
// Generate a random duration of time between 1 second and 10 seconds. If the thread doesn't
// receive the next task in this duration of time, it will stop running.
let timeout = Duration::from_millis(1000 + u64::from(random(9_000)));
let timeout = Duration::from_secs(1);
thread::Builder::new()
.name("async-std/blocking".to_string())
.spawn(move || {
loop {
let task = match POOL.receiver.recv_timeout(timeout) {
let mut task = match POOL.receiver.recv_timeout(timeout) {
Ok(task) => task,
Err(_) => {
// Check whether this is the last sleeping thread.
@ -100,8 +97,22 @@ fn start_thread() {
start_thread();
}
// Run the task.
abort_on_panic(|| task.run());
loop {
// Run the task.
abort_on_panic(|| task.run());
// Try taking another task if there are any available.
task = match POOL.receiver.try_recv() {
Ok(task) => task,
Err(_) => break,
};
}
// If there is at least one sleeping thread, stop this thread instead of putting it
// to sleep.
if SLEEPING.load(Ordering::SeqCst) > 0 {
return;
}
SLEEPING.fetch_add(1, Ordering::SeqCst);
}

View file

@ -26,8 +26,6 @@ use crate::task::{Context, Poll};
/// #
/// # })
/// ```
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
#[inline]
pub async fn yield_now() {
YieldNow(false).await