Fix double drop in StreamExt::cycle

pull/903/head
Taiki Endo 4 years ago
parent 11196c853d
commit e8dc2c0571

@ -1,14 +1,19 @@
use core::mem::ManuallyDrop;
use core::pin::Pin; use core::pin::Pin;
use futures_core::ready;
use pin_project_lite::pin_project;
use crate::stream::Stream; use crate::stream::Stream;
use crate::task::{Context, Poll}; use crate::task::{Context, Poll};
pin_project! {
/// A stream that will repeatedly yield the same list of elements. /// A stream that will repeatedly yield the same list of elements.
#[derive(Debug)] #[derive(Debug)]
pub struct Cycle<S> { pub struct Cycle<S> {
orig: S, orig: S,
source: ManuallyDrop<S>, #[pin]
source: S,
}
} }
impl<S> Cycle<S> impl<S> Cycle<S>
@ -18,15 +23,7 @@ where
pub(crate) fn new(source: S) -> Self { pub(crate) fn new(source: S) -> Self {
Self { Self {
orig: source.clone(), orig: source.clone(),
source: ManuallyDrop::new(source), source,
}
}
}
impl<S> Drop for Cycle<S> {
fn drop(&mut self) {
unsafe {
ManuallyDrop::drop(&mut self.source);
} }
} }
} }
@ -38,17 +35,14 @@ where
type Item = S::Item; type Item = S::Item;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
unsafe { let mut this = self.project();
let this = self.get_unchecked_mut();
match futures_core::ready!(Pin::new_unchecked(&mut *this.source).poll_next(cx)) { match ready!(this.source.as_mut().poll_next(cx)) {
Some(item) => Poll::Ready(Some(item)),
None => { None => {
ManuallyDrop::drop(&mut this.source); this.source.set(this.orig.clone());
this.source = ManuallyDrop::new(this.orig.clone()); this.source.poll_next(cx)
Pin::new_unchecked(&mut *this.source).poll_next(cx)
}
} }
item => Poll::Ready(item),
} }
} }
} }

Loading…
Cancel
Save