Practical Anomaly Detection: Smart Alerts When Machines Behave Strangely
What Is Anomaly Detection and Why Industry Cares
Anomaly detection identifies data points that deviate significantly from normal behavior. In industrial settings, anomalies are early warning signs: a bearing degrading, a chemical process drifting, or a sensor failing. The challenge is that anomalies are rare -- among 100,000 readings, there may be only 50 true anomalies.
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.preprocessing import StandardScaler
Statistical Thresholds: The Simplest Approach
Set boundaries based on statistical properties. Anything outside is flagged.
np.random.seed(42)
hours = 2000
current = np.random.normal(15.0, 1.5, hours)
anomaly_idx = np.random.choice(hours, 20, replace=False)
current[anomaly_idx] += np.random.uniform(5, 10, 20)
mean, std = current.mean(), current.std()
z_scores = np.abs((current - mean) / std)
anomalies = z_scores > 3.0
print(f"Anomalies detected: {anomalies.sum()}")
fig, ax = plt.subplots(figsize=(14, 5))
ax.plot(current, alpha=0.6, linewidth=0.5)
ax.scatter(np.where(anomalies)[0], current[anomalies], c="red", s=30, zorder=5)
ax.axhline(mean + 3 * std, color="orange", linestyle="--")
ax.axhline(mean - 3 * std, color="orange", linestyle="--")
ax.set_title("Motor Current: Z-Score Anomaly Detection")
plt.tight_layout()
plt.show()
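One caveat with the snippet above: the global mean and standard deviation are computed from data that already contains the spikes, so the outliers inflate their own detection threshold. A common remedy is the modified z-score based on the median and the median absolute deviation (MAD), which the outliers barely move. A minimal sketch on the same simulated signal (the 3.5 cutoff is the conventional choice for modified z-scores):

```python
import numpy as np

np.random.seed(42)
hours = 2000
current = np.random.normal(15.0, 1.5, hours)
anomaly_idx = np.random.choice(hours, 20, replace=False)
current[anomaly_idx] += np.random.uniform(5, 10, 20)

# Median and MAD are barely affected by the 20 injected spikes
median = np.median(current)
mad = np.median(np.abs(current - median))
# 0.6745 rescales MAD to match the standard deviation for normal data
modified_z = 0.6745 * np.abs(current - median) / mad
anomalies_robust = modified_z > 3.5
print(f"Robust anomalies detected: {anomalies_robust.sum()}")
```

On heavily contaminated data the difference is dramatic: the plain z-score threshold widens with every outlier, while the MAD-based one stays put.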
Rolling Window Statistics
For time-varying data, use a rolling window instead of a global threshold:
series = pd.Series(current)
rolling_mean = series.rolling(50).mean()
rolling_std = series.rolling(50).std()
anomalies_rolling = (series > rolling_mean + 3 * rolling_std) | \
                    (series < rolling_mean - 3 * rolling_std)
print(f"Rolling window anomalies: {anomalies_rolling.sum()}")
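Note that `rolling(50)` leaves the first 49 statistics as NaN, so those points are silently never flagged. An exponentially weighted alternative has no warm-up gap and reacts faster to drift; the sketch below also shifts the statistics by one step so each point is compared only against its past (span=50 is an assumption mirroring the window size above):

```python
import numpy as np
import pandas as pd

np.random.seed(42)
current = np.random.normal(15.0, 1.5, 2000)
idx = np.random.choice(2000, 20, replace=False)
current[idx] += np.random.uniform(5, 10, 20)
series = pd.Series(current)

# Exponentially weighted stats: recent points count more, no warm-up gap
ewm_mean = series.ewm(span=50).mean()
ewm_std = series.ewm(span=50).std()
# shift(1): compare each point only against statistics from PRIOR points,
# so a spike cannot mask itself
anomalies_ewm = np.abs(series - ewm_mean.shift(1)) > 3 * ewm_std.shift(1)
print(f"EWM anomalies: {anomalies_ewm.sum()}")
```

The `shift(1)` detail matters in both variants: without it, a large spike inflates the very statistics used to judge it.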
Isolation Forest: Isolating the Abnormal From the Normal
Isolation Forest builds random trees and measures how many splits are needed to isolate each point. Anomalies, lying far from the majority, are isolated in fewer splits.
from sklearn.ensemble import IsolationForest
np.random.seed(42)
n = 2000
normal_data = np.column_stack([
    np.random.normal(15, 1.5, n),
    np.random.normal(72, 3, n),
    np.random.normal(4.5, 0.5, n)
])
n_anom = 40
anomaly_data = np.column_stack([
    np.random.normal(20, 2, n_anom),
    np.random.normal(85, 4, n_anom),
    np.random.normal(8, 1, n_anom)
])
X = np.vstack([normal_data, anomaly_data])
true_labels = np.array([0] * n + [1] * n_anom)
iso = IsolationForest(contamination=0.03, random_state=42, n_estimators=200)
predictions = iso.fit_predict(X)
detected = predictions == -1
print(f"Flagged: {detected.sum()}")
print(f"True anomalies caught: {(detected & (true_labels == 1)).sum()} / {n_anom}")
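Beyond the hard -1/+1 labels from `fit_predict`, the model's `decision_function` returns a continuous score (more negative means more anomalous), which lets you rank alerts by severity instead of treating them all equally. A sketch on the same simulated three-sensor data:

```python
import numpy as np
from sklearn.ensemble import IsolationForest

np.random.seed(42)
n, n_anom = 2000, 40
normal_data = np.column_stack([np.random.normal(15, 1.5, n),
                               np.random.normal(72, 3, n),
                               np.random.normal(4.5, 0.5, n)])
anomaly_data = np.column_stack([np.random.normal(20, 2, n_anom),
                                np.random.normal(85, 4, n_anom),
                                np.random.normal(8, 1, n_anom)])
X = np.vstack([normal_data, anomaly_data])

iso = IsolationForest(contamination=0.03, random_state=42, n_estimators=200)
iso.fit(X)

# decision_function: negative scores are more anomalous than the threshold
scores = iso.decision_function(X)
# Rank points by score to triage the worst offenders first
worst_10 = np.argsort(scores)[:10]
print(f"Most anomalous indices: {worst_10}")
print(f"Score range: {scores.min():.3f} to {scores.max():.3f}")
```

The injected anomalies occupy indices 2000 and above, so the worst-ranked points should land almost entirely in that range.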
Autoencoder: Deep Learning for Anomaly Detection
An autoencoder learns to compress and reconstruct data. Trained on normal data, it reconstructs normal patterns well but produces high error for anomalies.
from sklearn.neural_network import MLPRegressor
scaler = StandardScaler()
X_normal = scaler.fit_transform(normal_data)
X_all = scaler.transform(X)
autoencoder = MLPRegressor(
    hidden_layer_sizes=(16, 8, 3, 8, 16),
    activation="relu", max_iter=500, random_state=42
)
autoencoder.fit(X_normal, X_normal)
recon_error = np.mean((X_all - autoencoder.predict(X_all)) ** 2, axis=1)
train_error = np.mean((X_normal - autoencoder.predict(X_normal)) ** 2, axis=1)
threshold = np.percentile(train_error, 97)
detected_ae = recon_error > threshold
print(f"Autoencoder caught: {(detected_ae & (true_labels == 1)).sum()} / {n_anom}")
Tuning Alert Sensitivity: Balancing Precision and Recall
Every false alarm costs money, but every missed anomaly risks a breakdown. This is the precision-recall tradeoff.
from sklearn.metrics import precision_score, recall_score
pred_binary = (predictions == -1).astype(int)
print(f"Precision: {precision_score(true_labels, pred_binary):.3f}")
print(f"Recall: {recall_score(true_labels, pred_binary):.3f}")
for cont in [0.01, 0.02, 0.03, 0.05, 0.10]:
    iso_test = IsolationForest(contamination=cont, random_state=42)
    pred = (iso_test.fit_predict(X) == -1).astype(int)
    p = precision_score(true_labels, pred)
    r = recall_score(true_labels, pred)
    print(f"contamination={cont:.2f}: Precision={p:.3f}, Recall={r:.3f}")
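Rather than re-fitting the model for every contamination value, you can fit once and sweep a threshold over the continuous scores. `precision_recall_curve` computes the entire tradeoff in one pass, and the area under it summarizes detector quality as a single number. A sketch reusing the same simulated data:

```python
import numpy as np
from sklearn.ensemble import IsolationForest
from sklearn.metrics import precision_recall_curve, auc

np.random.seed(42)
n, n_anom = 2000, 40
normal_data = np.column_stack([np.random.normal(15, 1.5, n),
                               np.random.normal(72, 3, n),
                               np.random.normal(4.5, 0.5, n)])
anomaly_data = np.column_stack([np.random.normal(20, 2, n_anom),
                                np.random.normal(85, 4, n_anom),
                                np.random.normal(8, 1, n_anom)])
X = np.vstack([normal_data, anomaly_data])
y = np.array([0] * n + [1] * n_anom)

iso = IsolationForest(n_estimators=200, random_state=42).fit(X)
# score_samples: higher means more normal, so negate to get an anomaly score
anomaly_score = -iso.score_samples(X)
precision, recall, thresholds = precision_recall_curve(y, anomaly_score)
pr_auc = auc(recall, precision)
print(f"PR-AUC: {pr_auc:.3f}")
```

A PR-AUC near 1.0 means there exists a threshold with both high precision and high recall; a low value means no threshold setting will satisfy both, and you must pick which error is cheaper.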
Practical Example: Smart Alert System for an Industrial Motor
Build a complete anomaly detection pipeline for motor monitoring.
np.random.seed(42)
hours = 720
timestamps = pd.date_range("2025-03-01", periods=hours, freq="h")
df = pd.DataFrame({
    "timestamp": timestamps,
    "current_a": np.random.normal(15, 1.2, hours),
    "temp_c": np.random.normal(70, 3, hours),
    "vibration_mm_s": np.random.normal(4.0, 0.4, hours),
    "power_factor": np.random.normal(0.85, 0.03, hours)
})
# Inject degradation in last 48 hours
df.loc[df.index[-48:], "temp_c"] += np.linspace(0, 15, 48)
df.loc[df.index[-48:], "vibration_mm_s"] += np.linspace(0, 4, 48)
df.loc[df.index[-48:], "current_a"] += np.linspace(0, 5, 48)
df.set_index("timestamp", inplace=True)
train = df.iloc[:504]  # first 21 days, before the injected degradation
scaler = StandardScaler()
X_train = scaler.fit_transform(train)
iso = IsolationForest(contamination=0.02, random_state=42, n_estimators=200)
iso.fit(X_train)
X_all = scaler.transform(df)
df["alert"] = iso.predict(X_all) == -1
alerts = df[df["alert"]]
print(f"Total alerts: {len(alerts)}")
recent = alerts[alerts.index >= df.index[-1] - pd.Timedelta("48h")]  # DataFrame.last() is deprecated
print(f"Alerts in last 48h: {len(recent)}")
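In production, a single flagged hour is usually noise, while several in a row signal a real fault. A simple debouncing rule, escalating only after N consecutive alerts, cuts false alarms dramatically. A minimal sketch on a hypothetical alert stream (the 3-in-a-row rule is an assumption you would tune per machine):

```python
import pandas as pd

# Hypothetical hourly alert stream: one isolated glitch, then a sustained fault
alert = pd.Series([False] * 24)
alert.iloc[5] = True         # one-off false positive
alert.iloc[18:24] = True     # developing fault in the last 6 hours

# Escalate only when 3 consecutive hours are flagged (debouncing)
escalate = alert.rolling(3).sum() == 3
print(f"Raw alerts: {alert.sum()}, escalated: {escalate.sum()}")
```

The isolated glitch at hour 5 never escalates, while the sustained fault triggers escalation from its third consecutive hour onward.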
Summary
In this lesson you learned three anomaly detection approaches. Statistical thresholds are simple but limited to single variables. Isolation Forest handles multiple sensors and provides continuous scores. Autoencoders learn complex normal patterns using neural networks. You also learned to balance precision and recall, and built a motor monitoring system that detects developing bearing failures. In the next lesson, you will work with time series forecasting.