diff --git a/src/sync/channel.rs b/src/sync/channel.rs index d4b6423..c326280 100644 --- a/src/sync/channel.rs +++ b/src/sync/channel.rs @@ -138,41 +138,34 @@ impl Sender { type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let msg = self.msg.take().unwrap(); + loop { + let msg = self.msg.take().unwrap(); - // Try sending the message. - let poll = match self.channel.try_send(msg) { - Ok(()) => Poll::Ready(()), - Err(TrySendError::Disconnected(msg)) => { - self.msg = Some(msg); - Poll::Pending + // If the current task is in the set, remove it. + if let Some(key) = self.opt_key.take() { + self.channel.send_wakers.remove(key); } - Err(TrySendError::Full(msg)) => { - // Insert this send operation. - match self.opt_key { - None => self.opt_key = Some(self.channel.send_wakers.insert(cx)), - Some(key) => self.channel.send_wakers.update(key, cx), - } - // Try sending the message again. - match self.channel.try_send(msg) { - Ok(()) => Poll::Ready(()), - Err(TrySendError::Disconnected(msg)) | Err(TrySendError::Full(msg)) => { - self.msg = Some(msg); - Poll::Pending + // Try sending the message. + match self.channel.try_send(msg) { + Ok(()) => return Poll::Ready(()), + Err(TrySendError::Disconnected(msg)) => { + self.msg = Some(msg); + return Poll::Pending; + } + Err(TrySendError::Full(msg)) => { + self.msg = Some(msg); + + // Insert this send operation. + self.opt_key = Some(self.channel.send_wakers.insert(cx)); + + // If the channel is still full and not disconnected, return. + if self.channel.is_full() && !self.channel.is_disconnected() { + return Poll::Pending; } } } - }; - - if poll.is_ready() { - // If the current task is in the set, remove it. - if let Some(key) = self.opt_key.take() { - self.channel.send_wakers.complete(key); - } } - - poll } } @@ -543,34 +536,27 @@ fn poll_recv( opt_key: &mut Option, cx: &mut Context<'_>, ) -> Poll> { - // Try receiving a message. - let poll = match channel.try_recv() { - Ok(msg) => Poll::Ready(Some(msg)), - Err(TryRecvError::Disconnected) => Poll::Ready(None), - Err(TryRecvError::Empty) => { - // Insert this receive operation. - match *opt_key { - None => *opt_key = Some(wakers.insert(cx)), - Some(key) => wakers.update(key, cx), - } - - // Try receiving a message again. - match channel.try_recv() { - Ok(msg) => Poll::Ready(Some(msg)), - Err(TryRecvError::Disconnected) => Poll::Ready(None), - Err(TryRecvError::Empty) => Poll::Pending, - } - } - }; - - if poll.is_ready() { + loop { // If the current task is in the set, remove it. if let Some(key) = opt_key.take() { - wakers.complete(key); + wakers.remove(key); + } + + // Try receiving a message. + match channel.try_recv() { + Ok(msg) => return Poll::Ready(Some(msg)), + Err(TryRecvError::Disconnected) => return Poll::Ready(None), + Err(TryRecvError::Empty) => { + // Insert this receive operation. + *opt_key = Some(wakers.insert(cx)); + + // If the channel is still empty and not disconnected, return. + if channel.is_empty() && !channel.is_disconnected() { + return Poll::Pending; + } + } } } - - poll } /// A slot in a channel. @@ -862,6 +848,11 @@ impl Channel { } } + /// Returns `true` if the channel is disconnected. + pub fn is_disconnected(&self) -> bool { + self.tail.load(Ordering::SeqCst) & self.mark_bit != 0 + } + /// Returns `true` if the channel is empty. fn is_empty(&self) -> bool { let head = self.head.load(Ordering::SeqCst); diff --git a/src/sync/mutex.rs b/src/sync/mutex.rs index fcd030d..52c3898 100644 --- a/src/sync/mutex.rs +++ b/src/sync/mutex.rs @@ -104,32 +104,26 @@ impl Mutex { type Output = MutexGuard<'a, T>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let poll = match self.mutex.try_lock() { - Some(guard) => Poll::Ready(guard), - None => { - // Insert this lock operation. - match self.opt_key { - None => self.opt_key = Some(self.mutex.wakers.insert(cx)), - Some(key) => self.mutex.wakers.update(key, cx), - } - - // Try locking again because it's possible the mutex got unlocked just - // before the current task was inserted into the waker set. - match self.mutex.try_lock() { - Some(guard) => Poll::Ready(guard), - None => Poll::Pending, - } - } - }; - - if poll.is_ready() { + loop { // If the current task is in the set, remove it. if let Some(key) = self.opt_key.take() { - self.mutex.wakers.complete(key); + self.mutex.wakers.remove(key); + } + + // Try acquiring the lock. + match self.mutex.try_lock() { + Some(guard) => return Poll::Ready(guard), + None => { + // Insert this lock operation. + self.opt_key = Some(self.mutex.wakers.insert(cx)); + + // If the mutex is still locked, return. + if self.mutex.locked.load(Ordering::SeqCst) { + return Poll::Pending; + } + } } } - - poll } } @@ -266,8 +260,8 @@ impl Drop for MutexGuard<'_, T> { // Use `SeqCst` ordering to synchronize with `WakerSet::insert()` and `WakerSet::update()`. self.0.locked.store(false, Ordering::SeqCst); - // Notify one blocked `lock()` operation. - self.0.wakers.notify_one(); + // Notify a blocked `lock()` operation if none were notified already. + self.0.wakers.notify_any(); } } diff --git a/src/sync/rwlock.rs b/src/sync/rwlock.rs index 81b0735..65b9dca 100644 --- a/src/sync/rwlock.rs +++ b/src/sync/rwlock.rs @@ -108,32 +108,26 @@ impl RwLock { type Output = RwLockReadGuard<'a, T>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let poll = match self.lock.try_read() { - Some(guard) => Poll::Ready(guard), - None => { - // Insert this lock operation. - match self.opt_key { - None => self.opt_key = Some(self.lock.read_wakers.insert(cx)), - Some(key) => self.lock.read_wakers.update(key, cx), - } - - // Try locking again because it's possible the lock got unlocked just - // before the current task was inserted into the waker set. - match self.lock.try_read() { - Some(guard) => Poll::Ready(guard), - None => Poll::Pending, - } - } - }; - - if poll.is_ready() { + loop { // If the current task is in the set, remove it. if let Some(key) = self.opt_key.take() { - self.lock.read_wakers.complete(key); + self.lock.read_wakers.remove(key); + } + + // Try acquiring a read lock. + match self.lock.try_read() { + Some(guard) => return Poll::Ready(guard), + None => { + // Insert this lock operation. + self.opt_key = Some(self.lock.read_wakers.insert(cx)); + + // If the lock is still acquired for writing, return. + if self.lock.state.load(Ordering::SeqCst) & WRITE_LOCK != 0 { + return Poll::Pending; + } + } } } - - poll } } @@ -143,9 +137,10 @@ impl RwLock { if let Some(key) = self.opt_key { self.lock.read_wakers.cancel(key); - // If there are no active readers, wake one of the writers. + // If there are no active readers, notify a blocked writer if none were + // notified already. if self.lock.state.load(Ordering::SeqCst) & READ_COUNT_MASK == 0 { - self.lock.write_wakers.notify_one(); + self.lock.write_wakers.notify_any(); } } } @@ -238,32 +233,26 @@ impl RwLock { type Output = RwLockWriteGuard<'a, T>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let poll = match self.lock.try_write() { - Some(guard) => Poll::Ready(guard), - None => { - // Insert this lock operation. - match self.opt_key { - None => self.opt_key = Some(self.lock.write_wakers.insert(cx)), - Some(key) => self.lock.write_wakers.update(key, cx), - } - - // Try locking again because it's possible the lock got unlocked just - // before the current task was inserted into the waker set. - match self.lock.try_write() { - Some(guard) => Poll::Ready(guard), - None => Poll::Pending, - } - } - }; - - if poll.is_ready() { + loop { // If the current task is in the set, remove it. if let Some(key) = self.opt_key.take() { - self.lock.write_wakers.complete(key); + self.lock.write_wakers.remove(key); + } + + // Try acquiring a write lock. + match self.lock.try_write() { + Some(guard) => return Poll::Ready(guard), + None => { + // Insert this lock operation. + self.opt_key = Some(self.lock.write_wakers.insert(cx)); + + // If the lock is still acquired for reading or writing, return. + if self.lock.state.load(Ordering::SeqCst) != 0 { + return Poll::Pending; + } + } } } - - poll } } @@ -392,9 +381,9 @@ impl Drop for RwLockReadGuard<'_, T> { fn drop(&mut self) { let state = self.0.state.fetch_sub(ONE_READ, Ordering::SeqCst); - // If this was the last read, wake one of the writers. + // If this was the last reader, notify a blocked writer if none were notified already. if state & READ_COUNT_MASK == ONE_READ { - self.0.write_wakers.notify_one(); + self.0.write_wakers.notify_any(); } } } @@ -431,8 +420,9 @@ impl Drop for RwLockWriteGuard<'_, T> { // Notify all blocked readers. if !self.0.read_wakers.notify_all() { - // If there were no blocked readers, notify a blocked writer. - self.0.write_wakers.notify_one(); + // If there were no blocked readers, notify a blocked writer if none were notified + // already. + self.0.write_wakers.notify_any(); } } } diff --git a/src/sync/waker_set.rs b/src/sync/waker_set.rs index eb44a67..57fbaaa 100644 --- a/src/sync/waker_set.rs +++ b/src/sync/waker_set.rs @@ -17,11 +17,11 @@ use crate::task::{Context, Waker}; #[allow(clippy::identity_op)] const LOCKED: usize = 1 << 0; -/// Set when there are tasks for `notify_one()` to wake. -const NOTIFY_ONE: usize = 1 << 1; +/// Set when there is at least one entry that has already been notified. +const NOTIFIED: usize = 1 << 1; -/// Set when there are tasks for `notify_all()` to wake. -const NOTIFY_ALL: usize = 1 << 2; +/// Set when there is at least one notifiable entry. +const NOTIFIABLE: usize = 1 << 2; /// Inner representation of `WakerSet`. struct Inner { @@ -34,8 +34,8 @@ struct Inner { /// The key of each entry is its index in the `Slab`. entries: Slab>, - /// The number of entries that have the waker set to `None`. - none_count: usize, + /// The number of notifiable entries. + notifiable: usize, } /// A set holding wakers. @@ -55,7 +55,7 @@ impl WakerSet { flag: AtomicUsize::new(0), inner: UnsafeCell::new(Inner { entries: Slab::new(), - none_count: 0, + notifiable: 0, }), } } @@ -63,34 +63,20 @@ impl WakerSet { /// Inserts a waker for a blocked operation and returns a key associated with it. pub fn insert(&self, cx: &Context<'_>) -> usize { let w = cx.waker().clone(); - self.lock().entries.insert(Some(w)) - } - - /// Updates the waker of a previously inserted entry. - pub fn update(&self, key: usize, cx: &Context<'_>) { let mut inner = self.lock(); - match &mut inner.entries[key] { - None => { - // Fill in the waker. - let w = cx.waker().clone(); - inner.entries[key] = Some(w); - inner.none_count -= 1; - } - Some(w) => { - // Replace the waker if the existing one is different. - if !w.will_wake(cx.waker()) { - *w = cx.waker().clone(); - } - } - } + let key = inner.entries.insert(Some(w)); + inner.notifiable += 1; + key } - /// Removes the waker of a completed operation. - pub fn complete(&self, key: usize) { + /// Removes the waker of an operation. + pub fn remove(&self, key: usize) { let mut inner = self.lock(); - if inner.entries.remove(key).is_none() { - inner.none_count -= 1; + + match inner.entries.remove(key) { + Some(_) => inner.notifiable -= 1, + None => {} } } @@ -100,31 +86,48 @@ impl WakerSet { pub fn cancel(&self, key: usize) -> bool { let mut inner = self.lock(); - if inner.entries.remove(key).is_none() { - inner.none_count -= 1; - - // The operation was cancelled and notified so notify another operation instead. - if let Some((_, opt_waker)) = inner.entries.iter_mut().next() { - // If there is no waker in this entry, that means it was already woken. - if let Some(w) = opt_waker.take() { - w.wake(); - inner.none_count += 1; + match inner.entries.remove(key) { + Some(_) => inner.notifiable -= 1, + None => { + // The operation was cancelled and notified so notify another operation instead. + for (_, opt_waker) in inner.entries.iter_mut() { + // If there is no waker in this entry, that means it was already woken. + if let Some(w) = opt_waker.take() { + w.wake(); + inner.notifiable -= 1; + return true; + } } - return true; } } false } - /// Notifies one blocked operation. + /// Notifies a blocked operation if none have been notified already. /// /// Returns `true` if an operation was notified. #[inline] + pub fn notify_any(&self) -> bool { + // Use `SeqCst` ordering to synchronize with `Lock::drop()`. + let flag = self.flag.load(Ordering::SeqCst); + + if flag & NOTIFIED == 0 && flag & NOTIFIABLE != 0 { + self.notify(Notify::Any) + } else { + false + } + } + + /// Notifies one additional blocked operation. + /// + /// Returns `true` if an operation was notified. + #[inline] + #[cfg(feature = "unstable")] pub fn notify_one(&self) -> bool { // Use `SeqCst` ordering to synchronize with `Lock::drop()`. - if self.flag.load(Ordering::SeqCst) & NOTIFY_ONE != 0 { - self.notify(false) + if self.flag.load(Ordering::SeqCst) & NOTIFIABLE != 0 { + self.notify(Notify::One) } else { false } @@ -136,8 +139,8 @@ impl WakerSet { #[inline] pub fn notify_all(&self) -> bool { // Use `SeqCst` ordering to synchronize with `Lock::drop()`. - if self.flag.load(Ordering::SeqCst) & NOTIFY_ALL != 0 { - self.notify(true) + if self.flag.load(Ordering::SeqCst) & NOTIFIABLE != 0 { + self.notify(Notify::All) } else { false } @@ -146,7 +149,7 @@ impl WakerSet { /// Notifies blocked operations, either one or all of them. /// /// Returns `true` if at least one operation was notified. - fn notify(&self, all: bool) -> bool { + fn notify(&self, n: Notify) -> bool { let mut inner = &mut *self.lock(); let mut notified = false; @@ -154,12 +157,15 @@ impl WakerSet { // If there is no waker in this entry, that means it was already woken. if let Some(w) = opt_waker.take() { w.wake(); - inner.none_count += 1; + inner.notifiable -= 1; + notified = true; + + if n == Notify::One { + break; + } } - notified = true; - - if !all { + if n == Notify::Any { break; } } @@ -188,14 +194,14 @@ impl Drop for Lock<'_> { fn drop(&mut self) { let mut flag = 0; - // If there is at least one entry and all are `Some`, then `notify_one()` has work to do. - if !self.entries.is_empty() && self.none_count == 0 { - flag |= NOTIFY_ONE; + // Set the `NOTIFIED` flag if there is at least one notified entry. + if self.entries.len() - self.notifiable > 0 { + flag |= NOTIFIED; } - // If there is at least one `Some` entry, then `notify_all()` has work to do. - if self.entries.len() - self.none_count > 0 { - flag |= NOTIFY_ALL; + // Set the `NOTIFIABLE` flag if there is at least one notifiable entry. + if self.notifiable > 0 { + flag |= NOTIFIABLE; } // Use `SeqCst` ordering to synchronize with `WakerSet::lock_to_notify()`. @@ -218,3 +224,14 @@ impl DerefMut for Lock<'_> { unsafe { &mut *self.waker_set.inner.get() } } } + +/// Notification strategy. +#[derive(Clone, Copy, Eq, PartialEq)] +enum Notify { + /// Make sure at least one entry is notified. + Any, + /// Notify one additional entry. + One, + /// Notify all entries. + All, +}