نظرية البيان وتطبيقاتها في الشبكات الصناعية
ما هي نظرية البيانات (الرسوم البيانية) ولماذا تهمّ المهندس الصناعي؟
انظر إلى أي مصنع من الأعلى: آلات متصلة بسيور ناقلة، أنابيب تربط مضخات بخزانات، كابلات شبكة تصل أجهزة PLC ببعضها. كل هذه الأنظمة يمكن تمثيلها رياضياً بـبيان (Graph) — مجموعة من العقد (Nodes) والأضلاع (Edges) التي تربط بينها.
نظرية البيانات (Graph Theory) هي فرع الرياضيات الذي يدرس هذه الشبكات. تساعدك في الإجابة على أسئلة مثل:
- ما أقصر مسار لنقل المنتج من المستودع إلى نقطة الشحن؟
- إذا تعطّل هذا المحوّل الكهربائي، أي أقسام ستنقطع عنها الكهرباء؟
- كيف نوزّع المهام على الآلات لتقليل وقت الإنتاج؟
المفاهيم الأساسية: العقد والأضلاع
البيان G = (V, E) يتكوّن من:
V= مجموعة العقد (Vertices/Nodes) — تمثّل الكيانات (آلات، محطات، أجهزة)E= مجموعة الأضلاع (Edges) — تمثّل الاتصالات بين العقد
# تمثيل بيان بسيط لشبكة مصنع
# العقد: محطات الإنتاج
nodes = ["مستودع_مواد", "قص", "لحام", "طلاء", "تجميع", "فحص", "شحن"]
# الأضلاع: خطوط النقل بين المحطات (مع المسافة بالأمتار)
edges = [
("مستودع_مواد", "قص", 30),
("مستودع_مواد", "لحام", 50),
("قص", "لحام", 15),
("قص", "طلاء", 40),
("لحام", "تجميع", 20),
("طلاء", "تجميع", 25),
("تجميع", "فحص", 10),
("فحص", "شحن", 35),
("تجميع", "شحن", 45),
]
print(f"عدد العقد: {len(nodes)}")
print(f"عدد الأضلاع: {len(edges)}")
print("\nالاتصالات:")
for src, dst, weight in edges:
print(f" {src} --({weight}m)--> {dst}")
أنواع البيانات: موجّه وغير موجّه
| النوع | الوصف | مثال صناعي |
|---|---|---|
| غير موجّه (Undirected) | الضلع يعمل بالاتجاهين | شبكة Ethernet في المصنع |
| موجّه (Directed) | الضلع باتجاه واحد | خط إنتاج (المادة تسير باتجاه واحد) |
| موزون (Weighted) | كل ضلع له قيمة (وزن) | مسافة، تكلفة، زمن نقل |
| غير موزون (Unweighted) | كل الأضلاع متساوية | وجود اتصال أو عدمه |
# تمثيل البيان كقائمة جوار (Adjacency List)
def build_adjacency_list(edges, directed=False):
graph = {}
for src, dst, weight in edges:
if src not in graph:
graph[src] = []
graph[src].append((dst, weight))
if not directed:
if dst not in graph:
graph[dst] = []
graph[dst].append((src, weight))
return graph
# بيان غير موجّه (شبكة المصنع)
graph_undirected = build_adjacency_list(edges, directed=False)
# بيان موجّه (خط الإنتاج)
graph_directed = build_adjacency_list(edges, directed=True)
print("جيران محطة 'قص' (غير موجّه):")
for neighbor, weight in graph_undirected.get("قص", []):
print(f" -> {neighbor} ({weight}m)")
print("\nجيران محطة 'قص' (موجّه):")
for neighbor, weight in graph_directed.get("قص", []):
print(f" -> {neighbor} ({weight}m)")
تمثيل البيانات: مصفوفة الجوار مقابل قائمة الجوار
هناك طريقتان رئيسيتان لتمثيل البيان في الذاكرة:
مصفوفة الجوار (Adjacency Matrix): مصفوفة n×n حيث M[i][j] = الوزن إذا يوجد ضلع:
import numpy as np
# ترقيم العقد
node_index = {name: i for i, name in enumerate(nodes)}
n = len(nodes)
# بناء مصفوفة الجوار
adj_matrix = np.zeros((n, n))
for src, dst, weight in edges:
i, j = node_index[src], node_index[dst]
adj_matrix[i][j] = weight
adj_matrix[j][i] = weight # غير موجّه
print("مصفوفة الجوار (الصف = المصدر، العمود = الهدف):")
print(f"{'':>15}", end="")
for name in nodes:
print(f"{name[:4]:>6}", end="")
print()
for i, name in enumerate(nodes):
print(f"{name:>15}", end="")
for j in range(n):
val = int(adj_matrix[i][j])
print(f"{val:>6}", end="")
print()
| التمثيل | الذاكرة | البحث عن ضلع | مناسب لـ |
|---|---|---|---|
| مصفوفة جوار | O(V²) | O(1) | بيانات كثيفة |
| قائمة جوار | O(V+E) | O(الدرجة) | بيانات متفرقة (الأغلب) |
خوارزمية ديكسترا: أقصر مسار
خوارزمية ديكسترا (Dijkstra's Algorithm) تجد أقصر مسار من عقدة مصدر إلى جميع العقد الأخرى في بيان موزون بأوزان موجبة.
الفكرة: ابدأ من المصدر، وفي كل خطوة اختر العقدة الأقرب التي لم تُزَر بعد، وحدّث المسافات عبرها.
import heapq
def dijkstra(graph, start):
"""خوارزمية ديكسترا لأقصر مسار"""
distances = {node: float('inf') for node in graph}
distances[start] = 0
previous = {node: None for node in graph}
visited = set()
# طابور أولوية (min-heap)
pq = [(0, start)]
while pq:
current_dist, current_node = heapq.heappop(pq)
if current_node in visited:
continue
visited.add(current_node)
for neighbor, weight in graph.get(current_node, []):
distance = current_dist + weight
if distance < distances.get(neighbor, float('inf')):
distances[neighbor] = distance
previous[neighbor] = current_node
heapq.heappush(pq, (distance, neighbor))
return distances, previous
def reconstruct_path(previous, start, end):
"""إعادة بناء المسار من البداية إلى النهاية"""
path = []
current = end
while current is not None:
path.append(current)
current = previous.get(current)
path.reverse()
if path[0] == start:
return path
return [] # لا يوجد مسار
# تطبيق على شبكة المصنع
distances, previous = dijkstra(graph_undirected, "مستودع_مواد")
print("أقصر المسافات من المستودع:")
for node in sorted(distances, key=distances.get):
path = reconstruct_path(previous, "مستودع_مواد", node)
print(f" إلى {node}: {distances[node]:.0f}m عبر {' -> '.join(path)}")
التعقيد الزمني: O((V + E) log V) مع طابور الأولوية — سريع جداً حتى لشبكات كبيرة.
درجة العقدة والمركزية: من هو الأهم؟
درجة العقدة (Degree) هي عدد الأضلاع المتصلة بها. العقد ذات الدرجة العالية هي مراكز الشبكة — تعطّلها يسبب أكبر ضرر.
def calculate_degree(graph):
"""حساب درجة كل عقدة"""
degrees = {}
for node in graph:
degrees[node] = len(graph[node])
return degrees
def calculate_betweenness_simple(graph, nodes):
"""مركزية الوساطة المبسّطة (betweenness)
كم مسار أقصر يمر عبر كل عقدة
"""
betweenness = {node: 0 for node in nodes}
for start in nodes:
distances, previous = dijkstra(graph, start)
for end in nodes:
if start != end and distances.get(end, float('inf')) < float('inf'):
path = reconstruct_path(previous, start, end)
for node in path[1:-1]: # بدون المصدر والهدف
betweenness[node] += 1
return betweenness
degrees = calculate_degree(graph_undirected)
betweenness = calculate_betweenness_simple(graph_undirected, nodes)
print("تحليل أهمية العقد:")
print(f"{'المحطة':<15} {'الدرجة':>8} {'الوساطة':>10}")
print("-" * 35)
for node in sorted(nodes, key=lambda n: betweenness.get(n, 0), reverse=True):
print(f"{node:<15} {degrees.get(node, 0):>8} {betweenness.get(node, 0):>10}")
# تحديد نقاط الضعف
max_betweenness_node = max(betweenness, key=betweenness.get)
print(f"\nنقطة الاختناق: {max_betweenness_node} (الوساطة: {betweenness[max_betweenness_node]})")
print("تعطّل هذه المحطة يقطع أكبر عدد من المسارات!")
تحليل الهشاشة: ماذا لو تعطّلت عقدة؟
في المصنع، تحليل الهشاشة (Vulnerability Analysis) يكشف نقاط الفشل الوحيدة (Single Points of Failure):
def analyze_vulnerability(graph, nodes, edges):
"""تحليل تأثير إزالة كل عقدة"""
results = []
for removed_node in nodes:
# بناء بيان بدون العقدة المحذوفة
reduced_edges = [(s, d, w) for s, d, w in edges
if s != removed_node and d != removed_node]
reduced_nodes = [n for n in nodes if n != removed_node]
if not reduced_edges:
results.append((removed_node, len(nodes) - 1, True))
continue
reduced_graph = build_adjacency_list(reduced_edges, directed=False)
# فحص الاتصالية: هل كل العقد المتبقية متصلة؟
visited = set()
start = reduced_nodes[0]
queue = [start]
visited.add(start)
while queue:
current = queue.pop(0)
for neighbor, _ in reduced_graph.get(current, []):
if neighbor not in visited and neighbor in reduced_nodes:
visited.add(neighbor)
queue.append(neighbor)
disconnected = len(reduced_nodes) - len(visited)
is_critical = disconnected > 0
results.append((removed_node, disconnected, is_critical))
print("تحليل الهشاشة — تأثير تعطّل كل محطة:")
print(f"{'المحطة':<15} {'عقد منقطعة':>12} {'حرجة؟':>8}")
print("-" * 37)
for node, disc, critical in sorted(results, key=lambda x: x[1], reverse=True):
status = "نعم!" if critical else "لا"
print(f"{node:<15} {disc:>12} {status:>8}")
critical_nodes = [r[0] for r in results if r[2]]
if critical_nodes:
print(f"\nنقاط الفشل الحرجة: {', '.join(critical_nodes)}")
print("يجب إضافة مسارات بديلة (redundancy) لهذه المحطات!")
else:
print("\nالشبكة متينة — لا توجد نقاط فشل وحيدة")
analyze_vulnerability(graph_undirected, nodes, edges)
تطبيق صناعي: تحسين شبكة المصنع
المسألة: مصنع يريد تحسين تخطيط الشبكة الصناعية (Industrial Ethernet) بين أجهزة التحكم. الهدف: تقليل زمن الاستجابة الأقصى مع ضمان المتانة.
import heapq
# شبكة Ethernet صناعية
network_nodes = ["PLC_main", "PLC_line1", "PLC_line2", "PLC_line3",
"SCADA", "HMI_1", "HMI_2", "Switch_A", "Switch_B"]
network_edges = [
("PLC_main", "Switch_A", 2), # ms (زمن الاستجابة)
("PLC_main", "Switch_B", 3),
("Switch_A", "PLC_line1", 1),
("Switch_A", "PLC_line2", 1),
("Switch_A", "SCADA", 2),
("Switch_B", "PLC_line3", 1),
("Switch_B", "HMI_1", 2),
("Switch_B", "HMI_2", 2),
("Switch_A", "Switch_B", 1), # رابط بين المحوّلين
("SCADA", "HMI_1", 3),
]
network_graph = build_adjacency_list(network_edges, directed=False)
# تحليل: أقصى زمن استجابة من SCADA لكل جهاز
distances, _ = dijkstra(network_graph, "SCADA")
print("زمن الاستجابة من SCADA لكل جهاز:")
print(f"{'الجهاز':<15} {'الزمن (ms)':>12}")
print("-" * 28)
for node in sorted(distances, key=distances.get):
if node != "SCADA":
print(f"{node:<15} {distances[node]:>12.0f}")
max_latency_node = max(
[n for n in distances if n != "SCADA"],
key=lambda n: distances[n]
)
print(f"\nأبطأ جهاز: {max_latency_node} ({distances[max_latency_node]:.0f} ms)")
# فحص: هل Switch_A نقطة فشل حرجة؟
print("\n--- اختبار تعطّل Switch_A ---")
edges_without_switch_a = [(s, d, w) for s, d, w in network_edges
if s != "Switch_A" and d != "Switch_A"]
reduced_graph = build_adjacency_list(edges_without_switch_a, directed=False)
distances_reduced, _ = dijkstra(reduced_graph, "SCADA")
unreachable = [n for n in network_nodes
if n != "SCADA" and n != "Switch_A"
and distances_reduced.get(n, float('inf')) == float('inf')]
if unreachable:
print(f"أجهزة ستنقطع: {', '.join(unreachable)}")
print("توصية: إضافة رابط مباشر بين SCADA و PLC_line1/PLC_line2")
else:
print("جميع الأجهزة لا تزال متصلة (الشبكة متينة)")
الشجرة الممتدة الصغرى: أقل تكلفة لربط الشبكة
الشجرة الممتدة الصغرى (Minimum Spanning Tree - MST) تربط جميع العقد بأقل تكلفة كلية بدون حلقات. مفيدة لتخطيط الكابلات والأنابيب.
def kruskal_mst(nodes, edges):
"""خوارزمية كروسكال للشجرة الممتدة الصغرى"""
# ترتيب الأضلاع بالوزن
sorted_edges = sorted(edges, key=lambda e: e[2])
# Union-Find للكشف عن الحلقات
parent = {node: node for node in nodes}
def find(x):
while parent[x] != x:
parent[x] = parent[parent[x]]
x = parent[x]
return x
def union(x, y):
px, py = find(x), find(y)
if px == py:
return False
parent[px] = py
return True
mst_edges = []
total_cost = 0
for src, dst, weight in sorted_edges:
if union(src, dst):
mst_edges.append((src, dst, weight))
total_cost += weight
if len(mst_edges) == len(nodes) - 1:
break
return mst_edges, total_cost
mst, cost = kruskal_mst(nodes, edges)
print(f"الشجرة الممتدة الصغرى (أقل تكلفة كابلات):")
print(f"التكلفة الكلية: {cost}m من الكابلات")
print("\nالاتصالات المطلوبة:")
for src, dst, weight in mst:
print(f" {src} --- {dst} ({weight}m)")
ملخص وقواعد عملية
| المفهوم | الوصف | التطبيق الصناعي |
|---|---|---|
| البيان | عقد + أضلاع | نمذجة أي شبكة |
| ديكسترا | أقصر مسار | تحسين مسارات النقل |
| درجة العقدة | عدد الاتصالات | تحديد المراكز الحرجة |
| تحليل الهشاشة | تأثير تعطّل عقدة | كشف نقاط الفشل |
| MST | أقل تكلفة ربط | تخطيط الكابلات والأنابيب |
نصيحة عملية: قبل تصميم أي شبكة صناعية — كهربائية، بيانات، أنابيب — ارسمها كبيان أولاً. حلّل نقاط الضعف، جد المسارات البديلة، واحسب التكلفة المثلى. ساعة من التحليل على الورق توفّر أسابيع من إعادة التمديد.