From c8c075615c0a44ec10c210292caf25899d8b8b0a Mon Sep 17 00:00:00 2001 From: Paul Colomiets Date: Sun, 12 Jan 2020 03:45:41 +0200 Subject: [PATCH] book: Add Production-ready Accept Loop section Part of the #658 work --- docs/src/SUMMARY.md | 3 +- docs/src/patterns/accept-loop.md | 256 +++++++++++++++++++++++++++++++ 2 files changed, 258 insertions(+), 1 deletion(-) create mode 100644 docs/src/patterns/accept-loop.md diff --git a/docs/src/SUMMARY.md b/docs/src/SUMMARY.md index d679825..8b49ce7 100644 --- a/docs/src/SUMMARY.md +++ b/docs/src/SUMMARY.md @@ -19,8 +19,9 @@ - [Clean Shutdown](./tutorial/clean_shutdown.md) - [Handling Disconnection](./tutorial/handling_disconnection.md) - [Implementing a Client](./tutorial/implementing_a_client.md) -- [TODO: Async Patterns](./patterns.md) +- [Async Patterns](./patterns.md) - [TODO: Collected Small Patterns](./patterns/small-patterns.md) + - [Production-Ready Accept Loop](./patterns/accept-loop.md) - [Security practices](./security/index.md) - [Security Disclosures and Policy](./security/policy.md) - [Glossary](./glossary.md) diff --git a/docs/src/patterns/accept-loop.md b/docs/src/patterns/accept-loop.md new file mode 100644 index 0000000..acc0d75 --- /dev/null +++ b/docs/src/patterns/accept-loop.md @@ -0,0 +1,256 @@ +# Production-Ready Accept Loop + +Production-ready accept loop needs the following things: +1. Handling errors +2. Limiting the number of simultanteous connections to avoid deny-of-service + (DoS) attacks + + +## Handling errors + +There are two kinds of errors in accept loop: +1. Per-connection errors. System uses them to notify that there was a + connection in the queue and it's dropped by peer. Subsequent connection + can be already queued so next connection must be accepted immediately. +2. Resource shortages. When these are encountered it doesn't make sense to + accept next socket immediately. But listener stays active, so you server + should try to accept socket later. + +Here is the example of per-connection error (printed in normal and debug mode): +``` +Error: Connection reset by peer (os error 104) +Error: Os { code: 104, kind: ConnectionReset, message: "Connection reset by peer" } +``` + +And the following is the most common example of a resource shortage error: +``` +Error: Too many open files (os error 24) +Error: Os { code: 24, kind: Other, message: "Too many open files" } +``` + +### Testing Application + +To test your application on these errors try the following (this works +on unixes only). + +Lower limit and start the application: +``` +$ ulimit -n 100 +$ cargo run --example your_app + Compiling your_app v0.1.0 (/work) + Finished dev [unoptimized + debuginfo] target(s) in 5.47s + Running `target/debug/examples/your_app` +Server is listening on: http://127.0.0.1:1234 +``` +Then in another console run [`wrk`] benchmark tool: +``` +$ wrk -c 1000 http://127.0.0.1:1234 +Running 10s test @ http://localhost:8080/ + 2 threads and 1000 connections +$ telnet localhost 1234 +Trying ::1... +Connected to localhost. +``` + +Important is to check the following things: + +1. Application doesn't crash on error (but may log errors, see below) +2. It's possible to connect to the application again once load is stopped + (few seconds after `wrk`). This is what `telnet` does in example above, + make sure it prints `Connected to `. +3. The `Too many open files` error is logged in the appropriate log. This + requires to set "maximum number of simultaneous connections" parameter (see + below) of your application to a value greater that `100` for this example. +4. Check CPU usage of the app while doing a test. It should not occupy 100% + of a single CPU core (it's unlikely that you can exhaust CPU by 1000 + connections in Rust, so this means error handling is not right). + +#### Testing non-HTTP applications + +If it's possible, use the appropriate benchmark tool and set the appropriate +number of connections. For example `redis-benchmark` has `-c` parameter for +that, if you implement redis protocol. + +Alternatively, can still use `wrk`, just make sure that connection is not +immediately closed. If it is, put a temporary timeout before handing +connection to the protocol handler, like this: + +```rust,edition2018 +# extern crate async_std; +# use std::time::Duration; +# use async_std::{ +# net::{TcpListener, ToSocketAddrs}, +# prelude::*, +# }; +# +# type Result = std::result::Result>; +# +#async fn accept_loop(addr: impl ToSocketAddrs) -> Result<()> { +# let listener = TcpListener::bind(addr).await?; +# let mut incoming = listener.incoming(); +while let Some(stream) = incoming.next().await { + task::spawn(async { + task:sleep(Duration::from_secs(10)).await; // 1 + connection_loop(stream).await; + }); +} +# Ok(()) +# } +``` + +1. Make sure the sleep coroutine is inside the spawned task, not in the loop. + +[`wrk`]: https://github.com/wg/wrk + + +### Handling Errors Manually + +Here is how basic accept loop could look like: + +```rust,edition2018 +# extern crate async_std; +# use std::time::Duration; +# use async_std::{ +# net::{TcpListener, ToSocketAddrs}, +# prelude::*, +# }; +# +# type Result = std::result::Result>; +# +async fn accept_loop(addr: impl ToSocketAddrs) -> Result<()> { + let listener = TcpListener::bind(addr).await?; + let mut incoming = listener.incoming(); + while let Some(result) = incoming.next().await { + let stream = match stream { + Err(ref e) if is_connection_error(e) => continue, // 1 + Err(e) => { + eprintln!("Error: {}. Pausing for 500ms."); // 3 + task::sleep(Duration::from_millis(500)).await; // 2 + continue; + } + Ok(s) => s, + }; + // body + } + Ok(()) +} +``` + +1. Ignore per-connection errors. +2. Sleep and continue on resource shortage. +3. It's important to log the message, because these errors commonly mean the + misconfiguration of the system and are helpful for operations people running + the application. + +Be sure to [test your application](#testing-application). + + +### External Crates + +The crate [`async-listen`] have a helper to achieve this task: +```rust,edition2018 +# extern crate async_std; +# extern crate async_listen; +# use std::time::Duration; +# use async_std::{ +# net::{TcpListener, ToSocketAddrs}, +# prelude::*, +# }; +# +# type Result = std::result::Result>; +# +use async_listen::ListenExt; + +async fn accept_loop(addr: impl ToSocketAddrs) -> Result<()> { + + let listener = TcpListener::bind(addr).await?; + let mut incoming = listener + .incoming() + .log_warnings(|e| eprintln!("Error: {}. Pausing for 500ms.", e)) // 1 + .handle_errors(Duration::from_millis(500)); + while let Some(socket) = incoming.next().await { // 2 + // body + } + Ok(()) +} +``` + +1. Logs resource shortages (`async-listen` calls them warnings). If you use + `log` crate or any other in your app this should go to the log. +2. Stream yields sockets without `Result` wrapper after `handle_errors` because + all errors are already handled. + +[`async-listen`]: https://crates.io/crates/async-listen/ + +Be sure to [test your application](#testing-application). + + +## Connections Limit + +Even if you've applied everything described in +[Handling Errors](#handling-errors) section, there is still a problem. + +Let's imagine you have a server that needs to open a file to process +client request. At some point, you might encounter the following situation: + +1. There are as much client connection as max file descriptors allowed for + the application. +2. Listener gets `Too many open files` error so it sleeps. +3. Some client sends a request via the previously open connection. +4. Opening a file to serve request fails, because of the same + `Too many open files` error, until some other client drops a connection. + +There are many more possible situations, this is just a small illustation that +limiting number of connections is very useful. Generally, it's one of the ways +to control resources used by a server and avoiding some kinds of deny of +service (DoS) attacks. + +### `async-listen` crate + +Limiting maximum number of simultaneous connections with [`async-listen`] +looks like the following: + +```rust,edition2018 +# extern crate async_std; +# extern crate async_listen; +# use std::time::Duration; +# use async_std::{ +# net::{TcpListener, TcpStream, ToSocketAddrs}, +# prelude::*, +# }; +# +# type Result = std::result::Result>; +# +use async_listen::{ListenExt, Token}; + +async fn accept_loop(addr: impl ToSocketAddrs) -> Result<()> { + + let listener = TcpListener::bind(addr).await?; + let mut incoming = listener + .incoming() + .log_warnings(|e| eprintln!("Error: {}. Pausing for 500ms.", e)) + .handle_errors(Duration::from_millis(500)) // 1 + .backpressure(100); + while let Some((token, socket)) = incoming.next().await { // 2 + task::spawn(async move { + connection_loop(&token, stream).await; // 3 + }); + } + Ok(()) +} +async fn connection_loop(_token: &Token, stream: TcpStream) { // 4 + // ... +} +``` + +1. We need to handle errors first, because [`backpressure`] helper expects + stream of `TcpStream` rather than `Result`. +2. The token yielded by a new stream is what is counted by backpressure helper. + I.e. if you drop a token, new connection can be established. +3. We give connection loop a reference to token to bind token's lifetime to + the lifetime of the connection. +4. The token itsellf in the function can be ignored, hence `_token` + +[`backpressure`]: https://docs.rs/async-listen/0.1.2/async_listen/trait.ListenExt.html#method.backpressure + +Be sure to [test this behavior](#testing-application).