كشف الشذوذ في البيانات الصناعية
كشف الشذوذ: كيف يعرف النظام أن شيئاً ما خطأ؟
تخيّل أنك تستمع لصوت محرك كل يوم — بعد فترة، ستميّز فوراً أي تغيّر غير طبيعي في الصوت حتى لو لم تستطع وصفه بالكلمات. دماغك تعلّم "النمط الطبيعي" وأي انحراف عنه يُثير انتباهك.
كشف الشذوذ (Anomaly Detection) هو نفس الفكرة لكن رياضياً — تعليم الحاسوب ما هو "طبيعي" ثم تنبيهنا عند حدوث أي شيء غير متوقع. في المصانع، هذا يعني كشف أعطال المعدات قبل أن تؤدي إلى توقف مكلف.
الطرق الإحصائية الكلاسيكية
أبسط وأقدم أساليب كشف الشذوذ تعتمد على الإحصاء — لا تحتاج ذكاء اصطناعياً أو شبكات عصبية.
طريقة Z-Score (الدرجة المعيارية)
تقيس كم تبعد القراءة عن المتوسط مقارنة بالانحراف المعياري:
import numpy as np
def z_score_detection(data, threshold=3.0):
"""
كشف الشذوذ باستخدام Z-Score
القاعدة: أي قراءة تبعد أكثر من 3 انحرافات معيارية = شاذة
"""
mean = np.mean(data)
std = np.std(data)
if std == 0:
return [] # لا تنوع في البيانات
anomalies = []
for i, value in enumerate(data):
z = abs(value - mean) / std
if z > threshold:
anomalies.append({
"index": i,
"value": value,
"z_score": z,
"direction": "مرتفع" if value > mean else "منخفض"
})
return anomalies
# بيانات اهتزاز محرك (mm/s RMS)
vibration_data = [2.1, 2.3, 2.0, 2.2, 2.1, 2.4, 2.0, 8.5, 2.2, 2.1, 2.3, 9.1]
anomalies = z_score_detection(vibration_data)
for a in anomalies:
print(f"شذوذ عند النقطة {a['index']}: القيمة={a['value']} (Z={a['z_score']:.1f}, {a['direction']})")
متى تستخدم Z-Score:
- البيانات تتبع التوزيع الطبيعي (جرس غاوس)
- تريد كشفاً سريعاً وبسيطاً
- الحساسات تقيس كمية فيزيائية مستقرة (حرارة ثابتة، ضغط ثابت)
طريقة المدى الرُبعي (IQR)
أكثر مقاومة للقيم الشاذة نفسها — لا تتأثر بها كما يتأثر المتوسط:
def iqr_detection(data, factor=1.5):
"""
كشف الشذوذ باستخدام المدى الرُبعي
أقل تأثراً بالقيم المتطرفة مقارنة بـ Z-Score
"""
q1 = np.percentile(data, 25)
q3 = np.percentile(data, 75)
iqr = q3 - q1
lower_fence = q1 - factor * iqr
upper_fence = q3 + factor * iqr
anomalies = []
for i, value in enumerate(data):
if value < lower_fence or value > upper_fence:
anomalies.append({
"index": i,
"value": value,
"lower_fence": lower_fence,
"upper_fence": upper_fence
})
return anomalies
# بيانات ضغط (bar) مع قيم شاذة
pressure = [4.2, 4.3, 4.1, 4.4, 4.0, 4.3, 1.2, 4.2, 4.5, 4.1, 7.8, 4.3]
results = iqr_detection(pressure)
print(f"عدد الشذوذات المكتشفة: {len(results)}")
| الطريقة | المزايا | العيوب | أفضل استخدام |
|---|---|---|---|
| Z-Score | بسيطة، سريعة | تفترض توزيعاً طبيعياً | بيانات مستقرة، بُعد واحد |
| IQR | مقاومة للمتطرفات | أقل حساسية | بيانات غير طبيعية التوزيع |
| Modified Z-Score (MAD) | الأفضل بين الاثنين | أبطأ قليلاً | بيانات صناعية عامة |
غابة العزل (Isolation Forest)
تصوّر أنك تحاول عزل نقطة واحدة من مجموعة — النقاط الطبيعية محاطة بنقاط مشابهة فتحتاج عدة خطوات لعزلها. لكن النقطة الشاذة معزولة أصلاً فيكفي خطوة أو اثنتين.
Isolation Forest يبني أشجاراً عشوائية ويقيس عدد الخطوات اللازمة لعزل كل نقطة — كلما قلّت الخطوات، زاد احتمال أنها شاذة:
from sklearn.ensemble import IsolationForest
import numpy as np
class IndustrialAnomalyDetector:
"""كاشف شذوذ صناعي باستخدام Isolation Forest"""
def __init__(self, contamination=0.05):
"""
contamination: النسبة المتوقعة للشذوذات (5% افتراضياً)
"""
self.model = IsolationForest(
n_estimators=200, # عدد الأشجار
contamination=contamination,
random_state=42,
n_jobs=-1 # استخدام كل المعالجات
)
def fit(self, normal_data):
"""تدريب على بيانات التشغيل الطبيعي"""
self.model.fit(normal_data)
def detect(self, new_data):
"""كشف الشذوذات في بيانات جديدة"""
# predict: 1 = طبيعي، -1 = شاذ
predictions = self.model.predict(new_data)
scores = self.model.score_samples(new_data)
anomalies = []
for i, (pred, score) in enumerate(zip(predictions, scores)):
if pred == -1:
anomalies.append({
"index": i,
"score": score,
"severity": "حرج" if score < -0.7 else "تحذير"
})
return anomalies
# مثال: مراقبة محرك بثلاثة أبعاد
# [حرارة, اهتزاز, تيار كهربائي]
normal_operation = np.array([
[65, 2.1, 15.2], [67, 2.3, 15.5], [64, 2.0, 15.1],
[66, 2.2, 15.3], [68, 2.4, 15.6], [65, 2.1, 15.0],
[67, 2.3, 15.4], [66, 2.0, 15.2], [64, 2.1, 15.1],
# ... مئات القراءات الطبيعية
])
detector = IndustrialAnomalyDetector(contamination=0.05)
detector.fit(normal_operation)
# بيانات جديدة — بعضها شاذ
new_readings = np.array([
[66, 2.2, 15.3], # طبيعي
[85, 5.8, 22.1], # شاذ: كل القيم مرتفعة
[65, 2.1, 15.0], # طبيعي
[67, 8.5, 15.4], # شاذ: اهتزاز مرتفع جداً
])
anomalies = detector.detect(new_readings)
for a in anomalies:
print(f"شذوذ عند القراءة {a['index']}: خطورة={a['severity']}")
لماذا Isolation Forest ممتاز للصناعة:
- يعمل مع بيانات متعددة الأبعاد (حرارة + اهتزاز + تيار معاً)
- لا يفترض شكلاً محدداً للتوزيع
- سريع في التدريب والتنبؤ
- يكشف أنماط شذوذ معقدة
المشفّر الذاتي (Autoencoder)
تخيّل أنك تطلب من شخص أن يحفظ صورة ثم يعيد رسمها من الذاكرة. إذا كانت الصورة مألوفة (مثل وجه إنسان)، سيرسمها بدقة. لكن إذا أعطيته صورة غريبة لم يرها من قبل، سيرسمها بشكل سيء.
Autoencoder يعمل بنفس المبدأ — يتعلم ضغط البيانات الطبيعية وإعادة بنائها. عندما تأتي بيانات شاذة، يفشل في إعادة بنائها بدقة = خطأ إعادة البناء يرتفع = إنذار:
import numpy as np
class SimpleAutoencoder:
"""
مشفّر ذاتي بسيط لكشف الشذوذ الصناعي
(في الواقع يُبنى بـ PyTorch أو TensorFlow)
"""
def __init__(self, input_dim, encoding_dim):
self.input_dim = input_dim
self.encoding_dim = encoding_dim
# في التنفيذ الحقيقي:
# Encoder: input_dim → 64 → 32 → encoding_dim
# Decoder: encoding_dim → 32 → 64 → input_dim
def train(self, normal_data, epochs=100):
"""
تدريب على بيانات طبيعية فقط
الهدف: تقليل خطأ إعادة البناء (Reconstruction Error)
"""
# loss = MSE(input, reconstructed_output)
pass
def detect_anomaly(self, data, threshold=None):
"""
كشف الشذوذ: إذا كان خطأ إعادة البناء > العتبة = شاذ
"""
reconstructed = self.reconstruct(data)
errors = np.mean((data - reconstructed) ** 2, axis=1)
if threshold is None:
threshold = self.calculate_threshold(errors)
results = []
for i, error in enumerate(errors):
results.append({
"index": i,
"reconstruction_error": error,
"is_anomaly": error > threshold,
"anomaly_score": error / threshold # > 1 = شاذ
})
return results
def calculate_threshold(self, training_errors, percentile=95):
"""حساب العتبة من بيانات التدريب"""
return np.percentile(training_errors, percentile)
بنية Autoencoder للبيانات الصناعية:
المدخلات (10 حساسات)
│
▼
┌─────────────────────┐
│ Encoder (ضغط) │
│ 10 → 64 → 32 → 8 │ ← تمثيل مضغوط (Latent Space)
├─────────────────────┤
│ Decoder (فك ضغط) │
│ 8 → 32 → 64 → 10 │ ← إعادة بناء المدخلات
└─────────────────────┘
│
▼
خطأ إعادة البناء = |مدخلات - مخرجات|²
إذا الخطأ > العتبة → شذوذ مكتشف!
ضبط العتبة (Threshold Setting)
أصعب قرار في كشف الشذوذ: أين نضع الخط الفاصل بين الطبيعي والشاذ؟
عتبة منخفضة جداً = إنذارات كثيرة كاذبة (False Positives) عتبة مرتفعة جداً = أعطال حقيقية لا تُكتشف (False Negatives)
def optimize_threshold(scores, labels, cost_fp=1, cost_fn=10):
"""
تحسين العتبة بناءً على التكلفة الاقتصادية
cost_fp: تكلفة إنذار كاذب (فحص غير ضروري)
cost_fn: تكلفة عطل غير مكتشف (توقف الخط)
"""
best_threshold = None
min_cost = float('inf')
for threshold in np.linspace(min(scores), max(scores), 1000):
fp = sum(1 for s, l in zip(scores, labels) if s > threshold and l == 0)
fn = sum(1 for s, l in zip(scores, labels) if s <= threshold and l == 1)
total_cost = fp * cost_fp + fn * cost_fn
if total_cost < min_cost:
min_cost = total_cost
best_threshold = threshold
return best_threshold, min_cost
استراتيجيات ضبط العتبة:
| الاستراتيجية | الوصف | متى تستخدمها |
|---|---|---|
| ثابتة (Fixed) | عتبة واحدة لا تتغير | أنظمة بسيطة ومستقرة |
| تكيّفية (Adaptive) | تتعدّل مع تغير ظروف التشغيل | أنظمة تعمل بأنماط مختلفة |
| متعددة المستويات | تحذير عند 2σ، إنذار عند 3σ | أنظمة حرجة تحتاج تدرّج |
| قائمة على التكلفة | تقلل التكلفة الإجمالية | عندما تعرف تكلفة كل خطأ |
إدارة الإنذارات الكاذبة (False Positive Management)
في المصانع، الإنذارات الكاذبة مشكلة حقيقية — إذا أطلق النظام 50 إنذاراً يومياً و48 منها كاذبة، سيتجاهل المشغلون كل الإنذارات بما فيها الحقيقية.
class AlertManager:
"""نظام إدارة ذكي للإنذارات الصناعية"""
def __init__(self):
self.alert_history = []
def evaluate_alert(self, anomaly_score, sensor_id, context):
"""تقييم الإنذار قبل إرساله"""
# 1. هل الشذوذ مستمر أم لحظي؟
if not self.is_persistent(sensor_id, duration_seconds=30):
return {"action": "تجاهل", "reason": "شذوذ لحظي (< 30 ثانية)"}
# 2. هل هناك تأكيد من حساسات مجاورة؟
correlated = self.check_correlated_sensors(sensor_id)
if not correlated:
return {"action": "مراقبة", "reason": "لا تأكيد من حساسات أخرى"}
# 3. تصنيف الخطورة
severity = self.classify_severity(anomaly_score, context)
# 4. منع تكرار الإنذارات (Debouncing)
if self.was_recently_alerted(sensor_id, minutes=15):
return {"action": "دمج", "reason": "إنذار مكرر خلال 15 دقيقة"}
return {
"action": "إنذار",
"severity": severity,
"correlated_sensors": correlated,
"recommended_action": self.suggest_action(severity, sensor_id)
}
تطبيقات صناعية حقيقية
مراقبة الاهتزاز (Vibration Monitoring)
# كشف أعطال المحامل من بيانات الاهتزاز
vibration_features = {
"rms": 2.1, # القيمة الجذرية المتوسطة
"peak": 5.8, # القيمة القصوى
"crest_factor": 2.76, # نسبة القمة إلى RMS
"kurtosis": 3.2, # التفرطح (> 3.5 = مشكلة محتملة)
"bpfo": 0.15, # تردد عيب الحلقة الخارجية
"bpfi": 0.08, # تردد عيب الحلقة الداخلية
}
مراقبة الحرارة (Temperature Monitoring)
# كشف ارتفاع حرارة تدريجي في محوّل كهربائي
def detect_thermal_anomaly(temp_history, window=60):
"""كشف ارتفاع حرارة غير طبيعي"""
recent = temp_history[-window:]
baseline = temp_history[-window*3:-window]
rate_of_change = (recent[-1] - recent[0]) / window # °C/دقيقة
baseline_avg = np.mean(baseline)
current_avg = np.mean(recent)
deviation = current_avg - baseline_avg
if rate_of_change > 0.5: # ارتفاع سريع
return "إنذار: ارتفاع حرارة سريع"
elif deviation > 10: # انحراف عن الخط الأساسي
return "تحذير: حرارة أعلى من المعتاد"
return "طبيعي"
مراقبة الضغط (Pressure Monitoring)
| نوع الشذوذ | الوصف | السبب المحتمل |
|---|---|---|
| ارتفاع مفاجئ | قفزة حادة في الضغط | انسداد أو إغلاق صمام |
| انخفاض تدريجي | تسريب بطيء | تآكل أنبوب أو وصلة |
| تذبذب | ضغط غير مستقر | مضخة متعبة أو صمام عالق |
| ثبات غير طبيعي | لا تغيّر رغم تغيّر الحمل | حساس معطل |
مقارنة شاملة بين طرق كشف الشذوذ
| الطريقة | التعقيد | البيانات المطلوبة | أبعاد متعددة | الوقت الحقيقي | أفضل استخدام |
|---|---|---|---|---|---|
| Z-Score | منخفض | قليلة | لا | نعم | حساس واحد، بيانات مستقرة |
| IQR | منخفض | قليلة | لا | نعم | بيانات مع متطرفات |
| Isolation Forest | متوسط | متوسطة | نعم | نعم | أنماط شذوذ معقدة |
| Autoencoder | مرتفع | كثيرة | نعم | نعم (بعد التدريب) | أنظمة معقدة متعددة الحساسات |
نصائح عملية
- ابدأ بالإحصاء البسيط: Z-Score و IQR يحلّان 70% من المشاكل — لا تقفز إلى الشبكات العصبية مباشرة
- اجمع بيانات "طبيعية" كافية: شهر على الأقل يغطي كل أنماط التشغيل
- صنّف الإنذارات: ليس كل شذوذ عطل — بعضها تغييرات تشغيلية مقصودة
- راقب معدل الإنذارات الكاذبة: إذا تجاوز 20% فالنظام يحتاج إعادة ضبط
- استخدم عتبات متعددة: تحذير → إنذار → إيقاف طوارئ
- وثّق كل إنذار حقيقي: لأن هذه البيانات هي ذهب لتحسين النموذج مستقبلاً