Async Rust: From Futures to Production
Speaker Intro
- Principal Firmware Architect on Microsoft's SCHIE (Silicon and Cloud Hardware Infrastructure Engineering) team
- Industry veteran with expertise in security, systems programming (firmware, operating systems, hypervisors), CPU and platform architecture, and C++ systems
- Started programming in Rust in 2017 (at AWS EC2) and has been in love with the language ever since
A deep-dive guide to asynchronous programming in Rust. Unlike most async tutorials that start with tokio::main and hand-wave the internals, this guide builds understanding from first principles — the Future trait, polling, state machines — then progresses to real-world patterns, runtime selection, and production pitfalls.
Who This Is For
- Rust developers who can write synchronous Rust but find async confusing
- Developers from C#, Go, Python, or JavaScript who know `async/await` but not Rust's model
- Anyone who's been bitten by "`Future` is not `Send`", `Pin<Box<dyn Future>>`, or "why does my program hang?"
Prerequisites
You should be comfortable with:
- Ownership, borrowing, and lifetimes
- Traits and generics (including `impl Trait`)
- Using `Result<T, E>` and the `?` operator
- Basic multi-threading (`std::thread::spawn`, `Arc`, `Mutex`)
No prior async Rust experience is needed.
How to Use This Book
Read linearly the first time. Parts I–III build on each other. Chapters are marked by difficulty:
| Symbol | Meaning |
|---|---|
| 🟢 | Beginner — foundational concept |
| 🟡 | Intermediate — requires earlier chapters |
| 🔴 | Advanced — deep internals or production patterns |
Each chapter includes:
- A “What you’ll learn” block at the top
- Mermaid diagrams for visual learners
- An inline exercise with a hidden solution
- Key Takeaways summarizing the core ideas
- Cross-references to related chapters
Pacing Guide
| Chapters | Topic | Suggested Time | Checkpoint |
|---|---|---|---|
| 1–5 | How Async Works | 6–8 hours | You can explain Future, Poll, Pin, and why Rust has no built-in runtime |
| 6–10 | The Ecosystem | 6–8 hours | You can build futures by hand, choose a runtime, and use tokio’s API |
| 11–13 | Production Async | 6–8 hours | You can write production-grade async code with streams, proper error handling, and graceful shutdown |
| Capstone | Chat Server | 4–6 hours | You’ve built a real async application integrating all concepts |
Total estimated time: 22–30 hours
Working Through Exercises
Every content chapter has an inline exercise. The capstone (see Appendices) integrates everything into a single project. For maximum learning:
- Try the exercise before expanding the solution — struggling is where learning happens
- Type the code, don’t copy-paste — muscle memory matters for Rust’s syntax
- Run every example — `cargo new async-exercises` and test as you go
Table of Contents
Part I: How Async Works
- 1. Why Async is Different in Rust 🟢 — The fundamental difference: Rust has no built-in runtime
- 2. The Future Trait 🟡 — `poll()`, `Waker`, and the contract that makes it all work
- 3. How Poll Works 🟡 — The polling state machine and a minimal executor
- 4. Pin and Unpin 🔴 — Why self-referential structs need pinning
- 5. The State Machine Reveal 🟢 — What the compiler actually generates from `async fn`
Part II: The Ecosystem
- 6. Building Futures by Hand 🟡 — TimerFuture, Join, Select from scratch
- 7. Executors and Runtimes 🟡 — tokio, smol, async-std, embassy — how to choose
- 8. Tokio Deep Dive 🟡 — Runtime flavors, spawn, channels, sync primitives
- 9. When Tokio Isn’t the Right Fit 🟡 — LocalSet, FuturesUnordered, runtime-agnostic design
- 10. Async Traits 🟡 — RPITIT, dyn dispatch, trait_variant, async closures
Part III: Production Async
- 11. Streams and AsyncIterator 🟡 — Async iteration, AsyncRead/Write, stream combinators
- 12. Common Pitfalls 🔴 — 9 production bugs and how to avoid them
- 13. Production Patterns 🔴 — Graceful shutdown, backpressure, Tower middleware
Appendices
- Summary and Reference Card — Quick-lookup tables and decision trees
- Capstone Project: Async Chat Server — Build a complete async application
1. Why Async is Different in Rust 🟢
What you’ll learn:
- Why Rust has no built-in async runtime (and what that means for you)
- The three key properties: lazy execution, no runtime, zero-cost abstraction
- When async is the right tool (and when it’s slower)
- How Rust’s model compares to C#, Go, Python, and JavaScript
The Fundamental Difference
Most languages with async/await hide the machinery. C# has the CLR thread pool. JavaScript has the event loop. Go has goroutines and a scheduler built into the runtime. Python has asyncio.
Rust has nothing.
There is no built-in runtime, no thread pool, no event loop. The async keyword is a zero-cost compilation strategy — it transforms your function into a state machine that implements the Future trait. Someone else (an executor) must drive that state machine forward.
Three Key Properties of Rust Async
graph LR
subgraph "Other Languages"
EAGER["Eager Execution<br/>Task starts immediately"]
BUILTIN["Built-in Runtime<br/>Thread pool included"]
GC["GC-Managed<br/>No lifetime concerns"]
end
subgraph "Rust"
LAZY["Lazy Execution<br/>Nothing happens until polled"]
BYOB["Bring Your Own Runtime<br/>You choose the executor"]
OWNED["Ownership Applies<br/>Lifetimes, Send, Sync matter"]
end
EAGER -. "opposite" .-> LAZY
BUILTIN -. "opposite" .-> BYOB
GC -. "opposite" .-> OWNED
style LAZY fill:#e8f5e8,color:#000
style BYOB fill:#e8f5e8,color:#000
style OWNED fill:#e8f5e8,color:#000
style EAGER fill:#e3f2fd,color:#000
style BUILTIN fill:#e3f2fd,color:#000
style GC fill:#e3f2fd,color:#000
No Built-In Runtime
// This compiles but does NOTHING:
async fn fetch_data() -> String {
"hello".to_string()
}
fn main() {
let future = fetch_data(); // Creates the Future, but doesn't execute it
// future is just a struct sitting on the stack
// No output, no side effects, nothing happens
drop(future); // Silently dropped — work was never started
}
Compare with C# where Task starts eagerly:
// C# — this immediately starts executing:
async Task<string> FetchData() => "hello";
var task = FetchData(); // Already running!
var result = await task; // Just waits for completion
Lazy Futures vs Eager Tasks
This is the single most important mental shift:
| | C# / JavaScript / Python | Go | Rust |
|---|---|---|---|
| Creation | Task starts executing immediately | Goroutine starts immediately | Future does nothing until polled |
| Dropping | Detached task continues running | Goroutine runs until return | Dropping a Future cancels it |
| Runtime | Built into the language/VM | Built into the binary (M:N scheduler) | You choose (tokio, smol, etc.) |
| Scheduling | Automatic (thread pool) | Automatic (GMP scheduler) | Explicit (spawn, block_on) |
| Cancellation | CancellationToken (cooperative) | context.Context (cooperative) | Drop the future (immediate) |
// To actually RUN a future, you need an executor:
#[tokio::main]
async fn main() {
let result = fetch_data().await; // NOW it executes
println!("{result}");
}
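The laziness and cancel-by-drop rows of the table above can be verified with nothing but the standard library. A minimal sketch — the `Resource` type is a hypothetical stand-in for a connection or file handle:

```rust
use std::sync::atomic::{AtomicBool, Ordering};

static BODY_RAN: AtomicBool = AtomicBool::new(false);
static RESOURCE_FREED: AtomicBool = AtomicBool::new(false);

// Hypothetical stand-in for a connection, file handle, etc.
struct Resource;
impl Drop for Resource {
    fn drop(&mut self) {
        RESOURCE_FREED.store(true, Ordering::SeqCst);
    }
}

fn main() {
    let res = Resource;
    // `async move` captures `res` into the future's initial state.
    let future = async move {
        BODY_RAN.store(true, Ordering::SeqCst);
        drop(res);
    };
    // Lazy: creating the future executed none of the body.
    assert!(!BODY_RAN.load(Ordering::SeqCst));
    // Cancellation by drop: the captured Resource is freed,
    // but the body still never runs.
    drop(future);
    assert!(!BODY_RAN.load(Ordering::SeqCst));
    assert!(RESOURCE_FREED.load(Ordering::SeqCst));
    println!("lazy + cancelled-by-drop: verified");
}
```

Note that dropping the future still ran `Resource::drop` — cancellation in Rust is cleanup of captured state, not abandonment.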
When to Use Async (and When Not To)
graph TD
START["What kind of work?"]
IO["I/O-bound?<br/>(network, files, DB)"]
CPU["CPU-bound?<br/>(computation, parsing)"]
MANY["Many concurrent connections?<br/>(100+)"]
FEW["Few concurrent tasks?<br/>(<10)"]
USE_ASYNC["✅ Use async/await"]
USE_THREADS["✅ Use std::thread or rayon"]
USE_SPAWN_BLOCKING["✅ Use spawn_blocking()"]
MAYBE_SYNC["Consider synchronous code<br/>(simpler, less overhead)"]
START -->|Network, files, DB| IO
START -->|Computation| CPU
IO -->|Yes, many| MANY
IO -->|Just a few| FEW
MANY --> USE_ASYNC
FEW --> MAYBE_SYNC
CPU -->|Parallelize| USE_THREADS
CPU -->|Inside async context| USE_SPAWN_BLOCKING
style USE_ASYNC fill:#c8e6c9,color:#000
style USE_THREADS fill:#c8e6c9,color:#000
style USE_SPAWN_BLOCKING fill:#c8e6c9,color:#000
style MAYBE_SYNC fill:#fff3e0,color:#000
Rule of thumb: Async is for I/O concurrency (doing many things at once while waiting), not CPU parallelism (making one thing faster). If you have 10,000 network connections, async shines. If you're crunching numbers, use rayon or OS threads.
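To make the CPU-bound branch concrete, here is a minimal std-only sketch of the "use `std::thread`" route: a reduction split across scoped threads, no executor involved. (In real code you would likely reach for `rayon` instead.)

```rust
use std::thread;

// Split a CPU-bound reduction across OS threads — no async needed.
fn parallel_sum(data: &[u64]) -> u64 {
    let (left, right) = data.split_at(data.len() / 2);
    thread::scope(|s| {
        // Scoped threads may borrow `data`; the scope joins them on exit.
        let l = s.spawn(|| left.iter().sum::<u64>());
        let r = s.spawn(|| right.iter().sum::<u64>());
        l.join().unwrap() + r.join().unwrap()
    })
}

fn main() {
    let data: Vec<u64> = (1..=1_000).collect();
    assert_eq!(parallel_sum(&data), 500_500);
    println!("sum = {}", parallel_sum(&data));
}
```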
When Async Can Be Slower
Async isn’t free. For low-concurrency workloads, synchronous code can outperform async:
| Cost | Why |
|---|---|
| State machine overhead | Each .await adds an enum variant; deeply nested futures produce large, complex state machines |
| Dynamic dispatch | Box<dyn Future> adds indirection and kills inlining |
| Context switching | Cooperative scheduling still has cost — the executor must manage a task queue, wakers, and I/O registrations |
| Compile time | Async code generates more complex types, slowing down compilation |
| Debuggability | Stack traces through state machines are harder to read (see Ch. 12) |
Benchmarking guidance: If you have fewer than ~10 concurrent I/O operations, profile before committing to async. A simple `std::thread::spawn` per connection scales fine to hundreds of threads on modern Linux.
Exercise: When Would You Use Async?
🏋️ Exercise (click to expand)
For each scenario, decide whether async is appropriate and explain why:
- A web server handling 10,000 concurrent WebSocket connections
- A CLI tool that compresses a single large file
- A service that queries 5 different databases and merges results
- A game engine running a physics simulation at 60 FPS
🔑 Solution
- Async — I/O-bound with massive concurrency. Each connection spends most time waiting for data. Threads would require 10K stacks.
- Sync/threads — CPU-bound, single task. Async adds overhead with no benefit. Use `rayon` for parallel compression.
- Async — Five concurrent I/O waits. `tokio::join!` runs all five queries simultaneously.
- Sync/threads — CPU-bound, latency-sensitive. Async’s cooperative scheduling could introduce frame jitter.
Key Takeaways — Why Async is Different
- Rust futures are lazy — they do nothing until polled by an executor
- There is no built-in runtime — you choose (or build) your own
- Async is a zero-cost compilation strategy that produces state machines
- Async shines for I/O-bound concurrency; for CPU-bound work, use threads or rayon
See also: Ch 2 — The Future Trait for the trait that makes this all work, Ch 7 — Executors and Runtimes for choosing your runtime
2. The Future Trait 🟡
What you’ll learn:
- The `Future` trait: `Output`, `poll()`, `Context`, `Waker`
- How a waker tells the executor “poll me again”
- The contract: never call `wake()` = your program silently hangs
- Implementing a real future by hand (`Delay`)
Anatomy of a Future
Everything in async Rust ultimately implements this trait:
#![allow(unused)]
fn main() {
pub trait Future {
type Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
pub enum Poll<T> {
Ready(T), // The future has completed with value T
Pending, // The future is not ready yet — call me back later
}
}
That’s it. A Future is anything that can be polled — asked “are you done yet?” — and responds with either “yes, here’s the result” or “not yet, I’ll wake you up when I’m ready.”
Output, poll(), Context, Waker
sequenceDiagram
participant E as Executor
participant F as Future
participant R as Resource (I/O)
E->>F: poll(cx)
F->>R: Check: is data ready?
R-->>F: Not yet
F->>R: Register waker from cx
F-->>E: Poll::Pending
Note over R: ... time passes, data arrives ...
R->>E: waker.wake() — "I'm ready!"
E->>F: poll(cx) — try again
F->>R: Check: is data ready?
R-->>F: Yes! Here's the data
F-->>E: Poll::Ready(data)
Let’s break down each piece:
#![allow(unused)]
fn main() {
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
// A future that returns 42 immediately
struct Ready42;
impl Future for Ready42 {
type Output = i32; // What the future eventually produces
fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<i32> {
Poll::Ready(42) // Always ready — no waiting
}
}
}
The components:
- `Output` — the type of value produced when the future completes
- `poll()` — called by the executor to check progress; returns `Ready(value)` or `Pending`
- `Pin<&mut Self>` — ensures the future won’t be moved in memory (we’ll cover why in Ch. 4)
- `Context` — carries the `Waker` so the future can signal the executor when it’s ready to make progress
The Waker Contract
The Waker is the callback mechanism. When a future returns Pending, it must arrange for waker.wake() to be called later — otherwise the executor will never poll it again and the program hangs.
#![allow(unused)]
fn main() {
use std::task::{Context, Poll, Waker};
use std::pin::Pin;
use std::future::Future;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
/// A future that completes after a delay (toy implementation)
struct Delay {
completed: Arc<Mutex<bool>>,
waker_stored: Arc<Mutex<Option<Waker>>>,
duration: Duration,
started: bool,
}
impl Delay {
fn new(duration: Duration) -> Self {
Delay {
completed: Arc::new(Mutex::new(false)),
waker_stored: Arc::new(Mutex::new(None)),
duration,
started: false,
}
}
}
impl Future for Delay {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
// Check if already completed
if *self.completed.lock().unwrap() {
return Poll::Ready(());
}
// Store the waker so the background thread can wake us
*self.waker_stored.lock().unwrap() = Some(cx.waker().clone());
// Start the background timer on first poll
if !self.started {
self.started = true;
let completed = Arc::clone(&self.completed);
let waker = Arc::clone(&self.waker_stored);
let duration = self.duration;
thread::spawn(move || {
thread::sleep(duration);
*completed.lock().unwrap() = true;
// CRITICAL: wake the executor so it polls us again
if let Some(w) = waker.lock().unwrap().take() {
w.wake(); // "Hey executor, I'm ready — poll me again!"
}
});
}
Poll::Pending // Not done yet
}
}
}
Key insight: In C#, the TaskScheduler handles waking automatically. In Rust, you (or the I/O library you use) are responsible for calling `waker.wake()`. Forget it, and your program silently hangs.
Exercise: Implement a CountdownFuture
🏋️ Exercise (click to expand)
Challenge: Implement a CountdownFuture that counts down from N to 0, printing the current count each time it’s polled. When it reaches 0, it completes with Ready("Liftoff!").
Hint: The future needs to store the current count and decrement it on each poll. Remember to always re-register the waker!
🔑 Solution
#![allow(unused)]
fn main() {
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
struct CountdownFuture {
count: u32,
}
impl CountdownFuture {
fn new(start: u32) -> Self {
CountdownFuture { count: start }
}
}
impl Future for CountdownFuture {
type Output = &'static str;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if self.count == 0 {
println!("Liftoff!");
Poll::Ready("Liftoff!")
} else {
println!("{}...", self.count);
self.count -= 1;
cx.waker().wake_by_ref(); // Schedule re-poll immediately
Poll::Pending
}
}
}
}
Key takeaway: This future is polled once per count. Each time it returns Pending, it immediately wakes itself to be polled again. In production, you’d use a timer instead of busy-polling.
Key Takeaways — The Future Trait
- `Future::poll()` returns `Poll::Ready(value)` or `Poll::Pending`
- A future must register a `Waker` before returning `Pending` — the executor uses it to know when to re-poll
- `Pin<&mut Self>` guarantees the future won’t be moved in memory (needed for self-referential state machines — see Ch 4)
- Everything in async Rust — `async fn`, `.await`, combinators — is built on this one trait
See also: Ch 3 — How Poll Works for the executor loop, Ch 6 — Building Futures by Hand for more complex implementations
3. How Poll Works 🟡
What you’ll learn:
- The executor’s poll loop: poll → pending → wake → poll again
- How to build a minimal executor from scratch
- Spurious wake rules and why they matter
- Utility functions: `poll_fn()` and `yield_now()`
The Polling State Machine
The executor runs a loop: poll a future, if it’s Pending, park it until its waker fires, then poll again. This is fundamentally different from OS threads where the kernel handles scheduling.
stateDiagram-v2
[*] --> Idle : Future created
Idle --> Polling : executor calls poll()
Polling --> Complete : Ready(value)
Polling --> Waiting : Pending
Waiting --> Polling : waker.wake() called
Complete --> [*] : Value returned
Important: While in the Waiting state the future must have registered the waker with an I/O source. No registration = hang forever.
A Minimal Executor
To demystify executors, let’s build the simplest possible one:
use std::future::Future;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
use std::pin::Pin;
/// The simplest possible executor: busy-loop poll until Ready
fn block_on<F: Future>(mut future: F) -> F::Output {
// Pin the future on the stack
// SAFETY: `future` is never moved after this point — we only
// access it through the pinned reference until it completes.
let mut future = unsafe { Pin::new_unchecked(&mut future) };
// Create a no-op waker (just keeps polling — inefficient but simple)
fn noop_raw_waker() -> RawWaker {
fn no_op(_: *const ()) {}
fn clone(_: *const ()) -> RawWaker { noop_raw_waker() }
let vtable = &RawWakerVTable::new(clone, no_op, no_op, no_op);
RawWaker::new(std::ptr::null(), vtable)
}
// SAFETY: noop_raw_waker() returns a valid RawWaker with a correct vtable.
let waker = unsafe { Waker::from_raw(noop_raw_waker()) };
let mut cx = Context::from_waker(&waker);
// Busy-loop until the future completes
loop {
match future.as_mut().poll(&mut cx) {
Poll::Ready(value) => return value,
Poll::Pending => {
// A real executor would park the thread here
// and wait for waker.wake() — we just spin
std::thread::yield_now();
}
}
}
}
// Usage:
fn main() {
let result = block_on(async {
println!("Hello from our mini executor!");
42
});
println!("Got: {result}");
}
Don’t use this in production! It busy-loops, wasting CPU. Real executors (tokio, smol) use `epoll`/`kqueue`/`io_uring` to sleep until I/O is ready. But this shows the core idea: an executor is just a loop that calls `poll()`.
Wake-Up Notifications
A real executor is event-driven. When all futures are Pending, the executor sleeps. The waker is an interrupt mechanism:
#![allow(unused)]
fn main() {
// Conceptual model of a real executor's main loop:
fn executor_loop(tasks: &mut TaskQueue) {
loop {
// 1. Poll all tasks that have been woken
while let Some(task) = tasks.get_woken_task() {
match task.poll() {
Poll::Ready(result) => task.complete(result),
Poll::Pending => { /* task stays in queue, waiting for wake */ }
}
}
// 2. Sleep until something wakes us up (epoll_wait, kevent, etc.)
// This is where mio/polling does the heavy lifting
tasks.wait_for_events(); // blocks until an I/O event or waker fires
}
}
}
Spurious Wakes
A future may be polled even when its I/O isn’t ready. This is called a spurious wake. Futures must handle this correctly:
#![allow(unused)]
fn main() {
impl Future for MyFuture {
type Output = Data;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Data> {
// ✅ CORRECT: Always re-check the actual condition
if let Some(data) = self.try_read_data() {
Poll::Ready(data)
} else {
// Re-register the waker (it might have changed!)
self.register_waker(cx.waker());
Poll::Pending
}
// ❌ WRONG: Assuming poll means data is ready
// let data = self.read_data(); // might block or panic
// Poll::Ready(data)
}
}
}
Rules for implementing poll():
- Never block — return `Pending` immediately if not ready
- Always re-register the waker — it may have changed between polls
- Handle spurious wakes — check the actual condition, don’t assume readiness
- Don’t poll after `Ready` — behavior is unspecified (may panic, return `Pending`, or repeat `Ready`). Only `FusedFuture` guarantees safe post-completion polling
🏋️ Exercise: Implement a CountdownFuture (click to expand)
Challenge: Implement a CountdownFuture that counts down from N to 0, printing the current count as a side-effect each time it’s polled. When it reaches 0, it completes with Ready("Liftoff!"). (Note: a Future produces only one final value — the printing is a side-effect, not a yielded value. For multiple async values, see Stream in Ch. 11.)
Hint: This doesn’t need a real I/O source — it can wake itself immediately with cx.waker().wake_by_ref() after each decrement.
🔑 Solution
#![allow(unused)]
fn main() {
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
struct CountdownFuture {
count: u32,
}
impl CountdownFuture {
fn new(start: u32) -> Self {
CountdownFuture { count: start }
}
}
impl Future for CountdownFuture {
type Output = &'static str;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if self.count == 0 {
Poll::Ready("Liftoff!")
} else {
println!("{}...", self.count);
self.count -= 1;
// Wake immediately — we're always ready to make progress
cx.waker().wake_by_ref();
Poll::Pending
}
}
}
// Usage with our mini executor or tokio:
// let msg = block_on(CountdownFuture::new(5));
// prints: 5... 4... 3... 2... 1...
// msg == "Liftoff!"
}
Key takeaway: Even though this future is always ready to progress, it returns Pending to yield control between steps. It calls wake_by_ref() immediately so the executor re-polls it right away. This is the basis of cooperative multitasking — each future voluntarily yields.
Handy Utilities: poll_fn and yield_now
Two utilities from the standard library and tokio that avoid writing full Future impls:
#![allow(unused)]
fn main() {
use std::future::poll_fn;
use std::task::Poll;
// poll_fn: create a one-off future from a closure
let value = poll_fn(|cx| {
// Do something with cx.waker(), return Ready or Pending
Poll::Ready(42)
}).await;
// Real-world use: bridge a callback-based API into async
async fn read_when_ready(source: &MySource) -> Data {
poll_fn(|cx| source.poll_read(cx)).await
}
}
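Since the snippet above leans on a hypothetical `MySource`, here is a self-contained, runnable sketch of the same bridging idea: a `poll_fn` future over a plain flag, driven by a busy-poll loop like the `block_on` built earlier. The `drive` helper and the `READY` flag are illustrative names invented for this example, not standard APIs:

```rust
use std::future::{poll_fn, Future};
use std::pin::pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

// Busy-poll driver — same idea as the block_on built earlier in this chapter.
fn drive<F: Future>(fut: F) -> F::Output {
    fn raw() -> RawWaker {
        fn no_op(_: *const ()) {}
        fn clone(_: *const ()) -> RawWaker { raw() }
        RawWaker::new(std::ptr::null(), &RawWakerVTable::new(clone, no_op, no_op, no_op))
    }
    let waker = unsafe { Waker::from_raw(raw()) };
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(v) = fut.as_mut().poll(&mut cx) {
            return v;
        }
    }
}

static READY: AtomicBool = AtomicBool::new(false);

fn main() {
    let fut = poll_fn(|cx: &mut Context<'_>| {
        // Spurious-wake safe: re-check the real condition on every poll.
        if READY.load(Ordering::SeqCst) {
            Poll::Ready(42)
        } else {
            READY.store(true, Ordering::SeqCst); // simulate I/O completing
            cx.waker().wake_by_ref(); // ask to be polled again
            Poll::Pending
        }
    });
    assert_eq!(drive(fut), 42);
    println!("got 42 after one Pending");
}
```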
#![allow(unused)]
fn main() {
// yield_now: voluntarily yield control to the executor
// Useful in CPU-heavy async loops to avoid starving other tasks
async fn cpu_heavy_work(items: &[Item]) {
for (i, item) in items.iter().enumerate() {
process(item); // CPU work
// Every 100 items, yield to let other tasks run
if i % 100 == 0 {
tokio::task::yield_now().await;
}
}
}
}
When to use `yield_now()`: If your async function does CPU work in a loop without any `.await` points, it monopolizes the executor thread. Insert `yield_now().await` periodically to enable cooperative multitasking.
Key Takeaways — How Poll Works
- An executor repeatedly calls `poll()` on futures that have been woken
- Futures must handle spurious wakes — always re-check the actual condition
- `poll_fn()` lets you create ad-hoc futures from closures
- `yield_now()` is a cooperative scheduling escape hatch for CPU-heavy async code
See also: Ch 2 — The Future Trait for the trait definition, Ch 5 — The State Machine Reveal for what the compiler generates
4. Pin and Unpin 🔴
What you’ll learn:
- Why self-referential structs break when moved in memory
- What `Pin<P>` guarantees and how it prevents moves
- The three practical pinning patterns: `Box::pin()`, `tokio::pin!()`, `Pin::new()`
- When `Unpin` gives you an escape hatch
Why Pin Exists
This is the most confusing concept in async Rust. Let’s build the intuition step by step.
The Problem: Self-Referential Structs
When the compiler transforms an async fn into a state machine, that state machine may contain references to its own fields. This creates a self-referential struct — and moving it in memory would invalidate those internal references.
#![allow(unused)]
fn main() {
// What the compiler generates (simplified) for:
// async fn example() {
// let data = vec![1, 2, 3];
// let reference = &data; // Points to data above
// use_ref(reference).await;
// }
// Becomes something like:
enum ExampleStateMachine {
State0 {
data: Vec<i32>,
// reference: &Vec<i32>, // PROBLEM: points to `data` above
// // If this struct moves, the pointer is dangling!
},
State1 {
data: Vec<i32>,
reference: *const Vec<i32>, // Internal pointer to data field
},
Complete,
}
}
graph LR
subgraph "Before Move (Valid)"
A["data: [1,2,3]<br/>at addr 0x1000"]
B["reference: 0x1000<br/>(points to data)"]
B -->|"valid"| A
end
subgraph "After Move (INVALID)"
C["data: [1,2,3]<br/>at addr 0x2000"]
D["reference: 0x1000<br/>(still points to OLD location!)"]
D -->|"dangling!"| E["💥 0x1000<br/>(freed/garbage)"]
end
style E fill:#ffcdd2,color:#000
style D fill:#ffcdd2,color:#000
style B fill:#c8e6c9,color:#000
Self-Referential Structs
This isn’t an academic concern. Every async fn that holds a reference across an .await point creates a self-referential state machine:
#![allow(unused)]
fn main() {
async fn problematic() {
let data = String::from("hello");
let slice = &data[..]; // slice borrows data
some_io().await; // <-- .await point: state machine stores both data AND slice
println!("{slice}"); // uses the reference after await
}
// The generated state machine has `data: String` and `slice: &str`
// where slice points INTO data. Moving the state machine = dangling pointer.
}
Pin in Practice
Pin<P> is a wrapper that prevents moving the value behind the pointer:
#![allow(unused)]
fn main() {
use std::pin::Pin;
let mut data = String::from("hello");
// Pin it — now it can't be moved
let pinned: Pin<&mut String> = Pin::new(&mut data);
// Can still use it:
println!("{}", pinned.as_ref().get_ref()); // "hello"
// But we can't get &mut String back (which would allow mem::swap):
// let mutable: &mut String = Pin::into_inner(pinned); // Only if String: Unpin
// String IS Unpin, so this actually works for String.
// But for self-referential state machines (which are !Unpin), it's blocked.
}
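Since `String` is `Unpin`, the claims in the comments above can be exercised directly; a small runnable sketch:

```rust
use std::pin::Pin;

fn main() {
    let mut data = String::from("hello");
    // String is Unpin, so Pin::new works and is essentially transparent:
    let mut pinned: Pin<&mut String> = Pin::new(&mut data);
    pinned.as_mut().get_mut().push_str(", world"); // mutation still allowed
    // We can even unwrap the Pin again — possible only because String: Unpin.
    // For a !Unpin async state machine, this line would not compile.
    let plain: &mut String = Pin::into_inner(pinned);
    plain.push('!');
    assert_eq!(data, "hello, world!");
    println!("{data}");
}
```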
In real code, you mostly encounter Pin in three places:
#![allow(unused)]
fn main() {
// 1. poll() signature — all futures are polled through Pin
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Output>;
// 2. Box::pin() — heap-allocate and pin a future
let future: Pin<Box<dyn Future<Output = i32>>> = Box::pin(async { 42 });
// 3. tokio::pin!() — pin a future on the stack
tokio::pin!(my_future);
// Now my_future: Pin<&mut impl Future>
}
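Pattern 2, `Box::pin()`, is what makes heterogeneous collections of futures possible: each `async` block has its own anonymous type, but boxing erases it behind one trait object. A minimal sketch that polls such a collection by hand with a no-op waker — safe here only because these futures complete on their first poll:

```rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

// No-op waker, same pattern as the mini executor in Ch 3.
fn noop_waker() -> Waker {
    fn raw() -> RawWaker {
        fn no_op(_: *const ()) {}
        fn clone(_: *const ()) -> RawWaker { raw() }
        RawWaker::new(std::ptr::null(), &RawWakerVTable::new(clone, no_op, no_op, no_op))
    }
    unsafe { Waker::from_raw(raw()) }
}

fn main() {
    // Two differently-typed async blocks, unified behind one trait object:
    let mut tasks: Vec<Pin<Box<dyn Future<Output = i32>>>> = vec![
        Box::pin(async { 1 }),
        Box::pin(async { 20 + 21 }),
    ];
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    let mut total = 0;
    for task in tasks.iter_mut() {
        // No await points inside, so each completes on its first poll.
        if let Poll::Ready(v) = task.as_mut().poll(&mut cx) {
            total += v;
        }
    }
    assert_eq!(total, 42);
    println!("total = {total}");
}
```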
The Unpin Escape Hatch
Most types in Rust are Unpin — they don’t contain self-references, so pinning is a no-op. Only compiler-generated state machines (from async fn) are !Unpin.
#![allow(unused)]
fn main() {
// These are all Unpin — pinning them does nothing special:
// i32, String, Vec<T>, HashMap<K,V>, Box<T>, &T, &mut T
// These are !Unpin — they MUST be pinned before polling:
// The state machines generated by `async fn` and `async {}`
// Practical implication:
// If you write a Future by hand and it has NO self-references,
// implement Unpin to make it easier to work with:
impl Unpin for MySimpleFuture {} // "I'm safe to move, trust me"
}
Quick Reference
| What | When | How |
|---|---|---|
| Pin a future on the heap | Storing in a collection, returning from function | Box::pin(future) |
| Pin a future on the stack | Local use in select! or manual polling | tokio::pin!(future) or pin_mut! from pin-utils |
| Pin in function signature | Accepting pinned futures | future: Pin<&mut F> |
| Require Unpin | When you need to move a future after creation | F: Future + Unpin |
🏋️ Exercise: Pin and Move (click to expand)
Challenge: Which of these code snippets compile? For each one that doesn’t, explain why and fix it.
#![allow(unused)]
fn main() {
// Snippet A
let fut = async { 42 };
let pinned = Box::pin(fut);
let moved = pinned; // Move the Box
let result = moved.await;
// Snippet B
let fut = async { 42 };
tokio::pin!(fut);
let moved = fut; // Move the pinned future
let result = moved.await;
// Snippet C
use std::pin::Pin;
let mut fut = async { 42 };
let pinned = Pin::new(&mut fut);
}
🔑 Solution
Snippet A: ✅ Compiles. Box::pin() puts the future on the heap. Moving the Box moves the pointer, not the future itself. The future stays pinned in its heap location.
Snippet B: ✅ Compiles, perhaps surprisingly. tokio::pin! shadows fut with a Pin<&mut ...> pointing at the stack slot. let moved = fut; moves that Pin (a pointer), not the pinned future itself, and Pin<&mut F> implements Future, so moved.await works. What stack pinning actually forbids is moving the underlying future: the macro shadows the original binding, making it unreachable, which is how stack pinning stays sound.
Snippet C: ❌ Does not compile. Pin::new() requires T: Unpin. Async blocks generate !Unpin types. Fix: Use Box::pin() or unsafe Pin::new_unchecked():
#![allow(unused)]
fn main() {
let fut = async { 42 };
let pinned = Box::pin(fut); // Heap-pin — works with !Unpin
}
Key takeaway: Box::pin() is the safe, easy way to pin !Unpin futures. tokio::pin!() pins on the stack but the future can’t be moved after. Pin::new() only works with Unpin types.
Key Takeaways — Pin and Unpin
- `Pin<P>` is a wrapper that prevents the pointee from being moved — essential for self-referential state machines
- `Box::pin()` is the safe, easy default for pinning futures on the heap
- `tokio::pin!()` pins on the stack — cheaper, but the underlying future can’t be moved afterward
- `Unpin` is an auto-trait opt-out: types that implement `Unpin` can be moved even when pinned (most types are `Unpin`; async blocks are not)
See also: Ch 2 — The Future Trait for `Pin<&mut Self>` in poll, Ch 5 — The State Machine Reveal for why async state machines are self-referential
5. The State Machine Reveal 🟢
What you’ll learn:
- How the compiler transforms `async fn` into an enum state machine
- Side-by-side comparison: source code vs generated states
- Why large stack allocations in `async fn` blow up future sizes
- The drop optimization: values drop as soon as they’re no longer needed
What the Compiler Actually Generates
When you write async fn, the compiler transforms your sequential-looking code into an enum-based state machine. Understanding this transformation is the key to understanding async Rust’s performance characteristics and many of its quirks.
Side-by-Side: async fn vs State Machine
#![allow(unused)]
fn main() {
// What you write:
async fn fetch_two_pages() -> String {
let page1 = http_get("https://example.com/a").await;
let page2 = http_get("https://example.com/b").await;
format!("{page1}\n{page2}")
}
}
The compiler generates something conceptually like this:
#![allow(unused)]
fn main() {
enum FetchTwoPagesStateMachine {
// State 0: About to call http_get for page1
Start,
// State 1: Waiting for page1, holding the future
WaitingPage1 {
fut1: HttpGetFuture,
},
// State 2: Got page1, waiting for page2
WaitingPage2 {
page1: String,
fut2: HttpGetFuture,
},
// Terminal state
Complete,
}
impl Future for FetchTwoPagesStateMachine {
type Output = String;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<String> {
loop {
match self.as_mut().get_mut() {
Self::Start => {
let fut1 = http_get("https://example.com/a");
*self.as_mut().get_mut() = Self::WaitingPage1 { fut1 };
}
Self::WaitingPage1 { fut1 } => {
let page1 = match Pin::new(fut1).poll(cx) {
Poll::Ready(v) => v,
Poll::Pending => return Poll::Pending,
};
let fut2 = http_get("https://example.com/b");
*self.as_mut().get_mut() = Self::WaitingPage2 { page1, fut2 };
}
Self::WaitingPage2 { page1, fut2 } => {
let page2 = match Pin::new(fut2).poll(cx) {
Poll::Ready(v) => v,
Poll::Pending => return Poll::Pending,
};
let result = format!("{page1}\n{page2}");
*self.as_mut().get_mut() = Self::Complete;
return Poll::Ready(result);
}
Self::Complete => panic!("polled after completion"),
}
}
}
}
}
Note: This desugaring is conceptual. The real compiler output uses unsafe pin projections — the `get_mut()` calls shown here require `Unpin`, but async state machines are `!Unpin`. The goal is to illustrate state transitions, not produce compilable code.
stateDiagram-v2
[*] --> Start
Start --> WaitingPage1: Create http_get future #1
WaitingPage1 --> WaitingPage1: poll() → Pending
WaitingPage1 --> WaitingPage2: poll() → Ready(page1)
WaitingPage2 --> WaitingPage2: poll() → Pending
WaitingPage2 --> Complete: poll() → Ready(page2)
Complete --> [*]: Return format!("{page1}\\n{page2}")
State contents:
- WaitingPage1 — stores `fut1: HttpGetFuture` (page2 not yet allocated)
- WaitingPage2 — stores `page1: String`, `fut2: HttpGetFuture` (fut1 has been dropped)
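The conceptual desugaring above can’t compile, but the same shape can. Here is a hand-written two-state machine (`AddTwo` is an invented example for this book, not compiler output) that holds no self-references and is therefore `Unpin`, driven by the same busy-poll idea as Ch 3’s `block_on`:

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

// A compilable two-state machine: no self-references, so AddTwo is Unpin
// and get_mut() is safe — unlike real async state machines.
enum AddTwo {
    Start { a: i32, b: i32 },
    GotFirst { first: i32, b: i32 },
    Complete,
}

impl Future for AddTwo {
    type Output = i32;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<i32> {
        let this = self.get_mut(); // OK: AddTwo is Unpin
        match *this {
            AddTwo::Start { a, b } => {
                // Pretend the first "await" completed with `a`, then yield once.
                *this = AddTwo::GotFirst { first: a, b };
                cx.waker().wake_by_ref();
                Poll::Pending
            }
            AddTwo::GotFirst { first, b } => {
                *this = AddTwo::Complete;
                Poll::Ready(first + b)
            }
            AddTwo::Complete => panic!("polled after completion"),
        }
    }
}

// Busy-poll driver, same idea as Ch 3's block_on.
fn run<F: Future>(fut: F) -> F::Output {
    fn raw() -> RawWaker {
        fn no_op(_: *const ()) {}
        fn clone(_: *const ()) -> RawWaker { raw() }
        RawWaker::new(std::ptr::null(), &RawWakerVTable::new(clone, no_op, no_op, no_op))
    }
    let waker = unsafe { Waker::from_raw(raw()) };
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(v) = fut.as_mut().poll(&mut cx) {
            return v;
        }
    }
}

fn main() {
    assert_eq!(run(AddTwo::Start { a: 40, b: 2 }), 42);
    println!("AddTwo completed with 42");
}
```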
Why This Matters for Performance
Zero-cost: The state machine is a stack-allocated enum. No heap allocation per future, no garbage collector, no boxing — unless you explicitly use Box::pin().
Size: The enum’s size is the maximum of all its variants. Each .await point creates a new variant. This means:
#![allow(unused)]
fn main() {
async fn small() {
let a: u8 = 0;
yield_now().await;
let b: u8 = 0;
yield_now().await;
}
// Size ≈ max(size_of(u8), size_of(u8)) + discriminant + future sizes
// ≈ small!
async fn big() {
let buf: [u8; 1_000_000] = [0; 1_000_000]; // 1MB on the stack!
some_io().await;
process(&buf);
}
// Size ≈ 1MB + inner future sizes
// ⚠️ Don't stack-allocate huge buffers in async functions!
// Use Vec<u8> or Box<[u8]> instead.
}
Drop optimization: When a state machine transitions, it drops values no longer needed. In the example above, fut1 is dropped when we transition from WaitingPage1 to WaitingPage2 — the compiler inserts the drop automatically.
Practical rule: Large stack allocations in `async fn` blow up the future’s size. If you see stack overflows in async code, check for large arrays or deeply nested futures. Use `Box::pin()` to heap-allocate sub-futures if needed.
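You can verify this yourself: `std::mem::size_of_val` reports the size of a compiler-generated future like any other value. A minimal sketch (the 4 KiB buffer and the `ready(())` yield point are just illustrative):

```rust
// A buffer that stays live across an .await must be stored
// inside the future's state machine, inflating its size.
async fn holds_big_buffer() -> usize {
    let buf = [0u8; 4096]; // held across the .await below
    std::future::ready(()).await; // an await point
    buf.len() // buf used afterwards, so it lives in the enum
}

fn main() {
    let fut = holds_big_buffer();
    // The state machine must contain the 4 KiB array
    // (plus the discriminant and any other live state).
    let size = std::mem::size_of_val(&fut);
    assert!(size >= 4096);
    println!("future size: {size} bytes");
}
```

If you move the `.await` above the `buf` declaration, the buffer is no longer live across a yield point and the future shrinks accordingly.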
Exercise: Predict the State Machine
🏋️ Exercise (click to expand)
Challenge: Given this async function, sketch the state machine the compiler generates. How many states (enum variants) does it have? What values are stored in each?
#![allow(unused)]
fn main() {
async fn pipeline(url: &str) -> Result<usize, Error> {
let response = fetch(url).await?;
let body = response.text().await?;
let parsed = parse(body).await?;
Ok(parsed.len())
}
}
🔑 Solution
Five states:
- `Start` — stores `url`
- `WaitingFetch` — stores `url`, the `fetch` future
- `WaitingText` — stores `response`, the `text()` future
- `WaitingParse` — stores `body`, the `parse` future
- `Done` — returned `Ok(parsed.len())`
Each .await creates a yield point = a new enum variant. The ? adds early-exit paths but doesn’t add extra states — it’s just a match on the Poll::Ready value.
Key Takeaways — The State Machine Reveal
- `async fn` compiles to an enum with one variant per `.await` point
- The future’s size = max of all variant sizes — large stack values blow it up
- The compiler inserts drops at state transitions automatically
- Use `Box::pin()` or heap allocation when future size becomes a problem
See also: Ch 4 — Pin and Unpin for why the generated enum needs pinning, Ch 6 — Building Futures by Hand to build these state machines yourself
6. Building Futures by Hand 🟡
What you’ll learn:
- Implementing a `TimerFuture` with thread-based waking
- Building a `Join` combinator: run two futures concurrently
- Building a `Select` combinator: race two futures
- How combinators compose — futures all the way down
A Simple Timer Future
Now let’s build real, useful futures from scratch. This cements the theory from chapters 2-5.
TimerFuture: A Complete Example
#![allow(unused)]
fn main() {
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};
use std::thread;
use std::time::{Duration, Instant};
pub struct TimerFuture {
shared_state: Arc<Mutex<SharedState>>,
}
struct SharedState {
completed: bool,
waker: Option<Waker>,
}
impl TimerFuture {
pub fn new(duration: Duration) -> Self {
let shared_state = Arc::new(Mutex::new(SharedState {
completed: false,
waker: None,
}));
// Spawn a thread that sets completed=true after the duration
let thread_shared_state = Arc::clone(&shared_state);
thread::spawn(move || {
thread::sleep(duration);
let mut state = thread_shared_state.lock().unwrap();
state.completed = true;
if let Some(waker) = state.waker.take() {
waker.wake(); // Notify the executor
}
});
TimerFuture { shared_state }
}
}
impl Future for TimerFuture {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
let mut state = self.shared_state.lock().unwrap();
if state.completed {
Poll::Ready(())
} else {
// Store the waker so the timer thread can wake us
// IMPORTANT: Always update the waker — the executor may
// have changed it between polls
state.waker = Some(cx.waker().clone());
Poll::Pending
}
}
}
// Usage:
// async fn example() {
// println!("Starting timer...");
// TimerFuture::new(Duration::from_secs(2)).await;
// println!("Timer done!");
// }
//
// ⚠️ This spawns an OS thread per timer — fine for learning, but in
// production use `tokio::time::sleep` which is backed by a shared
// timer wheel and requires zero extra threads.
}
Join: Running Two Futures Concurrently
Join polls two futures and completes when both finish. This is how tokio::join! works internally:
#![allow(unused)]
fn main() {
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
/// Polls two futures concurrently, returns both results as a tuple
pub struct Join<A, B>
where
A: Future,
B: Future,
{
a: MaybeDone<A>,
b: MaybeDone<B>,
}
enum MaybeDone<F: Future> {
Pending(F),
Done(F::Output),
Taken, // Output has been taken
}
impl<A, B> Join<A, B>
where
A: Future,
B: Future,
{
pub fn new(a: A, b: B) -> Self {
Join {
a: MaybeDone::Pending(a),
b: MaybeDone::Pending(b),
}
}
}
impl<A, B> Future for Join<A, B>
where
A: Future + Unpin,
B: Future + Unpin,
// The outputs must be Unpin too: they are stored inside MaybeDone,
// and we mutate fields through Pin<&mut Self>, which requires
// Join (and therefore every field) to be Unpin.
A::Output: Unpin,
B::Output: Unpin,
{
type Output = (A::Output, B::Output);
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// Poll A if not done
if let MaybeDone::Pending(ref mut fut) = self.a {
if let Poll::Ready(val) = Pin::new(fut).poll(cx) {
self.a = MaybeDone::Done(val);
}
}
// Poll B if not done
if let MaybeDone::Pending(ref mut fut) = self.b {
if let Poll::Ready(val) = Pin::new(fut).poll(cx) {
self.b = MaybeDone::Done(val);
}
}
// Both done?
match (&self.a, &self.b) {
(MaybeDone::Done(_), MaybeDone::Done(_)) => {
// Take both outputs
let a_val = match std::mem::replace(&mut self.a, MaybeDone::Taken) {
MaybeDone::Done(v) => v,
_ => unreachable!(),
};
let b_val = match std::mem::replace(&mut self.b, MaybeDone::Taken) {
MaybeDone::Done(v) => v,
_ => unreachable!(),
};
Poll::Ready((a_val, b_val))
}
_ => Poll::Pending, // At least one is still pending
}
}
}
// Usage:
// let (page1, page2) = Join::new(
// http_get("https://example.com/a"),
// http_get("https://example.com/b"),
// ).await;
// Both requests run concurrently!
}
Key insight: “Concurrent” here means interleaved on the same thread. Join doesn’t spawn threads — it polls both futures in the same `poll()` call. This is cooperative concurrency, not parallelism.
graph LR
subgraph "Future Combinators"
direction TB
TIMER["TimerFuture<br/>Single future, wake after delay"]
JOIN["Join<A, B><br/>Wait for BOTH"]
SELECT["Select<A, B><br/>Wait for FIRST"]
RETRY["RetryFuture<br/>Re-create on failure"]
end
TIMER --> JOIN
TIMER --> SELECT
SELECT --> RETRY
style TIMER fill:#d4efdf,stroke:#27ae60,color:#000
style JOIN fill:#e8f4f8,stroke:#2980b9,color:#000
style SELECT fill:#fef9e7,stroke:#f39c12,color:#000
style RETRY fill:#fadbd8,stroke:#e74c3c,color:#000
Select: Racing Two Futures
Select completes when either future finishes first (the other is dropped):
#![allow(unused)]
fn main() {
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
pub enum Either<A, B> {
Left(A),
Right(B),
}
/// Returns whichever future completes first; drops the other
pub struct Select<A, B> {
a: A,
b: B,
}
impl<A, B> Select<A, B>
where
A: Future + Unpin,
B: Future + Unpin,
{
pub fn new(a: A, b: B) -> Self {
Select { a, b }
}
}
impl<A, B> Future for Select<A, B>
where
A: Future + Unpin,
B: Future + Unpin,
{
type Output = Either<A::Output, B::Output>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// Poll A first
if let Poll::Ready(val) = Pin::new(&mut self.a).poll(cx) {
return Poll::Ready(Either::Left(val));
}
// Then poll B
if let Poll::Ready(val) = Pin::new(&mut self.b).poll(cx) {
return Poll::Ready(Either::Right(val));
}
Poll::Pending
}
}
// Usage with timeout:
// match Select::new(http_get(url), TimerFuture::new(timeout)).await {
// Either::Left(response) => println!("Got response: {}", response),
// Either::Right(()) => println!("Request timed out!"),
// }
}
Fairness note: Our `Select` always polls A first — if both are ready, A always wins. Tokio’s `select!` macro randomizes the poll order for fairness.
🏋️ Exercise: Build a RetryFuture (click to expand)
Challenge: Build a RetryFuture<F, Fut> that takes a closure F: Fn() -> Fut and retries up to N times if the inner future returns Err. It should return the first Ok result or the last Err.
Hint: You’ll need states for “running attempt” and “all attempts exhausted.”
🔑 Solution
#![allow(unused)]
fn main() {
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
pub struct RetryFuture<F, Fut, T, E>
where
F: Fn() -> Fut,
Fut: Future<Output = Result<T, E>> + Unpin,
{
factory: F,
current: Option<Fut>,
remaining: usize,
last_error: Option<E>,
}
impl<F, Fut, T, E> RetryFuture<F, Fut, T, E>
where
F: Fn() -> Fut,
Fut: Future<Output = Result<T, E>> + Unpin,
{
pub fn new(max_attempts: usize, factory: F) -> Self {
let current = Some((factory)());
RetryFuture {
factory,
current,
remaining: max_attempts.saturating_sub(1),
last_error: None,
}
}
}
impl<F, Fut, T, E> Future for RetryFuture<F, Fut, T, E>
where
F: Fn() -> Fut + Unpin,
Fut: Future<Output = Result<T, E>> + Unpin,
T: Unpin,
E: Unpin,
{
type Output = Result<T, E>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
loop {
if let Some(ref mut fut) = self.current {
match Pin::new(fut).poll(cx) {
Poll::Ready(Ok(val)) => return Poll::Ready(Ok(val)),
Poll::Ready(Err(e)) => {
self.last_error = Some(e);
if self.remaining > 0 {
self.remaining -= 1;
self.current = Some((self.factory)());
// Loop to poll the new future immediately
} else {
return Poll::Ready(Err(self.last_error.take().unwrap()));
}
}
Poll::Pending => return Poll::Pending,
}
} else {
return Poll::Ready(Err(self.last_error.take().unwrap()));
}
}
}
}
// Usage — note the async block is !Unpin, so Box::pin it to
// satisfy the `Fut: Unpin` bound (Pin<Box<F>> is Unpin):
// let result = RetryFuture::new(3, || Box::pin(async {
//     http_get("https://flaky-server.com/api").await
// })).await;
}
Key takeaway: The retry future is itself a state machine: it holds the current attempt and creates new inner futures on failure. This is how combinators compose — futures all the way down.
Key Takeaways — Building Futures by Hand
- A future needs three things: state, a `poll()` implementation, and a waker registration
- `Join` polls both sub-futures; `Select` returns whichever finishes first
- Combinators are themselves futures wrapping other futures — it’s turtles all the way down
- Building futures by hand gives deep insight, but in production use `tokio::join!`/`select!`
See also: Ch 2 — The Future Trait for the trait definition, Ch 8 — Tokio Deep Dive for production-grade equivalents
7. Executors and Runtimes 🟡
What you’ll learn:
- What an executor does: poll + sleep efficiently
- The runtime landscape: the mio and io_uring I/O layers, plus the tokio, async-std, smol, and embassy runtimes
- A decision tree for choosing the right runtime
- Why runtime-agnostic library design matters
What an Executor Does
An executor has two jobs:
- Poll futures when they’re ready to make progress
- Sleep efficiently when no futures are ready (using OS I/O notification APIs)
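Those two jobs fit in a handful of lines. Here is a toy single-future `block_on` built only on the standard library — not how tokio is implemented, but the same poll-then-sleep skeleton (the `ThreadWaker` name is ours):

```rust
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

// Job 2 (sleep efficiently): when the future is Pending we park
// the thread; the Waker unparks it when progress is possible.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

// Job 1 (poll): drive a single future to completion.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut); // pin it so we can poll repeatedly
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(), // sleep until wake() runs
        }
    }
}

fn main() {
    let answer = block_on(async { 21 * 2 });
    assert_eq!(answer, 42);
    println!("{answer}");
}
```

A real executor generalizes this in two directions: many tasks in a queue instead of one, and an I/O poller (epoll/kqueue/IOCP) instead of `thread::park` so one thread can sleep on thousands of pending I/O sources at once.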
graph TB
subgraph Executor["Executor (e.g., tokio)"]
QUEUE["Task Queue"]
POLLER["I/O Poller<br/>(epoll/kqueue/io_uring)"]
THREADS["Worker Thread Pool"]
end
subgraph Tasks
T1["Task 1<br/>(HTTP request)"]
T2["Task 2<br/>(DB query)"]
T3["Task 3<br/>(File read)"]
end
subgraph OS["Operating System"]
NET["Network Stack"]
DISK["Disk I/O"]
end
T1 --> QUEUE
T2 --> QUEUE
T3 --> QUEUE
QUEUE --> THREADS
THREADS -->|"poll()"| T1
THREADS -->|"poll()"| T2
THREADS -->|"poll()"| T3
POLLER <-->|"register/notify"| NET
POLLER <-->|"register/notify"| DISK
POLLER -->|"wake tasks"| QUEUE
style Executor fill:#e3f2fd,color:#000
style OS fill:#f3e5f5,color:#000
mio: The Foundation Layer
mio (Metal I/O) is not an executor — it’s the lowest-level cross-platform I/O notification library. It wraps epoll (Linux), kqueue (macOS/BSD), and IOCP (Windows).
#![allow(unused)]
fn main() -> std::io::Result<()> {
// Conceptual mio usage (simplified):
use mio::{Events, Interest, Poll, Token};
use mio::net::TcpListener;
let mut poll = Poll::new()?;
let mut events = Events::with_capacity(128);
let mut server = TcpListener::bind("0.0.0.0:8080")?;
poll.registry().register(&mut server, Token(0), Interest::READABLE)?;
// Event loop — blocks until something happens
loop {
poll.poll(&mut events, None)?; // Sleeps until I/O event
for event in events.iter() {
match event.token() {
Token(0) => { /* server has a new connection */ }
_ => { /* other I/O ready */ }
}
}
}
}
Most developers never touch mio directly — tokio and smol build on top of it.
io_uring: The Completion-Based Future
Linux’s io_uring (kernel 5.1+) represents a fundamental shift from the readiness-based I/O model that mio/epoll use:
Readiness-based (epoll / mio / tokio):
1. Ask: "Is this socket readable?" → epoll_wait()
2. Kernel: "Yes, it's ready" → EPOLLIN event
3. App: read(fd, buf) → might still block briefly!
Completion-based (io_uring):
1. Submit: "Read from this socket into this buffer" → SQE
2. Kernel: does the read asynchronously
3. App: gets completed result with data → CQE
graph LR
subgraph "Readiness Model (epoll)"
A1["App: is it ready?"] --> K1["Kernel: yes"]
K1 --> A2["App: now read()"]
A2 --> K2["Kernel: here's data"]
end
subgraph "Completion Model (io_uring)"
B1["App: read this for me"] --> K3["Kernel: working..."]
K3 --> B2["App: got result + data"]
end
style B1 fill:#c8e6c9,color:#000
style B2 fill:#c8e6c9,color:#000
The ownership challenge: io_uring requires the kernel to own the buffer until the operation completes. This conflicts with Rust’s standard AsyncRead trait which borrows the buffer. That’s why tokio-uring has different I/O traits:
#![allow(unused)]
fn main() {
// Standard tokio (readiness-based) — borrows the buffer:
let n = stream.read(&mut buf).await?; // buf is borrowed
// tokio-uring (completion-based) — takes ownership of the buffer:
let (result, buf) = stream.read(buf).await; // buf is moved in, returned back
let n = result?;
}
// Cargo.toml: tokio-uring = "0.5"
// NOTE: Linux-only, requires kernel 5.1+
fn main() {
tokio_uring::start(async {
let file = tokio_uring::fs::File::open("data.bin").await.unwrap();
let buf = vec![0u8; 4096];
let (result, buf) = file.read_at(buf, 0).await;
let bytes_read = result.unwrap();
println!("Read {} bytes: {:?}", bytes_read, &buf[..bytes_read]);
});
}
| Aspect | epoll (tokio) | io_uring (tokio-uring) |
|---|---|---|
| Model | Readiness notification | Completion notification |
| Syscalls | epoll_wait + read/write | Batched SQE/CQE ring |
| Buffer ownership | App retains (&mut buf) | Ownership transfer (move buf) |
| Platform | Linux, macOS (kqueue), Windows (IOCP) | Linux 5.1+ only |
| Zero-copy | No (userspace copy) | Yes (registered buffers) |
| Maturity | Production-ready | Experimental |
When to use io_uring: High-throughput file I/O or networking where syscall overhead is the bottleneck (databases, storage engines, proxies serving 100k+ connections). For most applications, standard tokio with epoll is the right choice.
tokio: The Batteries-Included Runtime
The dominant async runtime in the Rust ecosystem. Used by Axum, Hyper, Tonic, and most production Rust servers.
// Cargo.toml:
// [dependencies]
// tokio = { version = "1", features = ["full"] }
#[tokio::main]
async fn main() {
// Spawns a multi-threaded runtime with work-stealing scheduler
let handle = tokio::spawn(async {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
"done"
});
let result = handle.await.unwrap();
println!("{result}");
}
tokio features: Timer, I/O, TCP/UDP, Unix sockets, signal handling, sync primitives (Mutex, RwLock, Semaphore, channels), fs, process, tracing integration.
async-std: The Standard Library Mirror
Mirrors the std API with async versions. Simpler for beginners, but note that async-std is no longer actively maintained — prefer tokio or smol for new projects.
// Cargo.toml:
// [dependencies]
// async-std = { version = "1", features = ["attributes"] }
#[async_std::main]
async fn main() {
use async_std::fs;
let content = fs::read_to_string("hello.txt").await.unwrap();
println!("{content}");
}
smol: The Minimalist Runtime
A small, modular async runtime with a minimal dependency footprint. Great for libraries that want async without pulling in tokio.
// Cargo.toml:
// [dependencies]
// smol = "2"
fn main() {
smol::block_on(async {
let result = smol::unblock(|| {
// Runs blocking code on a thread pool
std::fs::read_to_string("hello.txt")
}).await.unwrap();
println!("{result}");
});
}
embassy: Async for Embedded (no_std)
Async runtime for embedded systems. No heap allocation, no std required.
// Runs on microcontrollers (e.g., STM32, nRF52, RP2040)
// HAL imports and peripheral init (`p`, `Output`, `Timer`, etc.) omitted for brevity
#[embassy_executor::main]
async fn main(spawner: embassy_executor::Spawner) {
// Blink an LED with async/await — no RTOS needed!
let mut led = Output::new(p.PA5, Level::Low, Speed::Low);
loop {
led.set_high();
Timer::after(Duration::from_millis(500)).await;
led.set_low();
Timer::after(Duration::from_millis(500)).await;
}
}
Runtime Decision Tree
graph TD
START["Choosing a Runtime"]
Q1{"Building a<br/>network server?"}
Q2{"Need tokio ecosystem<br/>(Axum, Tonic, Hyper)?"}
Q3{"Building a library?"}
Q4{"Embedded /<br/>no_std?"}
Q5{"Want minimal<br/>dependencies?"}
TOKIO["🟢 tokio<br/>Best ecosystem, most popular"]
SMOL["🔵 smol<br/>Minimal, no ecosystem lock-in"]
EMBASSY["🟠 embassy<br/>Embedded-first, no alloc"]
ASYNC_STD["🟣 async-std<br/>std-like API, good for learning"]
AGNOSTIC["🔵 runtime-agnostic<br/>Use futures crate only"]
START --> Q1
Q1 -->|Yes| Q2
Q1 -->|No| Q3
Q2 -->|Yes| TOKIO
Q2 -->|No| Q5
Q3 -->|Yes| AGNOSTIC
Q3 -->|No| Q4
Q4 -->|Yes| EMBASSY
Q4 -->|No| Q5
Q5 -->|Yes| SMOL
Q5 -->|No| ASYNC_STD
style TOKIO fill:#c8e6c9,color:#000
style SMOL fill:#bbdefb,color:#000
style EMBASSY fill:#ffe0b2,color:#000
style ASYNC_STD fill:#e1bee7,color:#000
style AGNOSTIC fill:#bbdefb,color:#000
Runtime Comparison Table
| Feature | tokio | async-std | smol | embassy |
|---|---|---|---|---|
| Ecosystem | Dominant | Small | Minimal | Embedded |
| Multi-threaded | ✅ Work-stealing | ✅ | ✅ | ❌ (single-core) |
| no_std | ❌ | ❌ | ❌ | ✅ |
| Timer | ✅ Built-in | ✅ Built-in | Via async-io | ✅ HAL-based |
| I/O | ✅ Own abstractions | ✅ std mirror | ✅ Via async-io | ✅ HAL drivers |
| Channels | ✅ Rich set | ✅ | Via async-channel | ✅ |
| Learning curve | Medium | Low | Low | High (HW) |
| Binary size | Large | Medium | Small | Tiny |
🏋️ Exercise: Runtime Comparison (click to expand)
Challenge: Write the same program using three different runtimes (tokio, smol, and async-std). The program should:
- Fetch a URL (simulate with a sleep)
- Read a file (simulate with a sleep)
- Print both results
This exercise demonstrates that the async/await code is the same — only the runtime setup differs.
🔑 Solution
// ----- tokio version -----
// Cargo.toml: tokio = { version = "1", features = ["full"] }
#[tokio::main]
async fn main() {
let (url_result, file_result) = tokio::join!(
async {
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
"Response from URL"
},
async {
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
"Contents of file"
},
);
println!("URL: {url_result}, File: {file_result}");
}
// ----- smol version -----
// Cargo.toml: smol = "2", futures-lite = "2"
fn main() {
smol::block_on(async {
let (url_result, file_result) = futures_lite::future::zip(
async {
smol::Timer::after(std::time::Duration::from_millis(100)).await;
"Response from URL"
},
async {
smol::Timer::after(std::time::Duration::from_millis(50)).await;
"Contents of file"
},
).await;
println!("URL: {url_result}, File: {file_result}");
});
}
// ----- async-std version -----
// Cargo.toml: async-std = { version = "1", features = ["attributes"] }, futures = "0.3"
#[async_std::main]
async fn main() {
let (url_result, file_result) = futures::future::join(
async {
async_std::task::sleep(std::time::Duration::from_millis(100)).await;
"Response from URL"
},
async {
async_std::task::sleep(std::time::Duration::from_millis(50)).await;
"Contents of file"
},
).await;
println!("URL: {url_result}, File: {file_result}");
}
Key takeaway: The async business logic is identical across runtimes. Only the entry point and timer/IO APIs differ. This is why writing runtime-agnostic libraries (using only std::future::Future) is valuable.
Key Takeaways — Executors and Runtimes
- An executor’s job: poll futures when woken, sleep efficiently using OS I/O APIs
- tokio is the default for servers; smol for minimal footprint; embassy for embedded
- Your business logic should depend on `std::future::Future`, not a specific runtime
- io_uring (Linux 5.1+) is the future of high-perf I/O, but the ecosystem is still maturing
See also: Ch 8 — Tokio Deep Dive for tokio specifics, Ch 9 — When Tokio Isn’t the Right Fit for alternatives
8. Tokio Deep Dive 🟡
What you’ll learn:
- Runtime flavors: multi-thread vs current-thread and when to use each
- `tokio::spawn`, the `'static` requirement, and `JoinHandle`
- Task cancellation semantics (abort vs. detach-on-drop)
- Sync primitives: Mutex, RwLock, Semaphore, and all four channel types
Runtime Flavors: Multi-Thread vs Current-Thread
Tokio offers two runtime configurations:
// Multi-threaded (default with #[tokio::main])
// Uses a work-stealing thread pool — tasks can move between threads
#[tokio::main]
async fn main() {
// N worker threads (default = number of CPU cores)
// Tasks are Send + 'static
}
// Current-thread — everything runs on one thread
#[tokio::main(flavor = "current_thread")]
async fn main() {
// Single-threaded — tasks don't need to be Send
// Lighter weight, good for simple tools or WASM
}
// Manual runtime construction:
let rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(4)
.enable_all()
.build()
.unwrap();
rt.block_on(async {
println!("Running on custom runtime");
});
graph TB
subgraph "Multi-Thread (default)"
MT_Q1["Thread 1<br/>Task A, Task D"]
MT_Q2["Thread 2<br/>Task B"]
MT_Q3["Thread 3<br/>Task C, Task E"]
STEAL["Work Stealing:<br/>idle threads steal from busy ones"]
MT_Q1 <--> STEAL
MT_Q2 <--> STEAL
MT_Q3 <--> STEAL
end
subgraph "Current-Thread"
ST_Q["Single Thread<br/>Task A → Task B → Task C → Task D"]
end
style MT_Q1 fill:#c8e6c9,color:#000
style MT_Q2 fill:#c8e6c9,color:#000
style MT_Q3 fill:#c8e6c9,color:#000
style ST_Q fill:#bbdefb,color:#000
tokio::spawn and the ’static Requirement
tokio::spawn puts a future onto the runtime’s task queue. Because it might run on any worker thread at any time, the future must be Send + 'static:
#![allow(unused)]
fn main() {
use tokio::task;
async fn example() {
let data = String::from("hello");
// ✅ Works: move ownership into the task
let handle = task::spawn(async move {
println!("{data}");
data.len()
});
let len = handle.await.unwrap();
println!("Length: {len}");
}
async fn problem() {
let data = String::from("hello");
// ❌ FAILS: data is borrowed, not 'static
// task::spawn(async {
// println!("{data}"); // borrows `data` — not 'static
// });
// ❌ FAILS: Rc is not Send
// let rc = std::rc::Rc::new(42);
// task::spawn(async move {
// println!("{rc}"); // Rc is !Send — can't cross thread boundary
// });
}
}
Why 'static? The spawned task runs independently — it might outlive the scope that created it. The compiler can’t prove the references will remain valid, so it requires owned data.
Why Send? The task might be resumed on a different thread than where it was suspended. All data held across .await points must be safe to send between threads.
#![allow(unused)]
fn main() {
// Common pattern: clone shared data into the task
let shared = Arc::new(config);
for i in 0..10 {
let shared = Arc::clone(&shared); // Clone the Arc, not the data
tokio::spawn(async move {
process_item(i, &shared).await;
});
}
}
JoinHandle and Task Cancellation
#![allow(unused)]
fn main() {
use tokio::task::JoinHandle;
use tokio::time::{sleep, Duration};
async fn cancellation_example() {
let handle: JoinHandle<String> = tokio::spawn(async {
sleep(Duration::from_secs(10)).await;
"completed".to_string()
});
// Cancel the task by dropping the handle? NO — task keeps running!
// drop(handle); // Task continues in the background
// To actually cancel, call abort():
handle.abort();
// Awaiting an aborted task returns JoinError
match handle.await {
Ok(val) => println!("Got: {val}"),
Err(e) if e.is_cancelled() => println!("Task was cancelled"),
Err(e) => println!("Task panicked: {e}"),
}
}
}
Important: Dropping a `JoinHandle` does NOT cancel the task in tokio. The task becomes detached and keeps running. You must explicitly call `.abort()` to cancel it. This is different from dropping a `Future` directly, which does cancel/drop the underlying computation.
Tokio Sync Primitives
Tokio provides async-aware synchronization primitives. The key principle: don’t use std::sync::Mutex across .await points.
#![allow(unused)]
fn main() {
use std::sync::Arc;
use tokio::sync::{Mutex, RwLock, Semaphore, mpsc, oneshot, broadcast, watch};
// --- Mutex ---
// Async mutex: the lock() method is async and won't block the thread
let data = Arc::new(Mutex::new(vec![1, 2, 3]));
{
let mut guard = data.lock().await; // Non-blocking lock
guard.push(4);
} // Guard dropped here — lock released
// --- Channels ---
// mpsc: Multiple producer, single consumer
let (tx, mut rx) = mpsc::channel::<String>(100); // Bounded buffer
tokio::spawn(async move {
tx.send("hello".into()).await.unwrap();
});
let msg = rx.recv().await.unwrap();
// oneshot: Single value, single consumer
let (tx, rx) = oneshot::channel::<i32>();
tx.send(42).unwrap(); // No await needed — either sends or fails
let val = rx.await.unwrap();
// broadcast: Multiple producers, multiple consumers (all get every message)
let (tx, _) = broadcast::channel::<String>(100);
let mut rx1 = tx.subscribe();
let mut rx2 = tx.subscribe();
// watch: Single value, multiple consumers (only latest value)
let (tx, rx) = watch::channel(0u64);
tx.send(42).unwrap();
println!("Latest: {}", *rx.borrow());
}
Note: `.unwrap()` is used for brevity throughout these channel examples. In production, handle send/receive errors gracefully — a failed `.send()` means the receiver was dropped, and a failed `.recv()` means the channel is closed.
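The rule about `std::sync::Mutex` can be made concrete without tokio: a `std::sync::MutexGuard` is `!Send`, so any future holding one across an `.await` is itself `!Send`, and `tokio::spawn` will reject it. A minimal std-only sketch (the `assert_send` probe is our own helper):

```rust
use std::sync::Mutex;

// Compile-time probe: only accepts values that are Send.
fn assert_send<T: Send>(_: &T) {}

// The guard lives in an inner scope and is dropped before the
// .await, so nothing !Send is held across the yield point and
// the future stays Send.
async fn guard_scoped(data: &Mutex<Vec<u8>>) {
    {
        let mut guard = data.lock().unwrap();
        guard.push(1);
    } // MutexGuard dropped here
    std::future::ready(()).await; // stand-in for real async I/O
}

// If the guard were still alive across the .await, the future
// would be !Send and the assert_send call below would not compile:
//
//     let guard = data.lock().unwrap();
//     std::future::ready(()).await; // ❌ MutexGuard is !Send
//     drop(guard);

fn main() {
    let data = Mutex::new(Vec::new());
    assert_send(&guard_scoped(&data)); // compiles: the future is Send
    println!("guard_scoped yields a Send future");
}
```

This is also why `tokio::sync::Mutex` exists: its guard can be held across `.await` points safely, at the cost of the lock itself being slower than `std::sync::Mutex`.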
graph LR
subgraph "Channel Types"
direction TB
MPSC["mpsc<br/>N→1<br/>Buffered queue"]
ONESHOT["oneshot<br/>1→1<br/>Single value"]
BROADCAST["broadcast<br/>N→N<br/>All receivers get all"]
WATCH["watch<br/>1→N<br/>Latest value only"]
end
P1["Producer 1"] --> MPSC
P2["Producer 2"] --> MPSC
MPSC --> C1["Consumer"]
P3["Producer"] --> ONESHOT
ONESHOT --> C2["Consumer"]
P4["Producer"] --> BROADCAST
BROADCAST --> C3["Consumer 1"]
BROADCAST --> C4["Consumer 2"]
P5["Producer"] --> WATCH
WATCH --> C5["Consumer 1"]
WATCH --> C6["Consumer 2"]
Case Study: Choosing the Right Channel for a Notification Service
You’re building a notification service where:
- Multiple API handlers produce events
- A single background task batches and sends them
- A config watcher updates rate limits at runtime
- A shutdown signal must reach all components
Which channels for each?
| Requirement | Channel | Why |
|---|---|---|
| API handlers → Batcher | mpsc (bounded) | N producers, 1 consumer. Bounded for backpressure — if the batcher falls behind, API handlers slow down instead of OOM |
| Config watcher → Rate limiter | watch | Only the latest config matters. Multiple readers (each worker) see the current value |
| Shutdown signal → All components | broadcast | Every component must receive the shutdown notification independently |
| Single health-check response | oneshot | Request/response pattern — one value, then done |
graph LR
subgraph "Notification Service"
direction TB
API1["API Handler 1"] -->|mpsc| BATCH["Batcher"]
API2["API Handler 2"] -->|mpsc| BATCH
CONFIG["Config Watcher"] -->|watch| RATE["Rate Limiter"]
CTRL["Ctrl+C"] -->|broadcast| API1
CTRL -->|broadcast| BATCH
CTRL -->|broadcast| RATE
end
style API1 fill:#d4efdf,stroke:#27ae60,color:#000
style API2 fill:#d4efdf,stroke:#27ae60,color:#000
style BATCH fill:#e8f4f8,stroke:#2980b9,color:#000
style CONFIG fill:#fef9e7,stroke:#f39c12,color:#000
style RATE fill:#fef9e7,stroke:#f39c12,color:#000
style CTRL fill:#fadbd8,stroke:#e74c3c,color:#000
🏋️ Exercise: Build a Task Pool (click to expand)
Challenge: Build a function run_with_limit that accepts a list of async closures and a concurrency limit, executing at most N tasks simultaneously. Use tokio::sync::Semaphore.
🔑 Solution
#![allow(unused)]
fn main() {
use std::future::Future;
use std::sync::Arc;
use tokio::sync::Semaphore;
async fn run_with_limit<F, Fut, T>(tasks: Vec<F>, limit: usize) -> Vec<T>
where
F: FnOnce() -> Fut + Send + 'static,
Fut: Future<Output = T> + Send + 'static,
T: Send + 'static,
{
let semaphore = Arc::new(Semaphore::new(limit));
let mut handles = Vec::new();
for task in tasks {
let semaphore = Arc::clone(&semaphore);
let handle = tokio::spawn(async move {
let _permit = semaphore.acquire().await.unwrap();
// Permit is held while task runs, then dropped
task().await
});
handles.push(handle);
}
let mut results = Vec::new();
for handle in handles {
results.push(handle.await.unwrap());
}
results
}
// Usage:
// let tasks: Vec<_> = urls.into_iter().map(|url| {
// move || async move { fetch(url).await }
// }).collect();
// let results = run_with_limit(tasks, 10).await; // Max 10 concurrent
}
Key takeaway: Semaphore is the standard way to limit concurrency in tokio. Each task acquires a permit before starting work. When the semaphore is full, new tasks wait asynchronously (non-blocking) until a slot opens.
Key Takeaways — Tokio Deep Dive
- Use `multi_thread` for servers (default); `current_thread` for CLI tools, tests, or `!Send` types
- `tokio::spawn` requires `'static` futures — use `Arc` or channels to share data
- Dropping a `JoinHandle` does not cancel the task — call `.abort()` explicitly
- Choose sync primitives by need: `Mutex` for shared state, `Semaphore` for concurrency limits, `mpsc`/`oneshot`/`broadcast`/`watch` for communication
See also: Ch 9 — When Tokio Isn’t the Right Fit for alternatives to spawn, Ch 12 — Common Pitfalls for MutexGuard-across-await bugs
9. When Tokio Isn’t the Right Fit 🟡
What you’ll learn:
- The `'static` problem: when `tokio::spawn` forces you into `Arc` everywhere
- `LocalSet` for `!Send` futures
- `FuturesUnordered` for borrow-friendly concurrency (no spawn needed)
- `JoinSet` for managed task groups
- Writing runtime-agnostic libraries
graph TD
START["Need concurrent futures?"] --> STATIC{"Can futures be 'static?"}
STATIC -->|Yes| SEND{"Are futures Send?"}
STATIC -->|No| FU["FuturesUnordered<br/>Runs on current task"]
SEND -->|Yes| SPAWN["tokio::spawn<br/>Multi-threaded"]
SEND -->|No| LOCAL["LocalSet<br/>Single-threaded"]
SPAWN --> MANAGE{"Need to track/abort tasks?"}
MANAGE -->|Yes| JOINSET["JoinSet / TaskTracker"]
MANAGE -->|No| HANDLE["JoinHandle"]
style START fill:#f5f5f5,stroke:#333,color:#000
style FU fill:#d4efdf,stroke:#27ae60,color:#000
style SPAWN fill:#e8f4f8,stroke:#2980b9,color:#000
style LOCAL fill:#fef9e7,stroke:#f39c12,color:#000
style JOINSET fill:#e8daef,stroke:#8e44ad,color:#000
style HANDLE fill:#e8f4f8,stroke:#2980b9,color:#000
The ’static Future Problem
Tokio’s spawn requires 'static futures. This means you can’t borrow local data in spawned tasks:
#![allow(unused)]
fn main() {
use std::sync::Arc;
async fn process_items(items: &[String]) {
// ❌ Can't do this — items is borrowed, not 'static
// for item in items {
// tokio::spawn(async {
// process(item).await;
// });
// }
// 😐 Workaround 1: Clone everything
for item in items {
let item = item.clone();
tokio::spawn(async move {
process(&item).await;
});
}
// 😐 Workaround 2: Use Arc
let items = Arc::new(items.to_vec());
for i in 0..items.len() {
let items = Arc::clone(&items);
tokio::spawn(async move {
process(&items[i]).await;
});
}
}
}
This is annoying! In Go, you can just go func() { use(item) } with a closure. In Rust, the ownership system forces you to think about who owns what and how long it lives.
Scoped Tasks and Alternatives
Several solutions exist for the 'static problem:
#![allow(unused)]
fn main() {
// 1. tokio::task::LocalSet — run !Send futures on current thread
use tokio::task::LocalSet;
async fn with_localset() {
let local_set = LocalSet::new();
local_set.run_until(async {
tokio::task::spawn_local(async {
// Can use Rc, Cell, and other !Send types here
let rc = std::rc::Rc::new(42);
println!("{rc}");
}).await.unwrap();
}).await;
}
// 2. FuturesUnordered — concurrent without spawning
use futures::stream::{FuturesUnordered, StreamExt};
async fn process_items(items: &[String]) {
let futures: FuturesUnordered<_> = items
.iter()
.map(|item| async move {
// ✅ Can borrow item — no spawn, no 'static needed!
process(item).await
})
.collect();
// Drive all futures to completion
futures.for_each(|result| async {
println!("Result: {result:?}");
}).await;
}
// 3. tokio JoinSet (tokio 1.21+) — managed set of spawned tasks
use tokio::task::JoinSet;
async fn with_joinset() {
let mut set = JoinSet::new();
for i in 0..10 {
set.spawn(async move {
tokio::time::sleep(Duration::from_millis(100)).await;
i * 2
});
}
while let Some(result) = set.join_next().await {
println!("Task completed: {:?}", result.unwrap());
}
}
}
Lightweight Runtimes for Libraries
If you’re writing a library — don’t force users into tokio:
#![allow(unused)]
fn main() {
// ❌ BAD: Library forces tokio on users
pub async fn my_lib_function() {
tokio::time::sleep(Duration::from_secs(1)).await;
// Now your users MUST use tokio
}
// ✅ GOOD: Library is runtime-agnostic
pub async fn my_lib_function() {
// Use only types from std::future and futures crate
do_computation().await;
}
// ✅ GOOD: Accept a generic future for I/O operations
pub async fn fetch_with_retry<F, Fut, T, E>(
operation: F,
max_retries: usize,
) -> Result<T, E>
where
F: Fn() -> Fut,
Fut: Future<Output = Result<T, E>>,
{
// Guard against max_retries == 0, which would otherwise fall through the loop.
assert!(max_retries > 0, "max_retries must be at least 1");
for attempt in 0..max_retries {
match operation().await {
Ok(val) => return Ok(val),
Err(e) if attempt == max_retries - 1 => return Err(e),
Err(_) => continue,
}
}
unreachable!("the final attempt always returns")
}
}
Rule of thumb: Libraries should depend on the `futures` crate, not `tokio`. Applications should depend on `tokio` (or their chosen runtime). This keeps the ecosystem composable.
🏋️ Exercise: FuturesUnordered vs Spawn (click to expand)
Challenge: Write the same function two ways — once using tokio::spawn (requires 'static) and once using FuturesUnordered (borrows data). The function receives &[String] and returns the length of each string after a simulated async lookup.
Compare: Which approach requires .clone()? Which can borrow the input slice?
🔑 Solution
#![allow(unused)]
fn main() {
use futures::stream::{FuturesUnordered, StreamExt};
use tokio::time::{sleep, Duration};
// Version 1: tokio::spawn — requires 'static, must clone
async fn lengths_with_spawn(items: &[String]) -> Vec<usize> {
let mut handles = Vec::new();
for item in items {
let owned = item.clone(); // Must clone — spawn requires 'static
handles.push(tokio::spawn(async move {
sleep(Duration::from_millis(10)).await;
owned.len()
}));
}
let mut results = Vec::new();
for handle in handles {
results.push(handle.await.unwrap());
}
results
}
// Version 2: FuturesUnordered — borrows data, no clone needed
async fn lengths_without_spawn(items: &[String]) -> Vec<usize> {
let futures: FuturesUnordered<_> = items
.iter()
.map(|item| async move {
sleep(Duration::from_millis(10)).await;
item.len() // ✅ Borrows item — no clone!
})
.collect();
futures.collect().await
}
#[tokio::test]
async fn test_both_versions() {
let items = vec!["hello".into(), "world".into(), "rust".into()];
let v1 = lengths_with_spawn(&items).await;
// Note: v1 preserves insertion order (sequential join)
let mut v2 = lengths_without_spawn(&items).await;
v2.sort(); // FuturesUnordered returns in completion order
assert_eq!(v1, vec![5, 5, 4]);
assert_eq!(v2, vec![4, 5, 5]);
}
}
Key takeaway: FuturesUnordered avoids the 'static requirement by running all futures on the current task (no thread migration). The trade-off: all futures share one task — if one blocks, the others stall. Use spawn for CPU-heavy work that should run on separate threads.
Key Takeaways — When Tokio Isn’t the Right Fit
- `FuturesUnordered` runs futures concurrently on the current task — no `'static` requirement
- `LocalSet` enables `!Send` futures on a single-threaded executor
- `JoinSet` (tokio 1.21+) provides managed task groups with automatic cleanup
- For libraries: depend only on `std::future::Future` + the `futures` crate, not tokio directly
See also: Ch 8 — Tokio Deep Dive for when spawn is the right tool, Ch 11 — Streams for `buffer_unordered()` as another concurrency limiter
10. Async Traits 🟡
What you’ll learn:
- Why async methods in traits took years to stabilize
- RPITIT: native async trait methods (Rust 1.75+)
- The dyn dispatch challenge and the `trait_variant` workaround
- Async closures (Rust 1.85+): `AsyncFn()` and `AsyncFnOnce()`
graph TD
subgraph "Async Trait Approaches"
direction TB
RPITIT["RPITIT (Rust 1.75+)<br/>async fn in trait<br/>Static dispatch only"]
VARIANT["trait_variant<br/>Auto-generates Send variant<br/>For spawning on multi-threaded runtimes"]
BOXED["Box<dyn Future><br/>Manual boxing<br/>Works everywhere"]
CLOSURE["Async Closures (1.85+)<br/>async Fn() / async FnOnce()<br/>Callbacks & middleware"]
end
RPITIT -->|"Need Send for spawn?"| VARIANT
RPITIT -->|"Need dyn or pre-1.75?"| BOXED
CLOSURE -->|"Replaces"| BOXED
style RPITIT fill:#d4efdf,stroke:#27ae60,color:#000
style VARIANT fill:#e8f4f8,stroke:#2980b9,color:#000
style BOXED fill:#fef9e7,stroke:#f39c12,color:#000
style CLOSURE fill:#e8daef,stroke:#8e44ad,color:#000
The History: Why It Took So Long
Async methods in traits were Rust’s most requested feature for years. The problem:
#![allow(unused)]
fn main() {
// This didn't compile until Rust 1.75 (Dec 2023):
trait DataStore {
async fn get(&self, key: &str) -> Option<String>;
}
// Why? Because async fn returns `impl Future<Output = T>`,
// and `impl Trait` in trait return position wasn't supported.
}
The fundamental challenge: when a trait method returns impl Future, each implementor returns a different concrete future type, so the trait itself cannot name a single return type. For dyn dispatch the problem is worse: the compiler cannot know the size of whichever future a given implementor returns.
RPITIT: Return Position Impl Trait in Trait
Since Rust 1.75, this just works for static dispatch:
#![allow(unused)]
fn main() {
trait DataStore {
async fn get(&self, key: &str) -> Option<String>;
// Desugars to:
// fn get(&self, key: &str) -> impl Future<Output = Option<String>>;
}
struct InMemoryStore {
data: std::collections::HashMap<String, String>,
}
impl DataStore for InMemoryStore {
async fn get(&self, key: &str) -> Option<String> {
self.data.get(key).cloned()
}
}
// ✅ Works with generics (static dispatch):
async fn lookup<S: DataStore>(store: &S, key: &str) {
if let Some(val) = store.get(key).await {
println!("{key} = {val}");
}
}
}
dyn Dispatch and Send Bounds
The limitation: you can’t use dyn DataStore directly because the compiler doesn’t know the size of the returned future:
#![allow(unused)]
fn main() {
// ❌ Doesn't work:
// async fn lookup_dyn(store: &dyn DataStore, key: &str) { ... }
// Error: the trait `DataStore` is not dyn-compatible because method `get`
// is `async`
// ✅ Workaround: Return a boxed future
trait DynDataStore {
fn get(&self, key: &str) -> Pin<Box<dyn Future<Output = Option<String>> + Send + '_>>;
}
// Or use the trait_variant macro (see below)
}
The Send problem: In multi-threaded runtimes, spawned tasks must be Send. But async trait methods don’t automatically add Send bounds:
#![allow(unused)]
fn main() {
trait Worker {
async fn run(&self); // Future might or might not be Send
}
struct MyWorker;
impl Worker for MyWorker {
async fn run(&self) {
// If this uses !Send types, the future is !Send
let rc = std::rc::Rc::new(42);
some_work().await;
println!("{rc}");
}
}
// ❌ This fails if the future isn't Send:
// tokio::spawn(worker.run()); // Requires Send + 'static
}
The trait_variant Crate
The trait_variant crate (from the Rust async working group) generates a Send variant automatically:
#![allow(unused)]
fn main() {
// Cargo.toml: trait-variant = "0.1"
#[trait_variant::make(SendDataStore: Send)]
trait DataStore {
async fn get(&self, key: &str) -> Option<String>;
async fn set(&self, key: &str, value: String);
}
// Now you have two traits:
// - DataStore: no Send bound on the futures
// - SendDataStore: all futures are Send
// Both have the same methods, implementors implement DataStore
// and get SendDataStore for free if their futures are Send.
// Use SendDataStore when you need to spawn. Note: the generated trait
// still uses RPITIT, so it is not dyn-compatible — use generics here.
// (For true dyn dispatch you still need boxed futures, e.g. via the
// async WG's `dynosaur` crate or a manual Box::pin trait.)
use std::sync::Arc;
async fn spawn_lookup<S: SendDataStore + Send + Sync + 'static>(store: Arc<S>) {
tokio::spawn(async move {
store.get("key").await;
});
}
}
Quick Reference: Async Traits
| Approach | Static Dispatch | Dynamic Dispatch | Send | Syntax Overhead |
|---|---|---|---|---|
| Native `async fn` in trait | ✅ | ❌ | Implicit | None |
| `trait_variant` | ✅ | ❌ (still needs boxing) | Explicit | `#[trait_variant::make]` |
| Manual `Box::pin` | ✅ | ✅ | Explicit | High |
| `async-trait` crate | ✅ | ✅ | Default (opt out with `?Send`) | Medium (proc macro) |
Recommendation: For new code (Rust 1.75+), use native async traits, adding `trait_variant` when you need `Send` bounds for spawning. The `async-trait` crate is still widely used but boxes every future — the native approach is zero-cost for static dispatch.
Async Closures (Rust 1.85+)
Since Rust 1.85, async closures are stable — closures that capture their environment and return a future:
#![allow(unused)]
fn main() {
// Before 1.85: awkward workaround
let urls = vec!["https://a.com", "https://b.com"];
let fetchers: Vec<_> = urls.iter().map(|url| {
let url = url.to_string();
// Returns a non-async closure that returns an async block
move || async move { reqwest::get(&url).await }
}).collect();
// After 1.85: async closures just work
let fetchers: Vec<_> = urls.iter().map(|url| {
async move || { reqwest::get(*url).await }
// ↑ This is an async closure — captures url, returns a Future when called
}).collect();
}
Async closures implement the new AsyncFn, AsyncFnMut, and AsyncFnOnce traits, which mirror Fn, FnMut, FnOnce:
#![allow(unused)]
fn main() {
// Generic function accepting an async closure
async fn retry<F>(max: usize, f: F) -> Result<String, Error>
where
F: AsyncFn() -> Result<String, Error>,
{
// `max` total attempts: up to max-1 retries here, then the final call below
for _ in 1..max {
if let Ok(val) = f().await {
return Ok(val);
}
}
f().await
}
}
Migration tip: If you have code using `Fn() -> impl Future<Output = T>` bounds, consider switching to `AsyncFn() -> T` for cleaner signatures.
🏋️ Exercise: Design an Async Service Trait (click to expand)
Challenge: Design a Cache trait with async get and set methods. Implement it twice: once with a HashMap (in-memory) and once with a simulated Redis backend (use tokio::time::sleep to simulate network latency). Write a generic function that works with both.
🔑 Solution
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::time::{sleep, Duration};
trait Cache {
async fn get(&self, key: &str) -> Option<String>;
async fn set(&self, key: &str, value: String);
}
// --- In-memory implementation ---
struct MemoryCache {
store: Mutex<HashMap<String, String>>,
}
impl MemoryCache {
fn new() -> Self {
MemoryCache {
store: Mutex::new(HashMap::new()),
}
}
}
impl Cache for MemoryCache {
async fn get(&self, key: &str) -> Option<String> {
self.store.lock().await.get(key).cloned()
}
async fn set(&self, key: &str, value: String) {
self.store.lock().await.insert(key.to_string(), value);
}
}
// --- Simulated Redis implementation ---
struct RedisCache {
store: Mutex<HashMap<String, String>>,
latency: Duration,
}
impl RedisCache {
fn new(latency_ms: u64) -> Self {
RedisCache {
store: Mutex::new(HashMap::new()),
latency: Duration::from_millis(latency_ms),
}
}
}
impl Cache for RedisCache {
async fn get(&self, key: &str) -> Option<String> {
sleep(self.latency).await; // Simulate network round-trip
self.store.lock().await.get(key).cloned()
}
async fn set(&self, key: &str, value: String) {
sleep(self.latency).await;
self.store.lock().await.insert(key.to_string(), value);
}
}
// --- Generic function working with any Cache ---
async fn cache_demo<C: Cache>(cache: &C, label: &str) {
cache.set("greeting", "Hello, async!".into()).await;
let val = cache.get("greeting").await;
println!("[{label}] greeting = {val:?}");
}
#[tokio::main]
async fn main() {
let mem = MemoryCache::new();
cache_demo(&mem, "memory").await;
let redis = RedisCache::new(50);
cache_demo(&redis, "redis").await;
}
Key takeaway: The same generic function works with both implementations through static dispatch. No boxing, no allocation overhead. If you need to spawn tasks that use the trait on a multi-threaded runtime, add `#[trait_variant::make(SendCache: Send)]`.
Key Takeaways — Async Traits
- Since Rust 1.75, you can write `async fn` directly in traits (no `#[async_trait]` crate needed)
- `trait_variant::make` auto-generates a `Send` variant for spawning on multi-threaded runtimes
- Async closures (`AsyncFn()`) stabilized in 1.85 — use for callbacks and middleware
- Prefer static dispatch (`<S: Service>`) over `dyn` for performance-critical code
See also: Ch 13 — Production Patterns for Tower's `Service` trait, Ch 6 — Building Futures by Hand for manual trait implementations
11. Streams and AsyncIterator 🟡
What you’ll learn:
- The `Stream` trait: async iteration over multiple values
- Creating streams: `stream::iter`, `async_stream`, `unfold`
- Stream combinators: `map`, `filter`, `buffer_unordered`, `fold`
- Async I/O traits: `AsyncRead`, `AsyncWrite`, `AsyncBufRead`
Stream Trait Overview
A Stream is to Iterator what Future is to a single value — it yields multiple values asynchronously:
#![allow(unused)]
fn main() {
// std::iter::Iterator (synchronous, multiple values)
trait Iterator {
type Item;
fn next(&mut self) -> Option<Self::Item>;
}
// futures::Stream (async, multiple values)
trait Stream {
type Item;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}
}
graph LR
subgraph "Sync"
VAL["Value<br/>(T)"]
ITER["Iterator<br/>(multiple T)"]
end
subgraph "Async"
FUT["Future<br/>(async T)"]
STREAM["Stream<br/>(async multiple T)"]
end
VAL -->|"make async"| FUT
ITER -->|"make async"| STREAM
VAL -->|"make multiple"| ITER
FUT -->|"make multiple"| STREAM
style VAL fill:#e3f2fd,color:#000
style ITER fill:#e3f2fd,color:#000
style FUT fill:#c8e6c9,color:#000
style STREAM fill:#c8e6c9,color:#000
Creating Streams
#![allow(unused)]
fn main() {
use futures::stream::{self, StreamExt};
use tokio::time::{interval, Duration};
use tokio_stream::wrappers::IntervalStream;
// 1. From an iterator
let s = stream::iter(vec![1, 2, 3]);
// 2. From an async generator (using async_stream crate)
// Cargo.toml: async-stream = "0.3"
use async_stream::stream;
fn countdown(from: u32) -> impl futures::Stream<Item = u32> {
stream! {
for i in (0..=from).rev() {
tokio::time::sleep(Duration::from_millis(500)).await;
yield i;
}
}
}
// 3. From a tokio interval
let tick_stream = IntervalStream::new(interval(Duration::from_secs(1)));
// 4. From a channel receiver (tokio_stream::wrappers)
let (tx, rx) = tokio::sync::mpsc::channel::<String>(100);
let rx_stream = tokio_stream::wrappers::ReceiverStream::new(rx);
// 5. From unfold (generate from async state)
let s = stream::unfold(0u32, |state| async move {
if state >= 5 {
None // Stream ends
} else {
let next = state + 1;
Some((state, next)) // yield `state`, new state is `next`
}
});
}
Consuming Streams
#![allow(unused)]
fn main() {
use futures::stream::{self, StreamExt};
use tokio::time::Duration;
async fn stream_examples() {
let s = stream::iter(vec![1, 2, 3, 4, 5]);
// for_each — process each item
s.for_each(|x| async move {
println!("{x}");
}).await;
// map + collect
let doubled: Vec<i32> = stream::iter(vec![1, 2, 3])
.map(|x| x * 2)
.collect()
.await;
// filter
let evens: Vec<i32> = stream::iter(1..=10)
.filter(|x| futures::future::ready(x % 2 == 0))
.collect()
.await;
// buffer_unordered — process N items concurrently
let results: Vec<_> = stream::iter(vec!["url1", "url2", "url3"])
.map(|url| async move {
// Simulate HTTP fetch
tokio::time::sleep(Duration::from_millis(100)).await;
format!("response from {url}")
})
.buffer_unordered(10) // Up to 10 concurrent fetches
.collect()
.await;
// take, skip, zip, chain — just like Iterator
let first_three: Vec<i32> = stream::iter(1..=100)
.take(3)
.collect()
.await;
}
}
Comparison with C# IAsyncEnumerable
| Feature | Rust Stream | C# IAsyncEnumerable<T> |
|---|---|---|
| Syntax | stream! { yield x; } | await foreach / yield return |
| Cancellation | Drop the stream | CancellationToken |
| Backpressure | Consumer controls poll rate | Consumer controls MoveNextAsync |
| Built-in | No (needs futures crate) | Yes (since C# 8.0) |
| Combinators | .map(), .filter(), .buffer_unordered() | LINQ + System.Linq.Async |
| Error handling | Stream<Item = Result<T, E>> | Throw in async iterator |
#![allow(unused)]
fn main() {
// Rust: Stream of database rows
// NOTE: try_stream! (not stream!) is required when using ? inside the body.
// stream! doesn't propagate errors — try_stream! yields Err(e) and ends.
use async_stream::try_stream;
use futures::{Stream, StreamExt};
use std::pin::pin;
fn get_users(db: &Database) -> impl Stream<Item = Result<User, DbError>> + '_ {
try_stream! {
let mut cursor = db.query("SELECT * FROM users").await?;
while let Some(row) = cursor.next().await {
yield User::from_row(row?);
}
}
}
// Consume:
let mut users = pin!(get_users(&db));
while let Some(result) = users.next().await {
match result {
Ok(user) => println!("{}", user.name),
Err(e) => eprintln!("Error: {e}"),
}
}
}
// C# equivalent:
async IAsyncEnumerable<User> GetUsers() {
await using var reader = await db.QueryAsync("SELECT * FROM users");
while (await reader.ReadAsync()) {
yield return User.FromRow(reader);
}
}
// Consume:
await foreach (var user in GetUsers()) {
Console.WriteLine(user.Name);
}
🏋️ Exercise: Build an Async Stats Aggregator (click to expand)
Challenge: Given a stream of sensor readings Stream<Item = f64>, write an async function that consumes the stream and returns (count, min, max, average). Use StreamExt combinators — don’t just collect into a Vec.
Hint: Use .fold() to accumulate state across the stream.
🔑 Solution
#![allow(unused)]
fn main() {
use futures::stream::{self, StreamExt};
#[derive(Debug)]
struct Stats {
count: usize,
min: f64,
max: f64,
sum: f64,
}
impl Stats {
fn average(&self) -> f64 {
if self.count == 0 { 0.0 } else { self.sum / self.count as f64 }
}
}
async fn compute_stats<S: futures::Stream<Item = f64> + Unpin>(stream: S) -> Stats {
stream
.fold(
Stats { count: 0, min: f64::INFINITY, max: f64::NEG_INFINITY, sum: 0.0 },
|mut acc, value| async move {
acc.count += 1;
acc.min = acc.min.min(value);
acc.max = acc.max.max(value);
acc.sum += value;
acc
},
)
.await
}
#[tokio::test]
async fn test_stats() {
let readings = stream::iter(vec![23.5, 24.1, 22.8, 25.0, 23.9]);
let stats = compute_stats(readings).await;
assert_eq!(stats.count, 5);
assert!((stats.min - 22.8).abs() < f64::EPSILON);
assert!((stats.max - 25.0).abs() < f64::EPSILON);
assert!((stats.average() - 23.86).abs() < 0.01);
}
}
Key takeaway: Stream combinators like .fold() process items one-at-a-time without collecting into memory — essential for processing large or unbounded data streams.
Async I/O Traits: AsyncRead, AsyncWrite, AsyncBufRead
Just as std::io::Read/Write are the foundation of synchronous I/O, their async counterparts are the foundation of async I/O. These traits are provided by tokio::io (or futures::io for runtime-agnostic code):
#![allow(unused)]
fn main() {
// tokio::io — the async versions of std::io traits
/// Read bytes from a source asynchronously
pub trait AsyncRead {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>, // Tokio's safe wrapper around uninitialized memory
) -> Poll<io::Result<()>>;
}
/// Write bytes to a sink asynchronously
pub trait AsyncWrite {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>>;
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>;
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>;
}
/// Buffered reading with line support
pub trait AsyncBufRead: AsyncRead {
fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>>;
fn consume(self: Pin<&mut Self>, amt: usize);
}
}
In practice, you rarely call these poll_* methods directly. Instead, use the extension traits AsyncReadExt and AsyncWriteExt which provide .await-friendly helper methods:
#![allow(unused)]
fn main() {
use tokio::io::{AsyncReadExt, AsyncWriteExt, AsyncBufReadExt};
use tokio::net::TcpStream;
use tokio::io::BufReader;
async fn io_examples() -> tokio::io::Result<()> {
let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
// AsyncWriteExt: write_all, write_u32, write_buf, etc.
stream.write_all(b"GET / HTTP/1.0\r\n\r\n").await?;
// AsyncReadExt: read, read_exact, read_to_end, read_to_string
let mut response = Vec::new();
stream.read_to_end(&mut response).await?;
// AsyncBufReadExt: read_line, lines(), split()
let file = tokio::fs::File::open("config.txt").await?;
let reader = BufReader::new(file);
let mut lines = reader.lines();
while let Some(line) = lines.next_line().await? {
println!("{line}");
}
Ok(())
}
}
Implementing custom async I/O — wrap a protocol over raw TCP:
#![allow(unused)]
fn main() {
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
/// A length-prefixed protocol: [u32 length][payload bytes]
struct FramedStream<T> {
inner: T,
}
impl<T: AsyncRead + Unpin> FramedStream<T> {
/// Read one complete frame
async fn read_frame(&mut self) -> tokio::io::Result<Vec<u8>>
{
// Read the 4-byte length prefix
let len = self.inner.read_u32().await? as usize;
// Read exactly that many bytes
let mut payload = vec![0u8; len];
self.inner.read_exact(&mut payload).await?;
Ok(payload)
}
}
impl<T: AsyncWrite + Unpin> FramedStream<T> {
/// Write one complete frame
async fn write_frame(&mut self, data: &[u8]) -> tokio::io::Result<()>
{
self.inner.write_u32(data.len() as u32).await?;
self.inner.write_all(data).await?;
self.inner.flush().await?;
Ok(())
}
}
}
| Sync Trait | Async Trait (tokio) | Async Trait (futures) | Extension Trait |
|---|---|---|---|
| `std::io::Read` | `tokio::io::AsyncRead` | `futures::io::AsyncRead` | `AsyncReadExt` |
| `std::io::Write` | `tokio::io::AsyncWrite` | `futures::io::AsyncWrite` | `AsyncWriteExt` |
| `std::io::BufRead` | `tokio::io::AsyncBufRead` | `futures::io::AsyncBufRead` | `AsyncBufReadExt` |
| `std::io::Seek` | `tokio::io::AsyncSeek` | `futures::io::AsyncSeek` | `AsyncSeekExt` |
tokio vs futures I/O traits: They're similar but not identical — tokio's `AsyncRead` uses `ReadBuf` (handles uninitialized memory safely), while `futures::AsyncRead` uses `&mut [u8]`. Use `tokio_util::compat` to convert between them.
Copy utilities: `tokio::io::copy(&mut reader, &mut writer)` is the async equivalent of `std::io::copy` — useful for proxy servers or file transfers. `tokio::io::copy_bidirectional` copies both directions concurrently.
🏋️ Exercise: Build an Async Line Counter (click to expand)
Challenge: Write an async function that takes any AsyncBufRead source and returns the number of non-empty lines. It should work with files, TCP streams, or any buffered reader.
Hint: Use AsyncBufReadExt::lines() and count lines where !line.is_empty().
🔑 Solution
#![allow(unused)]
fn main() {
use tokio::io::AsyncBufReadExt;
async fn count_non_empty_lines<R: tokio::io::AsyncBufRead + Unpin>(
reader: R,
) -> tokio::io::Result<usize> {
let mut lines = reader.lines();
let mut count = 0;
while let Some(line) = lines.next_line().await? {
if !line.is_empty() {
count += 1;
}
}
Ok(count)
}
// Works with any AsyncBufRead:
// let file = tokio::io::BufReader::new(tokio::fs::File::open("data.txt").await?);
// let count = count_non_empty_lines(file).await?;
//
// let tcp = tokio::io::BufReader::new(TcpStream::connect("...").await?);
// let count = count_non_empty_lines(tcp).await?;
}
Key takeaway: By programming against AsyncBufRead instead of a concrete type, your I/O code is reusable across files, sockets, pipes, and even in-memory buffers (tokio::io::BufReader::new(std::io::Cursor::new(data))).
Key Takeaways — Streams and AsyncIterator
- `Stream` is the async equivalent of `Iterator` — yields `Poll::Ready(Some(item))` or `Poll::Ready(None)`
- `.buffer_unordered(N)` processes N stream items concurrently — the key concurrency tool for streams
- `async_stream::stream!` is the easiest way to create custom streams (uses `yield`)
- `AsyncRead` / `AsyncBufRead` enable generic, reusable I/O code across files, sockets, and pipes
See also: Ch 9 — When Tokio Isn't the Right Fit for `FuturesUnordered` (related pattern), Ch 13 — Production Patterns for backpressure with bounded channels
12. Common Pitfalls 🔴
What you’ll learn:
- 9 common async Rust bugs and how to fix each one
- Why blocking the executor is the #1 mistake (and how `spawn_blocking` fixes it)
- Cancellation hazards: what happens when a future is dropped mid-await
- Debugging: `tokio-console`, `tracing`, `#[instrument]`
- Testing: `#[tokio::test]`, `time::pause()`, trait-based mocking
Blocking the Executor
The #1 mistake in async Rust: running blocking code on the async executor thread. This starves other tasks.
#![allow(unused)]
fn main() {
// ❌ WRONG: Blocks the entire executor thread
async fn bad_handler() -> String {
let data = std::fs::read_to_string("big_file.txt").unwrap(); // BLOCKS!
process(&data)
}
// ✅ CORRECT: Offload blocking work to a dedicated thread pool
async fn good_handler() -> String {
let data = tokio::task::spawn_blocking(|| {
std::fs::read_to_string("big_file.txt").unwrap()
}).await.unwrap();
process(&data)
}
// ✅ ALSO CORRECT: Use tokio's async fs
async fn also_good_handler() -> String {
let data = tokio::fs::read_to_string("big_file.txt").await.unwrap();
process(&data)
}
}
graph TB
subgraph "❌ Blocking Call on Executor"
T1_BAD["Thread 1: std::fs::read()<br/>🔴 BLOCKED for 500ms"]
T2_BAD["Thread 2: handling requests<br/>🟢 Working alone"]
TASKS_BAD["100 pending tasks<br/>⏳ Starved"]
T1_BAD -->|"can't poll"| TASKS_BAD
end
subgraph "✅ spawn_blocking"
T1_GOOD["Thread 1: polling futures<br/>🟢 Available"]
T2_GOOD["Thread 2: polling futures<br/>🟢 Available"]
BT["Blocking pool thread:<br/>std::fs::read()<br/>🔵 Separate pool"]
TASKS_GOOD["100 tasks<br/>✅ All making progress"]
T1_GOOD -->|"polls"| TASKS_GOOD
T2_GOOD -->|"polls"| TASKS_GOOD
end
std::thread::sleep vs tokio::time::sleep
#![allow(unused)]
fn main() {
// ❌ WRONG: Blocks the executor thread for 5 seconds
async fn bad_delay() {
std::thread::sleep(Duration::from_secs(5)); // Thread can't poll anything else!
}
// ✅ CORRECT: Yields to the executor, other tasks can run
async fn good_delay() {
tokio::time::sleep(Duration::from_secs(5)).await; // Non-blocking!
}
}
Holding MutexGuard Across .await
#![allow(unused)]
fn main() {
use std::sync::Mutex; // std Mutex — NOT async-aware
// ❌ WRONG: MutexGuard held across .await
async fn bad_mutex(data: &Mutex<Vec<String>>) {
let mut guard = data.lock().unwrap();
guard.push("item".into());
some_io().await; // 💥 Guard is held here — blocks other threads from locking!
guard.push("another".into());
}
// Also: std::sync::MutexGuard is !Send, so this won't compile
// with tokio's multi-threaded runtime.
// ✅ FIX 1: Scope the guard to drop before .await
async fn good_mutex_scoped(data: &Mutex<Vec<String>>) {
{
let mut guard = data.lock().unwrap();
guard.push("item".into());
} // Guard dropped here
some_io().await; // Safe — lock is released
{
let mut guard = data.lock().unwrap();
guard.push("another".into());
}
}
// ✅ FIX 2: Use tokio::sync::Mutex (async-aware)
use tokio::sync::Mutex as AsyncMutex;
async fn good_async_mutex(data: &AsyncMutex<Vec<String>>) {
let mut guard = data.lock().await; // Async lock — doesn't block the thread
guard.push("item".into());
some_io().await; // OK — tokio Mutex guard is Send
guard.push("another".into());
}
}
When to use which Mutex:
- `std::sync::Mutex`: short critical sections with no `.await` inside
- `tokio::sync::Mutex`: when you must hold the lock across `.await` points
- `parking_lot::Mutex`: drop-in `std` replacement, faster and smaller — still no `.await` while the guard is held
Cancellation Hazards
Dropping a future cancels it — but this can leave things in an inconsistent state:
#![allow(unused)]
fn main() {
// ❌ DANGEROUS: Resource leak on cancellation
async fn transfer(from: &Account, to: &Account, amount: u64) {
from.debit(amount).await; // If cancelled HERE...
to.credit(amount).await; // ...money vanishes!
}
// ✅ SAFE: Make operations atomic or use compensation
async fn safe_transfer(from: &Account, to: &Account, amount: u64) -> Result<(), Error> {
// Use a database transaction (all-or-nothing)
let tx = db.begin_transaction().await?;
tx.debit(from, amount).await?;
tx.credit(to, amount).await?;
tx.commit().await?; // Only commits if everything succeeded
Ok(())
}
// ✅ ALSO SAFE: Use tokio::select! with cancellation awareness
tokio::select! {
result = transfer(from, to, amount) => {
// Transfer completed
}
_ = shutdown_signal() => {
// Don't cancel mid-transfer — let it finish
// Or: roll back explicitly
}
}
}
No Async Drop
Rust’s Drop trait is synchronous — you cannot .await inside drop(). This is a frequent source of confusion:
#![allow(unused)]
fn main() {
struct DbConnection { /* ... */ }
impl Drop for DbConnection {
fn drop(&mut self) {
// ❌ Can't do this — drop() is sync!
// self.connection.shutdown().await;
// ✅ Workaround 1: Spawn a cleanup task (fire-and-forget)
let conn = self.connection.take();
tokio::spawn(async move {
let _ = conn.shutdown().await;
});
// ✅ Workaround 2: Use a synchronous close
// self.connection.blocking_close();
}
}
}
Best practice: Provide an explicit async fn close(self) method and document that callers should use it. Rely on Drop only as a safety net, not the primary cleanup path.
select! Fairness and Starvation
#![allow(unused)]
fn main() {
use tokio::sync::mpsc;
// ❌ UNFAIR: busy_stream always wins, slow_stream starves
async fn unfair(mut fast: mpsc::Receiver<i32>, mut slow: mpsc::Receiver<i32>) {
loop {
tokio::select! {
Some(v) = fast.recv() => println!("fast: {v}"),
Some(v) = slow.recv() => println!("slow: {v}"),
// If both are ready, tokio picks a branch at random.
// Random choice avoids total starvation, but gives no priority:
// under sustained load on `fast`, `slow` still loses half the races.
}
}
}
// ✅ FAIR: Use biased select or drain in batches
async fn fair(mut fast: mpsc::Receiver<i32>, mut slow: mpsc::Receiver<i32>) {
loop {
tokio::select! {
biased; // Always check in order — explicit priority
Some(v) = slow.recv() => println!("slow: {v}"), // Priority!
Some(v) = fast.recv() => println!("fast: {v}"),
}
}
}
}
Accidental Sequential Execution
#![allow(unused)]
fn main() {
// ❌ SEQUENTIAL: Takes 2 seconds total
async fn slow() {
let a = fetch("url_a").await; // 1 second
let b = fetch("url_b").await; // 1 second (waits for a to finish first!)
}
// ✅ CONCURRENT: Takes 1 second total
async fn fast() {
let (a, b) = tokio::join!(
fetch("url_a"), // Both start immediately
fetch("url_b"),
);
}
// ✅ ALSO CONCURRENT: Using let + join
async fn also_fast() {
let fut_a = fetch("url_a"); // Create future (lazy — not started yet)
let fut_b = fetch("url_b"); // Create future
let (a, b) = tokio::join!(fut_a, fut_b); // NOW both run concurrently
}
}
Trap: `let a = fetch(url).await; let b = fetch(url).await;` is sequential! The second `.await` doesn't start until the first finishes. Use `join!` or `spawn` for concurrency.
Case Study: Debugging a Hung Production Service
A real-world scenario: a service handles requests fine for 10 minutes, then stops responding. No errors in logs. CPU at 0%.
Diagnosis steps:
1. Attach `tokio-console` — reveals 200+ tasks stuck in the `Pending` state
2. Check task details — all blocked trying to acquire the same lock
3. Root cause — one task held a `std::sync::MutexGuard` across an `.await` and was never polled again, so the lock was never released; every other task deadlocked waiting for it. (Had the holder panicked instead, the mutex would be poisoned and every later `lock().unwrap()` would panic too.)
The fix:
| Before (broken) | After (fixed) |
|---|---|
| `std::sync::Mutex` | `tokio::sync::Mutex` |
| `.lock().unwrap()` across `.await` | Scope lock to drop before `.await` |
| No timeout on lock acquisition | `tokio::time::timeout(dur, mutex.lock())` |
| No recovery on poisoned mutex | `tokio::sync::Mutex` doesn't poison |
Prevention checklist:
- Use `tokio::sync::Mutex` if the guard crosses any `.await`
- Add `#[tracing::instrument]` to async functions for span tracking
- Run `tokio-console` in staging to catch hung tasks early
- Add health check endpoints that verify task responsiveness
🏋️ Exercise: Spot the Bugs (click to expand)
Challenge: Find all the async pitfalls in this code and fix them.
#![allow(unused)]
fn main() {
use std::sync::Mutex;
async fn process_requests(urls: Vec<String>) -> Vec<String> {
let results = Mutex::new(Vec::new());
for url in &urls {
let response = reqwest::get(url).await.unwrap().text().await.unwrap();
std::thread::sleep(std::time::Duration::from_millis(100)); // Rate limit
let mut guard = results.lock().unwrap();
guard.push(response);
expensive_parse(&guard).await; // Parse all results so far
}
results.into_inner().unwrap()
}
}
🔑 Solution
Bugs found:
1. Sequential fetches — URLs are fetched one at a time instead of concurrently
2. `std::thread::sleep` — blocks the executor thread
3. MutexGuard held across `.await` — `guard` is alive when `expensive_parse` is awaited
4. No concurrency — should use `join!` or `FuturesUnordered`
#![allow(unused)]
fn main() {
use tokio::sync::Mutex;
use std::sync::Arc;
use futures::stream::{self, StreamExt};
async fn process_requests(urls: Vec<String>) -> Vec<String> {
// Fix 4: Process URLs concurrently with buffer_unordered
let results: Vec<String> = stream::iter(urls)
.map(|url| async move {
let response = reqwest::get(&url).await.unwrap().text().await.unwrap();
// Fix 2: Use tokio::time::sleep instead of std::thread::sleep
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
response
})
.buffer_unordered(10) // Up to 10 concurrent requests
.collect()
.await;
// Fix 3: Parse after collecting — no mutex needed at all!
for result in &results {
expensive_parse(result).await;
}
results
}
}
Key takeaway: Often you can restructure async code to eliminate mutexes entirely. Collect results with streams/join, then process. Simpler, faster, no deadlock risk.
Debugging Async Code
Async stack traces are notoriously cryptic — they show the executor’s poll loop rather than your logical call chain. Here are the essential debugging tools.
tokio-console: Real-Time Task Inspector
tokio-console gives you an htop-like view of every spawned task: its state, poll duration, waker activity, and resource usage.
# Cargo.toml
[dependencies]
console-subscriber = "0.4"
tokio = { version = "1", features = ["full", "tracing"] }
#[tokio::main]
async fn main() {
console_subscriber::init(); // Replaces the default tracing subscriber
// ... rest of your application
}
Then in another terminal:
$ RUSTFLAGS="--cfg tokio_unstable" cargo run # Required compile-time flag
$ tokio-console # Connects to 127.0.0.1:6669
tracing + #[instrument]: Structured Logging for Async
The tracing crate understands Future lifetimes. Spans stay open across .await points, giving you a logical call stack even when the OS thread has moved on:
#![allow(unused)]
fn main() {
use tracing::{info, instrument};
#[instrument(skip(db_pool), fields(user_id = %user_id))]
async fn handle_request(user_id: u64, db_pool: &Pool) -> Result<Response> {
info!("looking up user");
let user = db_pool.get_user(user_id).await?; // span stays open across .await
info!(email = %user.email, "found user");
let orders = fetch_orders(user_id).await?; // still the same span
Ok(build_response(user, orders))
}
}
Output (with tracing_subscriber::fmt::json()):
{"timestamp":"...","level":"INFO","span":{"name":"handle_request","user_id":"42"},"message":"looking up user"}
{"timestamp":"...","level":"INFO","span":{"name":"handle_request","user_id":"42"},"fields":{"email":"a@b.com"},"message":"found user"}
Debugging Checklist
| Symptom | Likely Cause | Tool |
|---|---|---|
| Task hangs forever | Missing .await or deadlocked Mutex | tokio-console task view |
| Low throughput | Blocking call on async thread | tokio-console poll-time histogram |
| Future is not Send | Non-Send type held across .await | Compiler error + #[instrument] to locate |
| Mysterious cancellation | Parent select! dropped a branch | tracing span lifecycle events |
Tip: Enable `RUSTFLAGS="--cfg tokio_unstable"` to get task-level metrics in tokio-console. This is a compile-time flag, not a runtime one.
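The "mysterious cancellation" row deserves emphasis: a cancelled future gets no callback — the losing `select!` branch is simply dropped, and only destructors run. A runtime-free sketch (hand-polling with a no-op waker; the names here are illustrative) makes that visible:

```rust
use std::future::Future;
use std::sync::atomic::{AtomicBool, Ordering};
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

static CLEANED_UP: AtomicBool = AtomicBool::new(false);

struct Guard;
impl Drop for Guard {
    fn drop(&mut self) {
        // The ONLY notification a cancelled future gets: destructors run.
        CLEANED_UP.store(true, Ordering::SeqCst);
    }
}

async fn losing_branch() {
    let _g = Guard;                         // created on first poll
    std::future::pending::<()>().await;     // suspends forever
    // Code after the .await would run only if pending() completed — it never does.
}

// Minimal no-op waker so we can poll by hand, without any runtime.
fn noop_waker() -> Waker {
    fn raw() -> RawWaker { RawWaker::new(std::ptr::null(), &VTABLE) }
    static VTABLE: RawWakerVTable =
        RawWakerVTable::new(|_| raw(), |_| {}, |_| {}, |_| {});
    unsafe { Waker::from_raw(raw()) }
}

fn main() {
    let mut fut = Box::pin(losing_branch());
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    // Poll once: the future suspends at the .await, so Guard now exists.
    assert!(matches!(fut.as_mut().poll(&mut cx), Poll::Pending));
    assert!(!CLEANED_UP.load(Ordering::SeqCst));
    // This drop is exactly what select! does to the losing branch.
    drop(fut);
    assert!(CLEANED_UP.load(Ordering::SeqCst));
}
```

This is why span lifecycle events from `tracing` are the right tool for diagnosing cancellation: the span closes when the future is dropped, even though no code in the function "observed" the cancellation.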
Testing Async Code
Async code introduces unique testing challenges — you need a runtime, time control, and strategies for testing concurrent behavior.
Basic async tests with #[tokio::test]:
#![allow(unused)]
fn main() {
// Cargo.toml
// [dev-dependencies]
// tokio = { version = "1", features = ["full", "test-util"] }
#[tokio::test]
async fn test_basic_async() {
let result = fetch_data().await;
assert_eq!(result, "expected");
}
// Single-threaded test (useful for !Send types):
#[tokio::test(flavor = "current_thread")]
async fn test_single_threaded() {
let rc = std::rc::Rc::new(42);
let val = async { *rc }.await;
assert_eq!(val, 42);
}
// Multi-threaded with explicit worker count:
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_concurrent_behavior() {
// Tests race conditions with real concurrency
let counter = std::sync::Arc::new(std::sync::atomic::AtomicU32::new(0));
let c1 = counter.clone();
let c2 = counter.clone();
let (a, b) = tokio::join!(
tokio::spawn(async move { c1.fetch_add(1, std::sync::atomic::Ordering::SeqCst) }),
tokio::spawn(async move { c2.fetch_add(1, std::sync::atomic::Ordering::SeqCst) }),
);
a.unwrap();
b.unwrap();
assert_eq!(counter.load(std::sync::atomic::Ordering::SeqCst), 2);
}
}
Time manipulation — test timeouts without actually waiting:
#![allow(unused)]
fn main() {
use tokio::time::{self, Duration, Instant};
#[tokio::test]
async fn test_timeout_behavior() {
// Pause time — sleep() advances instantly, no real wall-clock delay
time::pause();
let start = Instant::now();
time::sleep(Duration::from_secs(3600)).await; // "waits" 1 hour — takes 0ms
assert!(start.elapsed() >= Duration::from_secs(3600));
// Test ran in milliseconds, not an hour!
}
#[tokio::test]
async fn test_retry_timing() {
time::pause();
// Test that our retry logic waits the expected durations
let start = Instant::now();
let result = retry_with_backoff(|| async {
Err::<(), _>("simulated failure")
}, 3, Duration::from_secs(1))
.await;
assert!(result.is_err());
// 1s + 2s + 4s = 7s of backoff (exponential)
assert!(start.elapsed() >= Duration::from_secs(7));
}
#[tokio::test]
async fn test_deadline_exceeded() {
time::pause();
let result = tokio::time::timeout(
Duration::from_secs(5),
async {
// Simulate slow operation
time::sleep(Duration::from_secs(10)).await;
"done"
}
).await;
assert!(result.is_err()); // Timed out
}
}
Mocking async dependencies — use trait objects or generics:
#![allow(unused)]
fn main() {
// Define a trait for the dependency:
trait Storage {
async fn get(&self, key: &str) -> Option<String>;
async fn set(&self, key: &str, value: String);
}
// Production implementation:
struct RedisStorage { /* ... */ }
impl Storage for RedisStorage {
async fn get(&self, key: &str) -> Option<String> {
// Real Redis call
todo!()
}
async fn set(&self, key: &str, value: String) {
todo!()
}
}
// Test mock:
struct MockStorage {
data: std::sync::Mutex<std::collections::HashMap<String, String>>,
}
impl MockStorage {
fn new() -> Self {
MockStorage { data: std::sync::Mutex::new(std::collections::HashMap::new()) }
}
}
impl Storage for MockStorage {
async fn get(&self, key: &str) -> Option<String> {
self.data.lock().unwrap().get(key).cloned()
}
async fn set(&self, key: &str, value: String) {
self.data.lock().unwrap().insert(key.to_string(), value);
}
}
// Tested function is generic over Storage:
async fn cache_lookup<S: Storage>(store: &S, key: &str) -> String {
match store.get(key).await {
Some(val) => val,
None => {
let val = "computed".to_string();
store.set(key, val.clone()).await;
val
}
}
}
#[tokio::test]
async fn test_cache_miss_then_hit() {
let mock = MockStorage::new();
// First call: miss → computes and stores
let val = cache_lookup(&mock, "key1").await;
assert_eq!(val, "computed");
// Second call: hit → returns stored value
let val = cache_lookup(&mock, "key1").await;
assert_eq!(val, "computed");
assert!(mock.data.lock().unwrap().contains_key("key1"));
}
}
Testing channels and task communication:
#![allow(unused)]
fn main() {
#[tokio::test]
async fn test_producer_consumer() {
let (tx, mut rx) = tokio::sync::mpsc::channel(10);
tokio::spawn(async move {
for i in 0..5 {
tx.send(i).await.unwrap();
}
// tx dropped here — channel closes
});
let mut received = Vec::new();
while let Some(val) = rx.recv().await {
received.push(val);
}
assert_eq!(received, vec![0, 1, 2, 3, 4]);
}
}
| Test Pattern | When to Use | Key Tool |
|---|---|---|
| #[tokio::test] | All async tests | tokio = { features = ["macros", "rt"] } |
| time::pause() | Testing timeouts, retries, periodic tasks | tokio::time::pause() |
| Trait mocking | Testing business logic without I/O | Generic <S: Storage> |
| current_thread flavor | Testing !Send types or deterministic scheduling | #[tokio::test(flavor = "current_thread")] |
| multi_thread flavor | Testing race conditions | #[tokio::test(flavor = "multi_thread")] |
Key Takeaways — Common Pitfalls
- Never block the executor — use `spawn_blocking` for CPU/sync work
- Never hold a MutexGuard across `.await` — scope locks tightly or use `tokio::sync::Mutex`
- Cancellation drops the future instantly — use “cancel-safe” patterns for partial operations
- Use `tokio-console` and `#[tracing::instrument]` for debugging async code
- Test async code with `#[tokio::test]` and `time::pause()` for deterministic timing
See also: Ch 8 — Tokio Deep Dive for sync primitives, Ch 13 — Production Patterns for graceful shutdown and structured concurrency
13. Production Patterns 🔴
What you’ll learn:
- Graceful shutdown with `watch` channels and `select!`
- Backpressure: bounded channels prevent OOM
- Structured concurrency: `JoinSet` and `TaskTracker`
- Timeouts, retries, and exponential backoff
- Error handling: `thiserror` vs `anyhow`, the double-`?` pattern
- Tower: the middleware pattern used by axum, tonic, and hyper
Graceful Shutdown
Production servers must shut down cleanly — finish in-flight requests, flush buffers, close connections:
use tokio::signal;
use tokio::sync::watch;
async fn main_server() {
// Create a shutdown signal channel
let (shutdown_tx, shutdown_rx) = watch::channel(false);
// Spawn the server
let server_handle = tokio::spawn(run_server(shutdown_rx.clone()));
// Wait for Ctrl+C
signal::ctrl_c().await.expect("Failed to listen for Ctrl+C");
println!("Shutdown signal received, finishing in-flight requests...");
// Notify all tasks to shut down
// NOTE: .unwrap() is used for brevity. Production code should handle
// the case where all receivers have been dropped.
shutdown_tx.send(true).unwrap();
// Wait for server to finish (with timeout)
match tokio::time::timeout(
std::time::Duration::from_secs(30),
server_handle,
).await {
Ok(Ok(())) => println!("Server shut down gracefully"),
Ok(Err(e)) => eprintln!("Server error: {e}"),
Err(_) => eprintln!("Server shutdown timed out — forcing exit"),
}
}
async fn run_server(mut shutdown: watch::Receiver<bool>) {
loop {
tokio::select! {
// Accept new connections
conn = accept_connection() => {
let shutdown = shutdown.clone();
tokio::spawn(handle_connection(conn, shutdown));
}
// Shutdown signal
_ = shutdown.changed() => {
if *shutdown.borrow() {
println!("Stopping accepting new connections");
break;
}
}
}
}
// In-flight connections will finish on their own
// because they have their own shutdown_rx clone
}
async fn handle_connection(conn: Connection, mut shutdown: watch::Receiver<bool>) {
loop {
tokio::select! {
request = conn.next_request() => {
// Process the request fully — don't abandon mid-request
process_request(request).await;
}
_ = shutdown.changed() => {
if *shutdown.borrow() {
// Finish current request, then exit
break;
}
}
}
}
}
sequenceDiagram
participant OS as OS Signal
participant Main as Main Task
participant WCH as watch Channel
participant W1 as Worker 1
participant W2 as Worker 2
OS->>Main: SIGINT (Ctrl+C)
Main->>WCH: send(true)
WCH-->>W1: changed()
WCH-->>W2: changed()
Note over W1: Finish current request
Note over W2: Finish current request
W1-->>Main: Task complete
W2-->>Main: Task complete
Main->>Main: All workers done → exit
Backpressure with Bounded Channels
Unbounded channels can lead to OOM if the producer is faster than the consumer. Always use bounded channels in production:
#![allow(unused)]
fn main() {
use tokio::sync::mpsc;
async fn backpressure_example() {
// Bounded channel: max 100 items buffered
let (tx, mut rx) = mpsc::channel::<WorkItem>(100);
// Producer: slows down naturally when buffer is full
let producer = tokio::spawn(async move {
for i in 0..1_000_000 {
// send() is async — waits if buffer is full
// This creates natural backpressure!
tx.send(WorkItem { id: i }).await.unwrap();
}
});
// Consumer: processes items at its own pace
let consumer = tokio::spawn(async move {
while let Some(item) = rx.recv().await {
process(item).await; // Slow processing is OK — producer waits
}
});
let _ = tokio::join!(producer, consumer);
}
// Compare with unbounded — DANGEROUS:
// let (tx, rx) = mpsc::unbounded_channel(); // No backpressure!
// Producer can fill memory indefinitely
}
Structured Concurrency: JoinSet and TaskTracker
JoinSet groups related tasks and ensures they all complete:
#![allow(unused)]
fn main() {
use tokio::task::JoinSet;
use tokio::time::{sleep, Duration};
async fn structured_concurrency() {
let mut set = JoinSet::new();
// Spawn a batch of tasks
for url in get_urls() {
set.spawn(async move {
fetch_and_process(url).await
});
}
// Collect all results (order not guaranteed)
let mut results = Vec::new();
while let Some(result) = set.join_next().await {
match result {
Ok(Ok(data)) => results.push(data),
Ok(Err(e)) => eprintln!("Task error: {e}"),
Err(e) => eprintln!("Task panicked: {e}"),
}
}
// ALL tasks are done here — no dangling background work
println!("Processed {} items", results.len());
}
// TaskTracker (from the tokio-util crate) — wait for all spawned tasks
use tokio_util::task::TaskTracker;
async fn with_tracker() {
let tracker = TaskTracker::new();
for i in 0..10u64 { // u64 so `100 * i` matches from_millis's argument type
tracker.spawn(async move {
sleep(Duration::from_millis(100 * i)).await;
println!("Task {i} done");
});
}
tracker.close(); // No more tasks will be added
tracker.wait().await; // Wait for ALL tracked tasks
println!("All tasks finished");
}
}
Timeouts and Retries
#![allow(unused)]
fn main() {
use tokio::time::{timeout, sleep, Duration};
// Simple timeout
async fn with_timeout() -> Result<Response, Error> {
match timeout(Duration::from_secs(5), fetch_data()).await {
Ok(Ok(response)) => Ok(response),
Ok(Err(e)) => Err(Error::Fetch(e)),
Err(_) => Err(Error::Timeout),
}
}
// Exponential backoff retry
async fn retry_with_backoff<F, Fut, T, E>(
max_attempts: u32,
base_delay_ms: u64,
operation: F,
) -> Result<T, E>
where
F: Fn() -> Fut,
Fut: std::future::Future<Output = Result<T, E>>,
E: std::fmt::Display,
{
let mut delay = Duration::from_millis(base_delay_ms);
for attempt in 1..=max_attempts {
match operation().await {
Ok(result) => return Ok(result),
Err(e) => {
if attempt == max_attempts {
eprintln!("Final attempt {attempt} failed: {e}");
return Err(e);
}
eprintln!("Attempt {attempt} failed: {e}, retrying in {delay:?}");
sleep(delay).await;
delay *= 2; // Exponential backoff
}
}
}
unreachable!()
}
// Usage:
// let result = retry_with_backoff(3, 100, || async {
// reqwest::get("https://api.example.com/data").await
// }).await?;
}
Production tip — add jitter: The function above uses pure exponential backoff, but in production many clients failing simultaneously will all retry at the same intervals (thundering herd). Add random jitter — e.g., `sleep(delay + rand_jitter)` where `rand_jitter` is `0..delay/4` — so retries spread out over time.
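A sketch of that delay computation. The `rand` crate is the usual choice for the jitter; here the clock's sub-second nanos stand in as a crude randomness source so the example stays dependency-free (the function name is illustrative, not a library API):

```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

/// Delay for the given 1-based attempt: base * 2^(attempt - 1),
/// plus up to 25% jitter so synchronized clients spread out.
fn backoff_delay(base: Duration, attempt: u32) -> Duration {
    // Exponential part, saturating instead of overflowing for large attempts.
    let factor = 1u32.checked_shl(attempt.saturating_sub(1)).unwrap_or(u32::MAX);
    let exp = base.saturating_mul(factor);
    // Crude jitter source for the sketch — use the `rand` crate in real code.
    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap()
        .subsec_nanos() as u64;
    let max_jitter = (exp.as_nanos() as u64 / 4).max(1);
    exp + Duration::from_nanos(nanos % max_jitter)
}
```

Swapping `sleep(delay)` in `retry_with_backoff` for `sleep(backoff_delay(base, attempt))` gives each client a slightly different retry schedule.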
Error Handling in Async Code
Async introduces unique error propagation challenges — spawned tasks create error boundaries, timeout errors wrap inner errors, and ? interacts differently when futures cross task boundaries.
thiserror vs anyhow — choosing the right tool:
#![allow(unused)]
fn main() {
// thiserror: Define typed errors for libraries and public APIs
// Every variant is explicit — callers can match on specific errors
use thiserror::Error;
#[derive(Error, Debug)]
enum DiagError {
#[error("IPMI command failed: {0}")]
Ipmi(#[from] IpmiError),
#[error("Sensor {sensor} out of range: {value}°C (max {max}°C)")]
OverTemp { sensor: String, value: f64, max: f64 },
#[error("Operation timed out after {0:?}")]
Timeout(std::time::Duration),
#[error("Task panicked: {0}")]
TaskPanic(#[from] tokio::task::JoinError),
}
// anyhow: Quick error handling for applications and prototypes
// Wraps any error — no need to define types for every case
use anyhow::{Context, Result};
async fn run_diagnostics() -> Result<()> {
let config = load_config()
.await
.context("Failed to load diagnostic config")?; // Adds context
let result = run_gpu_test(&config)
.await
.context("GPU diagnostic failed")?; // Chains context
Ok(())
}
// anyhow prints: "GPU diagnostic failed: IPMI command failed: timeout"
}
| Crate | Use When | Error Type | Matching |
|---|---|---|---|
| thiserror | Library code, public APIs | enum MyError { ... } | match err { MyError::Timeout => ... } |
| anyhow | Applications, CLI tools, scripts | anyhow::Error (type-erased) | err.downcast_ref::<MyError>() |
| Both together | Library exposes thiserror, app wraps with anyhow | Best of both | Library errors are typed, app doesn’t care |
The double-? pattern with tokio::spawn:
#![allow(unused)]
fn main() {
use thiserror::Error;
use tokio::task::JoinError;
#[derive(Error, Debug)]
enum AppError {
#[error("HTTP error: {0}")]
Http(#[from] reqwest::Error),
#[error("Task panicked: {0}")]
TaskPanic(#[from] JoinError),
}
async fn spawn_with_errors() -> Result<String, AppError> {
let handle = tokio::spawn(async {
let resp = reqwest::get("https://example.com").await?;
Ok::<_, reqwest::Error>(resp.text().await?)
});
// Double ?: First ? unwraps JoinError (task panic), second ? unwraps inner Result
let result = handle.await??;
Ok(result)
}
}
The error boundary problem — tokio::spawn erases context:
#![allow(unused)]
fn main() {
// ❌ Error context is lost across spawn boundaries:
async fn bad_error_handling() -> Result<()> {
let handle = tokio::spawn(async {
some_fallible_work().await // Returns Result<T, SomeError>
});
// handle.await returns Result<Result<T, SomeError>, JoinError>
// The inner error has no context about what task failed
let result = handle.await??;
Ok(())
}
// ✅ Add context at the spawn boundary:
async fn good_error_handling() -> Result<()> {
let handle = tokio::spawn(async {
some_fallible_work()
.await
.context("worker task failed") // Context before crossing boundary
});
let result = handle.await
.context("worker task panicked")??; // Context for JoinError too
Ok(())
}
}
Timeout errors — wrapping vs replacing:
#![allow(unused)]
fn main() {
use tokio::time::{timeout, Duration};
async fn with_timeout_context() -> Result<String, DiagError> {
let dur = Duration::from_secs(30);
match timeout(dur, fetch_sensor_data()).await {
Ok(Ok(data)) => Ok(data),
Ok(Err(e)) => Err(e), // Inner error preserved
Err(_) => Err(DiagError::Timeout(dur)), // Timeout → typed error
}
}
}
Tower: The Middleware Pattern
The Tower crate defines a composable Service trait — the backbone of async middleware in Rust (used by axum, tonic, hyper):
#![allow(unused)]
fn main() {
// Tower's core trait (simplified):
pub trait Service<Request> {
type Response;
type Error;
type Future: Future<Output = Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
fn call(&mut self, req: Request) -> Self::Future;
}
}
Middleware wraps a Service to add cross-cutting behavior — logging, timeouts, rate-limiting — without modifying inner logic:
#![allow(unused)]
fn main() {
use tower::{ServiceBuilder, timeout::TimeoutLayer, limit::RateLimitLayer};
use std::time::Duration;
let service = ServiceBuilder::new()
.layer(TimeoutLayer::new(Duration::from_secs(10))) // Outermost: timeout
.layer(RateLimitLayer::new(100, Duration::from_secs(1))) // Then: rate limit
.service(my_handler); // Innermost: your code
}
Why this matters: If you’ve used ASP.NET middleware or Express.js middleware, Tower is the Rust equivalent. It’s how production Rust services add cross-cutting concerns without code duplication.
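To make the wrapping concrete without pulling in Tower itself, here is a stripped-down sketch of the same shape: a hypothetical `Service` trait (just `call`, no `poll_ready`, boxed futures), a handler, and a logging middleware composed around it — illustrative names, not Tower's real API — driven by a tiny hand-rolled `block_on`:

```rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

// Simplified stand-in for Tower's Service trait: just `call`, boxed futures.
trait Service<Request> {
    type Response;
    fn call(&mut self, req: Request) -> Pin<Box<dyn Future<Output = Self::Response> + '_>>;
}

// The innermost handler — "your code".
struct Handler;
impl Service<String> for Handler {
    type Response = String;
    fn call(&mut self, req: String) -> Pin<Box<dyn Future<Output = String> + '_>> {
        Box::pin(async move { format!("handled: {req}") })
    }
}

// Middleware: wraps ANY inner service, adds behavior, delegates the rest.
struct Logging<S>(S);
impl<S: Service<String, Response = String>> Service<String> for Logging<S> {
    type Response = String;
    fn call(&mut self, req: String) -> Pin<Box<dyn Future<Output = String> + '_>> {
        let inner = self.0.call(req.clone());
        Box::pin(async move {
            println!("-> {req}");
            let resp = inner.await;
            println!("<- {resp}");
            resp
        })
    }
}

// Tiny busy-poll executor so the example runs without a runtime.
fn block_on<F: Future>(fut: F) -> F::Output {
    fn raw() -> RawWaker { RawWaker::new(std::ptr::null(), &VTABLE) }
    static VTABLE: RawWakerVTable =
        RawWakerVTable::new(|_| raw(), |_| {}, |_| {}, |_| {});
    let waker = unsafe { Waker::from_raw(raw()) };
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(v) = fut.as_mut().poll(&mut cx) {
            return v;
        }
    }
}

fn main() {
    // Same layering idea as ServiceBuilder: middleware outside, handler inside.
    let mut svc = Logging(Handler);
    let resp = block_on(svc.call("ping".to_string()));
    assert_eq!(resp, "handled: ping");
}
```

Tower's real trait adds `poll_ready` for backpressure and an `Error` type, but the composition idea is the same: middleware holds the inner service and returns a future that wraps the inner future.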
Exercise: Graceful Shutdown with Worker Pool
🏋️ Exercise (click to expand)
Challenge: Build a task processor with a channel-based work queue, N worker tasks, and graceful shutdown on Ctrl+C. Workers should finish in-flight work before exiting.
🔑 Solution
use tokio::sync::{mpsc, watch};
use tokio::time::{sleep, Duration};
struct WorkItem { id: u64, payload: String }
#[tokio::main]
async fn main() {
let (work_tx, work_rx) = mpsc::channel::<WorkItem>(100);
let (shutdown_tx, shutdown_rx) = watch::channel(false);
let work_rx = std::sync::Arc::new(tokio::sync::Mutex::new(work_rx));
let mut handles = Vec::new();
for id in 0..4 {
let rx = work_rx.clone();
let mut shutdown = shutdown_rx.clone();
handles.push(tokio::spawn(async move {
loop {
let item = {
let mut rx = rx.lock().await;
tokio::select! {
item = rx.recv() => item,
_ = shutdown.changed() => {
if *shutdown.borrow() { None } else { continue }
}
}
};
match item {
Some(work) => {
println!("Worker {id}: processing {}", work.id);
sleep(Duration::from_millis(200)).await;
}
None => break,
}
}
}));
}
// Submit work
for i in 0..20 {
let _ = work_tx.send(WorkItem { id: i, payload: format!("task-{i}") }).await;
sleep(Duration::from_millis(50)).await;
}
// On Ctrl+C: signal shutdown, wait for workers
// NOTE: .unwrap() is used for brevity — handle errors in production.
tokio::signal::ctrl_c().await.unwrap();
shutdown_tx.send(true).unwrap();
for h in handles { let _ = h.await; }
println!("Shut down cleanly.");
}
Key Takeaways — Production Patterns
- Use a `watch` channel + `select!` for coordinated graceful shutdown
- Bounded channels (`mpsc::channel(N)`) provide backpressure — senders block when the buffer is full
- `JoinSet` and `TaskTracker` provide structured concurrency: track, abort, and await task groups
- Always add timeouts to network operations — `tokio::time::timeout(dur, fut)`
- Tower’s `Service` trait is the standard middleware pattern for production Rust services
See also: Ch 8 — Tokio Deep Dive for channels and sync primitives, Ch 12 — Common Pitfalls for cancellation hazards during shutdown
14. Exercises
Exercise 1: Async Echo Server
Build a TCP echo server that handles multiple clients concurrently.
Requirements:
- Listen on `127.0.0.1:8080`
- Accept connections and echo back each line
- Handle client disconnections gracefully
- Print a log when clients connect/disconnect
🔑 Solution
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::TcpListener;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let listener = TcpListener::bind("127.0.0.1:8080").await?;
println!("Echo server listening on :8080");
loop {
let (socket, addr) = listener.accept().await?;
println!("[{addr}] Connected");
tokio::spawn(async move {
let (reader, mut writer) = socket.into_split();
let mut reader = BufReader::new(reader);
let mut line = String::new();
loop {
line.clear();
match reader.read_line(&mut line).await {
Ok(0) => {
println!("[{addr}] Disconnected");
break;
}
Ok(_) => {
print!("[{addr}] Echo: {line}");
if writer.write_all(line.as_bytes()).await.is_err() {
println!("[{addr}] Write error, disconnecting");
break;
}
}
Err(e) => {
eprintln!("[{addr}] Read error: {e}");
break;
}
}
}
});
}
}
Exercise 2: Concurrent URL Fetcher with Rate Limiting
Fetch a list of URLs concurrently, with at most 5 concurrent requests.
🔑 Solution
#![allow(unused)]
fn main() {
use futures::stream::{self, StreamExt};
use tokio::time::{sleep, Duration};
async fn fetch_urls(urls: Vec<String>) -> Vec<Result<String, String>> {
// buffer_unordered(5) ensures at most 5 futures are polled
// concurrently — no separate Semaphore needed here.
let results: Vec<_> = stream::iter(urls)
.map(|url| {
async move {
println!("Fetching: {url}");
match reqwest::get(&url).await {
Ok(resp) => match resp.text().await {
Ok(body) => Ok(body),
Err(e) => Err(format!("{url}: {e}")),
},
Err(e) => Err(format!("{url}: {e}")),
}
}
})
.buffer_unordered(5) // ← This alone limits concurrency to 5
.collect()
.await;
results
}
// NOTE: Use Semaphore when you need to limit concurrency across
// independently spawned tasks (tokio::spawn). Use buffer_unordered
// when processing a stream. Don't combine both for the same limit.
}
Exercise 3: Graceful Shutdown with Worker Pool
Build a task processor with:
- A channel-based work queue
- N worker tasks consuming from the queue
- Graceful shutdown on Ctrl+C: stop accepting, finish in-flight work
🔑 Solution
use tokio::sync::{mpsc, watch};
use tokio::time::{sleep, Duration};
struct WorkItem {
id: u64,
payload: String,
}
#[tokio::main]
async fn main() {
let (work_tx, work_rx) = mpsc::channel::<WorkItem>(100);
let (shutdown_tx, shutdown_rx) = watch::channel(false);
// Spawn 4 workers
let mut worker_handles = Vec::new();
let work_rx = std::sync::Arc::new(tokio::sync::Mutex::new(work_rx));
for id in 0..4 {
let rx = work_rx.clone();
let mut shutdown = shutdown_rx.clone();
let handle = tokio::spawn(async move {
loop {
let item = {
let mut rx = rx.lock().await;
tokio::select! {
item = rx.recv() => item,
_ = shutdown.changed() => {
if *shutdown.borrow() { None } else { continue }
}
}
};
match item {
Some(work) => {
println!("Worker {id}: processing item {}", work.id);
sleep(Duration::from_millis(200)).await; // Simulate work
println!("Worker {id}: done with item {}", work.id);
}
None => {
println!("Worker {id}: channel closed, exiting");
break;
}
}
}
});
worker_handles.push(handle);
}
// Producer: submit some work
let producer = tokio::spawn(async move {
for i in 0..20 {
let _ = work_tx.send(WorkItem {
id: i,
payload: format!("task-{i}"),
}).await;
sleep(Duration::from_millis(50)).await;
}
});
// Wait for Ctrl+C
tokio::signal::ctrl_c().await.unwrap();
println!("\nShutdown signal received!");
shutdown_tx.send(true).unwrap();
producer.abort(); // Cancel the producer task
// Wait for workers to finish
for handle in worker_handles {
let _ = handle.await;
}
println!("All workers shut down. Goodbye!");
}
Exercise 4: Build a Simple Async Mutex from Scratch
Implement an async-aware mutex using channels (without using tokio::sync::Mutex).
Hint: Use a tokio::sync::Semaphore with a single permit — acquiring the permit asynchronously is the “lock”, dropping it is the “unlock”. (A tokio::sync::mpsc channel with capacity 1 can play the same role.)
🔑 Solution
#![allow(unused)]
fn main() {
use std::cell::UnsafeCell;
use std::sync::Arc;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
pub struct SimpleAsyncMutex<T> {
data: Arc<UnsafeCell<T>>,
semaphore: Arc<Semaphore>,
}
// SAFETY: Access to T is serialized by the semaphore (max 1 permit).
unsafe impl<T: Send> Send for SimpleAsyncMutex<T> {}
unsafe impl<T: Send> Sync for SimpleAsyncMutex<T> {}
pub struct SimpleGuard<T> {
data: Arc<UnsafeCell<T>>,
_permit: OwnedSemaphorePermit, // Dropped on guard drop → releases lock
}
impl<T> SimpleAsyncMutex<T> {
pub fn new(value: T) -> Self {
SimpleAsyncMutex {
data: Arc::new(UnsafeCell::new(value)),
semaphore: Arc::new(Semaphore::new(1)),
}
}
pub async fn lock(&self) -> SimpleGuard<T> {
let permit = self.semaphore.clone().acquire_owned().await.unwrap();
SimpleGuard {
data: self.data.clone(),
_permit: permit,
}
}
}
impl<T> std::ops::Deref for SimpleGuard<T> {
type Target = T;
fn deref(&self) -> &T {
// SAFETY: We hold the only semaphore permit, so no other
// SimpleGuard exists → exclusive access is guaranteed.
unsafe { &*self.data.get() }
}
}
impl<T> std::ops::DerefMut for SimpleGuard<T> {
fn deref_mut(&mut self) -> &mut T {
// SAFETY: Same reasoning — single permit guarantees exclusivity.
unsafe { &mut *self.data.get() }
}
}
// When SimpleGuard is dropped, _permit is dropped,
// which releases the semaphore permit — another lock() can proceed.
// Usage:
// let mutex = SimpleAsyncMutex::new(vec![1, 2, 3]);
// {
// let mut guard = mutex.lock().await;
// guard.push(4);
// } // permit released here
}
Key takeaway: Async mutexes are typically built on top of semaphores. The semaphore provides the async wait mechanism — when locked, acquire() suspends the task until the permit is released. This is exactly how tokio::sync::Mutex works internally.
Why `UnsafeCell` and not `std::sync::Mutex`? A previous version of this exercise used `Arc<Mutex<T>>` with `Deref`/`DerefMut` calling `.lock().unwrap()`. That doesn’t compile — the returned `&T` would borrow from a temporary `MutexGuard` that’s dropped immediately. `UnsafeCell` avoids the intermediate guard, and the semaphore-based serialization makes the `unsafe` sound.
Exercise 5: Stream Pipeline
Build a data processing pipeline using streams:
- Generate numbers 1..=100
- Filter to even numbers
- Map each to its square
- Process 10 at a time concurrently (simulate with sleep)
- Collect results
🔑 Solution
use futures::stream::{self, StreamExt};
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() {
let results: Vec<u64> = stream::iter(1u64..=100)
// Step 2: Filter evens
.filter(|x| futures::future::ready(x % 2 == 0))
// Step 3: Square each
.map(|x| x * x)
// Step 4: Process concurrently (simulate async work)
.map(|x| async move {
sleep(Duration::from_millis(50)).await;
println!("Processed: {x}");
x
})
.buffer_unordered(10) // 10 concurrent
// Step 5: Collect
.collect()
.await;
println!("Got {} results", results.len());
println!("Sum: {}", results.iter().sum::<u64>());
}
Exercise 6: Implement Select with Timeout
Without using tokio::select! or tokio::time::timeout, implement a function that races a future against a deadline and returns Either::Left(result) or Either::Right(()) on timeout.
Hint: Build on the Select combinator from Chapter 6 and the TimerFuture from the same chapter.
🔑 Solution
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
pub enum Either<A, B> {
Left(A),
Right(B),
}
pub struct Timeout<F> {
future: F,
timer: TimerFuture, // From Chapter 6
}
impl<F: Future + Unpin> Timeout<F> {
pub fn new(future: F, duration: Duration) -> Self {
Timeout {
future,
timer: TimerFuture::new(duration),
}
}
}
impl<F: Future + Unpin> Future for Timeout<F> {
type Output = Either<F::Output, ()>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// Check if the main future is done
if let Poll::Ready(val) = Pin::new(&mut self.future).poll(cx) {
return Poll::Ready(Either::Left(val));
}
// Check if the timer expired
if let Poll::Ready(()) = Pin::new(&mut self.timer).poll(cx) {
return Poll::Ready(Either::Right(()));
}
Poll::Pending
}
}
// Usage:
// match Timeout::new(fetch_data(), Duration::from_secs(5)).await {
// Either::Left(data) => println!("Got data: {data}"),
// Either::Right(()) => println!("Timed out!"),
// }
Key takeaway: select/timeout is just polling two futures and seeing which completes first. The entire async ecosystem is built from this simple primitive: poll, Pending/Ready, Waker.
Summary and Reference Card
Quick Reference Card
Async Mental Model
┌─────────────────────────────────────────────────────┐
│ async fn → State Machine (enum) → impl Future │
│ .await → poll() the inner future │
│ executor → loop { poll(); sleep_until_woken(); } │
│ waker → "hey executor, poll me again" │
│ Pin → "promise I won't move in memory" │
└─────────────────────────────────────────────────────┘
Common Patterns Cheat Sheet
| Goal | Use |
|---|---|
| Run two futures concurrently | tokio::join!(a, b) |
| Race two futures | tokio::select! { ... } |
| Spawn a background task | tokio::spawn(async { ... }) |
| Run blocking code in async | tokio::task::spawn_blocking(\|\| { ... }) |
| Limit concurrency | Semaphore::new(N) |
| Collect many task results | JoinSet |
| Share state across tasks | Arc<Mutex<T>> or channels |
| Graceful shutdown | watch::channel + select! |
| Process a stream N-at-a-time | .buffer_unordered(N) |
| Timeout a future | tokio::time::timeout(dur, fut) |
| Retry with backoff | Custom combinator (see Ch. 13) |
Pinning Quick Reference
| Situation | Use |
|---|---|
| Pin a future on the heap | Box::pin(fut) |
| Pin a future on the stack | tokio::pin!(fut) |
| Pin an Unpin type | Pin::new(&mut val) — safe, free |
| Return a pinned trait object | -> Pin<Box<dyn Future<Output = T> + Send>> |
Channel Selection Guide
| Channel | Producers | Consumers | Values | Use When |
|---|---|---|---|---|
| mpsc | N | 1 | Stream | Work queues, event buses |
| oneshot | 1 | 1 | Single | Request/response, completion notification |
| broadcast | N | N | All recv all | Fan-out notifications, shutdown signals |
| watch | 1 | N | Latest only | Config updates, health status |
Mutex Selection Guide
| Mutex | Use When |
|---|---|
| std::sync::Mutex | Lock is held briefly, never across .await |
| tokio::sync::Mutex | Lock must be held across .await |
| parking_lot::Mutex | High contention, no .await, need performance |
| tokio::sync::RwLock | Many readers, few writers, locks cross .await |
Decision Quick Reference
Need concurrency?
├── I/O-bound → async/await
├── CPU-bound → rayon / std::thread
└── Mixed → spawn_blocking for CPU parts
Choosing runtime?
├── Server app → tokio
├── Library → runtime-agnostic (futures crate)
├── Embedded → embassy
└── Minimal → smol
Need concurrent futures?
├── Can be 'static + Send → tokio::spawn
├── Can be 'static + !Send → LocalSet
├── Can't be 'static → FuturesUnordered
└── Need to track/abort → JoinSet
Common Error Messages and Fixes
| Error | Cause | Fix |
|---|---|---|
| future is not Send | Holding a !Send type across .await | Scope the value so it’s dropped before .await, or use a current_thread runtime |
| borrowed value does not live long enough in spawn | tokio::spawn requires 'static | Use Arc, clone(), or FuturesUnordered |
| the trait Future is not implemented for () | Missing .await | Add .await to the async call |
| cannot borrow as mutable in poll | Self-referential borrow | Use Pin<&mut Self> correctly (see Ch. 4) |
| Program hangs silently | Forgot to call waker.wake() | Ensure every Pending path registers and triggers the waker |
Further Reading
| Resource | Why |
|---|---|
| Tokio Tutorial | Official hands-on guide — excellent for first projects |
| Async Book (official) | Covers Future, Pin, Stream at the language level |
| Jon Gjengset — Crust of Rust: async/await | 2-hour deep dive into internals with live coding |
| Alice Ryhl — Actors with Tokio | Production architecture pattern for stateful services |
| Without Boats — Pin, Unpin, and why Rust needs them | The original motivation from the language designer |
| Tokio mini-Redis | Complete async Rust project — study-quality production code |
| Tower documentation | Middleware/service architecture used by axum, tonic, hyper |
End of the core Async Rust Training Guide. A capstone project follows.
Capstone Project: Async Chat Server
This project integrates patterns from across the book into a single, production-style application. You’ll build a multi-room async chat server using tokio, channels, streams, graceful shutdown, and proper error handling.
Estimated time: 4–6 hours | Difficulty: ★★★
What you’ll practice:
- `tokio::spawn` and the `'static` requirement (Ch 8)
- Channels: `mpsc` for messages, `broadcast` for rooms, `watch` for shutdown (Ch 8)
- Streams: reading lines from TCP connections (Ch 11)
- Common pitfalls: cancellation safety, holding a `MutexGuard` across `.await` (Ch 12)
- Production patterns: graceful shutdown, backpressure (Ch 13)
- Async traits for pluggable backends (Ch 10)
The Problem
Build a TCP chat server where:
- Clients connect via TCP and join named rooms
- Messages are broadcast to all clients in the same room
- Commands: `/join <room>`, `/nick <name>`, `/rooms`, `/quit`
- The server shuts down gracefully on Ctrl+C, finishing in-flight messages
graph LR
C1["Client 1<br/>(Alice)"] -->|TCP| SERVER["Chat Server"]
C2["Client 2<br/>(Bob)"] -->|TCP| SERVER
C3["Client 3<br/>(Carol)"] -->|TCP| SERVER
SERVER --> R1["#general<br/>broadcast channel"]
SERVER --> R2["#rust<br/>broadcast channel"]
R1 -->|msg| C1
R1 -->|msg| C2
R2 -->|msg| C3
CTRL["Ctrl+C"] -->|watch| SERVER
style SERVER fill:#e8f4f8,stroke:#2980b9,color:#000
style R1 fill:#d4efdf,stroke:#27ae60,color:#000
style R2 fill:#d4efdf,stroke:#27ae60,color:#000
style CTRL fill:#fadbd8,stroke:#e74c3c,color:#000
Step 1: Basic TCP Accept Loop
Start with a server that accepts connections and echoes lines back:
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::TcpListener;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let listener = TcpListener::bind("127.0.0.1:8080").await?;
println!("Chat server listening on :8080");
loop {
let (socket, addr) = listener.accept().await?;
println!("[{addr}] Connected");
tokio::spawn(async move {
let (reader, mut writer) = socket.into_split();
let mut reader = BufReader::new(reader);
let mut line = String::new();
loop {
line.clear();
match reader.read_line(&mut line).await {
Ok(0) | Err(_) => break,
Ok(_) => {
let _ = writer.write_all(line.as_bytes()).await;
}
}
}
println!("[{addr}] Disconnected");
});
}
}
Your job: Verify this compiles and works with telnet localhost 8080.
Step 2: Room State with Broadcast Channels
Each room is a broadcast::Sender. All clients in a room subscribe to receive messages.
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{broadcast, RwLock};

type RoomMap = Arc<RwLock<HashMap<String, broadcast::Sender<String>>>>;

fn get_or_create_room(
    rooms: &mut HashMap<String, broadcast::Sender<String>>,
    name: &str,
) -> broadcast::Sender<String> {
    rooms
        .entry(name.to_string())
        .or_insert_with(|| {
            let (tx, _) = broadcast::channel(100); // 100-message buffer
            tx
        })
        .clone()
}
Your job: Implement room state so that:
- Clients start in `#general`
- `/join <room>` switches rooms (unsubscribe from old, subscribe to new)
- Messages are broadcast to all clients in the sender’s current room
💡 Hint — Client task structure
Each client task needs two concurrent loops:
- Read from TCP → parse commands or broadcast to room
- Read from broadcast receiver → write to TCP
Use tokio::select! to run both:
loop {
    line.clear(); // read_line appends, so clear the buffer each iteration
    tokio::select! {
        // Client sent us a line
        result = reader.read_line(&mut line) => {
            match result {
                Ok(0) | Err(_) => break,
                Ok(_) => {
                    // Parse command or broadcast message
                }
            }
        }
        // Room broadcast received
        result = room_rx.recv() => {
            match result {
                Ok(msg) => {
                    let _ = writer.write_all(msg.as_bytes()).await;
                }
                Err(_) => break,
            }
        }
    }
}
Step 3: Commands
Implement the command protocol:
| Command | Action |
|---|---|
/join <room> | Leave current room, join new room, announce in both |
/nick <name> | Change display name |
/rooms | List all active rooms and member counts |
/quit | Disconnect gracefully |
| Anything else | Broadcast as a chat message |
Your job: Parse commands from the input line. For /rooms, you’ll need to read from the RoomMap — use RwLock::read() to avoid blocking other clients.
Step 4: Graceful Shutdown
Add Ctrl+C handling so the server:
- Stops accepting new connections
- Sends “Server shutting down…” to all rooms
- Waits for in-flight messages to drain
- Exits cleanly
use tokio::sync::watch;

let (shutdown_tx, shutdown_rx) = watch::channel(false);

// In the accept loop:
loop {
    tokio::select! {
        result = listener.accept() => {
            let (socket, addr) = result?;
            // spawn client task with a shutdown_rx.clone()
        }
        _ = tokio::signal::ctrl_c() => {
            println!("Shutdown signal received");
            shutdown_tx.send(true)?;
            break;
        }
    }
}
Your job: Add shutdown_rx.changed() to each client’s select! loop so clients exit when shutdown is signaled.
Step 5: Error Handling and Edge Cases
Production-harden the server:
- Lagging receivers: `broadcast::Receiver::recv()` returns `RecvError::Lagged(n)` if a slow client misses messages. Handle it gracefully (log + continue, don’t crash).
- Nickname validation: Reject empty or too-long nicknames.
- Backpressure: The broadcast channel buffer is bounded (100). If a client can’t keep up, they get the `Lagged` error.
- Timeout: Disconnect clients that are idle for >5 minutes.
use tokio::time::{timeout, Duration};

// Wrap the read in a timeout:
match timeout(Duration::from_secs(300), reader.read_line(&mut line)).await {
    Ok(Ok(0)) | Ok(Err(_)) | Err(_) => break, // EOF, read error, or idle timeout
    Ok(Ok(_)) => { /* process line */ }
}
Step 6: Integration Test
Write a test that starts the server, connects two clients, and verifies message delivery:
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;

#[tokio::test]
async fn two_clients_can_chat() {
    // Start server in background. Port 0 = OS picks; have run_server report
    // the address it actually bound (e.g. via a oneshot) so `addr` is known here.
    let _server = tokio::spawn(run_server("127.0.0.1:0"));

    // Connect two clients
    let mut client1 = TcpStream::connect(addr).await.unwrap();
    let mut client2 = TcpStream::connect(addr).await.unwrap();

    // Client 1 sends a message
    client1.write_all(b"Hello from client 1\n").await.unwrap();

    // Client 2 should receive it
    let mut buf = vec![0u8; 1024];
    let n = client2.read(&mut buf).await.unwrap();
    let msg = String::from_utf8_lossy(&buf[..n]);
    assert!(msg.contains("Hello from client 1"));
}
Evaluation Criteria
| Criterion | Target |
|---|---|
| Concurrency | Multiple clients in multiple rooms, no blocking |
| Correctness | Messages only go to clients in the same room |
| Graceful shutdown | Ctrl+C drains messages and exits cleanly |
| Error handling | Lagged receivers, disconnections, timeouts handled |
| Code organization | Clean separation: accept loop, client task, room state |
| Testing | At least 2 integration tests |
Extension Ideas
Once the basic chat server works, try these enhancements:
- Persistent history: Store the last N messages per room; replay them to new joiners
- WebSocket support: Accept both TCP and WebSocket clients using `tokio-tungstenite`
- Rate limiting: Use `tokio::time::Interval` to limit messages per client per second
- Metrics: Track connected clients, messages/sec, and room count via the `prometheus` crate
- TLS: Add `tokio-rustls` for encrypted connections