mirror of
https://github.com/async-rs/async-std.git
synced 2025-01-16 10:49:55 +00:00
Merge pull request #242 from async-rs/barrier-unstable
mark sync::Barrier as unstable
This commit is contained in:
commit
fdc8fe624d
4 changed files with 64 additions and 55 deletions
|
@ -21,8 +21,8 @@ features = ["docs"]
|
|||
rustdoc-args = ["--cfg", "feature=\"docs\""]
|
||||
|
||||
[features]
|
||||
docs = []
|
||||
unstable = []
|
||||
docs = ["broadcaster"]
|
||||
unstable = ["broadcaster"]
|
||||
|
||||
[dependencies]
|
||||
async-macros = "1.0.0"
|
||||
|
@ -42,7 +42,7 @@ num_cpus = "1.10.1"
|
|||
pin-utils = "0.1.0-alpha.4"
|
||||
slab = "0.4.2"
|
||||
kv-log-macro = "1.0.4"
|
||||
broadcaster = "0.2.4"
|
||||
broadcaster = { version = "0.2.4", optional = true }
|
||||
|
||||
[dev-dependencies]
|
||||
femme = "1.2.0"
|
||||
|
|
|
@ -32,6 +32,7 @@ use crate::sync::Mutex;
|
|||
/// # });
|
||||
/// # }
|
||||
/// ```
|
||||
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
|
||||
#[derive(Debug)]
|
||||
pub struct Barrier {
|
||||
state: Mutex<BarrierState>,
|
||||
|
@ -60,6 +61,7 @@ struct BarrierState {
|
|||
/// let barrier = Barrier::new(1);
|
||||
/// let barrier_wait_result = barrier.wait();
|
||||
/// ```
|
||||
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct BarrierWaitResult(bool);
|
||||
|
||||
|
@ -172,3 +174,58 @@ impl BarrierWaitResult {
|
|||
self.0
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use futures_channel::mpsc::unbounded;
|
||||
use futures_util::sink::SinkExt;
|
||||
use futures_util::stream::StreamExt;
|
||||
|
||||
use crate::sync::{Arc, Barrier};
|
||||
use crate::task;
|
||||
|
||||
#[test]
|
||||
fn test_barrier() {
|
||||
// NOTE(dignifiedquire): Based on the test in std, I was seeing some
|
||||
// race conditions, so running it in a loop to make sure things are
|
||||
// solid.
|
||||
|
||||
for _ in 0..1_000 {
|
||||
task::block_on(async move {
|
||||
const N: usize = 10;
|
||||
|
||||
let barrier = Arc::new(Barrier::new(N));
|
||||
let (tx, mut rx) = unbounded();
|
||||
|
||||
for _ in 0..N - 1 {
|
||||
let c = barrier.clone();
|
||||
let mut tx = tx.clone();
|
||||
task::spawn(async move {
|
||||
let res = c.wait().await;
|
||||
|
||||
tx.send(res.is_leader()).await.unwrap();
|
||||
});
|
||||
}
|
||||
|
||||
// At this point, all spawned threads should be blocked,
|
||||
// so we shouldn't get anything from the port
|
||||
let res = rx.try_next();
|
||||
assert!(match res {
|
||||
Err(_err) => true,
|
||||
_ => false,
|
||||
});
|
||||
|
||||
let mut leader_found = barrier.wait().await.is_leader();
|
||||
|
||||
// Now, the barrier is cleared and we should get data.
|
||||
for _ in 0..N - 1 {
|
||||
if rx.next().await.unwrap() {
|
||||
assert!(!leader_found);
|
||||
leader_found = true;
|
||||
}
|
||||
}
|
||||
assert!(leader_found);
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -32,10 +32,14 @@
|
|||
#[doc(inline)]
|
||||
pub use std::sync::{Arc, Weak};
|
||||
|
||||
#[cfg(any(feature = "unstable", feature = "docs"))]
|
||||
pub use barrier::{Barrier, BarrierWaitResult};
|
||||
|
||||
pub use mutex::{Mutex, MutexGuard};
|
||||
pub use rwlock::{RwLock, RwLockReadGuard, RwLockWriteGuard};
|
||||
|
||||
#[cfg(any(feature = "unstable", feature = "docs"))]
|
||||
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
|
||||
mod barrier;
|
||||
mod mutex;
|
||||
mod rwlock;
|
||||
|
|
|
@ -1,52 +0,0 @@
|
|||
use std::sync::Arc;
|
||||
|
||||
use futures_channel::mpsc::unbounded;
|
||||
use futures_util::sink::SinkExt;
|
||||
use futures_util::stream::StreamExt;
|
||||
|
||||
use async_std::sync::Barrier;
|
||||
use async_std::task;
|
||||
|
||||
#[test]
|
||||
fn test_barrier() {
|
||||
// Based on the test in std, I was seeing some race conditions, so running it in a loop to make sure
|
||||
// things are solid.
|
||||
|
||||
for _ in 0..1_000 {
|
||||
task::block_on(async move {
|
||||
const N: usize = 10;
|
||||
|
||||
let barrier = Arc::new(Barrier::new(N));
|
||||
let (tx, mut rx) = unbounded();
|
||||
|
||||
for _ in 0..N - 1 {
|
||||
let c = barrier.clone();
|
||||
let mut tx = tx.clone();
|
||||
task::spawn(async move {
|
||||
let res = c.wait().await;
|
||||
|
||||
tx.send(res.is_leader()).await.unwrap();
|
||||
});
|
||||
}
|
||||
|
||||
// At this point, all spawned threads should be blocked,
|
||||
// so we shouldn't get anything from the port
|
||||
let res = rx.try_next();
|
||||
assert!(match res {
|
||||
Err(_err) => true,
|
||||
_ => false,
|
||||
});
|
||||
|
||||
let mut leader_found = barrier.wait().await.is_leader();
|
||||
|
||||
// Now, the barrier is cleared and we should get data.
|
||||
for _ in 0..N - 1 {
|
||||
if rx.next().await.unwrap() {
|
||||
assert!(!leader_found);
|
||||
leader_found = true;
|
||||
}
|
||||
}
|
||||
assert!(leader_found);
|
||||
});
|
||||
}
|
||||
}
|
Loading…
Reference in a new issue