You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
96 lines
2.6 KiB
Rust
96 lines
2.6 KiB
Rust
4 years ago
|
use std::future::Future;
|
||
|
use async_macros::utils::poll_fn;
|
||
|
use std::task::{Context, Poll, Waker};
|
||
|
use std::pin::Pin;
|
||
|
use async_macros::MaybeDone;
|
||
|
use async_std::stream::Stream;
|
||
|
use std::collections::VecDeque;
|
||
|
use std::sync::Mutex;
|
||
|
use async_std::sync::Arc;
|
||
|
|
||
|
#[async_trait]
|
||
|
pub trait IntoResults<T: Send> {
|
||
|
async fn into_results<'a>(self) -> T where Self: 'a, T: 'a;
|
||
|
}
|
||
|
|
||
|
#[async_trait]
|
||
|
impl<O: Send, F: Future<Output=O> + Send> IntoResults<Vec<O>> for Vec<F> {
|
||
|
async fn into_results<'a>(self) -> Vec<O> where Self: 'a, O: 'a {
|
||
|
let mut futures: Vec<MaybeDone<F>> = self.into_iter().map(MaybeDone::new).collect();
|
||
|
return poll_fn(|cx: &mut Context<'_>| -> Poll<Vec<O>> {
|
||
|
let mut ready = true;
|
||
|
|
||
|
for future in &mut futures {
|
||
|
let fut = unsafe { Pin::new_unchecked(future) };
|
||
|
ready &= Future::poll(fut, cx).is_ready();
|
||
|
}
|
||
|
|
||
|
if ready {
|
||
|
let mut results = Vec::new();
|
||
|
for future in &mut futures {
|
||
|
let fut = unsafe { Pin::new_unchecked(future) };
|
||
|
results.push(fut.take().unwrap());
|
||
|
}
|
||
|
|
||
|
Poll::Ready(results)
|
||
|
} else {
|
||
|
Poll::Pending
|
||
|
}
|
||
|
}).await;
|
||
|
}
|
||
|
}
|
||
|
|
||
|
/// A cloneable handle to a FIFO queue shared behind an `Arc<Mutex<..>>`.
///
/// All clones refer to the same underlying state, so one handle can push
/// values while another handle observes them.
#[derive(Debug)]
pub struct AsyncQueue<T>(Arc<Mutex<AsyncQueueState<T>>>);

// `Clone` and `Default` are written by hand: the derived versions would add
// `T: Clone` / `T: Default` bounds even though only the `Arc` is cloned and an
// empty queue needs no `T` value at all.
impl<T> Clone for AsyncQueue<T> {
    /// Returns another handle to the same shared queue.
    fn clone(&self) -> Self {
        AsyncQueue(Arc::clone(&self.0))
    }
}

impl<T> Default for AsyncQueue<T> {
    /// Equivalent to [`AsyncQueue::new`].
    fn default() -> Self {
        Self::new()
    }
}

impl<T> AsyncQueue<T> {
    /// Appends `value` to the back of the queue and wakes the waker currently
    /// stored in the shared state, if any.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn push(&mut self, value: T) {
        let waker = {
            let mut state = self.0.lock().unwrap();
            state.queue.push_back(value);
            state.waker.take()
        };
        // Wake only after the guard is dropped: a waker that touches the
        // queue synchronously would otherwise deadlock on the mutex, and a
        // task woken on another thread would immediately contend for it.
        if let Some(waker) = waker {
            waker.wake();
        }
    }

    /// Marks the queue as closed and wakes the waker currently stored in the
    /// shared state, if any.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn close(&mut self) {
        let waker = {
            let mut state = self.0.lock().unwrap();
            state.closed = true;
            state.waker.take()
        };
        // See `push`: never invoke the waker while holding the lock.
        if let Some(waker) = waker {
            waker.wake();
        }
    }

    /// Creates an empty, open queue with no registered waker.
    pub fn new() -> AsyncQueue<T> {
        AsyncQueue(Arc::new(Mutex::new(AsyncQueueState {
            closed: false,
            waker: None,
            queue: VecDeque::new(),
        })))
    }
}

/// State shared by every handle of one [`AsyncQueue`].
#[derive(Clone, Debug, Default)]
struct AsyncQueueState<T> {
    /// Set by `close`.
    closed: bool,
    /// Waker to notify on the next `push` or `close`; taken when used.
    waker: Option<Waker>,
    /// Values pushed but not yet removed, oldest first.
    queue: VecDeque<T>,
}
|
||
|
|
||
|
impl<T> Stream for AsyncQueue<T> {
|
||
|
type Item = T;
|
||
|
|
||
|
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<T>> {
|
||
|
if let Ok(mut res) = self.0.lock() {
|
||
|
res.waker = Some(cx.waker().clone());
|
||
|
if let Some(data) = res.queue.pop_front() {
|
||
|
return Poll::Ready(Some(data));
|
||
|
}
|
||
|
|
||
|
if res.closed {
|
||
|
return Poll::Ready(None);
|
||
|
}
|
||
|
}
|
||
|
|
||
|
Poll::Pending
|
||
|
}
|
||
|
}
|