You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

96 lines
2.6 KiB
Rust

use std::future::Future;
use async_macros::utils::poll_fn;
use std::task::{Context, Poll, Waker};
use std::pin::Pin;
use async_macros::MaybeDone;
use async_std::stream::Stream;
use std::collections::VecDeque;
use std::sync::Mutex;
use async_std::sync::Arc;
/// Asynchronous conversion of `self` into a value of type `T`.
///
/// This file implements it for `Vec<F>` where `F` is a future, producing the
/// `Vec` of the futures' outputs.
// NOTE(review): `async_trait` is not imported in the visible `use` lines;
// presumably it is brought in via `#[macro_use]` elsewhere — confirm.
#[async_trait]
pub trait IntoResults<T: Send> {
/// Consumes `self` and resolves to the converted value.
///
/// The `'a` lifetime bounds both `Self` and `T`.
async fn into_results<'a>(self) -> T where Self: 'a, T: 'a;
}
/// Joins a `Vec` of futures into a `Vec` of their outputs, in the same order
/// as the input futures.
#[async_trait]
impl<O: Send, F: Future<Output=O> + Send> IntoResults<Vec<O>> for Vec<F> {
    /// Drives every future in the vector and resolves once all of them have
    /// finished.
    ///
    /// Each time the combined future is polled, every element is polled
    /// (elements are wrapped in `MaybeDone` first), so the futures make
    /// progress concurrently rather than one after another.
    async fn into_results<'a>(self) -> Vec<O> where Self: 'a, O: 'a {
        let mut slots: Vec<MaybeDone<F>> = self.into_iter().map(MaybeDone::new).collect();
        let joined = poll_fn(|cx: &mut Context<'_>| -> Poll<Vec<O>> {
            // Non-short-circuiting on purpose (`&`, with the poll on the left):
            // every slot must be polled on every pass, even after a pending
            // one has been seen.
            let all_done = slots.iter_mut().fold(true, |done_so_far, slot| {
                // SAFETY: the elements live in the vector's heap buffer and
                // the vector is only ever iterated by `&mut` after this point
                // (never resized or reordered), so no element moves once it
                // has been pinned here.
                let pinned = unsafe { Pin::new_unchecked(slot) };
                Future::poll(pinned, cx).is_ready() & done_so_far
            });
            if !all_done {
                return Poll::Pending;
            }
            let outputs: Vec<O> = slots
                .iter_mut()
                .map(|slot| {
                    // SAFETY: same argument as above; the element is not
                    // moved, only its stored output is taken.
                    let pinned = unsafe { Pin::new_unchecked(slot) };
                    pinned.take().unwrap()
                })
                .collect();
            Poll::Ready(outputs)
        });
        joined.await
    }
}
#[derive(Clone, Debug, Default)]
pub struct AsyncQueue<T>(Arc<Mutex<AsyncQueueState<T>>>);
impl<T> AsyncQueue<T> {
    /// Appends `value` to the back of the queue and wakes the registered
    /// consumer, if any.
    ///
    /// No check of the closed flag is made, so a value pushed after `close`
    /// is still enqueued.
    pub fn push(&mut self, value: T) {
        let pending = {
            // The state is plain data with no invariant a panic could break,
            // so a poisoned lock is recovered instead of propagating a panic.
            let mut state = self.0.lock().unwrap_or_else(std::sync::PoisonError::into_inner);
            state.queue.push_back(value);
            state.waker.take()
        };
        // Wake only after the guard is dropped: a waker that polls inline, or
        // a consumer that runs immediately on another thread, must be able to
        // take the lock.
        if let Some(waker) = pending {
            waker.wake();
        }
    }

    /// Marks the queue as closed and wakes the registered consumer, if any.
    ///
    /// Items already queued are left in place.
    pub fn close(&mut self) {
        let pending = {
            let mut state = self.0.lock().unwrap_or_else(std::sync::PoisonError::into_inner);
            state.closed = true;
            state.waker.take()
        };
        // Same reasoning as in `push`: never call a waker under the lock.
        if let Some(waker) = pending {
            waker.wake();
        }
    }

    /// Creates an empty, open queue with no registered waker.
    pub fn new() -> AsyncQueue<T> {
        AsyncQueue(Arc::new(Mutex::new(AsyncQueueState {
            closed: false,
            waker: None,
            queue: VecDeque::new(),
        })))
    }
}
/// State shared by every handle to one `AsyncQueue`; it lives behind the
/// queue's mutex.
#[derive(Clone, Debug, Default)]
struct AsyncQueueState<T> {
/// Set to `true` by `AsyncQueue::close`; once the queue is also empty,
/// `poll_next` ends the stream.
closed: bool,
/// Waker stored by `poll_next` and signalled by `push` and `close`.
waker: Option<Waker>,
/// Buffered items: `push` appends at the back, `poll_next` removes from the
/// front.
queue: VecDeque<T>,
}
/// Consuming side of the queue: yields items in FIFO order and ends once the
/// queue has been closed and drained.
impl<T> Stream for AsyncQueue<T> {
    type Item = T;

    /// Pops the next item if one is buffered.
    ///
    /// Returns `Poll::Ready(None)` when the queue is empty and closed;
    /// otherwise stores the task's waker and returns `Poll::Pending`.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<T>> {
        // A poisoned lock is recovered rather than ignored: falling through
        // to `Pending` without a registered waker would park the consumer
        // forever. The state is plain data, so it stays usable after a panic.
        let mut state = self.0.lock().unwrap_or_else(std::sync::PoisonError::into_inner);

        // Buffered items are drained before the closed flag is honoured.
        if let Some(item) = state.queue.pop_front() {
            return Poll::Ready(Some(item));
        }
        if state.closed {
            return Poll::Ready(None);
        }

        // Register for a wake-up only when actually going to sleep, and skip
        // the clone when the stored waker already targets this task.
        let already_registered = state
            .waker
            .as_ref()
            .map_or(false, |stored| stored.will_wake(cx.waker()));
        if !already_registered {
            state.waker = Some(cx.waker().clone());
        }
        Poll::Pending
    }
}