use pin_project_lite

yoshuawuyts-patch-1
yjhmelody 5 years ago
commit c9e6d3a84c

@ -11,6 +11,8 @@ jobs:
build_and_test:
name: Build and test
runs-on: ${{ matrix.os }}
env:
RUSTFLAGS: -Dwarnings
strategy:
matrix:
os: [ubuntu-latest, windows-latest, macOS-latest]
@ -29,7 +31,7 @@ jobs:
uses: actions-rs/cargo@v1
with:
command: check
args: --all --benches --bins --examples --tests
args: --all --bins --examples
- name: check unstable
uses: actions-rs/cargo@v1
@ -46,6 +48,8 @@ jobs:
check_fmt_and_docs:
name: Checking fmt and docs
runs-on: ubuntu-latest
env:
RUSTFLAGS: -Dwarnings
steps:
- uses: actions/checkout@master
@ -77,6 +81,9 @@ jobs:
clippy_check:
name: Clippy check
runs-on: ubuntu-latest
# TODO: There is a lot of warnings
# env:
# RUSTFLAGS: -Dwarnings
steps:
- uses: actions/checkout@v1
- id: component

@ -9,7 +9,7 @@ authors = [
edition = "2018"
license = "Apache-2.0/MIT"
repository = "https://github.com/async-rs/async-std"
homepage = "https://github.com/async-rs/async-std"
homepage = "https://async.rs"
documentation = "https://docs.rs/async-std"
description = "Async version of the Rust standard library"
keywords = ["async", "await", "future", "std", "task"]
@ -43,9 +43,11 @@ pin-utils = "0.1.0-alpha.4"
slab = "0.4.2"
kv-log-macro = "1.0.4"
broadcaster = { version = "0.2.6", optional = true, default-features = false, features = ["default-channels"] }
pin-project-lite = "0.1"
[dev-dependencies]
femme = "1.2.0"
rand = "0.7.2"
# surf = "1.0.2"
tempdir = "0.3.7"
futures-preview = { version = "=0.3.0-alpha.19", features = ["async-await"] }

@ -4,6 +4,7 @@ use std::pin::Pin;
use std::time::Duration;
use futures_timer::Delay;
use pin_project_lite::pin_project;
use crate::future::Future;
use crate::task::{Context, Poll};
@ -39,24 +40,24 @@ where
f.await
}
/// A future that times out after a duration of time.
struct TimeoutFuture<F> {
future: F,
delay: Delay,
}
impl<F> TimeoutFuture<F> {
pin_utils::unsafe_pinned!(future: F);
pin_utils::unsafe_pinned!(delay: Delay);
pin_project! {
/// A future that times out after a duration of time.
struct TimeoutFuture<F> {
#[pin]
future: F,
#[pin]
delay: Delay,
}
}
impl<F: Future> Future for TimeoutFuture<F> {
type Output = Result<F::Output, TimeoutError>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.as_mut().future().poll(cx) {
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
match this.future.poll(cx) {
Poll::Ready(v) => Poll::Ready(Ok(v)),
Poll::Pending => match self.delay().poll(cx) {
Poll::Pending => match this.delay.poll(cx) {
Poll::Ready(_) => Poll::Ready(Err(TimeoutError { _private: () })),
Poll::Pending => Poll::Pending,
},

@ -2,50 +2,55 @@ use std::mem;
use std::pin::Pin;
use std::str;
use pin_project_lite::pin_project;
use super::read_until_internal;
use crate::io::{self, BufRead};
use crate::stream::Stream;
use crate::task::{Context, Poll};
/// A stream of lines in a byte stream.
///
/// This stream is created by the [`lines`] method on types that implement [`BufRead`].
///
/// This type is an async version of [`std::io::Lines`].
///
/// [`lines`]: trait.BufRead.html#method.lines
/// [`BufRead`]: trait.BufRead.html
/// [`std::io::Lines`]: https://doc.rust-lang.org/std/io/struct.Lines.html
#[derive(Debug)]
pub struct Lines<R> {
pub(crate) reader: R,
pub(crate) buf: String,
pub(crate) bytes: Vec<u8>,
pub(crate) read: usize,
pin_project! {
/// A stream of lines in a byte stream.
///
/// This stream is created by the [`lines`] method on types that implement [`BufRead`].
///
/// This type is an async version of [`std::io::Lines`].
///
/// [`lines`]: trait.BufRead.html#method.lines
/// [`BufRead`]: trait.BufRead.html
/// [`std::io::Lines`]: https://doc.rust-lang.org/std/io/struct.Lines.html
#[derive(Debug)]
pub struct Lines<R> {
#[pin]
pub(crate) reader: R,
pub(crate) buf: String,
pub(crate) bytes: Vec<u8>,
pub(crate) read: usize,
}
}
impl<R: BufRead> Stream for Lines<R> {
type Item = io::Result<String>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let Self {
reader,
buf,
bytes,
read,
} = unsafe { self.get_unchecked_mut() };
let reader = unsafe { Pin::new_unchecked(reader) };
let n = futures_core::ready!(read_line_internal(reader, cx, buf, bytes, read))?;
if n == 0 && buf.is_empty() {
let this = self.project();
let n = futures_core::ready!(read_line_internal(
this.reader,
cx,
this.buf,
this.bytes,
this.read
))?;
if n == 0 && this.buf.is_empty() {
return Poll::Ready(None);
}
if buf.ends_with('\n') {
buf.pop();
if buf.ends_with('\r') {
buf.pop();
if this.buf.ends_with('\n') {
this.buf.pop();
if this.buf.ends_with('\r') {
this.buf.pop();
}
}
Poll::Ready(Some(Ok(mem::replace(buf, String::new()))))
Poll::Ready(Some(Ok(mem::replace(this.buf, String::new()))))
}
}

@ -1,46 +1,51 @@
use std::mem;
use std::pin::Pin;
use pin_project_lite::pin_project;
use super::read_until_internal;
use crate::io::{self, BufRead};
use crate::stream::Stream;
use crate::task::{Context, Poll};
/// A stream over the contents of an instance of [`BufRead`] split on a particular byte.
///
/// This stream is created by the [`split`] method on types that implement [`BufRead`].
///
/// This type is an async version of [`std::io::Split`].
///
/// [`split`]: trait.BufRead.html#method.lines
/// [`BufRead`]: trait.BufRead.html
/// [`std::io::Split`]: https://doc.rust-lang.org/std/io/struct.Split.html
#[derive(Debug)]
pub struct Split<R> {
pub(crate) reader: R,
pub(crate) buf: Vec<u8>,
pub(crate) read: usize,
pub(crate) delim: u8,
pin_project! {
/// A stream over the contents of an instance of [`BufRead`] split on a particular byte.
///
/// This stream is created by the [`split`] method on types that implement [`BufRead`].
///
/// This type is an async version of [`std::io::Split`].
///
/// [`split`]: trait.BufRead.html#method.lines
/// [`BufRead`]: trait.BufRead.html
/// [`std::io::Split`]: https://doc.rust-lang.org/std/io/struct.Split.html
#[derive(Debug)]
pub struct Split<R> {
#[pin]
pub(crate) reader: R,
pub(crate) buf: Vec<u8>,
pub(crate) read: usize,
pub(crate) delim: u8,
}
}
impl<R: BufRead> Stream for Split<R> {
type Item = io::Result<Vec<u8>>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let Self {
reader,
buf,
read,
delim,
} = unsafe { self.get_unchecked_mut() };
let reader = unsafe { Pin::new_unchecked(reader) };
let n = futures_core::ready!(read_until_internal(reader, cx, *delim, buf, read))?;
if n == 0 && buf.is_empty() {
let this = self.project();
let n = futures_core::ready!(read_until_internal(
this.reader,
cx,
*this.delim,
this.buf,
this.read
))?;
if n == 0 && this.buf.is_empty() {
return Poll::Ready(None);
}
if buf[buf.len() - 1] == *delim {
buf.pop();
if this.buf[this.buf.len() - 1] == *this.delim {
this.buf.pop();
}
Poll::Ready(Some(Ok(mem::replace(buf, vec![]))))
Poll::Ready(Some(Ok(mem::replace(this.buf, vec![]))))
}
}

@ -2,51 +2,56 @@ use std::io::{IoSliceMut, Read as _};
use std::pin::Pin;
use std::{cmp, fmt};
use pin_project_lite::pin_project;
use crate::io::{self, BufRead, Read, Seek, SeekFrom};
use crate::task::{Context, Poll};
const DEFAULT_CAPACITY: usize = 8 * 1024;
/// Adds buffering to any reader.
///
/// It can be excessively inefficient to work directly with a [`Read`] instance. A `BufReader`
/// performs large, infrequent reads on the underlying [`Read`] and maintains an in-memory buffer
/// of the incoming byte stream.
///
/// `BufReader` can improve the speed of programs that make *small* and *repeated* read calls to
/// the same file or network socket. It does not help when reading very large amounts at once, or
/// reading just one or a few times. It also provides no advantage when reading from a source that
/// is already in memory, like a `Vec<u8>`.
///
/// When the `BufReader` is dropped, the contents of its buffer will be discarded. Creating
/// multiple instances of a `BufReader` on the same stream can cause data loss.
///
/// This type is an async version of [`std::io::BufReader`].
///
/// [`Read`]: trait.Read.html
/// [`std::io::BufReader`]: https://doc.rust-lang.org/std/io/struct.BufReader.html
///
/// # Examples
///
/// ```no_run
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
/// #
/// use async_std::fs::File;
/// use async_std::io::BufReader;
/// use async_std::prelude::*;
///
/// let mut file = BufReader::new(File::open("a.txt").await?);
///
/// let mut line = String::new();
/// file.read_line(&mut line).await?;
/// #
/// # Ok(()) }) }
/// ```
pub struct BufReader<R> {
inner: R,
buf: Box<[u8]>,
pos: usize,
cap: usize,
pin_project! {
/// Adds buffering to any reader.
///
/// It can be excessively inefficient to work directly with a [`Read`] instance. A `BufReader`
/// performs large, infrequent reads on the underlying [`Read`] and maintains an in-memory buffer
/// of the incoming byte stream.
///
/// `BufReader` can improve the speed of programs that make *small* and *repeated* read calls to
/// the same file or network socket. It does not help when reading very large amounts at once, or
/// reading just one or a few times. It also provides no advantage when reading from a source that
/// is already in memory, like a `Vec<u8>`.
///
/// When the `BufReader` is dropped, the contents of its buffer will be discarded. Creating
/// multiple instances of a `BufReader` on the same stream can cause data loss.
///
/// This type is an async version of [`std::io::BufReader`].
///
/// [`Read`]: trait.Read.html
/// [`std::io::BufReader`]: https://doc.rust-lang.org/std/io/struct.BufReader.html
///
/// # Examples
///
/// ```no_run
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
/// #
/// use async_std::fs::File;
/// use async_std::io::BufReader;
/// use async_std::prelude::*;
///
/// let mut file = BufReader::new(File::open("a.txt").await?);
///
/// let mut line = String::new();
/// file.read_line(&mut line).await?;
/// #
/// # Ok(()) }) }
/// ```
pub struct BufReader<R> {
#[pin]
inner: R,
buf: Box<[u8]>,
pos: usize,
cap: usize,
}
}
impl<R: io::Read> BufReader<R> {
@ -95,10 +100,6 @@ impl<R: io::Read> BufReader<R> {
}
impl<R> BufReader<R> {
pin_utils::unsafe_pinned!(inner: R);
pin_utils::unsafe_unpinned!(pos: usize);
pin_utils::unsafe_unpinned!(cap: usize);
/// Gets a reference to the underlying reader.
///
/// It is inadvisable to directly read from the underlying reader.
@ -141,6 +142,13 @@ impl<R> BufReader<R> {
&mut self.inner
}
/// Gets a pinned mutable reference to the underlying reader.
///
/// It is inadvisable to directly read from the underlying reader.
fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R> {
self.project().inner
}
/// Returns a reference to the internal buffer.
///
/// This function will not attempt to fill the buffer if it is empty.
@ -185,9 +193,10 @@ impl<R> BufReader<R> {
/// Invalidates all data in the internal buffer.
#[inline]
fn discard_buffer(mut self: Pin<&mut Self>) {
*self.as_mut().pos() = 0;
*self.cap() = 0;
fn discard_buffer(self: Pin<&mut Self>) {
let this = self.project();
*this.pos = 0;
*this.cap = 0;
}
}
@ -201,7 +210,7 @@ impl<R: Read> Read for BufReader<R> {
// (larger than our internal buffer), bypass our internal buffer
// entirely.
if self.pos == self.cap && buf.len() >= self.buf.len() {
let res = futures_core::ready!(self.as_mut().inner().poll_read(cx, buf));
let res = futures_core::ready!(self.as_mut().get_pin_mut().poll_read(cx, buf));
self.discard_buffer();
return Poll::Ready(res);
}
@ -218,7 +227,8 @@ impl<R: Read> Read for BufReader<R> {
) -> Poll<io::Result<usize>> {
let total_len = bufs.iter().map(|b| b.len()).sum::<usize>();
if self.pos == self.cap && total_len >= self.buf.len() {
let res = futures_core::ready!(self.as_mut().inner().poll_read_vectored(cx, bufs));
let res =
futures_core::ready!(self.as_mut().get_pin_mut().poll_read_vectored(cx, bufs));
self.discard_buffer();
return Poll::Ready(res);
}
@ -234,28 +244,23 @@ impl<R: Read> BufRead for BufReader<R> {
self: Pin<&'a mut Self>,
cx: &mut Context<'_>,
) -> Poll<io::Result<&'a [u8]>> {
let Self {
inner,
buf,
cap,
pos,
} = unsafe { self.get_unchecked_mut() };
let mut inner = unsafe { Pin::new_unchecked(inner) };
let mut this = self.project();
// If we've reached the end of our internal buffer then we need to fetch
// some more data from the underlying reader.
// Branch using `>=` instead of the more correct `==`
// to tell the compiler that the pos..cap slice is always valid.
if *pos >= *cap {
debug_assert!(*pos == *cap);
*cap = futures_core::ready!(inner.as_mut().poll_read(cx, buf))?;
*pos = 0;
if *this.pos >= *this.cap {
debug_assert!(*this.pos == *this.cap);
*this.cap = futures_core::ready!(this.inner.as_mut().poll_read(cx, this.buf))?;
*this.pos = 0;
}
Poll::Ready(Ok(&buf[*pos..*cap]))
Poll::Ready(Ok(&this.buf[*this.pos..*this.cap]))
}
fn consume(mut self: Pin<&mut Self>, amt: usize) {
*self.as_mut().pos() = cmp::min(self.pos + amt, self.cap);
fn consume(self: Pin<&mut Self>, amt: usize) {
let this = self.project();
*this.pos = cmp::min(*this.pos + amt, *this.cap);
}
}
@ -305,24 +310,26 @@ impl<R: Seek> Seek for BufReader<R> {
if let Some(offset) = n.checked_sub(remainder) {
result = futures_core::ready!(
self.as_mut()
.inner()
.get_pin_mut()
.poll_seek(cx, SeekFrom::Current(offset))
)?;
} else {
// seek backwards by our remainder, and then by the offset
futures_core::ready!(
self.as_mut()
.inner()
.get_pin_mut()
.poll_seek(cx, SeekFrom::Current(-remainder))
)?;
self.as_mut().discard_buffer();
result = futures_core::ready!(
self.as_mut().inner().poll_seek(cx, SeekFrom::Current(n))
self.as_mut()
.get_pin_mut()
.poll_seek(cx, SeekFrom::Current(n))
)?;
}
} else {
// Seeking with Start/End doesn't care about our buffer length.
result = futures_core::ready!(self.as_mut().inner().poll_seek(cx, pos))?;
result = futures_core::ready!(self.as_mut().get_pin_mut().poll_seek(cx, pos))?;
}
self.discard_buffer();
Poll::Ready(Ok(result))

@ -2,6 +2,7 @@ use std::fmt;
use std::pin::Pin;
use futures_core::ready;
use pin_project_lite::pin_project;
use crate::io::write::WriteExt;
use crate::io::{self, Seek, SeekFrom, Write};
@ -9,88 +10,88 @@ use crate::task::{Context, Poll};
const DEFAULT_CAPACITY: usize = 8 * 1024;
/// Wraps a writer and buffers its output.
///
/// It can be excessively inefficient to work directly with something that
/// implements [`Write`]. For example, every call to
/// [`write`][`TcpStream::write`] on [`TcpStream`] results in a system call. A
/// `BufWriter` keeps an in-memory buffer of data and writes it to an underlying
/// writer in large, infrequent batches.
///
/// `BufWriter` can improve the speed of programs that make *small* and
/// *repeated* write calls to the same file or network socket. It does not
/// help when writing very large amounts at once, or writing just one or a few
/// times. It also provides no advantage when writing to a destination that is
/// in memory, like a `Vec<u8>`.
///
/// When the `BufWriter` is dropped, the contents of its buffer will be written
/// out. However, any errors that happen in the process of flushing the buffer
/// when the writer is dropped will be ignored. Code that wishes to handle such
/// errors must manually call [`flush`] before the writer is dropped.
///
/// This type is an async version of [`std::io::BufReader`].
///
/// [`std::io::BufReader`]: https://doc.rust-lang.org/std/io/struct.BufReader.html
///
/// # Examples
///
/// Let's write the numbers one through ten to a [`TcpStream`]:
///
/// ```no_run
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
/// use async_std::net::TcpStream;
/// use async_std::prelude::*;
///
/// let mut stream = TcpStream::connect("127.0.0.1:34254").await?;
///
/// for i in 0..10 {
/// let arr = [i+1];
/// stream.write(&arr).await?;
/// }
/// #
/// # Ok(()) }) }
/// ```
///
/// Because we're not buffering, we write each one in turn, incurring the
/// overhead of a system call per byte written. We can fix this with a
/// `BufWriter`:
///
/// ```no_run
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
/// use async_std::io::BufWriter;
/// use async_std::net::TcpStream;
/// use async_std::prelude::*;
///
/// let mut stream = BufWriter::new(TcpStream::connect("127.0.0.1:34254").await?);
/// for i in 0..10 {
/// let arr = [i+1];
/// stream.write(&arr).await?;
/// };
/// #
/// # Ok(()) }) }
/// ```
///
/// By wrapping the stream with a `BufWriter`, these ten writes are all grouped
/// together by the buffer, and will all be written out in one system call when
/// the `stream` is dropped.
///
/// [`Write`]: trait.Write.html
/// [`TcpStream::write`]: ../net/struct.TcpStream.html#method.write
/// [`TcpStream`]: ../net/struct.TcpStream.html
/// [`flush`]: trait.Write.html#tymethod.flush
pub struct BufWriter<W> {
inner: W,
buf: Vec<u8>,
written: usize,
pin_project! {
/// Wraps a writer and buffers its output.
///
/// It can be excessively inefficient to work directly with something that
/// implements [`Write`]. For example, every call to
/// [`write`][`TcpStream::write`] on [`TcpStream`] results in a system call. A
/// `BufWriter` keeps an in-memory buffer of data and writes it to an underlying
/// writer in large, infrequent batches.
///
/// `BufWriter` can improve the speed of programs that make *small* and
/// *repeated* write calls to the same file or network socket. It does not
/// help when writing very large amounts at once, or writing just one or a few
/// times. It also provides no advantage when writing to a destination that is
/// in memory, like a `Vec<u8>`.
///
/// When the `BufWriter` is dropped, the contents of its buffer will be written
/// out. However, any errors that happen in the process of flushing the buffer
/// when the writer is dropped will be ignored. Code that wishes to handle such
/// errors must manually call [`flush`] before the writer is dropped.
///
/// This type is an async version of [`std::io::BufReader`].
///
/// [`std::io::BufReader`]: https://doc.rust-lang.org/std/io/struct.BufReader.html
///
/// # Examples
///
/// Let's write the numbers one through ten to a [`TcpStream`]:
///
/// ```no_run
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
/// use async_std::net::TcpStream;
/// use async_std::prelude::*;
///
/// let mut stream = TcpStream::connect("127.0.0.1:34254").await?;
///
/// for i in 0..10 {
/// let arr = [i+1];
/// stream.write(&arr).await?;
/// }
/// #
/// # Ok(()) }) }
/// ```
///
/// Because we're not buffering, we write each one in turn, incurring the
/// overhead of a system call per byte written. We can fix this with a
/// `BufWriter`:
///
/// ```no_run
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
/// use async_std::io::BufWriter;
/// use async_std::net::TcpStream;
/// use async_std::prelude::*;
///
/// let mut stream = BufWriter::new(TcpStream::connect("127.0.0.1:34254").await?);
/// for i in 0..10 {
/// let arr = [i+1];
/// stream.write(&arr).await?;
/// };
/// #
/// # Ok(()) }) }
/// ```
///
/// By wrapping the stream with a `BufWriter`, these ten writes are all grouped
/// together by the buffer, and will all be written out in one system call when
/// the `stream` is dropped.
///
/// [`Write`]: trait.Write.html
/// [`TcpStream::write`]: ../net/struct.TcpStream.html#method.write
/// [`TcpStream`]: ../net/struct.TcpStream.html
/// [`flush`]: trait.Write.html#tymethod.flush
pub struct BufWriter<W> {
#[pin]
inner: W,
buf: Vec<u8>,
written: usize,
}
}
#[derive(Debug)]
pub struct IntoInnerError<W>(W, std::io::Error);
impl<W: Write> BufWriter<W> {
pin_utils::unsafe_pinned!(inner: W);
pin_utils::unsafe_unpinned!(buf: Vec<u8>);
/// Creates a new `BufWriter` with a default buffer capacity. The default is currently 8 KB,
/// but may change in the future.
///
@ -178,6 +179,13 @@ impl<W: Write> BufWriter<W> {
&mut self.inner
}
/// Gets a pinned mutable reference to the underlying writer.
///
/// It is inadvisable to directly write to the underlying writer.
fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut W> {
self.project().inner
}
/// Consumes BufWriter, returning the underlying writer
///
/// This method will not write leftover data, it will be lost.
@ -234,16 +242,15 @@ impl<W: Write> BufWriter<W> {
///
/// [`LineWriter`]: struct.LineWriter.html
fn poll_flush_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
let Self {
inner,
buf,
written,
} = unsafe { Pin::get_unchecked_mut(self) };
let mut inner = unsafe { Pin::new_unchecked(inner) };
let len = buf.len();
let mut this = self.project();
let len = this.buf.len();
let mut ret = Ok(());
while *written < len {
match inner.as_mut().poll_write(cx, &buf[*written..]) {
while *this.written < len {
match this
.inner
.as_mut()
.poll_write(cx, &this.buf[*this.written..])
{
Poll::Ready(Ok(0)) => {
ret = Err(io::Error::new(
io::ErrorKind::WriteZero,
@ -251,7 +258,7 @@ impl<W: Write> BufWriter<W> {
));
break;
}
Poll::Ready(Ok(n)) => *written += n,
Poll::Ready(Ok(n)) => *this.written += n,
Poll::Ready(Err(ref e)) if e.kind() == io::ErrorKind::Interrupted => {}
Poll::Ready(Err(e)) => {
ret = Err(e);
@ -260,10 +267,10 @@ impl<W: Write> BufWriter<W> {
Poll::Pending => return Poll::Pending,
}
}
if *written > 0 {
buf.drain(..*written);
if *this.written > 0 {
this.buf.drain(..*this.written);
}
*written = 0;
*this.written = 0;
Poll::Ready(ret)
}
}
@ -278,20 +285,20 @@ impl<W: Write> Write for BufWriter<W> {
ready!(self.as_mut().poll_flush_buf(cx))?;
}
if buf.len() >= self.buf.capacity() {
self.inner().poll_write(cx, buf)
self.get_pin_mut().poll_write(cx, buf)
} else {
Pin::new(&mut *self.buf()).poll_write(cx, buf)
Pin::new(&mut *self.project().buf).poll_write(cx, buf)
}
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
ready!(self.as_mut().poll_flush_buf(cx))?;
self.inner().poll_flush(cx)
self.get_pin_mut().poll_flush(cx)
}
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
ready!(self.as_mut().poll_flush_buf(cx))?;
self.inner().poll_close(cx)
self.get_pin_mut().poll_close(cx)
}
}
@ -314,6 +321,6 @@ impl<W: Write + Seek> Seek for BufWriter<W> {
pos: SeekFrom,
) -> Poll<io::Result<u64>> {
ready!(self.as_mut().poll_flush_buf(cx))?;
self.inner().poll_seek(cx, pos)
self.get_pin_mut().poll_seek(cx, pos)
}
}

@ -1,5 +1,7 @@
use std::pin::Pin;
use pin_project_lite::pin_project;
use crate::future::Future;
use crate::io::{self, BufRead, BufReader, Read, Write};
use crate::task::{Context, Poll};
@ -46,47 +48,38 @@ where
R: Read + Unpin + ?Sized,
W: Write + Unpin + ?Sized,
{
pub struct CopyFuture<'a, R, W: ?Sized> {
reader: R,
writer: &'a mut W,
amt: u64,
}
impl<R, W: Unpin + ?Sized> CopyFuture<'_, R, W> {
fn project(self: Pin<&mut Self>) -> (Pin<&mut R>, Pin<&mut W>, &mut u64) {
unsafe {
let this = self.get_unchecked_mut();
(
Pin::new_unchecked(&mut this.reader),
Pin::new(&mut *this.writer),
&mut this.amt,
)
}
pin_project! {
struct CopyFuture<R, W> {
#[pin]
reader: R,
#[pin]
writer: W,
amt: u64,
}
}
impl<R, W> Future for CopyFuture<'_, R, W>
impl<R, W> Future for CopyFuture<R, W>
where
R: BufRead,
W: Write + Unpin + ?Sized,
W: Write + Unpin,
{
type Output = io::Result<u64>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let (mut reader, mut writer, amt) = self.project();
let mut this = self.project();
loop {
let buffer = futures_core::ready!(reader.as_mut().poll_fill_buf(cx))?;
let buffer = futures_core::ready!(this.reader.as_mut().poll_fill_buf(cx))?;
if buffer.is_empty() {
futures_core::ready!(writer.as_mut().poll_flush(cx))?;
return Poll::Ready(Ok(*amt));
futures_core::ready!(this.writer.as_mut().poll_flush(cx))?;
return Poll::Ready(Ok(*this.amt));
}
let i = futures_core::ready!(writer.as_mut().poll_write(cx, buffer))?;
let i = futures_core::ready!(this.writer.as_mut().poll_write(cx, buffer))?;
if i == 0 {
return Poll::Ready(Err(io::ErrorKind::WriteZero.into()));
}
*amt += i as u64;
reader.as_mut().consume(i);
*this.amt += i as u64;
this.reader.as_mut().consume(i);
}
}
}

@ -1,20 +1,25 @@
use crate::io::IoSliceMut;
use std::fmt;
use std::pin::Pin;
use crate::io::{self, BufRead, Read};
use pin_project_lite::pin_project;
use crate::io::{self, BufRead, IoSliceMut, Read};
use crate::task::{Context, Poll};
/// Adaptor to chain together two readers.
///
/// This struct is generally created by calling [`chain`] on a reader.
/// Please see the documentation of [`chain`] for more details.
///
/// [`chain`]: trait.Read.html#method.chain
pub struct Chain<T, U> {
pub(crate) first: T,
pub(crate) second: U,
pub(crate) done_first: bool,
pin_project! {
/// Adaptor to chain together two readers.
///
/// This struct is generally created by calling [`chain`] on a reader.
/// Please see the documentation of [`chain`] for more details.
///
/// [`chain`]: trait.Read.html#method.chain
pub struct Chain<T, U> {
#[pin]
pub(crate) first: T,
#[pin]
pub(crate) second: U,
pub(crate) done_first: bool,
}
}
impl<T, U> Chain<T, U> {
@ -98,76 +103,64 @@ impl<T: fmt::Debug, U: fmt::Debug> fmt::Debug for Chain<T, U> {
}
}
impl<T: Read + Unpin, U: Read + Unpin> Read for Chain<T, U> {
impl<T: Read, U: Read> Read for Chain<T, U> {
fn poll_read(
mut self: Pin<&mut Self>,
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
if !self.done_first {
let rd = Pin::new(&mut self.first);
match futures_core::ready!(rd.poll_read(cx, buf)) {
Ok(0) if !buf.is_empty() => self.done_first = true,
let this = self.project();
if !*this.done_first {
match futures_core::ready!(this.first.poll_read(cx, buf)) {
Ok(0) if !buf.is_empty() => *this.done_first = true,
Ok(n) => return Poll::Ready(Ok(n)),
Err(err) => return Poll::Ready(Err(err)),
}
}
let rd = Pin::new(&mut self.second);
rd.poll_read(cx, buf)
this.second.poll_read(cx, buf)
}
fn poll_read_vectored(
mut self: Pin<&mut Self>,
self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &mut [IoSliceMut<'_>],
) -> Poll<io::Result<usize>> {
if !self.done_first {
let rd = Pin::new(&mut self.first);
match futures_core::ready!(rd.poll_read_vectored(cx, bufs)) {
Ok(0) if !bufs.is_empty() => self.done_first = true,
let this = self.project();
if !*this.done_first {
match futures_core::ready!(this.first.poll_read_vectored(cx, bufs)) {
Ok(0) if !bufs.is_empty() => *this.done_first = true,
Ok(n) => return Poll::Ready(Ok(n)),
Err(err) => return Poll::Ready(Err(err)),
}
}
let rd = Pin::new(&mut self.second);
rd.poll_read_vectored(cx, bufs)
this.second.poll_read_vectored(cx, bufs)
}
}
impl<T: BufRead + Unpin, U: BufRead + Unpin> BufRead for Chain<T, U> {
impl<T: BufRead, U: BufRead> BufRead for Chain<T, U> {
fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
let Self {
first,
second,
done_first,
} = unsafe { self.get_unchecked_mut() };
if !*done_first {
let first = unsafe { Pin::new_unchecked(first) };
match futures_core::ready!(first.poll_fill_buf(cx)) {
let this = self.project();
if !*this.done_first {
match futures_core::ready!(this.first.poll_fill_buf(cx)) {
Ok(buf) if buf.is_empty() => {
*done_first = true;
*this.done_first = true;
}
Ok(buf) => return Poll::Ready(Ok(buf)),
Err(err) => return Poll::Ready(Err(err)),
}
}
let second = unsafe { Pin::new_unchecked(second) };
second.poll_fill_buf(cx)
this.second.poll_fill_buf(cx)
}
fn consume(mut self: Pin<&mut Self>, amt: usize) {
if !self.done_first {
let rd = Pin::new(&mut self.first);
rd.consume(amt)
fn consume(self: Pin<&mut Self>, amt: usize) {
let this = self.project();
if !*this.done_first {
this.first.consume(amt)
} else {
let rd = Pin::new(&mut self.second);
rd.consume(amt)
this.second.consume(amt)
}
}
}

@ -1,19 +1,24 @@
use std::cmp;
use std::pin::Pin;
use pin_project_lite::pin_project;
use crate::io::{self, BufRead, Read};
use crate::task::{Context, Poll};
/// Reader adaptor which limits the bytes read from an underlying reader.
///
/// This struct is generally created by calling [`take`] on a reader.
/// Please see the documentation of [`take`] for more details.
///
/// [`take`]: trait.Read.html#method.take
#[derive(Debug)]
pub struct Take<T> {
pub(crate) inner: T,
pub(crate) limit: u64,
pin_project! {
/// Reader adaptor which limits the bytes read from an underlying reader.
///
/// This struct is generally created by calling [`take`] on a reader.
/// Please see the documentation of [`take`] for more details.
///
/// [`take`]: trait.Read.html#method.take
#[derive(Debug)]
pub struct Take<T> {
#[pin]
pub(crate) inner: T,
pub(crate) limit: u64,
}
}
impl<T> Take<T> {
@ -152,15 +157,15 @@ impl<T> Take<T> {
}
}
impl<T: Read + Unpin> Read for Take<T> {
impl<T: Read> Read for Take<T> {
/// Attempt to read from the `AsyncRead` into `buf`.
fn poll_read(
mut self: Pin<&mut Self>,
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
let Self { inner, limit } = &mut *self;
take_read_internal(Pin::new(inner), cx, buf, limit)
let this = self.project();
take_read_internal(this.inner, cx, buf, this.limit)
}
}
@ -186,31 +191,30 @@ pub fn take_read_internal<R: Read + ?Sized>(
}
}
impl<T: BufRead + Unpin> BufRead for Take<T> {
impl<T: BufRead> BufRead for Take<T> {
fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
let Self { inner, limit } = unsafe { self.get_unchecked_mut() };
let inner = unsafe { Pin::new_unchecked(inner) };
let this = self.project();
if *limit == 0 {
if *this.limit == 0 {
return Poll::Ready(Ok(&[]));
}
match futures_core::ready!(inner.poll_fill_buf(cx)) {
match futures_core::ready!(this.inner.poll_fill_buf(cx)) {
Ok(buf) => {
let cap = cmp::min(buf.len() as u64, *limit) as usize;
let cap = cmp::min(buf.len() as u64, *this.limit) as usize;
Poll::Ready(Ok(&buf[..cap]))
}
Err(e) => Poll::Ready(Err(e)),
}
}
fn consume(mut self: Pin<&mut Self>, amt: usize) {
fn consume(self: Pin<&mut Self>, amt: usize) {
let this = self.project();
// Don't let callers reset the limit by passing an overlarge value
let amt = cmp::min(amt as u64, self.limit) as usize;
self.limit -= amt as u64;
let amt = cmp::min(amt as u64, *this.limit) as usize;
*this.limit -= amt as u64;
let rd = Pin::new(&mut self.inner);
rd.consume(amt);
this.inner.consume(amt);
}
}

@ -3,7 +3,7 @@ use std::task::{Context, Poll};
use std::time::Duration;
use futures_timer::Delay;
use pin_utils::unsafe_pinned;
use pin_project_lite::pin_project;
use crate::future::Future;
use crate::io;
@ -43,22 +43,18 @@ where
.await
}
/// Future returned by the `FutureExt::timeout` method.
#[derive(Debug)]
pub struct Timeout<F, T>
where
F: Future<Output = io::Result<T>>,
{
future: F,
timeout: Delay,
}
impl<F, T> Timeout<F, T>
where
F: Future<Output = io::Result<T>>,
{
unsafe_pinned!(future: F);
unsafe_pinned!(timeout: Delay);
pin_project! {
/// Future returned by the `FutureExt::timeout` method.
#[derive(Debug)]
pub struct Timeout<F, T>
where
F: Future<Output = io::Result<T>>,
{
#[pin]
future: F,
#[pin]
timeout: Delay,
}
}
impl<F, T> Future for Timeout<F, T>
@ -67,14 +63,15 @@ where
{
type Output = io::Result<T>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.as_mut().future().poll(cx) {
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
match this.future.poll(cx) {
Poll::Pending => {}
other => return other,
}
if self.timeout().poll(cx).is_ready() {
let err = Err(io::Error::new(io::ErrorKind::TimedOut, "future timed out").into());
if this.timeout.poll(cx).is_ready() {
let err = Err(io::Error::new(io::ErrorKind::TimedOut, "future timed out"));
Poll::Ready(err)
} else {
Poll::Pending

@ -32,7 +32,7 @@ impl<T: Write + Unpin + ?Sized> Future for WriteFmtFuture<'_, T> {
buffer,
..
} = &mut *self;
let mut buffer = buffer.as_mut().unwrap();
let buffer = buffer.as_mut().unwrap();
// Copy the data from the buffer into the writer until it's done.
loop {
@ -40,7 +40,7 @@ impl<T: Write + Unpin + ?Sized> Future for WriteFmtFuture<'_, T> {
futures_core::ready!(Pin::new(&mut **writer).poll_flush(cx))?;
return Poll::Ready(Ok(()));
}
let i = futures_core::ready!(Pin::new(&mut **writer).poll_write(cx, &mut buffer))?;
let i = futures_core::ready!(Pin::new(&mut **writer).poll_write(cx, buffer))?;
if i == 0 {
return Poll::Ready(Err(io::ErrorKind::WriteZero.into()));
}

@ -210,7 +210,7 @@ impl ToSocketAddrs for str {
impl Future<Output = Self::Iter>,
ToSocketAddrsFuture<Self::Iter>
) {
if let Some(addr) = self.parse().ok() {
if let Ok(addr) = self.parse() {
return ToSocketAddrsFuture::Ready(Ok(vec![addr].into_iter()));
}

@ -799,7 +799,7 @@ impl AsRef<Path> for String {
impl AsRef<Path> for std::path::PathBuf {
fn as_ref(&self) -> &Path {
Path::new(self.into())
Path::new(self)
}
}

@ -5,7 +5,7 @@ use crate::path::Path;
/// This struct is an async version of [`std::path::PathBuf`].
///
/// [`std::path::Path`]: https://doc.rust-lang.org/std/path/struct.PathBuf.html
#[derive(Debug, PartialEq)]
#[derive(Debug, PartialEq, Default)]
pub struct PathBuf {
inner: std::path::PathBuf,
}
@ -206,7 +206,7 @@ impl From<std::path::PathBuf> for PathBuf {
impl Into<std::path::PathBuf> for PathBuf {
fn into(self) -> std::path::PathBuf {
self.inner.into()
self.inner
}
}

@ -59,7 +59,7 @@ pub use crate::stream::Stream;
/// # }
/// # }
/// # }
/// # fn main() { async_std::task::block_on(async {
/// # async_std::task::block_on(async {
/// #
/// impl ExactSizeStream for Counter {
/// // We can easily calculate the remaining number of iterations.
@ -74,7 +74,6 @@ pub use crate::stream::Stream;
///
/// assert_eq!(5, counter.len());
/// # });
/// # }
/// ```
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]

@ -14,7 +14,7 @@ use crate::stream::IntoStream;
/// ## Examples
///
/// ```
/// # fn main() { async_std::task::block_on(async {
/// # async_std::task::block_on(async {
/// #
/// use async_std::prelude::*;
/// use async_std::stream::{self, Extend};
@ -25,7 +25,7 @@ use crate::stream::IntoStream;
///
/// assert_eq!(v, vec![1, 2, 3, 3, 3]);
/// #
/// # }) }
/// # })
/// ```
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]

@ -1,20 +1,25 @@
use std::marker::PhantomData;
use std::pin::Pin;
use pin_project_lite::pin_project;
use crate::future::Future;
use crate::stream::Stream;
use crate::task::{Context, Poll};
/// A stream that yields elements by calling a closure.
///
/// This stream is constructed by [`from_fn`] function.
///
/// [`from_fn`]: fn.from_fn.html
#[derive(Debug)]
pub struct FromFn<F, Fut, T> {
f: F,
future: Option<Fut>,
__t: PhantomData<T>,
pin_project! {
/// A stream that yields elements by calling a closure.
///
/// This stream is constructed by [`from_fn`] function.
///
/// [`from_fn`]: fn.from_fn.html
#[derive(Debug)]
pub struct FromFn<F, Fut, T> {
f: F,
#[pin]
future: Option<Fut>,
__t: PhantomData<T>,
}
}
/// Creates a new stream where to produce each new element a provided closure is called.
@ -25,7 +30,7 @@ pub struct FromFn<F, Fut, T> {
/// # Examples
///
/// ```
/// # fn main() { async_std::task::block_on(async {
/// # async_std::task::block_on(async {
/// #
/// use async_std::prelude::*;
/// use async_std::sync::Mutex;
@ -53,8 +58,7 @@ pub struct FromFn<F, Fut, T> {
/// assert_eq!(s.next().await, Some(3));
/// assert_eq!(s.next().await, None);
/// #
/// # }) }
///
/// # })
/// ```
pub fn from_fn<T, F, Fut>(f: F) -> FromFn<F, Fut, T>
where
@ -68,11 +72,6 @@ where
}
}
impl<F, Fut, T> FromFn<F, Fut, T> {
pin_utils::unsafe_unpinned!(f: F);
pin_utils::unsafe_pinned!(future: Option<Fut>);
}
impl<F, Fut, T> Stream for FromFn<F, Fut, T>
where
F: FnMut() -> Fut,
@ -80,20 +79,18 @@ where
{
type Item = T;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
loop {
match &self.future {
Some(_) => {
let next =
futures_core::ready!(self.as_mut().future().as_pin_mut().unwrap().poll(cx));
self.as_mut().future().set(None);
if this.future.is_some() {
let next =
futures_core::ready!(this.future.as_mut().as_pin_mut().unwrap().poll(cx));
this.future.set(None);
return Poll::Ready(next);
}
None => {
let fut = (self.as_mut().f())();
self.as_mut().future().set(Some(fut));
}
return Poll::Ready(next);
} else {
let fut = (this.f)();
this.future.set(Some(fut));
}
}
}

@ -4,8 +4,6 @@ use std::time::{Duration, Instant};
use futures_core::future::Future;
use futures_core::stream::Stream;
use pin_utils::unsafe_pinned;
use futures_timer::Delay;
/// Creates a new stream that yields at a set interval.
@ -62,15 +60,11 @@ pub struct Interval {
interval: Duration,
}
impl Interval {
unsafe_pinned!(delay: Delay);
}
impl Stream for Interval {
type Item = ();
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if Pin::new(&mut *self).delay().poll(cx).is_pending() {
if Pin::new(&mut self.delay).poll(cx).is_pending() {
return Poll::Pending;
}
let when = Instant::now();

@ -1,5 +1,7 @@
use std::pin::Pin;
use pin_project_lite::pin_project;
use crate::stream::Stream;
use crate::task::{Context, Poll};
@ -24,20 +26,22 @@ pub fn once<T>(t: T) -> Once<T> {
Once { value: Some(t) }
}
/// A stream that yields a single item.
///
/// This stream is constructed by the [`once`] function.
///
/// [`once`]: fn.once.html
#[derive(Debug)]
pub struct Once<T> {
value: Option<T>,
pin_project! {
/// A stream that yields a single item.
///
/// This stream is constructed by the [`once`] function.
///
/// [`once`]: fn.once.html
#[derive(Debug)]
pub struct Once<T> {
value: Option<T>,
}
}
impl<T: Unpin> Stream for Once<T> {
type Item = T;
fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<T>> {
Poll::Ready(self.value.take())
fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<T>> {
Poll::Ready(self.project().value.take())
}
}

@ -1,20 +1,25 @@
use std::marker::PhantomData;
use std::pin::Pin;
use pin_project_lite::pin_project;
use crate::future::Future;
use crate::stream::Stream;
use crate::task::{Context, Poll};
/// A stream that repeats elements of type `T` endlessly by applying a provided closure.
///
/// This stream is constructed by the [`repeat_with`] function.
///
/// [`repeat_with`]: fn.repeat_with.html
#[derive(Debug)]
pub struct RepeatWith<F, Fut, A> {
f: F,
future: Option<Fut>,
__a: PhantomData<A>,
pin_project! {
/// A stream that repeats elements of type `T` endlessly by applying a provided closure.
///
/// This stream is constructed by the [`repeat_with`] function.
///
/// [`repeat_with`]: fn.repeat_with.html
#[derive(Debug)]
pub struct RepeatWith<F, Fut, A> {
f: F,
#[pin]
future: Option<Fut>,
__a: PhantomData<A>,
}
}
/// Creates a new stream that repeats elements of type `A` endlessly by applying the provided closure.
@ -24,7 +29,7 @@ pub struct RepeatWith<F, Fut, A> {
/// Basic usage:
///
/// ```
/// # fn main() { async_std::task::block_on(async {
/// # async_std::task::block_on(async {
/// #
/// use async_std::prelude::*;
/// use async_std::stream;
@ -37,13 +42,13 @@ pub struct RepeatWith<F, Fut, A> {
/// assert_eq!(s.next().await, Some(1));
/// assert_eq!(s.next().await, Some(1));
/// assert_eq!(s.next().await, Some(1));
/// # }) }
/// # })
/// ```
///
/// Going finite:
///
/// ```
/// # fn main() { async_std::task::block_on(async {
/// # async_std::task::block_on(async {
/// #
/// use async_std::prelude::*;
/// use async_std::stream;
@ -55,7 +60,7 @@ pub struct RepeatWith<F, Fut, A> {
/// assert_eq!(s.next().await, Some(1));
/// assert_eq!(s.next().await, Some(1));
/// assert_eq!(s.next().await, None);
/// # }) }
/// # })
/// ```
pub fn repeat_with<F, Fut, A>(repeater: F) -> RepeatWith<F, Fut, A>
where
@ -69,11 +74,6 @@ where
}
}
impl<F, Fut, A> RepeatWith<F, Fut, A> {
pin_utils::unsafe_unpinned!(f: F);
pin_utils::unsafe_pinned!(future: Option<Fut>);
}
impl<F, Fut, A> Stream for RepeatWith<F, Fut, A>
where
F: FnMut() -> Fut,
@ -81,22 +81,19 @@ where
{
type Item = A;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
loop {
match &self.future {
Some(_) => {
let res =
futures_core::ready!(self.as_mut().future().as_pin_mut().unwrap().poll(cx));
if this.future.is_some() {
let res = futures_core::ready!(this.future.as_mut().as_pin_mut().unwrap().poll(cx));
self.as_mut().future().set(None);
this.future.set(None);
return Poll::Ready(Some(res));
}
None => {
let fut = (self.as_mut().f())();
return Poll::Ready(Some(res));
} else {
let fut = (this.f)();
self.as_mut().future().set(Some(fut));
}
this.future.set(Some(fut));
}
}
}

@ -1,20 +1,23 @@
use std::pin::Pin;
use pin_project_lite::pin_project;
use super::fuse::Fuse;
use crate::prelude::*;
use crate::task::{Context, Poll};
/// Chains two streams one after another.
#[derive(Debug)]
pub struct Chain<S, U> {
first: Fuse<S>,
second: Fuse<U>,
pin_project! {
/// Chains two streams one after another.
#[derive(Debug)]
pub struct Chain<S, U> {
#[pin]
first: Fuse<S>,
#[pin]
second: Fuse<U>,
}
}
impl<S: Stream, U: Stream> Chain<S, U> {
pin_utils::unsafe_pinned!(first: Fuse<S>);
pin_utils::unsafe_pinned!(second: Fuse<U>);
pub(super) fn new(first: S, second: U) -> Self {
Chain {
first: first.fuse(),
@ -26,22 +29,23 @@ impl<S: Stream, U: Stream> Chain<S, U> {
impl<S: Stream, U: Stream<Item = S::Item>> Stream for Chain<S, U> {
type Item = S::Item;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if !self.first.done {
let next = futures_core::ready!(self.as_mut().first().poll_next(cx));
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
if !this.first.done {
let next = futures_core::ready!(this.first.as_mut().poll_next(cx));
if let Some(next) = next {
return Poll::Ready(Some(next));
}
}
if !self.second.done {
let next = futures_core::ready!(self.as_mut().second().poll_next(cx));
if !this.second.done {
let next = futures_core::ready!(this.second.as_mut().poll_next(cx));
if let Some(next) = next {
return Poll::Ready(Some(next));
}
}
if self.first.done && self.second.done {
if this.first.done && this.second.done {
return Poll::Ready(None);
}

@ -1,29 +1,30 @@
use std::cmp::Ordering;
use std::pin::Pin;
use pin_project_lite::pin_project;
use super::fuse::Fuse;
use crate::future::Future;
use crate::prelude::*;
use crate::stream::Stream;
use crate::task::{Context, Poll};
// Lexicographically compares the elements of this `Stream` with those
// of another using `Ord`.
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct CmpFuture<L: Stream, R: Stream> {
l: Fuse<L>,
r: Fuse<R>,
l_cache: Option<L::Item>,
r_cache: Option<R::Item>,
pin_project! {
// Lexicographically compares the elements of this `Stream` with those
// of another using `Ord`.
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct CmpFuture<L: Stream, R: Stream> {
#[pin]
l: Fuse<L>,
#[pin]
r: Fuse<R>,
l_cache: Option<L::Item>,
r_cache: Option<R::Item>,
}
}
impl<L: Stream, R: Stream> CmpFuture<L, R> {
pin_utils::unsafe_pinned!(l: Fuse<L>);
pin_utils::unsafe_pinned!(r: Fuse<R>);
pin_utils::unsafe_unpinned!(l_cache: Option<L::Item>);
pin_utils::unsafe_unpinned!(r_cache: Option<R::Item>);
pub(super) fn new(l: L, r: R) -> Self {
CmpFuture {
l: l.fuse(),
@ -42,11 +43,12 @@ where
{
type Output = Ordering;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
loop {
// Stream that completes earliest can be considered Less, etc
let l_complete = self.l.done && self.as_mut().l_cache.is_none();
let r_complete = self.r.done && self.as_mut().r_cache.is_none();
let l_complete = this.l.done && this.l_cache.is_none();
let r_complete = this.r.done && this.r_cache.is_none();
if l_complete && r_complete {
return Poll::Ready(Ordering::Equal);
@ -57,30 +59,30 @@ where
}
// Get next value if possible and necesary
if !self.l.done && self.as_mut().l_cache.is_none() {
let l_next = futures_core::ready!(self.as_mut().l().poll_next(cx));
if !this.l.done && this.l_cache.is_none() {
let l_next = futures_core::ready!(this.l.as_mut().poll_next(cx));
if let Some(item) = l_next {
*self.as_mut().l_cache() = Some(item);
*this.l_cache = Some(item);
}
}
if !self.r.done && self.as_mut().r_cache.is_none() {
let r_next = futures_core::ready!(self.as_mut().r().poll_next(cx));
if !this.r.done && this.r_cache.is_none() {
let r_next = futures_core::ready!(this.r.as_mut().poll_next(cx));
if let Some(item) = r_next {
*self.as_mut().r_cache() = Some(item);
*this.r_cache = Some(item);
}
}
// Compare if both values are available.
if self.as_mut().l_cache.is_some() && self.as_mut().r_cache.is_some() {
let l_value = self.as_mut().l_cache().take().unwrap();
let r_value = self.as_mut().r_cache().take().unwrap();
if this.l_cache.is_some() && this.r_cache.is_some() {
let l_value = this.l_cache.take().unwrap();
let r_value = this.r_cache.take().unwrap();
let result = l_value.cmp(&r_value);
if let Ordering::Equal = result {
// Reset cache to prepare for next comparison
*self.as_mut().l_cache() = None;
*self.as_mut().r_cache() = None;
*this.l_cache = None;
*this.r_cache = None;
} else {
// Return non equal value
return Poll::Ready(result);

@ -1,19 +1,21 @@
use crate::task::{Context, Poll};
use std::pin::Pin;
use pin_project_lite::pin_project;
use crate::stream::Stream;
use crate::task::{Context, Poll};
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct Enumerate<S> {
stream: S,
i: usize,
pin_project! {
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct Enumerate<S> {
#[pin]
stream: S,
i: usize,
}
}
impl<S> Enumerate<S> {
pin_utils::unsafe_pinned!(stream: S);
pin_utils::unsafe_unpinned!(i: usize);
pub(super) fn new(stream: S) -> Self {
Enumerate { stream, i: 0 }
}
@ -25,13 +27,14 @@ where
{
type Item = (usize, S::Item);
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let next = futures_core::ready!(self.as_mut().stream().poll_next(cx));
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
let next = futures_core::ready!(this.stream.poll_next(cx));
match next {
Some(v) => {
let ret = (self.i, v);
*self.as_mut().i() += 1;
let ret = (*this.i, v);
*this.i += 1;
Poll::Ready(Some(ret))
}
None => Poll::Ready(None),

@ -1,21 +1,23 @@
use std::marker::PhantomData;
use std::pin::Pin;
use pin_project_lite::pin_project;
use crate::stream::Stream;
use crate::task::{Context, Poll};
/// A stream to filter elements of another stream with a predicate.
#[derive(Debug)]
pub struct Filter<S, P, T> {
stream: S,
predicate: P,
__t: PhantomData<T>,
pin_project! {
/// A stream to filter elements of another stream with a predicate.
#[derive(Debug)]
pub struct Filter<S, P, T> {
#[pin]
stream: S,
predicate: P,
__t: PhantomData<T>,
}
}
impl<S, P, T> Filter<S, P, T> {
pin_utils::unsafe_pinned!(stream: S);
pin_utils::unsafe_unpinned!(predicate: P);
pub(super) fn new(stream: S, predicate: P) -> Self {
Filter {
stream,
@ -32,11 +34,12 @@ where
{
type Item = S::Item;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let next = futures_core::ready!(self.as_mut().stream().poll_next(cx));
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
let next = futures_core::ready!(this.stream.poll_next(cx));
match next {
Some(v) if (self.as_mut().predicate())(&v) => Poll::Ready(Some(v)),
Some(v) if (this.predicate)(&v) => Poll::Ready(Some(v)),
Some(_) => {
cx.waker().wake_by_ref();
Poll::Pending

@ -2,21 +2,23 @@ use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{Context, Poll};
use pin_project_lite::pin_project;
use crate::stream::Stream;
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct FilterMap<S, F, T, B> {
stream: S,
f: F,
__from: PhantomData<T>,
__to: PhantomData<B>,
pin_project! {
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct FilterMap<S, F, T, B> {
#[pin]
stream: S,
f: F,
__from: PhantomData<T>,
__to: PhantomData<B>,
}
}
impl<S, F, T, B> FilterMap<S, F, T, B> {
pin_utils::unsafe_pinned!(stream: S);
pin_utils::unsafe_unpinned!(f: F);
pub(crate) fn new(stream: S, f: F) -> Self {
FilterMap {
stream,
@ -34,10 +36,11 @@ where
{
type Item = B;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let next = futures_core::ready!(self.as_mut().stream().poll_next(cx));
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
let next = futures_core::ready!(this.stream.poll_next(cx));
match next {
Some(v) => match (self.as_mut().f())(v) {
Some(v) => match (this.f)(v) {
Some(b) => Poll::Ready(Some(b)),
None => {
cx.waker().wake_by_ref();

@ -1,24 +1,25 @@
use std::marker::PhantomData;
use std::pin::Pin;
use pin_project_lite::pin_project;
use crate::future::Future;
use crate::stream::Stream;
use crate::task::{Context, Poll};
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct FoldFuture<S, F, T, B> {
stream: S,
f: F,
acc: Option<B>,
__t: PhantomData<T>,
pin_project! {
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct FoldFuture<S, F, T, B> {
#[pin]
stream: S,
f: F,
acc: Option<B>,
__t: PhantomData<T>,
}
}
impl<S, F, T, B> FoldFuture<S, F, T, B> {
pin_utils::unsafe_pinned!(stream: S);
pin_utils::unsafe_unpinned!(f: F);
pin_utils::unsafe_unpinned!(acc: Option<B>);
pub(super) fn new(stream: S, init: B, f: F) -> Self {
FoldFuture {
stream,
@ -36,17 +37,18 @@ where
{
type Output = B;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
loop {
let next = futures_core::ready!(self.as_mut().stream().poll_next(cx));
let next = futures_core::ready!(this.stream.as_mut().poll_next(cx));
match next {
Some(v) => {
let old = self.as_mut().acc().take().unwrap();
let new = (self.as_mut().f())(old, v);
*self.as_mut().acc() = Some(new);
let old = this.acc.take().unwrap();
let new = (this.f)(old, v);
*this.acc = Some(new);
}
None => return Poll::Ready(self.as_mut().acc().take().unwrap()),
None => return Poll::Ready(this.acc.take().unwrap()),
}
}
}

@ -1,22 +1,24 @@
use std::marker::PhantomData;
use std::pin::Pin;
use pin_project_lite::pin_project;
use crate::future::Future;
use crate::stream::Stream;
use crate::task::{Context, Poll};
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct ForEachFuture<S, F, T> {
stream: S,
f: F,
__t: PhantomData<T>,
pin_project! {
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct ForEachFuture<S, F, T> {
#[pin]
stream: S,
f: F,
__t: PhantomData<T>,
}
}
impl<S, F, T> ForEachFuture<S, F, T> {
pin_utils::unsafe_pinned!(stream: S);
pin_utils::unsafe_unpinned!(f: F);
pub(super) fn new(stream: S, f: F) -> Self {
ForEachFuture {
stream,
@ -33,12 +35,13 @@ where
{
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
loop {
let next = futures_core::ready!(self.as_mut().stream().poll_next(cx));
let next = futures_core::ready!(this.stream.as_mut().poll_next(cx));
match next {
Some(v) => (self.as_mut().f())(v),
Some(v) => (this.f)(v),
None => return Poll::Ready(()),
}
}

@ -1,33 +1,32 @@
use std::pin::Pin;
use pin_project_lite::pin_project;
use crate::stream::Stream;
use crate::task::{Context, Poll};
/// A `Stream` that is permanently closed once a single call to `poll` results in
/// `Poll::Ready(None)`, returning `Poll::Ready(None)` for all future calls to `poll`.
#[derive(Clone, Debug)]
pub struct Fuse<S> {
pub(crate) stream: S,
pub(crate) done: bool,
}
impl<S: Unpin> Unpin for Fuse<S> {}
impl<S: Stream> Fuse<S> {
pin_utils::unsafe_pinned!(stream: S);
pin_utils::unsafe_unpinned!(done: bool);
pin_project! {
/// A `Stream` that is permanently closed once a single call to `poll` results in
/// `Poll::Ready(None)`, returning `Poll::Ready(None)` for all future calls to `poll`.
#[derive(Clone, Debug)]
pub struct Fuse<S> {
#[pin]
pub(crate) stream: S,
pub(crate) done: bool,
}
}
impl<S: Stream> Stream for Fuse<S> {
type Item = S::Item;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
if self.done {
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
let this = self.project();
if *this.done {
Poll::Ready(None)
} else {
let next = futures_core::ready!(self.as_mut().stream().poll_next(cx));
let next = futures_core::ready!(this.stream.poll_next(cx));
if next.is_none() {
*self.as_mut().done() = true;
*this.done = true;
}
Poll::Ready(next)
}

@ -1,26 +1,29 @@
use std::cmp::Ordering;
use std::pin::Pin;
use pin_project_lite::pin_project;
use super::partial_cmp::PartialCmpFuture;
use crate::future::Future;
use crate::prelude::*;
use crate::stream::Stream;
use crate::task::{Context, Poll};
// Determines if the elements of this `Stream` are lexicographically
// greater than or equal to those of another.
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct GeFuture<L: Stream, R: Stream> {
partial_cmp: PartialCmpFuture<L, R>,
pin_project! {
// Determines if the elements of this `Stream` are lexicographically
// greater than or equal to those of another.
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct GeFuture<L: Stream, R: Stream> {
#[pin]
partial_cmp: PartialCmpFuture<L, R>,
}
}
impl<L: Stream, R: Stream> GeFuture<L, R>
where
L::Item: PartialOrd<R::Item>,
{
pin_utils::unsafe_pinned!(partial_cmp: PartialCmpFuture<L, R>);
pub(super) fn new(l: L, r: R) -> Self {
GeFuture {
partial_cmp: l.partial_cmp(r),
@ -30,14 +33,14 @@ where
impl<L: Stream, R: Stream> Future for GeFuture<L, R>
where
L: Stream + Sized,
R: Stream + Sized,
L: Stream,
R: Stream,
L::Item: PartialOrd<R::Item>,
{
type Output = bool;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let result = futures_core::ready!(self.as_mut().partial_cmp().poll(cx));
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let result = futures_core::ready!(self.project().partial_cmp.poll(cx));
match result {
Some(Ordering::Greater) | Some(Ordering::Equal) => Poll::Ready(true),

@ -1,26 +1,29 @@
use std::cmp::Ordering;
use std::pin::Pin;
use pin_project_lite::pin_project;
use super::partial_cmp::PartialCmpFuture;
use crate::future::Future;
use crate::prelude::*;
use crate::stream::Stream;
use crate::task::{Context, Poll};
// Determines if the elements of this `Stream` are lexicographically
// greater than those of another.
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct GtFuture<L: Stream, R: Stream> {
partial_cmp: PartialCmpFuture<L, R>,
pin_project! {
// Determines if the elements of this `Stream` are lexicographically
// greater than those of another.
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct GtFuture<L: Stream, R: Stream> {
#[pin]
partial_cmp: PartialCmpFuture<L, R>,
}
}
impl<L: Stream, R: Stream> GtFuture<L, R>
where
L::Item: PartialOrd<R::Item>,
{
pin_utils::unsafe_pinned!(partial_cmp: PartialCmpFuture<L, R>);
pub(super) fn new(l: L, r: R) -> Self {
GtFuture {
partial_cmp: l.partial_cmp(r),
@ -36,8 +39,8 @@ where
{
type Output = bool;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let result = futures_core::ready!(self.as_mut().partial_cmp().poll(cx));
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let result = futures_core::ready!(self.project().partial_cmp.poll(cx));
match result {
Some(Ordering::Greater) => Poll::Ready(true),

@ -1,21 +1,23 @@
use std::marker::PhantomData;
use std::pin::Pin;
use pin_project_lite::pin_project;
use crate::stream::Stream;
use crate::task::{Context, Poll};
/// A stream that does something with each element of another stream.
#[derive(Debug)]
pub struct Inspect<S, F, T> {
stream: S,
f: F,
__t: PhantomData<T>,
pin_project! {
/// A stream that does something with each element of another stream.
#[derive(Debug)]
pub struct Inspect<S, F, T> {
#[pin]
stream: S,
f: F,
__t: PhantomData<T>,
}
}
impl<S, F, T> Inspect<S, F, T> {
pin_utils::unsafe_pinned!(stream: S);
pin_utils::unsafe_unpinned!(f: F);
pub(super) fn new(stream: S, f: F) -> Self {
Inspect {
stream,
@ -32,11 +34,12 @@ where
{
type Item = S::Item;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let next = futures_core::ready!(self.as_mut().stream().poll_next(cx));
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
let next = futures_core::ready!(this.stream.as_mut().poll_next(cx));
Poll::Ready(next.and_then(|x| {
(self.as_mut().f())(&x);
(this.f)(&x);
Some(x)
}))
}

@ -1,20 +1,22 @@
use std::pin::Pin;
use pin_project_lite::pin_project;
use crate::future::Future;
use crate::stream::Stream;
use crate::task::{Context, Poll};
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct LastFuture<S, T> {
stream: S,
last: Option<T>,
pin_project! {
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct LastFuture<S, T> {
#[pin]
stream: S,
last: Option<T>,
}
}
impl<S, T> LastFuture<S, T> {
pin_utils::unsafe_pinned!(stream: S);
pin_utils::unsafe_unpinned!(last: Option<T>);
pub(crate) fn new(stream: S) -> Self {
LastFuture { stream, last: None }
}
@ -27,16 +29,17 @@ where
{
type Output = Option<S::Item>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let next = futures_core::ready!(self.as_mut().stream().poll_next(cx));
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let next = futures_core::ready!(this.stream.poll_next(cx));
match next {
Some(new) => {
cx.waker().wake_by_ref();
*self.as_mut().last() = Some(new);
*this.last = Some(new);
Poll::Pending
}
None => Poll::Ready(self.last),
None => Poll::Ready(*this.last),
}
}
}

@ -1,26 +1,29 @@
use std::cmp::Ordering;
use std::pin::Pin;
use pin_project_lite::pin_project;
use super::partial_cmp::PartialCmpFuture;
use crate::future::Future;
use crate::prelude::*;
use crate::stream::Stream;
use crate::task::{Context, Poll};
/// Determines if the elements of this `Stream` are lexicographically
/// less or equal to those of another.
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct LeFuture<L: Stream, R: Stream> {
partial_cmp: PartialCmpFuture<L, R>,
pin_project! {
/// Determines if the elements of this `Stream` are lexicographically
/// less or equal to those of another.
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct LeFuture<L: Stream, R: Stream> {
#[pin]
partial_cmp: PartialCmpFuture<L, R>,
}
}
impl<L: Stream, R: Stream> LeFuture<L, R>
where
L::Item: PartialOrd<R::Item>,
{
pin_utils::unsafe_pinned!(partial_cmp: PartialCmpFuture<L, R>);
pub(super) fn new(l: L, r: R) -> Self {
LeFuture {
partial_cmp: l.partial_cmp(r),
@ -36,8 +39,8 @@ where
{
type Output = bool;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let result = futures_core::ready!(self.as_mut().partial_cmp().poll(cx));
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let result = futures_core::ready!(self.project().partial_cmp.poll(cx));
match result {
Some(Ordering::Less) | Some(Ordering::Equal) => Poll::Ready(true),

@ -1,26 +1,29 @@
use std::cmp::Ordering;
use std::pin::Pin;
use pin_project_lite::pin_project;
use super::partial_cmp::PartialCmpFuture;
use crate::future::Future;
use crate::prelude::*;
use crate::stream::Stream;
use crate::task::{Context, Poll};
// Determines if the elements of this `Stream` are lexicographically
// less than those of another.
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct LtFuture<L: Stream, R: Stream> {
partial_cmp: PartialCmpFuture<L, R>,
pin_project! {
// Determines if the elements of this `Stream` are lexicographically
// less than those of another.
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct LtFuture<L: Stream, R: Stream> {
#[pin]
partial_cmp: PartialCmpFuture<L, R>,
}
}
impl<L: Stream, R: Stream> LtFuture<L, R>
where
L::Item: PartialOrd<R::Item>,
{
pin_utils::unsafe_pinned!(partial_cmp: PartialCmpFuture<L, R>);
pub(super) fn new(l: L, r: R) -> Self {
LtFuture {
partial_cmp: l.partial_cmp(r),
@ -36,8 +39,8 @@ where
{
type Output = bool;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let result = futures_core::ready!(self.as_mut().partial_cmp().poll(cx));
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let result = futures_core::ready!(self.project().partial_cmp.poll(cx));
match result {
Some(Ordering::Less) => Poll::Ready(true),

@ -1,22 +1,24 @@
use std::marker::PhantomData;
use std::pin::Pin;
use pin_project_lite::pin_project;
use crate::stream::Stream;
use crate::task::{Context, Poll};
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct Map<S, F, T, B> {
stream: S,
f: F,
__from: PhantomData<T>,
__to: PhantomData<B>,
pin_project! {
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct Map<S, F, T, B> {
#[pin]
stream: S,
f: F,
__from: PhantomData<T>,
__to: PhantomData<B>,
}
}
impl<S, F, T, B> Map<S, F, T, B> {
pin_utils::unsafe_pinned!(stream: S);
pin_utils::unsafe_unpinned!(f: F);
pub(crate) fn new(stream: S, f: F) -> Self {
Map {
stream,
@ -34,8 +36,9 @@ where
{
type Item = B;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let next = futures_core::ready!(self.as_mut().stream().poll_next(cx));
Poll::Ready(next.map(self.as_mut().f()))
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
let next = futures_core::ready!(this.stream.poll_next(cx));
Poll::Ready(next.map(this.f))
}
}

@ -2,22 +2,25 @@ use std::pin::Pin;
use std::task::{Context, Poll};
use futures_core::Stream;
use pin_project_lite::pin_project;
/// A stream that merges two other streams into a single stream.
///
/// This stream is returned by [`Stream::merge`].
///
/// [`Stream::merge`]: trait.Stream.html#method.merge
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
#[derive(Debug)]
pub struct Merge<L, R> {
left: L,
right: R,
pin_project! {
/// A stream that merges two other streams into a single stream.
///
/// This stream is returned by [`Stream::merge`].
///
/// [`Stream::merge`]: trait.Stream.html#method.merge
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
#[derive(Debug)]
pub struct Merge<L, R> {
#[pin]
left: L,
#[pin]
right: R,
}
}
impl<L, R> Unpin for Merge<L, R> {}
impl<L, R> Merge<L, R> {
pub(crate) fn new(left: L, right: R) -> Self {
Self { left, right }
@ -26,19 +29,20 @@ impl<L, R> Merge<L, R> {
impl<L, R, T> Stream for Merge<L, R>
where
L: Stream<Item = T> + Unpin,
R: Stream<Item = T> + Unpin,
L: Stream<Item = T>,
R: Stream<Item = T>,
{
type Item = T;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if let Poll::Ready(Some(item)) = Pin::new(&mut self.left).poll_next(cx) {
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
if let Poll::Ready(Some(item)) = this.left.poll_next(cx) {
// The first stream made progress. The Merge needs to be polled
// again to check the progress of the second stream.
cx.waker().wake_by_ref();
Poll::Ready(Some(item))
} else {
Pin::new(&mut self.right).poll_next(cx)
this.right.poll_next(cx)
}
}
}

@ -1,23 +1,24 @@
use std::cmp::Ordering;
use std::pin::Pin;
use pin_project_lite::pin_project;
use crate::future::Future;
use crate::stream::Stream;
use crate::task::{Context, Poll};
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct MinByFuture<S, F, T> {
stream: S,
compare: F,
min: Option<T>,
pin_project! {
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct MinByFuture<S, F, T> {
#[pin]
stream: S,
compare: F,
min: Option<T>,
}
}
impl<S, F, T> MinByFuture<S, F, T> {
pin_utils::unsafe_pinned!(stream: S);
pin_utils::unsafe_unpinned!(compare: F);
pin_utils::unsafe_unpinned!(min: Option<T>);
pub(super) fn new(stream: S, compare: F) -> Self {
MinByFuture {
stream,
@ -35,22 +36,23 @@ where
{
type Output = Option<S::Item>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let next = futures_core::ready!(self.as_mut().stream().poll_next(cx));
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let next = futures_core::ready!(this.stream.poll_next(cx));
match next {
Some(new) => {
cx.waker().wake_by_ref();
match self.as_mut().min().take() {
None => *self.as_mut().min() = Some(new),
Some(old) => match (&mut self.as_mut().compare())(&new, &old) {
Ordering::Less => *self.as_mut().min() = Some(new),
_ => *self.as_mut().min() = Some(old),
match this.min.take() {
None => *this.min = Some(new),
Some(old) => match (this.compare)(&new, &old) {
Ordering::Less => *this.min = Some(new),
_ => *this.min = Some(old),
},
}
Poll::Pending
}
None => Poll::Ready(self.min),
None => Poll::Ready(*this.min),
}
}
}

@ -1,23 +1,24 @@
use std::cmp::Ordering;
use std::pin::Pin;
use pin_project_lite::pin_project;
use crate::future::Future;
use crate::stream::Stream;
use crate::task::{Context, Poll};
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct MinByKeyFuture<S, T, K> {
stream: S,
min: Option<T>,
key_by: K,
pin_project! {
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct MinByKeyFuture<S, T, K> {
#[pin]
stream: S,
min: Option<T>,
key_by: K,
}
}
impl<S, T, K> MinByKeyFuture<S, T, K> {
pin_utils::unsafe_pinned!(stream: S);
pin_utils::unsafe_unpinned!(min: Option<T>);
pin_utils::unsafe_unpinned!(key_by: K);
pub(super) fn new(stream: S, key_by: K) -> Self {
MinByKeyFuture {
stream,
@ -35,24 +36,25 @@ where
{
type Output = Option<S::Item>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let next = futures_core::ready!(self.as_mut().stream().poll_next(cx));
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let next = futures_core::ready!(this.stream.poll_next(cx));
match next {
Some(new) => {
let new = self.as_mut().key_by()(&new);
let new = (this.key_by)(&new);
cx.waker().wake_by_ref();
match self.as_mut().min().take() {
None => *self.as_mut().min() = Some(new),
match this.min.take() {
None => *this.min = Some(new),
Some(old) => match new.cmp(&old) {
Ordering::Less => *self.as_mut().min() = Some(new),
_ => *self.as_mut().min() = Some(old),
Ordering::Less => *this.min = Some(new),
_ => *this.min = Some(old),
},
}
Poll::Pending
}
None => Poll::Ready(self.min),
None => Poll::Ready(*this.min),
}
}
}

@ -1,29 +1,30 @@
use std::cmp::Ordering;
use std::pin::Pin;
use pin_project_lite::pin_project;
use super::fuse::Fuse;
use crate::future::Future;
use crate::prelude::*;
use crate::stream::Stream;
use crate::task::{Context, Poll};
// Lexicographically compares the elements of this `Stream` with those
// of another.
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct PartialCmpFuture<L: Stream, R: Stream> {
l: Fuse<L>,
r: Fuse<R>,
l_cache: Option<L::Item>,
r_cache: Option<R::Item>,
pin_project! {
// Lexicographically compares the elements of this `Stream` with those
// of another.
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct PartialCmpFuture<L: Stream, R: Stream> {
#[pin]
l: Fuse<L>,
#[pin]
r: Fuse<R>,
l_cache: Option<L::Item>,
r_cache: Option<R::Item>,
}
}
impl<L: Stream, R: Stream> PartialCmpFuture<L, R> {
pin_utils::unsafe_pinned!(l: Fuse<L>);
pin_utils::unsafe_pinned!(r: Fuse<R>);
pin_utils::unsafe_unpinned!(l_cache: Option<L::Item>);
pin_utils::unsafe_unpinned!(r_cache: Option<R::Item>);
pub(super) fn new(l: L, r: R) -> Self {
PartialCmpFuture {
l: l.fuse(),
@ -42,12 +43,13 @@ where
{
type Output = Option<Ordering>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
loop {
// Short circuit logic
// Stream that completes earliest can be considered Less, etc
let l_complete = self.l.done && self.as_mut().l_cache.is_none();
let r_complete = self.r.done && self.as_mut().r_cache.is_none();
let l_complete = this.l.done && this.l_cache.is_none();
let r_complete = this.r.done && this.r_cache.is_none();
if l_complete && r_complete {
return Poll::Ready(Some(Ordering::Equal));
@ -58,30 +60,30 @@ where
}
// Get next value if possible and necesary
if !self.l.done && self.as_mut().l_cache.is_none() {
let l_next = futures_core::ready!(self.as_mut().l().poll_next(cx));
if !this.l.done && this.l_cache.is_none() {
let l_next = futures_core::ready!(this.l.as_mut().poll_next(cx));
if let Some(item) = l_next {
*self.as_mut().l_cache() = Some(item);
*this.l_cache = Some(item);
}
}
if !self.r.done && self.as_mut().r_cache.is_none() {
let r_next = futures_core::ready!(self.as_mut().r().poll_next(cx));
if !this.r.done && this.r_cache.is_none() {
let r_next = futures_core::ready!(this.r.as_mut().poll_next(cx));
if let Some(item) = r_next {
*self.as_mut().r_cache() = Some(item);
*this.r_cache = Some(item);
}
}
// Compare if both values are available.
if self.as_mut().l_cache.is_some() && self.as_mut().r_cache.is_some() {
let l_value = self.as_mut().l_cache().take().unwrap();
let r_value = self.as_mut().r_cache().take().unwrap();
if this.l_cache.is_some() && this.r_cache.is_some() {
let l_value = this.l_cache.as_mut().take().unwrap();
let r_value = this.r_cache.as_mut().take().unwrap();
let result = l_value.partial_cmp(&r_value);
if let Some(Ordering::Equal) = result {
// Reset cache to prepare for next comparison
*self.as_mut().l_cache() = None;
*self.as_mut().r_cache() = None;
*this.l_cache = None;
*this.r_cache = None;
} else {
// Return non equal value
return Poll::Ready(result);

@ -1,13 +1,18 @@
use std::pin::Pin;
use pin_project_lite::pin_project;
use crate::stream::Stream;
use crate::task::{Context, Poll};
/// A stream to maintain state while polling another stream.
#[derive(Debug)]
pub struct Scan<S, St, F> {
stream: S,
state_f: (St, F),
pin_project! {
/// A stream to maintain state while polling another stream.
#[derive(Debug)]
pub struct Scan<S, St, F> {
#[pin]
stream: S,
state_f: (St, F),
}
}
impl<S, St, F> Scan<S, St, F> {
@ -17,13 +22,8 @@ impl<S, St, F> Scan<S, St, F> {
state_f: (initial_state, f),
}
}
pin_utils::unsafe_pinned!(stream: S);
pin_utils::unsafe_unpinned!(state_f: (St, F));
}
impl<S: Unpin, St, F> Unpin for Scan<S, St, F> {}
impl<S, St, F, B> Stream for Scan<S, St, F>
where
S: Stream,
@ -31,11 +31,12 @@ where
{
type Item = B;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<B>> {
let poll_result = self.as_mut().stream().poll_next(cx);
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<B>> {
let mut this = self.project();
let poll_result = this.stream.as_mut().poll_next(cx);
poll_result.map(|item| {
item.and_then(|item| {
let (state, f) = self.as_mut().state_f();
let (state, f) = this.state_f;
f(state, item)
})
})

@ -1,19 +1,21 @@
use std::pin::Pin;
use std::task::{Context, Poll};
use pin_project_lite::pin_project;
use crate::stream::Stream;
/// A stream to skip first n elements of another stream.
#[derive(Debug)]
pub struct Skip<S> {
stream: S,
n: usize,
pin_project! {
/// A stream to skip first n elements of another stream.
#[derive(Debug)]
pub struct Skip<S> {
#[pin]
stream: S,
n: usize,
}
}
impl<S> Skip<S> {
pin_utils::unsafe_pinned!(stream: S);
pin_utils::unsafe_unpinned!(n: usize);
pub(crate) fn new(stream: S, n: usize) -> Self {
Skip { stream, n }
}
@ -25,14 +27,15 @@ where
{
type Item = S::Item;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
loop {
let next = futures_core::ready!(self.as_mut().stream().poll_next(cx));
let next = futures_core::ready!(this.stream.as_mut().poll_next(cx));
match next {
Some(v) => match self.n {
Some(v) => match *this.n {
0 => return Poll::Ready(Some(v)),
_ => *self.as_mut().n() -= 1,
_ => *this.n -= 1,
},
None => return Poll::Ready(None),
}

@ -1,21 +1,23 @@
use std::marker::PhantomData;
use std::pin::Pin;
use pin_project_lite::pin_project;
use crate::stream::Stream;
use crate::task::{Context, Poll};
/// A stream to skip elements of another stream based on a predicate.
#[derive(Debug)]
pub struct SkipWhile<S, P, T> {
stream: S,
predicate: Option<P>,
__t: PhantomData<T>,
pin_project! {
/// A stream to skip elements of another stream based on a predicate.
#[derive(Debug)]
pub struct SkipWhile<S, P, T> {
#[pin]
stream: S,
predicate: Option<P>,
__t: PhantomData<T>,
}
}
impl<S, P, T> SkipWhile<S, P, T> {
pin_utils::unsafe_pinned!(stream: S);
pin_utils::unsafe_unpinned!(predicate: Option<P>);
pub(crate) fn new(stream: S, predicate: P) -> Self {
SkipWhile {
stream,
@ -32,15 +34,16 @@ where
{
type Item = S::Item;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
loop {
let next = futures_core::ready!(self.as_mut().stream().poll_next(cx));
let next = futures_core::ready!(this.stream.as_mut().poll_next(cx));
match next {
Some(v) => match self.as_mut().predicate() {
Some(v) => match this.predicate {
Some(p) => {
if !p(&v) {
*self.as_mut().predicate() = None;
*this.predicate = None;
return Poll::Ready(Some(v));
}
}

@ -1,21 +1,22 @@
use std::pin::Pin;
use pin_project_lite::pin_project;
use crate::stream::Stream;
use crate::task::{Context, Poll};
/// A stream that steps a given amount of elements of another stream.
#[derive(Debug)]
pub struct StepBy<S> {
stream: S,
step: usize,
i: usize,
pin_project! {
/// A stream that steps a given amount of elements of another stream.
#[derive(Debug)]
pub struct StepBy<S> {
#[pin]
stream: S,
step: usize,
i: usize,
}
}
impl<S> StepBy<S> {
pin_utils::unsafe_pinned!(stream: S);
pin_utils::unsafe_unpinned!(step: usize);
pin_utils::unsafe_unpinned!(i: usize);
pub(crate) fn new(stream: S, step: usize) -> Self {
StepBy {
stream,
@ -31,17 +32,18 @@ where
{
type Item = S::Item;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
loop {
let next = futures_core::ready!(self.as_mut().stream().poll_next(cx));
let next = futures_core::ready!(this.stream.as_mut().poll_next(cx));
match next {
Some(v) => match self.i {
Some(v) => match this.i {
0 => {
*self.as_mut().i() = self.step;
*this.i = *this.step;
return Poll::Ready(Some(v));
}
_ => *self.as_mut().i() -= 1,
_ => *this.i -= 1,
},
None => return Poll::Ready(None),
}

@ -1,33 +1,32 @@
use std::pin::Pin;
use pin_project_lite::pin_project;
use crate::stream::Stream;
use crate::task::{Context, Poll};
/// A stream that yields the first `n` items of another stream.
#[derive(Clone, Debug)]
pub struct Take<S> {
pub(crate) stream: S,
pub(crate) remaining: usize,
}
impl<S: Unpin> Unpin for Take<S> {}
impl<S: Stream> Take<S> {
pin_utils::unsafe_pinned!(stream: S);
pin_utils::unsafe_unpinned!(remaining: usize);
pin_project! {
/// A stream that yields the first `n` items of another stream.
#[derive(Clone, Debug)]
pub struct Take<S> {
#[pin]
pub(crate) stream: S,
pub(crate) remaining: usize,
}
}
impl<S: Stream> Stream for Take<S> {
type Item = S::Item;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
if self.remaining == 0 {
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
let this = self.project();
if *this.remaining == 0 {
Poll::Ready(None)
} else {
let next = futures_core::ready!(self.as_mut().stream().poll_next(cx));
let next = futures_core::ready!(this.stream.poll_next(cx));
match next {
Some(_) => *self.as_mut().remaining() -= 1,
None => *self.as_mut().remaining() = 0,
Some(_) => *this.remaining -= 1,
None => *this.remaining = 0,
}
Poll::Ready(next)
}

@ -1,21 +1,23 @@
use std::marker::PhantomData;
use std::pin::Pin;
use pin_project_lite::pin_project;
use crate::stream::Stream;
use crate::task::{Context, Poll};
/// A stream that yields elements based on a predicate.
#[derive(Debug)]
pub struct TakeWhile<S, P, T> {
stream: S,
predicate: P,
__t: PhantomData<T>,
pin_project! {
/// A stream that yields elements based on a predicate.
#[derive(Debug)]
pub struct TakeWhile<S, P, T> {
#[pin]
stream: S,
predicate: P,
__t: PhantomData<T>,
}
}
impl<S, P, T> TakeWhile<S, P, T> {
pin_utils::unsafe_pinned!(stream: S);
pin_utils::unsafe_unpinned!(predicate: P);
pub(super) fn new(stream: S, predicate: P) -> Self {
TakeWhile {
stream,
@ -32,11 +34,12 @@ where
{
type Item = S::Item;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let next = futures_core::ready!(self.as_mut().stream().poll_next(cx));
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
let next = futures_core::ready!(this.stream.poll_next(cx));
match next {
Some(v) if (self.as_mut().predicate())(&v) => Poll::Ready(Some(v)),
Some(v) if (this.predicate)(&v) => Poll::Ready(Some(v)),
Some(_) => {
cx.waker().wake_by_ref();
Poll::Pending

@ -1,24 +1,25 @@
use std::marker::PhantomData;
use std::pin::Pin;
use pin_project_lite::pin_project;
use crate::future::Future;
use crate::stream::Stream;
use crate::task::{Context, Poll};
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct TryFoldFuture<S, F, T> {
stream: S,
f: F,
acc: Option<T>,
__t: PhantomData<T>,
pin_project! {
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct TryFoldFuture<S, F, T> {
#[pin]
stream: S,
f: F,
acc: Option<T>,
__t: PhantomData<T>,
}
}
impl<S, F, T> TryFoldFuture<S, F, T> {
pin_utils::unsafe_pinned!(stream: S);
pin_utils::unsafe_unpinned!(f: F);
pin_utils::unsafe_unpinned!(acc: Option<T>);
pub(super) fn new(stream: S, init: T, f: F) -> Self {
TryFoldFuture {
stream,
@ -36,23 +37,22 @@ where
{
type Output = Result<T, E>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
loop {
let next = futures_core::ready!(self.as_mut().stream().poll_next(cx));
let next = futures_core::ready!(this.stream.as_mut().poll_next(cx));
match next {
Some(v) => {
let old = self.as_mut().acc().take().unwrap();
let new = (self.as_mut().f())(old, v);
let old = this.acc.take().unwrap();
let new = (this.f)(old, v);
match new {
Ok(o) => {
*self.as_mut().acc() = Some(o);
}
Ok(o) => *this.acc = Some(o),
Err(e) => return Poll::Ready(Err(e)),
}
}
None => return Poll::Ready(Ok(self.as_mut().acc().take().unwrap())),
None => return Poll::Ready(Ok(this.acc.take().unwrap())),
}
}
}

@ -1,23 +1,25 @@
use std::marker::PhantomData;
use std::pin::Pin;
use pin_project_lite::pin_project;
use crate::future::Future;
use crate::stream::Stream;
use crate::task::{Context, Poll};
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct TryForEeachFuture<S, F, T, R> {
stream: S,
f: F,
__from: PhantomData<T>,
__to: PhantomData<R>,
pin_project! {
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct TryForEeachFuture<S, F, T, R> {
#[pin]
stream: S,
f: F,
__from: PhantomData<T>,
__to: PhantomData<R>,
}
}
impl<S, F, T, R> TryForEeachFuture<S, F, T, R> {
pin_utils::unsafe_pinned!(stream: S);
pin_utils::unsafe_unpinned!(f: F);
pub(crate) fn new(stream: S, f: F) -> Self {
TryForEeachFuture {
stream,
@ -36,14 +38,15 @@ where
{
type Output = Result<(), E>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
loop {
let item = futures_core::ready!(self.as_mut().stream().poll_next(cx));
let item = futures_core::ready!(this.stream.as_mut().poll_next(cx));
match item {
None => return Poll::Ready(Ok(())),
Some(v) => {
let res = (self.as_mut().f())(v);
let res = (this.f)(v);
if let Err(e) = res {
return Poll::Ready(Err(e));
}

@ -1,14 +1,20 @@
use std::fmt;
use std::pin::Pin;
use pin_project_lite::pin_project;
use crate::stream::Stream;
use crate::task::{Context, Poll};
/// An iterator that iterates two other iterators simultaneously.
pub struct Zip<A: Stream, B> {
item_slot: Option<A::Item>,
first: A,
second: B,
pin_project! {
/// An iterator that iterates two other iterators simultaneously.
pub struct Zip<A: Stream, B> {
item_slot: Option<A::Item>,
#[pin]
first: A,
#[pin]
second: B,
}
}
impl<A: Stream + fmt::Debug, B: fmt::Debug> fmt::Debug for Zip<A, B> {
@ -20,8 +26,6 @@ impl<A: Stream + fmt::Debug, B: fmt::Debug> fmt::Debug for Zip<A, B> {
}
}
impl<A: Stream + Unpin, B: Unpin> Unpin for Zip<A, B> {}
impl<A: Stream, B> Zip<A, B> {
pub(crate) fn new(first: A, second: B) -> Self {
Zip {
@ -30,25 +34,22 @@ impl<A: Stream, B> Zip<A, B> {
second,
}
}
pin_utils::unsafe_unpinned!(item_slot: Option<A::Item>);
pin_utils::unsafe_pinned!(first: A);
pin_utils::unsafe_pinned!(second: B);
}
impl<A: Stream, B: Stream> Stream for Zip<A, B> {
type Item = (A::Item, B::Item);
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if self.as_mut().item_slot().is_none() {
match self.as_mut().first().poll_next(cx) {
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
if this.item_slot.is_none() {
match this.first.poll_next(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(None) => return Poll::Ready(None),
Poll::Ready(Some(item)) => *self.as_mut().item_slot() = Some(item),
Poll::Ready(Some(item)) => *this.item_slot = Some(item),
}
}
let second_item = futures_core::ready!(self.as_mut().second().poll_next(cx));
let first_item = self.as_mut().item_slot().take().unwrap();
let second_item = futures_core::ready!(this.second.poll_next(cx));
let first_item = this.item_slot.take().unwrap();
Poll::Ready(second_item.map(|second_item| (first_item, second_item)))
}
}

@ -8,7 +8,7 @@ use crate::sync::Mutex;
/// # Examples
///
/// ```
/// # fn main() { async_std::task::block_on(async {
/// # async_std::task::block_on(async {
/// #
/// use async_std::sync::{Arc, Barrier};
/// use async_std::task;
@ -30,7 +30,6 @@ use crate::sync::Mutex;
/// handle.await;
/// }
/// # });
/// # }
/// ```
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
@ -120,7 +119,7 @@ impl Barrier {
/// # Examples
///
/// ```
/// # fn main() { async_std::task::block_on(async {
/// # async_std::task::block_on(async {
/// #
/// use async_std::sync::{Arc, Barrier};
/// use async_std::task;
@ -142,7 +141,6 @@ impl Barrier {
/// handle.await;
/// }
/// # });
/// # }
/// ```
pub async fn wait(&self) -> BarrierWaitResult {
let mut lock = self.state.lock().await;
@ -190,7 +188,7 @@ impl BarrierWaitResult {
/// # Examples
///
/// ```
/// # fn main() { async_std::task::block_on(async {
/// # async_std::task::block_on(async {
/// #
/// use async_std::sync::Barrier;
///
@ -198,7 +196,6 @@ impl BarrierWaitResult {
/// let barrier_wait_result = barrier.wait().await;
/// println!("{:?}", barrier_wait_result.is_leader());
/// # });
/// # }
/// ```
pub fn is_leader(&self) -> bool {
self.0

File diff suppressed because it is too large Load Diff

@ -40,5 +40,8 @@ mod rwlock;
cfg_unstable! {
pub use barrier::{Barrier, BarrierWaitResult};
pub use channel::{channel, Sender, Receiver};
mod barrier;
mod channel;
}

@ -7,6 +7,7 @@ use std::task::{RawWaker, RawWakerVTable};
use std::thread;
use crossbeam_utils::sync::Parker;
use pin_project_lite::pin_project;
use super::task;
use super::task_local;
@ -100,19 +101,18 @@ where
}
}
struct CatchUnwindFuture<F> {
future: F,
}
impl<F> CatchUnwindFuture<F> {
pin_utils::unsafe_pinned!(future: F);
pin_project! {
struct CatchUnwindFuture<F> {
#[pin]
future: F,
}
}
impl<F: Future + UnwindSafe> Future for CatchUnwindFuture<F> {
type Output = thread::Result<F::Output>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
panic::catch_unwind(AssertUnwindSafe(|| self.future().poll(cx)))?.map(Ok)
panic::catch_unwind(AssertUnwindSafe(|| self.project().future.poll(cx)))?.map(Ok)
}
}

@ -18,13 +18,13 @@ use std::pin::Pin;
/// Basic usage:
///
/// ```
/// # fn main() { async_std::task::block_on(async {
/// # async_std::task::block_on(async {
/// #
/// use async_std::task;
///
/// task::yield_now().await;
/// #
/// # }) }
/// # })
/// ```
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]

@ -0,0 +1,350 @@
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
use async_std::sync::channel;
use async_std::task;
use rand::{thread_rng, Rng};
fn ms(ms: u64) -> Duration {
Duration::from_millis(ms)
}
#[test]
fn smoke() {
task::block_on(async {
let (s, r) = channel(1);
s.send(7).await;
assert_eq!(r.recv().await, Some(7));
s.send(8).await;
assert_eq!(r.recv().await, Some(8));
drop(s);
assert_eq!(r.recv().await, None);
})
}
#[test]
fn capacity() {
for i in 1..10 {
let (s, r) = channel::<()>(i);
assert_eq!(s.capacity(), i);
assert_eq!(r.capacity(), i);
}
}
#[test]
fn len_empty_full() {
task::block_on(async {
let (s, r) = channel(2);
assert_eq!(s.len(), 0);
assert_eq!(s.is_empty(), true);
assert_eq!(s.is_full(), false);
assert_eq!(r.len(), 0);
assert_eq!(r.is_empty(), true);
assert_eq!(r.is_full(), false);
s.send(()).await;
assert_eq!(s.len(), 1);
assert_eq!(s.is_empty(), false);
assert_eq!(s.is_full(), false);
assert_eq!(r.len(), 1);
assert_eq!(r.is_empty(), false);
assert_eq!(r.is_full(), false);
s.send(()).await;
assert_eq!(s.len(), 2);
assert_eq!(s.is_empty(), false);
assert_eq!(s.is_full(), true);
assert_eq!(r.len(), 2);
assert_eq!(r.is_empty(), false);
assert_eq!(r.is_full(), true);
r.recv().await;
assert_eq!(s.len(), 1);
assert_eq!(s.is_empty(), false);
assert_eq!(s.is_full(), false);
assert_eq!(r.len(), 1);
assert_eq!(r.is_empty(), false);
assert_eq!(r.is_full(), false);
})
}
#[test]
fn recv() {
task::block_on(async {
let (s, r) = channel(100);
task::spawn(async move {
assert_eq!(r.recv().await, Some(7));
task::sleep(ms(1000)).await;
assert_eq!(r.recv().await, Some(8));
task::sleep(ms(1000)).await;
assert_eq!(r.recv().await, Some(9));
assert_eq!(r.recv().await, None);
});
task::sleep(ms(1500)).await;
s.send(7).await;
s.send(8).await;
s.send(9).await;
})
}
#[test]
fn send() {
task::block_on(async {
let (s, r) = channel(1);
task::spawn(async move {
s.send(7).await;
task::sleep(ms(1000)).await;
s.send(8).await;
task::sleep(ms(1000)).await;
s.send(9).await;
task::sleep(ms(1000)).await;
s.send(10).await;
});
task::sleep(ms(1500)).await;
assert_eq!(r.recv().await, Some(7));
assert_eq!(r.recv().await, Some(8));
assert_eq!(r.recv().await, Some(9));
})
}
#[test]
fn recv_after_disconnect() {
task::block_on(async {
let (s, r) = channel(100);
s.send(1).await;
s.send(2).await;
s.send(3).await;
drop(s);
assert_eq!(r.recv().await, Some(1));
assert_eq!(r.recv().await, Some(2));
assert_eq!(r.recv().await, Some(3));
assert_eq!(r.recv().await, None);
})
}
#[test]
fn len() {
const COUNT: usize = 25_000;
const CAP: usize = 1000;
task::block_on(async {
let (s, r) = channel(CAP);
assert_eq!(s.len(), 0);
assert_eq!(r.len(), 0);
for _ in 0..CAP / 10 {
for i in 0..50 {
s.send(i).await;
assert_eq!(s.len(), i + 1);
}
for i in 0..50 {
r.recv().await;
assert_eq!(r.len(), 50 - i - 1);
}
}
assert_eq!(s.len(), 0);
assert_eq!(r.len(), 0);
for i in 0..CAP {
s.send(i).await;
assert_eq!(s.len(), i + 1);
}
for _ in 0..CAP {
r.recv().await.unwrap();
}
assert_eq!(s.len(), 0);
assert_eq!(r.len(), 0);
let child = task::spawn({
let r = r.clone();
async move {
for i in 0..COUNT {
assert_eq!(r.recv().await, Some(i));
let len = r.len();
assert!(len <= CAP);
}
}
});
for i in 0..COUNT {
s.send(i).await;
let len = s.len();
assert!(len <= CAP);
}
child.await;
assert_eq!(s.len(), 0);
assert_eq!(r.len(), 0);
})
}
#[test]
fn disconnect_wakes_receiver() {
task::block_on(async {
let (s, r) = channel::<()>(1);
let child = task::spawn(async move {
assert_eq!(r.recv().await, None);
});
task::sleep(ms(1000)).await;
drop(s);
child.await;
})
}
#[test]
fn spsc() {
const COUNT: usize = 100_000;
task::block_on(async {
let (s, r) = channel(3);
let child = task::spawn(async move {
for i in 0..COUNT {
assert_eq!(r.recv().await, Some(i));
}
assert_eq!(r.recv().await, None);
});
for i in 0..COUNT {
s.send(i).await;
}
drop(s);
child.await;
})
}
#[test]
fn mpmc() {
const COUNT: usize = 25_000;
const TASKS: usize = 4;
task::block_on(async {
let (s, r) = channel::<usize>(3);
let v = (0..COUNT).map(|_| AtomicUsize::new(0)).collect::<Vec<_>>();
let v = Arc::new(v);
let mut tasks = Vec::new();
for _ in 0..TASKS {
let r = r.clone();
let v = v.clone();
tasks.push(task::spawn(async move {
for _ in 0..COUNT {
let n = r.recv().await.unwrap();
v[n].fetch_add(1, Ordering::SeqCst);
}
}));
}
for _ in 0..TASKS {
let s = s.clone();
tasks.push(task::spawn(async move {
for i in 0..COUNT {
s.send(i).await;
}
}));
}
for t in tasks {
t.await;
}
for c in v.iter() {
assert_eq!(c.load(Ordering::SeqCst), TASKS);
}
});
}
#[test]
fn oneshot() {
const COUNT: usize = 10_000;
task::block_on(async {
for _ in 0..COUNT {
let (s, r) = channel(1);
let c1 = task::spawn(async move { r.recv().await.unwrap() });
let c2 = task::spawn(async move { s.send(0).await });
c1.await;
c2.await;
}
})
}
#[test]
fn drops() {
const RUNS: usize = 100;
static DROPS: AtomicUsize = AtomicUsize::new(0);
#[derive(Debug, PartialEq)]
struct DropCounter;
impl Drop for DropCounter {
fn drop(&mut self) {
DROPS.fetch_add(1, Ordering::SeqCst);
}
}
let mut rng = thread_rng();
for _ in 0..RUNS {
task::block_on(async {
let steps = rng.gen_range(0, 10_000);
let additional = rng.gen_range(0, 50);
DROPS.store(0, Ordering::SeqCst);
let (s, r) = channel::<DropCounter>(50);
let child = task::spawn({
let r = r.clone();
async move {
for _ in 0..steps {
r.recv().await.unwrap();
}
}
});
for _ in 0..steps {
s.send(DropCounter).await;
}
child.await;
for _ in 0..additional {
s.send(DropCounter).await;
}
assert_eq!(DROPS.load(Ordering::SeqCst), steps);
drop(s);
drop(r);
assert_eq!(DROPS.load(Ordering::SeqCst), steps + additional);
})
}
}
Loading…
Cancel
Save