diff --git a/src/collections/binary_heap/extend.rs b/src/collections/binary_heap/extend.rs new file mode 100644 index 0000000..8538f9b --- /dev/null +++ b/src/collections/binary_heap/extend.rs @@ -0,0 +1,18 @@ +use std::collections::BinaryHeap; +use std::pin::Pin; + +use crate::prelude::*; +use crate::stream::{Extend, IntoStream}; + +impl Extend for BinaryHeap { + fn stream_extend<'a, S: IntoStream + 'a>( + &'a mut self, + stream: S, + ) -> Pin + 'a>> { + let stream = stream.into_stream(); + //TODO: Add this back in when size_hint is added to Stream/StreamExt + //let (lower_bound, _) = stream.size_hint(); + //self.reserve(lower_bound); + Box::pin(stream.for_each(move |item| self.push(item))) + } +} diff --git a/src/collections/binary_heap/from_stream.rs b/src/collections/binary_heap/from_stream.rs new file mode 100644 index 0000000..c8e44e9 --- /dev/null +++ b/src/collections/binary_heap/from_stream.rs @@ -0,0 +1,24 @@ +use std::collections::BinaryHeap; +use std::pin::Pin; + +use crate::stream::{Extend, FromStream, IntoStream}; + +impl FromStream for BinaryHeap { + #[inline] + fn from_stream<'a, S: IntoStream>( + stream: S, + ) -> Pin + 'a>> + where + ::IntoStream: 'a, + { + let stream = stream.into_stream(); + + Box::pin(async move { + pin_utils::pin_mut!(stream); + + let mut out = BinaryHeap::new(); + out.stream_extend(stream).await; + out + }) + } +} diff --git a/src/collections/binary_heap/mod.rs b/src/collections/binary_heap/mod.rs new file mode 100644 index 0000000..35c406c --- /dev/null +++ b/src/collections/binary_heap/mod.rs @@ -0,0 +1,7 @@ +//! The Rust priority queue implemented with a binary heap + +mod extend; +mod from_stream; + +#[doc(inline)] +pub use std::collections::BinaryHeap; diff --git a/src/collections/btree_map/extend.rs b/src/collections/btree_map/extend.rs new file mode 100644 index 0000000..ae02c42 --- /dev/null +++ b/src/collections/btree_map/extend.rs @@ -0,0 +1,16 @@ +use std::collections::BTreeMap; +use std::pin::Pin; + +use crate::prelude::*; +use crate::stream::{Extend, IntoStream}; + +impl Extend<(K, V)> for BTreeMap { + fn stream_extend<'a, S: IntoStream + 'a>( + &'a mut self, + stream: S, + ) -> Pin + 'a>> { + Box::pin(stream.into_stream().for_each(move |(k, v)| { + self.insert(k, v); + })) + } +} diff --git a/src/collections/btree_map/from_stream.rs b/src/collections/btree_map/from_stream.rs new file mode 100644 index 0000000..bd80c06 --- /dev/null +++ b/src/collections/btree_map/from_stream.rs @@ -0,0 +1,24 @@ +use std::collections::BTreeMap; +use std::pin::Pin; + +use crate::stream::{Extend, FromStream, IntoStream}; + +impl FromStream<(K, V)> for BTreeMap { + #[inline] + fn from_stream<'a, S: IntoStream>( + stream: S, + ) -> Pin + 'a>> + where + ::IntoStream: 'a, + { + let stream = stream.into_stream(); + + Box::pin(async move { + pin_utils::pin_mut!(stream); + + let mut out = BTreeMap::new(); + out.stream_extend(stream).await; + out + }) + } +} diff --git a/src/collections/btree_map/mod.rs b/src/collections/btree_map/mod.rs new file mode 100644 index 0000000..49f9084 --- /dev/null +++ b/src/collections/btree_map/mod.rs @@ -0,0 +1,7 @@ +//! The Rust B-Tree Map + +mod extend; +mod from_stream; + +#[doc(inline)] +pub use std::collections::BTreeMap; diff --git a/src/collections/btree_set/extend.rs b/src/collections/btree_set/extend.rs new file mode 100644 index 0000000..ccf0337 --- /dev/null +++ b/src/collections/btree_set/extend.rs @@ -0,0 +1,16 @@ +use std::collections::BTreeSet; +use std::pin::Pin; + +use crate::prelude::*; +use crate::stream::{Extend, IntoStream}; + +impl Extend for BTreeSet { + fn stream_extend<'a, S: IntoStream + 'a>( + &'a mut self, + stream: S, + ) -> Pin + 'a>> { + Box::pin(stream.into_stream().for_each(move |item| { + self.insert(item); + })) + } +} diff --git a/src/collections/btree_set/from_stream.rs b/src/collections/btree_set/from_stream.rs new file mode 100644 index 0000000..bd2a744 --- /dev/null +++ b/src/collections/btree_set/from_stream.rs @@ -0,0 +1,24 @@ +use std::collections::BTreeSet; +use std::pin::Pin; + +use crate::stream::{Extend, FromStream, IntoStream}; + +impl FromStream for BTreeSet { + #[inline] + fn from_stream<'a, S: IntoStream>( + stream: S, + ) -> Pin + 'a>> + where + ::IntoStream: 'a, + { + let stream = stream.into_stream(); + + Box::pin(async move { + pin_utils::pin_mut!(stream); + + let mut out = BTreeSet::new(); + out.stream_extend(stream).await; + out + }) + } +} diff --git a/src/collections/btree_set/mod.rs b/src/collections/btree_set/mod.rs new file mode 100644 index 0000000..e8a572a --- /dev/null +++ b/src/collections/btree_set/mod.rs @@ -0,0 +1,7 @@ +//! The Rust B-Tree Set + +mod extend; +mod from_stream; + +#[doc(inline)] +pub use std::collections::BTreeSet; diff --git a/src/collections/hash_map/extend.rs b/src/collections/hash_map/extend.rs new file mode 100644 index 0000000..5f96b8a --- /dev/null +++ b/src/collections/hash_map/extend.rs @@ -0,0 +1,38 @@ +use std::collections::HashMap; +use std::hash::{BuildHasher, Hash}; +use std::pin::Pin; + +use crate::prelude::*; +use crate::stream::{Extend, IntoStream}; + +impl Extend<(K, V)> for HashMap +where + K: Eq + Hash, + H: BuildHasher + Default, +{ + fn stream_extend<'a, S: IntoStream + 'a>( + &'a mut self, + stream: S, + ) -> Pin + 'a>> { + let stream = stream.into_stream(); + + // The following is adapted from the hashbrown source code: + // https://github.com/rust-lang/hashbrown/blob/d1ad4fc3aae2ade446738eea512e50b9e863dd0c/src/map.rs#L2470-L2491 + // + // Keys may be already present or show multiple times in the stream. Reserve the entire + // hint lower bound if the map is empty. Otherwise reserve half the hint (rounded up), so + // the map will only resize twice in the worst case. + + //TODO: Add this back in when size_hint is added to Stream/StreamExt + //let reserve = if self.is_empty() { + // stream.size_hint().0 + //} else { + // (stream.size_hint().0 + 1) / 2 + //}; + //self.reserve(reserve); + + Box::pin(stream.for_each(move |(k, v)| { + self.insert(k, v); + })) + } +} diff --git a/src/collections/hash_map/from_stream.rs b/src/collections/hash_map/from_stream.rs new file mode 100644 index 0000000..2b7bbc9 --- /dev/null +++ b/src/collections/hash_map/from_stream.rs @@ -0,0 +1,29 @@ +use std::collections::HashMap; +use std::hash::{BuildHasher, Hash}; +use std::pin::Pin; + +use crate::stream::{Extend, FromStream, IntoStream}; + +impl FromStream<(K, V)> for HashMap +where + K: Eq + Hash, + H: BuildHasher + Default, +{ + #[inline] + fn from_stream<'a, S: IntoStream>( + stream: S, + ) -> Pin + 'a>> + where + ::IntoStream: 'a, + { + let stream = stream.into_stream(); + + Box::pin(async move { + pin_utils::pin_mut!(stream); + + let mut out = HashMap::with_hasher(Default::default()); + out.stream_extend(stream).await; + out + }) + } +} diff --git a/src/collections/hash_map/mod.rs b/src/collections/hash_map/mod.rs new file mode 100644 index 0000000..6e52dd2 --- /dev/null +++ b/src/collections/hash_map/mod.rs @@ -0,0 +1,7 @@ +//! The Rust hash map, implemented with quadratic probing and SIMD lookup. + +mod extend; +mod from_stream; + +#[doc(inline)] +pub use std::collections::HashMap; diff --git a/src/collections/hash_set/extend.rs b/src/collections/hash_set/extend.rs new file mode 100644 index 0000000..72400e1 --- /dev/null +++ b/src/collections/hash_set/extend.rs @@ -0,0 +1,41 @@ +use std::collections::HashSet; +use std::hash::{BuildHasher, Hash}; +use std::pin::Pin; + +use crate::prelude::*; +use crate::stream::{Extend, IntoStream}; + +impl Extend for HashSet +where + T: Eq + Hash, + H: BuildHasher + Default, +{ + fn stream_extend<'a, S: IntoStream + 'a>( + &'a mut self, + stream: S, + ) -> Pin + 'a>> { + // The Extend impl for HashSet in the standard library delegates to the internal HashMap. + // Thus, this impl is just a copy of the async Extend impl for HashMap in this crate. + + let stream = stream.into_stream(); + + // The following is adapted from the hashbrown source code: + // https://github.com/rust-lang/hashbrown/blob/d1ad4fc3aae2ade446738eea512e50b9e863dd0c/src/map.rs#L2470-L2491 + // + // Keys may be already present or show multiple times in the stream. Reserve the entire + // hint lower bound if the map is empty. Otherwise reserve half the hint (rounded up), so + // the map will only resize twice in the worst case. + + //TODO: Add this back in when size_hint is added to Stream/StreamExt + //let reserve = if self.is_empty() { + // stream.size_hint().0 + //} else { + // (stream.size_hint().0 + 1) / 2 + //}; + //self.reserve(reserve); + + Box::pin(stream.for_each(move |item| { + self.insert(item); + })) + } +} diff --git a/src/collections/hash_set/from_stream.rs b/src/collections/hash_set/from_stream.rs new file mode 100644 index 0000000..42447fe --- /dev/null +++ b/src/collections/hash_set/from_stream.rs @@ -0,0 +1,29 @@ +use std::collections::HashSet; +use std::hash::{BuildHasher, Hash}; +use std::pin::Pin; + +use crate::stream::{Extend, FromStream, IntoStream}; + +impl FromStream for HashSet +where + T: Eq + Hash, + H: BuildHasher + Default, +{ + #[inline] + fn from_stream<'a, S: IntoStream>( + stream: S, + ) -> Pin + 'a>> + where + ::IntoStream: 'a, + { + let stream = stream.into_stream(); + + Box::pin(async move { + pin_utils::pin_mut!(stream); + + let mut out = HashSet::with_hasher(Default::default()); + out.stream_extend(stream).await; + out + }) + } +} diff --git a/src/collections/hash_set/mod.rs b/src/collections/hash_set/mod.rs new file mode 100644 index 0000000..5d93a88 --- /dev/null +++ b/src/collections/hash_set/mod.rs @@ -0,0 +1,7 @@ +//! The Rust hash set, implemented as a `HashMap` where the value is `()`. + +mod extend; +mod from_stream; + +#[doc(inline)] +pub use std::collections::HashSet; diff --git a/src/collections/linked_list/extend.rs b/src/collections/linked_list/extend.rs new file mode 100644 index 0000000..567a92d --- /dev/null +++ b/src/collections/linked_list/extend.rs @@ -0,0 +1,18 @@ +use std::collections::LinkedList; +use std::pin::Pin; + +use crate::prelude::*; +use crate::stream::{Extend, IntoStream}; + +impl Extend for LinkedList { + fn stream_extend<'a, S: IntoStream + 'a>( + &'a mut self, + stream: S, + ) -> Pin + 'a>> { + let stream = stream.into_stream(); + //TODO: Add this back in when size_hint is added to Stream/StreamExt + //let (lower_bound, _) = stream.size_hint(); + //self.reserve(lower_bound); + Box::pin(stream.for_each(move |item| self.push_back(item))) + } +} diff --git a/src/collections/linked_list/from_stream.rs b/src/collections/linked_list/from_stream.rs new file mode 100644 index 0000000..3ffe149 --- /dev/null +++ b/src/collections/linked_list/from_stream.rs @@ -0,0 +1,24 @@ +use std::collections::LinkedList; +use std::pin::Pin; + +use crate::stream::{Extend, FromStream, IntoStream}; + +impl FromStream for LinkedList { + #[inline] + fn from_stream<'a, S: IntoStream>( + stream: S, + ) -> Pin + 'a>> + where + ::IntoStream: 'a, + { + let stream = stream.into_stream(); + + Box::pin(async move { + pin_utils::pin_mut!(stream); + + let mut out = LinkedList::new(); + out.stream_extend(stream).await; + out + }) + } +} diff --git a/src/collections/linked_list/mod.rs b/src/collections/linked_list/mod.rs new file mode 100644 index 0000000..bd009ea --- /dev/null +++ b/src/collections/linked_list/mod.rs @@ -0,0 +1,7 @@ +//! The Rust doubly-linked list with owned nodes + +mod extend; +mod from_stream; + +#[doc(inline)] +pub use std::collections::LinkedList; diff --git a/src/collections/mod.rs b/src/collections/mod.rs new file mode 100644 index 0000000..ae9efaa --- /dev/null +++ b/src/collections/mod.rs @@ -0,0 +1,20 @@ +//! The Rust standard collections +//! +//! This library provides efficient implementations of the most common general purpose programming +//! data structures. + +pub mod binary_heap; +pub mod btree_map; +pub mod btree_set; +pub mod hash_map; +pub mod hash_set; +pub mod linked_list; +pub mod vec_deque; + +pub use binary_heap::BinaryHeap; +pub use btree_map::BTreeMap; +pub use btree_set::BTreeSet; +pub use hash_map::HashMap; +pub use hash_set::HashSet; +pub use linked_list::LinkedList; +pub use vec_deque::VecDeque; diff --git a/src/collections/vec_deque/extend.rs b/src/collections/vec_deque/extend.rs new file mode 100644 index 0000000..d034bdc --- /dev/null +++ b/src/collections/vec_deque/extend.rs @@ -0,0 +1,18 @@ +use std::collections::VecDeque; +use std::pin::Pin; + +use crate::prelude::*; +use crate::stream::{Extend, IntoStream}; + +impl Extend for VecDeque { + fn stream_extend<'a, S: IntoStream + 'a>( + &'a mut self, + stream: S, + ) -> Pin + 'a>> { + let stream = stream.into_stream(); + //TODO: Add this back in when size_hint is added to Stream/StreamExt + //let (lower_bound, _) = stream.size_hint(); + //self.reserve(lower_bound); + Box::pin(stream.for_each(move |item| self.push_back(item))) + } +} diff --git a/src/collections/vec_deque/from_stream.rs b/src/collections/vec_deque/from_stream.rs new file mode 100644 index 0000000..8903de0 --- /dev/null +++ b/src/collections/vec_deque/from_stream.rs @@ -0,0 +1,24 @@ +use std::collections::VecDeque; +use std::pin::Pin; + +use crate::stream::{Extend, FromStream, IntoStream}; + +impl FromStream for VecDeque { + #[inline] + fn from_stream<'a, S: IntoStream>( + stream: S, + ) -> Pin + 'a>> + where + ::IntoStream: 'a, + { + let stream = stream.into_stream(); + + Box::pin(async move { + pin_utils::pin_mut!(stream); + + let mut out = VecDeque::new(); + out.stream_extend(stream).await; + out + }) + } +} diff --git a/src/collections/vec_deque/mod.rs b/src/collections/vec_deque/mod.rs new file mode 100644 index 0000000..b03d5e6 --- /dev/null +++ b/src/collections/vec_deque/mod.rs @@ -0,0 +1,7 @@ +//! The Rust double-ended queue, implemented with a growable ring buffer. + +mod extend; +mod from_stream; + +#[doc(inline)] +pub use std::collections::VecDeque; diff --git a/src/lib.rs b/src/lib.rs index f64fdbc..839a0ec 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -65,10 +65,12 @@ cfg_if! { #[cfg_attr(feature = "docs", doc(cfg(unstable)))] pub mod pin; + mod unit; mod vec; mod result; mod option; mod string; + mod collections; } } diff --git a/src/unit/from_stream.rs b/src/unit/from_stream.rs new file mode 100644 index 0000000..a238982 --- /dev/null +++ b/src/unit/from_stream.rs @@ -0,0 +1,16 @@ +use std::pin::Pin; + +use crate::prelude::*; +use crate::stream::{FromStream, IntoStream}; + +impl FromStream<()> for () { + #[inline] + fn from_stream<'a, S: IntoStream>( + stream: S, + ) -> Pin + 'a>> + where + ::IntoStream: 'a, + { + Box::pin(stream.into_stream().for_each(|_| ())) + } +} diff --git a/src/unit/mod.rs b/src/unit/mod.rs new file mode 100644 index 0000000..cb8063d --- /dev/null +++ b/src/unit/mod.rs @@ -0,0 +1,6 @@ +//! The Rust primitive `()` type, sometimes called "unit" or "nil". +//! +//! This module provides types and implementations for working +//! asynchronously with values of type `()`. + +mod from_stream; diff --git a/src/vec/extend.rs b/src/vec/extend.rs index d85589e..ca3373d 100644 --- a/src/vec/extend.rs +++ b/src/vec/extend.rs @@ -9,11 +9,9 @@ impl Extend for Vec { stream: S, ) -> Pin + 'a>> { let stream = stream.into_stream(); - Box::pin(async move { - pin_utils::pin_mut!(stream); - while let Some(item) = stream.next().await { - self.push(item); - } - }) + //TODO: Add this back in when size_hint is added to Stream/StreamExt + //let (lower_bound, _) = stream.size_hint(); + //self.reserve(lower_bound); + Box::pin(stream.for_each(move |item| self.push(item))) } } diff --git a/src/vec/from_stream.rs b/src/vec/from_stream.rs index 26196af..b326b04 100644 --- a/src/vec/from_stream.rs +++ b/src/vec/from_stream.rs @@ -1,4 +1,7 @@ +use std::borrow::Cow; use std::pin::Pin; +use std::rc::Rc; +use std::sync::Arc; use crate::stream::{Extend, FromStream, IntoStream}; @@ -21,3 +24,75 @@ impl FromStream for Vec { }) } } + +impl<'b, T: Clone> FromStream for Cow<'b, [T]> { + #[inline] + fn from_stream<'a, S: IntoStream>( + stream: S, + ) -> Pin + 'a>> + where + ::IntoStream: 'a, + { + let stream = stream.into_stream(); + + Box::pin(async move { + pin_utils::pin_mut!(stream); + + Cow::Owned(FromStream::from_stream(stream).await) + }) + } +} + +impl FromStream for Box<[T]> { + #[inline] + fn from_stream<'a, S: IntoStream>( + stream: S, + ) -> Pin + 'a>> + where + ::IntoStream: 'a, + { + let stream = stream.into_stream(); + + Box::pin(async move { + pin_utils::pin_mut!(stream); + + Vec::from_stream(stream).await.into_boxed_slice() + }) + } +} + +impl FromStream for Rc<[T]> { + #[inline] + fn from_stream<'a, S: IntoStream>( + stream: S, + ) -> Pin + 'a>> + where + ::IntoStream: 'a, + { + let stream = stream.into_stream(); + + Box::pin(async move { + pin_utils::pin_mut!(stream); + + Vec::from_stream(stream).await.into() + }) + } +} + +impl FromStream for Arc<[T]> { + #[inline] + fn from_stream<'a, S: IntoStream>( + stream: S, + ) -> Pin + 'a>> + where + ::IntoStream: 'a, + { + let stream = stream.into_stream(); + + Box::pin(async move { + pin_utils::pin_mut!(stream); + + Vec::from_stream(stream).await.into() + }) + } +}