//! A common utility for building synchronization primitives. //! //! When an async operation is blocked, it needs to register itself somewhere so that it can be //! notified later on. The `WakerSet` type helps with keeping track of such async operations and //! notifying them when they may make progress. use std::cell::UnsafeCell; use std::ops::{Deref, DerefMut}; use std::sync::atomic::{AtomicUsize, Ordering}; use std::task::{Context, Waker}; use crossbeam_utils::Backoff; use slab::Slab; /// Set when the entry list is locked. #[allow(clippy::identity_op)] const LOCKED: usize = 1 << 0; /// Set when there is at least one entry that has already been notified. const NOTIFIED: usize = 1 << 1; /// Set when there is at least one notifiable entry. const NOTIFIABLE: usize = 1 << 2; /// Inner representation of `WakerSet`. struct Inner { /// A list of entries in the set. /// /// Each entry has an optional waker associated with the task that is executing the operation. /// If the waker is set to `None`, that means the task has been woken up but hasn't removed /// itself from the `WakerSet` yet. /// /// The key of each entry is its index in the `Slab`. entries: Slab>, /// The number of notifiable entries. notifiable: usize, } /// A set holding wakers. pub struct WakerSet { /// Holds three bits: `LOCKED`, `NOTIFY_ONE`, and `NOTIFY_ALL`. flag: AtomicUsize, /// A set holding wakers. inner: UnsafeCell, } impl WakerSet { /// Creates a new `WakerSet`. #[inline] pub fn new() -> WakerSet { WakerSet { flag: AtomicUsize::new(0), inner: UnsafeCell::new(Inner { entries: Slab::new(), notifiable: 0, }), } } /// Inserts a waker for a blocked operation and returns a key associated with it. #[cold] pub fn insert(&self, cx: &Context<'_>) -> usize { let w = cx.waker().clone(); let mut inner = self.lock(); let key = inner.entries.insert(Some(w)); inner.notifiable += 1; key } /// Removes the waker of an operation. #[cold] pub fn remove(&self, key: usize) { let mut inner = self.lock(); if inner.entries.remove(key).is_some() { inner.notifiable -= 1; } } /// Removes the waker of a cancelled operation. /// /// Returns `true` if another blocked operation from the set was notified. #[cold] pub fn cancel(&self, key: usize) -> bool { let mut inner = self.lock(); match inner.entries.remove(key) { Some(_) => inner.notifiable -= 1, None => { // The operation was cancelled and notified so notify another operation instead. for (_, opt_waker) in inner.entries.iter_mut() { // If there is no waker in this entry, that means it was already woken. if let Some(w) = opt_waker.take() { w.wake(); inner.notifiable -= 1; return true; } } } } false } /// Notifies a blocked operation if none have been notified already. /// /// Returns `true` if an operation was notified. #[inline] pub fn notify_any(&self) -> bool { // Use `SeqCst` ordering to synchronize with `Lock::drop()`. let flag = self.flag.load(Ordering::SeqCst); if flag & NOTIFIED == 0 && flag & NOTIFIABLE != 0 { self.notify(Notify::Any) } else { false } } /// Notifies one additional blocked operation. /// /// Returns `true` if an operation was notified. #[inline] #[cfg(feature = "unstable")] pub fn notify_one(&self) -> bool { // Use `SeqCst` ordering to synchronize with `Lock::drop()`. if self.flag.load(Ordering::SeqCst) & NOTIFIABLE != 0 { self.notify(Notify::One) } else { false } } /// Notifies all blocked operations. /// /// Returns `true` if at least one operation was notified. #[inline] pub fn notify_all(&self) -> bool { // Use `SeqCst` ordering to synchronize with `Lock::drop()`. if self.flag.load(Ordering::SeqCst) & NOTIFIABLE != 0 { self.notify(Notify::All) } else { false } } /// Notifies blocked operations, either one or all of them. /// /// Returns `true` if at least one operation was notified. #[cold] fn notify(&self, n: Notify) -> bool { let mut inner = &mut *self.lock(); let mut notified = false; for (_, opt_waker) in inner.entries.iter_mut() { // If there is no waker in this entry, that means it was already woken. if let Some(w) = opt_waker.take() { w.wake(); inner.notifiable -= 1; notified = true; if n == Notify::One { break; } } if n == Notify::Any { break; } } notified } /// Locks the list of entries. fn lock(&self) -> Lock<'_> { let backoff = Backoff::new(); while self.flag.fetch_or(LOCKED, Ordering::Acquire) & LOCKED != 0 { backoff.snooze(); } Lock { waker_set: self } } } /// A guard holding a `WakerSet` locked. struct Lock<'a> { waker_set: &'a WakerSet, } impl Drop for Lock<'_> { #[inline] fn drop(&mut self) { let mut flag = 0; // Set the `NOTIFIED` flag if there is at least one notified entry. if self.entries.len() - self.notifiable > 0 { flag |= NOTIFIED; } // Set the `NOTIFIABLE` flag if there is at least one notifiable entry. if self.notifiable > 0 { flag |= NOTIFIABLE; } // Use `SeqCst` ordering to synchronize with `WakerSet::lock_to_notify()`. self.waker_set.flag.store(flag, Ordering::SeqCst); } } impl Deref for Lock<'_> { type Target = Inner; #[inline] fn deref(&self) -> &Inner { unsafe { &*self.waker_set.inner.get() } } } impl DerefMut for Lock<'_> { #[inline] fn deref_mut(&mut self) -> &mut Inner { unsafe { &mut *self.waker_set.inner.get() } } } /// Notification strategy. #[derive(Clone, Copy, Eq, PartialEq)] enum Notify { /// Make sure at least one entry is notified. Any, /// Notify one additional entry. One, /// Notify all entries. All, }