mirror of
https://github.com/async-rs/async-std.git
synced 2025-01-30 17:25:32 +00:00
Merge pull request #903 from taiki-e/stream-cycle
Fix double drop in StreamExt::cycle
This commit is contained in:
commit
a2ea3c7395
1 changed files with 18 additions and 24 deletions
|
@ -1,14 +1,19 @@
|
||||||
use core::mem::ManuallyDrop;
|
|
||||||
use core::pin::Pin;
|
use core::pin::Pin;
|
||||||
|
|
||||||
|
use futures_core::ready;
|
||||||
|
use pin_project_lite::pin_project;
|
||||||
|
|
||||||
use crate::stream::Stream;
|
use crate::stream::Stream;
|
||||||
use crate::task::{Context, Poll};
|
use crate::task::{Context, Poll};
|
||||||
|
|
||||||
/// A stream that will repeatedly yield the same list of elements.
|
pin_project! {
|
||||||
#[derive(Debug)]
|
/// A stream that will repeatedly yield the same list of elements.
|
||||||
pub struct Cycle<S> {
|
#[derive(Debug)]
|
||||||
orig: S,
|
pub struct Cycle<S> {
|
||||||
source: ManuallyDrop<S>,
|
orig: S,
|
||||||
|
#[pin]
|
||||||
|
source: S,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<S> Cycle<S>
|
impl<S> Cycle<S>
|
||||||
|
@ -18,15 +23,7 @@ where
|
||||||
pub(crate) fn new(source: S) -> Self {
|
pub(crate) fn new(source: S) -> Self {
|
||||||
Self {
|
Self {
|
||||||
orig: source.clone(),
|
orig: source.clone(),
|
||||||
source: ManuallyDrop::new(source),
|
source,
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<S> Drop for Cycle<S> {
|
|
||||||
fn drop(&mut self) {
|
|
||||||
unsafe {
|
|
||||||
ManuallyDrop::drop(&mut self.source);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -38,17 +35,14 @@ where
|
||||||
type Item = S::Item;
|
type Item = S::Item;
|
||||||
|
|
||||||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||||
unsafe {
|
let mut this = self.project();
|
||||||
let this = self.get_unchecked_mut();
|
|
||||||
|
|
||||||
match futures_core::ready!(Pin::new_unchecked(&mut *this.source).poll_next(cx)) {
|
match ready!(this.source.as_mut().poll_next(cx)) {
|
||||||
Some(item) => Poll::Ready(Some(item)),
|
None => {
|
||||||
None => {
|
this.source.set(this.orig.clone());
|
||||||
ManuallyDrop::drop(&mut this.source);
|
this.source.poll_next(cx)
|
||||||
this.source = ManuallyDrop::new(this.orig.clone());
|
|
||||||
Pin::new_unchecked(&mut *this.source).poll_next(cx)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
item => Poll::Ready(item),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue