From 20cdf73bb053d1e6a9468775d96950327bd252e6 Mon Sep 17 00:00:00 2001 From: Stjepan Glavina Date: Mon, 4 Nov 2019 02:40:55 +0100 Subject: [PATCH] Simplify RwLock using WakerSet (#440) --- src/sync/rwlock.rs | 290 +++++++++++++----------------------------- src/sync/waker_set.rs | 36 ++++-- 2 files changed, 115 insertions(+), 211 deletions(-) diff --git a/src/sync/rwlock.rs b/src/sync/rwlock.rs index a0d0f07..81b0735 100644 --- a/src/sync/rwlock.rs +++ b/src/sync/rwlock.rs @@ -1,26 +1,21 @@ use std::cell::UnsafeCell; use std::fmt; +use std::isize; use std::ops::{Deref, DerefMut}; use std::pin::Pin; +use std::process; use std::sync::atomic::{AtomicUsize, Ordering}; -use slab::Slab; - use crate::future::Future; -use crate::task::{Context, Poll, Waker}; +use crate::sync::WakerSet; +use crate::task::{Context, Poll}; /// Set if a write lock is held. #[allow(clippy::identity_op)] const WRITE_LOCK: usize = 1 << 0; -/// Set if there are read operations blocked on the lock. -const BLOCKED_READS: usize = 1 << 1; - -/// Set if there are write operations blocked on the lock. -const BLOCKED_WRITES: usize = 1 << 2; - /// The value of a single blocked read contributing to the read count. -const ONE_READ: usize = 1 << 3; +const ONE_READ: usize = 1 << 1; /// The bits in which the read count is stored. const READ_COUNT_MASK: usize = !(ONE_READ - 1); @@ -56,8 +51,8 @@ const READ_COUNT_MASK: usize = !(ONE_READ - 1); /// ``` pub struct RwLock { state: AtomicUsize, - reads: std::sync::Mutex>>, - writes: std::sync::Mutex>>, + read_wakers: WakerSet, + write_wakers: WakerSet, value: UnsafeCell, } @@ -77,8 +72,8 @@ impl RwLock { pub fn new(t: T) -> RwLock { RwLock { state: AtomicUsize::new(0), - reads: std::sync::Mutex::new(Slab::new()), - writes: std::sync::Mutex::new(Slab::new()), + read_wakers: WakerSet::new(), + write_wakers: WakerSet::new(), value: UnsafeCell::new(t), } } @@ -104,100 +99,61 @@ impl RwLock { /// # }) /// ``` pub async fn read(&self) -> RwLockReadGuard<'_, T> { - pub struct LockFuture<'a, T> { + pub struct ReadFuture<'a, T> { lock: &'a RwLock, opt_key: Option, - acquired: bool, } - impl<'a, T> Future for LockFuture<'a, T> { + impl<'a, T> Future for ReadFuture<'a, T> { type Output = RwLockReadGuard<'a, T>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match self.lock.try_read() { - Some(guard) => { - self.acquired = true; - Poll::Ready(guard) - } + let poll = match self.lock.try_read() { + Some(guard) => Poll::Ready(guard), None => { - let mut reads = self.lock.reads.lock().unwrap(); - - // Register the current task. + // Insert this lock operation. match self.opt_key { - None => { - // Insert a new entry into the list of blocked reads. - let w = cx.waker().clone(); - let key = reads.insert(Some(w)); - self.opt_key = Some(key); - - if reads.len() == 1 { - self.lock.state.fetch_or(BLOCKED_READS, Ordering::Relaxed); - } - } - Some(key) => { - // There is already an entry in the list of blocked reads. Just - // reset the waker if it was removed. - if reads[key].is_none() { - let w = cx.waker().clone(); - reads[key] = Some(w); - } - } + None => self.opt_key = Some(self.lock.read_wakers.insert(cx)), + Some(key) => self.lock.read_wakers.update(key, cx), } // Try locking again because it's possible the lock got unlocked just - // before the current task was registered as a blocked task. + // before the current task was inserted into the waker set. match self.lock.try_read() { - Some(guard) => { - self.acquired = true; - Poll::Ready(guard) - } + Some(guard) => Poll::Ready(guard), None => Poll::Pending, } } + }; + + if poll.is_ready() { + // If the current task is in the set, remove it. + if let Some(key) = self.opt_key.take() { + self.lock.read_wakers.complete(key); + } } + + poll } } - impl Drop for LockFuture<'_, T> { + impl Drop for ReadFuture<'_, T> { fn drop(&mut self) { + // If the current task is still in the set, that means it is being cancelled now. if let Some(key) = self.opt_key { - let mut reads = self.lock.reads.lock().unwrap(); - let opt_waker = reads.remove(key); - - if reads.is_empty() { - self.lock.state.fetch_and(!BLOCKED_READS, Ordering::Relaxed); - } + self.lock.read_wakers.cancel(key); - if opt_waker.is_none() { - // We were awoken. Wake up another blocked read. - if let Some((_, opt_waker)) = reads.iter_mut().next() { - if let Some(w) = opt_waker.take() { - w.wake(); - return; - } - } - drop(reads); - - if !self.acquired { - // We didn't acquire the lock and didn't wake another blocked read. - // Wake a blocked write instead. - let mut writes = self.lock.writes.lock().unwrap(); - if let Some((_, opt_waker)) = writes.iter_mut().next() { - if let Some(w) = opt_waker.take() { - w.wake(); - return; - } - } - } + // If there are no active readers, wake one of the writers. + if self.lock.state.load(Ordering::SeqCst) & READ_COUNT_MASK == 0 { + self.lock.write_wakers.notify_one(); } } } } - LockFuture { + ReadFuture { lock: self, opt_key: None, - acquired: false, } .await } @@ -226,7 +182,7 @@ impl RwLock { /// # }) /// ``` pub fn try_read(&self) -> Option> { - let mut state = self.state.load(Ordering::Acquire); + let mut state = self.state.load(Ordering::SeqCst); loop { // If a write lock is currently held, then a read lock cannot be acquired. @@ -234,12 +190,17 @@ impl RwLock { return None; } + // Make sure the number of readers doesn't overflow. + if state > isize::MAX as usize { + process::abort(); + } + // Increment the number of active reads. match self.state.compare_exchange_weak( state, state + ONE_READ, - Ordering::AcqRel, - Ordering::Acquire, + Ordering::SeqCst, + Ordering::SeqCst, ) { Ok(_) => return Some(RwLockReadGuard(self)), Err(s) => state = s, @@ -268,99 +229,59 @@ impl RwLock { /// # }) /// ``` pub async fn write(&self) -> RwLockWriteGuard<'_, T> { - pub struct LockFuture<'a, T> { + pub struct WriteFuture<'a, T> { lock: &'a RwLock, opt_key: Option, - acquired: bool, } - impl<'a, T> Future for LockFuture<'a, T> { + impl<'a, T> Future for WriteFuture<'a, T> { type Output = RwLockWriteGuard<'a, T>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match self.lock.try_write() { - Some(guard) => { - self.acquired = true; - Poll::Ready(guard) - } + let poll = match self.lock.try_write() { + Some(guard) => Poll::Ready(guard), None => { - let mut writes = self.lock.writes.lock().unwrap(); - - // Register the current task. + // Insert this lock operation. match self.opt_key { - None => { - // Insert a new entry into the list of blocked writes. - let w = cx.waker().clone(); - let key = writes.insert(Some(w)); - self.opt_key = Some(key); - - if writes.len() == 1 { - self.lock.state.fetch_or(BLOCKED_WRITES, Ordering::Relaxed); - } - } - Some(key) => { - // There is already an entry in the list of blocked writes. Just - // reset the waker if it was removed. - if writes[key].is_none() { - let w = cx.waker().clone(); - writes[key] = Some(w); - } - } + None => self.opt_key = Some(self.lock.write_wakers.insert(cx)), + Some(key) => self.lock.write_wakers.update(key, cx), } // Try locking again because it's possible the lock got unlocked just - // before the current task was registered as a blocked task. + // before the current task was inserted into the waker set. match self.lock.try_write() { - Some(guard) => { - self.acquired = true; - Poll::Ready(guard) - } + Some(guard) => Poll::Ready(guard), None => Poll::Pending, } } + }; + + if poll.is_ready() { + // If the current task is in the set, remove it. + if let Some(key) = self.opt_key.take() { + self.lock.write_wakers.complete(key); + } } + + poll } } - impl Drop for LockFuture<'_, T> { + impl Drop for WriteFuture<'_, T> { fn drop(&mut self) { + // If the current task is still in the set, that means it is being cancelled now. if let Some(key) = self.opt_key { - let mut writes = self.lock.writes.lock().unwrap(); - let opt_waker = writes.remove(key); - - if writes.is_empty() { - self.lock - .state - .fetch_and(!BLOCKED_WRITES, Ordering::Relaxed); - } - - if opt_waker.is_none() && !self.acquired { - // We were awoken but didn't acquire the lock. Wake up another write. - if let Some((_, opt_waker)) = writes.iter_mut().next() { - if let Some(w) = opt_waker.take() { - w.wake(); - return; - } - } - drop(writes); - - // There are no blocked writes. Wake a blocked read instead. - let mut reads = self.lock.reads.lock().unwrap(); - if let Some((_, opt_waker)) = reads.iter_mut().next() { - if let Some(w) = opt_waker.take() { - w.wake(); - return; - } - } + if !self.lock.write_wakers.cancel(key) { + // If no other blocked reader was notified, notify all readers. + self.lock.read_wakers.notify_all(); } } } } - LockFuture { + WriteFuture { lock: self, opt_key: None, - acquired: false, } .await } @@ -389,24 +310,10 @@ impl RwLock { /// # }) /// ``` pub fn try_write(&self) -> Option> { - let mut state = self.state.load(Ordering::Acquire); - - loop { - // If any kind of lock is currently held, then a write lock cannot be acquired. - if state & (WRITE_LOCK | READ_COUNT_MASK) != 0 { - return None; - } - - // Set the write lock. - match self.state.compare_exchange_weak( - state, - state | WRITE_LOCK, - Ordering::AcqRel, - Ordering::Acquire, - ) { - Ok(_) => return Some(RwLockWriteGuard(self)), - Err(s) => state = s, - } + if self.state.compare_and_swap(0, WRITE_LOCK, Ordering::SeqCst) == 0 { + Some(RwLockWriteGuard(self)) + } else { + None } } @@ -449,18 +356,15 @@ impl RwLock { impl fmt::Debug for RwLock { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self.try_read() { - None => { - struct LockedPlaceholder; - impl fmt::Debug for LockedPlaceholder { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.write_str("") - } - } - f.debug_struct("RwLock") - .field("data", &LockedPlaceholder) - .finish() + struct Locked; + impl fmt::Debug for Locked { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("") } + } + + match self.try_read() { + None => f.debug_struct("RwLock").field("data", &Locked).finish(), Some(guard) => f.debug_struct("RwLock").field("data", &&*guard).finish(), } } @@ -486,18 +390,11 @@ unsafe impl Sync for RwLockReadGuard<'_, T> {} impl Drop for RwLockReadGuard<'_, T> { fn drop(&mut self) { - let state = self.0.state.fetch_sub(ONE_READ, Ordering::AcqRel); - - // If this was the last read and there are blocked writes, wake one of them up. - if (state & READ_COUNT_MASK) == ONE_READ && state & BLOCKED_WRITES != 0 { - let mut writes = self.0.writes.lock().unwrap(); + let state = self.0.state.fetch_sub(ONE_READ, Ordering::SeqCst); - if let Some((_, opt_waker)) = writes.iter_mut().next() { - // If there is no waker in this entry, that means it was already woken. - if let Some(w) = opt_waker.take() { - w.wake(); - } - } + // If this was the last read, wake one of the writers. + if state & READ_COUNT_MASK == ONE_READ { + self.0.write_wakers.notify_one(); } } } @@ -530,25 +427,12 @@ unsafe impl Sync for RwLockWriteGuard<'_, T> {} impl Drop for RwLockWriteGuard<'_, T> { fn drop(&mut self) { - let state = self.0.state.fetch_and(!WRITE_LOCK, Ordering::AcqRel); - - let mut guard = None; - - // Check if there are any blocked reads or writes. - if state & BLOCKED_READS != 0 { - guard = Some(self.0.reads.lock().unwrap()); - } else if state & BLOCKED_WRITES != 0 { - guard = Some(self.0.writes.lock().unwrap()); - } + self.0.state.store(0, Ordering::SeqCst); - // Wake up a single blocked task. - if let Some(mut guard) = guard { - if let Some((_, opt_waker)) = guard.iter_mut().next() { - // If there is no waker in this entry, that means it was already woken. - if let Some(w) = opt_waker.take() { - w.wake(); - } - } + // Notify all blocked readers. + if !self.0.read_wakers.notify_all() { + // If there were no blocked readers, notify a blocked writer. + self.0.write_wakers.notify_one(); } } } diff --git a/src/sync/waker_set.rs b/src/sync/waker_set.rs index 8fd1b62..eb44a67 100644 --- a/src/sync/waker_set.rs +++ b/src/sync/waker_set.rs @@ -95,8 +95,11 @@ impl WakerSet { } /// Removes the waker of a cancelled operation. - pub fn cancel(&self, key: usize) { + /// + /// Returns `true` if another blocked operation from the set was notified. + pub fn cancel(&self, key: usize) -> bool { let mut inner = self.lock(); + if inner.entries.remove(key).is_none() { inner.none_count -= 1; @@ -107,33 +110,45 @@ impl WakerSet { w.wake(); inner.none_count += 1; } + return true; } } + + false } /// Notifies one blocked operation. + /// + /// Returns `true` if an operation was notified. #[inline] - pub fn notify_one(&self) { + pub fn notify_one(&self) -> bool { // Use `SeqCst` ordering to synchronize with `Lock::drop()`. if self.flag.load(Ordering::SeqCst) & NOTIFY_ONE != 0 { - self.notify(false); + self.notify(false) + } else { + false } } /// Notifies all blocked operations. - // TODO: Delete this attribute when `crate::sync::channel()` is stabilized. - #[cfg(feature = "unstable")] + /// + /// Returns `true` if at least one operation was notified. #[inline] - pub fn notify_all(&self) { + pub fn notify_all(&self) -> bool { // Use `SeqCst` ordering to synchronize with `Lock::drop()`. if self.flag.load(Ordering::SeqCst) & NOTIFY_ALL != 0 { - self.notify(true); + self.notify(true) + } else { + false } } /// Notifies blocked operations, either one or all of them. - fn notify(&self, all: bool) { + /// + /// Returns `true` if at least one operation was notified. + fn notify(&self, all: bool) -> bool { let mut inner = &mut *self.lock(); + let mut notified = false; for (_, opt_waker) in inner.entries.iter_mut() { // If there is no waker in this entry, that means it was already woken. @@ -141,10 +156,15 @@ impl WakerSet { w.wake(); inner.none_count += 1; } + + notified = true; + if !all { break; } } + + notified } /// Locks the list of entries.