
Concurrency and Threads in Rust: Safe Parallel Processing for Industrial Data

Why Concurrency Matters in Factories

A modern factory has hundreds of machines generating data simultaneously -- temperature sensors, pressure gauges, vibration monitors, and conveyor speed trackers. A single-threaded program that reads sensors one by one cannot keep up. Concurrency lets your software handle multiple data streams in parallel, ensuring no reading is missed and alarms trigger instantly.

Rust makes concurrency safe at compile time. The ownership system and type checker prevent data races before your code ever runs, which is critical in industrial environments where bugs can cause equipment damage or safety hazards.

Spawning Threads with std::thread

The simplest form of concurrency is spawning operating system threads. Each thread runs independently and can execute its own work.

use std::thread;
use std::time::Duration;

fn main() {
    // Spawn a thread to monitor temperature
    let temp_handle = thread::spawn(|| {
        for i in 1..=5 {
            println!("[TEMP] Reading {}: {:.1} C", i, 70.0 + i as f64 * 0.5);
            thread::sleep(Duration::from_millis(100));
        }
    });

    // Spawn another thread to monitor pressure
    let press_handle = thread::spawn(|| {
        for i in 1..=5 {
            println!("[PRESS] Reading {}: {:.2} bar", i, 2.0 + i as f64 * 0.1);
            thread::sleep(Duration::from_millis(150));
        }
    });

    // Wait for both threads to finish
    temp_handle.join().expect("Temperature thread panicked");
    press_handle.join().expect("Pressure thread panicked");
    println!("All monitoring threads completed.");
}

thread::spawn returns a JoinHandle. Calling .join() blocks the current thread until the spawned thread finishes. If the spawned thread panicked, .join() returns an Err containing the panic payload.
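.join() also carries the closure's return value back to the caller as Ok(value), so a worker thread can compute a result rather than just print. A minimal sketch (the sensor values are illustrative):

```rust
use std::thread;

fn main() {
    // The closure's return value comes back through join() as Ok(value).
    let handle = thread::spawn(|| {
        let readings = [71.2, 72.8, 70.5];
        readings.iter().sum::<f64>() / readings.len() as f64
    });

    match handle.join() {
        Ok(avg) => println!("Average temperature: {:.1} C", avg),
        Err(_) => eprintln!("Sensor thread panicked"),
    }
}
```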

Message Passing with Channels

Channels provide a safe way to send data between threads. Rust's mpsc module gives you a multi-producer, single-consumer channel.

use std::sync::mpsc;
use std::thread;

struct Reading {
    sensor_id: String,
    value: f64,
}

fn main() {
    // Create a channel: tx sends, rx receives
    let (tx, rx) = mpsc::channel();

    // Clone tx so multiple threads can send
    let tx1 = tx.clone();
    let tx2 = tx.clone();
    drop(tx); // drop original so channel closes when threads finish

    // Thread 1: temperature sensor
    thread::spawn(move || {
        let readings = vec![72.0, 73.5, 74.1];
        for val in readings {
            tx1.send(Reading {
                sensor_id: "TEMP-01".to_string(),
                value: val,
            }).unwrap();
        }
    });

    // Thread 2: pressure sensor
    thread::spawn(move || {
        let readings = vec![2.1, 2.3, 2.0];
        for val in readings {
            tx2.send(Reading {
                sensor_id: "PRESS-03".to_string(),
                value: val,
            }).unwrap();
        }
    });

    // Main thread: receive all readings
    for reading in rx {
        println!("{}: {}", reading.sensor_id, reading.value);
    }
    println!("All readings received.");
}

Iterating over rx blocks until a message arrives, and the loop ends once every sender (the original tx and all its clones) has been dropped. This pattern naturally handles any number of producers without extra bookkeeping.
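When the receiving thread has other work to do between readings, blocking on rx may not be acceptable. mpsc also offers a non-blocking try_recv; a minimal polling sketch (the sleep intervals are arbitrary):

```rust
use std::sync::mpsc::{self, TryRecvError};
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        thread::sleep(Duration::from_millis(30)); // simulate a slow sensor
        tx.send(2.4_f64).unwrap();
    });

    // Poll without blocking so the main loop can do other work in between.
    loop {
        match rx.try_recv() {
            Ok(val) => {
                println!("Got pressure reading: {:.1} bar", val);
                break;
            }
            Err(TryRecvError::Empty) => {
                // No reading yet; do other work, then poll again.
                thread::sleep(Duration::from_millis(5));
            }
            Err(TryRecvError::Disconnected) => break, // all senders gone
        }
    }
}
```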

Shared State: Mutex and Arc

Sometimes threads need to write to the same data structure. A Mutex (mutual exclusion) ensures only one thread accesses the data at a time. Arc (atomically reference-counted pointer) lets multiple threads share ownership of the same value.

use std::sync::{Arc, Mutex};
use std::thread;

struct Reading {
    machine: String,
    value: f64,
}

fn main() {
    // Shared log protected by a mutex, wrapped in Arc for multi-ownership
    let shared_log: Arc<Mutex<Vec<Reading>>> = Arc::new(Mutex::new(Vec::new()));

    let mut handles = vec![];

    let machines = vec!["CNC-01", "LATHE-02", "DRILL-03"];
    for name in machines {
        let log = Arc::clone(&shared_log);
        let handle = thread::spawn(move || {
            // Simulate a reading
            let reading = Reading {
                machine: name.to_string(),
                value: 65.0 + rand_value(name),
            };
            // Lock the mutex, push the reading, lock auto-releases
            let mut data = log.lock().unwrap();
            data.push(reading);
        });
        handles.push(handle);
    }

    for h in handles {
        h.join().unwrap();
    }

    // All threads done, read the log
    let log = shared_log.lock().unwrap();
    println!("Collected {} readings:", log.len());
    for r in log.iter() {
        println!("  {}: {:.1} C", r.machine, r.value);
    }
}

// Simple deterministic value based on machine name length
fn rand_value(name: &str) -> f64 {
    name.len() as f64 * 1.3
}

Key rules: always clone the Arc before moving it into a thread. The Mutex lock is released automatically when the MutexGuard goes out of scope.
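If you need the lock released before the end of a function, an inner scope (or an explicit drop of the guard) keeps the critical section short so other threads are not blocked longer than necessary. A small sketch:

```rust
use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    let counter = Arc::new(Mutex::new(0u32));
    let c = Arc::clone(&counter);

    let handle = thread::spawn(move || {
        // Keep the critical section short: lock, update, release.
        {
            let mut n = c.lock().unwrap();
            *n += 1;
        } // MutexGuard dropped here; other threads may now take the lock
        // ... longer work that does not need the lock goes here ...
    });

    handle.join().unwrap();
    println!("count = {}", counter.lock().unwrap()); // prints "count = 1"
}
```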

Send and Sync: Rust's Concurrency Guarantees

Rust enforces thread safety through two marker traits:

  • Send -- A type can be transferred to another thread. Most types are Send. A notable exception is Rc<T>, which is not thread-safe. Use Arc<T> instead.
  • Sync -- A type can be safely referenced from multiple threads simultaneously. Mutex<T> is Sync because it serializes access. Cell<T> and RefCell<T> are not Sync.

For example, moving an Rc into a thread fails to compile, while Arc works:
use std::rc::Rc;
use std::sync::Arc;

fn main() {
    let shared = Rc::new(42);
    // thread::spawn(move || {
    //     println!("{}", shared); // COMPILE ERROR: Rc is not Send
    // });

    // Use Arc instead for thread-safe reference counting
    let safe_shared = Arc::new(42);
    let clone = Arc::clone(&safe_shared);
    std::thread::spawn(move || {
        println!("From thread: {}", clone); // compiles fine
    }).join().unwrap();
}

The compiler checks these traits automatically. If you try to send a non-Send type across threads, the code will not compile. This eliminates data races at compile time rather than at runtime.
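As of Rust 1.63, std::thread::scope offers an alternative to plain spawns, which require 'static data: scoped threads may borrow local data because the scope guarantees every thread is joined before it returns. A minimal sketch:

```rust
use std::thread;

fn main() {
    let sensors = vec!["TEMP-01", "PRESS-03", "VIB-07"];

    // Scoped threads may borrow `sensors` because the scope joins
    // every spawned thread before `sensors` goes out of scope.
    thread::scope(|s| {
        for name in &sensors {
            s.spawn(move || {
                println!("polling {}", name);
            });
        }
    }); // all spawned threads are joined here

    println!("{} sensors polled", sensors.len());
}
```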

Practical Example: Parallel Machine Monitoring

A complete example with one thread per factory section, all reporting to a central aggregator through a channel:

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

struct SectionReport {
    section: String,
    avg_temp: f64,
    machine_count: u32,
    has_alarm: bool,
}

fn monitor_section(name: &str, base_temp: f64) -> SectionReport {
    // Simulate reading 3 machines in this section
    let temps = vec![base_temp, base_temp + 2.0, base_temp - 1.5];
    let avg = temps.iter().sum::<f64>() / temps.len() as f64;
    let alarm = avg > 85.0;

    thread::sleep(Duration::from_millis(50)); // simulate I/O delay

    SectionReport {
        section: name.to_string(),
        avg_temp: avg,
        machine_count: temps.len() as u32,
        has_alarm: alarm,
    }
}

fn main() {
    let (tx, rx) = mpsc::channel();

    let sections = vec![
        ("Assembly Line A", 72.0),
        ("Welding Bay B", 88.0),
        ("Paint Room C", 65.0),
        ("CNC Hall D", 78.0),
    ];

    for (name, base) in sections {
        let tx = tx.clone();
        thread::spawn(move || {
            let report = monitor_section(name, base);
            tx.send(report).unwrap();
        });
    }
    drop(tx); // close sender so rx iterator will end

    // Central aggregator collects all section reports
    let mut total_machines = 0u32;
    let mut alarm_sections = vec![];

    for report in rx {
        println!(
            "Section {}: avg {:.1} C, {} machines{}",
            report.section, report.avg_temp, report.machine_count,
            if report.has_alarm { " [ALARM]" } else { "" }
        );
        total_machines += report.machine_count;
        if report.has_alarm {
            alarm_sections.push(report.section);
        }
    }

    println!("\n--- Shift Summary ---");
    println!("Total machines monitored: {}", total_machines);
    if alarm_sections.is_empty() {
        println!("Status: All sections normal.");
    } else {
        println!("ALARMS in: {}", alarm_sections.join(", "));
    }
}

Each factory section runs in its own thread, simulating parallel sensor polling. The main thread aggregates results and prints a shift summary with alarm status.
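In production, the aggregator may not want to block forever if one section thread hangs. One option is recv_timeout, which puts a deadline on each report; a minimal sketch (the timeout and delay values are arbitrary):

```rust
use std::sync::mpsc::{self, RecvTimeoutError};
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        thread::sleep(Duration::from_millis(10));
        tx.send("Assembly Line A: ok").unwrap();
        // Simulate a stuck section: the sender is held open much longer
        // than the aggregator's deadline, so plain iteration would block.
        thread::sleep(Duration::from_secs(60));
    });

    // Give each report a deadline instead of blocking indefinitely.
    loop {
        match rx.recv_timeout(Duration::from_millis(100)) {
            Ok(report) => println!("received: {}", report),
            Err(RecvTimeoutError::Timeout) => {
                println!("no report within deadline, flagging stuck section");
                break;
            }
            Err(RecvTimeoutError::Disconnected) => break,
        }
    }
}
```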

Summary

  • Factories require concurrency to handle multiple machines and sensors producing data at the same time.
  • std::thread::spawn creates OS threads; use .join() to wait for completion.
  • Channels (mpsc) let threads send data to a central receiver without shared memory.
  • Arc<Mutex<T>> enables safe shared mutable state across threads.
  • Send and Sync traits are checked at compile time, preventing data races before your program runs.
  • Combining threads with channels gives you a clean pattern: one thread per factory section, one central aggregator for reporting and alarms.