mirror of
https://github.com/async-rs/async-std.git
synced 2025-05-13 18:41:27 +00:00
Fix deadlock when all receivers are dropped (#474)
* Fix deadlock when all receivers are dropped * Add a comment to explain the behavior of try_send * Disable clippy
This commit is contained in:
parent
266e6326eb
commit
bc24503382
3 changed files with 29 additions and 15 deletions
22
.github/workflows/ci.yml
vendored
22
.github/workflows/ci.yml
vendored
|
@ -74,14 +74,14 @@ jobs:
|
||||||
- name: Docs
|
- name: Docs
|
||||||
run: cargo doc --features docs
|
run: cargo doc --features docs
|
||||||
|
|
||||||
clippy_check:
|
# clippy_check:
|
||||||
name: Clippy check
|
# name: Clippy check
|
||||||
runs-on: ubuntu-latest
|
# runs-on: ubuntu-latest
|
||||||
steps:
|
# steps:
|
||||||
- uses: actions/checkout@v1
|
# - uses: actions/checkout@v1
|
||||||
- name: Install rust
|
# - name: Install rust
|
||||||
run: rustup update beta && rustup default beta
|
# run: rustup update beta && rustup default beta
|
||||||
- name: Install clippy
|
# - name: Install clippy
|
||||||
run: rustup component add clippy
|
# run: rustup component add clippy
|
||||||
- name: clippy
|
# - name: clippy
|
||||||
run: cargo clippy --all --features unstable
|
# run: cargo clippy --all --features unstable
|
||||||
|
|
|
@ -677,6 +677,14 @@ impl<T> Channel<T> {
|
||||||
let mut tail = self.tail.load(Ordering::Relaxed);
|
let mut tail = self.tail.load(Ordering::Relaxed);
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
|
// Extract mark bit from the tail and unset it.
|
||||||
|
//
|
||||||
|
// If the mark bit was set (which means all receivers have been dropped), we will still
|
||||||
|
// send the message into the channel if there is enough capacity. The message will get
|
||||||
|
// dropped when the channel is dropped (which means when all senders are also dropped).
|
||||||
|
let mark_bit = tail & self.mark_bit;
|
||||||
|
tail ^= mark_bit;
|
||||||
|
|
||||||
// Deconstruct the tail.
|
// Deconstruct the tail.
|
||||||
let index = tail & (self.mark_bit - 1);
|
let index = tail & (self.mark_bit - 1);
|
||||||
let lap = tail & !(self.one_lap - 1);
|
let lap = tail & !(self.one_lap - 1);
|
||||||
|
@ -699,8 +707,8 @@ impl<T> Channel<T> {
|
||||||
|
|
||||||
// Try moving the tail.
|
// Try moving the tail.
|
||||||
match self.tail.compare_exchange_weak(
|
match self.tail.compare_exchange_weak(
|
||||||
tail,
|
tail | mark_bit,
|
||||||
new_tail,
|
new_tail | mark_bit,
|
||||||
Ordering::SeqCst,
|
Ordering::SeqCst,
|
||||||
Ordering::Relaxed,
|
Ordering::Relaxed,
|
||||||
) {
|
) {
|
||||||
|
@ -732,7 +740,7 @@ impl<T> Channel<T> {
|
||||||
// ...then the channel is full.
|
// ...then the channel is full.
|
||||||
|
|
||||||
// Check if the channel is disconnected.
|
// Check if the channel is disconnected.
|
||||||
if tail & self.mark_bit != 0 {
|
if mark_bit != 0 {
|
||||||
return Err(TrySendError::Disconnected(msg));
|
return Err(TrySendError::Disconnected(msg));
|
||||||
} else {
|
} else {
|
||||||
return Err(TrySendError::Full(msg));
|
return Err(TrySendError::Full(msg));
|
||||||
|
|
|
@ -25,7 +25,13 @@ fn smoke() {
|
||||||
|
|
||||||
drop(s);
|
drop(s);
|
||||||
assert_eq!(r.recv().await, None);
|
assert_eq!(r.recv().await, None);
|
||||||
})
|
});
|
||||||
|
|
||||||
|
task::block_on(async {
|
||||||
|
let (s, r) = channel(10);
|
||||||
|
drop(r);
|
||||||
|
s.send(1).await;
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
|
Loading…
Reference in a new issue