Merge remote-tracking branch 'origin/master' into flat_map_fixed
commit
924e5a3f41
@ -0,0 +1,266 @@
|
||||
# Production-Ready Accept Loop
|
||||
|
||||
A production-ready accept loop needs the following things:
|
||||
1. Handling errors
|
||||
2. Limiting the number of simultanteous connections to avoid deny-of-service
|
||||
(DoS) attacks
|
||||
|
||||
|
||||
## Handling errors
|
||||
|
||||
There are two kinds of errors in an accept loop:
|
||||
1. Per-connection errors. The system uses them to notify that there was a
|
||||
connection in the queue and it's dropped by the peer. Subsequent connections
|
||||
can be already queued so next connection must be accepted immediately.
|
||||
2. Resource shortages. When these are encountered it doesn't make sense to
|
||||
accept the next socket immediately. But the listener stays active, so you server
|
||||
should try to accept socket later.
|
||||
|
||||
Here is the example of a per-connection error (printed in normal and debug mode):
|
||||
```
|
||||
Error: Connection reset by peer (os error 104)
|
||||
Error: Os { code: 104, kind: ConnectionReset, message: "Connection reset by peer" }
|
||||
```
|
||||
|
||||
And the following is the most common example of a resource shortage error:
|
||||
```
|
||||
Error: Too many open files (os error 24)
|
||||
Error: Os { code: 24, kind: Other, message: "Too many open files" }
|
||||
```
|
||||
|
||||
### Testing Application
|
||||
|
||||
To test your application for these errors try the following (this works
|
||||
on unixes only).
|
||||
|
||||
Lower limits and start the application:
|
||||
```
|
||||
$ ulimit -n 100
|
||||
$ cargo run --example your_app
|
||||
Compiling your_app v0.1.0 (/work)
|
||||
Finished dev [unoptimized + debuginfo] target(s) in 5.47s
|
||||
Running `target/debug/examples/your_app`
|
||||
Server is listening on: http://127.0.0.1:1234
|
||||
```
|
||||
Then in another console run the [`wrk`] benchmark tool:
|
||||
```
|
||||
$ wrk -c 1000 http://127.0.0.1:1234
|
||||
Running 10s test @ http://localhost:8080/
|
||||
2 threads and 1000 connections
|
||||
$ telnet localhost 1234
|
||||
Trying ::1...
|
||||
Connected to localhost.
|
||||
```
|
||||
|
||||
Important is to check the following things:
|
||||
|
||||
1. The application doesn't crash on error (but may log errors, see below)
|
||||
2. It's possible to connect to the application again once load is stopped
|
||||
(few seconds after `wrk`). This is what `telnet` does in example above,
|
||||
make sure it prints `Connected to <hostname>`.
|
||||
3. The `Too many open files` error is logged in the appropriate log. This
|
||||
requires to set "maximum number of simultaneous connections" parameter (see
|
||||
below) of your application to a value greater then `100` for this example.
|
||||
4. Check CPU usage of the app while doing a test. It should not occupy 100%
|
||||
of a single CPU core (it's unlikely that you can exhaust CPU by 1000
|
||||
connections in Rust, so this means error handling is not right).
|
||||
|
||||
#### Testing non-HTTP applications
|
||||
|
||||
If it's possible, use the appropriate benchmark tool and set the appropriate
|
||||
number of connections. For example `redis-benchmark` has a `-c` parameter for
|
||||
that, if you implement redis protocol.
|
||||
|
||||
Alternatively, can still use `wrk`, just make sure that connection is not
|
||||
immediately closed. If it is, put a temporary timeout before handing
|
||||
the connection to the protocol handler, like this:
|
||||
|
||||
```rust,edition2018
|
||||
# extern crate async_std;
|
||||
# use std::time::Duration;
|
||||
# use async_std::{
|
||||
# net::{TcpListener, ToSocketAddrs},
|
||||
# prelude::*,
|
||||
# };
|
||||
#
|
||||
# type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
|
||||
#
|
||||
#async fn accept_loop(addr: impl ToSocketAddrs) -> Result<()> {
|
||||
# let listener = TcpListener::bind(addr).await?;
|
||||
# let mut incoming = listener.incoming();
|
||||
while let Some(stream) = incoming.next().await {
|
||||
task::spawn(async {
|
||||
task::sleep(Duration::from_secs(10)).await; // 1
|
||||
connection_loop(stream).await;
|
||||
});
|
||||
}
|
||||
# Ok(())
|
||||
# }
|
||||
```
|
||||
|
||||
1. Make sure the sleep coroutine is inside the spawned task, not in the loop.
|
||||
|
||||
[`wrk`]: https://github.com/wg/wrk
|
||||
|
||||
|
||||
### Handling Errors Manually
|
||||
|
||||
Here is how basic accept loop could look like:
|
||||
|
||||
```rust,edition2018
|
||||
# extern crate async_std;
|
||||
# use std::time::Duration;
|
||||
# use async_std::{
|
||||
# net::{TcpListener, ToSocketAddrs},
|
||||
# prelude::*,
|
||||
# };
|
||||
#
|
||||
# type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
|
||||
#
|
||||
async fn accept_loop(addr: impl ToSocketAddrs) -> Result<()> {
|
||||
let listener = TcpListener::bind(addr).await?;
|
||||
let mut incoming = listener.incoming();
|
||||
while let Some(result) = incoming.next().await {
|
||||
let stream = match stream {
|
||||
Err(ref e) if is_connection_error(e) => continue, // 1
|
||||
Err(e) => {
|
||||
eprintln!("Error: {}. Pausing for 500ms."); // 3
|
||||
task::sleep(Duration::from_millis(500)).await; // 2
|
||||
continue;
|
||||
}
|
||||
Ok(s) => s,
|
||||
};
|
||||
// body
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
```
|
||||
|
||||
1. Ignore per-connection errors.
|
||||
2. Sleep and continue on resource shortage.
|
||||
3. It's important to log the message, because these errors commonly mean the
|
||||
misconfiguration of the system and are helpful for operations people running
|
||||
the application.
|
||||
|
||||
Be sure to [test your application](#testing-application).
|
||||
|
||||
|
||||
### External Crates
|
||||
|
||||
The crate [`async-listen`] has a helper to achieve this task:
|
||||
```rust,edition2018
|
||||
# extern crate async_std;
|
||||
# extern crate async_listen;
|
||||
# use std::time::Duration;
|
||||
# use async_std::{
|
||||
# net::{TcpListener, ToSocketAddrs},
|
||||
# prelude::*,
|
||||
# };
|
||||
#
|
||||
# type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
|
||||
#
|
||||
use async_listen::{ListenExt, error_hint};
|
||||
|
||||
async fn accept_loop(addr: impl ToSocketAddrs) -> Result<()> {
|
||||
|
||||
let listener = TcpListener::bind(addr).await?;
|
||||
let mut incoming = listener
|
||||
.incoming()
|
||||
.log_warnings(log_accept_error) // 1
|
||||
.handle_errors(Duration::from_millis(500));
|
||||
while let Some(socket) = incoming.next().await { // 2
|
||||
// body
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn log_accept_error(e: &io::Error) {
|
||||
eprintln!("Error: {}. Listener paused for 0.5s. {}", e, error_hint(e)) // 3
|
||||
}
|
||||
```
|
||||
|
||||
1. Logs resource shortages (`async-listen` calls them warnings). If you use
|
||||
`log` crate or any other in your app this should go to the log.
|
||||
2. Stream yields sockets without `Result` wrapper after `handle_errors` because
|
||||
all errors are already handled.
|
||||
3. Together with the error we print a hint, which explains some errors for end
|
||||
users. For example, it recommends increasing open file limit and gives
|
||||
a link.
|
||||
|
||||
[`async-listen`]: https://crates.io/crates/async-listen/
|
||||
|
||||
Be sure to [test your application](#testing-application).
|
||||
|
||||
|
||||
## Connections Limit
|
||||
|
||||
Even if you've applied everything described in
|
||||
[Handling Errors](#handling-errors) section, there is still a problem.
|
||||
|
||||
Let's imagine you have a server that needs to open a file to process
|
||||
client request. At some point, you might encounter the following situation:
|
||||
|
||||
1. There are as many client connection as max file descriptors allowed for
|
||||
the application.
|
||||
2. Listener gets `Too many open files` error so it sleeps.
|
||||
3. Some client sends a request via the previously open connection.
|
||||
4. Opening a file to serve request fails, because of the same
|
||||
`Too many open files` error, until some other client drops a connection.
|
||||
|
||||
There are many more possible situations, this is just a small illustation that
|
||||
limiting number of connections is very useful. Generally, it's one of the ways
|
||||
to control resources used by a server and avoiding some kinds of deny of
|
||||
service (DoS) attacks.
|
||||
|
||||
### `async-listen` crate
|
||||
|
||||
Limiting maximum number of simultaneous connections with [`async-listen`]
|
||||
looks like the following:
|
||||
|
||||
```rust,edition2018
|
||||
# extern crate async_std;
|
||||
# extern crate async_listen;
|
||||
# use std::time::Duration;
|
||||
# use async_std::{
|
||||
# net::{TcpListener, TcpStream, ToSocketAddrs},
|
||||
# prelude::*,
|
||||
# };
|
||||
#
|
||||
# type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
|
||||
#
|
||||
use async_listen::{ListenExt, Token, error_hint};
|
||||
|
||||
async fn accept_loop(addr: impl ToSocketAddrs) -> Result<()> {
|
||||
|
||||
let listener = TcpListener::bind(addr).await?;
|
||||
let mut incoming = listener
|
||||
.incoming()
|
||||
.log_warnings(log_accept_error)
|
||||
.handle_errors(Duration::from_millis(500)) // 1
|
||||
.backpressure(100);
|
||||
while let Some((token, socket)) = incoming.next().await { // 2
|
||||
task::spawn(async move {
|
||||
connection_loop(&token, stream).await; // 3
|
||||
});
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
async fn connection_loop(_token: &Token, stream: TcpStream) { // 4
|
||||
// ...
|
||||
}
|
||||
# fn log_accept_error(e: &io::Error) {
|
||||
# eprintln!("Error: {}. Listener paused for 0.5s. {}", e, error_hint(e));
|
||||
# }
|
||||
```
|
||||
|
||||
1. We need to handle errors first, because [`backpressure`] helper expects
|
||||
stream of `TcpStream` rather than `Result`.
|
||||
2. The token yielded by a new stream is what is counted by backpressure helper.
|
||||
I.e. if you drop a token, new connection can be established.
|
||||
3. We give the connection loop a reference to token to bind token's lifetime to
|
||||
the lifetime of the connection.
|
||||
4. The token itsellf in the function can be ignored, hence `_token`
|
||||
|
||||
[`backpressure`]: https://docs.rs/async-listen/0.1.2/async_listen/trait.ListenExt.html#method.backpressure
|
||||
|
||||
Be sure to [test this behavior](#testing-application).
|
@ -1,315 +0,0 @@
|
||||
use std::fmt;
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
use mio::{self, Evented};
|
||||
use once_cell::sync::Lazy;
|
||||
use slab::Slab;
|
||||
|
||||
use crate::io;
|
||||
use crate::task::{Context, Poll, Waker};
|
||||
use crate::utils::abort_on_panic;
|
||||
|
||||
/// Data associated with a registered I/O handle.
|
||||
#[derive(Debug)]
|
||||
struct Entry {
|
||||
/// A unique identifier.
|
||||
token: mio::Token,
|
||||
|
||||
/// Tasks that are blocked on reading from this I/O handle.
|
||||
readers: Mutex<Vec<Waker>>,
|
||||
|
||||
/// Thasks that are blocked on writing to this I/O handle.
|
||||
writers: Mutex<Vec<Waker>>,
|
||||
}
|
||||
|
||||
/// The state of a networking driver.
|
||||
struct Reactor {
|
||||
/// A mio instance that polls for new events.
|
||||
poller: mio::Poll,
|
||||
|
||||
/// A collection of registered I/O handles.
|
||||
entries: Mutex<Slab<Arc<Entry>>>,
|
||||
|
||||
/// Dummy I/O handle that is only used to wake up the polling thread.
|
||||
notify_reg: (mio::Registration, mio::SetReadiness),
|
||||
|
||||
/// An identifier for the notification handle.
|
||||
notify_token: mio::Token,
|
||||
}
|
||||
|
||||
impl Reactor {
|
||||
/// Creates a new reactor for polling I/O events.
|
||||
fn new() -> io::Result<Reactor> {
|
||||
let poller = mio::Poll::new()?;
|
||||
let notify_reg = mio::Registration::new2();
|
||||
|
||||
let mut reactor = Reactor {
|
||||
poller,
|
||||
entries: Mutex::new(Slab::new()),
|
||||
notify_reg,
|
||||
notify_token: mio::Token(0),
|
||||
};
|
||||
|
||||
// Register a dummy I/O handle for waking up the polling thread.
|
||||
let entry = reactor.register(&reactor.notify_reg.0)?;
|
||||
reactor.notify_token = entry.token;
|
||||
|
||||
Ok(reactor)
|
||||
}
|
||||
|
||||
/// Registers an I/O event source and returns its associated entry.
|
||||
fn register(&self, source: &dyn Evented) -> io::Result<Arc<Entry>> {
|
||||
let mut entries = self.entries.lock().unwrap();
|
||||
|
||||
// Reserve a vacant spot in the slab and use its key as the token value.
|
||||
let vacant = entries.vacant_entry();
|
||||
let token = mio::Token(vacant.key());
|
||||
|
||||
// Allocate an entry and insert it into the slab.
|
||||
let entry = Arc::new(Entry {
|
||||
token,
|
||||
readers: Mutex::new(Vec::new()),
|
||||
writers: Mutex::new(Vec::new()),
|
||||
});
|
||||
vacant.insert(entry.clone());
|
||||
|
||||
// Register the I/O event source in the poller.
|
||||
let interest = mio::Ready::all();
|
||||
let opts = mio::PollOpt::edge();
|
||||
self.poller.register(source, token, interest, opts)?;
|
||||
|
||||
Ok(entry)
|
||||
}
|
||||
|
||||
/// Deregisters an I/O event source associated with an entry.
|
||||
fn deregister(&self, source: &dyn Evented, entry: &Entry) -> io::Result<()> {
|
||||
// Deregister the I/O object from the mio instance.
|
||||
self.poller.deregister(source)?;
|
||||
|
||||
// Remove the entry associated with the I/O object.
|
||||
self.entries.lock().unwrap().remove(entry.token.0);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// fn notify(&self) {
|
||||
// self.notify_reg
|
||||
// .1
|
||||
// .set_readiness(mio::Ready::readable())
|
||||
// .unwrap();
|
||||
// }
|
||||
}
|
||||
|
||||
/// The state of the global networking driver.
|
||||
static REACTOR: Lazy<Reactor> = Lazy::new(|| {
|
||||
// Spawn a thread that waits on the poller for new events and wakes up tasks blocked on I/O
|
||||
// handles.
|
||||
std::thread::Builder::new()
|
||||
.name("async-std/net".to_string())
|
||||
.spawn(move || {
|
||||
// If the driver thread panics, there's not much we can do. It is not a
|
||||
// recoverable error and there is no place to propagate it into so we just abort.
|
||||
abort_on_panic(|| {
|
||||
main_loop().expect("async networking thread has panicked");
|
||||
})
|
||||
})
|
||||
.expect("cannot start a thread driving blocking tasks");
|
||||
|
||||
Reactor::new().expect("cannot initialize reactor")
|
||||
});
|
||||
|
||||
/// Waits on the poller for new events and wakes up tasks blocked on I/O handles.
|
||||
fn main_loop() -> io::Result<()> {
|
||||
let reactor = &REACTOR;
|
||||
let mut events = mio::Events::with_capacity(1000);
|
||||
|
||||
loop {
|
||||
// Block on the poller until at least one new event comes in.
|
||||
reactor.poller.poll(&mut events, None)?;
|
||||
|
||||
// Lock the entire entry table while we're processing new events.
|
||||
let entries = reactor.entries.lock().unwrap();
|
||||
|
||||
for event in events.iter() {
|
||||
let token = event.token();
|
||||
|
||||
if token == reactor.notify_token {
|
||||
// If this is the notification token, we just need the notification state.
|
||||
reactor.notify_reg.1.set_readiness(mio::Ready::empty())?;
|
||||
} else {
|
||||
// Otherwise, look for the entry associated with this token.
|
||||
if let Some(entry) = entries.get(token.0) {
|
||||
// Set the readiness flags from this I/O event.
|
||||
let readiness = event.readiness();
|
||||
|
||||
// Wake up reader tasks blocked on this I/O handle.
|
||||
if !(readiness & reader_interests()).is_empty() {
|
||||
for w in entry.readers.lock().unwrap().drain(..) {
|
||||
w.wake();
|
||||
}
|
||||
}
|
||||
|
||||
// Wake up writer tasks blocked on this I/O handle.
|
||||
if !(readiness & writer_interests()).is_empty() {
|
||||
for w in entry.writers.lock().unwrap().drain(..) {
|
||||
w.wake();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// An I/O handle powered by the networking driver.
|
||||
///
|
||||
/// This handle wraps an I/O event source and exposes a "futurized" interface on top of it,
|
||||
/// implementing traits `AsyncRead` and `AsyncWrite`.
|
||||
pub struct Watcher<T: Evented> {
|
||||
/// Data associated with the I/O handle.
|
||||
entry: Arc<Entry>,
|
||||
|
||||
/// The I/O event source.
|
||||
source: Option<T>,
|
||||
}
|
||||
|
||||
impl<T: Evented> Watcher<T> {
|
||||
/// Creates a new I/O handle.
|
||||
///
|
||||
/// The provided I/O event source will be kept registered inside the reactor's poller for the
|
||||
/// lifetime of the returned I/O handle.
|
||||
pub fn new(source: T) -> Watcher<T> {
|
||||
Watcher {
|
||||
entry: REACTOR
|
||||
.register(&source)
|
||||
.expect("cannot register an I/O event source"),
|
||||
source: Some(source),
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns a reference to the inner I/O event source.
|
||||
pub fn get_ref(&self) -> &T {
|
||||
self.source.as_ref().unwrap()
|
||||
}
|
||||
|
||||
/// Polls the inner I/O source for a non-blocking read operation.
|
||||
///
|
||||
/// If the operation returns an error of the `io::ErrorKind::WouldBlock` kind, the current task
|
||||
/// will be registered for wakeup when the I/O source becomes readable.
|
||||
pub fn poll_read_with<'a, F, R>(&'a self, cx: &mut Context<'_>, mut f: F) -> Poll<io::Result<R>>
|
||||
where
|
||||
F: FnMut(&'a T) -> io::Result<R>,
|
||||
{
|
||||
// If the operation isn't blocked, return its result.
|
||||
match f(self.source.as_ref().unwrap()) {
|
||||
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
|
||||
res => return Poll::Ready(res),
|
||||
}
|
||||
|
||||
// Lock the waker list.
|
||||
let mut list = self.entry.readers.lock().unwrap();
|
||||
|
||||
// Try running the operation again.
|
||||
match f(self.source.as_ref().unwrap()) {
|
||||
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
|
||||
res => return Poll::Ready(res),
|
||||
}
|
||||
|
||||
// Register the task if it isn't registered already.
|
||||
if list.iter().all(|w| !w.will_wake(cx.waker())) {
|
||||
list.push(cx.waker().clone());
|
||||
}
|
||||
|
||||
Poll::Pending
|
||||
}
|
||||
|
||||
/// Polls the inner I/O source for a non-blocking write operation.
|
||||
///
|
||||
/// If the operation returns an error of the `io::ErrorKind::WouldBlock` kind, the current task
|
||||
/// will be registered for wakeup when the I/O source becomes writable.
|
||||
pub fn poll_write_with<'a, F, R>(
|
||||
&'a self,
|
||||
cx: &mut Context<'_>,
|
||||
mut f: F,
|
||||
) -> Poll<io::Result<R>>
|
||||
where
|
||||
F: FnMut(&'a T) -> io::Result<R>,
|
||||
{
|
||||
// If the operation isn't blocked, return its result.
|
||||
match f(self.source.as_ref().unwrap()) {
|
||||
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
|
||||
res => return Poll::Ready(res),
|
||||
}
|
||||
|
||||
// Lock the waker list.
|
||||
let mut list = self.entry.writers.lock().unwrap();
|
||||
|
||||
// Try running the operation again.
|
||||
match f(self.source.as_ref().unwrap()) {
|
||||
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
|
||||
res => return Poll::Ready(res),
|
||||
}
|
||||
|
||||
// Register the task if it isn't registered already.
|
||||
if list.iter().all(|w| !w.will_wake(cx.waker())) {
|
||||
list.push(cx.waker().clone());
|
||||
}
|
||||
|
||||
Poll::Pending
|
||||
}
|
||||
|
||||
/// Deregisters and returns the inner I/O source.
|
||||
///
|
||||
/// This method is typically used to convert `Watcher`s to raw file descriptors/handles.
|
||||
#[allow(dead_code)]
|
||||
pub fn into_inner(mut self) -> T {
|
||||
let source = self.source.take().unwrap();
|
||||
REACTOR
|
||||
.deregister(&source, &self.entry)
|
||||
.expect("cannot deregister I/O event source");
|
||||
source
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Evented> Drop for Watcher<T> {
|
||||
fn drop(&mut self) {
|
||||
if let Some(ref source) = self.source {
|
||||
REACTOR
|
||||
.deregister(source, &self.entry)
|
||||
.expect("cannot deregister I/O event source");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Evented + fmt::Debug> fmt::Debug for Watcher<T> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.debug_struct("Watcher")
|
||||
.field("entry", &self.entry)
|
||||
.field("source", &self.source)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns a mask containing flags that interest tasks reading from I/O handles.
|
||||
#[inline]
|
||||
fn reader_interests() -> mio::Ready {
|
||||
mio::Ready::all() - mio::Ready::writable()
|
||||
}
|
||||
|
||||
/// Returns a mask containing flags that interest tasks writing into I/O handles.
|
||||
#[inline]
|
||||
fn writer_interests() -> mio::Ready {
|
||||
mio::Ready::writable() | hup()
|
||||
}
|
||||
|
||||
/// Returns a flag containing the hangup status.
|
||||
#[inline]
|
||||
fn hup() -> mio::Ready {
|
||||
#[cfg(unix)]
|
||||
let ready = mio::unix::UnixReady::hup().into();
|
||||
|
||||
#[cfg(not(unix))]
|
||||
let ready = mio::Ready::empty();
|
||||
|
||||
ready
|
||||
}
|
@ -0,0 +1,34 @@
|
||||
//! The runtime.
|
||||
|
||||
use std::env;
|
||||
use std::thread;
|
||||
|
||||
use once_cell::sync::Lazy;
|
||||
|
||||
use crate::future;
|
||||
|
||||
/// Dummy runtime struct.
|
||||
pub struct Runtime {}
|
||||
|
||||
/// The global runtime.
|
||||
pub static RUNTIME: Lazy<Runtime> = Lazy::new(|| {
|
||||
// Create an executor thread pool.
|
||||
|
||||
let thread_count = env::var("ASYNC_STD_THREAD_COUNT")
|
||||
.map(|env| {
|
||||
env.parse()
|
||||
.expect("ASYNC_STD_THREAD_COUNT must be a number")
|
||||
})
|
||||
.unwrap_or_else(|_| num_cpus::get())
|
||||
.max(1);
|
||||
|
||||
let thread_name = env::var("ASYNC_STD_THREAD_NAME").unwrap_or("async-std/runtime".to_string());
|
||||
|
||||
for _ in 0..thread_count {
|
||||
thread::Builder::new()
|
||||
.name(thread_name.clone())
|
||||
.spawn(|| crate::task::block_on(future::pending::<()>()))
|
||||
.expect("cannot start a runtime thread");
|
||||
}
|
||||
Runtime {}
|
||||
});
|
@ -0,0 +1,68 @@
|
||||
use core::marker::PhantomData;
|
||||
use core::pin::Pin;
|
||||
use core::task::{Context, Poll};
|
||||
|
||||
use crate::stream::{DoubleEndedStream, ExactSizeStream, FusedStream, Stream};
|
||||
|
||||
/// A stream that never returns any items.
|
||||
///
|
||||
/// This stream is created by the [`pending`] function. See its
|
||||
/// documentation for more.
|
||||
///
|
||||
/// [`pending`]: fn.pending.html
|
||||
#[derive(Debug)]
|
||||
pub struct Pending<T> {
|
||||
_marker: PhantomData<T>,
|
||||
}
|
||||
|
||||
/// Creates a stream that never returns any items.
|
||||
///
|
||||
/// The returned stream will always return `Pending` when polled.
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// # async_std::task::block_on(async {
|
||||
/// #
|
||||
/// use std::time::Duration;
|
||||
///
|
||||
/// use async_std::prelude::*;
|
||||
/// use async_std::stream;
|
||||
///
|
||||
/// let dur = Duration::from_millis(100);
|
||||
/// let mut s = stream::pending::<()>().timeout(dur);
|
||||
///
|
||||
/// let item = s.next().await;
|
||||
///
|
||||
/// assert!(item.is_some());
|
||||
/// assert!(item.unwrap().is_err());
|
||||
///
|
||||
/// #
|
||||
/// # })
|
||||
/// ```
|
||||
pub fn pending<T>() -> Pending<T> {
|
||||
Pending {
|
||||
_marker: PhantomData,
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Stream for Pending<T> {
|
||||
type Item = T;
|
||||
|
||||
fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<T>> {
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> DoubleEndedStream for Pending<T> {
|
||||
fn poll_next_back(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<T>> {
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> FusedStream for Pending<T> {}
|
||||
|
||||
impl<T> ExactSizeStream for Pending<T> {
|
||||
fn len(&self) -> usize {
|
||||
0
|
||||
}
|
||||
}
|
Some files were not shown because too many files have changed in this diff Show More
Loading…
Reference in New Issue