From 17c95a39d7e35d5ceb14b62660a91ef0fbced333 Mon Sep 17 00:00:00 2001 From: Stjepan Glavina Date: Thu, 5 Sep 2019 01:23:27 +0200 Subject: [PATCH] More robust file implementation Signed-off-by: Yoshua Wuyts --- Cargo.toml | 3 +- examples/print-file.rs | 2 +- src/fs/file.rs | 906 +++++++++++++++++++++-------------------- src/net/driver/mod.rs | 12 +- 4 files changed, 466 insertions(+), 457 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 24badb1e..c737d04c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,10 +27,9 @@ unstable = [] async-task = "1.0.0" cfg-if = "0.1.9" crossbeam-channel = "0.3.9" -futures-channel-preview = "0.3.0-alpha.18" futures-core-preview = "0.3.0-alpha.18" futures-io-preview = "0.3.0-alpha.18" -futures-timer = "0.3.0" +futures-timer = "0.4.0" lazy_static = "1.3.0" log = { version = "0.4.8", features = ["kv_unstable"] } memchr = "2.2.1" diff --git a/examples/print-file.rs b/examples/print-file.rs index d74bdd88..e2cdde79 100644 --- a/examples/print-file.rs +++ b/examples/print-file.rs @@ -7,7 +7,7 @@ use async_std::io; use async_std::prelude::*; use async_std::task; -const LEN: usize = 4 * 1024 * 1024; // 4 Mb +const LEN: usize = 16 * 1024; // 16 Kb fn main() -> io::Result<()> { let path = args().nth(1).expect("missing path argument"); diff --git a/src/fs/file.rs b/src/fs/file.rs index 09e2ad64..48ee2e15 100644 --- a/src/fs/file.rs +++ b/src/fs/file.rs @@ -1,17 +1,21 @@ -//! Types for working with files. +//! Async file implementation. +use std::cell::UnsafeCell; +use std::cmp; use std::fs; use std::io::{Read as _, Seek, SeekFrom, Write as _}; +use std::ops::{Deref, DerefMut}; use std::path::Path; use std::pin::Pin; -use std::sync::Mutex; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::{Arc, Mutex}; use cfg_if::cfg_if; use futures_io::{AsyncRead, AsyncSeek, AsyncWrite, Initializer}; -use crate::future::{self, Future}; -use crate::io; -use crate::task::{blocking, Context, Poll}; +use crate::future; +use crate::io::{self, Write}; +use crate::task::{self, blocking, Context, Poll, Waker}; /// A reference to a file on the filesystem. /// @@ -59,51 +63,8 @@ use crate::task::{blocking, Context, Poll}; /// ``` #[derive(Debug)] pub struct File { - mutex: Mutex, - - #[cfg(unix)] - raw_fd: std::os::unix::io::RawFd, - - #[cfg(windows)] - raw_handle: UnsafeShared, -} - -/// The state of an asynchronous file. -/// -/// The file can be either idle or busy performing an asynchronous operation. -#[derive(Debug)] -enum State { - /// The file is idle. - /// - /// If the inner representation is `None`, that means the file is closed. - Idle(Option), - - /// The file is blocked on an asynchronous operation. - /// - /// Awaiting this operation will result in the new state of the file. - Busy(blocking::JoinHandle), -} - -/// Inner representation of an asynchronous file. -#[derive(Debug)] -struct Inner { - /// The blocking file handle. - file: fs::File, - - /// The read/write buffer. - buf: Vec, - - /// The result of the last asynchronous operation on the file. - last_op: Option, -} - -/// Possible results of an asynchronous operation on a file. -#[derive(Debug)] -enum Operation { - Read(io::Result), - Write(io::Result), - Seek(io::Result), - Flush(io::Result<()>), + file: Arc, + lock: Lock, } impl File { @@ -132,28 +93,7 @@ impl File { pub async fn open>(path: P) -> io::Result { let path = path.as_ref().to_owned(); let file = blocking::spawn(async move { fs::File::open(&path) }).await?; - - #[cfg(unix)] - let file = File { - raw_fd: file.as_raw_fd(), - mutex: Mutex::new(State::Idle(Some(Inner { - file, - buf: Vec::new(), - last_op: None, - }))), - }; - - #[cfg(windows)] - let file = File { - raw_handle: UnsafeShared(file.as_raw_handle()), - mutex: Mutex::new(State::Idle(Some(Inner { - file, - buf: Vec::new(), - last_op: None, - }))), - }; - - Ok(file) + Ok(file.into()) } /// Opens a file in write-only mode. @@ -178,28 +118,7 @@ impl File { pub async fn create>(path: P) -> io::Result { let path = path.as_ref().to_owned(); let file = blocking::spawn(async move { fs::File::create(&path) }).await?; - - #[cfg(unix)] - let file = File { - raw_fd: file.as_raw_fd(), - mutex: Mutex::new(State::Idle(Some(Inner { - file, - buf: Vec::new(), - last_op: None, - }))), - }; - - #[cfg(windows)] - let file = File { - raw_handle: UnsafeShared(file.as_raw_handle()), - mutex: Mutex::new(State::Idle(Some(Inner { - file, - buf: Vec::new(), - last_op: None, - }))), - }; - - Ok(file) + Ok(file.into()) } /// Attempts to synchronize all OS-internal metadata to disk. @@ -225,35 +144,14 @@ impl File { /// # Ok(()) }) } /// ``` pub async fn sync_all(&self) -> io::Result<()> { - future::poll_fn(|cx| { - let state = &mut *self.mutex.lock().unwrap(); - - loop { - match state { - State::Idle(opt) => match opt.take() { - None => return Poll::Ready(None), - Some(inner) => { - let (s, r) = futures_channel::oneshot::channel(); - - // Start the operation asynchronously. - *state = State::Busy(blocking::spawn(async move { - let res = inner.file.sync_all(); - let _ = s.send(res); - State::Idle(Some(inner)) - })); - - return Poll::Ready(Some(r)); - } - }, - // Poll the asynchronous operation the file is currently blocked on. - State::Busy(task) => *state = futures_core::ready!(Pin::new(task).poll(cx)), - } - } + // Drain the write cache before calling `sync_all()`. + let state = future::poll_fn(|cx| { + let state = futures_core::ready!(self.lock.poll_lock(cx)); + state.poll_drain(cx) }) - .await - .ok_or_else(|| io_error("file closed"))? - .await - .map_err(|_| io_error("blocking task failed"))? + .await?; + + blocking::spawn(async move { state.file.sync_all() }).await } /// Similar to [`sync_all`], except that it may not synchronize file metadata. @@ -280,35 +178,14 @@ impl File { /// # Ok(()) }) } /// ``` pub async fn sync_data(&self) -> io::Result<()> { - future::poll_fn(|cx| { - let state = &mut *self.mutex.lock().unwrap(); - - loop { - match state { - State::Idle(opt) => match opt.take() { - None => return Poll::Ready(None), - Some(inner) => { - let (s, r) = futures_channel::oneshot::channel(); - - // Start the operation asynchronously. - *state = State::Busy(blocking::spawn(async move { - let res = inner.file.sync_data(); - let _ = s.send(res); - State::Idle(Some(inner)) - })); - - return Poll::Ready(Some(r)); - } - }, - // Poll the asynchronous operation the file is currently blocked on. - State::Busy(task) => *state = futures_core::ready!(Pin::new(task).poll(cx)), - } - } + // Flush the write cache before calling `sync_data()`. + let state = future::poll_fn(|cx| { + let state = futures_core::ready!(self.lock.poll_lock(cx)); + state.poll_flush(cx) }) - .await - .ok_or_else(|| io_error("file closed"))? - .await - .map_err(|_| io_error("blocking task failed"))? + .await?; + + blocking::spawn(async move { state.file.sync_data() }).await } /// Truncates or extends the underlying file. @@ -337,35 +214,15 @@ impl File { /// # Ok(()) }) } /// ``` pub async fn set_len(&self, size: u64) -> io::Result<()> { - future::poll_fn(|cx| { - let state = &mut *self.mutex.lock().unwrap(); - - loop { - match state { - State::Idle(opt) => match opt.take() { - None => return Poll::Ready(None), - Some(inner) => { - let (s, r) = futures_channel::oneshot::channel(); - - // Start the operation asynchronously. - *state = State::Busy(blocking::spawn(async move { - let res = inner.file.set_len(size); - let _ = s.send(res); - State::Idle(Some(inner)) - })); - - return Poll::Ready(Some(r)); - } - }, - // Poll the asynchronous operation the file is currently blocked on. - State::Busy(task) => *state = futures_core::ready!(Pin::new(task).poll(cx)), - } - } + // Invalidate the read/write cache before calling `set_len()`. + let state = future::poll_fn(|cx| { + let state = futures_core::ready!(self.lock.poll_lock(cx)); + let state = futures_core::ready!(state.poll_unread(cx))?; + state.poll_drain(cx) }) - .await - .ok_or_else(|| io_error("file closed"))? - .await - .map_err(|_| io_error("blocking task failed"))? + .await?; + + blocking::spawn(async move { state.file.set_len(size) }).await } /// Queries metadata about the file. @@ -383,35 +240,8 @@ impl File { /// # Ok(()) }) } /// ``` pub async fn metadata(&self) -> io::Result { - future::poll_fn(|cx| { - let state = &mut *self.mutex.lock().unwrap(); - - loop { - match state { - State::Idle(opt) => match opt.take() { - None => return Poll::Ready(None), - Some(inner) => { - let (s, r) = futures_channel::oneshot::channel(); - - // Start the operation asynchronously. - *state = State::Busy(blocking::spawn(async move { - let res = inner.file.metadata(); - let _ = s.send(res); - State::Idle(Some(inner)) - })); - - return Poll::Ready(Some(r)); - } - }, - // Poll the asynchronous operation the file is currently blocked on. - State::Busy(task) => *state = futures_core::ready!(Pin::new(task).poll(cx)), - } - } - }) - .await - .ok_or_else(|| io_error("file closed"))? - .await - .map_err(|_| io_error("blocking task failed"))? + let file = self.file.clone(); + blocking::spawn(async move { file.metadata() }).await } /// Changes the permissions on the underlying file. @@ -437,38 +267,18 @@ impl File { /// # Ok(()) }) } /// ``` pub async fn set_permissions(&self, perm: fs::Permissions) -> io::Result<()> { - let mut perm = Some(perm); - - future::poll_fn(|cx| { - let state = &mut *self.mutex.lock().unwrap(); - - loop { - match state { - State::Idle(opt) => match opt.take() { - None => return Poll::Ready(None), - Some(inner) => { - let (s, r) = futures_channel::oneshot::channel(); - let perm = perm.take().unwrap(); - - // Start the operation asynchronously. - *state = State::Busy(blocking::spawn(async move { - let res = inner.file.set_permissions(perm); - let _ = s.send(res); - State::Idle(Some(inner)) - })); - - return Poll::Ready(Some(r)); - } - }, - // Poll the asynchronous operation the file is currently blocked on. - State::Busy(task) => *state = futures_core::ready!(Pin::new(task).poll(cx)), - } - } - }) - .await - .ok_or_else(|| io_error("file closed"))? - .await - .map_err(|_| io_error("blocking task failed"))? + let file = self.file.clone(); + blocking::spawn(async move { file.set_permissions(perm) }).await + } +} + +impl Drop for File { + fn drop(&mut self) { + // We need to flush the file on drop. Unfortunately, that is not possible to do in a + // non-blocking fashion, but our only other option here is data that is residing in the + // write cache. Good task schedulers should be resilient to occasional blocking hiccups in + // file destructors so we don't expect this to be a common problem in practice. + let _ = task::block_on(self.flush()); } } @@ -489,62 +299,12 @@ impl AsyncRead for File { impl AsyncRead for &File { fn poll_read( - mut self: Pin<&mut Self>, + self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll> { - let state = &mut *self.mutex.lock().unwrap(); - - loop { - match state { - State::Idle(opt) => { - // Grab a reference to the inner representation of the file or return an error - // if the file is closed. - let inner = opt.as_mut().ok_or_else(|| io_error("file closed"))?; - let mut offset = 0; - - // Check if the operation has completed. - if let Some(Operation::Read(res)) = inner.last_op.take() { - let n = res?; - - if n <= buf.len() { - // Copy the read data into the buffer and return. - buf[..n].copy_from_slice(&inner.buf[..n]); - return Poll::Ready(Ok(n)); - } - - // If more data was read than fits into the buffer, let's retry the read - // operation, but first move the cursor where it was before the previous - // read. - offset = n; - } - - let mut inner = opt.take().unwrap(); - - // Set the length of the inner buffer to the length of the provided buffer. - if inner.buf.len() < buf.len() { - inner.buf.reserve(buf.len() - inner.buf.len()); - } - unsafe { - inner.buf.set_len(buf.len()); - } - - // Start the operation asynchronously. - *state = State::Busy(blocking::spawn(async move { - if offset > 0 { - let pos = SeekFrom::Current(-(offset as i64)); - let _ = Seek::seek(&mut inner.file, pos); - } - - let res = inner.file.read(&mut inner.buf); - inner.last_op = Some(Operation::Read(res)); - State::Idle(Some(inner)) - })); - } - // Poll the asynchronous operation the file is currently blocked on. - State::Busy(task) => *state = futures_core::ready!(Pin::new(task).poll(cx)), - } - } + let state = futures_core::ready!(self.lock.poll_lock(cx)); + state.poll_read(cx, buf) } #[inline] @@ -573,112 +333,22 @@ impl AsyncWrite for File { impl AsyncWrite for &File { fn poll_write( - mut self: Pin<&mut Self>, + self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll> { - let state = &mut *self.mutex.lock().unwrap(); - - loop { - match state { - State::Idle(opt) => { - // Grab a reference to the inner representation of the file or return an error - // if the file is closed. - let inner = opt.as_mut().ok_or_else(|| io_error("file closed"))?; - - // Check if the operation has completed. - if let Some(Operation::Write(res)) = inner.last_op.take() { - let n = res?; - - // If more data was written than is available in the buffer, let's retry - // the write operation. - if n <= buf.len() { - return Poll::Ready(Ok(n)); - } - } else { - let mut inner = opt.take().unwrap(); - - // Set the length of the inner buffer to the length of the provided buffer. - if inner.buf.len() < buf.len() { - inner.buf.reserve(buf.len() - inner.buf.len()); - } - unsafe { - inner.buf.set_len(buf.len()); - } - - // Copy the data to write into the inner buffer. - inner.buf[..buf.len()].copy_from_slice(buf); - - // Start the operation asynchronously. - *state = State::Busy(blocking::spawn(async move { - let res = inner.file.write(&mut inner.buf); - inner.last_op = Some(Operation::Write(res)); - State::Idle(Some(inner)) - })); - } - } - // Poll the asynchronous operation the file is currently blocked on. - State::Busy(task) => *state = futures_core::ready!(Pin::new(task).poll(cx)), - } - } + let state = futures_core::ready!(self.lock.poll_lock(cx)); + state.poll_write(cx, buf) } - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let state = &mut *self.mutex.lock().unwrap(); - - loop { - match state { - State::Idle(opt) => { - // Grab a reference to the inner representation of the file or return if the - // file is closed. - let inner = match opt.as_mut() { - None => return Poll::Ready(Ok(())), - Some(s) => s, - }; - - // Check if the operation has completed. - if let Some(Operation::Flush(res)) = inner.last_op.take() { - return Poll::Ready(res); - } else { - let mut inner = opt.take().unwrap(); - - // Start the operation asynchronously. - *state = State::Busy(blocking::spawn(async move { - let res = inner.file.flush(); - inner.last_op = Some(Operation::Flush(res)); - State::Idle(Some(inner)) - })); - } - } - // Poll the asynchronous operation the file is currently blocked on. - State::Busy(task) => *state = futures_core::ready!(Pin::new(task).poll(cx)), - } - } + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let state = futures_core::ready!(self.lock.poll_lock(cx)); + state.poll_flush(cx).map(|res| res.map(drop)) } - fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let state = &mut *self.mutex.lock().unwrap(); - - loop { - match state { - State::Idle(opt) => { - // Grab a reference to the inner representation of the file or return if the - // file is closed. - let inner = match opt.take() { - None => return Poll::Ready(Ok(())), - Some(s) => s, - }; - - // Start the operation asynchronously. - *state = State::Busy(blocking::spawn(async move { - drop(inner); - State::Idle(None) - })); - } - // Poll the asynchronous operation the file is currently blocked on. - State::Busy(task) => *state = futures_core::ready!(Pin::new(task).poll(cx)), - } - } + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let state = futures_core::ready!(self.lock.poll_lock(cx)); + state.poll_close(cx) } } @@ -694,69 +364,30 @@ impl AsyncSeek for File { impl AsyncSeek for &File { fn poll_seek( - mut self: Pin<&mut Self>, + self: Pin<&mut Self>, cx: &mut Context<'_>, pos: SeekFrom, ) -> Poll> { - let state = &mut *self.mutex.lock().unwrap(); - - loop { - match state { - State::Idle(opt) => { - // Grab a reference to the inner representation of the file or return an error - // if the file is closed. - let inner = opt.as_mut().ok_or_else(|| io_error("file closed"))?; - - // Check if the operation has completed. - if let Some(Operation::Seek(res)) = inner.last_op.take() { - return Poll::Ready(res); - } else { - let mut inner = opt.take().unwrap(); - - // Start the operation asynchronously. - *state = State::Busy(blocking::spawn(async move { - let res = inner.file.seek(pos); - inner.last_op = Some(Operation::Seek(res)); - State::Idle(Some(inner)) - })); - } - } - // Poll the asynchronous operation the file is currently blocked on. - State::Busy(task) => *state = futures_core::ready!(Pin::new(task).poll(cx)), - } - } + let state = futures_core::ready!(self.lock.poll_lock(cx)); + state.poll_seek(cx, pos) } } -/// Creates a custom `io::Error` with an arbitrary error type. -fn io_error(err: impl Into>) -> io::Error { - io::Error::new(io::ErrorKind::Other, err) -} - impl From for File { /// Converts a `std::fs::File` into its asynchronous equivalent. fn from(file: fs::File) -> File { - #[cfg(unix)] - let file = File { - raw_fd: file.as_raw_fd(), - mutex: Mutex::new(State::Idle(Some(Inner { - file, - buf: Vec::new(), - last_op: None, - }))), - }; - - #[cfg(windows)] - let file = File { - raw_handle: UnsafeShared(file.as_raw_handle()), - mutex: Mutex::new(State::Idle(Some(Inner { + let file = Arc::new(file); + File { + file: file.clone(), + lock: Lock::new(State { file, - buf: Vec::new(), - last_op: None, - }))), - }; - - file + mode: Mode::Idle, + cache: Vec::new(), + is_flushed: false, + last_read_err: None, + last_write_err: None, + }), + } } } @@ -776,7 +407,7 @@ cfg_if! { if #[cfg(any(unix, feature = "docs"))] { impl AsRawFd for File { fn as_raw_fd(&self) -> RawFd { - self.raw_fd + self.file.as_raw_fd() } } @@ -788,7 +419,7 @@ cfg_if! { impl IntoRawFd for File { fn into_raw_fd(self) -> RawFd { - self.raw_fd + self.file.as_raw_fd() } } } @@ -799,7 +430,7 @@ cfg_if! { if #[cfg(any(windows, feature = "docs"))] { impl AsRawHandle for File { fn as_raw_handle(&self) -> RawHandle { - self.raw_handle.0 + self.file.as_raw_handle() } } @@ -811,14 +442,385 @@ cfg_if! { impl IntoRawHandle for File { fn into_raw_handle(self) -> RawHandle { - self.raw_handle.0 + self.file.as_raw_handle() + } + } + } +} + +/// An async mutex with non-borrowing lock guards. +#[derive(Debug)] +struct Lock(Arc>); + +unsafe impl Send for Lock {} +unsafe impl Sync for Lock {} + +#[derive(Debug)] +/// The state of the lock. +struct LockState { + /// Set to `true` when locked. + locked: AtomicBool, + + /// The inner value. + value: UnsafeCell, + + /// A list of tasks interested in locking. + wakers: Mutex>, +} + +impl Lock { + /// Creates a new lock with the given value. + fn new(value: T) -> Lock { + Lock(Arc::new(LockState { + locked: AtomicBool::new(false), + value: UnsafeCell::new(value), + wakers: Mutex::new(Vec::new()), + })) + } + + /// Attempts to acquire the lock. + fn poll_lock(&self, cx: &mut Context<'_>) -> Poll> { + // Try acquiring the lock. + if self.0.locked.swap(true, Ordering::Acquire) { + // Lock the list of wakers. + let mut list = self.0.wakers.lock().unwrap(); + + // Try acquiring the lock again. + if self.0.locked.swap(true, Ordering::Acquire) { + // If failed again, add the current task to the list and return. + if list.iter().all(|w| !w.will_wake(cx.waker())) { + list.push(cx.waker().clone()); + } + return Poll::Pending; + } + } + + // The lock was successfully aquired. + Poll::Ready(LockGuard(self.0.clone())) + } +} + +/// A lock guard. +/// +/// When dropped, ownership of the inner value is returned back to the lock. +#[derive(Debug)] +struct LockGuard(Arc>); + +unsafe impl Send for LockGuard {} +unsafe impl Sync for LockGuard {} + +impl LockGuard { + /// Registers a task interested in locking. + /// + /// When this lock guard gets dropped, all registered tasks will be woken up. + fn register(&self, cx: &Context<'_>) { + let mut list = self.0.wakers.lock().unwrap(); + + if list.iter().all(|w| !w.will_wake(cx.waker())) { + list.push(cx.waker().clone()); + } + } +} + +impl Drop for LockGuard { + fn drop(&mut self) { + self.0.locked.store(false, Ordering::Release); + + for w in self.0.wakers.lock().unwrap().drain(..) { + w.wake(); + } + } +} + +impl Deref for LockGuard { + type Target = T; + + fn deref(&self) -> &T { + unsafe { &*self.0.value.get() } + } +} + +impl DerefMut for LockGuard { + fn deref_mut(&mut self) -> &mut T { + unsafe { &mut *self.0.value.get() } + } +} + +/// The current mode. +/// +/// The file can either be in idle mode, in reading mode, or writing mode. +#[derive(Debug)] +enum Mode { + /// The cache is empty. + Idle, + + /// The cache contains data read from the inner file. + /// + /// This `usize` represents how many bytes from the beginning of cache have been consumed. + Reading(usize), + + /// The cache contains data that needs to be written to the inner file. + Writing, +} + +/// The current state of a file. +/// +/// The `File` struct puts this state behind a lock. +/// +/// Filesystem operations that get spawned as blocking tasks will take ownership of the state and +/// return it back once the operation completes. +#[derive(Debug)] +struct State { + /// The inner file. + file: Arc, + + /// The current mode (idle, reading, or writing). + mode: Mode, + + /// The read/write cache. + /// + /// If in reading mode, the cache contains a chunk of data that has been read from the file. + /// If in writing mode, the cache contains data that will eventually be written into the file. + cache: Vec, + + /// `true` if the file is flushed. + /// + /// When a file is flushed, the write cache and the inner file's buffer are empty. + is_flushed: bool, + + /// The last read error that came from an async operation. + last_read_err: Option, + + /// The last write error that came from an async operation. + last_write_err: Option, +} + +impl LockGuard { + /// Seeks to a new position in the file. + fn poll_seek(mut self, cx: &mut Context<'_>, pos: SeekFrom) -> Poll> { + // If this operation doesn't move the cursor, then poll the current position inside the + // file. This call will hopefully not block. + if pos == SeekFrom::Current(0) { + return Poll::Ready((&*self.file).seek(pos)); + } + + // Invalidate the read/write cache before calling `seek()`. + self = futures_core::ready!(self.poll_unread(cx))?; + self = futures_core::ready!(self.poll_drain(cx))?; + + // Seek to the new position. This call is hopefully not blocking because it should just + // change the internal offset into the file and not touch the actual file. + Poll::Ready((&*self.file).seek(pos)) + } + + /// Reads some bytes from the file into a buffer. + fn poll_read(mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll> { + // If an async operation has left a read error, return it now. + if let Some(err) = self.last_read_err.take() { + return Poll::Ready(Err(err)); + } + + match self.mode { + Mode::Idle => {} + Mode::Reading(start) => { + // How many bytes in the cache are available for reading. + let available = self.cache.len() - start; + + // If there is cached unconsumed data or if the cache is empty, we can read from + // it. Empty cache in reading mode indicates that the last operation didn't read + // any bytes, i.e. it reached the end of the file. + if available > 0 || self.cache.is_empty() { + // Copy data from the cache into the buffer. + let n = cmp::min(available, buf.len()); + buf[..n].copy_from_slice(&self.cache[start..n]); + + // Move the read cursor forward. + self.mode = Mode::Reading(start + n); + + return Poll::Ready(Ok(n)); + } + } + Mode::Writing => { + // If we're in writing mode, drain the write cache. + self = futures_core::ready!(self.poll_drain(cx))?; + } + } + + // Make the cache as long as `buf`. + if self.cache.len() < buf.len() { + let diff = buf.len() - self.cache.len(); + self.cache.reserve(diff); + } + unsafe { + self.cache.set_len(buf.len()); + } + + // Register current task's interest in the file lock. + self.register(cx); + + // Start a read operation asynchronously. + blocking::spawn(async move { + // Read some data from the file into the cache. + let res = { + let State { file, cache, .. } = &mut *self; + (&**file).read(cache) + }; + + match res { + Ok(n) => { + // Update cache length and switch to reading mode, starting from index 0. + unsafe { + self.cache.set_len(n); + } + self.mode = Mode::Reading(0); + } + Err(err) => { + // Save the error and switch to idle mode. + self.cache.clear(); + self.mode = Mode::Idle; + self.last_read_err = Some(err); + } + } + }); + + Poll::Pending + } + + /// Invalidates the read cache. + /// + /// This method will also move the file cursor backwards by the number of unconsumed bytes in + /// the read cache. + fn poll_unread(mut self, _: &mut Context<'_>) -> Poll> { + match self.mode { + Mode::Idle | Mode::Writing => Poll::Ready(Ok(self)), + Mode::Reading(start) => { + // Number of unconsumed bytes in the read cache. + let n = self.cache.len() - start; + + if n > 0 { + // Seek `n` bytes backwards. This call is hopefully not blocking because it + // should just change the internal offset into the file and not touch the + // actual file. + (&*self.file).seek(SeekFrom::Current(-(n as i64)))?; + } + + // Switch to idle mode. + self.cache.clear(); + self.mode = Mode::Idle; + + Poll::Ready(Ok(self)) + } + } + } + + /// Writes some data from a buffer into the file. + fn poll_write(mut self, cx: &mut Context<'_>, buf: &[u8]) -> Poll> { + // If an async operation has left a write error, return it now. + if let Some(err) = self.last_write_err.take() { + return Poll::Ready(Err(err)); + } + + // If we're in reading mode, invalidate the read buffer. + self = futures_core::ready!(self.poll_unread(cx))?; + + // Make the cache have as much capacity as `buf`. + if self.cache.capacity() < buf.len() { + let diff = buf.len() - self.cache.capacity(); + self.cache.reserve(diff); + } + + // How many bytes can be written into the cache before filling up. + let available = self.cache.capacity() - self.cache.len(); + + // If there is available space in the cache or if the buffer is empty, we can write data + // into the cache. + if available > 0 || buf.is_empty() { + let n = cmp::min(available, buf.len()); + let start = self.cache.len(); + + // Copy data from the buffer into the cache. + unsafe { + self.cache.set_len(start + n); + } + self.cache[start..start + n].copy_from_slice(&buf[..n]); + + // Mark the file as not flushed and switch to writing mode. + self.is_flushed = false; + self.mode = Mode::Writing; + Poll::Ready(Ok(n)) + } else { + // Drain the write cache because it's full. + futures_core::ready!(self.poll_drain(cx))?; + Poll::Pending + } + } + + /// Drains the write cache. + fn poll_drain(mut self, cx: &mut Context<'_>) -> Poll> { + // If an async operation has left a write error, return it now. + if let Some(err) = self.last_write_err.take() { + return Poll::Ready(Err(err)); + } + + match self.mode { + Mode::Idle | Mode::Reading(..) => Poll::Ready(Ok(self)), + Mode::Writing => { + // Register current task's interest in the file lock. + self.register(cx); + + // Start a write operation asynchronously. + blocking::spawn(async move { + match (&*self.file).write_all(&self.cache) { + Ok(_) => { + // Switch to idle mode. + self.cache.clear(); + self.mode = Mode::Idle; + } + Err(err) => { + // Save the error. + self.last_write_err = Some(err); + } + }; + }); + + Poll::Pending } } + } + + /// Flushes the write cache into the file. + fn poll_flush(mut self, cx: &mut Context<'_>) -> Poll> { + // If the file is already in flushed state, do nothing. + if self.is_flushed { + return Poll::Ready(Ok(self)); + } - #[derive(Debug)] - struct UnsafeShared(T); + // If there is data in the write cache, drain in. + self = futures_core::ready!(self.poll_drain(cx))?; + + // Register current task's interest in the file lock. + self.register(cx); + + // Start a flush operation asynchronously. + blocking::spawn(async move { + match (&*self.file).flush() { + Ok(()) => { + // Mark the file as flushed. + self.is_flushed = true; + } + Err(err) => { + // Save the error. + self.last_write_err = Some(err); + } + } + }); + + Poll::Pending + } - unsafe impl Send for UnsafeShared {} - unsafe impl Sync for UnsafeShared {} + // This function does nothing because we're not sure about `AsyncWrite::poll_close()`'s + // semantics nor whether it will stay in the `AsyncWrite` trait. + fn poll_close(self, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) } } diff --git a/src/net/driver/mod.rs b/src/net/driver/mod.rs index 04ab4bb6..7cf809a2 100644 --- a/src/net/driver/mod.rs +++ b/src/net/driver/mod.rs @@ -213,7 +213,11 @@ impl IoHandle { let mut readiness = mio::Ready::from_usize(self.entry.readiness.load(Ordering::SeqCst)); if (readiness & mask).is_empty() { - self.entry.readers.lock().unwrap().push(cx.waker().clone()); + let mut list = self.entry.readers.lock().unwrap(); + if list.iter().all(|w| !w.will_wake(cx.waker())) { + list.push(cx.waker().clone()); + } + readiness = mio::Ready::from_usize(self.entry.readiness.fetch_or(0, Ordering::SeqCst)); } @@ -250,7 +254,11 @@ impl IoHandle { let mut readiness = mio::Ready::from_usize(self.entry.readiness.load(Ordering::SeqCst)); if (readiness & mask).is_empty() { - self.entry.writers.lock().unwrap().push(cx.waker().clone()); + let mut list = self.entry.writers.lock().unwrap(); + if list.iter().all(|w| !w.will_wake(cx.waker())) { + list.push(cx.waker().clone()); + } + readiness = mio::Ready::from_usize(self.entry.readiness.fetch_or(0, Ordering::SeqCst)); }