diff --git a/src/stream/stream/flatten.rs b/src/stream/stream/flatten.rs index fffda4dd..7265d17f 100644 --- a/src/stream/stream/flatten.rs +++ b/src/stream/stream/flatten.rs @@ -22,3 +22,52 @@ impl FlattenCompat { } } } + +impl Stream for FlattenCompat +where + S: Stream> + std::marker::Unpin, + U: Stream + std::marker::Unpin, +{ + type Item = U::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + loop { + if let Some(ref mut inner) = self.as_mut().frontiter() { + if let item @ Some(_) = futures_core::ready!(Pin::new(inner).poll_next(cx)) { + return Poll::Ready(item); + } + } + + match futures_core::ready!(Pin::new(&mut self.stream).poll_next(cx)) { + None => return Poll::Ready(None), + Some(inner) => *self.as_mut().frontiter() = Some(inner.into_stream()), + } + } + } +} + +#[cfg(test)] +mod tests { + use super::FlattenCompat; + + use crate::prelude::*; + use crate::task; + + use std::collections::VecDeque; + + #[test] + fn test_poll_next() -> std::io::Result<()> { + let inner1: VecDeque = vec![1, 2, 3].into_iter().collect(); + let inner2: VecDeque = vec![4, 5, 6].into_iter().collect(); + + let s: VecDeque<_> = vec![inner1, inner2].into_iter().collect(); + + task::block_on(async move { + let flat = FlattenCompat::new(s); + let v: Vec = flat.collect().await; + + assert_eq!(v, vec![1, 2, 3, 4, 5, 6]); + Ok(()) + }) + } +}