diff --git a/src/io/copy.rs b/src/io/copy.rs index f05ed0e1..e8d5d733 100644 --- a/src/io/copy.rs +++ b/src/io/copy.rs @@ -7,6 +7,12 @@ use crate::io::{self, BufRead, BufReader, Read, Write}; use crate::task::{Context, Poll}; use crate::utils::Context as _; +// Note: There are two otherwise-identical implementations of this +// function because unstable has removed the `?Sized` bound for the +// reader and writer and accepts `R` and `W` instead of `&mut R` and +// `&mut W`. If making a change to either of the implementations, +// ensure that you copy it into the other. + /// Copies the entire contents of a reader into a writer. /// /// This function will continuously read data from `reader` and then @@ -57,6 +63,7 @@ where #[pin] writer: W, amt: u64, + reader_eof: bool } } @@ -69,13 +76,20 @@ where fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let mut this = self.project(); + loop { - let buffer = futures_core::ready!(this.reader.as_mut().poll_fill_buf(cx))?; - if buffer.is_empty() { + if *this.reader_eof { futures_core::ready!(this.writer.as_mut().poll_flush(cx))?; return Poll::Ready(Ok(*this.amt)); } + let buffer = futures_core::ready!(this.reader.as_mut().poll_fill_buf(cx))?; + + if buffer.is_empty() { + *this.reader_eof = true; + continue; + } + let i = futures_core::ready!(this.writer.as_mut().poll_write(cx, buffer))?; if i == 0 { return Poll::Ready(Err(io::ErrorKind::WriteZero.into())); @@ -89,6 +103,7 @@ where let future = CopyFuture { reader: BufReader::new(reader), writer, + reader_eof: false, amt: 0, }; future.await.context(|| String::from("io::copy failed")) @@ -144,6 +159,7 @@ where #[pin] writer: W, amt: u64, + reader_eof: bool } } @@ -156,13 +172,20 @@ where fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let mut this = self.project(); + loop { - let buffer = futures_core::ready!(this.reader.as_mut().poll_fill_buf(cx))?; - if buffer.is_empty() { + if *this.reader_eof { futures_core::ready!(this.writer.as_mut().poll_flush(cx))?; return Poll::Ready(Ok(*this.amt)); } + let buffer = futures_core::ready!(this.reader.as_mut().poll_fill_buf(cx))?; + + if buffer.is_empty() { + *this.reader_eof = true; + continue; + } + let i = futures_core::ready!(this.writer.as_mut().poll_write(cx, buffer))?; if i == 0 { return Poll::Ready(Err(io::ErrorKind::WriteZero.into())); @@ -176,6 +199,7 @@ where let future = CopyFuture { reader: BufReader::new(reader), writer, + reader_eof: false, amt: 0, }; future.await.context(|| String::from("io::copy failed")) diff --git a/tests/io_copy.rs b/tests/io_copy.rs new file mode 100644 index 00000000..394c34f8 --- /dev/null +++ b/tests/io_copy.rs @@ -0,0 +1,72 @@ +use std::{ + io::Result, + pin::Pin, + task::{Context, Poll}, +}; + +struct ReaderThatPanicsAfterEof { + read_count: usize, + has_sent_eof: bool, + max_read: usize, +} + +impl async_std::io::Read for ReaderThatPanicsAfterEof { + fn poll_read( + mut self: Pin<&mut Self>, + _cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + if self.has_sent_eof { + panic!("this should be unreachable because we should not poll after eof (Ready(Ok(0)))") + } else if self.read_count >= self.max_read { + self.has_sent_eof = true; + Poll::Ready(Ok(0)) + } else { + self.read_count += 1; + Poll::Ready(Ok(buf.len())) + } + } +} + +struct WriterThatTakesAWhileToFlush { + max_flush: usize, + flush_count: usize, +} + +impl async_std::io::Write for WriterThatTakesAWhileToFlush { + fn poll_write(self: Pin<&mut Self>, _cx: &mut Context<'_>, buf: &[u8]) -> Poll> { + Poll::Ready(Ok(buf.len())) + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.flush_count += 1; + if self.flush_count >= self.max_flush { + Poll::Ready(Ok(())) + } else { + cx.waker().wake_by_ref(); + Poll::Pending + } + } + + fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } +} + +#[test] +fn io_copy_does_not_poll_after_eof() { + async_std::task::block_on(async { + let mut reader = ReaderThatPanicsAfterEof { + has_sent_eof: false, + max_read: 10, + read_count: 0, + }; + + let mut writer = WriterThatTakesAWhileToFlush { + flush_count: 0, + max_flush: 10, + }; + + assert!(async_std::io::copy(&mut reader, &mut writer).await.is_ok()); + }) +}