mirror of
https://github.com/async-rs/async-std.git
synced 2025-04-05 07:56:42 +00:00
fix barrier tests
Signed-off-by: Yoshua Wuyts <yoshuawuyts@gmail.com>
This commit is contained in:
parent
c23cc769ee
commit
0b39306b74
2 changed files with 55 additions and 52 deletions
|
@ -174,3 +174,58 @@ impl BarrierWaitResult {
|
||||||
self.0
|
self.0
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod test {
|
||||||
|
use futures_channel::mpsc::unbounded;
|
||||||
|
use futures_util::sink::SinkExt;
|
||||||
|
use futures_util::stream::StreamExt;
|
||||||
|
|
||||||
|
use crate::sync::{Arc, Barrier};
|
||||||
|
use crate::task;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_barrier() {
|
||||||
|
// NOTE(dignifiedquire): Based on the test in std, I was seeing some
|
||||||
|
// race conditions, so running it in a loop to make sure things are
|
||||||
|
// solid.
|
||||||
|
|
||||||
|
for _ in 0..1_000 {
|
||||||
|
task::block_on(async move {
|
||||||
|
const N: usize = 10;
|
||||||
|
|
||||||
|
let barrier = Arc::new(Barrier::new(N));
|
||||||
|
let (tx, mut rx) = unbounded();
|
||||||
|
|
||||||
|
for _ in 0..N - 1 {
|
||||||
|
let c = barrier.clone();
|
||||||
|
let mut tx = tx.clone();
|
||||||
|
task::spawn(async move {
|
||||||
|
let res = c.wait().await;
|
||||||
|
|
||||||
|
tx.send(res.is_leader()).await.unwrap();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// At this point, all spawned threads should be blocked,
|
||||||
|
// so we shouldn't get anything from the port
|
||||||
|
let res = rx.try_next();
|
||||||
|
assert!(match res {
|
||||||
|
Err(_err) => true,
|
||||||
|
_ => false,
|
||||||
|
});
|
||||||
|
|
||||||
|
let mut leader_found = barrier.wait().await.is_leader();
|
||||||
|
|
||||||
|
// Now, the barrier is cleared and we should get data.
|
||||||
|
for _ in 0..N - 1 {
|
||||||
|
if rx.next().await.unwrap() {
|
||||||
|
assert!(!leader_found);
|
||||||
|
leader_found = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
assert!(leader_found);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -1,52 +0,0 @@
|
||||||
use std::sync::Arc;
|
|
||||||
|
|
||||||
use futures_channel::mpsc::unbounded;
|
|
||||||
use futures_util::sink::SinkExt;
|
|
||||||
use futures_util::stream::StreamExt;
|
|
||||||
|
|
||||||
use async_std::sync::Barrier;
|
|
||||||
use async_std::task;
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_barrier() {
|
|
||||||
// Based on the test in std, I was seeing some race conditions, so running it in a loop to make sure
|
|
||||||
// things are solid.
|
|
||||||
|
|
||||||
for _ in 0..1_000 {
|
|
||||||
task::block_on(async move {
|
|
||||||
const N: usize = 10;
|
|
||||||
|
|
||||||
let barrier = Arc::new(Barrier::new(N));
|
|
||||||
let (tx, mut rx) = unbounded();
|
|
||||||
|
|
||||||
for _ in 0..N - 1 {
|
|
||||||
let c = barrier.clone();
|
|
||||||
let mut tx = tx.clone();
|
|
||||||
task::spawn(async move {
|
|
||||||
let res = c.wait().await;
|
|
||||||
|
|
||||||
tx.send(res.is_leader()).await.unwrap();
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
// At this point, all spawned threads should be blocked,
|
|
||||||
// so we shouldn't get anything from the port
|
|
||||||
let res = rx.try_next();
|
|
||||||
assert!(match res {
|
|
||||||
Err(_err) => true,
|
|
||||||
_ => false,
|
|
||||||
});
|
|
||||||
|
|
||||||
let mut leader_found = barrier.wait().await.is_leader();
|
|
||||||
|
|
||||||
// Now, the barrier is cleared and we should get data.
|
|
||||||
for _ in 0..N - 1 {
|
|
||||||
if rx.next().await.unwrap() {
|
|
||||||
assert!(!leader_found);
|
|
||||||
leader_found = true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
assert!(leader_found);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
Loading…
Reference in a new issue