Async Rust: From Futures to Production

Speaker Intro

  • Principal Firmware Architect in Microsoft SCHIE (Silicon and Cloud Hardware Infrastructure Engineering) team
  • Industry veteran with expertise in security, systems programming (firmware, operating systems, hypervisors), CPU and platform architecture, and C++ systems
  • Started programming in Rust in 2017 (@AWS EC2), and has been in love with the language ever since

A deep-dive guide to asynchronous programming in Rust. Unlike most async tutorials that start with tokio::main and hand-wave the internals, this guide builds understanding from first principles — the Future trait, polling, state machines — then progresses to real-world patterns, runtime selection, and production pitfalls.

Who This Is For

  • Rust developers who can write synchronous Rust but find async confusing
  • Developers from C#, Go, Python, or JavaScript who know async/await but not Rust’s model
  • Anyone who’s been bitten by Future is not Send, Pin<Box<dyn Future>>, or “why does my program hang?”

Prerequisites

You should be comfortable with:

  • Ownership, borrowing, and lifetimes
  • Traits and generics (including impl Trait)
  • Using Result<T, E> and the ? operator
  • Basic multi-threading (std::thread::spawn, Arc, Mutex)

No prior async Rust experience is needed.

How to Use This Book

Read linearly the first time. Parts I–III build on each other. Each chapter has:

| Symbol | Meaning |
|--------|---------|
| 🟢 | Beginner: foundational concept |
| 🟡 | Intermediate: requires earlier chapters |
| 🔴 | Advanced: deep internals or production patterns |

Each chapter includes:

  • A “What you’ll learn” block at the top
  • Mermaid diagrams for visual learners
  • An inline exercise with a hidden solution
  • Key Takeaways summarizing the core ideas
  • Cross-references to related chapters

Pacing Guide

| Chapters | Topic | Suggested Time | Checkpoint |
|----------|-------|----------------|------------|
| 1–5 | How Async Works | 6–8 hours | You can explain Future, Poll, Pin, and why Rust has no built-in runtime |
| 6–10 | The Ecosystem | 6–8 hours | You can build futures by hand, choose a runtime, and use tokio’s API |
| 11–13 | Production Async | 6–8 hours | You can write production-grade async code with streams, proper error handling, and graceful shutdown |
| Capstone | Chat Server | 4–6 hours | You’ve built a real async application integrating all concepts |

Total estimated time: 22–30 hours

Working Through Exercises

Every content chapter has an inline exercise. The capstone (Ch 16) integrates everything into a single project. For maximum learning:

  1. Try the exercise before expanding the solution — struggling is where learning happens
  2. Type the code, don’t copy-paste — muscle memory matters for Rust’s syntax
  3. Run every example: create a scratch project with cargo new async-exercises and test as you go

Table of Contents

Part I: How Async Works

Part II: The Ecosystem

Part III: Production Async

Appendices


1. Why Async is Different in Rust 🟢

What you’ll learn:

  • Why Rust has no built-in async runtime (and what that means for you)
  • The three key properties: lazy execution, no runtime, zero-cost abstraction
  • When async is the right tool (and when it’s slower)
  • How Rust’s model compares to C#, Go, Python, and JavaScript

The Fundamental Difference

Most languages with async/await hide the machinery. C# has the CLR thread pool. JavaScript has the event loop. Go has goroutines and a scheduler built into the runtime. Python has asyncio.

Rust has nothing.

There is no built-in runtime, no thread pool, no event loop. The async keyword is a zero-cost compilation strategy — it transforms your function into a state machine that implements the Future trait. Someone else (an executor) must drive that state machine forward.

Three Key Properties of Rust Async

graph LR
    subgraph "Other Languages"
        EAGER["Eager Execution<br/>Task starts immediately"]
        BUILTIN["Built-in Runtime<br/>Thread pool included"]
        GC["GC-Managed<br/>No lifetime concerns"]
    end

    subgraph "Rust"
        LAZY["Lazy Execution<br/>Nothing happens until polled"]
        BYOB["Bring Your Own Runtime<br/>You choose the executor"]
        OWNED["Ownership Applies<br/>Lifetimes, Send, Sync matter"]
    end

    EAGER -. "opposite" .-> LAZY
    BUILTIN -. "opposite" .-> BYOB
    GC -. "opposite" .-> OWNED

    style LAZY fill:#e8f5e8,color:#000
    style BYOB fill:#e8f5e8,color:#000
    style OWNED fill:#e8f5e8,color:#000
    style EAGER fill:#e3f2fd,color:#000
    style BUILTIN fill:#e3f2fd,color:#000
    style GC fill:#e3f2fd,color:#000

No Built-In Runtime

// This compiles but does NOTHING:
async fn fetch_data() -> String {
    "hello".to_string()
}

fn main() {
    let future = fetch_data(); // Creates the Future, but doesn't execute it
    // future is just a struct sitting on the stack
    // No output, no side effects, nothing happens
    drop(future); // Silently dropped — work was never started
}
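
The laziness can be observed directly with nothing but the standard library. The sketch below is illustrative rather than canonical: the `ran` flag, the `NoopWake` type, and `observe_laziness` are names invented here, and `fetch_data` takes an extra parameter so the test can see whether its body executed.

```rust
use std::future::Future;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Same idea as fetch_data above, plus a flag (an addition for this sketch)
// that records whether the body ever ran.
async fn fetch_data(ran: Arc<AtomicBool>) -> String {
    ran.store(true, Ordering::SeqCst);
    "hello".to_string()
}

// A waker that does nothing; sufficient for a future that never waits.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Returns (body ran before first poll, body ran after first poll, output).
fn observe_laziness() -> (bool, bool, Option<String>) {
    let ran = Arc::new(AtomicBool::new(false));

    // Creating the future runs none of the async body.
    let mut future = Box::pin(fetch_data(Arc::clone(&ran)));
    let ran_before_poll = ran.load(Ordering::SeqCst);

    // Polling is what actually drives the body.
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let output = match future.as_mut().poll(&mut cx) {
        Poll::Ready(value) => Some(value),
        Poll::Pending => None,
    };
    (ran_before_poll, ran.load(Ordering::SeqCst), output)
}

fn main() {
    let (before, after, output) = observe_laziness();
    println!("ran before poll: {before}, ran after poll: {after}, output: {output:?}");
}
```

Polling by hand like this is what an executor does on your behalf; later chapters build one.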

Compare with C# where Task starts eagerly:

// C# — this immediately starts executing:
async Task<string> FetchData() => "hello";

var task = FetchData(); // Already running!
var result = await task; // Just waits for completion

Lazy Futures vs Eager Tasks

This is the single most important mental shift:

| | C# / JavaScript / Python | Go | Rust |
|---|---|---|---|
| Creation | Task starts executing immediately (a bare Python coroutine is lazy until awaited or wrapped with asyncio.create_task) | Goroutine starts immediately | Future does nothing until polled |
| Dropping | Detached task continues running | Goroutine runs until return | Dropping a Future cancels it |
| Runtime | Built into the language/VM | Built into the binary (M:N scheduler) | You choose (tokio, smol, etc.) |
| Scheduling | Automatic (thread pool) | Automatic (GMP scheduler) | Explicit (spawn, block_on) |
| Cancellation | CancellationToken (cooperative) | context.Context (cooperative) | Drop the future (immediate) |

// To actually RUN a future, you need an executor:
#[tokio::main]
async fn main() {
    let result = fetch_data().await; // NOW it executes
    println!("{result}");
}

When to Use Async (and When Not To)

graph TD
    START["What kind of work?"]

    IO["I/O-bound?<br/>(network, files, DB)"]
    CPU["CPU-bound?<br/>(computation, parsing)"]
    MANY["Many concurrent connections?<br/>(100+)"]
    FEW["Few concurrent tasks?<br/>(<10)"]

    USE_ASYNC["✅ Use async/await"]
    USE_THREADS["✅ Use std::thread or rayon"]
    USE_SPAWN_BLOCKING["✅ Use spawn_blocking()"]
    MAYBE_SYNC["Consider synchronous code<br/>(simpler, less overhead)"]

    START -->|Network, files, DB| IO
    START -->|Computation| CPU
    IO -->|Yes, many| MANY
    IO -->|Just a few| FEW
    MANY --> USE_ASYNC
    FEW --> MAYBE_SYNC
    CPU -->|Parallelize| USE_THREADS
    CPU -->|Inside async context| USE_SPAWN_BLOCKING

    style USE_ASYNC fill:#c8e6c9,color:#000
    style USE_THREADS fill:#c8e6c9,color:#000
    style USE_SPAWN_BLOCKING fill:#c8e6c9,color:#000
    style MAYBE_SYNC fill:#fff3e0,color:#000

Rule of thumb: Async is for I/O concurrency (doing many things at once while waiting), not CPU parallelism (making one thing faster). If you have 10,000 network connections, async shines. If you’re crunching numbers, use rayon or OS threads.

When Async Can Be Slower

Async isn’t free. For low-concurrency workloads, synchronous code can outperform async:

| Cost | Why |
|------|-----|
| State machine overhead | Each .await adds an enum variant; deeply nested futures produce large, complex state machines |
| Dynamic dispatch | Box<dyn Future> adds indirection and kills inlining |
| Context switching | Cooperative scheduling still has cost: the executor must manage a task queue, wakers, and I/O registrations |
| Compile time | Async code generates more complex types, slowing down compilation |
| Debuggability | Stack traces through state machines are harder to read (see Ch. 12) |

Benchmarking guidance: If fewer than ~10 concurrent I/O operations, profile before committing to async. A simple std::thread::spawn per connection scales fine to hundreds of threads on modern Linux.
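
For comparison, here is roughly what the thread-based alternative looks like for a handful of blocking operations. It is a sketch under stated assumptions: `blocking_query` is a hypothetical stand-in for any blocking call (a database query, a file read), simulated with a short sleep.

```rust
use std::thread;
use std::time::Duration;

// Hypothetical stand-in for a blocking call such as a database query.
fn blocking_query(id: u32) -> String {
    thread::sleep(Duration::from_millis(20));
    format!("result-{id}")
}

/// Runs a few blocking calls in parallel, one OS thread each,
/// and returns the results in the order the ids were given.
fn query_all(ids: &[u32]) -> Vec<String> {
    let handles: Vec<_> = ids
        .iter()
        .map(|&id| thread::spawn(move || blocking_query(id)))
        .collect();

    handles
        .into_iter()
        .map(|handle| handle.join().expect("worker thread panicked"))
        .collect()
}

fn main() {
    let results = query_all(&[1, 2, 3]);
    println!("{results:?}");
}
```

No runtime, no pinning, no Send bounds on futures: for a few concurrent operations this simplicity is often worth more than the memory saved by async.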

Exercise: When Would You Use Async?

🏋️ Exercise

For each scenario, decide whether async is appropriate and explain why:

  1. A web server handling 10,000 concurrent WebSocket connections
  2. A CLI tool that compresses a single large file
  3. A service that queries 5 different databases and merges results
  4. A game engine running a physics simulation at 60 FPS
🔑 Solution
  1. Async — I/O-bound with massive concurrency. Each connection spends most time waiting for data. Threads would require 10K stacks.
  2. Sync/threads — CPU-bound, single task. Async adds overhead with no benefit. Use rayon for parallel compression.
  3. Async — Five concurrent I/O waits. tokio::join! polls all five queries concurrently on a single task, so the total latency is roughly that of the slowest query.
  4. Sync/threads — CPU-bound, latency-sensitive. Async’s cooperative scheduling could introduce frame jitter.

Key Takeaways — Why Async is Different

  • Rust futures are lazy — they do nothing until polled by an executor
  • There is no built-in runtime — you choose (or build) your own
  • Async is a zero-cost compilation strategy that produces state machines
  • Async shines for I/O-bound concurrency; for CPU-bound work, use threads or rayon

See also: Ch 2 — The Future Trait for the trait that makes this all work, Ch 7 — Executors and Runtimes for choosing your runtime


2. The Future Trait 🟡

What you’ll learn:

  • The Future trait: Output, poll(), Context, Waker
  • How a waker tells the executor “poll me again”
  • The contract: never call wake() = your program silently hangs
  • Implementing a real future by hand (Delay)

Anatomy of a Future

Everything in async Rust ultimately implements this trait:

pub trait Future {
    type Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

pub enum Poll<T> {
    Ready(T),   // The future has completed with value T
    Pending,    // The future is not ready yet; poll again after a wake-up
}

That’s it. A Future is anything that can be polled — asked “are you done yet?” — and responds with either “yes, here’s the result” or “not yet, I’ll wake you up when I’m ready.”

Output, poll(), Context, Waker

sequenceDiagram
    participant E as Executor
    participant F as Future
    participant R as Resource (I/O)

    E->>F: poll(cx)
    F->>R: Check: is data ready?
    R-->>F: Not yet
    F->>R: Register waker from cx
    F-->>E: Poll::Pending

    Note over R: ... time passes, data arrives ...

    R->>E: waker.wake() — "I'm ready!"
    E->>F: poll(cx) — try again
    F->>R: Check: is data ready?
    R-->>F: Yes! Here's the data
    F-->>E: Poll::Ready(data)

Let’s break down each piece:

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

// A future that returns 42 immediately
struct Ready42;

impl Future for Ready42 {
    type Output = i32; // What the future eventually produces

    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<i32> {
        Poll::Ready(42) // Always ready: no waiting
    }
}

The components:

  • Output — the type of value produced when the future completes
  • poll() — called by the executor to check progress; returns Ready(value) or Pending
  • Pin<&mut Self> — ensures the future won’t be moved in memory (we’ll cover why in Ch. 4)
  • Context — carries the Waker so the future can signal the executor when it’s ready to make progress

The Waker Contract

The Waker is the callback mechanism. When a future returns Pending, it must arrange for waker.wake() to be called later — otherwise the executor will never poll it again and the program hangs.

#![allow(unused)]
fn main() {
use std::task::{Context, Poll, Waker};
use std::pin::Pin;
use std::future::Future;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

/// A future that completes after a delay (toy implementation)
struct Delay {
    completed: Arc<Mutex<bool>>,
    waker_stored: Arc<Mutex<Option<Waker>>>,
    duration: Duration,
    started: bool,
}

impl Delay {
    fn new(duration: Duration) -> Self {
        Delay {
            completed: Arc::new(Mutex::new(false)),
            waker_stored: Arc::new(Mutex::new(None)),
            duration,
            started: false,
        }
    }
}

impl Future for Delay {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        // Check if already completed
        if *self.completed.lock().unwrap() {
            return Poll::Ready(());
        }

        // Store the waker so the background thread can wake us
        *self.waker_stored.lock().unwrap() = Some(cx.waker().clone());

        // Start the background timer on first poll
        if !self.started {
            self.started = true;
            let completed = Arc::clone(&self.completed);
            let waker = Arc::clone(&self.waker_stored);
            let duration = self.duration;

            thread::spawn(move || {
                thread::sleep(duration);
                *completed.lock().unwrap() = true;

                // CRITICAL: wake the executor so it polls us again
                if let Some(w) = waker.lock().unwrap().take() {
                    w.wake(); // "Hey executor, I'm ready — poll me again!"
                }
            });
        }

        Poll::Pending // Not done yet
    }
}
}

Key insight: In C#, the TaskScheduler handles waking automatically. In Rust, you (or the I/O library you use) are responsible for calling waker.wake(). Forget it, and your program silently hangs.
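
One way to make the contract visible is to count wake-ups. In the sketch below, `CountingWake`, `Forgetful`, `Polite`, and `wakes_after_one_poll` are all hypothetical names for illustration. A future that returns Pending with a wake count of zero is one that an event-driven executor would never poll again.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that counts how often it is woken (illustrative only).
struct CountingWake(AtomicUsize);

impl Wake for CountingWake {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Breaks the contract: returns Pending without arranging any wake-up.
struct Forgetful;

impl Future for Forgetful {
    type Output = ();
    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        Poll::Pending
    }
}

/// Honors the contract: requests a re-poll before returning Pending.
struct Polite;

impl Future for Polite {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

/// Polls `future` once and reports how many wake-ups it requested.
fn wakes_after_one_poll<F: Future + Unpin>(mut future: F) -> usize {
    let counter = Arc::new(CountingWake(AtomicUsize::new(0)));
    let waker = Waker::from(Arc::clone(&counter));
    let mut cx = Context::from_waker(&waker);
    let _ = Pin::new(&mut future).poll(&mut cx);
    counter.0.load(Ordering::SeqCst)
}

fn main() {
    println!("Forgetful requested {} wake-ups", wakes_after_one_poll(Forgetful));
    println!("Polite requested {} wake-ups", wakes_after_one_poll(Polite));
}
```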

Exercise: Implement a CountdownFuture

🏋️ Exercise

Challenge: Implement a CountdownFuture that counts down from N to 0, printing the current count each time it’s polled. When it reaches 0, it completes with Ready("Liftoff!").

Hint: The future needs to store the current count and decrement it on each poll. Remember to always re-register the waker!

🔑 Solution
#![allow(unused)]
fn main() {
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

struct CountdownFuture {
    count: u32,
}

impl CountdownFuture {
    fn new(start: u32) -> Self {
        CountdownFuture { count: start }
    }
}

impl Future for CountdownFuture {
    type Output = &'static str;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.count == 0 {
            println!("Liftoff!");
            Poll::Ready("Liftoff!")
        } else {
            println!("{}...", self.count);
            self.count -= 1;
            cx.waker().wake_by_ref(); // Schedule re-poll immediately
            Poll::Pending
        }
    }
}
}

Key takeaway: Starting from N, this future is polled N + 1 times: once per count, plus a final poll that returns Ready. Each time it returns Pending, it immediately wakes itself to be polled again. In production, you’d use a timer instead of busy-polling.

Key Takeaways — The Future Trait

  • Future::poll() returns Poll::Ready(value) or Poll::Pending
  • A future must register a Waker before returning Pending — the executor uses it to know when to re-poll
  • Pin<&mut Self> guarantees the future won’t be moved in memory (needed for self-referential state machines — see Ch 4)
  • Everything in async Rust — async fn, .await, combinators — is built on this one trait

See also: Ch 3 — How Poll Works for the executor loop, Ch 6 — Building Futures by Hand for more complex implementations


3. How Poll Works 🟡

What you’ll learn:

  • The executor’s poll loop: poll → pending → wake → poll again
  • How to build a minimal executor from scratch
  • Spurious wake rules and why they matter
  • Utility functions: poll_fn() and yield_now()

The Polling State Machine

The executor runs a loop: poll a future, if it’s Pending, park it until its waker fires, then poll again. This is fundamentally different from OS threads where the kernel handles scheduling.

stateDiagram-v2
    [*] --> Idle : Future created
    Idle --> Polling : executor calls poll()
    Polling --> Complete : Ready(value)
    Polling --> Waiting : Pending
    Waiting --> Polling : waker.wake() called
    Complete --> [*] : Value returned

Important: While in the Waiting state, the future must have registered its waker with whatever will signal readiness (an I/O source, a timer, a channel). No registration = hang forever.

A Minimal Executor

To demystify executors, let’s build the simplest possible one:

use std::future::Future;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
use std::pin::Pin;

/// The simplest possible executor: busy-loop poll until Ready
fn block_on<F: Future>(mut future: F) -> F::Output {
    // Pin the future on the stack
    // SAFETY: `future` is never moved after this point — we only
    // access it through the pinned reference until it completes.
    let mut future = unsafe { Pin::new_unchecked(&mut future) };

    // Create a no-op waker (just keeps polling — inefficient but simple)
    fn noop_raw_waker() -> RawWaker {
        fn no_op(_: *const ()) {}
        fn clone(_: *const ()) -> RawWaker { noop_raw_waker() }
        let vtable = &RawWakerVTable::new(clone, no_op, no_op, no_op);
        RawWaker::new(std::ptr::null(), vtable)
    }

    // SAFETY: noop_raw_waker() returns a valid RawWaker with a correct vtable.
    let waker = unsafe { Waker::from_raw(noop_raw_waker()) };
    let mut cx = Context::from_waker(&waker);

    // Busy-loop until the future completes
    loop {
        match future.as_mut().poll(&mut cx) {
            Poll::Ready(value) => return value,
            Poll::Pending => {
                // A real executor would park the thread here
                // and wait for waker.wake() — we just spin
                std::thread::yield_now();
            }
        }
    }
}

// Usage:
fn main() {
    let result = block_on(async {
        println!("Hello from our mini executor!");
        42
    });
    println!("Got: {result}");
}

Don’t use this in production! It busy-loops, wasting CPU. Real executors (tokio, smol) block in the operating system’s event API (epoll on Linux, kqueue on macOS/BSD, IOCP on Windows, or io_uring in runtimes that support it) until I/O is ready. But this shows the core idea: an executor is just a loop that calls poll().
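
A small step toward a real executor is to sleep instead of spin. The sketch below, which follows the pattern shown in the `std::task::Wake` documentation, parks the thread while the future is Pending and lets the waker unpark it. The `Sleep` future and `sleep` function are toy helpers invented for this example, and the future is heap-pinned with Box::pin purely to avoid unsafe code.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};
use std::time::Duration;

/// Waker that unparks the thread blocked in `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Polls `future` to completion, sleeping between polls instead of spinning.
fn block_on<F: Future>(future: F) -> F::Output {
    let mut future = Box::pin(future); // heap-pin to avoid unsafe
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match future.as_mut().poll(&mut cx) {
            Poll::Ready(value) => return value,
            // park() can return spuriously; that only costs one extra poll.
            Poll::Pending => thread::park(),
        }
    }
}

/// Toy timer: a background thread sets a flag, then wakes the task.
struct Sleep {
    done: Arc<AtomicBool>,
    delay: Option<Duration>, // Some until the timer thread has been started
}

fn sleep(delay: Duration) -> Sleep {
    Sleep { done: Arc::new(AtomicBool::new(false)), delay: Some(delay) }
}

impl Future for Sleep {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.done.load(Ordering::SeqCst) {
            return Poll::Ready(());
        }
        if let Some(delay) = self.delay.take() {
            let done = Arc::clone(&self.done);
            // Simplification: only the waker from the first poll is kept.
            let waker = cx.waker().clone();
            thread::spawn(move || {
                thread::sleep(delay);
                done.store(true, Ordering::SeqCst);
                waker.wake();
            });
        }
        Poll::Pending
    }
}

fn main() {
    let answer = block_on(async {
        sleep(Duration::from_millis(20)).await;
        42
    });
    println!("Got: {answer}");
}
```

Because an unpark that arrives before park is remembered, a wake-up that races with the end of poll() is not lost.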

Wake-Up Notifications

A real executor is event-driven. When all futures are Pending, the executor sleeps. The waker is an interrupt mechanism:

#![allow(unused)]
fn main() {
// Conceptual model of a real executor's main loop:
fn executor_loop(tasks: &mut TaskQueue) {
    loop {
        // 1. Poll all tasks that have been woken
        while let Some(task) = tasks.get_woken_task() {
            match task.poll() {
                Poll::Ready(result) => task.complete(result),
                Poll::Pending => { /* task stays in queue, waiting for wake */ }
            }
        }

        // 2. Sleep until something wakes us up (epoll_wait, kevent, etc.)
        //    This is where mio/polling does the heavy lifting
        tasks.wait_for_events(); // blocks until an I/O event or waker fires
    }
}
}
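
Step 1 of that loop can be made concrete with a run queue that wakers push tasks back onto. This is a deliberately reduced sketch, not how tokio is implemented: `Task`, `RunQueue`, `run_all`, and `YieldOnce` are names invented here, and step 2 (sleeping on I/O events) is omitted, so the loop simply stops once no task is queued.

```rust
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

type BoxFuture = Pin<Box<dyn Future<Output = ()> + Send>>;
type RunQueue = Arc<Mutex<VecDeque<Arc<Task>>>>;

struct Task {
    future: Mutex<Option<BoxFuture>>, // None once the task has completed
    queue: RunQueue,
}

impl Wake for Task {
    // Waking a task means putting it back on the run queue.
    fn wake(self: Arc<Self>) {
        let queue = Arc::clone(&self.queue);
        queue.lock().unwrap().push_back(self);
    }
}

/// Polls woken tasks in FIFO order until the queue is empty.
fn run_all(futures: Vec<BoxFuture>) {
    let queue: RunQueue = Arc::new(Mutex::new(VecDeque::new()));
    for future in futures {
        let task = Arc::new(Task {
            future: Mutex::new(Some(future)),
            queue: Arc::clone(&queue),
        });
        queue.lock().unwrap().push_back(task);
    }
    loop {
        // Separate statement so the queue lock is released before polling.
        let next = queue.lock().unwrap().pop_front();
        let task = match next {
            Some(task) => task,
            None => break,
        };
        let waker = Waker::from(Arc::clone(&task));
        let mut cx = Context::from_waker(&waker);
        let mut slot = task.future.lock().unwrap();
        if let Some(mut future) = slot.take() {
            if future.as_mut().poll(&mut cx).is_pending() {
                *slot = Some(future); // keep it for the next wake-up
            }
        }
    }
}

/// Returns Pending once (after waking itself), then Ready.
struct YieldOnce(bool);

impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            return Poll::Ready(());
        }
        self.0 = true;
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

/// Runs two tasks and returns the order in which their steps executed.
fn interleaving_demo() -> Vec<&'static str> {
    let log = Arc::new(Mutex::new(Vec::new()));
    let make = |first: &'static str, second: &'static str| -> BoxFuture {
        let log = Arc::clone(&log);
        Box::pin(async move {
            log.lock().unwrap().push(first);
            YieldOnce(false).await;
            log.lock().unwrap().push(second);
        })
    };
    run_all(vec![make("a1", "a2"), make("b1", "b2")]);
    let order = log.lock().unwrap().clone();
    order
}

fn main() {
    println!("{:?}", interleaving_demo());
}
```

Each task runs until its first Pending, so the two tasks interleave at their await points rather than running back to back.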

Spurious Wakes

A future may be polled even when its I/O isn’t ready. This is called a spurious wake. Futures must handle this correctly:

#![allow(unused)]
fn main() {
impl Future for MyFuture {
    type Output = Data;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Data> {
        // ✅ CORRECT: Always re-check the actual condition
        if let Some(data) = self.try_read_data() {
            Poll::Ready(data)
        } else {
            // Re-register the waker (it might have changed!)
            self.register_waker(cx.waker());
            Poll::Pending
        }

        // ❌ WRONG: Assuming poll means data is ready
        // let data = self.read_data(); // might block or panic
        // Poll::Ready(data)
    }
}
}

Rules for implementing poll():

  1. Never block — return Pending immediately if not ready
  2. Always re-register the waker — it may have changed between polls
  3. Handle spurious wakes — check the actual condition, don’t assume readiness
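  4. Don’t poll after Ready: behavior is unspecified (may panic, return Pending, or repeat Ready). If you need safe post-completion polling, wrap the future with .fuse() from the futures crate, which keeps returning Pending after completion; its FusedFuture trait exposes is_terminated() so callers can check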
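
The rules above can be exercised with a small hand-written future. This is a sketch under assumptions: `Shared`, `ValueFuture`, `complete`, and `spurious_poll_demo` are illustrative names, and a mutex-protected slot stands in for a real I/O source.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

#[derive(Default)]
struct Shared {
    value: Option<u32>,
    waker: Option<Waker>,
}

struct ValueFuture {
    shared: Arc<Mutex<Shared>>,
}

impl Future for ValueFuture {
    type Output = u32;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u32> {
        let mut shared = self.shared.lock().unwrap();
        // Rule 3: re-check the real condition; being polled proves nothing.
        if let Some(value) = shared.value.take() {
            return Poll::Ready(value);
        }
        // Rule 2: re-register, replacing the stored waker only if the new
        // one would wake a different task.
        let stale = shared
            .waker
            .as_ref()
            .map_or(true, |stored| !stored.will_wake(cx.waker()));
        if stale {
            shared.waker = Some(cx.waker().clone());
        }
        // Rule 1: return immediately instead of blocking.
        Poll::Pending
    }
}

/// Producer side: store the value, then wake whoever is waiting.
fn complete(shared: &Arc<Mutex<Shared>>, value: u32) {
    let waker = {
        let mut guard = shared.lock().unwrap();
        guard.value = Some(value);
        guard.waker.take()
    };
    if let Some(waker) = waker {
        waker.wake(); // called after the lock is released
    }
}

struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls twice with nothing ready (the second poll is spurious),
/// then once more after the value arrives.
fn spurious_poll_demo() -> (bool, bool, Poll<u32>) {
    let shared = Arc::new(Mutex::new(Shared::default()));
    let mut future = ValueFuture { shared: Arc::clone(&shared) };
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);

    let first_pending = Pin::new(&mut future).poll(&mut cx).is_pending();
    let spurious_pending = Pin::new(&mut future).poll(&mut cx).is_pending();
    complete(&shared, 7);
    let last = Pin::new(&mut future).poll(&mut cx);
    (first_pending, spurious_pending, last)
}

fn main() {
    println!("{:?}", spurious_poll_demo());
}
```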

🏋️ Exercise: Implement a CountdownFuture

Challenge: Implement a CountdownFuture that counts down from N to 0, printing the current count as a side-effect each time it’s polled. When it reaches 0, it completes with Ready("Liftoff!"). (Note: a Future produces only one final value — the printing is a side-effect, not a yielded value. For multiple async values, see Stream in Ch. 11.)

Hint: This doesn’t need a real I/O source — it can wake itself immediately with cx.waker().wake_by_ref() after each decrement.

🔑 Solution
#![allow(unused)]
fn main() {
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

struct CountdownFuture {
    count: u32,
}

impl CountdownFuture {
    fn new(start: u32) -> Self {
        CountdownFuture { count: start }
    }
}

impl Future for CountdownFuture {
    type Output = &'static str;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.count == 0 {
            Poll::Ready("Liftoff!")
        } else {
            println!("{}...", self.count);
            self.count -= 1;
            // Wake immediately — we're always ready to make progress
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

// Usage with our mini executor or tokio:
// let msg = block_on(CountdownFuture::new(5));
// prints: 5... 4... 3... 2... 1...
// msg == "Liftoff!"
}

Key takeaway: Even though this future is always ready to progress, it returns Pending to yield control between steps. It calls wake_by_ref() immediately so the executor re-polls it right away. This is the basis of cooperative multitasking — each future voluntarily yields.

Handy Utilities: poll_fn and yield_now

Two utilities from the standard library and tokio that avoid writing full Future impls:

use std::future::poll_fn;
use std::task::Poll;

// poll_fn: create a one-off future from a closure
async fn answer() -> i32 {
    poll_fn(|_cx| {
        // A real closure would check a resource and register the waker
        // from the context before returning Pending
        Poll::Ready(42)
    })
    .await
}

// Real-world use: bridge a poll-based API into async
// (MySource, Data, and poll_read are placeholders)
async fn read_when_ready(source: &MySource) -> Data {
    poll_fn(|cx| source.poll_read(cx)).await
}

// yield_now: voluntarily yield control to the executor
// Useful in CPU-heavy async loops to avoid starving other tasks
// (Item and process are placeholders)
async fn cpu_heavy_work(items: &[Item]) {
    for (i, item) in items.iter().enumerate() {
        process(item); // CPU work

        // After every 100th item, yield to let other tasks run
        if (i + 1) % 100 == 0 {
            tokio::task::yield_now().await;
        }
    }
}

When to use yield_now(): If your async function does CPU work in a loop without any .await points, it monopolizes the executor thread. Insert yield_now().await periodically to enable cooperative multitasking.
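
To see what a yield costs and buys, here is a standard-library-only sketch. The `yield_now` below is a hand-rolled stand-in built on `poll_fn`, not tokio's implementation, and `run_counting_polls` and `sum_with_yields` are hypothetical helpers. Each yield shows up as one extra poll, which is exactly one opportunity for the executor to run something else.

```rust
use std::future::{poll_fn, Future};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hand-rolled stand-in for a runtime's yield_now(): Pending once, then Ready.
async fn yield_now() {
    let mut yielded = false;
    poll_fn(|cx| {
        if yielded {
            Poll::Ready(())
        } else {
            yielded = true;
            cx.waker().wake_by_ref(); // ask to be polled again right away
            Poll::Pending
        }
    })
    .await
}

/// CPU-style loop that yields after every second item.
async fn sum_with_yields(items: &[u32]) -> u32 {
    let mut total = 0;
    for (i, item) in items.iter().enumerate() {
        total += item;
        if (i + 1) % 2 == 0 {
            yield_now().await;
        }
    }
    total
}

struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Spin-polls a future to completion and returns (output, number of polls).
fn run_counting_polls<F: Future>(future: F) -> (F::Output, usize) {
    let mut future = Box::pin(future);
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut polls = 0;
    loop {
        polls += 1;
        if let Poll::Ready(value) = future.as_mut().poll(&mut cx) {
            return (value, polls);
        }
    }
}

fn main() {
    let (total, polls) = run_counting_polls(sum_with_yields(&[1, 2, 3, 4, 5]));
    println!("total = {total}, polls = {polls}");
}
```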

Key Takeaways — How Poll Works

  • An executor repeatedly calls poll() on futures that have been woken
  • Futures must handle spurious wakes — always re-check the actual condition
  • poll_fn() lets you create ad-hoc futures from closures
  • yield_now() is a cooperative scheduling escape hatch for CPU-heavy async code

See also: Ch 2 — The Future Trait for the trait definition, Ch 5 — The State Machine Reveal for what the compiler generates


4. Pin and Unpin 🔴

What you’ll learn:

  • Why self-referential structs break when moved in memory
  • What Pin<P> guarantees and how it prevents moves
  • The three practical pinning patterns: Box::pin(), tokio::pin!(), Pin::new()
  • When Unpin gives you an escape hatch

Why Pin Exists

This is the most confusing concept in async Rust. Let’s build the intuition step by step.

The Problem: Self-Referential Structs

When the compiler transforms an async fn into a state machine, that state machine may contain references to its own fields. This creates a self-referential struct — and moving it in memory would invalidate those internal references.

#![allow(unused)]
fn main() {
// What the compiler generates (simplified) for:
// async fn example() {
//     let data = vec![1, 2, 3];
//     let reference = &data;       // Points to data above
//     use_ref(reference).await;
// }

// Becomes something like:
enum ExampleStateMachine {
    State0 {
        data: Vec<i32>,
        // reference: &Vec<i32>,  // PROBLEM: points to `data` above
        //                        // If this struct moves, the pointer is dangling!
    },
    State1 {
        data: Vec<i32>,
        reference: *const Vec<i32>, // Internal pointer to data field
    },
    Complete,
}
}
graph LR
    subgraph "Before Move (Valid)"
        A["data: [1,2,3]<br/>at addr 0x1000"]
        B["reference: 0x1000<br/>(points to data)"]
        B -->|"valid"| A
    end

    subgraph "After Move (INVALID)"
        C["data: [1,2,3]<br/>at addr 0x2000"]
        D["reference: 0x1000<br/>(still points to OLD location!)"]
        D -->|"dangling!"| E["💥 0x1000<br/>(freed/garbage)"]
    end

    style E fill:#ffcdd2,color:#000
    style D fill:#ffcdd2,color:#000
    style B fill:#c8e6c9,color:#000

Self-Referential Structs

This isn’t an academic concern. Every async fn that holds a reference across an .await point creates a self-referential state machine:

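async fn problematic() {
    let data = String::from("hello");
    let slice = &data[..]; // slice borrows data

    some_io().await; // <-- .await point: state machine stores both data AND slice

    println!("{slice}"); // uses the reference after await
}
// The generated state machine has `data: String` and `slice: &str`,
// and the compiler only knows that slice borrows the `data` field.
// (A String keeps its bytes on the heap, so this particular pointer would
// survive a move; with an inline buffer such as [u8; 16] it would point
// into the state machine itself.) Because the compiler cannot tell the
// cases apart, any borrow held across an .await makes the future !Unpin.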
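
The underlying hazard is easy to observe in safe code: moving a value changes the address of its fields, so any pointer to a field taken before the move no longer refers to the value. In this sketch, `Holder` and both functions are illustrative names; addresses are compared as integers and never dereferenced.

```rust
/// A struct with an inline buffer, like a state machine holding a local array.
struct Holder {
    data: [u8; 16],
}

/// Address of the inline buffer, as an integer (never dereferenced).
fn address_of_data(holder: &Holder) -> usize {
    holder.data.as_ptr() as usize
}

/// Returns (address before the move, address after the move).
fn move_changes_address() -> (usize, usize) {
    let on_stack = Holder { data: [7; 16] };
    let before = address_of_data(&on_stack);

    // Moving into a Box copies the bytes into a heap allocation.
    let on_heap = Box::new(on_stack);
    let after = address_of_data(&on_heap);

    (before, after)
}

fn main() {
    let (before, after) = move_changes_address();
    println!("before move: {before:#x}, after move: {after:#x}");
    // A raw pointer saved at `before` would now point at stale stack memory.
}
```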

Pin in Practice

Pin<P> wraps a pointer and, when the target is !Unpin, withholds every safe way to move the value behind it:

use std::pin::Pin;

let mut data = String::from("hello");

// Wrap a mutable reference in Pin. Pin::new is safe because String: Unpin.
let pinned: Pin<&mut String> = Pin::new(&mut data);

// Shared access still works:
println!("{}", pinned.as_ref().get_ref()); // "hello"

// Getting &mut String back out (which would allow mem::swap) requires Unpin:
let mutable: &mut String = Pin::into_inner(pinned);
mutable.push_str(", world");
// String is Unpin, so this compiles: pinning a String is effectively a no-op.
// For self-referential state machines (which are !Unpin), into_inner and
// get_mut are unavailable, so safe code cannot move the value.

In real code, you mostly encounter Pin in three places:

// 1. poll() signature: all futures are polled through Pin
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;

// 2. Box::pin(): heap-allocate and pin a future
let future: Pin<Box<dyn Future<Output = i32>>> = Box::pin(async { 42 });

// 3. tokio::pin!(): pin a future on the stack
tokio::pin!(my_future);
// Now my_future: Pin<&mut impl Future>
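
The same three situations can be tried without tokio. The sketch below uses the standard library's `std::pin::pin!` macro (available since Rust 1.68) in place of `tokio::pin!`; `NoopWake`, `poll_pinned`, and `pinning_patterns` are names invented for this example.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls an already-pinned future once.
fn poll_pinned<F: Future + ?Sized>(future: Pin<&mut F>) -> Poll<F::Output> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    future.poll(&mut cx)
}

fn pinning_patterns() -> [Poll<i32>; 3] {
    // Heap pinning: works for any future, including !Unpin async blocks.
    let mut boxed: Pin<Box<dyn Future<Output = i32>>> = Box::pin(async { 1 });
    let a = poll_pinned(boxed.as_mut());

    // Stack pinning with the standard library's pin! macro.
    let stack = pin!(async { 2 });
    // Moving the Pin<&mut _> handle is fine; the future itself stays put.
    let handle = stack;
    let b = poll_pinned(handle);

    // Pin::new: only for Unpin types, such as std::future::Ready.
    let mut ready = std::future::ready(3);
    let c = poll_pinned(Pin::new(&mut ready));

    [a, b, c]
}

fn main() {
    println!("{:?}", pinning_patterns());
}
```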

The Unpin Escape Hatch

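Most types in Rust are Unpin: they contain no self-references, so pinning them is a no-op. Unpin is an auto trait, so a type is !Unpin only if it contains something !Unpin. In practice that means compiler-generated state machines (from async fn and async blocks), types that embed them without boxing, and types that opt out with PhantomPinned.

// These are all Unpin; pinning them does nothing special:
// i32, String, Vec<T>, HashMap<K,V>, Box<T>, &T, &mut T

// These are !Unpin and MUST be pinned before polling:
// The state machines generated by `async fn` and `async {}`

// Practical implication:
// A hand-written Future whose fields are all Unpin is Unpin automatically;
// no impl is needed. A manual impl is only required when a field may be
// !Unpin (for example a generic F) and your code never relies on that field
// staying pinned (MySimpleFuture is a placeholder name):
impl<F> Unpin for MySimpleFuture<F> {} // "I'm safe to move, trust me"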
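
Whether a type is Unpin can be checked at compile time with a one-line helper. In this sketch, `assert_unpin`, `Counter`, and `Immovable` are illustrative names; the calls compile only because each argument type is Unpin.

```rust
use std::future::Future;
use std::marker::PhantomPinned;
use std::pin::Pin;
use std::task::{Context, Poll};

/// Compiles only when T: Unpin, so every call is a compile-time check.
fn assert_unpin<T: Unpin>(_value: &T) {}

/// Hand-written future with only Unpin fields: Unpin comes for free.
struct Counter {
    remaining: u32,
}

impl Future for Counter {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.remaining == 0 {
            return Poll::Ready(());
        }
        // Mutating through Pin<&mut Self> is allowed because Counter: Unpin.
        self.remaining -= 1;
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

/// Opting out: a PhantomPinned field makes the whole type !Unpin.
struct Immovable {
    _pin: PhantomPinned,
}

fn main() {
    assert_unpin(&Counter { remaining: 3 });
    assert_unpin(&String::from("hello"));

    // A pinned box is itself Unpin even when its contents are not.
    assert_unpin(&Box::pin(async {}));
    assert_unpin(&Box::pin(Immovable { _pin: PhantomPinned }));

    // These would fail to compile if uncommented:
    // assert_unpin(&async {});
    // assert_unpin(&Immovable { _pin: PhantomPinned });
    println!("all Unpin checks compiled");
}
```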

Quick Reference

| What | When | How |
|------|------|-----|
| Pin a future on the heap | Storing in a collection, returning from function | Box::pin(future) |
| Pin a future on the stack | Local use in select! or manual polling | tokio::pin!(future), std::pin::pin!(future) (Rust 1.68+), or pin_mut! from pin-utils |
| Pin in function signature | Accepting pinned futures | future: Pin<&mut F> |
| Require Unpin | When you need to move a future after it has been pinned or polled | F: Future + Unpin |

🏋️ Exercise: Pin and Move

Challenge: Which of these code snippets compile? For each one that doesn’t, explain why and fix it.

// Assume each snippet is the body of its own async fn.

// Snippet A
let fut = async { 42 };
let pinned = Box::pin(fut);
let moved = pinned; // Move the Box
let result = moved.await;

// Snippet B
let fut = async { 42 };
tokio::pin!(fut);
let moved = fut; // Move the pinned future
let result = moved.await;

// Snippet C
use std::pin::Pin;
let mut fut = async { 42 };
let pinned = Pin::new(&mut fut);
🔑 Solution

Snippet A: ✅ Compiles. Box::pin() puts the future on the heap. Moving the Box moves the pointer, not the future itself. The future stays pinned in its heap location.

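Snippet B: ✅ Compiles, which surprises many readers. tokio::pin! pins the future to the stack and rebinds fut as Pin<&mut ...>. The line let moved = fut moves only that Pin<&mut ...> handle; the future itself stays at its stack address. Because Pin<&mut F> implements Future whenever F does, moved.await works. What you cannot do is move the underlying future out of its stack frame: returning it from the function or storing it in a collection requires Box::pin(). Awaiting in place is the more common style:

let fut = async { 42 };
tokio::pin!(fut);
let result = fut.await; // Use the pinned handle directly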

Snippet C: ❌ Does not compile. Pin::new() requires T: Unpin. Async blocks generate !Unpin types. Fix: Use Box::pin(), a stack-pinning macro such as std::pin::pin!() or tokio::pin!(), or unsafe Pin::new_unchecked():

let fut = async { 42 };
let pinned = Box::pin(fut); // Heap-pin: works with !Unpin

Key takeaway: Box::pin() is the safe, easy way to pin !Unpin futures. tokio::pin!() pins on the stack: the Pin<&mut ...> handle can be passed around, but the future itself cannot leave its stack frame. Pin::new() only works with Unpin types.

Key Takeaways — Pin and Unpin

  • Pin<P> is a wrapper that prevents the pointee from being moved — essential for self-referential state machines
  • Box::pin() is the safe, easy default for pinning futures on the heap
  • tokio::pin!() pins on the stack — cheaper but the future can’t be moved afterward
  • Unpin is an auto-trait opt-out: types that implement Unpin can be moved even when pinned (most types are Unpin; async blocks are not)

See also: Ch 2 — The Future Trait for Pin<&mut Self> in poll, Ch 5 — The State Machine Reveal for why async state machines are self-referential


5. The State Machine Reveal 🟢

What you’ll learn:

  • How the compiler transforms async fn into an enum state machine
  • Side-by-side comparison: source code vs generated states
  • Why large stack allocations in async fn blow up future sizes
  • The drop optimization: values drop as soon as they’re no longer needed

What the Compiler Actually Generates

When you write async fn, the compiler transforms your sequential-looking code into an enum-based state machine. Understanding this transformation is the key to understanding async Rust’s performance characteristics and many of its quirks.

Side-by-Side: async fn vs State Machine

// What you write:
async fn fetch_two_pages() -> String {
    let page1 = http_get("https://example.com/a").await;
    let page2 = http_get("https://example.com/b").await;
    format!("{page1}\n{page2}")
}

The compiler generates something conceptually like this:

#![allow(unused)]
fn main() {
enum FetchTwoPagesStateMachine {
    // State 0: About to call http_get for page1
    Start,

    // State 1: Waiting for page1, holding the future
    WaitingPage1 {
        fut1: HttpGetFuture,
    },

    // State 2: Got page1, waiting for page2
    WaitingPage2 {
        page1: String,
        fut2: HttpGetFuture,
    },

    // Terminal state
    Complete,
}

impl Future for FetchTwoPagesStateMachine {
    type Output = String;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<String> {
        loop {
            match self.as_mut().get_mut() {
                Self::Start => {
                    let fut1 = http_get("https://example.com/a");
                    *self.as_mut().get_mut() = Self::WaitingPage1 { fut1 };
                }
                Self::WaitingPage1 { fut1 } => {
                    let page1 = match Pin::new(fut1).poll(cx) {
                        Poll::Ready(v) => v,
                        Poll::Pending => return Poll::Pending,
                    };
                    let fut2 = http_get("https://example.com/b");
                    *self.as_mut().get_mut() = Self::WaitingPage2 { page1, fut2 };
                }
                Self::WaitingPage2 { page1, fut2 } => {
                    let page2 = match Pin::new(fut2).poll(cx) {
                        Poll::Ready(v) => v,
                        Poll::Pending => return Poll::Pending,
                    };
                    let result = format!("{page1}\n{page2}");
                    *self.as_mut().get_mut() = Self::Complete;
                    return Poll::Ready(result);
                }
                Self::Complete => panic!("polled after completion"),
            }
        }
    }
}
}

Note: This desugaring is conceptual. The real compiler output uses unsafe pin projections — the get_mut() calls shown here require Unpin, but async state machines are !Unpin. The goal is to illustrate state transitions, not produce compilable code.
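
For comparison, here is a hand-written variant that does compile. It is a sketch under simplifying assumptions, not the compiler's actual output: the hypothetical `fake_get` stands in for `http_get` and returns `std::future::Ready`, which is `Unpin`, so `get_mut()` is legal. The `run` helper is also an assumption: a busy-polling stand-in for an executor that only suits futures that never really wait.

```rust
use std::future::{ready, Future, Ready};
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical stand-in for http_get: resolves immediately and is Unpin.
fn fake_get(url: &str) -> Ready<String> {
    ready(format!("<body of {url}>"))
}

pub enum FetchTwoPages {
    Start,
    WaitingPage1 { fut1: Ready<String> },
    WaitingPage2 { page1: String, fut2: Ready<String> },
    Complete,
}

impl Future for FetchTwoPages {
    type Output = String;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<String> {
        let this = self.get_mut(); // legal only because every field is Unpin
        loop {
            // Take the current state out, leaving Complete as a placeholder.
            match std::mem::replace(this, FetchTwoPages::Complete) {
                FetchTwoPages::Start => {
                    let fut1 = fake_get("https://example.com/a");
                    *this = FetchTwoPages::WaitingPage1 { fut1 };
                }
                FetchTwoPages::WaitingPage1 { mut fut1 } => {
                    let polled = Pin::new(&mut fut1).poll(cx);
                    match polled {
                        Poll::Ready(page1) => {
                            // fut1 is dropped at the end of this arm.
                            let fut2 = fake_get("https://example.com/b");
                            *this = FetchTwoPages::WaitingPage2 { page1, fut2 };
                        }
                        Poll::Pending => {
                            *this = FetchTwoPages::WaitingPage1 { fut1 }; // restore
                            return Poll::Pending;
                        }
                    }
                }
                FetchTwoPages::WaitingPage2 { page1, mut fut2 } => {
                    let polled = Pin::new(&mut fut2).poll(cx);
                    match polled {
                        Poll::Ready(page2) => return Poll::Ready(format!("{page1}\n{page2}")),
                        Poll::Pending => {
                            *this = FetchTwoPages::WaitingPage2 { page1, fut2 }; // restore
                            return Poll::Pending;
                        }
                    }
                }
                FetchTwoPages::Complete => panic!("polled after completion"),
            }
        }
    }
}

// Do-nothing waker: enough for futures that are always ready.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Busy-polls a future until it completes (demo helper, not a real executor).
pub fn run<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}
```

Calling `run(FetchTwoPages::Start)` drives the machine through all four states. Moving the sub-futures in and out with `std::mem::replace` is only sound here because they are `Unpin`; real async state machines cannot do this, which is why the compiler relies on pin projection instead.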

stateDiagram-v2
    [*] --> Start
    Start --> WaitingPage1: Create http_get future #1
    WaitingPage1 --> WaitingPage1: poll() → Pending
    WaitingPage1 --> WaitingPage2: poll() → Ready(page1)
    WaitingPage2 --> WaitingPage2: poll() → Pending
    WaitingPage2 --> Complete: poll() → Ready(page2)
    Complete --> [*]: Return format!("{page1}\\n{page2}")

State contents:

  • WaitingPage1 — stores fut1: HttpGetFuture (page2 not yet allocated)
  • WaitingPage2 — stores page1: String, fut2: HttpGetFuture (fut1 has been dropped)

Why This Matters for Performance

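Zero-cost: The state machine is a plain enum value stored inline wherever you put it: on the stack, inside a parent future, or inside a task. Creating and awaiting futures needs no per-future heap allocation, no garbage collector, and no boxing, unless you explicitly use Box::pin(). (Spawning a task onto a runtime typically allocates once for the whole task.)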

Size: The enum’s size is the maximum of all its variants. Each .await point creates a new variant. This means:

#![allow(unused)]
fn main() {
async fn small() {
    let a: u8 = 0;
    yield_now().await;
    let b: u8 = 0;
    yield_now().await;
}
// Size ≈ max(size_of(u8), size_of(u8)) + discriminant + future sizes
//      ≈ small!

async fn big() {
    let buf: [u8; 1_000_000] = [0; 1_000_000]; // 1MB on the stack!
    some_io().await;
    process(&buf);
}
// Size ≈ 1MB + inner future sizes
// ⚠️ Don't stack-allocate huge buffers in async functions!
// Use Vec<u8> or Box<[u8]> instead.
}

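Drop optimization: When a state machine transitions, it drops values that are no longer needed. In the example above, fut1 is a temporary, so it is dropped when we transition from WaitingPage1 to WaitingPage2; the compiler inserts the drop automatically. Named locals keep their usual scope rules: a variable that is still in scope at an .await stays inside the state machine until its scope ends, unless you end it early with a block or an explicit drop().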
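
The scope rules behind these drops can be observed directly. The sketch below logs when values are destroyed relative to two `.await` points. `Noisy`, `step`, and `run` are hypothetical helpers introduced only for this illustration; `run` is a busy-polling stand-in for an executor.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

type Log = Arc<Mutex<Vec<String>>>;

// Records its own destruction so the drop order becomes observable.
struct Noisy {
    name: &'static str,
    log: Log,
}

impl Drop for Noisy {
    fn drop(&mut self) {
        self.log.lock().unwrap().push(format!("drop {}", self.name));
    }
}

// Hypothetical stand-in for an I/O call: it only logs a label.
async fn step(log: &Log, label: &str) {
    log.lock().unwrap().push(label.to_string());
}

async fn demo(log: Log) {
    let a = Noisy { name: "a", log: log.clone() };
    {
        let _b = Noisy { name: "b", log: log.clone() };
    } // _b leaves scope here, before the first .await
    step(&log, "await 1").await;
    drop(a); // explicit drop: `a` does not survive into later states
    let _c = Noisy { name: "c", log: log.clone() };
    step(&log, "await 2").await;
    // _c is never used again, yet it lives until the end of the body
}

struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Busy-polls a future until it completes (demo helper, not a real executor).
fn run<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

pub fn drop_order() -> Vec<String> {
    let log: Log = Arc::new(Mutex::new(Vec::new()));
    run(demo(log.clone()));
    let entries = log.lock().unwrap().clone();
    entries
}
```

The log produced by `drop_order()` reads: drop b, await 1, drop a, await 2, drop c. In practice this means a lock guard or large buffer should be scoped or dropped before an `.await` if it is not needed afterwards.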

Practical rule: Large stack allocations in async fn blow up the future’s size. If you see stack overflows in async code, check for large arrays or deeply nested futures. Use Box::pin() to heap-allocate sub-futures if needed.
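
To see the size effect concretely, `std::mem::size_of_val` can measure a future without running it. This is a minimal sketch: `some_io` is a hypothetical placeholder for any operation that suspends, and the 64 KiB buffer is an arbitrary size chosen for the demo.

```rust
use std::mem::size_of_val;

// Hypothetical stand-in for any operation that suspends the task.
async fn some_io() {}

async fn big_inline() -> u8 {
    let buf = [0u8; 64 * 1024]; // 64 KiB stored inside the future
    some_io().await;
    buf[0] // used after the .await, so it must be kept across it
}

async fn big_on_heap() -> u8 {
    let buf = vec![0u8; 64 * 1024]; // only pointer, length, capacity are stored
    some_io().await;
    buf[0]
}

/// Returns the sizes of (inline future, heap-buffer future, boxed future).
pub fn future_sizes() -> (usize, usize, usize) {
    let inline = big_inline();
    let heap = big_on_heap();
    let boxed = Box::pin(big_inline()); // the state machine now lives on the heap
    (size_of_val(&inline), size_of_val(&heap), size_of_val(&boxed))
}
```

The first number is at least 64 KiB, the second is a few dozen bytes, and the third is the size of one pointer. Exact values depend on the compiler version, so treat them as orders of magnitude rather than guarantees.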

Exercise: Predict the State Machine

🏋️ Exercise

Challenge: Given this async function, sketch the state machine the compiler generates. How many states (enum variants) does it have? What values are stored in each?

#![allow(unused)]
fn main() {
async fn pipeline(url: &str) -> Result<usize, Error> {
    let response = fetch(url).await?;
    let body = response.text().await?;
    let parsed = parse(body).await?;
    Ok(parsed.len())
}
}
🔑 Solution

Five states (a start state, one per .await, and a terminal state):

  1. Start — stores url
  2. WaitingFetch — stores url, fetch future
  3. WaitingText — stores response, text() future
  4. WaitingParse — stores body, parse future
  5. Done — returned Ok(parsed.len())

Each .await creates a yield point = a new enum variant. The ? adds early-exit paths but doesn’t add extra states — it’s just a match on the Poll::Ready value.

Key Takeaways — The State Machine Reveal

  • async fn compiles to an enum with one variant per .await point, plus start and terminal states
  • The future’s size = max of all variant sizes — large stack values blow it up
  • The compiler inserts drops at state transitions automatically
  • Use Box::pin() or heap allocation when future size becomes a problem

See also: Ch 4 — Pin and Unpin for why the generated enum needs pinning, Ch 6 — Building Futures by Hand to build these state machines yourself


6. Building Futures by Hand 🟡

What you’ll learn:

  • Implementing a TimerFuture with thread-based waking
  • Building a Join combinator: run two futures concurrently
  • Building a Select combinator: race two futures
  • How combinators compose — futures all the way down

A Simple Timer Future

Now let’s build real, useful futures from scratch. This cements the theory from chapters 2-5.

TimerFuture: A Complete Example

#![allow(unused)]
fn main() {
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};
use std::thread;
use std::time::{Duration, Instant};

pub struct TimerFuture {
    shared_state: Arc<Mutex<SharedState>>,
}

struct SharedState {
    completed: bool,
    waker: Option<Waker>,
}

impl TimerFuture {
    pub fn new(duration: Duration) -> Self {
        let shared_state = Arc::new(Mutex::new(SharedState {
            completed: false,
            waker: None,
        }));

        // Spawn a thread that sets completed=true after the duration
        let thread_shared_state = Arc::clone(&shared_state);
        thread::spawn(move || {
            thread::sleep(duration);
            let mut state = thread_shared_state.lock().unwrap();
            state.completed = true;
            if let Some(waker) = state.waker.take() {
                waker.wake(); // Notify the executor
            }
        });

        TimerFuture { shared_state }
    }
}

impl Future for TimerFuture {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let mut state = self.shared_state.lock().unwrap();
        if state.completed {
            Poll::Ready(())
        } else {
            // Store the waker so the timer thread can wake us
            // IMPORTANT: Always update the waker — the executor may
            // have changed it between polls
            state.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}

// Usage:
// async fn example() {
//     println!("Starting timer...");
//     TimerFuture::new(Duration::from_secs(2)).await;
//     println!("Timer done!");
// }
//
// ⚠️ This spawns an OS thread per timer — fine for learning, but in
// production use `tokio::time::sleep` which is backed by a shared
// timer wheel and requires zero extra threads.
}
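
The "always update the waker" rule in `poll` can be combined with a small optimization: skip the clone when the stored waker already targets the same task. The helper below is a sketch of that idea using `Waker::will_wake`, which is best-effort and may report `false` for equivalent wakers. `store_waker` and `NoopWake` are hypothetical names introduced for this illustration.

```rust
use std::sync::Arc;
use std::task::{Context, Wake, Waker};

/// Stores the current task's waker in `slot`, cloning only when the stored
/// waker would not wake the same task. Returns true if it cloned.
pub fn store_waker(slot: &mut Option<Waker>, cx: &Context<'_>) -> bool {
    if let Some(existing) = slot.as_ref() {
        if existing.will_wake(cx.waker()) {
            return false; // same task: keep the waker we already have
        }
    }
    *slot = Some(cx.waker().clone());
    true
}

// Hypothetical do-nothing waker, used only to exercise the helper.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

pub fn demo() -> (bool, bool) {
    let waker = Waker::from(Arc::new(NoopWake));
    let cx = Context::from_waker(&waker);
    let mut slot = None;
    let cloned_first_time = store_waker(&mut slot, &cx);
    (cloned_first_time, slot.is_some())
}
```

Inside `TimerFuture::poll`, the line that assigns `state.waker` could call such a helper instead. Because a false negative only costs one extra clone, the rule stays safe either way.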

Join: Running Two Futures Concurrently

Join polls two futures and completes when both finish. Conceptually, this is how tokio::join! works internally:

#![allow(unused)]
fn main() {
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

/// Polls two futures concurrently, returns both results as a tuple
pub struct Join<A, B>
where
    A: Future,
    B: Future,
{
    a: MaybeDone<A>,
    b: MaybeDone<B>,
}

enum MaybeDone<F: Future> {
    Pending(F),
    Done(F::Output),
    Taken, // Output has been taken
}

impl<A, B> Join<A, B>
where
    A: Future,
    B: Future,
{
    pub fn new(a: A, b: B) -> Self {
        Join {
            a: MaybeDone::Pending(a),
            b: MaybeDone::Pending(b),
        }
    }
}

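impl<A, B> Future for Join<A, B>
where
    A: Future + Unpin,
    B: Future + Unpin,
    // The outputs are stored inline in MaybeDone, so they must be Unpin too;
    // otherwise assigning to self.a through Pin<&mut Self> does not compile.
    A::Output: Unpin,
    B::Output: Unpin,
{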
    type Output = (A::Output, B::Output);

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Poll A if not done
        if let MaybeDone::Pending(ref mut fut) = self.a {
            if let Poll::Ready(val) = Pin::new(fut).poll(cx) {
                self.a = MaybeDone::Done(val);
            }
        }

        // Poll B if not done
        if let MaybeDone::Pending(ref mut fut) = self.b {
            if let Poll::Ready(val) = Pin::new(fut).poll(cx) {
                self.b = MaybeDone::Done(val);
            }
        }

        // Both done?
        match (&self.a, &self.b) {
            (MaybeDone::Done(_), MaybeDone::Done(_)) => {
                // Take both outputs
                let a_val = match std::mem::replace(&mut self.a, MaybeDone::Taken) {
                    MaybeDone::Done(v) => v,
                    _ => unreachable!(),
                };
                let b_val = match std::mem::replace(&mut self.b, MaybeDone::Taken) {
                    MaybeDone::Done(v) => v,
                    _ => unreachable!(),
                };
                Poll::Ready((a_val, b_val))
            }
            _ => Poll::Pending, // At least one is still pending
        }
    }
}

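// Usage (both futures must be Unpin; if http_get is an async fn, its
// future is not, so pin it on the heap first):
// let (page1, page2) = Join::new(
//     Box::pin(http_get("https://example.com/a")),
//     Box::pin(http_get("https://example.com/b")),
// ).await;
// Both requests run concurrently!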
}

Key insight: “Concurrent” here means interleaved on the same thread. Join doesn’t spawn threads — it polls both futures in the same poll() call. This is cooperative concurrency, not parallelism.

graph LR
    subgraph "Future Combinators"
        direction TB
        TIMER["TimerFuture<br/>Single future, wake after delay"]
        JOIN["Join&lt;A, B&gt;<br/>Wait for BOTH"]
        SELECT["Select&lt;A, B&gt;<br/>Wait for FIRST"]
        RETRY["RetryFuture<br/>Re-create on failure"]
    end

    TIMER --> JOIN
    TIMER --> SELECT
    SELECT --> RETRY

    style TIMER fill:#d4efdf,stroke:#27ae60,color:#000
    style JOIN fill:#e8f4f8,stroke:#2980b9,color:#000
    style SELECT fill:#fef9e7,stroke:#f39c12,color:#000
    style RETRY fill:#fadbd8,stroke:#e74c3c,color:#000

Select: Racing Two Futures

Select completes when either future finishes first (the other is dropped):

#![allow(unused)]
fn main() {
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

pub enum Either<A, B> {
    Left(A),
    Right(B),
}

/// Returns whichever future completes first; drops the other
pub struct Select<A, B> {
    a: A,
    b: B,
}

impl<A, B> Select<A, B>
where
    A: Future + Unpin,
    B: Future + Unpin,
{
    pub fn new(a: A, b: B) -> Self {
        Select { a, b }
    }
}

impl<A, B> Future for Select<A, B>
where
    A: Future + Unpin,
    B: Future + Unpin,
{
    type Output = Either<A::Output, B::Output>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Poll A first
        if let Poll::Ready(val) = Pin::new(&mut self.a).poll(cx) {
            return Poll::Ready(Either::Left(val));
        }

        // Then poll B
        if let Poll::Ready(val) = Pin::new(&mut self.b).poll(cx) {
            return Poll::Ready(Either::Right(val));
        }

        Poll::Pending
    }
}

// Usage with timeout (Select needs Unpin futures; if http_get is an
// async fn, wrap its future in Box::pin first):
// match Select::new(Box::pin(http_get(url)), TimerFuture::new(timeout)).await {
//     Either::Left(response) => println!("Got response: {}", response),
//     Either::Right(()) => println!("Request timed out!"),
// }
}

Fairness note: Our Select always polls A first — if both are ready, A always wins. Tokio’s select! macro randomizes the poll order for fairness.
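
One simple way to remove the bias is to alternate which side is polled first. The sketch below does that with a flag that flips on every poll. This is a swapped-in, simpler technique than the random start order described above, and `FairSelect`, `ReadyAfter`, and `run` are hypothetical names used only for the demonstration.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

pub enum Either<A, B> {
    Left(A),
    Right(B),
}

/// Like Select, but alternates which side gets polled first.
pub struct FairSelect<A, B> {
    a: A,
    b: B,
    b_first: bool,
}

impl<A, B> FairSelect<A, B> {
    pub fn new(a: A, b: B) -> Self {
        FairSelect { a, b, b_first: false }
    }
}

impl<A, B> Future for FairSelect<A, B>
where
    A: Future + Unpin,
    B: Future + Unpin,
{
    type Output = Either<A::Output, B::Output>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut(); // fine: A, B and bool are all Unpin
        let b_first = this.b_first;
        this.b_first = !b_first; // flip the priority for the next poll
        if b_first {
            if let Poll::Ready(v) = Pin::new(&mut this.b).poll(cx) {
                return Poll::Ready(Either::Right(v));
            }
            if let Poll::Ready(v) = Pin::new(&mut this.a).poll(cx) {
                return Poll::Ready(Either::Left(v));
            }
        } else {
            if let Poll::Ready(v) = Pin::new(&mut this.a).poll(cx) {
                return Poll::Ready(Either::Left(v));
            }
            if let Poll::Ready(v) = Pin::new(&mut this.b).poll(cx) {
                return Poll::Ready(Either::Right(v));
            }
        }
        Poll::Pending
    }
}

// Hypothetical test future: Pending for `polls_left` polls, then Ready.
pub struct ReadyAfter {
    pub polls_left: u32,
    pub value: &'static str,
}

impl Future for ReadyAfter {
    type Output = &'static str;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.polls_left == 0 {
            return Poll::Ready(self.value);
        }
        self.polls_left -= 1;
        cx.waker().wake_by_ref(); // ask to be polled again
        Poll::Pending
    }
}

struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Busy-polls a future until it completes (demo helper, not a real executor).
pub fn run<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

pub fn race() -> Either<&'static str, &'static str> {
    let a = ReadyAfter { polls_left: 1, value: "a" };
    let b = ReadyAfter { polls_left: 1, value: "b" };
    run(FairSelect::new(a, b))
}
```

Both test futures become ready on the second poll. The plain Select above would return Left, while the alternating version polls B first on that round and returns Right.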

🏋️ Exercise: Build a RetryFuture

Challenge: Build a RetryFuture<F, Fut> that takes a closure F: Fn() -> Fut and retries up to N times if the inner future returns Err. It should return the first Ok result or the last Err.

Hint: You’ll need states for “running attempt” and “all attempts exhausted.”

🔑 Solution
#![allow(unused)]
fn main() {
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

pub struct RetryFuture<F, Fut, T, E>
where
    F: Fn() -> Fut,
    Fut: Future<Output = Result<T, E>> + Unpin,
{
    factory: F,
    current: Option<Fut>,
    remaining: usize,
    last_error: Option<E>,
}

impl<F, Fut, T, E> RetryFuture<F, Fut, T, E>
where
    F: Fn() -> Fut,
    Fut: Future<Output = Result<T, E>> + Unpin,
{
    pub fn new(max_attempts: usize, factory: F) -> Self {
        let current = Some((factory)());
        RetryFuture {
            factory,
            current,
            remaining: max_attempts.saturating_sub(1),
            last_error: None,
        }
    }
}

impl<F, Fut, T, E> Future for RetryFuture<F, Fut, T, E>
where
    F: Fn() -> Fut + Unpin,
    Fut: Future<Output = Result<T, E>> + Unpin,
    T: Unpin,
    E: Unpin,
{
    type Output = Result<T, E>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        loop {
            if let Some(ref mut fut) = self.current {
                match Pin::new(fut).poll(cx) {
                    Poll::Ready(Ok(val)) => return Poll::Ready(Ok(val)),
                    Poll::Ready(Err(e)) => {
                        self.last_error = Some(e);
                        if self.remaining > 0 {
                            self.remaining -= 1;
                            self.current = Some((self.factory)());
                            // Loop to poll the new future immediately
                        } else {
                            return Poll::Ready(Err(self.last_error.take().unwrap()));
                        }
                    }
                    Poll::Pending => return Poll::Pending,
                }
            } else {
                return Poll::Ready(Err(self.last_error.take().unwrap()));
            }
        }
    }
}

// Usage (an async block is !Unpin, so box and pin each attempt):
// let result = RetryFuture::new(3, || Box::pin(async {
//     http_get("https://flaky-server.com/api").await
// })).await;
}

Key takeaway: The retry future is itself a state machine: it holds the current attempt and creates new inner futures on failure. This is how combinators compose — futures all the way down.

Key Takeaways — Building Futures by Hand

  • A future needs three things: state, a poll() implementation, and a waker registration
  • Join polls both sub-futures; Select returns whichever finishes first
  • Combinators are themselves futures wrapping other futures — it’s turtles all the way down
  • Building futures by hand gives deep insight, but in production use tokio::join!/select!

See also: Ch 2 — The Future Trait for the trait definition, Ch 8 — Tokio Deep Dive for production-grade equivalents


7. Executors and Runtimes 🟡

What you’ll learn:

  • What an executor does: poll + sleep efficiently
  • The major building blocks and runtimes: mio and io_uring (I/O layers), tokio, async-std, smol, embassy
  • A decision tree for choosing the right runtime
  • Why runtime-agnostic library design matters

What an Executor Does

An executor has two jobs:

  1. Poll futures when they’re ready to make progress
  2. Sleep efficiently when no futures are ready (using OS I/O notification APIs)
graph TB
    subgraph Executor["Executor (e.g., tokio)"]
        QUEUE["Task Queue"]
        POLLER["I/O Poller<br/>(epoll/kqueue/io_uring)"]
        THREADS["Worker Thread Pool"]
    end

    subgraph Tasks
        T1["Task 1<br/>(HTTP request)"]
        T2["Task 2<br/>(DB query)"]
        T3["Task 3<br/>(File read)"]
    end

    subgraph OS["Operating System"]
        NET["Network Stack"]
        DISK["Disk I/O"]
    end

    T1 --> QUEUE
    T2 --> QUEUE
    T3 --> QUEUE
    QUEUE --> THREADS
    THREADS -->|"poll()"| T1
    THREADS -->|"poll()"| T2
    THREADS -->|"poll()"| T3
    POLLER <-->|"register/notify"| NET
    POLLER <-->|"register/notify"| DISK
    POLLER -->|"wake tasks"| QUEUE

    style Executor fill:#e3f2fd,color:#000
    style OS fill:#f3e5f5,color:#000

mio: The Foundation Layer

mio (Metal I/O) is not an executor — it’s the lowest-level cross-platform I/O notification library. It wraps epoll (Linux), kqueue (macOS/BSD), and IOCP (Windows).

#![allow(unused)]
fn main() {
// Conceptual mio usage (simplified):
use mio::{Events, Interest, Poll, Token};
use mio::net::TcpListener;

let mut poll = Poll::new()?;
let mut events = Events::with_capacity(128);

let mut server = TcpListener::bind("0.0.0.0:8080")?;
poll.registry().register(&mut server, Token(0), Interest::READABLE)?;

// Event loop — blocks until something happens
loop {
    poll.poll(&mut events, None)?; // Sleeps until I/O event
    for event in events.iter() {
        match event.token() {
            Token(0) => { /* server has a new connection */ }
            _ => { /* other I/O ready */ }
        }
    }
}
}

Most developers never touch mio directly: tokio builds on top of it, while smol uses the similar polling crate through async-io.

io_uring: The Completion-Based Future

Linux’s io_uring (kernel 5.1+) represents a fundamental shift from the readiness-based I/O model that mio/epoll use:

Readiness-based (epoll / mio / tokio):
  1. Ask: "Is this socket readable?"     → epoll_wait()
  2. Kernel: "Yes, it's ready"           → EPOLLIN event
  3. App:   read(fd, buf)                → separate syscall; may return EWOULDBLOCK

Completion-based (io_uring):
  1. Submit: "Read from this socket into this buffer"  → SQE
  2. Kernel: does the read asynchronously
  3. App:   gets completed result with data            → CQE
graph LR
    subgraph "Readiness Model (epoll)"
        A1["App: is it ready?"] --> K1["Kernel: yes"]
        K1 --> A2["App: now read()"]
        A2 --> K2["Kernel: here's data"]
    end

    subgraph "Completion Model (io_uring)"
        B1["App: read this for me"] --> K3["Kernel: working..."]
        K3 --> B2["App: got result + data"]
    end

    style B1 fill:#c8e6c9,color:#000
    style B2 fill:#c8e6c9,color:#000

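The ownership challenge: io_uring requires the kernel to own the buffer until the operation completes. This conflicts with the AsyncRead traits in tokio and the futures crate, which only borrow the buffer for the duration of a call (the standard library has no AsyncRead of its own). That’s why tokio-uring exposes different I/O methods that take the buffer by value: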

#![allow(unused)]
fn main() {
// Standard tokio (readiness-based) — borrows the buffer:
let n = stream.read(&mut buf).await?;  // buf is borrowed

// tokio-uring (completion-based) — takes ownership of the buffer:
let (result, buf) = stream.read(buf).await;  // buf is moved in, returned back
let n = result?;
}
// Cargo.toml: tokio-uring = "0.5"
// NOTE: Linux-only, requires kernel 5.1+

fn main() {
    tokio_uring::start(async {
        let file = tokio_uring::fs::File::open("data.bin").await.unwrap();
        let buf = vec![0u8; 4096];
        let (result, buf) = file.read_at(buf, 0).await;
        let bytes_read = result.unwrap();
        println!("Read {} bytes: {:?}", bytes_read, &buf[..bytes_read]);
    });
}

| Aspect | epoll (tokio) | io_uring (tokio-uring) |
|---|---|---|
| Model | Readiness notification | Completion notification |
| Syscalls | epoll_wait + read/write | Batched SQE/CQE ring |
| Buffer ownership | App retains (&mut buf) | Ownership transfer (move buf) |
| Platform | Linux, macOS (kqueue), Windows (IOCP) | Linux 5.1+ only |
| Zero-copy | No (userspace copy) | Yes (registered buffers) |
| Maturity | Production-ready | Experimental |

When to use io_uring: High-throughput file I/O or networking where syscall overhead is the bottleneck (databases, storage engines, proxies serving 100k+ connections). For most applications, standard tokio with epoll is the right choice.

tokio: The Batteries-Included Runtime

The dominant async runtime in the Rust ecosystem. Used by Axum, Hyper, Tonic, and most production Rust servers.

// Cargo.toml:
// [dependencies]
// tokio = { version = "1", features = ["full"] }

#[tokio::main]
async fn main() {
    // Spawns a multi-threaded runtime with work-stealing scheduler
    let handle = tokio::spawn(async {
        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
        "done"
    });

    let result = handle.await.unwrap();
    println!("{result}");
}

tokio features: Timer, I/O, TCP/UDP, Unix sockets, signal handling, sync primitives (Mutex, RwLock, Semaphore, channels), fs, process, tracing integration.

async-std: The Standard Library Mirror

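Mirrors the std API with async versions. It was a gentle entry point for beginners, but it is less popular than tokio, and its maintainers have since discontinued the crate and recommend smol for new projects. The example below is mainly useful for reading existing code.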

// Cargo.toml:
// [dependencies]
// async-std = { version = "1", features = ["attributes"] }

#[async_std::main]
async fn main() {
    use async_std::fs;
    let content = fs::read_to_string("hello.txt").await.unwrap();
    println!("{content}");
}

smol: The Minimalist Runtime

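Small, modular async runtime assembled from a handful of small crates (async-executor, async-io, blocking, and others) rather than one large framework. Great for libraries and applications that want async without pulling in tokio.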

// Cargo.toml:
// [dependencies]
// smol = "2"

fn main() {
    smol::block_on(async {
        let result = smol::unblock(|| {
            // Runs blocking code on a thread pool
            std::fs::read_to_string("hello.txt")
        }).await.unwrap();
        println!("{result}");
    });
}

embassy: Async for Embedded (no_std)

Async runtime for embedded systems. No heap allocation, no std required.

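// Runs on microcontrollers (e.g., STM32, nRF52, RP2040)
// Sketch for the embassy-stm32 HAL; the HAL crate and pin names vary by board.
use embassy_stm32::gpio::{Level, Output, Speed};
use embassy_time::{Duration, Timer};

#[embassy_executor::main]
async fn main(_spawner: embassy_executor::Spawner) {
    // Initialize the HAL and take ownership of the peripherals
    let p = embassy_stm32::init(Default::default());

    // Blink an LED with async/await, no RTOS needed!
    let mut led = Output::new(p.PA5, Level::Low, Speed::Low);
    loop {
        led.set_high();
        Timer::after(Duration::from_millis(500)).await;
        led.set_low();
        Timer::after(Duration::from_millis(500)).await;
    }
}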

Runtime Decision Tree

graph TD
    START["Choosing a Runtime"]

    Q1{"Building a<br/>network server?"}
    Q2{"Need tokio ecosystem<br/>(Axum, Tonic, Hyper)?"}
    Q3{"Building a library?"}
    Q4{"Embedded /<br/>no_std?"}
    Q5{"Want minimal<br/>dependencies?"}

    TOKIO["🟢 tokio<br/>Best ecosystem, most popular"]
    SMOL["🔵 smol<br/>Minimal, no ecosystem lock-in"]
    EMBASSY["🟠 embassy<br/>Embedded-first, no alloc"]
    ASYNC_STD["🟣 async-std<br/>std-like API, good for learning"]
    AGNOSTIC["🔵 runtime-agnostic<br/>Use futures crate only"]

    START --> Q1
    Q1 -->|Yes| Q2
    Q1 -->|No| Q3
    Q2 -->|Yes| TOKIO
    Q2 -->|No| Q5
    Q3 -->|Yes| AGNOSTIC
    Q3 -->|No| Q4
    Q4 -->|Yes| EMBASSY
    Q4 -->|No| Q5
    Q5 -->|Yes| SMOL
    Q5 -->|No| ASYNC_STD

    style TOKIO fill:#c8e6c9,color:#000
    style SMOL fill:#bbdefb,color:#000
    style EMBASSY fill:#ffe0b2,color:#000
    style ASYNC_STD fill:#e1bee7,color:#000
    style AGNOSTIC fill:#bbdefb,color:#000

Runtime Comparison Table

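| Feature | tokio | async-std | smol | embassy |
|---|---|---|---|---|
| Ecosystem | Dominant | Small | Minimal | Embedded |
| Multi-threaded | ✅ Work-stealing | ✅ | ✅ | ❌ (single-core) |
| no_std | ❌ | ❌ | ❌ | ✅ |
| Timer | ✅ Built-in | ✅ Built-in | Via async-io | ✅ HAL-based |
| I/O | ✅ Own abstractions | ✅ std mirror | ✅ Via async-io | ✅ HAL drivers |
| Channels | ✅ Rich set | ✅ | Via async-channel | ✅ |
| Learning curve | Medium | Low | Low | High (HW) |
| Binary size | Large | Medium | Small | Tiny |
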
🏋️ Exercise: Runtime Comparison

Challenge: Write the same program using three different runtimes (tokio, smol, and async-std). The program should:

  1. Fetch a URL (simulate with a sleep)
  2. Read a file (simulate with a sleep)
  3. Print both results

This exercise demonstrates that the async/await structure is the same across runtimes; only the entry point and the timer APIs differ.

🔑 Solution
// ----- tokio version -----
// Cargo.toml: tokio = { version = "1", features = ["full"] }
#[tokio::main]
async fn main() {
    let (url_result, file_result) = tokio::join!(
        async {
            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
            "Response from URL"
        },
        async {
            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
            "Contents of file"
        },
    );
    println!("URL: {url_result}, File: {file_result}");
}

// ----- smol version -----
// Cargo.toml: smol = "2", futures-lite = "2"
fn main() {
    smol::block_on(async {
        let (url_result, file_result) = futures_lite::future::zip(
            async {
                smol::Timer::after(std::time::Duration::from_millis(100)).await;
                "Response from URL"
            },
            async {
                smol::Timer::after(std::time::Duration::from_millis(50)).await;
                "Contents of file"
            },
        ).await;
        println!("URL: {url_result}, File: {file_result}");
    });
}

// ----- async-std version -----
// Cargo.toml: async-std = { version = "1", features = ["attributes"] }, futures = "0.3"
#[async_std::main]
async fn main() {
    let (url_result, file_result) = futures::future::join(
        async {
            async_std::task::sleep(std::time::Duration::from_millis(100)).await;
            "Response from URL"
        },
        async {
            async_std::task::sleep(std::time::Duration::from_millis(50)).await;
            "Contents of file"
        },
    ).await;
    println!("URL: {url_result}, File: {file_result}");
}

Key takeaway: The async business logic is identical across runtimes. Only the entry point and timer/IO APIs differ. This is why writing runtime-agnostic libraries (using only std::future::Future) is valuable.
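
One way to keep library code runtime-agnostic is to let the caller inject the runtime-specific pieces. The sketch below takes the sleep function as a parameter, so the library itself depends only on `std::future::Future`. The name `retry_with_delay`, its parameters, and the `run` helper are assumptions made for this illustration, not an API from any runtime.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::time::Duration;

/// Retries `op` up to `max_attempts` times, waiting `delay` between attempts.
/// The caller supplies `sleep`, so no runtime is named here.
pub async fn retry_with_delay<T, E, Op, OpFut, Sleep, SleepFut>(
    max_attempts: usize,
    delay: Duration,
    mut op: Op,
    mut sleep: Sleep,
) -> Result<T, E>
where
    Op: FnMut() -> OpFut,
    OpFut: Future<Output = Result<T, E>>,
    Sleep: FnMut(Duration) -> SleepFut,
    SleepFut: Future<Output = ()>,
{
    let mut attempt = 1;
    loop {
        match op().await {
            Ok(value) => return Ok(value),
            Err(err) if attempt >= max_attempts => return Err(err),
            Err(_) => {
                attempt += 1;
                sleep(delay).await; // the caller decides how to wait
            }
        }
    }
}

struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Busy-polls a future until it completes (demo helper, not a real executor).
fn run<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

pub fn demo() -> Result<u32, &'static str> {
    let mut calls = 0u32;
    let out = run(retry_with_delay(
        3,
        Duration::from_millis(10),
        || {
            calls += 1;
            let n = calls;
            // Fails twice, then succeeds on the third call.
            async move { if n < 3 { Err("not yet") } else { Ok(n) } }
        },
        |_delay| std::future::ready(()), // instant "sleep" for the demo
    ));
    out
}
```

An application would pass `tokio::time::sleep` or `async_std::task::sleep` directly as the `sleep` argument, or `|d| async move { smol::Timer::after(d).await; }` for smol. The trade-off is a wider signature in exchange for no runtime dependency in the library.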

Key Takeaways — Executors and Runtimes

  • An executor’s job: poll futures when woken, sleep efficiently using OS I/O APIs
  • tokio is the default for servers; smol for minimal footprint; embassy for embedded
  • Your business logic should depend on std::future::Future, not a specific runtime
  • io_uring (Linux 5.1+) is the future of high-perf I/O but the ecosystem is still maturing

See also: Ch 8 — Tokio Deep Dive for tokio specifics, Ch 9 — When Tokio Isn’t the Right Fit for alternatives


8. Tokio Deep Dive 🟡

What you’ll learn:

  • Runtime flavors: multi-thread vs current-thread and when to use each
  • tokio::spawn, the 'static requirement, and JoinHandle
  • Task cancellation semantics (cancel-on-drop)
  • Sync primitives: Mutex, RwLock, Semaphore, and all four channel types

Runtime Flavors: Multi-Thread vs Current-Thread

Tokio offers two runtime configurations:

// Multi-threaded (default with #[tokio::main])
// Uses a work-stealing thread pool — tasks can move between threads
#[tokio::main]
async fn main() {
    // N worker threads (default = number of CPU cores)
    // Tasks are Send + 'static
}

// Current-thread — everything runs on one thread
#[tokio::main(flavor = "current_thread")]
async fn main() {
    // Single-threaded — tasks don't need to be Send
    // Lighter weight, good for simple tools or WASM
}

// Manual runtime construction:
let rt = tokio::runtime::Builder::new_multi_thread()
    .worker_threads(4)
    .enable_all()
    .build()
    .unwrap();

rt.block_on(async {
    println!("Running on custom runtime");
});
graph TB
    subgraph "Multi-Thread (default)"
        MT_Q1["Thread 1<br/>Task A, Task D"]
        MT_Q2["Thread 2<br/>Task B"]
        MT_Q3["Thread 3<br/>Task C, Task E"]
        STEAL["Work Stealing:<br/>idle threads steal from busy ones"]
        MT_Q1 <--> STEAL
        MT_Q2 <--> STEAL
        MT_Q3 <--> STEAL
    end

    subgraph "Current-Thread"
        ST_Q["Single Thread<br/>Task A → Task B → Task C → Task D"]
    end

    style MT_Q1 fill:#c8e6c9,color:#000
    style MT_Q2 fill:#c8e6c9,color:#000
    style MT_Q3 fill:#c8e6c9,color:#000
    style ST_Q fill:#bbdefb,color:#000

tokio::spawn and the 'static Requirement

tokio::spawn puts a future onto the runtime’s task queue. Because it might run on any worker thread at any time, the future must be Send + 'static:

#![allow(unused)]
fn main() {
use tokio::task;

async fn example() {
    let data = String::from("hello");

    // ✅ Works: move ownership into the task
    let handle = task::spawn(async move {
        println!("{data}");
        data.len()
    });

    let len = handle.await.unwrap();
    println!("Length: {len}");
}

async fn problem() {
    let data = String::from("hello");

    // ❌ FAILS: data is borrowed, not 'static
    // task::spawn(async {
    //     println!("{data}"); // borrows `data` — not 'static
    // });

    // ❌ FAILS: Rc is not Send
    // let rc = std::rc::Rc::new(42);
    // task::spawn(async move {
    //     println!("{rc}"); // Rc is !Send — can't cross thread boundary
    // });
}
}

Why 'static? The spawned task runs independently — it might outlive the scope that created it. The compiler can’t prove the references will remain valid, so it requires owned data.

Why Send? The task might be resumed on a different thread than where it was suspended. All data held across .await points must be safe to send between threads.
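
The Send rule is about what is alive at each `.await`, not about what the function touches overall. The sketch below checks this at compile time with a helper that has the same bounds on the future as tokio::spawn, using only the standard library. `require_send_static`, `tick`, and `run` are hypothetical helpers for this illustration.

```rust
use std::future::Future;
use std::pin::pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Same bounds on the future that tokio::spawn requires.
fn require_send_static<F: Future + Send + 'static>(fut: F) -> F {
    fut
}

// Hypothetical suspension point.
async fn tick() {}

// Compiles: the Rc is created and dropped before the .await,
// so it is never stored in the state machine.
async fn rc_scoped() -> usize {
    let len = {
        let rc = Rc::new(String::from("hello"));
        rc.len()
    }; // rc dropped here
    tick().await;
    len
}

// Would NOT pass require_send_static: rc is alive across the .await.
// async fn rc_held_across_await() -> usize {
//     let rc = Rc::new(String::from("hello"));
//     tick().await;
//     rc.len()
// }

struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Busy-polls a future until it completes (demo helper, not a real executor).
fn run<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

pub fn demo() -> usize {
    run(require_send_static(rc_scoped()))
}
```

Scoping the non-Send value in a block, as shown, is the most reliable fix. Replacing Rc with Arc is the alternative when the value really must live across the `.await`.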

#![allow(unused)]
fn main() {
// Common pattern: clone shared data into the task
let shared = Arc::new(config);

for i in 0..10 {
    let shared = Arc::clone(&shared); // Clone the Arc, not the data
    tokio::spawn(async move {
        process_item(i, &shared).await;
    });
}
}

JoinHandle and Task Cancellation

#![allow(unused)]
fn main() {
use tokio::task::JoinHandle;
use tokio::time::{sleep, Duration};

async fn cancellation_example() {
    let handle: JoinHandle<String> = tokio::spawn(async {
        sleep(Duration::from_secs(10)).await;
        "completed".to_string()
    });

    // Cancel the task by dropping the handle? NO — task keeps running!
    // drop(handle); // Task continues in the background

    // To actually cancel, call abort():
    handle.abort();

    // Awaiting an aborted task returns JoinError
    match handle.await {
        Ok(val) => println!("Got: {val}"),
        Err(e) if e.is_cancelled() => println!("Task was cancelled"),
        Err(e) => println!("Task panicked: {e}"),
    }
}
}

Important: Dropping a JoinHandle does NOT cancel the task in tokio. The task becomes detached and keeps running. You must explicitly call .abort() to cancel it. This is different from dropping a Future directly, which does cancel/drop the underlying computation.
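
The second half of that statement, that dropping a future cancels it, can be shown without any runtime. In this sketch a future is polled once so it reaches its `.await`, then dropped: its locals are destroyed and the code after the `.await` never runs. `SetOnDrop`, `work`, and `NoopWake` are hypothetical names for the demonstration.

```rust
use std::future::{pending, Future};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Wake, Waker};

// Flips a flag when dropped, standing in for cleanup such as closing a socket.
struct SetOnDrop(Arc<AtomicBool>);

impl Drop for SetOnDrop {
    fn drop(&mut self) {
        self.0.store(true, Ordering::SeqCst);
    }
}

async fn work(cleaned_up: Arc<AtomicBool>, finished: Arc<AtomicBool>) {
    let _guard = SetOnDrop(cleaned_up);
    pending::<()>().await; // stands in for an I/O wait that never completes
    finished.store(true, Ordering::SeqCst); // never reached in this demo
}

struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Returns (guard was dropped, code after the .await ran).
pub fn cancel_by_drop() -> (bool, bool) {
    let cleaned_up = Arc::new(AtomicBool::new(false));
    let finished = Arc::new(AtomicBool::new(false));
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);

    let mut fut = Box::pin(work(cleaned_up.clone(), finished.clone()));
    let first_poll = fut.as_mut().poll(&mut cx); // runs up to the .await
    assert!(first_poll.is_pending());
    drop(fut); // cancellation: the state machine and its locals are dropped

    (cleaned_up.load(Ordering::SeqCst), finished.load(Ordering::SeqCst))
}
```

The function returns `(true, false)`: cleanup ran, the rest of the body did not. This is why code that must not be interrupted halfway should not span an `.await` inside a future that may be dropped, for example a losing branch of a select.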

Tokio Sync Primitives

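Tokio provides async-aware synchronization primitives. The key principle: don’t hold a std::sync::Mutex guard across an .await point. A std mutex is fine, and often faster, for short critical sections that contain no .await; reach for tokio::sync::Mutex when the lock must stay held while the task is suspended.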

#![allow(unused)]
fn main() {
use std::sync::Arc;
use tokio::sync::{Mutex, RwLock, Semaphore, mpsc, oneshot, broadcast, watch};

// --- Mutex ---
// Async mutex: the lock() method is async and won't block the thread
let data = Arc::new(Mutex::new(vec![1, 2, 3]));
{
    let mut guard = data.lock().await; // Non-blocking lock
    guard.push(4);
} // Guard dropped here — lock released

// --- Channels ---
// mpsc: Multiple producer, single consumer
let (tx, mut rx) = mpsc::channel::<String>(100); // Bounded buffer

tokio::spawn(async move {
    tx.send("hello".into()).await.unwrap();
});

let msg = rx.recv().await.unwrap();

// oneshot: Single value, single consumer
let (tx, rx) = oneshot::channel::<i32>();
tx.send(42).unwrap(); // No await needed — either sends or fails
let val = rx.await.unwrap();

// broadcast: Multiple producers, multiple consumers (all get every message)
let (tx, _) = broadcast::channel::<String>(100);
let mut rx1 = tx.subscribe();
let mut rx2 = tx.subscribe();

// watch: Single value, multiple consumers (only latest value)
let (tx, rx) = watch::channel(0u64);
tx.send(42).unwrap();
println!("Latest: {}", *rx.borrow());
}

Note: .unwrap() is used for brevity throughout these channel examples. In production, handle these cases explicitly: a failed .send() means every receiver was dropped, and recv() signals a closed channel by returning None (mpsc) or an error (broadcast, which can also report that a slow receiver lagged and missed messages).

graph LR
    subgraph "Channel Types"
        direction TB
        MPSC["mpsc<br/>N→1<br/>Buffered queue"]
        ONESHOT["oneshot<br/>1→1<br/>Single value"]
        BROADCAST["broadcast<br/>N→N<br/>All receivers get all"]
        WATCH["watch<br/>1→N<br/>Latest value only"]
    end

    P1["Producer 1"] --> MPSC
    P2["Producer 2"] --> MPSC
    MPSC --> C1["Consumer"]

    P3["Producer"] --> ONESHOT
    ONESHOT --> C2["Consumer"]

    P4["Producer"] --> BROADCAST
    BROADCAST --> C3["Consumer 1"]
    BROADCAST --> C4["Consumer 2"]

    P5["Producer"] --> WATCH
    WATCH --> C5["Consumer 1"]
    WATCH --> C6["Consumer 2"]

Case Study: Choosing the Right Channel for a Notification Service

You’re building a notification service where:

  • Multiple API handlers produce events
  • A single background task batches and sends them
  • A config watcher updates rate limits at runtime
  • A shutdown signal must reach all components

Which channels for each?

| Requirement | Channel | Why |
|---|---|---|
| API handlers → Batcher | mpsc (bounded) | N producers, 1 consumer. Bounded for backpressure: if the batcher falls behind, API handlers slow down instead of OOM |
| Config watcher → Rate limiter | watch | Only the latest config matters. Multiple readers (each worker) see the current value |
| Shutdown signal → All components | broadcast | Every component must receive the shutdown notification independently |
| Single health-check response | oneshot | Request/response pattern: one value, then done |

graph LR
    subgraph "Notification Service"
        direction TB
        API1["API Handler 1"] -->|mpsc| BATCH["Batcher"]
        API2["API Handler 2"] -->|mpsc| BATCH
        CONFIG["Config Watcher"] -->|watch| RATE["Rate Limiter"]
        CTRL["Ctrl+C"] -->|broadcast| API1
        CTRL -->|broadcast| BATCH
        CTRL -->|broadcast| RATE
    end

    style API1 fill:#d4efdf,stroke:#27ae60,color:#000
    style API2 fill:#d4efdf,stroke:#27ae60,color:#000
    style BATCH fill:#e8f4f8,stroke:#2980b9,color:#000
    style CONFIG fill:#fef9e7,stroke:#f39c12,color:#000
    style RATE fill:#fef9e7,stroke:#f39c12,color:#000
    style CTRL fill:#fadbd8,stroke:#e74c3c,color:#000
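
The backpressure argument for the bounded `mpsc` channel can be seen in a small sketch. It uses `std::sync::mpsc::sync_channel` as a synchronous stand-in (an assumption, chosen so the example needs no runtime), and `offer_events` is a hypothetical helper. With no consumer draining the queue, only `capacity` events are accepted and the rest are rejected with `Full`. In the async version, an awaited `send` would suspend the producer at that point instead of rejecting.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Offers `events` items to a bounded queue that nobody is draining.
/// Returns (accepted, rejected).
pub fn offer_events(capacity: usize, events: usize) -> (usize, usize) {
    let (tx, rx) = sync_channel::<usize>(capacity);
    let mut accepted = 0;
    let mut rejected = 0;
    for event in 0..events {
        match tx.try_send(event) {
            Ok(()) => accepted += 1,
            // The buffer is full: this is the backpressure signal.
            Err(TrySendError::Full(_)) => rejected += 1,
            // The consumer is gone: stop producing.
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    drop(rx); // Keep the receiver alive until the producer is done
    (accepted, rejected)
}
```

An unbounded channel would accept every event here, which is exactly how a slow batcher turns into unbounded memory growth.
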

🏋️ Exercise: Build a Task Pool

Challenge: Build a function run_with_limit that accepts a list of async closures and a concurrency limit, executing at most N tasks simultaneously. Use tokio::sync::Semaphore.

🔑 Solution
#![allow(unused)]
fn main() {
use std::future::Future;
use std::sync::Arc;
use tokio::sync::Semaphore;

async fn run_with_limit<F, Fut, T>(tasks: Vec<F>, limit: usize) -> Vec<T>
where
    F: FnOnce() -> Fut + Send + 'static,
    Fut: Future<Output = T> + Send + 'static,
    T: Send + 'static,
{
    let semaphore = Arc::new(Semaphore::new(limit));
    let mut handles = Vec::new();

    for task in tasks {
        let permit = Arc::clone(&semaphore);
        let handle = tokio::spawn(async move {
            let _permit = permit.acquire().await.unwrap();
            // Permit is held while task runs, then dropped
            task().await
        });
        handles.push(handle);
    }

    let mut results = Vec::new();
    for handle in handles {
        results.push(handle.await.unwrap());
    }
    results
}

// Usage:
// let tasks: Vec<_> = urls.into_iter().map(|url| {
//     move || async move { fetch(url).await }
// }).collect();
// let results = run_with_limit(tasks, 10).await; // Max 10 concurrent
}

Key takeaway: Semaphore is the standard way to limit concurrency in tokio. Each task acquires a permit before starting work. When no permits are available, further tasks wait asynchronously (without blocking a thread) until a running task finishes and drops its permit.

Key Takeaways — Tokio Deep Dive

  • Use multi_thread for servers (default); current_thread for CLI tools, tests, or !Send types
  • tokio::spawn requires 'static futures — use Arc or channels to share data
  • Dropping a JoinHandle does not cancel the task — call .abort() explicitly
  • Choose sync primitives by need: Mutex for shared state, Semaphore for concurrency limits, mpsc/oneshot/broadcast/watch for communication

See also: Ch 9 — When Tokio Isn’t the Right Fit for alternatives to spawn, Ch 12 — Common Pitfalls for MutexGuard-across-await bugs


9. When Tokio Isn’t the Right Fit 🟡

What you’ll learn:

  • The 'static problem: when tokio::spawn forces you into Arc everywhere
  • LocalSet for !Send futures
  • FuturesUnordered for borrow-friendly concurrency (no spawn needed)
  • JoinSet for managed task groups
  • Writing runtime-agnostic libraries
graph TD
    START["Need concurrent futures?"] --> STATIC{"Can futures be 'static?"}
    STATIC -->|Yes| SEND{"Are futures Send?"}
    STATIC -->|No| FU["FuturesUnordered<br/>Runs on current task"]
    SEND -->|Yes| SPAWN["tokio::spawn<br/>Multi-threaded"]
    SEND -->|No| LOCAL["LocalSet<br/>Single-threaded"]
    SPAWN --> MANAGE{"Need to track/abort tasks?"}
    MANAGE -->|Yes| JOINSET["JoinSet / TaskTracker"]
    MANAGE -->|No| HANDLE["JoinHandle"]

    style START fill:#f5f5f5,stroke:#333,color:#000
    style FU fill:#d4efdf,stroke:#27ae60,color:#000
    style SPAWN fill:#e8f4f8,stroke:#2980b9,color:#000
    style LOCAL fill:#fef9e7,stroke:#f39c12,color:#000
    style JOINSET fill:#e8daef,stroke:#8e44ad,color:#000
    style HANDLE fill:#e8f4f8,stroke:#2980b9,color:#000

The ’static Future Problem

Tokio’s spawn requires 'static futures. This means you can’t borrow local data in spawned tasks:

#![allow(unused)]
fn main() {
async fn process_items(items: &[String]) {
    // ❌ Can't do this — items is borrowed, not 'static
    // for item in items {
    //     tokio::spawn(async {
    //         process(item).await;
    //     });
    // }

    // 😐 Workaround 1: Clone everything
    for item in items {
        let item = item.clone();
        tokio::spawn(async move {
            process(&item).await;
        });
    }

    // 😐 Workaround 2: Use Arc
    let items = Arc::new(items.to_vec());
    for i in 0..items.len() {
        let items = Arc::clone(&items);
        tokio::spawn(async move {
            process(&items[i]).await;
        });
    }
}
}

This is annoying! In Go, you can just write go func() { use(item) }() and let the garbage collector keep item alive for the closure. In Rust, the ownership system forces you to think about who owns what and how long it lives.

Scoped Tasks and Alternatives

Several solutions exist for the 'static problem:

#![allow(unused)]
fn main() {
// 1. tokio::task::LocalSet — run !Send futures on current thread
use tokio::task::LocalSet;

let local_set = LocalSet::new();
local_set.run_until(async {
    tokio::task::spawn_local(async {
        // Can use Rc, Cell, and other !Send types here
        let rc = std::rc::Rc::new(42);
        println!("{rc}");
    }).await.unwrap();
}).await;

// 2. FuturesUnordered — concurrent without spawning
use futures::stream::{FuturesUnordered, StreamExt};

async fn process_items(items: &[String]) {
    let futures: FuturesUnordered<_> = items
        .iter()
        .map(|item| async move {
            // ✅ Can borrow item — no spawn, no 'static needed!
            process(item).await
        })
        .collect();

    // Drive all futures to completion
    futures.for_each(|result| async {
        println!("Result: {result:?}");
    }).await;
}

// 3. tokio JoinSet (tokio 1.21+) — managed set of spawned tasks
use std::time::Duration;
use tokio::task::JoinSet;

async fn with_joinset() {
    let mut set = JoinSet::new();

    for i in 0..10 {
        set.spawn(async move {
            tokio::time::sleep(Duration::from_millis(100)).await;
            i * 2
        });
    }

    while let Some(result) = set.join_next().await {
        println!("Task completed: {:?}", result.unwrap());
    }
}
}

Lightweight Runtimes for Libraries

If you’re writing a library, don’t force users into tokio:

#![allow(unused)]
fn main() {
// ❌ BAD: Library forces tokio on users
pub async fn my_lib_function() {
    tokio::time::sleep(Duration::from_secs(1)).await;
    // Now your users MUST use tokio
}

// ✅ GOOD: Library is runtime-agnostic
pub async fn my_lib_function() {
    // Use only types from std::future and futures crate
    do_computation().await;
}

// ✅ GOOD: Accept a generic future for I/O operations
// (needs `use std::future::Future;` at the top of the file)
pub async fn fetch_with_retry<F, Fut, T, E>(
    operation: F,
    max_retries: usize,
) -> Result<T, E>
where
    F: Fn() -> Fut,
    Fut: Future<Output = Result<T, E>>,
{
    // Always make at least one attempt, so `max_retries == 0` cannot
    // fall through to a panic.
    let mut result = operation().await;
    for _ in 1..max_retries {
        if result.is_ok() {
            break;
        }
        result = operation().await; // Retry after a failure
    }
    result
}
}

Rule of thumb: Libraries should depend on the futures crate, not tokio. Applications should depend on tokio (or their chosen runtime). This keeps the ecosystem composable.
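
One way to check that a library function really is runtime-agnostic is to drive it with an executor you wrote yourself. The sketch below is illustrative only: `block_on` is a minimal hand-rolled executor built from `std`, and `retry_with` and `demo_retry` are hypothetical names standing in for a library function and its caller. Nothing in it refers to tokio.

```rust
use std::cell::Cell;
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Wakes the thread that is blocked inside `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Minimal executor for one future (illustrative only, not a real runtime).
pub fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}

/// Hypothetical library function: uses only `std::future::Future`.
pub async fn retry_with<F, Fut, T, E>(operation: F, max_attempts: usize) -> Result<T, E>
where
    F: Fn() -> Fut,
    Fut: Future<Output = Result<T, E>>,
{
    let mut result = operation().await; // Always attempt at least once
    for _ in 1..max_attempts {
        if result.is_ok() {
            break;
        }
        result = operation().await;
    }
    result
}

/// Runs an operation that fails `failures` times before succeeding.
/// Returns the final result and the number of attempts made.
pub fn demo_retry(failures: u32, max_attempts: usize) -> (Result<u32, String>, u32) {
    let calls = Cell::new(0u32);
    let result = block_on(retry_with(
        || {
            let attempt = calls.get() + 1;
            calls.set(attempt);
            async move {
                if attempt <= failures {
                    Err(format!("attempt {attempt} failed"))
                } else {
                    Ok(attempt)
                }
            }
        },
        max_attempts,
    ));
    (result, calls.get())
}
```

The same `retry_with` would run unchanged under tokio, smol, or an embedded executor, because the caller supplies both the operation and the runtime.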

🏋️ Exercise: FuturesUnordered vs Spawn

Challenge: Write the same function two ways — once using tokio::spawn (requires 'static) and once using FuturesUnordered (borrows data). The function receives &[String] and returns the length of each string after a simulated async lookup.

Compare: Which approach requires .clone()? Which can borrow the input slice?

🔑 Solution
#![allow(unused)]
fn main() {
use futures::stream::{FuturesUnordered, StreamExt};
use tokio::time::{sleep, Duration};

// Version 1: tokio::spawn — requires 'static, must clone
async fn lengths_with_spawn(items: &[String]) -> Vec<usize> {
    let mut handles = Vec::new();
    for item in items {
        let owned = item.clone(); // Must clone — spawn requires 'static
        handles.push(tokio::spawn(async move {
            sleep(Duration::from_millis(10)).await;
            owned.len()
        }));
    }

    let mut results = Vec::new();
    for handle in handles {
        results.push(handle.await.unwrap());
    }
    results
}

// Version 2: FuturesUnordered — borrows data, no clone needed
async fn lengths_without_spawn(items: &[String]) -> Vec<usize> {
    let futures: FuturesUnordered<_> = items
        .iter()
        .map(|item| async move {
            sleep(Duration::from_millis(10)).await;
            item.len() // ✅ Borrows item — no clone!
        })
        .collect();

    futures.collect().await
}

#[tokio::test]
async fn test_both_versions() {
    let items = vec!["hello".into(), "world".into(), "rust".into()];

    let v1 = lengths_with_spawn(&items).await;
    // Note: v1 preserves insertion order (sequential join)

    let mut v2 = lengths_without_spawn(&items).await;
    v2.sort(); // FuturesUnordered returns in completion order

    assert_eq!(v1, vec![5, 5, 4]);
    assert_eq!(v2, vec![4, 5, 5]);
}
}

Key takeaway: FuturesUnordered avoids the 'static requirement by running all futures on the current task (no thread migration). The trade-off: all futures share one task, so if one blocks, the others stall. Use spawn when the work should run in parallel on the runtime’s worker threads.
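
Why does polling on the current task remove the 'static requirement? The sketch below is a deliberately simplified stand-in for FuturesUnordered, not its actual implementation. `run_all` busy-polls a set of boxed futures in place, whereas the real type only re-polls futures that were woken. All names here (`run_all`, `boxed_lookup`, `YieldOnce`) are hypothetical. Because the futures never leave the caller's stack frame, they are free to borrow from `items`.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; sufficient because `run_all` polls in a loop.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Future that returns Pending once, imitating an async lookup.
struct YieldOnce(bool);

impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            return Poll::Ready(());
        }
        self.0 = true;
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

async fn lookup_len(item: &str) -> usize {
    YieldOnce(false).await;
    item.len()
}

/// A boxed future that may borrow data for the lifetime 'a (no 'static bound).
type LocalFuture<'a, T> = Pin<Box<dyn Future<Output = T> + 'a>>;

fn boxed_lookup(item: &str) -> LocalFuture<'_, usize> {
    Box::pin(lookup_len(item))
}

/// Polls every future in place until all finish; results arrive in completion order.
fn run_all<T>(mut futures: Vec<LocalFuture<'_, T>>) -> Vec<T> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut finished = Vec::new();
    while !futures.is_empty() {
        let mut i = 0;
        while i < futures.len() {
            let polled = futures[i].as_mut().poll(&mut cx);
            match polled {
                Poll::Ready(value) => {
                    finished.push(value);
                    drop(futures.swap_remove(i)); // Never poll a completed future again
                }
                Poll::Pending => i += 1,
            }
        }
    }
    finished
}

pub fn lengths_borrowed(items: &[String]) -> Vec<usize> {
    let futures: Vec<LocalFuture<'_, usize>> =
        items.iter().map(|item| boxed_lookup(item)).collect();
    run_all(futures)
}
```

The borrow checker accepts this because `run_all` returns before `items` goes out of scope. A spawned task has no such guarantee, which is where the 'static bound comes from.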

Key Takeaways — When Tokio Isn’t the Right Fit

  • FuturesUnordered runs futures concurrently on the current task — no 'static requirement
  • LocalSet enables !Send futures on a single-threaded executor
  • JoinSet (tokio 1.21+) provides managed task groups; dropping the set aborts any tasks that are still running
  • For libraries: depend only on std::future::Future + futures crate, not tokio directly

See also: Ch 8 — Tokio Deep Dive for when spawn is the right tool, Ch 11 — Streams for buffer_unordered() as another concurrency limiter


10. Async Traits 🟡

What you’ll learn:

  • Why async methods in traits took years to stabilize
  • RPITIT: native async trait methods (Rust 1.75+)
  • The dyn dispatch challenge, Send bounds, and the trait_variant crate
  • Async closures (Rust 1.85+): async || closures and the AsyncFn / AsyncFnOnce traits
graph TD
    subgraph "Async Trait Approaches"
        direction TB
        RPITIT["RPITIT (Rust 1.75+)<br/>async fn in trait<br/>Static dispatch only"]
        VARIANT["trait_variant<br/>Auto-generates Send variant<br/>Still static dispatch"]
        BOXED["Box&lt;dyn Future&gt;<br/>Manual boxing<br/>Works everywhere"]
        CLOSURE["Async Closures (1.85+)<br/>AsyncFn() / AsyncFnOnce()<br/>Callbacks & middleware"]
    end

    RPITIT -->|"Need Send futures?"| VARIANT
    RPITIT -->|"Need dyn or pre-1.75?"| BOXED
    CLOSURE -->|"Replaces"| BOXED

    style RPITIT fill:#d4efdf,stroke:#27ae60,color:#000
    style VARIANT fill:#e8f4f8,stroke:#2980b9,color:#000
    style BOXED fill:#fef9e7,stroke:#f39c12,color:#000
    style CLOSURE fill:#e8daef,stroke:#8e44ad,color:#000

The History: Why It Took So Long

Async methods in traits were Rust’s most requested feature for years. The problem:

#![allow(unused)]
fn main() {
// This didn't compile until Rust 1.75 (Dec 2023):
trait DataStore {
    async fn get(&self, key: &str) -> Option<String>;
}
// Why? Because async fn returns `impl Future<Output = T>`,
// and `impl Trait` in trait return position wasn't supported.
}

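The fundamental challenge: when a trait method returns impl Future, each implementor returns a different concrete (and unnameable) future type. With static dispatch the compiler can resolve that type per implementor, but the language first needed impl Trait in trait return position to express it. With dynamic dispatch the caller cannot know the size of the returned future at compile time, which is why dyn still requires boxing.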

RPITIT: Return Position Impl Trait in Trait

Since Rust 1.75, this just works for static dispatch:

#![allow(unused)]
fn main() {
trait DataStore {
    async fn get(&self, key: &str) -> Option<String>;
    // Desugars to:
    // fn get(&self, key: &str) -> impl Future<Output = Option<String>>;
}

struct InMemoryStore {
    data: std::collections::HashMap<String, String>,
}

impl DataStore for InMemoryStore {
    async fn get(&self, key: &str) -> Option<String> {
        self.data.get(key).cloned()
    }
}

// ✅ Works with generics (static dispatch):
async fn lookup<S: DataStore>(store: &S, key: &str) {
    if let Some(val) = store.get(key).await {
        println!("{key} = {val}");
    }
}
}
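
To make the desugaring concrete, the trait can be written in its explicit `-> impl Future` form and still be implemented with `async fn`. The following is a sketch under stated assumptions: `block_on` is a minimal hand-rolled executor, and `EmptyStore` and `demo_lookup` are hypothetical additions used only to show two implementors returning different concrete future types.

```rust
use std::collections::HashMap;
use std::future::{self, Future};
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Minimal executor for one future (illustrative only, not a real runtime).
pub fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}

trait DataStore {
    // Explicit form of `async fn get(...)`: an opaque future per implementor.
    fn get(&self, key: &str) -> impl Future<Output = Option<String>>;
}

struct InMemoryStore {
    data: HashMap<String, String>,
}

impl DataStore for InMemoryStore {
    // An `async fn` in the impl satisfies the `impl Future` signature.
    async fn get(&self, key: &str) -> Option<String> {
        self.data.get(key).cloned()
    }
}

/// Hypothetical second implementor whose future is a different concrete type.
struct EmptyStore;

impl DataStore for EmptyStore {
    fn get(&self, _key: &str) -> impl Future<Output = Option<String>> {
        future::ready(None)
    }
}

/// Monomorphized once per store type, so each call knows its exact future type.
async fn lookup<S: DataStore>(store: &S, key: &str) -> Option<String> {
    store.get(key).await
}

pub fn demo_lookup() -> (Option<String>, Option<String>) {
    let mut data = HashMap::new();
    data.insert("lang".to_string(), "rust".to_string());
    let store = InMemoryStore { data };
    block_on(async {
        (lookup(&store, "lang").await, lookup(&EmptyStore, "lang").await)
    })
}
```

Neither implementation allocates. The future lives inline in the caller's state machine, which is what makes static dispatch zero-cost here.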

dyn Dispatch and Send Bounds

The limitation: you can’t use dyn DataStore directly because the compiler doesn’t know the size of the returned future:

#![allow(unused)]
fn main() {
// ❌ Doesn't work:
// async fn lookup_dyn(store: &dyn DataStore, key: &str) { ... }
// Error: the trait `DataStore` is not dyn-compatible because method `get`
//        is `async`

// ✅ Workaround: Return a boxed future
trait DynDataStore {
    fn get(&self, key: &str) -> Pin<Box<dyn Future<Output = Option<String>> + Send + '_>>;
}

// Note: trait_variant (see below) addresses the Send problem, not dyn dispatch
}
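
A boxed-future trait of this shape can be implemented and called through `dyn` as sketched below. Two things here are assumptions of the sketch, not part of the text above. First, the `BoxFuture` alias and the explicit `'a` lifetime on both `&self` and `key` were added so the returned future may borrow the key as well as the store. Second, `FixedStore`, `lookup_dyn`, `demo_dyn`, and the hand-rolled `block_on` are hypothetical helpers.

```rust
use std::collections::HashMap;
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Minimal executor for one future (illustrative only, not a real runtime).
pub fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}

/// One fixed-size return type for every implementor: a heap-allocated future.
type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;

trait DynDataStore {
    fn get<'a>(&'a self, key: &'a str) -> BoxFuture<'a, Option<String>>;
}

struct InMemoryStore {
    data: HashMap<String, String>,
}

impl DynDataStore for InMemoryStore {
    fn get<'a>(&'a self, key: &'a str) -> BoxFuture<'a, Option<String>> {
        Box::pin(async move { self.data.get(key).cloned() })
    }
}

/// Hypothetical store that answers every key with the same value.
struct FixedStore(&'static str);

impl DynDataStore for FixedStore {
    fn get<'a>(&'a self, _key: &'a str) -> BoxFuture<'a, Option<String>> {
        Box::pin(async move { Some(self.0.to_string()) })
    }
}

/// Accepts any store behind a trait object.
async fn lookup_dyn(store: &dyn DynDataStore, key: &str) -> Option<String> {
    store.get(key).await
}

pub fn demo_dyn() -> Vec<Option<String>> {
    let mut data = HashMap::new();
    data.insert("lang".to_string(), "rust".to_string());
    // Different concrete types stored in one collection.
    let stores: Vec<Box<dyn DynDataStore>> = vec![
        Box::new(InMemoryStore { data }),
        Box::new(FixedStore("fallback")),
    ];
    block_on(async {
        let mut out = Vec::new();
        for store in &stores {
            out.push(lookup_dyn(store.as_ref(), "lang").await);
        }
        out
    })
}
```

The cost is one heap allocation per call, which is usually negligible next to the I/O a store performs.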

The Send problem: In multi-threaded runtimes, spawned tasks must be Send. But async trait methods don’t automatically add Send bounds:

#![allow(unused)]
fn main() {
trait Worker {
    async fn run(&self); // Future might or might not be Send
}

struct MyWorker;

impl Worker for MyWorker {
    async fn run(&self) {
        // If this uses !Send types, the future is !Send
        let rc = std::rc::Rc::new(42);
        some_work().await;
        println!("{rc}");
    }
}

// ❌ This fails if the future isn't Send:
// tokio::spawn(worker.run()); // Requires Send + 'static
}
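
The difference between a Send and a !Send future can be checked at compile time without any runtime. In this sketch, `require_send` is a hypothetical helper that imposes the same `Send` bound a multi-threaded spawn would. `YieldOnce` and `block_on` are minimal hand-written pieces used so the example is self-contained.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Minimal executor for one future (illustrative only, not a real runtime).
pub fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}

/// Compiles only for `Send` values; stands in for the bound a spawn would impose.
fn require_send<T: Send>(value: T) -> T {
    value
}

/// Future that yields to the executor once before completing.
struct YieldOnce(bool);

impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            return Poll::Ready(());
        }
        self.0 = true;
        cx.waker().wake_by_ref(); // Ask to be polled again right away
        Poll::Pending
    }
}

async fn some_work() {
    YieldOnce(false).await;
}

/// `rc` is alive across the `.await`, so this future is `!Send`.
async fn run_holding_rc() -> i32 {
    let rc = Rc::new(42);
    some_work().await;
    *rc
}

/// `rc` is dropped before the `.await`, so this future is `Send`.
async fn run_scoped() -> i32 {
    let value = {
        let rc = Rc::new(42);
        *rc
    };
    some_work().await;
    value
}

pub fn demo_send() -> (i32, i32) {
    // require_send(run_holding_rc()); // Does not compile: `Rc<i32>` is not `Send`
    let local_only = block_on(run_holding_rc());
    let spawnable = block_on(require_send(run_scoped()));
    (local_only, spawnable)
}
```

Both futures run fine on a single thread. Only the second can be handed to an executor that may move it between threads.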

The trait_variant Crate

The trait_variant crate (from the Rust async working group) generates a Send variant automatically:

#![allow(unused)]
fn main() {
// Cargo.toml: trait-variant = "0.1"

#[trait_variant::make(SendDataStore: Send)]
trait DataStore {
    async fn get(&self, key: &str) -> Option<String>;
    async fn set(&self, key: &str, value: String);
}

// Now you have two traits with the same methods:
// - DataStore: no Send bound on the futures
// - SendDataStore: all futures are Send
// In trait-variant 0.1, implementors write `impl SendDataStore for MyType`
// and a generated blanket impl provides DataStore for free.

// Use SendDataStore when you need to spawn. The generated trait is still
// not dyn-compatible, so take a generic parameter rather than `dyn`:
async fn spawn_lookup<S>(store: Arc<S>)
where
    S: SendDataStore + Sync + 'static,
{
    tokio::spawn(async move {
        store.get("key").await;
    });
}
}
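
It can help to see roughly what a Send variant looks like when written by hand. The trait below is an approximation of what the macro is described as generating, and its exact expansion is an assumption here. `StaticStore`, `lookup_on_thread`, and the hand-rolled `block_on` are hypothetical. A plain `std::thread::spawn` plays the role of a multi-threaded runtime moving the future to another thread.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Minimal executor for one future (illustrative only, not a real runtime).
pub fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}

/// Hand-written stand-in for a generated Send variant (assumed shape).
trait SendDataStore: Send {
    fn get(&self, key: &str) -> impl Future<Output = Option<String>> + Send;
}

struct StaticStore;

impl SendDataStore for StaticStore {
    // Accepted only because the future produced by this body is Send.
    async fn get(&self, key: &str) -> Option<String> {
        Some(format!("value-for-{key}"))
    }
}

/// Builds the lookup future on this thread, then runs it on another one.
pub fn lookup_on_thread<S>(store: Arc<S>, key: &'static str) -> Option<String>
where
    S: SendDataStore + Sync + 'static,
{
    let fut = async move { store.get(key).await };
    thread::spawn(move || block_on(fut))
        .join()
        .expect("worker thread panicked")
}
```

The `+ Send` on the return type is the whole trick: generic callers can rely on it without knowing the concrete store. The trait still returns `impl Future`, so it remains usable only through generics.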

Quick Reference: Async Traits

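| Approach | Static Dispatch | Dynamic Dispatch | Send | Syntax Overhead |
|---|---|---|---|---|
| Native async fn in trait | ✅ | ❌ | Implicit | None |
| trait_variant | ✅ | ❌ | Explicit | #[trait_variant::make] |
| Manual Box::pin | ✅ | ✅ | Explicit | High |
| async-trait crate | ✅ | ✅ | #[async_trait] | Medium (proc macro) |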

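Recommendation: For new code (Rust 1.75+), use native async traits, adding trait_variant when callers need Send futures (for example, to pass them to tokio::spawn). For dyn dispatch, return a boxed future or use the async-trait crate. The async-trait crate is still widely used but boxes every future, whereas the native approach is zero-cost for static dispatch.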

Async Closures (Rust 1.85+)

Since Rust 1.85, async closures are stable — closures that capture their environment and return a future:

#![allow(unused)]
fn main() {
// Before 1.85: awkward workaround
let urls = vec!["https://a.com", "https://b.com"];
let fetchers: Vec<_> = urls.iter().map(|url| {
    let url = url.to_string();
    // Returns a non-async closure that returns an async block
    move || async move { reqwest::get(&url).await }
}).collect();

// After 1.85: async closures just work
let fetchers: Vec<_> = urls.iter().map(|url| {
    async move || { reqwest::get(url).await }
    // ↑ This is an async closure — captures url, returns a Future
}).collect();
}

Async closures implement the new AsyncFn, AsyncFnMut, and AsyncFnOnce traits, which mirror Fn, FnMut, FnOnce:

#![allow(unused)]
fn main() {
// Generic function accepting an async closure
async fn retry<F>(max: usize, f: F) -> Result<String, Error>
where
    F: AsyncFn() -> Result<String, Error>,
{
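    // Make at most `max` attempts in total: up to `max - 1` in the loop,
    // then return whatever the final attempt produces.
    for _ in 1..max {
        if let Ok(val) = f().await {
            return Ok(val);
        }
    }
    f().await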
}
}

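Migration tip: If you have code that takes F: Fn() -> Fut with a separate Fut: Future<Output = T> parameter, consider switching to F: AsyncFn() -> T for cleaner signatures. Keep the two-parameter form where you need bounds such as Send on the returned future, which an AsyncFn() bound cannot yet express on stable Rust.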
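
A practical benefit of `AsyncFn` is that the closure's future may borrow from the closure's captures. The sketch below assumes a Rust 1.85 or newer toolchain. `demo_async_closure` and the hand-rolled `block_on` are hypothetical, and the error type is simplified to `String` for illustration.

```rust
use std::cell::Cell;
use std::future::Future;
use std::ops::AsyncFn;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Minimal executor for one future (illustrative only, not a real runtime).
pub fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}

/// Calls `f` until it succeeds, making at least one and at most `max` attempts.
async fn retry<F>(max: usize, f: F) -> Result<String, String>
where
    F: AsyncFn() -> Result<String, String>,
{
    let mut result = f().await;
    for _ in 1..max {
        if result.is_ok() {
            break;
        }
        result = f().await;
    }
    result
}

pub fn demo_async_closure() -> (Result<String, String>, u32) {
    let attempts = Cell::new(0u32);
    // The async closure borrows `attempts`; no clone or `move` is needed.
    let flaky = async || {
        attempts.set(attempts.get() + 1);
        if attempts.get() < 3 {
            Err(format!("attempt {} failed", attempts.get()))
        } else {
            Ok(format!("succeeded on attempt {}", attempts.get()))
        }
    };
    let result = block_on(retry(5, flaky));
    (result, attempts.get())
}
```

With the older `|| async move { ... }` pattern, the counter would have to be cloned or wrapped in a shared pointer, because the returned future could not borrow from the closure.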

🏋️ Exercise: Design an Async Service Trait

Challenge: Design a Cache trait with async get and set methods. Implement it twice: once with a HashMap (in-memory) and once with a simulated Redis backend (use tokio::time::sleep to simulate network latency). Write a generic function that works with both.

🔑 Solution
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::time::{sleep, Duration};

trait Cache {
    async fn get(&self, key: &str) -> Option<String>;
    async fn set(&self, key: &str, value: String);
}

// --- In-memory implementation ---
struct MemoryCache {
    store: Mutex<HashMap<String, String>>,
}

impl MemoryCache {
    fn new() -> Self {
        MemoryCache {
            store: Mutex::new(HashMap::new()),
        }
    }
}

impl Cache for MemoryCache {
    async fn get(&self, key: &str) -> Option<String> {
        self.store.lock().await.get(key).cloned()
    }

    async fn set(&self, key: &str, value: String) {
        self.store.lock().await.insert(key.to_string(), value);
    }
}

// --- Simulated Redis implementation ---
struct RedisCache {
    store: Mutex<HashMap<String, String>>,
    latency: Duration,
}

impl RedisCache {
    fn new(latency_ms: u64) -> Self {
        RedisCache {
            store: Mutex::new(HashMap::new()),
            latency: Duration::from_millis(latency_ms),
        }
    }
}

impl Cache for RedisCache {
    async fn get(&self, key: &str) -> Option<String> {
        sleep(self.latency).await; // Simulate network round-trip
        self.store.lock().await.get(key).cloned()
    }

    async fn set(&self, key: &str, value: String) {
        sleep(self.latency).await;
        self.store.lock().await.insert(key.to_string(), value);
    }
}

// --- Generic function working with any Cache ---
async fn cache_demo<C: Cache>(cache: &C, label: &str) {
    cache.set("greeting", "Hello, async!".into()).await;
    let val = cache.get("greeting").await;
    println!("[{label}] greeting = {val:?}");
}

#[tokio::main]
async fn main() {
    let mem = MemoryCache::new();
    cache_demo(&mem, "memory").await;

    let redis = RedisCache::new(50);
    cache_demo(&redis, "redis").await;
}

Key takeaway: The same generic function works with both implementations through static dispatch. No boxing, no allocation overhead. To spawn work that uses the cache, add #[trait_variant::make(SendCache: Send)]; for dynamic dispatch, return boxed futures instead.

Key Takeaways — Async Traits

  • Since Rust 1.75, you can write async fn directly in traits (no #[async_trait] crate needed)
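  • trait_variant::make auto-generates a Send variant so generic code can be spawned; dyn dispatch still needs boxed futures (manual Box::pin or the async-trait crate)
  • Async closures (async || ...) and the AsyncFn traits stabilized in 1.85; use them for callbacks and middleware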
  • Prefer static dispatch (<S: Service>) over dyn for performance-critical code

See also: Ch 13 — Production Patterns for Tower’s Service trait, Ch 6 — Building Futures by Hand for manual trait implementations


11. Streams and AsyncIterator 🟡

What you’ll learn:

  • The Stream trait: async iteration over multiple values
  • Creating streams: stream::iter, async_stream, unfold
  • Stream combinators: map, filter, buffer_unordered, fold
  • Async I/O traits: AsyncRead, AsyncWrite, AsyncBufRead

Stream Trait Overview

A Stream is to Iterator what Future is to a single value — it yields multiple values asynchronously:

#![allow(unused)]
fn main() {
// std::iter::Iterator (synchronous, multiple values)
trait Iterator {
    type Item;
    fn next(&mut self) -> Option<Self::Item>;
}

// futures::Stream (async, multiple values)
trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}
}
graph LR
    subgraph "Sync"
        VAL["Value<br/>(T)"]
        ITER["Iterator<br/>(multiple T)"]
    end

    subgraph "Async"
        FUT["Future<br/>(async T)"]
        STREAM["Stream<br/>(async multiple T)"]
    end

    VAL -->|"make async"| FUT
    ITER -->|"make async"| STREAM
    VAL -->|"make multiple"| ITER
    FUT -->|"make multiple"| STREAM

    style VAL fill:#e3f2fd,color:#000
    style ITER fill:#e3f2fd,color:#000
    style FUT fill:#c8e6c9,color:#000
    style STREAM fill:#c8e6c9,color:#000
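
To see what `poll_next` looks like from the implementor's side, here is a hand-written stream polled manually. The `Stream` trait in this sketch is a local copy of the definition shown above, so the example needs no external crate; in real code you would implement `futures::Stream` instead. `Countdown`, `NoopWaker`, and `drain` are hypothetical names.

```rust
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Local mirror of the Stream trait shown above (assumed to match its shape).
trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

/// Yields `remaining`, `remaining - 1`, ..., 1 and reports Pending once
/// before each item to imitate waiting on I/O.
pub struct Countdown {
    remaining: u32,
    ready: bool,
}

impl Stream for Countdown {
    type Item = u32;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<u32>> {
        if self.remaining == 0 {
            return Poll::Ready(None); // Stream finished
        }
        if !self.ready {
            self.ready = true;
            cx.waker().wake_by_ref(); // Promise that polling again will make progress
            return Poll::Pending;
        }
        self.ready = false;
        let item = self.remaining;
        self.remaining -= 1;
        Poll::Ready(Some(item))
    }
}

/// Waker that does nothing; sufficient because `drain` polls in a loop.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Polls the stream to completion. Returns the items and the number of
/// times the stream reported Pending.
pub fn drain(mut stream: Countdown) -> (Vec<u32>, usize) {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut items = Vec::new();
    let mut pending = 0;
    loop {
        match Pin::new(&mut stream).poll_next(&mut cx) {
            Poll::Ready(Some(item)) => items.push(item),
            Poll::Ready(None) => return (items, pending),
            Poll::Pending => pending += 1,
        }
    }
}
```

The three outcomes of `poll_next` map directly onto the loop arms: an item, the end of the stream, or "not yet".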

Creating Streams

#![allow(unused)]
fn main() {
use futures::stream::{self, StreamExt};
use tokio::time::{interval, Duration};
use tokio_stream::wrappers::IntervalStream;

// 1. From an iterator
let s = stream::iter(vec![1, 2, 3]);

// 2. From an async generator (using async_stream crate)
// Cargo.toml: async-stream = "0.3"
use async_stream::stream;

fn countdown(from: u32) -> impl futures::Stream<Item = u32> {
    stream! {
        for i in (0..=from).rev() {
            tokio::time::sleep(Duration::from_millis(500)).await;
            yield i;
        }
    }
}

// 3. From a tokio interval
let tick_stream = IntervalStream::new(interval(Duration::from_secs(1)));

// 4. From a channel receiver (tokio_stream::wrappers)
let (tx, rx) = tokio::sync::mpsc::channel::<String>(100);
let rx_stream = tokio_stream::wrappers::ReceiverStream::new(rx);

// 5. From unfold (generate from async state)
let s = stream::unfold(0u32, |state| async move {
    if state >= 5 {
        None // Stream ends
    } else {
        let next = state + 1;
        Some((state, next)) // yield `state`, new state is `next`
    }
});
}

Consuming Streams

#![allow(unused)]
fn main() {
use futures::stream::{self, StreamExt};
use std::time::Duration;

async fn stream_examples() {
    let s = stream::iter(vec![1, 2, 3, 4, 5]);

    // for_each — process each item
    s.for_each(|x| async move {
        println!("{x}");
    }).await;

    // map + collect
    let doubled: Vec<i32> = stream::iter(vec![1, 2, 3])
        .map(|x| x * 2)
        .collect()
        .await;

    // filter
    let evens: Vec<i32> = stream::iter(1..=10)
        .filter(|x| futures::future::ready(x % 2 == 0))
        .collect()
        .await;

    // buffer_unordered — process N items concurrently
    let results: Vec<_> = stream::iter(vec!["url1", "url2", "url3"])
        .map(|url| async move {
            // Simulate HTTP fetch
            tokio::time::sleep(Duration::from_millis(100)).await;
            format!("response from {url}")
        })
        .buffer_unordered(10) // Up to 10 concurrent fetches
        .collect()
        .await;

    // take, skip, zip, chain — just like Iterator
    let first_three: Vec<i32> = stream::iter(1..=100)
        .take(3)
        .collect()
        .await;
}
}

Comparison with C# IAsyncEnumerable

| Feature | Rust Stream | C# IAsyncEnumerable<T> |
|---|---|---|
| Syntax | stream! { yield x; } | await foreach / yield return |
| Cancellation | Drop the stream | CancellationToken |
| Backpressure | Consumer controls poll rate | Consumer controls MoveNextAsync |
| Built-in | No (needs futures crate) | Yes (since C# 8.0) |
| Combinators | .map(), .filter(), .buffer_unordered() | LINQ + System.Linq.Async |
| Error handling | Stream<Item = Result<T, E>> | Throw in async iterator |

#![allow(unused)]
fn main() {
// Rust: Stream of database rows
// NOTE: try_stream! (not stream!) is required when using ? inside the body.
// stream! doesn't propagate errors — try_stream! yields Err(e) and ends.
fn get_users(db: &Database) -> impl Stream<Item = Result<User, DbError>> + '_ {
    try_stream! {
        let mut cursor = db.query("SELECT * FROM users").await?;
        while let Some(row) = cursor.next().await {
            yield User::from_row(row?);
        }
    }
}

// Consume:
let mut users = pin!(get_users(&db));
while let Some(result) = users.next().await {
    match result {
        Ok(user) => println!("{}", user.name),
        Err(e) => eprintln!("Error: {e}"),
    }
}
}
// C# equivalent:
async IAsyncEnumerable<User> GetUsers() {
    await using var reader = await db.QueryAsync("SELECT * FROM users");
    while (await reader.ReadAsync()) {
        yield return User.FromRow(reader);
    }
}

// Consume:
await foreach (var user in GetUsers()) {
    Console.WriteLine(user.Name);
}

🏋️ Exercise: Build an Async Stats Aggregator

Challenge: Given a stream of sensor readings Stream<Item = f64>, write an async function that consumes the stream and returns (count, min, max, average). Use StreamExt combinators — don’t just collect into a Vec.

Hint: Use .fold() to accumulate state across the stream.

🔑 Solution
#![allow(unused)]
fn main() {
use futures::stream::{self, StreamExt};

#[derive(Debug)]
struct Stats {
    count: usize,
    min: f64,
    max: f64,
    sum: f64,
}

impl Stats {
    fn average(&self) -> f64 {
        if self.count == 0 { 0.0 } else { self.sum / self.count as f64 }
    }
}

async fn compute_stats<S: futures::Stream<Item = f64> + Unpin>(stream: S) -> Stats {
    stream
        .fold(
            Stats { count: 0, min: f64::INFINITY, max: f64::NEG_INFINITY, sum: 0.0 },
            |mut acc, value| async move {
                acc.count += 1;
                acc.min = acc.min.min(value);
                acc.max = acc.max.max(value);
                acc.sum += value;
                acc
            },
        )
        .await
}

#[tokio::test]
async fn test_stats() {
    let readings = stream::iter(vec![23.5, 24.1, 22.8, 25.0, 23.9]);
    let stats = compute_stats(readings).await;

    assert_eq!(stats.count, 5);
    assert!((stats.min - 22.8).abs() < f64::EPSILON);
    assert!((stats.max - 25.0).abs() < f64::EPSILON);
    assert!((stats.average() - 23.86).abs() < 0.01);
}
}

Key takeaway: Stream combinators like .fold() process items one-at-a-time without collecting into memory — essential for processing large or unbounded data streams.

Async I/O Traits: AsyncRead, AsyncWrite, AsyncBufRead

Just as std::io::Read/Write are the foundation of synchronous I/O, their async counterparts are the foundation of async I/O. These traits are provided by tokio::io (or futures::io for runtime-agnostic code):

#![allow(unused)]
fn main() {
// tokio::io — the async versions of std::io traits

/// Read bytes from a source asynchronously
pub trait AsyncRead {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,  // Tokio's safe wrapper around uninitialized memory
    ) -> Poll<io::Result<()>>;
}

/// Write bytes to a sink asynchronously
pub trait AsyncWrite {
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>>;

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>;
    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>;
}

/// Buffered reading with line support
pub trait AsyncBufRead: AsyncRead {
    fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>>;
    fn consume(self: Pin<&mut Self>, amt: usize);
}
}

In practice, you rarely call these poll_* methods directly. Instead, use the extension traits AsyncReadExt and AsyncWriteExt which provide .await-friendly helper methods:

#![allow(unused)]
fn main() {
use tokio::io::{AsyncReadExt, AsyncWriteExt, AsyncBufReadExt};
use tokio::net::TcpStream;
use tokio::io::BufReader;

async fn io_examples() -> tokio::io::Result<()> {
    let mut stream = TcpStream::connect("127.0.0.1:8080").await?;

    // AsyncWriteExt: write_all, write_u32, write_buf, etc.
    stream.write_all(b"GET / HTTP/1.0\r\n\r\n").await?;

    // AsyncReadExt: read, read_exact, read_to_end, read_to_string
    let mut response = Vec::new();
    stream.read_to_end(&mut response).await?;

    // AsyncBufReadExt: read_line, lines(), split()
    let file = tokio::fs::File::open("config.txt").await?;
    let reader = BufReader::new(file);
    let mut lines = reader.lines();
    while let Some(line) = lines.next_line().await? {
        println!("{line}");
    }

    Ok(())
}
}

Implementing custom async I/O — wrap a protocol over raw TCP:

#![allow(unused)]
fn main() {
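// The Ext traits must be imported: they are named in the bounds below
// and provide read_u32, read_exact, write_u32, write_all, and flush.
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};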

/// A length-prefixed protocol: [u32 length][payload bytes]
struct FramedStream<T> {
    inner: T,
}

impl<T: AsyncRead + AsyncReadExt + Unpin> FramedStream<T> {
    /// Read one complete frame
    async fn read_frame(&mut self) -> tokio::io::Result<Vec<u8>>
    {
        // Read the 4-byte length prefix
        let len = self.inner.read_u32().await? as usize;

        // Read exactly that many bytes
        let mut payload = vec![0u8; len];
        self.inner.read_exact(&mut payload).await?;
        Ok(payload)
    }
}

impl<T: AsyncWrite + AsyncWriteExt + Unpin> FramedStream<T> {
    /// Write one complete frame
    async fn write_frame(&mut self, data: &[u8]) -> tokio::io::Result<()>
    {
        self.inner.write_u32(data.len() as u32).await?;
        self.inner.write_all(data).await?;
        self.inner.flush().await?;
        Ok(())
    }
}
}
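
The framing logic itself is independent of async. The sketch below restates it against `std::io::Read` and `std::io::Write` so it can be exercised with an in-memory buffer and no runtime. This is a synchronous stand-in, not the tokio code above. It keeps the big-endian `u32` prefix that tokio's `read_u32`/`write_u32` use. `MAX_FRAME_LEN` is a hypothetical limit, added because a length prefix read from the network should not be trusted to size an allocation.

```rust
use std::io::{self, Cursor, Read, Write};

/// Hypothetical upper bound on payload size (1 MiB), chosen for illustration.
const MAX_FRAME_LEN: usize = 1 << 20;

/// Writes one frame: [u32 big-endian length][payload bytes].
pub fn write_frame<W: Write>(writer: &mut W, data: &[u8]) -> io::Result<()> {
    if data.len() > MAX_FRAME_LEN {
        return Err(io::Error::new(io::ErrorKind::InvalidInput, "frame too large"));
    }
    // Safe cast: the length was just checked against MAX_FRAME_LEN.
    writer.write_all(&(data.len() as u32).to_be_bytes())?;
    writer.write_all(data)?;
    writer.flush()
}

/// Reads one frame, rejecting length prefixes above the limit.
pub fn read_frame<R: Read>(reader: &mut R) -> io::Result<Vec<u8>> {
    let mut prefix = [0u8; 4];
    reader.read_exact(&mut prefix)?;
    let len = u32::from_be_bytes(prefix) as usize;
    if len > MAX_FRAME_LEN {
        return Err(io::Error::new(io::ErrorKind::InvalidData, "frame exceeds limit"));
    }
    let mut payload = vec![0u8; len];
    reader.read_exact(&mut payload)?;
    Ok(payload)
}

/// Encodes all frames into one buffer, then decodes them back.
pub fn roundtrip(frames: &[&[u8]]) -> io::Result<Vec<Vec<u8>>> {
    let mut wire = Vec::new();
    for frame in frames {
        write_frame(&mut wire, frame)?;
    }
    let mut reader = Cursor::new(wire);
    frames.iter().map(|_| read_frame(&mut reader)).collect()
}
```

The same two checks (payload size on write, prefix size on read) carry over to the async `FramedStream` unchanged.
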

| Sync Trait | Async Trait (tokio) | Async Trait (futures) | Extension Trait |
|---|---|---|---|
| std::io::Read | tokio::io::AsyncRead | futures::io::AsyncRead | AsyncReadExt |
| std::io::Write | tokio::io::AsyncWrite | futures::io::AsyncWrite | AsyncWriteExt |
| std::io::BufRead | tokio::io::AsyncBufRead | futures::io::AsyncBufRead | AsyncBufReadExt |
| std::io::Seek | tokio::io::AsyncSeek | futures::io::AsyncSeek | AsyncSeekExt |

tokio vs futures I/O traits: They’re similar but not identical — tokio’s AsyncRead uses ReadBuf (handles uninitialized memory safely), while futures::AsyncRead uses &mut [u8]. Use tokio_util::compat to convert between them.

Copy utilities: tokio::io::copy(&mut reader, &mut writer) is the async equivalent of std::io::copy — useful for proxy servers or file transfers. tokio::io::copy_bidirectional copies both directions concurrently.

🏋️ Exercise: Build an Async Line Counter

Challenge: Write an async function that takes any AsyncBufRead source and returns the number of non-empty lines. It should work with files, TCP streams, or any buffered reader.

Hint: Use AsyncBufReadExt::lines() and count lines where !line.is_empty().

🔑 Solution
#![allow(unused)]
fn main() {
use tokio::io::AsyncBufReadExt;

async fn count_non_empty_lines<R: tokio::io::AsyncBufRead + Unpin>(
    reader: R,
) -> tokio::io::Result<usize> {
    let mut lines = reader.lines();
    let mut count = 0;
    while let Some(line) = lines.next_line().await? {
        if !line.is_empty() {
            count += 1;
        }
    }
    Ok(count)
}

// Works with any AsyncBufRead:
// let file = tokio::io::BufReader::new(tokio::fs::File::open("data.txt").await?);
// let count = count_non_empty_lines(file).await?;
//
// let tcp = tokio::io::BufReader::new(TcpStream::connect("...").await?);
// let count = count_non_empty_lines(tcp).await?;
}

Key takeaway: By programming against AsyncBufRead instead of a concrete type, your I/O code is reusable across files, sockets, pipes, and even in-memory buffers (tokio::io::BufReader::new(std::io::Cursor::new(data))).

Key Takeaways — Streams and AsyncIterator

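  • Stream is the async equivalent of Iterator: poll_next returns Poll::Ready(Some(item)) for each item, Poll::Ready(None) at the end, and Poll::Pending while waiting
  • .buffer_unordered(N) runs up to N of a stream’s futures concurrently and yields results in completion order; it is the key concurrency tool for streams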
  • async_stream::stream! is the easiest way to create custom streams (uses yield)
  • AsyncRead/AsyncBufRead enable generic, reusable I/O code across files, sockets, and pipes

See also: Ch 9 — When Tokio Isn’t the Right Fit for FuturesUnordered (related pattern), Ch 13 — Production Patterns for backpressure with bounded channels


12. Common Pitfalls 🔴

What you’ll learn:

  • 9 common async Rust bugs and how to fix each one
  • Why blocking the executor is the #1 mistake (and how spawn_blocking fixes it)
  • Cancellation hazards: what happens when a future is dropped mid-await
  • Debugging: tokio-console, tracing, #[instrument]
  • Testing: #[tokio::test], time::pause(), trait-based mocking

Blocking the Executor

The #1 mistake in async Rust: running blocking code on the async executor thread. This starves other tasks.

#![allow(unused)]
fn main() {
// ❌ WRONG: Blocks the entire executor thread
async fn bad_handler() -> String {
    let data = std::fs::read_to_string("big_file.txt").unwrap(); // BLOCKS!
    process(&data)
}

// ✅ CORRECT: Offload blocking work to a dedicated thread pool
async fn good_handler() -> String {
    let data = tokio::task::spawn_blocking(|| {
        std::fs::read_to_string("big_file.txt").unwrap()
    }).await.unwrap();
    process(&data)
}

// ✅ ALSO CORRECT: Use tokio's async fs
async fn also_good_handler() -> String {
    let data = tokio::fs::read_to_string("big_file.txt").await.unwrap();
    process(&data)
}
}
graph TB
    subgraph "❌ Blocking Call on Executor"
        T1_BAD["Thread 1: std::fs::read()<br/>🔴 BLOCKED for 500ms"]
        T2_BAD["Thread 2: handling requests<br/>🟢 Working alone"]
        TASKS_BAD["100 pending tasks<br/>⏳ Starved"]
        T1_BAD -->|"can't poll"| TASKS_BAD
    end

    subgraph "✅ spawn_blocking"
        T1_GOOD["Thread 1: polling futures<br/>🟢 Available"]
        T2_GOOD["Thread 2: polling futures<br/>🟢 Available"]
        BT["Blocking pool thread:<br/>std::fs::read()<br/>🔵 Separate pool"]
        TASKS_GOOD["100 tasks<br/>✅ All making progress"]
        T1_GOOD -->|"polls"| TASKS_GOOD
        T2_GOOD -->|"polls"| TASKS_GOOD
    end

std::thread::sleep vs tokio::time::sleep

#![allow(unused)]
fn main() {
use std::time::Duration;

// ❌ WRONG: Blocks the executor thread for 5 seconds
async fn bad_delay() {
    std::thread::sleep(Duration::from_secs(5)); // Thread can't poll anything else!
}

// ✅ CORRECT: Yields to the executor, other tasks can run
async fn good_delay() {
    tokio::time::sleep(Duration::from_secs(5)).await; // Non-blocking!
}
}

Holding MutexGuard Across .await

#![allow(unused)]
fn main() {
use std::sync::Mutex; // std Mutex — NOT async-aware

// ❌ WRONG: MutexGuard held across .await
async fn bad_mutex(data: &Mutex<Vec<String>>) {
    let mut guard = data.lock().unwrap();
    guard.push("item".into());
    some_io().await; // 💥 Guard is held here — blocks other threads from locking!
    guard.push("another".into());
}
// Also: std::sync::MutexGuard is !Send, so the future is !Send and
// tokio::spawn on the multi-threaded runtime refuses to compile it.

// ✅ FIX 1: Scope the guard to drop before .await
async fn good_mutex_scoped(data: &Mutex<Vec<String>>) {
    {
        let mut guard = data.lock().unwrap();
        guard.push("item".into());
    } // Guard dropped here
    some_io().await; // Safe — lock is released
    {
        let mut guard = data.lock().unwrap();
        guard.push("another".into());
    }
}

// ✅ FIX 2: Use tokio::sync::Mutex (async-aware)
use tokio::sync::Mutex as AsyncMutex;

async fn good_async_mutex(data: &AsyncMutex<Vec<String>>) {
    let mut guard = data.lock().await; // Async lock — doesn't block the thread
    guard.push("item".into());
    some_io().await; // OK — tokio Mutex guard is Send
    guard.push("another".into());
}
}

When to use which Mutex:

  • std::sync::Mutex: Short critical sections with no .await inside
  • tokio::sync::Mutex: When you must hold the lock across .await points
  • parking_lot::Mutex: Drop-in std replacement, faster, smaller, still no .await
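
The first rule can be checked by the compiler. The sketch below uses only the standard library: `assert_send` and `block_on` are hypothetical helpers written for this example (not Tokio APIs), and `std::future::ready` stands in for real I/O. It shows that a future which drops its `std::sync::MutexGuard` before the await point is still `Send`.

```rust
use std::future::Future;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical helper: compiles only if the value is Send.
fn assert_send<T: Send>(value: T) -> T {
    value
}

struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Hypothetical minimal executor: polls in a loop until the future completes.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

async fn push_scoped(data: &Mutex<Vec<String>>) {
    {
        let mut guard = data.lock().unwrap();
        guard.push("item".to_string());
    } // guard dropped here, before the await point
    std::future::ready(()).await; // stand-in for real I/O
    data.lock().unwrap().push("another".to_string());
}

fn run_demo() -> Vec<String> {
    let data = Mutex::new(Vec::new());
    // Compiles because no MutexGuard lives across the .await.
    block_on(assert_send(push_scoped(&data)));
    data.into_inner().unwrap()
}
```

If the inner braces are removed so that `guard` stays alive across the `.await`, the `assert_send` call should stop compiling. That is the same error `tokio::spawn` reports on the multi-threaded runtime.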

Cancellation Hazards

Dropping a future cancels it — but this can leave things in an inconsistent state:

#![allow(unused)]
fn main() {
// ❌ DANGEROUS: Inconsistent state on cancellation
async fn transfer(from: &Account, to: &Account, amount: u64) {
    from.debit(amount).await;  // If cancelled HERE...
    to.credit(amount).await;   // ...money vanishes!
}

// ✅ SAFE: Make operations atomic or use compensation
async fn safe_transfer(from: &Account, to: &Account, amount: u64) -> Result<(), Error> {
    // Use a database transaction (all-or-nothing)
    let tx = db.begin_transaction().await?;
    tx.debit(from, amount).await?;
    tx.credit(to, amount).await?;
    tx.commit().await?; // Only commits if everything succeeded
    Ok(())
}

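// ✅ ALSO SAFE: Pin the future so select! borrows it instead of consuming it
let transfer_fut = transfer(from, to, amount);
tokio::pin!(transfer_fut);
tokio::select! {
    _ = &mut transfer_fut => {
        // Transfer completed
    }
    _ = shutdown_signal() => {
        // Don't cancel mid-transfer: the pinned future is still alive,
        // so drive it to completion (or roll back explicitly instead)
        transfer_fut.await;
    }
}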
}
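
To make the hazard concrete, here is a runtime-free sketch using only the standard library. `YieldOnce`, `Rollback`, and the event log are hypothetical names invented for this illustration. The future is polled once and then dropped, which is all that cancellation amounts to: the code after the await never runs, but destructors do, so a guard can record a compensating action.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

type Log = Rc<RefCell<Vec<&'static str>>>;

// Hypothetical future: Pending on the first poll, Ready on the second.
struct YieldOnce(bool);
impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            Poll::Ready(())
        } else {
            self.0 = true;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

// Compensation guard: records a rollback on drop unless disarmed.
struct Rollback {
    log: Log,
    armed: bool,
}
impl Drop for Rollback {
    fn drop(&mut self) {
        if self.armed {
            self.log.borrow_mut().push("rollback");
        }
    }
}

async fn transfer(log: Log) {
    log.borrow_mut().push("debit");
    let mut guard = Rollback { log: log.clone(), armed: true };
    YieldOnce(false).await; // cancellation can strike here
    log.borrow_mut().push("credit");
    guard.armed = false; // success: no compensation needed
}

struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Polls `transfer` once, then drops it, which is what cancellation means.
fn cancel_mid_transfer() -> Vec<&'static str> {
    let log: Log = Rc::new(RefCell::new(Vec::new()));
    let mut fut = Box::pin(transfer(log.clone()));
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    assert!(fut.as_mut().poll(&mut cx).is_pending());
    drop(fut); // "credit" never runs, but the guard's Drop does
    let events = log.borrow().clone();
    events
}
```

A drop guard can only run synchronous code, so in a real system it would typically enqueue the rollback rather than perform it.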

No Async Drop

Rust’s Drop trait is synchronous — you cannot .await inside drop(). This is a frequent source of confusion:

#![allow(unused)]
fn main() {
struct DbConnection {
    // `Connection` is a placeholder type; Option lets drop() take ownership
    connection: Option<Connection>,
}

impl Drop for DbConnection {
    fn drop(&mut self) {
        // ❌ Can't do this — drop() is sync!
        // self.connection.shutdown().await;

        // ✅ Workaround 1: Spawn a cleanup task (fire-and-forget)
        // NOTE: tokio::spawn panics if no runtime is active on this thread.
        if let Some(conn) = self.connection.take() {
            tokio::spawn(async move {
                let _ = conn.shutdown().await;
            });
        }

        // ✅ Workaround 2: Use a synchronous close
        // self.connection.blocking_close();
    }
}
}

Best practice: Provide an explicit async fn close(self) method and document that callers should use it. Rely on Drop only as a safety net, not the primary cleanup path.
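
A minimal sketch of that best practice, using only the standard library. The `DbConnection` fields, the two counters, and the `block_on` helper are assumptions made for this example; the counters stand in for real cleanup work so the two paths can be told apart.

```rust
use std::future::Future;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical connection type; the counters stand in for real cleanup work.
struct DbConnection {
    closed: bool,
    graceful: Arc<AtomicUsize>,
    fallback: Arc<AtomicUsize>,
}

impl DbConnection {
    // Primary cleanup path: async, so it may await a real shutdown handshake.
    async fn close(mut self) {
        std::future::ready(()).await; // stand-in for `shutdown().await`
        self.graceful.fetch_add(1, Ordering::SeqCst);
        self.closed = true; // tells Drop that nothing is left to do
    }
}

impl Drop for DbConnection {
    fn drop(&mut self) {
        if !self.closed {
            // Safety net only: synchronous, best-effort cleanup.
            self.fallback.fetch_add(1, Ordering::SeqCst);
        }
    }
}

struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Hypothetical minimal executor: polls in a loop until the future completes.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

// Returns (graceful closes, fallback cleanups).
fn close_demo() -> (usize, usize) {
    let graceful = Arc::new(AtomicUsize::new(0));
    let fallback = Arc::new(AtomicUsize::new(0));
    let connect = || DbConnection {
        closed: false,
        graceful: Arc::clone(&graceful),
        fallback: Arc::clone(&fallback),
    };
    block_on(connect().close()); // explicit async close
    drop(connect()); // forgotten close: Drop is the safety net
    (graceful.load(Ordering::SeqCst), fallback.load(Ordering::SeqCst))
}
```

Because `close` takes `self` by value, the connection cannot be used after closing. If the `close()` future is itself cancelled before it finishes, the `closed` flag is still false and the `Drop` fallback runs.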

select! Fairness and Starvation

#![allow(unused)]
fn main() {
use tokio::sync::mpsc;

// ❌ NO PRIORITY: default select! checks ready branches in random order
async fn unfair(mut fast: mpsc::Receiver<i32>, mut slow: mpsc::Receiver<i32>) {
    loop {
        tokio::select! {
            Some(v) = fast.recv() => println!("fast: {v}"),
            Some(v) = slow.recv() => println!("slow: {v}"),
            // If both are ready, tokio randomly picks one, so neither branch
            // starves, but a busy `fast` still delays `slow` about half the time.
            else => break, // Both channels closed; without this, select! panics
        }
    }
}

// ✅ PRIORITIZED: Use biased select when one branch must win
async fn fair(mut fast: mpsc::Receiver<i32>, mut slow: mpsc::Receiver<i32>) {
    loop {
        tokio::select! {
            biased; // Always check in order — explicit priority

            Some(v) = slow.recv() => println!("slow: {v}"),  // Priority!
            Some(v) = fast.recv() => println!("fast: {v}"),
            // Caveat: with biased, an always-ready `slow` would starve `fast`.
            else => break,
        }
    }
}
}

Accidental Sequential Execution

#![allow(unused)]
fn main() {
// ❌ SEQUENTIAL: Takes 2 seconds total
async fn slow() {
    let a = fetch("url_a").await; // 1 second
    let b = fetch("url_b").await; // 1 second (waits for a to finish first!)
}

// ✅ CONCURRENT: Takes 1 second total
async fn fast() {
    let (a, b) = tokio::join!(
        fetch("url_a"), // Both start immediately
        fetch("url_b"),
    );
}

// ✅ ALSO CONCURRENT: Using let + join
async fn also_fast() {
    let fut_a = fetch("url_a"); // Create future (lazy — not started yet)
    let fut_b = fetch("url_b"); // Create future
    let (a, b) = tokio::join!(fut_a, fut_b); // NOW both run concurrently
}
}

Trap: let a = fetch(url).await; let b = fetch(url).await; is sequential! The second .await doesn’t start until the first finishes. Use join! or spawn for concurrency.
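
The difference can be measured without a runtime by counting polls. The sketch below uses only the standard library. `Ticks`, `concurrent`, and `count_polls` are hypothetical names for this illustration, and the hand-rolled two-way join is a simplified stand-in for what `join!` does, not Tokio's implementation.

```rust
use std::future::{poll_fn, Future};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical stand-in for a slow operation: Pending for `n` polls, then Ready.
struct Ticks(u32);

impl Future for Ticks {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        if self.0 == 0 {
            Poll::Ready(())
        } else {
            self.0 -= 1;
            Poll::Pending // a real future would also register a wake-up here
        }
    }
}

async fn sequential() {
    Ticks(3).await; // must finish before the next line even starts
    Ticks(3).await;
}

// Hand-rolled two-way join: polls both futures on every wake-up.
async fn concurrent() {
    let mut a = Box::pin(Ticks(3));
    let mut b = Box::pin(Ticks(3));
    let (mut a_done, mut b_done) = (false, false);
    poll_fn(|cx| {
        if !a_done {
            a_done = a.as_mut().poll(cx).is_ready();
        }
        if !b_done {
            b_done = b.as_mut().poll(cx).is_ready();
        }
        if a_done && b_done { Poll::Ready(()) } else { Poll::Pending }
    })
    .await;
}

struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Drives a future to completion and reports how many polls it needed.
fn count_polls<F: Future>(fut: F) -> u32 {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut polls = 0;
    loop {
        polls += 1;
        if fut.as_mut().poll(&mut cx).is_ready() {
            return polls;
        }
    }
}
```

Here `count_polls(sequential())` needs 7 polls while `count_polls(concurrent())` needs 4: the waits overlap instead of adding up, which is the 2 seconds versus 1 second difference described above.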

Case Study: Debugging a Hung Production Service

A real-world scenario: a service handles requests fine for 10 minutes, then stops responding. No errors in logs. CPU at 0%.

Diagnosis steps:

  1. Attach tokio-console — reveals 200+ tasks stuck in Pending state
  2. Check task details — all are waiting to acquire the same mutex
  3. Root cause — one task held a std::sync::MutexGuard across an .await and panicked, poisoning the mutex. All other tasks now fail on lock().unwrap()

The fix:

| Before (broken) | After (fixed) |
|---|---|
| std::sync::Mutex | tokio::sync::Mutex |
| .lock().unwrap() across .await | Scope lock before .await |
| No timeout on lock acquisition | tokio::time::timeout(dur, mutex.lock()) |
| No recovery on poisoned mutex | tokio::sync::Mutex doesn’t poison |

Prevention checklist:

  • Use tokio::sync::Mutex if the guard crosses any .await
  • Add #[tracing::instrument] to async functions for span tracking
  • Run tokio-console in staging to catch hung tasks early
  • Add health check endpoints that verify task responsiveness
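
The poisoning behaviour behind the root cause can be reproduced with the standard library alone. In this sketch (`poison_demo` is a hypothetical name, and a thread stands in for the panicking task), a panic while the guard is held poisons the mutex; every later `lock()` returns an error, and the data can still be recovered explicitly.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Poisons the mutex by panicking while the guard is held, then shows
// both the failure mode and the explicit recovery path.
fn poison_demo() -> (bool, Vec<u32>) {
    let shared = Arc::new(Mutex::new(vec![1, 2, 3]));

    let worker = {
        let shared = Arc::clone(&shared);
        thread::spawn(move || {
            let _guard = shared.lock().unwrap();
            panic!("simulated failure while holding the lock");
        })
    };
    let _ = worker.join(); // the panic is reported here as Err

    // Every later lock() now returns Err(PoisonError), so lock().unwrap() panics.
    let poisoned = shared.lock().is_err();

    // Recovery: take the guard out of the error if the data is still usable.
    let data = shared
        .lock()
        .unwrap_or_else(|poison| poison.into_inner())
        .clone();
    (poisoned, data)
}
```

Whether recovering with `into_inner()` is appropriate depends on whether the panicking code could have left the data half-updated.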
🏋️ Exercise: Spot the Bugs

Challenge: Find all the async pitfalls in this code and fix them.

#![allow(unused)]
fn main() {
use std::sync::Mutex;

async fn process_requests(urls: Vec<String>) -> Vec<String> {
    let results = Mutex::new(Vec::new());
    
    for url in &urls {
        let response = reqwest::get(url).await.unwrap().text().await.unwrap();
        std::thread::sleep(std::time::Duration::from_millis(100)); // Rate limit
        let mut guard = results.lock().unwrap();
        guard.push(response);
        expensive_parse(&guard).await; // Parse all results so far
    }
    
    results.into_inner().unwrap()
}
}
🔑 Solution

Bugs found:

  1. Sequential fetches — URLs are fetched one at a time instead of concurrently
  2. std::thread::sleep — Blocks the executor thread
  3. MutexGuard held across .await: guard is alive when expensive_parse is awaited
  4. No concurrency — Should use join! or FuturesUnordered
#![allow(unused)]
fn main() {
use futures::stream::{self, StreamExt};

async fn process_requests(urls: Vec<String>) -> Vec<String> {
    // Fix 4: Process URLs concurrently with buffer_unordered
    let results: Vec<String> = stream::iter(urls)
        .map(|url| async move {
            let response = reqwest::get(&url).await.unwrap().text().await.unwrap();
            // Fix 2: Use tokio::time::sleep instead of std::thread::sleep
            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
            response
        })
        .buffer_unordered(10) // Up to 10 concurrent requests
        .collect()
        .await;

    // Fix 3: Parse after collecting — no mutex needed at all!
    for result in &results {
        expensive_parse(result).await;
    }

    results
}
}

Key takeaway: Often you can restructure async code to eliminate mutexes entirely. Collect results with streams/join, then process. Simpler, faster, no deadlock risk.


Debugging Async Code

Async stack traces are notoriously cryptic — they show the executor’s poll loop rather than your logical call chain. Here are the essential debugging tools.

tokio-console: Real-Time Task Inspector

tokio-console gives you an htop-like view of every spawned task: its state, poll duration, waker activity, and resource usage.

# Cargo.toml
[dependencies]
console-subscriber = "0.4"
tokio = { version = "1", features = ["full", "tracing"] }

// src/main.rs
#[tokio::main]
async fn main() {
    console_subscriber::init(); // Replaces the default tracing subscriber
    // ... rest of your application
}

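Build and run the application with the required compile-time flag, then start the console in another terminal:

$ RUSTFLAGS="--cfg tokio_unstable" cargo run   # Terminal 1: required compile-time flag
$ tokio-console                                # Terminal 2: connects to 127.0.0.1:6669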

tracing + #[instrument]: Structured Logging for Async

The tracing crate understands Future lifetimes. Spans stay open across .await points, giving you a logical call stack even when the OS thread has moved on:

#![allow(unused)]
fn main() {
use tracing::{info, instrument};

#[instrument(skip(db_pool), fields(user_id = %user_id))]
async fn handle_request(user_id: u64, db_pool: &Pool) -> Result<Response> {
    info!("looking up user");
    let user = db_pool.get_user(user_id).await?;  // span stays open across .await
    info!(email = %user.email, "found user");
    let orders = fetch_orders(user_id).await?;     // still the same span
    Ok(build_response(user, orders))
}
}

Output (with tracing_subscriber::fmt().json(); field layout simplified):

{"timestamp":"...","level":"INFO","span":{"name":"handle_request","user_id":"42"},"message":"looking up user"}
{"timestamp":"...","level":"INFO","span":{"name":"handle_request","user_id":"42"},"fields":{"email":"a@b.com"},"message":"found user"}

Debugging Checklist

| Symptom | Likely Cause | Tool |
|---|---|---|
| Task hangs forever | Missing .await or deadlocked Mutex | tokio-console task view |
| Low throughput | Blocking call on async thread | tokio-console poll-time histogram |
| Future is not Send | Non-Send type held across .await | Compiler error + #[instrument] to locate |
| Mysterious cancellation | Parent select! dropped a branch | tracing span lifecycle events |

Tip: Enable RUSTFLAGS="--cfg tokio_unstable" to get task-level metrics in tokio-console. This is a compile-time flag, not a runtime one.

Testing Async Code

Async code introduces unique testing challenges — you need a runtime, time control, and strategies for testing concurrent behavior.

Basic async tests with #[tokio::test]:

#![allow(unused)]
fn main() {
// Cargo.toml
// [dev-dependencies]
// tokio = { version = "1", features = ["full", "test-util"] }

#[tokio::test]
async fn test_basic_async() {
    let result = fetch_data().await;
    assert_eq!(result, "expected");
}

// Single-threaded test (useful for !Send types):
#[tokio::test(flavor = "current_thread")]
async fn test_single_threaded() {
    let rc = std::rc::Rc::new(42);
    let val = async { *rc }.await;
    assert_eq!(val, 42);
}

// Multi-threaded with explicit worker count:
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_concurrent_behavior() {
    // Tests race conditions with real concurrency
    let counter = std::sync::Arc::new(std::sync::atomic::AtomicU32::new(0));
    let c1 = counter.clone();
    let c2 = counter.clone();
    let (a, b) = tokio::join!(
        tokio::spawn(async move { c1.fetch_add(1, std::sync::atomic::Ordering::SeqCst) }),
        tokio::spawn(async move { c2.fetch_add(1, std::sync::atomic::Ordering::SeqCst) }),
    );
    a.unwrap();
    b.unwrap();
    assert_eq!(counter.load(std::sync::atomic::Ordering::SeqCst), 2);
}
}

Time manipulation — test timeouts without actually waiting:

#![allow(unused)]
fn main() {
use tokio::time::{self, Duration, Instant};

#[tokio::test]
async fn test_timeout_behavior() {
    // Pause time — sleep() advances instantly, no real wall-clock delay
    time::pause();

    let start = Instant::now();
    time::sleep(Duration::from_secs(3600)).await; // "waits" 1 hour — takes 0ms
    assert!(start.elapsed() >= Duration::from_secs(3600));
    // Test ran in milliseconds, not an hour!
}

#[tokio::test]
async fn test_retry_timing() {
    time::pause();

    // Test that our retry logic waits the expected durations
    // (argument order as defined in Ch 13: max_attempts, base_delay_ms, operation)
    let start = Instant::now();
    let result = retry_with_backoff(3, 1_000, || async {
        Err::<(), _>("simulated failure")
    })
    .await;

    assert!(result.is_err());
    // 3 attempts → 2 sleeps: 1s + 2s = 3s of backoff (no sleep after the final attempt)
    assert!(start.elapsed() >= Duration::from_secs(3));
}

#[tokio::test]
async fn test_deadline_exceeded() {
    time::pause();

    let result = tokio::time::timeout(
        Duration::from_secs(5),
        async {
            // Simulate slow operation
            time::sleep(Duration::from_secs(10)).await;
            "done"
        }
    ).await;

    assert!(result.is_err()); // Timed out
}
}

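Mocking async dependencies — define a trait and make the code under test generic over it (native async fn in traits, stable since Rust 1.75, is not dyn-compatible, so generics are the simplest route):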

#![allow(unused)]
fn main() {
// Define a trait for the dependency:
trait Storage {
    async fn get(&self, key: &str) -> Option<String>;
    async fn set(&self, key: &str, value: String);
}

// Production implementation:
struct RedisStorage { /* ... */ }
impl Storage for RedisStorage {
    async fn get(&self, key: &str) -> Option<String> {
        // Real Redis call
        todo!()
    }
    async fn set(&self, key: &str, value: String) {
        todo!()
    }
}

// Test mock:
struct MockStorage {
    data: std::sync::Mutex<std::collections::HashMap<String, String>>,
}

impl MockStorage {
    fn new() -> Self {
        MockStorage { data: std::sync::Mutex::new(std::collections::HashMap::new()) }
    }
}

impl Storage for MockStorage {
    async fn get(&self, key: &str) -> Option<String> {
        self.data.lock().unwrap().get(key).cloned()
    }
    async fn set(&self, key: &str, value: String) {
        self.data.lock().unwrap().insert(key.to_string(), value);
    }
}

// Tested function is generic over Storage:
async fn cache_lookup<S: Storage>(store: &S, key: &str) -> String {
    match store.get(key).await {
        Some(val) => val,
        None => {
            let val = "computed".to_string();
            store.set(key, val.clone()).await;
            val
        }
    }
}

#[tokio::test]
async fn test_cache_miss_then_hit() {
    let mock = MockStorage::new();

    // First call: miss → computes and stores
    let val = cache_lookup(&mock, "key1").await;
    assert_eq!(val, "computed");

    // Second call: hit → returns stored value
    let val = cache_lookup(&mock, "key1").await;
    assert_eq!(val, "computed");
    assert!(mock.data.lock().unwrap().contains_key("key1"));
}
}
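
If a trait object is needed after all (for example, to pick the storage backend at runtime), one common approach is to return boxed futures instead of using `async fn`. The sketch below is an assumption-laden variant of the mock above, using only the standard library: `DynStorage`, `BoxFuture`, and `block_on` are hypothetical names defined here, not items from Tokio or the `futures` crate.

```rust
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;

// Hypothetical dyn-compatible variant of the Storage trait.
trait DynStorage: Send + Sync {
    fn get<'a>(&'a self, key: &'a str) -> BoxFuture<'a, Option<String>>;
    fn set<'a>(&'a self, key: &'a str, value: String) -> BoxFuture<'a, ()>;
}

struct MockStorage {
    data: Mutex<HashMap<String, String>>,
}

impl DynStorage for MockStorage {
    fn get<'a>(&'a self, key: &'a str) -> BoxFuture<'a, Option<String>> {
        Box::pin(async move { self.data.lock().unwrap().get(key).cloned() })
    }
    fn set<'a>(&'a self, key: &'a str, value: String) -> BoxFuture<'a, ()> {
        Box::pin(async move {
            self.data.lock().unwrap().insert(key.to_string(), value);
        })
    }
}

// Takes a trait object, so the implementation can be chosen at runtime.
async fn cache_lookup(store: &dyn DynStorage, key: &str) -> String {
    match store.get(key).await {
        Some(val) => val,
        None => {
            let val = "computed".to_string();
            store.set(key, val.clone()).await;
            val
        }
    }
}

struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Hypothetical minimal executor: polls in a loop until the future completes.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

// First lookup is a miss that stores the value; the second is a hit.
fn lookup_twice() -> (String, String, bool) {
    let mock = MockStorage { data: Mutex::new(HashMap::new()) };
    let first = block_on(cache_lookup(&mock, "key1"));
    let second = block_on(cache_lookup(&mock, "key1"));
    let stored = mock.data.lock().unwrap().contains_key("key1");
    (first, second, stored)
}
```

The trade-off is one heap allocation per call and more verbose signatures. The generic version avoids both but fixes the implementation at compile time.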

Testing channels and task communication:

#![allow(unused)]
fn main() {
#[tokio::test]
async fn test_producer_consumer() {
    let (tx, mut rx) = tokio::sync::mpsc::channel(10);

    tokio::spawn(async move {
        for i in 0..5 {
            tx.send(i).await.unwrap();
        }
        // tx dropped here — channel closes
    });

    let mut received = Vec::new();
    while let Some(val) = rx.recv().await {
        received.push(val);
    }

    assert_eq!(received, vec![0, 1, 2, 3, 4]);
}
}

| Test Pattern | When to Use | Key Tool |
|---|---|---|
| #[tokio::test] | All async tests | tokio = { features = ["macros", "rt"] } |
| time::pause() | Testing timeouts, retries, periodic tasks | tokio::time::pause() |
| Trait mocking | Testing business logic without I/O | Generic <S: Storage> |
| current_thread flavor | Testing !Send types or deterministic scheduling | #[tokio::test(flavor = "current_thread")] |
| multi_thread flavor | Testing race conditions | #[tokio::test(flavor = "multi_thread")] |

Key Takeaways — Common Pitfalls

  • Never block the executor — use spawn_blocking for CPU/sync work
  • Never hold a MutexGuard across .await — scope locks tightly or use tokio::sync::Mutex
  • Cancellation drops the future instantly — use “cancel-safe” patterns for partial operations
  • Use tokio-console and #[tracing::instrument] for debugging async code
  • Test async code with #[tokio::test] and time::pause() for deterministic timing

See also: Ch 8 — Tokio Deep Dive for sync primitives, Ch 13 — Production Patterns for graceful shutdown and structured concurrency


13. Production Patterns 🔴

What you’ll learn:

  • Graceful shutdown with watch channels and select!
  • Backpressure: bounded channels prevent OOM
  • Structured concurrency: JoinSet and TaskTracker
  • Timeouts, retries, and exponential backoff
  • Error handling: thiserror vs anyhow, the double-? pattern
  • Tower: the middleware pattern used by axum, tonic, and hyper

Graceful Shutdown

Production servers must shut down cleanly — finish in-flight requests, flush buffers, close connections:

use tokio::signal;
use tokio::sync::watch;

async fn main_server() {
    // Create a shutdown signal channel
    let (shutdown_tx, shutdown_rx) = watch::channel(false);

    // Spawn the server
    let server_handle = tokio::spawn(run_server(shutdown_rx.clone()));

    // Wait for Ctrl+C
    signal::ctrl_c().await.expect("Failed to listen for Ctrl+C");
    println!("Shutdown signal received, finishing in-flight requests...");

    // Notify all tasks to shut down
    // NOTE: .unwrap() is used for brevity. Production code should handle
    // the case where all receivers have been dropped.
    shutdown_tx.send(true).unwrap();

    // Wait for server to finish (with timeout)
    match tokio::time::timeout(
        std::time::Duration::from_secs(30),
        server_handle,
    ).await {
        Ok(Ok(())) => println!("Server shut down gracefully"),
        Ok(Err(e)) => eprintln!("Server error: {e}"),
        Err(_) => eprintln!("Server shutdown timed out — forcing exit"),
    }
}

async fn run_server(mut shutdown: watch::Receiver<bool>) {
    loop {
        tokio::select! {
            // Accept new connections
            conn = accept_connection() => {
                let shutdown = shutdown.clone();
                tokio::spawn(handle_connection(conn, shutdown));
            }
            // Shutdown signal
            _ = shutdown.changed() => {
                if *shutdown.borrow() {
                    println!("Stopping accepting new connections");
                    break;
                }
            }
        }
    }
    // In-flight connections keep running as independent tasks and exit once
    // they observe the shutdown flag; this function does not wait for them
}

async fn handle_connection(conn: Connection, mut shutdown: watch::Receiver<bool>) {
    loop {
        tokio::select! {
            request = conn.next_request() => {
                // Process the request fully — don't abandon mid-request
                process_request(request).await;
            }
            _ = shutdown.changed() => {
                if *shutdown.borrow() {
                    // Finish current request, then exit
                    break;
                }
            }
        }
    }
}
sequenceDiagram
    participant OS as OS Signal
    participant Main as Main Task
    participant WCH as watch Channel
    participant W1 as Worker 1
    participant W2 as Worker 2

    OS->>Main: SIGINT (Ctrl+C)
    Main->>WCH: send(true)
    WCH-->>W1: changed()
    WCH-->>W2: changed()

    Note over W1: Finish current request
    Note over W2: Finish current request

    W1-->>Main: Task complete
    W2-->>Main: Task complete
    Main->>Main: All workers done → exit

Backpressure with Bounded Channels

Unbounded channels can lead to OOM if the producer is faster than the consumer. Always use bounded channels in production:

#![allow(unused)]
fn main() {
use tokio::sync::mpsc;

async fn backpressure_example() {
    // Bounded channel: max 100 items buffered
    let (tx, mut rx) = mpsc::channel::<WorkItem>(100);

    // Producer: slows down naturally when buffer is full
    let producer = tokio::spawn(async move {
        for i in 0..1_000_000 {
            // send() is async — waits if buffer is full
            // This creates natural backpressure!
            tx.send(WorkItem { id: i }).await.unwrap();
        }
    });

    // Consumer: processes items at its own pace
    let consumer = tokio::spawn(async move {
        while let Some(item) = rx.recv().await {
            process(item).await; // Slow processing is OK — producer waits
        }
    });

    let _ = tokio::join!(producer, consumer);
}

// Compare with unbounded — DANGEROUS:
// let (tx, rx) = mpsc::unbounded_channel(); // No backpressure!
// Producer can fill memory indefinitely
}
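
The effect of a bounded buffer can be seen without a runtime. The sketch below uses the standard library's thread-based `sync_channel` as an analogue, which is an assumption of this example rather than Tokio's channel. `try_send` reports a full buffer immediately, whereas Tokio's `send().await` would suspend the producer task until space is available.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

// Returns how many items were accepted before the bounded buffer filled up.
fn fill_until_full(capacity: usize) -> usize {
    // `_rx` is kept alive so the channel stays connected; nothing consumes.
    let (tx, _rx) = sync_channel::<u32>(capacity);
    let mut accepted = 0;
    loop {
        match tx.try_send(accepted as u32) {
            Ok(()) => accepted += 1,
            Err(TrySendError::Full(_)) => return accepted,
            Err(TrySendError::Disconnected(_)) => unreachable!("receiver is alive"),
        }
    }
}
```

With no consumer draining the channel, the producer is stopped after exactly `capacity` items. An unbounded channel has no such limit, so memory grows instead.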

Structured Concurrency: JoinSet and TaskTracker

JoinSet groups related tasks so they can be awaited together; dropping the set aborts any tasks that are still running:

#![allow(unused)]
fn main() {
use tokio::task::JoinSet;
use tokio::time::{sleep, Duration};

async fn structured_concurrency() {
    let mut set = JoinSet::new();

    // Spawn a batch of tasks
    for url in get_urls() {
        set.spawn(async move {
            fetch_and_process(url).await
        });
    }

    // Collect all results (order not guaranteed)
    let mut results = Vec::new();
    while let Some(result) = set.join_next().await {
        match result {
            Ok(Ok(data)) => results.push(data),
            Ok(Err(e)) => eprintln!("Task error: {e}"),
            Err(e) => eprintln!("Task panicked: {e}"),
        }
    }

    // ALL tasks are done here — no dangling background work
    println!("Processed {} items", results.len());
}

// TaskTracker (tokio-util 0.7.9+) — wait for all spawned tasks
use tokio_util::task::TaskTracker;

async fn with_tracker() {
    let tracker = TaskTracker::new();

    for i in 0..10 {
        tracker.spawn(async move {
            sleep(Duration::from_millis(100 * i)).await;
            println!("Task {i} done");
        });
    }

    tracker.close(); // Lets wait() return once the tracker is empty (does not block new spawns)
    tracker.wait().await; // Wait for ALL tracked tasks
    println!("All tasks finished");
}
}

Timeouts and Retries

#![allow(unused)]
fn main() {
use tokio::time::{timeout, sleep, Duration};

// Simple timeout
async fn with_timeout() -> Result<Response, Error> {
    match timeout(Duration::from_secs(5), fetch_data()).await {
        Ok(Ok(response)) => Ok(response),
        Ok(Err(e)) => Err(Error::Fetch(e)),
        Err(_) => Err(Error::Timeout),
    }
}

// Exponential backoff retry
async fn retry_with_backoff<F, Fut, T, E>(
    max_attempts: u32,
    base_delay_ms: u64,
    operation: F,
) -> Result<T, E>
where
    F: Fn() -> Fut,
    Fut: std::future::Future<Output = Result<T, E>>,
    E: std::fmt::Display,
{
    let mut delay = Duration::from_millis(base_delay_ms);

    for attempt in 1..=max_attempts {
        match operation().await {
            Ok(result) => return Ok(result),
            Err(e) => {
                if attempt == max_attempts {
                    eprintln!("Final attempt {attempt} failed: {e}");
                    return Err(e);
                }
                eprintln!("Attempt {attempt} failed: {e}, retrying in {delay:?}");
                sleep(delay).await;
                delay *= 2; // Exponential backoff
            }
        }
    }
    unreachable!()
}

// Usage:
// let result = retry_with_backoff(3, 100, || async {
//     reqwest::get("https://api.example.com/data").await
// }).await?;
}

Production tip — add jitter: The function above uses pure exponential backoff, but in production many clients failing simultaneously will all retry at the same intervals (thundering herd). Add random jitter — e.g., sleep(delay + rand_jitter) where rand_jitter is 0..delay/4 — so retries spread out over time.
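
A minimal sketch of that tip, using only the standard library. `backoff_delay` and `with_jitter` are hypothetical helper names, and the random value is passed in by the caller (for example from the `rand` crate), which is an assumption made here to keep the functions deterministic and testable.

```rust
use std::time::Duration;

// Delay before retry number `retry` (0-based): base * 2^retry, capped at `max`.
fn backoff_delay(base: Duration, retry: u32, max: Duration) -> Duration {
    let factor = 2u32.saturating_pow(retry);
    base.saturating_mul(factor).min(max)
}

// Adds up to 25% jitter. `unit` is a random number in 0.0..=1.0 supplied by
// the caller, so this function itself stays deterministic.
fn with_jitter(delay: Duration, unit: f64) -> Duration {
    let unit = unit.clamp(0.0, 1.0);
    delay + delay.mul_f64(0.25 * unit)
}
```

Capping the delay matters as much as the jitter: without `max`, a long outage would push retries minutes or hours apart.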

Error Handling in Async Code

Async introduces unique error propagation challenges — spawned tasks create error boundaries, timeout errors wrap inner errors, and ? interacts differently when futures cross task boundaries.

thiserror vs anyhow — choosing the right tool:

#![allow(unused)]
fn main() {
// thiserror: Define typed errors for libraries and public APIs
// Every variant is explicit — callers can match on specific errors
use thiserror::Error;

#[derive(Error, Debug)]
enum DiagError {
    #[error("IPMI command failed: {0}")]
    Ipmi(#[from] IpmiError),

    #[error("Sensor {sensor} out of range: {value}°C (max {max}°C)")]
    OverTemp { sensor: String, value: f64, max: f64 },

    #[error("Operation timed out after {0:?}")]
    Timeout(std::time::Duration),

    #[error("Task panicked: {0}")]
    TaskPanic(#[from] tokio::task::JoinError),
}

// anyhow: Quick error handling for applications and prototypes
// Wraps any error — no need to define types for every case
use anyhow::{Context, Result};

async fn run_diagnostics() -> Result<()> {
    let config = load_config()
        .await
        .context("Failed to load diagnostic config")?;  // Adds context

    let result = run_gpu_test(&config)
        .await
        .context("GPU diagnostic failed")?;              // Chains context

    Ok(())
}
// Formatted with {:#}, anyhow prints: "GPU diagnostic failed: IPMI command failed: timeout"
}

| Crate | Use When | Error Type | Matching |
|---|---|---|---|
| thiserror | Library code, public APIs | enum MyError { ... } | match err { MyError::Timeout => ... } |
| anyhow | Applications, CLI tools, scripts | anyhow::Error (type-erased) | err.downcast_ref::<MyError>() |
| Both together | Library exposes thiserror, app wraps with anyhow | Best of both | Library errors are typed, app doesn’t care |

The double-? pattern with tokio::spawn:

#![allow(unused)]
fn main() {
use thiserror::Error;
use tokio::task::JoinError;

#[derive(Error, Debug)]
enum AppError {
    #[error("HTTP error: {0}")]
    Http(#[from] reqwest::Error),

    #[error("Task panicked: {0}")]
    TaskPanic(#[from] JoinError),
}

async fn spawn_with_errors() -> Result<String, AppError> {
    let handle = tokio::spawn(async {
        let resp = reqwest::get("https://example.com").await?;
        Ok::<_, reqwest::Error>(resp.text().await?)
    });

    // Double ?: First ? unwraps JoinError (task panic), second ? unwraps inner Result
    let result = handle.await??;
    Ok(result)
}
}
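
The type mechanics of `??` can be seen in isolation. In this std-only sketch, `JoinFailed` and `HttpFailed` are hypothetical stand-ins for `tokio::task::JoinError` and `reqwest::Error`, and `joined` mimics the nested `Result` that `handle.await` produces. Each `?` peels one layer and converts its error through a `From` impl.

```rust
#[derive(Debug, PartialEq)]
struct JoinFailed; // stands in for tokio::task::JoinError

#[derive(Debug, PartialEq)]
struct HttpFailed; // stands in for reqwest::Error

#[derive(Debug, PartialEq)]
enum AppError {
    Http(HttpFailed),
    TaskPanic(JoinFailed),
}

impl From<HttpFailed> for AppError {
    fn from(e: HttpFailed) -> Self {
        AppError::Http(e)
    }
}

impl From<JoinFailed> for AppError {
    fn from(e: JoinFailed) -> Self {
        AppError::TaskPanic(e)
    }
}

// Same shape as `handle.await`: outer Result for the task, inner for the work.
fn joined(task_ok: bool, work_ok: bool) -> Result<Result<String, HttpFailed>, JoinFailed> {
    if !task_ok {
        return Err(JoinFailed);
    }
    Ok(if work_ok { Ok("body".to_string()) } else { Err(HttpFailed) })
}

fn unwrap_both(task_ok: bool, work_ok: bool) -> Result<String, AppError> {
    // First ? converts JoinFailed, second ? converts HttpFailed, both via From.
    let body = joined(task_ok, work_ok)??;
    Ok(body)
}
```

This is why the `#[from]` attributes on both `AppError` variants are needed: without a `From` impl for either error type, the corresponding `?` does not compile.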

The error boundary problem (tokio::spawn erases context):

#![allow(unused)]
fn main() {
// ❌ Error context is lost across spawn boundaries:
async fn bad_error_handling() -> Result<()> {
    let handle = tokio::spawn(async {
        some_fallible_work().await  // Returns Result<T, SomeError>
    });

    // handle.await returns Result<Result<T, SomeError>, JoinError>
    // The inner error has no context about what task failed
    let result = handle.await??;
    Ok(())
}

// ✅ Add context at the spawn boundary:
async fn good_error_handling() -> Result<()> {
    let handle = tokio::spawn(async {
        some_fallible_work()
            .await
            .context("worker task failed")  // Context before crossing boundary
    });

    let result = handle.await
        .context("worker task panicked")??;  // Context for JoinError too
    Ok(())
}
}

Timeout errors — wrapping vs replacing:

#![allow(unused)]
fn main() {
use tokio::time::{timeout, Duration};

async fn with_timeout_context() -> Result<String, DiagError> {
    let dur = Duration::from_secs(30);
    match timeout(dur, fetch_sensor_data()).await {
        Ok(Ok(data)) => Ok(data),
        Ok(Err(e)) => Err(e),                      // Inner error preserved
        Err(_) => Err(DiagError::Timeout(dur)),     // Timeout → typed error
    }
}
}

Tower: The Middleware Pattern

The Tower crate defines a composable Service trait — the backbone of async middleware in Rust (used by axum, tonic, hyper):

#![allow(unused)]
fn main() {
// Tower's core trait (simplified):
pub trait Service<Request> {
    type Response;
    type Error;
    type Future: Future<Output = Result<Self::Response, Self::Error>>;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
    fn call(&mut self, req: Request) -> Self::Future;
}
}

Middleware wraps a Service to add cross-cutting behavior — logging, timeouts, rate-limiting — without modifying inner logic:

#![allow(unused)]
fn main() {
use tower::{ServiceBuilder, timeout::TimeoutLayer, limit::RateLimitLayer};
use std::time::Duration;

let service = ServiceBuilder::new()
    .layer(TimeoutLayer::new(Duration::from_secs(10)))       // Outermost: timeout
    .layer(RateLimitLayer::new(100, Duration::from_secs(1))) // Then: rate limit
    .service(my_handler);                                     // Innermost: your code
}

Why this matters: If you’ve used ASP.NET middleware or Express.js middleware, Tower is the Rust equivalent. It’s how production Rust services add cross-cutting concerns without code duplication.
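
The layering idea can be sketched without Tower itself. The trait below is a deliberately simplified, synchronous stand-in (no `poll_ready`, no futures), and `Handler`, `Limit`, and `CountCalls` are hypothetical names for this illustration. It only shows how each middleware wraps an inner service and implements the same trait.

```rust
// Simplified, synchronous stand-in for Tower's Service trait.
trait Service<Req> {
    type Response;
    fn call(&mut self, req: Req) -> Self::Response;
}

// Innermost service: the application logic.
struct Handler;
impl Service<String> for Handler {
    type Response = String;
    fn call(&mut self, req: String) -> String {
        format!("hello, {req}")
    }
}

// Middleware: rejects requests once a fixed budget is used up.
struct Limit<S> {
    inner: S,
    remaining: u32,
}
impl<S, Req> Service<Req> for Limit<S>
where
    S: Service<Req>,
{
    type Response = Option<S::Response>;
    fn call(&mut self, req: Req) -> Option<S::Response> {
        if self.remaining == 0 {
            return None;
        }
        self.remaining -= 1;
        Some(self.inner.call(req))
    }
}

// Middleware: counts every call, including ones the inner layer rejects.
struct CountCalls<S> {
    inner: S,
    calls: u32,
}
impl<S, Req> Service<Req> for CountCalls<S>
where
    S: Service<Req>,
{
    type Response = S::Response;
    fn call(&mut self, req: Req) -> S::Response {
        self.calls += 1;
        self.inner.call(req)
    }
}

// Outermost layer first, mirroring the ServiceBuilder ordering.
fn layered_demo() -> (Option<String>, Option<String>, u32) {
    let mut svc = CountCalls {
        inner: Limit { inner: Handler, remaining: 1 },
        calls: 0,
    };
    let first = svc.call("a".to_string());
    let second = svc.call("b".to_string());
    (first, second, svc.calls)
}
```

The handler never learns that it is being limited or counted, which is the property that lets layers be reordered or removed without touching application code.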

Exercise: Graceful Shutdown with Worker Pool

🏋️ Exercise

Challenge: Build a task processor with a channel-based work queue, N worker tasks, and graceful shutdown on Ctrl+C. Workers should finish in-flight work before exiting.

🔑 Solution
use tokio::sync::{mpsc, watch};
use tokio::time::{sleep, Duration};

struct WorkItem { id: u64, payload: String }

#[tokio::main]
async fn main() {
    let (work_tx, work_rx) = mpsc::channel::<WorkItem>(100);
    let (shutdown_tx, shutdown_rx) = watch::channel(false);
    let work_rx = std::sync::Arc::new(tokio::sync::Mutex::new(work_rx));

    let mut handles = Vec::new();
    for id in 0..4 {
        let rx = work_rx.clone();
        let mut shutdown = shutdown_rx.clone();
        handles.push(tokio::spawn(async move {
            loop {
                let item = {
                    let mut rx = rx.lock().await;
                    tokio::select! {
                        item = rx.recv() => item,
                        _ = shutdown.changed() => {
                            if *shutdown.borrow() { None } else { continue }
                        }
                    }
                };
                match item {
                    Some(work) => {
                        println!("Worker {id}: processing {}", work.id);
                        sleep(Duration::from_millis(200)).await;
                    }
                    None => break,
                }
            }
        }));
    }

    // Submit work
    for i in 0..20 {
        let _ = work_tx.send(WorkItem { id: i, payload: format!("task-{i}") }).await;
        sleep(Duration::from_millis(50)).await;
    }

    // On Ctrl+C: signal shutdown, wait for workers
    // NOTE: .unwrap() is used for brevity — handle errors in production.
    tokio::signal::ctrl_c().await.unwrap();
    shutdown_tx.send(true).unwrap();
    for h in handles { let _ = h.await; }
    println!("Shut down cleanly.");
}

Key Takeaways — Production Patterns

  • Use a watch channel + select! for coordinated graceful shutdown
  • Bounded channels (mpsc::channel(N)) provide backpressure — send().await suspends the sending task (without blocking the thread) when the buffer is full
  • JoinSet and TaskTracker provide structured concurrency: track, abort, and await task groups
  • Always add timeouts to network operations — tokio::time::timeout(dur, fut)
  • Tower’s Service trait is the standard middleware pattern for production Rust services

See also: Ch 8 — Tokio Deep Dive for channels and sync primitives, Ch 12 — Common Pitfalls for cancellation hazards during shutdown


14. Exercises

Exercises

Exercise 1: Async Echo Server

Build a TCP echo server that handles multiple clients concurrently.

Requirements:

  • Listen on 127.0.0.1:8080
  • Accept connections and echo back each line
  • Handle client disconnections gracefully
  • Print a log when clients connect/disconnect
🔑 Solution
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::TcpListener;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let listener = TcpListener::bind("127.0.0.1:8080").await?;
    println!("Echo server listening on :8080");

    loop {
        let (socket, addr) = listener.accept().await?;
        println!("[{addr}] Connected");

        tokio::spawn(async move {
            let (reader, mut writer) = socket.into_split();
            let mut reader = BufReader::new(reader);
            let mut line = String::new();

            loop {
                line.clear();
                match reader.read_line(&mut line).await {
                    Ok(0) => {
                        println!("[{addr}] Disconnected");
                        break;
                    }
                    Ok(_) => {
                        print!("[{addr}] Echo: {line}");
                        if writer.write_all(line.as_bytes()).await.is_err() {
                            println!("[{addr}] Write error, disconnecting");
                            break;
                        }
                    }
                    Err(e) => {
                        eprintln!("[{addr}] Read error: {e}");
                        break;
                    }
                }
            }
        });
    }
}

Exercise 2: Concurrent URL Fetcher with Rate Limiting

Fetch a list of URLs concurrently, with at most 5 concurrent requests.

🔑 Solution
use futures::stream::{self, StreamExt};

async fn fetch_urls(urls: Vec<String>) -> Vec<Result<String, String>> {
    // buffer_unordered(5) ensures at most 5 futures are polled
    // concurrently, so no separate Semaphore is needed here.
    let results: Vec<_> = stream::iter(urls)
        .map(|url| {
            async move {
                println!("Fetching: {url}");

                match reqwest::get(&url).await {
                    Ok(resp) => match resp.text().await {
                        Ok(body) => Ok(body),
                        Err(e) => Err(format!("{url}: {e}")),
                    },
                    Err(e) => Err(format!("{url}: {e}")),
                }
            }
        })
        .buffer_unordered(5) // ← This alone limits concurrency to 5
        .collect()
        .await;

    results
}

// NOTE: Use Semaphore when you need to limit concurrency across
// independently spawned tasks (tokio::spawn). Use buffer_unordered
// when processing a stream. Don't combine both for the same limit.

Exercise 3: Graceful Shutdown with Worker Pool

Build a task processor with:

  • A channel-based work queue
  • N worker tasks consuming from the queue
  • Graceful shutdown on Ctrl+C: stop accepting, finish in-flight work
🔑 Solution
use tokio::sync::{mpsc, watch};
use tokio::time::{sleep, Duration};

struct WorkItem {
    id: u64,
    payload: String,
}

#[tokio::main]
async fn main() {
    let (work_tx, work_rx) = mpsc::channel::<WorkItem>(100);
    let (shutdown_tx, shutdown_rx) = watch::channel(false);

    // Spawn 4 workers
    let mut worker_handles = Vec::new();
    let work_rx = std::sync::Arc::new(tokio::sync::Mutex::new(work_rx));

    for id in 0..4 {
        let rx = work_rx.clone();
        let mut shutdown = shutdown_rx.clone();
        let handle = tokio::spawn(async move {
            loop {
                let item = {
                    let mut rx = rx.lock().await;
                    tokio::select! {
                        item = rx.recv() => item,
                        _ = shutdown.changed() => {
                            if *shutdown.borrow() { None } else { continue }
                        }
                    }
                };

                match item {
                    Some(work) => {
                        println!("Worker {id}: processing item {}", work.id);
                        sleep(Duration::from_millis(200)).await; // Simulate work
                        println!("Worker {id}: done with item {}", work.id);
                    }
                    None => {
                        println!("Worker {id}: queue closed or shutdown signaled, exiting");
                        break;
                    }
                }
            }
        });
        worker_handles.push(handle);
    }

    // Producer: submit some work
    let producer = tokio::spawn(async move {
        for i in 0..20 {
            let _ = work_tx.send(WorkItem {
                id: i,
                payload: format!("task-{i}"),
            }).await;
            sleep(Duration::from_millis(50)).await;
        }
    });

    // Wait for Ctrl+C
    tokio::signal::ctrl_c().await.unwrap();
    println!("\nShutdown signal received!");
    shutdown_tx.send(true).unwrap();
    producer.abort(); // Cancel the producer task

    // Wait for workers to finish
    for handle in worker_handles {
        let _ = handle.await;
    }
    println!("All workers shut down. Goodbye!");
}

Exercise 4: Build a Simple Async Mutex from Scratch

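Implement an async-aware mutex without using tokio::sync::Mutex.

Hint: You need a gate that admits one task at a time. A tokio::sync::mpsc channel with capacity 1 can act as that gate; the solution below uses tokio::sync::Semaphore with a single permit, which expresses the same idea directly.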

🔑 Solution
use std::cell::UnsafeCell;
use std::sync::Arc;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};

pub struct SimpleAsyncMutex<T> {
    data: Arc<UnsafeCell<T>>,
    semaphore: Arc<Semaphore>,
}

// SAFETY: Access to T is serialized by the semaphore (max 1 permit).
unsafe impl<T: Send> Send for SimpleAsyncMutex<T> {}
unsafe impl<T: Send> Sync for SimpleAsyncMutex<T> {}

pub struct SimpleGuard<T> {
    data: Arc<UnsafeCell<T>>,
    _permit: OwnedSemaphorePermit, // Dropped on guard drop → releases lock
}

// SAFETY: While the permit is held, the guard is the only handle that can
// reach T. Without these impls, Arc<UnsafeCell<T>> makes the guard !Send,
// so it could not be held across an .await inside tokio::spawn.
unsafe impl<T: Send> Send for SimpleGuard<T> {}
unsafe impl<T: Send + Sync> Sync for SimpleGuard<T> {}

impl<T> SimpleAsyncMutex<T> {
    pub fn new(value: T) -> Self {
        SimpleAsyncMutex {
            data: Arc::new(UnsafeCell::new(value)),
            semaphore: Arc::new(Semaphore::new(1)),
        }
    }

    pub async fn lock(&self) -> SimpleGuard<T> {
        // acquire_owned() only fails if the semaphore is closed,
        // and this type never closes it.
        let permit = self.semaphore.clone().acquire_owned().await.unwrap();
        SimpleGuard {
            data: self.data.clone(),
            _permit: permit,
        }
    }
}

impl<T> std::ops::Deref for SimpleGuard<T> {
    type Target = T;
    fn deref(&self) -> &T {
        // SAFETY: We hold the only semaphore permit, so no other
        // SimpleGuard exists → exclusive access is guaranteed.
        unsafe { &*self.data.get() }
    }
}

impl<T> std::ops::DerefMut for SimpleGuard<T> {
    fn deref_mut(&mut self) -> &mut T {
        // SAFETY: Same reasoning: a single permit guarantees exclusivity.
        unsafe { &mut *self.data.get() }
    }
}

// When SimpleGuard is dropped, _permit is dropped,
// which releases the semaphore permit, so another lock() can proceed.

// Usage:
// let mutex = SimpleAsyncMutex::new(vec![1, 2, 3]);
// {
//     let mut guard = mutex.lock().await;
//     guard.push(4);
// } // permit released here

Key takeaway: Async mutexes are typically built on top of semaphores. The semaphore provides the async wait mechanism — when locked, acquire() suspends the task until the permit is released. This is exactly how tokio::sync::Mutex works internally.

Why UnsafeCell and not std::sync::Mutex? A previous version of this exercise used Arc<Mutex<T>> with Deref/DerefMut calling .lock().unwrap(). That doesn’t compile — the returned &T borrows from a temporary MutexGuard that’s dropped immediately. UnsafeCell avoids the intermediate guard, and the semaphore-based serialization makes the unsafe sound.


Exercise 5: Stream Pipeline

Build a data processing pipeline using streams:

  1. Generate numbers 1..=100
  2. Filter to even numbers
  3. Map each to its square
  4. Process 10 at a time concurrently (simulate with sleep)
  5. Collect results
🔑 Solution
use futures::stream::{self, StreamExt};
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    let results: Vec<u64> = stream::iter(1u64..=100)
        // Step 2: Filter evens
        .filter(|x| futures::future::ready(x % 2 == 0))
        // Step 3: Square each
        .map(|x| x * x)
        // Step 4: Process concurrently (simulate async work)
        .map(|x| async move {
            sleep(Duration::from_millis(50)).await;
            println!("Processed: {x}");
            x
        })
        .buffer_unordered(10) // 10 concurrent
        // Step 5: Collect
        .collect()
        .await;

    println!("Got {} results", results.len());
    println!("Sum: {}", results.iter().sum::<u64>());
}

Exercise 6: Implement Select with Timeout

Without using tokio::select! or tokio::time::timeout, implement a function that races a future against a deadline. It returns Either::Left(result) if the future finishes first, or Either::Right(()) if the deadline passes first.

Hint: Build on the Select combinator from Chapter 6 and the TimerFuture from the same chapter.

🔑 Solution
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;

pub enum Either<A, B> {
    Left(A),
    Right(B),
}

pub struct Timeout<F> {
    future: F,
    timer: TimerFuture, // From Chapter 6
}

impl<F: Future + Unpin> Timeout<F> {
    pub fn new(future: F, duration: Duration) -> Self {
        Timeout {
            future,
            timer: TimerFuture::new(duration),
        }
    }
}

impl<F: Future + Unpin> Future for Timeout<F> {
    type Output = Either<F::Output, ()>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Check if the main future is done
        if let Poll::Ready(val) = Pin::new(&mut self.future).poll(cx) {
            return Poll::Ready(Either::Left(val));
        }

        // Check if the timer expired
        if let Poll::Ready(()) = Pin::new(&mut self.timer).poll(cx) {
            return Poll::Ready(Either::Right(()));
        }

        Poll::Pending
    }
}

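// Usage:
// Timeout requires F: Unpin, and the future returned by an async fn is
// not Unpin, so pin it on the heap first with Box::pin.
// match Timeout::new(Box::pin(fetch_data()), Duration::from_secs(5)).await {
//     Either::Left(data) => println!("Got data: {data}"),
//     Either::Right(()) => println!("Timed out!"),
// }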

Key takeaway: select/timeout is just polling two futures and seeing which completes first. The entire async ecosystem is built from this simple primitive: poll, Pending/Ready, Waker.
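As a complement to the solution above, here is a minimal std-only sketch of the same "poll both, first one wins" idea written with std::future::poll_fn and std::pin::pin!, which removes the need for an Unpin bound. The names race, poll_once, and NoopWaker are illustrative, and Either is redefined only to keep the block self-contained. The no-op waker is enough here only because the demo futures never need to be woken.

```rust
use std::future::{self, Future};
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

pub enum Either<A, B> {
    Left(A),
    Right(B),
}

/// Polls `a` first, then `b`, and resolves with whichever is ready first.
pub async fn race<A: Future, B: Future>(a: A, b: B) -> Either<A::Output, B::Output> {
    // Pinning inside the async fn means callers may pass !Unpin futures.
    let mut a = pin!(a);
    let mut b = pin!(b);
    future::poll_fn(|cx| {
        if let Poll::Ready(v) = a.as_mut().poll(cx) {
            return Poll::Ready(Either::Left(v));
        }
        if let Poll::Ready(v) = b.as_mut().poll(cx) {
            return Poll::Ready(Either::Right(v));
        }
        Poll::Pending
    })
    .await
}

/// Waker that does nothing; only suitable for single-poll demos.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Polls a future exactly once and reports what it returned.
pub fn poll_once<F: Future>(fut: F) -> Poll<F::Output> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    fut.as_mut().poll(&mut cx)
}
```

Because a is always polled first, this sketch is biased toward the left future when both are ready in the same poll.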


Summary and Reference Card

Quick Reference Card

Async Mental Model

┌─────────────────────────────────────────────────────┐
│  async fn → State Machine (enum) → impl Future     │
│  .await   → poll() the inner future                 │
│  executor → loop { poll(); sleep_until_woken(); }   │
│  waker    → "hey executor, poll me again"           │
│  Pin      → "promise I won't move in memory"        │
└─────────────────────────────────────────────────────┘
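The mental model above maps almost line for line onto a tiny single-future executor. The following is a minimal sketch using only the standard library, not how tokio is implemented; block_on, ThreadWaker, and Countdown are illustrative names.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// waker: unparks the thread that is running `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark(); // "hey executor, poll me again"
    }
}

/// executor: loop { poll(); sleep_until_woken(); }
pub fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut); // Pin: the future will not move from here on
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(), // sleep until a waker fires
        }
    }
}

/// A hand-written future: returns Pending `remaining` times, then Ready
/// with the total number of times it was polled.
pub struct Countdown {
    pub remaining: u32,
    pub polls: u32,
}

impl Future for Countdown {
    type Output = u32;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u32> {
        self.polls += 1;
        if self.remaining == 0 {
            return Poll::Ready(self.polls);
        }
        self.remaining -= 1;
        // Without this wake, block_on would park and never poll again.
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}
```

Calling block_on(Countdown { remaining: 3, polls: 0 }) polls the future four times: three Pending results, each followed by a wake, and one final Ready.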

Common Patterns Cheat Sheet

| Goal | Use |
|------|-----|
| Run two futures concurrently | tokio::join!(a, b) |
| Race two futures | tokio::select! { ... } |
| Spawn a background task | tokio::spawn(async { ... }) |
| Run blocking code in async | tokio::task::spawn_blocking(\|\| { ... }) |
| Limit concurrency | Semaphore::new(N) |
| Collect many task results | JoinSet |
| Share state across tasks | Arc<Mutex<T>> or channels |
| Graceful shutdown | watch::channel + select! |
| Process a stream N-at-a-time | .buffer_unordered(N) |
| Timeout a future | tokio::time::timeout(dur, fut) |
| Retry with backoff | Custom combinator (see Ch. 13) |
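For the "Retry with backoff" row, the part that is easy to get wrong is the delay schedule. Below is a minimal sketch of a capped exponential schedule that a retry combinator could use. It is an assumption for illustration, not the combinator from Ch. 13, and backoff_delay is a made-up name.

```rust
use std::time::Duration;

/// Delay to wait before retry number `attempt` (0-based):
/// base * 2^attempt, never exceeding `max`. Hypothetical helper.
pub fn backoff_delay(attempt: u32, base: Duration, max: Duration) -> Duration {
    // Saturate instead of overflowing for very large attempt numbers.
    let factor = 2u32.checked_pow(attempt).unwrap_or(u32::MAX);
    base.checked_mul(factor).map_or(max, |delay| delay.min(max))
}
```

A retry loop would await its operation, and on failure sleep for backoff_delay(attempt, base, max) before trying again. Production code often adds random jitter on top so that many clients do not retry in lockstep.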

Pinning Quick Reference

| Situation | Use |
|-----------|-----|
| Pin a future on the heap | Box::pin(fut) |
| Pin a future on the stack | tokio::pin!(fut) |
| Pin an Unpin type | Pin::new(&mut val) (safe, free) |
| Return a pinned trait object | -> Pin<Box<dyn Future<Output = T> + Send>> |
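The rows above can be exercised side by side. This is a minimal std-only sketch: it uses std::pin::pin! in place of tokio::pin! (both pin a value in the current scope), and the names answer, boxed_answer, pin_three_ways, and NoopWaker are illustrative.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; fine here because every future is ready at once.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

async fn answer() -> u32 {
    42
}

/// Return a pinned trait object: callers need not know the concrete type.
pub fn boxed_answer() -> Pin<Box<dyn Future<Output = u32> + Send>> {
    Box::pin(answer())
}

/// Polls three differently pinned futures once each.
pub fn pin_three_ways() -> [Poll<u32>; 3] {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);

    // 1. Heap: the Box can be moved around; the future inside stays put.
    let mut on_heap = boxed_answer();

    // 2. Stack: pinned for the rest of this scope.
    let mut on_stack = pin!(answer());

    // 3. Unpin type: std::future::Ready is Unpin, so Pin::new is safe and free.
    let mut ready = std::future::ready(42);
    let unpin = Pin::new(&mut ready);

    [
        on_heap.as_mut().poll(&mut cx),
        on_stack.as_mut().poll(&mut cx),
        unpin.poll(&mut cx),
    ]
}
```

The heap variant costs an allocation but yields a value that can be stored or returned; the stack variant is free but cannot outlive the scope it was pinned in.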

Channel Selection Guide

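| Channel | Producers | Consumers | Values | Use When |
|---------|-----------|-----------|--------|----------|
| mpsc | N | 1 | Stream | Work queues, event buses |
| oneshot | 1 | 1 | Single | Request/response, completion notification |
| broadcast | N | N | Every receiver gets every value | Fan-out notifications, shutdown signals |
| watch | 1 | N | Latest only | Config updates, health status |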

Mutex Selection Guide

| Mutex | Use When |
|-------|----------|
| std::sync::Mutex | Lock is held briefly, never across .await |
| tokio::sync::Mutex | Lock must be held across .await |
| parking_lot::Mutex | High contention, no .await, need performance |
| tokio::sync::RwLock | Many readers, few writers, locks cross .await |

Decision Quick Reference

Need concurrency?
├── I/O-bound → async/await
├── CPU-bound → rayon / std::thread
└── Mixed → spawn_blocking for CPU parts

Choosing runtime?
├── Server app → tokio
├── Library → runtime-agnostic (futures crate)
├── Embedded → embassy
└── Minimal → smol

Need concurrent futures?
├── Can be 'static + Send → tokio::spawn
├── Can be 'static + !Send → LocalSet
├── Can't be 'static → FuturesUnordered
└── Need to track/abort → JoinSet

Common Error Messages and Fixes

| Error | Cause | Fix |
|-------|-------|-----|
| future is not Send | Holding !Send type across .await | Scope the value so it’s dropped before .await, or use current_thread runtime |
| borrowed value does not live long enough in spawn | tokio::spawn requires 'static | Use Arc, clone(), or FuturesUnordered |
| the trait Future is not implemented for () | Missing .await | Add .await to the async call |
| cannot borrow as mutable in poll | Self-referential borrow | Use Pin<&mut Self> correctly (see Ch. 4) |
| Program hangs silently | Forgot to call waker.wake() | Ensure every Pending path registers and triggers the waker |
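The first row's fix ("scope the value so it's dropped before .await") can be checked at compile time. The sketch below is a minimal std-only illustration: require_send stands in for the Send bound that tokio::spawn places on its argument, and checkpoint, scoped_len, poll_once, and NoopWaker are made-up names.

```rust
use std::future::Future;
use std::pin::pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Stand-in for any await point (I/O, timer, channel receive).
async fn checkpoint() {}

/// Rc is !Send, but it lives in an inner block that ends before the .await,
/// so it is not stored in the future's state and the future stays Send.
pub async fn scoped_len() -> usize {
    let len = {
        let local = Rc::new(String::from("not Send"));
        local.len()
    }; // `local` is dropped here, before the await point
    checkpoint().await;
    len
}

/// Compiles only for Send futures, mirroring the bound on tokio::spawn.
pub fn require_send<F: Future + Send>(fut: F) -> F {
    fut
}

/// Waker that does nothing; fine because the demo future never waits.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Polls a future exactly once and reports what it returned.
pub fn poll_once<F: Future>(fut: F) -> Poll<F::Output> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    fut.as_mut().poll(&mut cx)
}
```

If the inner block is removed so that local is still alive at checkpoint().await, the call require_send(scoped_len()) stops compiling with the "future is not Send" family of errors.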

Further Reading

| Resource | Why |
|----------|-----|
| Tokio Tutorial | Official hands-on guide; excellent for first projects |
| Async Book (official) | Covers Future, Pin, Stream at the language level |
| Jon Gjengset, Crust of Rust: async/await | 2-hour deep dive into internals with live coding |
| Alice Ryhl, Actors with Tokio | Production architecture pattern for stateful services |
| Without Boats, Pin, Unpin, and why Rust needs them | The original motivation from the language designer |
| Tokio mini-Redis | Complete async Rust project; study-quality production code |
| Tower documentation | Middleware/service architecture used by axum, tonic, hyper |

End of Async Rust Training Guide

Capstone Project: Async Chat Server

This project integrates patterns from across the book into a single, production-style application. You’ll build a multi-room async chat server using tokio, channels, streams, graceful shutdown, and proper error handling.

Estimated time: 4–6 hours | Difficulty: ★★★

What you’ll practice:

  • tokio::spawn and the 'static requirement (Ch 8)
  • Channels: mpsc for messages, broadcast for rooms, watch for shutdown (Ch 8)
  • Streams: reading lines from TCP connections (Ch 11)
  • Common pitfalls: cancellation safety, MutexGuard across .await (Ch 12)
  • Production patterns: graceful shutdown, backpressure (Ch 13)
  • Async traits for pluggable backends (Ch 10)

The Problem

Build a TCP chat server where:

  1. Clients connect via TCP and join named rooms
  2. Messages are broadcast to all clients in the same room
  3. Commands: /join <room>, /nick <name>, /rooms, /quit
  4. The server shuts down gracefully on Ctrl+C — finishing in-flight messages
graph LR
    C1["Client 1<br/>(Alice)"] -->|TCP| SERVER["Chat Server"]
    C2["Client 2<br/>(Bob)"] -->|TCP| SERVER
    C3["Client 3<br/>(Carol)"] -->|TCP| SERVER

    SERVER --> R1["#general<br/>broadcast channel"]
    SERVER --> R2["#rust<br/>broadcast channel"]

    R1 -->|msg| C1
    R1 -->|msg| C2
    R2 -->|msg| C3

    CTRL["Ctrl+C"] -->|watch| SERVER

    style SERVER fill:#e8f4f8,stroke:#2980b9,color:#000
    style R1 fill:#d4efdf,stroke:#27ae60,color:#000
    style R2 fill:#d4efdf,stroke:#27ae60,color:#000
    style CTRL fill:#fadbd8,stroke:#e74c3c,color:#000

Step 1: Basic TCP Accept Loop

Start with a server that accepts connections and echoes lines back:

use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::TcpListener;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let listener = TcpListener::bind("127.0.0.1:8080").await?;
    println!("Chat server listening on :8080");

    loop {
        let (socket, addr) = listener.accept().await?;
        println!("[{addr}] Connected");

        tokio::spawn(async move {
            let (reader, mut writer) = socket.into_split();
            let mut reader = BufReader::new(reader);
            let mut line = String::new();

            loop {
                line.clear();
                match reader.read_line(&mut line).await {
                    Ok(0) | Err(_) => break,
                    Ok(_) => {
                        let _ = writer.write_all(line.as_bytes()).await;
                    }
                }
            }
            println!("[{addr}] Disconnected");
        });
    }
}

Your job: Verify this compiles and works with telnet localhost 8080.

Step 2: Room State with Broadcast Channels

Each room is a broadcast::Sender. All clients in a room subscribe to receive messages.

use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{broadcast, RwLock};

type RoomMap = Arc<RwLock<HashMap<String, broadcast::Sender<String>>>>;

fn get_or_create_room(
    rooms: &mut HashMap<String, broadcast::Sender<String>>,
    name: &str,
) -> broadcast::Sender<String> {
    rooms.entry(name.to_string())
        .or_insert_with(|| {
            let (tx, _) = broadcast::channel(100); // 100-message buffer
            tx
        })
        .clone()
}

Your job: Implement room state so that:

  • Clients start in #general
  • /join <room> switches rooms (unsubscribe from old, subscribe to new)
  • Messages are broadcast to all clients in the sender’s current room
💡 Hint — Client task structure

Each client task needs two concurrent loops:

  1. Read from TCP → parse commands or broadcast to room
  2. Read from broadcast receiver → write to TCP

Use tokio::select! to run both:

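// Create this once, outside the loop. Lines::next_line is cancellation
// safe; read_line is not, and can lose partially read data when select!
// drops it because the other branch finished first.
let mut lines = reader.lines();

loop {
    tokio::select! {
        // Client sent us a line (delivered without the trailing newline)
        result = lines.next_line() => {
            match result {
                Ok(Some(line)) => {
                    // Parse command or broadcast message
                }
                Ok(None) | Err(_) => break, // EOF or read error
            }
        }
        // Room broadcast received
        result = room_rx.recv() => {
            match result {
                Ok(msg) => {
                    let _ = writer.write_all(msg.as_bytes()).await;
                }
                Err(_) => break,
            }
        }
    }
}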

Step 3: Commands

Implement the command protocol:

| Command | Action |
|---------|--------|
| /join <room> | Leave current room, join new room, announce in both |
| /nick <name> | Change display name |
| /rooms | List all active rooms and member counts |
| /quit | Disconnect gracefully |
| Anything else | Broadcast as a chat message |

Your job: Parse commands from the input line. For /rooms, you’ll need to read from the RoomMap — use RwLock::read() to avoid blocking other clients.
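One possible shape for the parsing step is an enum plus a pure function, which keeps the protocol testable without any sockets. This is a sketch under assumptions: the Command type and parse_command name are made up, and treating malformed commands (such as /join with no room) as plain chat messages is a policy choice you may want to change.

```rust
/// One parsed line of client input. Hypothetical type for illustration.
#[derive(Debug, PartialEq, Eq)]
pub enum Command {
    Join(String),
    Nick(String),
    Rooms,
    Quit,
    Message(String),
}

/// Parses a single input line, ignoring surrounding whitespace and newlines.
pub fn parse_command(line: &str) -> Command {
    let line = line.trim();
    // Split into the command word and the (possibly empty) argument.
    let (head, rest) = match line.split_once(char::is_whitespace) {
        Some((head, rest)) => (head, rest.trim()),
        None => (line, ""),
    };
    match (head, rest) {
        ("/join", room) if !room.is_empty() => Command::Join(room.to_string()),
        ("/nick", name) if !name.is_empty() => Command::Nick(name.to_string()),
        ("/rooms", "") => Command::Rooms,
        ("/quit", "") => Command::Quit,
        // Assumption: anything else, including malformed commands, is chat text.
        _ => Command::Message(line.to_string()),
    }
}
```

The client task can then match on the returned Command, so the select! loop stays focused on I/O.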

Step 4: Graceful Shutdown

Add Ctrl+C handling so the server:

  1. Stops accepting new connections
  2. Sends “Server shutting down…” to all rooms
  3. Waits for in-flight messages to drain
  4. Exits cleanly
use tokio::sync::watch;

let (shutdown_tx, shutdown_rx) = watch::channel(false);

// In the accept loop:
loop {
    tokio::select! {
        result = listener.accept() => {
            let (socket, addr) = result?;
            // spawn client task with shutdown_rx.clone()
        }
        _ = tokio::signal::ctrl_c() => {
            println!("Shutdown signal received");
            shutdown_tx.send(true)?;
            break;
        }
    }
}

Your job: Add shutdown_rx.changed() to each client’s select! loop so clients exit when shutdown is signaled.

Step 5: Error Handling and Edge Cases

Production-harden the server:

  1. Lagging receivers: broadcast::Receiver::recv() returns Err(RecvError::Lagged(n)) when a slow client has missed n messages. Handle it gracefully (log + continue, don’t crash).
  2. Nickname validation: Reject empty or too-long nicknames.
  3. Backpressure: The broadcast channel buffer is bounded (100), but a full buffer does not make senders wait. The oldest messages are overwritten instead, and a client that can’t keep up gets the Lagged error on its next recv().
  4. Timeout: Disconnect clients that are idle for >5 minutes.
use tokio::time::{timeout, Duration};

// Wrap the read in a timeout:
match timeout(Duration::from_secs(300), reader.read_line(&mut line)).await {
    Ok(Ok(0)) | Ok(Err(_)) | Err(_) => break, // EOF, error, or timeout
    Ok(Ok(_)) => { /* process line */ }
}
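For the nickname validation item, a small pure function is usually enough. The sketch below makes several assumptions that the text leaves open: the 32-character limit, the NickError type, and the extra rule rejecting whitespace and control characters are all illustrative choices.

```rust
/// Assumed limit; the requirements only say "too long".
pub const MAX_NICK_CHARS: usize = 32;

#[derive(Debug, PartialEq, Eq)]
pub enum NickError {
    Empty,
    TooLong,
    InvalidChar,
}

/// Returns the trimmed nickname if it is acceptable.
pub fn validate_nick(raw: &str) -> Result<&str, NickError> {
    let nick = raw.trim();
    if nick.is_empty() {
        return Err(NickError::Empty);
    }
    // Count characters, not bytes, so non-ASCII names are measured fairly.
    if nick.chars().count() > MAX_NICK_CHARS {
        return Err(NickError::TooLong);
    }
    // Extra rule (an assumption): no inner whitespace or control characters,
    // which would otherwise garble the line-based protocol.
    if nick.chars().any(|c| c.is_whitespace() || c.is_control()) {
        return Err(NickError::InvalidChar);
    }
    Ok(nick)
}
```

On Err, the server can write a short error line back to the client and keep the old nickname.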

Step 6: Integration Test

Write a test that starts the server, connects two clients, and verifies message delivery:

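use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};

#[tokio::test]
async fn two_clients_can_chat() {
    // Port 0 = OS picks a free port; read the real address back.
    let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
    let addr = listener.local_addr().unwrap();

    // Start server in background. Assumption: your run_server accepts an
    // already-bound listener, so the test knows which port to connect to.
    let server = tokio::spawn(run_server(listener));

    // Connect two clients
    let mut client1 = TcpStream::connect(addr).await.unwrap();
    let mut client2 = TcpStream::connect(addr).await.unwrap();

    // Client 1 sends a message
    client1.write_all(b"Hello from client 1\n").await.unwrap();

    // Client 2 should receive it. If your server sends a welcome line
    // first, keep reading until the expected text arrives.
    let mut buf = vec![0u8; 1024];
    let n = client2.read(&mut buf).await.unwrap();
    let msg = String::from_utf8_lossy(&buf[..n]);
    assert!(msg.contains("Hello from client 1"));

    server.abort(); // stop the background server
}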

Evaluation Criteria

| Criterion | Target |
|-----------|--------|
| Concurrency | Multiple clients in multiple rooms, no blocking |
| Correctness | Messages only go to clients in the same room |
| Graceful shutdown | Ctrl+C drains messages and exits cleanly |
| Error handling | Lagged receivers, disconnections, timeouts handled |
| Code organization | Clean separation: accept loop, client task, room state |
| Testing | At least 2 integration tests |

Extension Ideas

Once the basic chat server works, try these enhancements:

  1. Persistent history: Store last N messages per room; replay to new joiners
  2. WebSocket support: Accept both TCP and WebSocket clients using tokio-tungstenite
  3. Rate limiting: Use tokio::time::Interval to limit messages per client per second
  4. Metrics: Track connected clients, messages/sec, room count via prometheus crate
  5. TLS: Add tokio-rustls for encrypted connections
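For the persistent history idea, the core data structure is a fixed-size ring of recent messages. Here is a minimal in-memory sketch; the RoomHistory type and its method names are hypothetical, and it does not write anything to disk.

```rust
use std::collections::VecDeque;

/// Keeps the last `capacity` messages of one room. Hypothetical type.
pub struct RoomHistory {
    capacity: usize,
    messages: VecDeque<String>,
}

impl RoomHistory {
    pub fn new(capacity: usize) -> Self {
        RoomHistory {
            capacity,
            messages: VecDeque::with_capacity(capacity),
        }
    }

    /// Records a message, evicting the oldest one when the buffer is full.
    pub fn push(&mut self, msg: impl Into<String>) {
        if self.capacity == 0 {
            return; // history disabled
        }
        if self.messages.len() == self.capacity {
            self.messages.pop_front();
        }
        self.messages.push_back(msg.into());
    }

    /// Messages in arrival order, oldest first, for replay to a new joiner.
    pub fn replay(&self) -> impl Iterator<Item = &str> + '_ {
        self.messages.iter().map(String::as_str)
    }
}
```

One way to wire it in is to store a RoomHistory next to each room's broadcast::Sender, push on every broadcast, and write replay() to a client right after it subscribes.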