الرئيسية قاعدة المعرفة البرمجة والمنطق البرمجة غير المتزامنة مع Tokio: آلاف المهام بخيط واحد
البرمجة والمنطق

البرمجة غير المتزامنة مع Tokio: آلاف المهام بخيط واحد

غير المتزامن مقابل الخيوط: متى تستخدم أيّاً منهما

في المصنع، بعض المهام تنتظر بيانات (I/O-bound) وبعضها تحسب أرقاماً (CPU-bound).

// I/O-bound: انتظار استجابة مستشعر عبر الشبكة ← async
// قراءة 100 مستشعر بخيط واحد أكفأ من 100 خيط
async fn read_sensor(id: u32) -> f64 { /* ... */ }

// CPU-bound: تحليل بيانات اهتزاز ثقيلة ← خيوط
fn analyze_vibration(data: &[f64]) -> Report { /* ... */ }

القاعدة: إذا كان الكود ينتظر شبكة أو قرص، استخدم async. إذا كان يحسب، استخدم خيوطاً.

المستقبلات وصيغة async/await

الدالة async fn لا تُنفَّذ فوراً، بل تُنشئ Future — وعد بقيمة مستقبلية.

// هذه الدالة تُرجع Future<Output = f64>
async fn read_temperature(sensor_id: u32) -> f64 {
    // await تُوقف التنفيذ هنا حتى تصل البيانات
    // لكن الخيط لا يتوقف — ينتقل لمهمة أخرى
    let response = fetch_from_device(sensor_id).await;
    response.value
}

async fn monitor() {
    // .await تُحرّك المستقبل حتى يكتمل
    let temp = read_temperature(42).await;
    println!("الحرارة: {temp}°C");
}

بدون .await، المستقبل لا يعمل — مثل آلة متصلة بالكهرباء لكن لم تُشغّل.

بيئة تشغيل Tokio

Rust لا تحتوي بيئة تشغيل مدمجة. نستخدم Tokio كمحرك للمهام غير المتزامنة.

# Cargo.toml
[dependencies]
tokio = { version = "1", features = ["full"] }
// بيئة متعددة الخيوط (الافتراضي) — للخوادم وأنظمة المراقبة
#[tokio::main]
async fn main() {
    println!("نظام المراقبة يعمل");
}

// بيئة خيط واحد — للأجهزة المدمجة محدودة الموارد
#[tokio::main(flavor = "current_thread")]
async fn main() {
    println!("وحدة تحكم مدمجة");
}

إطلاق مهام متزامنة مع tokio::spawn

لقراءة عدة مستشعرات في نفس الوقت بدلاً من واحد تلو الآخر:

use tokio::task::JoinHandle;

async fn read_sensor(id: u32) -> (u32, f64) {
    // محاكاة تأخير شبكة
    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
    (id, 25.0 + id as f64 * 0.5)
}

#[tokio::main]
async fn main() {
    // إطلاق 5 قراءات متزامنة
    let handles: Vec<JoinHandle<(u32, f64)>> = (1..=5)
        .map(|id| tokio::spawn(read_sensor(id)))
        .collect();

    // انتظار جميع النتائج
    for handle in handles {
        let (id, value) = handle.await.unwrap();
        println!("مستشعر {id}: {value}°C");
    }
}

خمس قراءات تكتمل في ~100ms بدلاً من ~500ms تسلسلياً.

المؤقتات والفترات الزمنية

في المصانع، نحتاج استطلاع دوري للمستشعرات كل ثوانٍ محددة:

use tokio::time::{interval, Duration};

async fn periodic_monitoring() {
    // قراءة كل 5 ثوانٍ
    let mut timer = interval(Duration::from_secs(5));

    loop {
        timer.tick().await; // تنتظر حتى الدورة التالية
        let temp = read_temperature(1).await;
        if temp > 80.0 {
            trigger_alarm("تجاوز حراري").await;
        }
        println!("[دورة] الحرارة: {temp}°C");
    }
}

interval يضمن توقيتاً منتظماً حتى لو استغرقت القراءة وقتاً متفاوتاً.

select: سباق عدة مستقبلات

tokio::select! ينفّذ أول مستقبل يكتمل — مثالي للمهلات وأولويات الإنذار:

use tokio::time::{sleep, Duration};

async fn read_with_timeout(sensor_id: u32) -> Option<f64> {
    tokio::select! {
        // الفرع الأول: القراءة الفعلية
        result = read_sensor(sensor_id) => {
            Some(result.1)
        }
        // الفرع الثاني: مهلة 3 ثوانٍ
        _ = sleep(Duration::from_secs(3)) => {
            eprintln!("مستشعر {sensor_id}: انتهت المهلة!");
            None
        }
    }
}

// سباق بين مصدرين: أيهما يُبلّغ أولاً
async fn wait_for_any_alarm() {
    tokio::select! {
        temp = watch_temperature() => {
            println!("إنذار حرارة: {temp}°C");
        }
        pressure = watch_pressure() => {
            println!("إنذار ضغط: {pressure} bar");
        }
    }
}

مثال عملي: نظام استطلاع مستشعرات غير متزامن

نظام يراقب 100 مستشعر مع مهلات وإنذارات:

use tokio::time::{interval, timeout, Duration};
use std::collections::HashMap;

struct SensorReading {
    id: u32,
    value: f64,
    status: SensorStatus,
}

enum SensorStatus { Ok, Warning, Critical, Timeout }

async fn poll_sensor(id: u32) -> SensorReading {
    // تطبيق مهلة على كل قراءة
    match timeout(Duration::from_secs(2), read_sensor(id)).await {
        Ok((_, value)) => {
            let status = if value > 90.0 { SensorStatus::Critical }
                else if value > 75.0 { SensorStatus::Warning }
                else { SensorStatus::Ok };
            SensorReading { id, value, status }
        }
        Err(_) => SensorReading {
            id, value: 0.0, status: SensorStatus::Timeout
        },
    }
}

#[tokio::main]
async fn main() {
    let mut tick = interval(Duration::from_secs(10));
    let sensor_ids: Vec<u32> = (1..=100).collect();

    loop {
        tick.tick().await;

        // إطلاق 100 قراءة متزامنة
        let handles: Vec<_> = sensor_ids.iter()
            .map(|&id| tokio::spawn(poll_sensor(id)))
            .collect();

        let mut alerts = Vec::new();
        for handle in handles {
            let reading = handle.await.unwrap();
            match reading.status {
                SensorStatus::Critical => {
                    alerts.push(format!(
                        "حرج: مستشعر {} = {:.1}", reading.id, reading.value
                    ));
                }
                SensorStatus::Timeout => {
                    alerts.push(format!("انقطاع: مستشعر {}", reading.id));
                }
                _ => {}
            }
        }

        if !alerts.is_empty() {
            println!("=== إنذارات ({}) ===", alerts.len());
            for alert in &alerts {
                println!("  ⚠ {alert}");
            }
        }
    }
}

الخلاصة

  • async/await مثالي لعمليات الإدخال/الإخراج كقراءة المستشعرات عبر الشبكة
  • Tokio يوفر بيئة تشغيل قوية مع مؤقتات ومهام متزامنة
  • tokio::spawn يُطلق مهاماً متوازية لقراءة عشرات المستشعرات معاً
  • interval يُنظّم الاستطلاع الدوري بتوقيت ثابت
  • select! يُسابق عدة عمليات — مثالي للمهلات والأولويات
  • في الدرس القادم: الاختبارات — كيف نتأكد أن كل هذا الكود يعمل بشكل صحيح
async await Tokio Future spawn runtime غير متزامن المهام المستقبل وقت التشغيل المراقبة المتوازية الأداء