Home Wiki Programming & Logic Async Programming with Tokio: Thousands of Tasks on a Single Thread
Programming & Logic

Async Programming with Tokio: Thousands of Tasks on a Single Thread

Async vs Threads: When to Use Which

Rust offers two main concurrency models. Choosing the right one depends on what your code is waiting for.

Threads are best for CPU-bound work like image processing or heavy calculations. Each thread gets its own OS-level execution context.

Async is best for I/O-bound work like reading sensors over a network, querying databases, or handling HTTP requests. Async tasks share a thread pool and yield while waiting, allowing thousands of concurrent operations with minimal overhead.

In an industrial setting, a factory monitoring system that reads from 500 sensors over Modbus TCP is I/O-bound. An analytics module crunching vibration FFT data is CPU-bound.

// CPU-bound: use threads
std::thread::spawn(|| {
    let result = compute_vibration_fft(&samples);
    println!("FFT complete: {} peaks found", result.peaks.len());
});

// I/O-bound: use async
async fn read_sensor(addr: &str) -> Result<f64, SensorError> {
    let response = tokio::net::TcpStream::connect(addr).await?;
    // ... parse response
    Ok(42.5)
}

Futures and the async/await Syntax

A Future in Rust is a value that will produce a result at some point. It does nothing until you .await it. This is called lazy evaluation.

When you mark a function as async, it returns a Future instead of executing immediately. The .await keyword suspends the current task, letting other tasks run until the result is ready.

// This function returns a Future<Output = f64>, not f64 directly
async fn read_temperature(sensor_id: u32) -> f64 {
    // Simulating a network read that takes time
    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
    22.5 + sensor_id as f64 * 0.1
}

async fn monitor() {
    // Nothing happens until we await
    let temp = read_temperature(1).await;
    println!("Sensor 1: {temp}C");
}

The Tokio Runtime

Async code needs a runtime to execute futures. Tokio is the most widely used async runtime in Rust. The #[tokio::main] macro sets it up for you.

// Multi-threaded runtime (default) - uses all CPU cores
#[tokio::main]
async fn main() {
    println!("Running on a multi-threaded runtime");
}

// Single-threaded runtime - lighter, good for simple services
#[tokio::main(flavor = "current_thread")]
async fn main() {
    println!("Running on a single thread");
}

For a factory controller handling many sensors, use the multi-threaded runtime. For a small Raspberry Pi gateway reading a few devices, the single-threaded flavor saves resources.

Add Tokio to your project with: cargo add tokio --features full

Spawning Async Tasks with tokio::spawn

tokio::spawn launches an async task that runs concurrently. Each spawned task is scheduled independently by the runtime.

use tokio::task::JoinHandle;

async fn read_sensor(id: u32) -> f64 {
    tokio::time::sleep(std::time::Duration::from_millis(50)).await;
    20.0 + id as f64 * 1.5
}

#[tokio::main]
async fn main() {
    // Spawn 10 sensor reads concurrently
    let mut handles: Vec<JoinHandle<f64>> = Vec::new();

    for id in 1..=10 {
        let handle = tokio::spawn(async move {
            read_sensor(id).await
        });
        handles.push(handle);
    }

    // Collect all results
    for (i, handle) in handles.into_iter().enumerate() {
        let value = handle.await.expect("Task failed");
        println!("Sensor {}: {:.1}C", i + 1, value);
    }
}

All 10 reads happen concurrently, completing in roughly 50ms total instead of 500ms sequentially.

Timers and Intervals

tokio::time::interval creates a repeating timer, perfect for periodic sensor polling in production systems.

use tokio::time::{interval, Duration};

async fn poll_pressure_sensor() -> f64 {
    // Simulate reading from hardware
    42.0
}

#[tokio::main]
async fn main() {
    let mut ticker = interval(Duration::from_secs(1));

    loop {
        ticker.tick().await; // Waits until the next tick
        let pressure = poll_pressure_sensor().await;
        println!("Pressure: {pressure} bar");

        if pressure > 50.0 {
            eprintln!("ALARM: Overpressure detected!");
            break;
        }
    }
}

The interval automatically compensates for drift, keeping the polling rate steady even if processing takes variable time.

select: Racing Multiple Futures

tokio::select! waits for the first of several futures to complete. This is useful when you need to handle whichever event occurs first, such as a sensor reading or a timeout.

use tokio::time::{sleep, Duration};

async fn read_slow_sensor() -> f64 {
    sleep(Duration::from_secs(5)).await;
    25.0
}

#[tokio::main]
async fn main() {
    tokio::select! {
        value = read_slow_sensor() => {
            println!("Got reading: {value}C");
        }
        _ = sleep(Duration::from_secs(2)) => {
            eprintln!("TIMEOUT: Sensor did not respond in 2 seconds");
        }
    }
    // The timeout branch wins because the sensor takes 5 seconds
}

Practical Example: Async Sensor Polling System

This example monitors multiple sensors concurrently, applying timeouts and collecting results into a dashboard snapshot.

use std::time::Duration;
use tokio::time::{sleep, timeout};

#[derive(Debug)]
struct SensorReading {
    id: u32,
    value: f64,
    status: &'static str,
}

async fn read_sensor(id: u32) -> f64 {
    // Simulate variable network latency
    let delay = if id % 7 == 0 { 3000 } else { 50 + id as u64 * 10 };
    sleep(Duration::from_millis(delay)).await;
    20.0 + (id as f64 * 0.3)
}

async fn read_with_timeout(id: u32) -> SensorReading {
    match timeout(Duration::from_secs(1), read_sensor(id)).await {
        Ok(value) => SensorReading { id, value, status: "OK" },
        Err(_) => SensorReading { id, value: 0.0, status: "TIMEOUT" },
    }
}

#[tokio::main]
async fn main() {
    println!("Starting sensor poll for 100 sensors...");

    let mut handles = Vec::new();
    for id in 1..=100 {
        handles.push(tokio::spawn(read_with_timeout(id)));
    }

    let mut ok_count = 0;
    let mut fail_count = 0;

    for handle in handles {
        let reading = handle.await.expect("Task panicked");
        if reading.status == "OK" {
            ok_count += 1;
        } else {
            fail_count += 1;
            eprintln!("Sensor {} failed: {}", reading.id, reading.status);
        }
    }

    println!("Poll complete: {ok_count} OK, {fail_count} failed");
}

All 100 sensors are polled concurrently. Sensors that do not respond within one second are marked as timed out. The entire poll completes in about one second regardless of how many sensors exist.

Summary

  • Use async for I/O-bound work (networking, sensors) and threads for CPU-bound work.
  • A Future is lazy and does nothing until .await is called.
  • Tokio provides the runtime; use #[tokio::main] to set it up.
  • tokio::spawn runs tasks concurrently on the runtime.
  • tokio::time::interval handles periodic polling for production monitoring.
  • tokio::select! races multiple futures and handles whichever finishes first.
  • Combine spawning, timeouts, and select to build robust industrial monitoring systems.
async await Tokio Future spawn runtime غير متزامن المهام المستقبل وقت التشغيل المراقبة المتوازية الأداء