Traffic anomaly detection is a critical component of modern intelligent transportation systems, serving as the first line of defense against incidents that can cascade into major disruptions. Unlike traditional traffic prediction, which focuses on forecasting normal patterns, anomaly detection systems must identify deviations from expected behavior in real time, often with incomplete information and under severe time constraints.
The economic impact of undetected traffic incidents is substantial. A single major highway incident can cost thousands of dollars per minute in lost productivity, increased fuel consumption, and delayed emergency response. Studies indicate that every minute of incident detection delay increases the total incident impact by 4-8%. This makes real-time anomaly detection not just a technical challenge, but a critical infrastructure need.
Modern anomaly detection systems must handle multiple types of abnormalities: sudden incidents like accidents or vehicle breakdowns, gradual degradations like weather-related slowdowns, recurring but irregular patterns like construction zones, and even coordinated disruptions like protests or emergency evacuations. Each type requires different detection strategies and response protocols.
This comprehensive guide examines the technical foundations, implementation strategies, and operational considerations for building robust traffic anomaly detection systems that can operate reliably in production environments.
## Understanding Traffic Anomalies: Types and Characteristics
### Temporal Classification of Anomalies
**Point Anomalies.** These represent individual data points that deviate significantly from normal patterns. Examples include sudden speed drops at a specific sensor due to accidents, or unexpected volume spikes during off-peak hours. Point anomalies are often the easiest to detect but require careful filtering to avoid false positives from sensor noise or temporary disruptions.

**Contextual Anomalies.** Data points that appear normal in isolation but are anomalous within their specific context. A moderate traffic volume might be normal during rush hour but highly suspicious at 3 AM. These require sophisticated models that understand temporal and spatial context.

**Collective Anomalies.** Sequences of data points that collectively indicate abnormal behavior, even if individual points appear normal. Examples include gradual speed reductions across multiple sensors indicating an incident downstream, or unusual traffic patterns suggesting coordinated disruptions.
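The contextual case is the one that defeats simple global thresholds, because the same reading can be normal or alarming depending on when it occurs. The sketch below illustrates the idea with a hypothetical per-interval baseline; `baseline_mean` and `baseline_std` are assumed lookup arrays (one entry per 15-minute interval) built from historical data for a single sensor, and the numbers are illustrative, not a production detector.

```python
import numpy as np

def contextual_anomaly_check(value, interval_of_day, baseline_mean, baseline_std, z_threshold=3.0):
    """Flag a reading that is unusual *for its time of day*.

    baseline_mean / baseline_std hold one entry per 15-minute interval
    (96 per day), estimated from historical data for this sensor.
    """
    z = (value - baseline_mean[interval_of_day]) / max(baseline_std[interval_of_day], 1e-6)
    return bool(abs(z) > z_threshold), float(z)

# Hypothetical baseline: quiet nights, busy 7-10 AM peak
baseline_mean = np.full(96, 15.0)
baseline_mean[28:40] = 50.0
baseline_std = np.full(96, 5.0)
baseline_std[28:40] = 10.0

# 45 vehicles per interval: anomalous at 3 AM (interval 12), unremarkable at 8 AM (interval 32)
print(contextual_anomaly_check(45, 12, baseline_mean, baseline_std))  # (True, 6.0)
print(contextual_anomaly_check(45, 32, baseline_mean, baseline_std))  # (False, -0.5)
```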
### Spatial Distribution Patterns
**Localized Incidents.** Anomalies affecting a single location or small area, typically caused by accidents, breakdowns, or local construction. These create characteristic signatures in upstream and downstream traffic flows.

**Corridor-Level Disruptions.** Anomalies spanning significant distances along a traffic corridor, often caused by weather events, major incidents, or planned closures. Detection requires analyzing correlated patterns across multiple monitoring points.

**Network-Wide Disturbances.** System-level anomalies affecting large portions of a transportation network, such as during major events, emergencies, or infrastructure failures. These require network-level analysis and coordination.
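Corridor-level events show up as correlated drops across neighboring sensors rather than a single outlier, so one simple way to separate them from localized incidents is to look at the longest run of adjacent sensors that are all well below their historical speeds. The helper below is a minimal sketch of that idea; the sensor ordering, the 30% drop threshold, and the three-sensor minimum are illustrative assumptions.

```python
import numpy as np

def corridor_disruption(speeds, historical_means, drop_fraction=0.3, min_affected=3):
    """Detect a corridor-level slowdown from one snapshot of sensor speeds.

    speeds / historical_means are 1-D arrays ordered by position along the corridor.
    Returns (is_corridor_event, longest_run_of_consecutive_slow_sensors).
    """
    affected = speeds < (1.0 - drop_fraction) * historical_means  # per-sensor "well below normal" flags
    longest, run = 0, 0
    for flag in affected:
        run = run + 1 if flag else 0
        longest = max(longest, run)
    return longest >= min_affected, longest

# Four adjacent sensors slow down together -> flagged as a corridor-level disruption
speeds = np.array([62.0, 35.0, 30.0, 28.0, 33.0, 58.0, 60.0])
means  = np.array([60.0, 58.0, 61.0, 59.0, 60.0, 62.0, 61.0])
print(corridor_disruption(speeds, means))  # (True, 4)
```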
### Severity and Impact Classification
Understanding anomaly severity helps prioritize response efforts and resource allocation:
**Critical Incidents.** Complete blockages or major accidents requiring immediate emergency response. Detection latency must be minimized to prevent secondary incidents and enable rapid intervention.

**Moderate Disruptions.** Significant but non-critical slowdowns that impact traffic flow efficiency. These benefit from prompt detection to enable dynamic routing and signal adjustments.

**Minor Irregularities.** Small deviations that may indicate developing problems or normal variations. These require monitoring but may not trigger immediate response.
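In practice these classes are usually wired to concrete operational targets, such as how quickly an alert must reach an operator and whether a response is dispatched automatically. The mapping below is a minimal sketch with illustrative values (the latency targets and the `auto_dispatch` flag are assumptions, not standards); a fuller escalation-rule structure appears in the emergency response integration later in this guide.

```python
# Illustrative severity-to-response mapping; latency targets and dispatch flags are assumed values.
SEVERITY_POLICY = {
    "critical": {"max_alert_latency_s": 60,   "auto_dispatch": True},
    "moderate": {"max_alert_latency_s": 300,  "auto_dispatch": False},
    "minor":    {"max_alert_latency_s": 1800, "auto_dispatch": False},
}

def response_policy(severity: str) -> dict:
    """Look up the operational response policy for a detected anomaly's severity class."""
    return SEVERITY_POLICY.get(severity, SEVERITY_POLICY["minor"])
```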
## Statistical Process Control for Traffic Monitoring
Statistical Process Control (SPC) provides the foundational framework for detecting when traffic systems deviate from normal operating conditions. Adapted from manufacturing quality control, SPC techniques offer robust, interpretable methods for real-time anomaly detection.
```python
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy import stats
from sklearn.preprocessing import StandardScaler
from collections import deque
import warnings
warnings.filterwarnings('ignore')
class TrafficSPCDetector:
"""
Statistical Process Control system for traffic anomaly detection
"""
def __init__(self, window_size=50, control_limit_factor=3):
self.window_size = window_size
self.control_limit_factor = control_limit_factor
self.historical_data = deque(maxlen=window_size)
self.anomaly_threshold = 0.05
self.control_charts = {}
def initialize_control_limits(self, baseline_data):
"""
Initialize control limits based on baseline historical data
"""
baseline_stats = {
'mean': np.mean(baseline_data),
'std': np.std(baseline_data),
'upper_control_limit': np.mean(baseline_data) + self.control_limit_factor * np.std(baseline_data),
'lower_control_limit': np.mean(baseline_data) - self.control_limit_factor * np.std(baseline_data),
'upper_warning_limit': np.mean(baseline_data) + 2 * np.std(baseline_data),
'lower_warning_limit': np.mean(baseline_data) - 2 * np.std(baseline_data)
}
return baseline_stats
def xbar_chart_detection(self, current_value, control_limits):
"""
X-bar control chart for detecting mean shifts
"""
anomaly_type = None
severity = 'normal'
if current_value > control_limits['upper_control_limit']:
anomaly_type = 'high_outlier'
severity = 'critical'
elif current_value < control_limits['lower_control_limit']:
anomaly_type = 'low_outlier'
severity = 'critical'
elif current_value > control_limits['upper_warning_limit']:
anomaly_type = 'high_warning'
severity = 'moderate'
elif current_value < control_limits['lower_warning_limit']:
anomaly_type = 'low_warning'
severity = 'moderate'
return {
'is_anomaly': anomaly_type is not None,
'anomaly_type': anomaly_type,
'severity': severity,
'control_value': current_value,
'deviation': abs(current_value - control_limits['mean']) / control_limits['std']
}
def cusum_detection(self, data_stream, target_mean, std_dev, drift_threshold=1.0):
"""
CUSUM (Cumulative Sum) control chart for detecting persistent shifts
"""
cusum_high = 0
cusum_low = 0
        detection_threshold = 5.0  # decision interval h in standardized units (deviations below are already scaled by std_dev)
anomalies = []
for i, value in enumerate(data_stream):
# Calculate deviations
deviation = (value - target_mean) / std_dev
# Update CUSUM statistics
cusum_high = max(0, cusum_high + deviation - drift_threshold)
cusum_low = max(0, cusum_low - deviation - drift_threshold)
# Check for anomalies
anomaly_detected = False
anomaly_type = None
if cusum_high > detection_threshold:
anomaly_detected = True
anomaly_type = 'persistent_increase'
cusum_high = 0 # Reset after detection
elif cusum_low > detection_threshold:
anomaly_detected = True
anomaly_type = 'persistent_decrease'
cusum_low = 0 # Reset after detection
anomalies.append({
'index': i,
'value': value,
'cusum_high': cusum_high,
'cusum_low': cusum_low,
'is_anomaly': anomaly_detected,
'anomaly_type': anomaly_type
})
return anomalies
def ewma_detection(self, data_stream, lambda_factor=0.2, control_factor=3):
"""
Exponentially Weighted Moving Average (EWMA) for detecting small persistent shifts
"""
if len(data_stream) < 2:
return []
# Initialize EWMA
ewma_values = [data_stream[0]]
baseline_mean = np.mean(data_stream[:min(20, len(data_stream))])
baseline_std = np.std(data_stream[:min(20, len(data_stream))])
anomalies = []
for i in range(1, len(data_stream)):
# Calculate EWMA
ewma_current = lambda_factor * data_stream[i] + (1 - lambda_factor) * ewma_values[-1]
ewma_values.append(ewma_current)
# Calculate control limits for EWMA
ewma_std = baseline_std * np.sqrt(lambda_factor / (2 - lambda_factor) *
(1 - (1 - lambda_factor)**(2 * (i + 1))))
upper_limit = baseline_mean + control_factor * ewma_std
lower_limit = baseline_mean - control_factor * ewma_std
# Detect anomalies
is_anomaly = ewma_current > upper_limit or ewma_current < lower_limit
anomaly_type = None
if ewma_current > upper_limit:
anomaly_type = 'ewma_high'
elif ewma_current < lower_limit:
anomaly_type = 'ewma_low'
anomalies.append({
'index': i,
'value': data_stream[i],
'ewma': ewma_current,
'upper_limit': upper_limit,
'lower_limit': lower_limit,
'is_anomaly': is_anomaly,
'anomaly_type': anomaly_type
})
return anomalies
def multivariate_hotelling_t2(self, data_matrix, alpha=0.01):
"""
Hotelling's T² statistic for multivariate anomaly detection
"""
n_samples, n_features = data_matrix.shape
if n_samples < n_features + 1:
raise ValueError("Need more samples than features for reliable covariance estimation")
# Calculate sample statistics
sample_mean = np.mean(data_matrix, axis=0)
sample_cov = np.cov(data_matrix.T)
        # Calculate T² statistics for each individual observation
        inv_cov = np.linalg.inv(sample_cov)  # invert the covariance once, outside the loop
        t2_stats = []
        for i in range(n_samples):
            diff = data_matrix[i] - sample_mean
            t2 = diff @ inv_cov @ diff  # T² = (x - x̄)ᵀ S⁻¹ (x - x̄) for a single observation
            t2_stats.append(t2)
# Calculate control limit
from scipy.stats import f
f_critical = f.ppf(1 - alpha, n_features, n_samples - n_features)
control_limit = ((n_samples - 1) * n_features / (n_samples - n_features)) * f_critical
# Identify anomalies
anomalies = []
for i, t2_value in enumerate(t2_stats):
is_anomaly = t2_value > control_limit
anomalies.append({
'index': i,
't2_statistic': t2_value,
'control_limit': control_limit,
'is_anomaly': is_anomaly,
'data_point': data_matrix[i]
})
return anomalies, control_limit
def generate_traffic_data_with_anomalies(self, n_samples=1000, anomaly_rate=0.05):
"""
Generate synthetic traffic data with embedded anomalies for testing
"""
# Base traffic pattern with daily cycles
time_index = np.arange(n_samples)
base_pattern = 50 + 30 * np.sin(2 * np.pi * time_index / 96) # 96 = 15-min intervals per day
# Add noise
noise = np.random.normal(0, 5, n_samples)
normal_data = base_pattern + noise
# Add anomalies
n_anomalies = int(n_samples * anomaly_rate)
anomaly_indices = np.random.choice(n_samples, n_anomalies, replace=False)
anomaly_data = normal_data.copy()
anomaly_labels = np.zeros(n_samples, dtype=bool)
for idx in anomaly_indices:
anomaly_type = np.random.choice(['spike', 'drop', 'gradual_increase'])
if anomaly_type == 'spike':
anomaly_data[idx] += np.random.uniform(40, 80)
elif anomaly_type == 'drop':
anomaly_data[idx] -= np.random.uniform(30, 60)
elif anomaly_type == 'gradual_increase':
# Gradual increase over next 10 points
for i in range(min(10, n_samples - idx)):
if idx + i < n_samples:
anomaly_data[idx + i] += np.random.uniform(10, 25)
anomaly_labels[idx] = True
return {
'data': anomaly_data,
'labels': anomaly_labels,
'anomaly_indices': anomaly_indices,
'normal_baseline': normal_data
}
def evaluate_detection_performance(self, detected_anomalies, true_labels):
"""
Evaluate the performance of anomaly detection
"""
detected_binary = np.array([det['is_anomaly'] for det in detected_anomalies])
# Calculate performance metrics
true_positives = np.sum(detected_binary & true_labels)
false_positives = np.sum(detected_binary & ~true_labels)
true_negatives = np.sum(~detected_binary & ~true_labels)
false_negatives = np.sum(~detected_binary & true_labels)
precision = true_positives / (true_positives + false_positives) if (true_positives + false_positives) > 0 else 0
recall = true_positives / (true_positives + false_negatives) if (true_positives + false_negatives) > 0 else 0
f1_score = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0
return {
'precision': precision,
'recall': recall,
'f1_score': f1_score,
'true_positives': true_positives,
'false_positives': false_positives,
'true_negatives': true_negatives,
'false_negatives': false_negatives
}
# Example usage and testing
def demonstrate_spc_detection():
"""
Demonstrate SPC-based anomaly detection methods
"""
detector = TrafficSPCDetector()
# Generate test data
print("Generating synthetic traffic data with anomalies...")
test_data = detector.generate_traffic_data_with_anomalies(n_samples=500, anomaly_rate=0.08)
traffic_data = test_data['data']
true_labels = test_data['labels']
# Initialize control limits based on first 100 samples (assuming they're mostly normal)
baseline_data = traffic_data[:100]
control_limits = detector.initialize_control_limits(baseline_data)
print(f"Control Limits Established:")
print(f"Mean: {control_limits['mean']:.2f}")
print(f"Upper Control Limit: {control_limits['upper_control_limit']:.2f}")
print(f"Lower Control Limit: {control_limits['lower_control_limit']:.2f}")
# Test X-bar chart detection
print("\n1. Testing X-bar Control Chart Detection...")
xbar_results = []
for value in traffic_data:
result = detector.xbar_chart_detection(value, control_limits)
xbar_results.append(result)
xbar_performance = detector.evaluate_detection_performance(xbar_results, true_labels)
print(f"X-bar Chart Performance:")
print(f"Precision: {xbar_performance['precision']:.3f}")
print(f"Recall: {xbar_performance['recall']:.3f}")
print(f"F1-Score: {xbar_performance['f1_score']:.3f}")
# Test CUSUM detection
print("\n2. Testing CUSUM Detection...")
cusum_results = detector.cusum_detection(
traffic_data,
target_mean=control_limits['mean'],
std_dev=control_limits['std']
)
cusum_performance = detector.evaluate_detection_performance(cusum_results, true_labels)
print(f"CUSUM Performance:")
print(f"Precision: {cusum_performance['precision']:.3f}")
print(f"Recall: {cusum_performance['recall']:.3f}")
print(f"F1-Score: {cusum_performance['f1_score']:.3f}")
# Test EWMA detection
print("\n3. Testing EWMA Detection...")
ewma_results = detector.ewma_detection(traffic_data)
    ewma_performance = detector.evaluate_detection_performance(ewma_results, true_labels[1:])  # EWMA results start at index 1
print(f"EWMA Performance:")
print(f"Precision: {ewma_performance['precision']:.3f}")
print(f"Recall: {ewma_performance['recall']:.3f}")
print(f"F1-Score: {ewma_performance['f1_score']:.3f}")
# Visualization
plt.figure(figsize=(15, 12))
# Plot original data with anomalies
plt.subplot(3, 1, 1)
plt.plot(traffic_data, 'b-', alpha=0.7, label='Traffic Data')
anomaly_indices = np.where(true_labels)[0]
plt.scatter(anomaly_indices, traffic_data[anomaly_indices], color='red', s=50, label='True Anomalies')
plt.axhline(y=control_limits['upper_control_limit'], color='r', linestyle='--', alpha=0.5, label='Control Limits')
plt.axhline(y=control_limits['lower_control_limit'], color='r', linestyle='--', alpha=0.5)
plt.axhline(y=control_limits['mean'], color='g', linestyle='-', alpha=0.5, label='Mean')
plt.title('Traffic Data with True Anomalies')
plt.legend()
plt.grid(True, alpha=0.3)
# Plot X-bar chart results
plt.subplot(3, 1, 2)
plt.plot(traffic_data, 'b-', alpha=0.7)
xbar_anomalies = [i for i, res in enumerate(xbar_results) if res['is_anomaly']]
plt.scatter(xbar_anomalies, traffic_data[xbar_anomalies], color='orange', s=30, label='Detected Anomalies')
plt.axhline(y=control_limits['upper_control_limit'], color='r', linestyle='--', alpha=0.5)
plt.axhline(y=control_limits['lower_control_limit'], color='r', linestyle='--', alpha=0.5)
plt.title('X-bar Chart Detection Results')
plt.legend()
plt.grid(True, alpha=0.3)
# Plot EWMA results
plt.subplot(3, 1, 3)
ewma_values = [res['ewma'] for res in ewma_results]
upper_limits = [res['upper_limit'] for res in ewma_results]
lower_limits = [res['lower_limit'] for res in ewma_results]
plt.plot(ewma_values, 'g-', label='EWMA Values')
plt.plot(upper_limits, 'r--', alpha=0.5, label='EWMA Control Limits')
plt.plot(lower_limits, 'r--', alpha=0.5)
ewma_anomalies = [i for i, res in enumerate(ewma_results) if res['is_anomaly']]
if ewma_anomalies:
plt.scatter(ewma_anomalies, [ewma_values[i] for i in ewma_anomalies],
color='purple', s=30, label='EWMA Detected')
plt.title('EWMA Detection Results')
plt.legend()
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
return {
'xbar_performance': xbar_performance,
'cusum_performance': cusum_performance,
'ewma_performance': ewma_performance
}
# Run demonstration
performance_results = demonstrate_spc_detection()
```

## Machine Learning Approaches for Anomaly Detection
While statistical process control provides robust baseline methods, machine learning approaches can capture more complex patterns and adapt to changing traffic conditions. Modern ML-based anomaly detection systems combine multiple techniques to achieve high detection rates while minimizing false positives.
```python
from sklearn.ensemble import IsolationForest, RandomForestClassifier
from sklearn.svm import OneClassSVM
from sklearn.cluster import DBSCAN
from sklearn.neighbors import LocalOutlierFactor
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Dense, LSTM, RepeatVector, TimeDistributed
import joblib
class MLAnomalyDetector:
"""
Machine learning based traffic anomaly detection system
"""
def __init__(self):
self.models = {}
self.scalers = {}
self.is_trained = False
def prepare_features(self, traffic_data, temporal_window=12):
"""
Prepare feature matrix for ML models
"""
features = []
# Basic statistical features
features.extend([
traffic_data['volume'],
traffic_data['speed'],
traffic_data['density']
])
# Temporal features
features.extend([
traffic_data['hour'],
traffic_data['day_of_week'],
traffic_data['month']
])
# Cyclical encoding of temporal features
features.extend([
np.sin(2 * np.pi * traffic_data['hour'] / 24),
np.cos(2 * np.pi * traffic_data['hour'] / 24),
np.sin(2 * np.pi * traffic_data['day_of_week'] / 7),
np.cos(2 * np.pi * traffic_data['day_of_week'] / 7)
])
# Rolling statistics (lag features)
for window in [3, 6, 12]:
features.extend([
traffic_data['volume'].rolling(window=window).mean(),
traffic_data['volume'].rolling(window=window).std(),
traffic_data['speed'].rolling(window=window).mean(),
traffic_data['speed'].rolling(window=window).std()
])
# Rate of change features
features.extend([
traffic_data['volume'].diff(),
traffic_data['speed'].diff(),
traffic_data['volume'].diff().rolling(window=3).mean()
])
feature_matrix = np.column_stack(features)
# Remove rows with NaN values
valid_rows = ~np.isnan(feature_matrix).any(axis=1)
feature_matrix = feature_matrix[valid_rows]
return feature_matrix, valid_rows
def isolation_forest_detection(self, train_data, test_data, contamination=0.1):
"""
Isolation Forest for anomaly detection
"""
# Train model
iso_forest = IsolationForest(
contamination=contamination,
random_state=42,
n_jobs=-1
)
iso_forest.fit(train_data)
# Predict anomalies
train_scores = iso_forest.decision_function(train_data)
test_scores = iso_forest.decision_function(test_data)
train_predictions = iso_forest.predict(train_data)
test_predictions = iso_forest.predict(test_data)
# Convert predictions to binary (1 = normal, -1 = anomaly)
train_anomalies = (train_predictions == -1)
test_anomalies = (test_predictions == -1)
self.models['isolation_forest'] = iso_forest
return {
'train_anomalies': train_anomalies,
'test_anomalies': test_anomalies,
'train_scores': train_scores,
'test_scores': test_scores,
'model': iso_forest
}
def one_class_svm_detection(self, train_data, test_data, nu=0.1):
"""
One-Class SVM for novelty detection
"""
# Scale data
scaler = StandardScaler()
train_scaled = scaler.fit_transform(train_data)
test_scaled = scaler.transform(test_data)
# Train One-Class SVM
oc_svm = OneClassSVM(nu=nu, kernel='rbf', gamma='scale')
oc_svm.fit(train_scaled)
# Predict
train_predictions = oc_svm.predict(train_scaled)
test_predictions = oc_svm.predict(test_scaled)
train_scores = oc_svm.decision_function(train_scaled)
test_scores = oc_svm.decision_function(test_scaled)
train_anomalies = (train_predictions == -1)
test_anomalies = (test_predictions == -1)
self.models['one_class_svm'] = oc_svm
self.scalers['one_class_svm'] = scaler
return {
'train_anomalies': train_anomalies,
'test_anomalies': test_anomalies,
'train_scores': train_scores,
'test_scores': test_scores,
'model': oc_svm,
'scaler': scaler
}
def local_outlier_factor_detection(self, data, n_neighbors=20, contamination=0.1):
"""
Local Outlier Factor for density-based anomaly detection
"""
lof = LocalOutlierFactor(
n_neighbors=n_neighbors,
contamination=contamination,
novelty=False
)
predictions = lof.fit_predict(data)
outlier_scores = lof.negative_outlier_factor_
anomalies = (predictions == -1)
return {
'anomalies': anomalies,
'outlier_scores': outlier_scores,
'model': lof
}
def dbscan_clustering_detection(self, data, eps=0.5, min_samples=5):
"""
DBSCAN clustering for anomaly detection
"""
# Scale data
scaler = StandardScaler()
data_scaled = scaler.fit_transform(data)
# Apply DBSCAN
dbscan = DBSCAN(eps=eps, min_samples=min_samples)
cluster_labels = dbscan.fit_predict(data_scaled)
# Points labeled as -1 are considered anomalies
anomalies = (cluster_labels == -1)
# Calculate cluster statistics
unique_clusters = np.unique(cluster_labels[cluster_labels != -1])
cluster_sizes = [np.sum(cluster_labels == cluster) for cluster in unique_clusters]
return {
'anomalies': anomalies,
'cluster_labels': cluster_labels,
'n_clusters': len(unique_clusters),
'cluster_sizes': cluster_sizes,
'noise_points': np.sum(anomalies),
'model': dbscan,
'scaler': scaler
}
def autoencoder_detection(self, train_data, test_data, encoding_dim=10, threshold_percentile=95):
"""
Autoencoder-based anomaly detection
"""
# Scale data
scaler = StandardScaler()
train_scaled = scaler.fit_transform(train_data)
test_scaled = scaler.transform(test_data)
input_dim = train_scaled.shape[1]
# Build autoencoder
input_layer = Input(shape=(input_dim,))
# Encoder
encoded = Dense(encoding_dim * 2, activation='relu')(input_layer)
encoded = Dense(encoding_dim, activation='relu')(encoded)
# Decoder
decoded = Dense(encoding_dim * 2, activation='relu')(encoded)
decoded = Dense(input_dim, activation='linear')(decoded)
autoencoder = Model(input_layer, decoded)
autoencoder.compile(optimizer='adam', loss='mse')
# Train autoencoder
history = autoencoder.fit(
train_scaled, train_scaled,
epochs=100,
batch_size=32,
validation_split=0.1,
verbose=0
)
# Calculate reconstruction errors
train_predictions = autoencoder.predict(train_scaled)
test_predictions = autoencoder.predict(test_scaled)
train_mse = np.mean(np.power(train_scaled - train_predictions, 2), axis=1)
test_mse = np.mean(np.power(test_scaled - test_predictions, 2), axis=1)
# Set threshold based on training data
threshold = np.percentile(train_mse, threshold_percentile)
train_anomalies = train_mse > threshold
test_anomalies = test_mse > threshold
self.models['autoencoder'] = autoencoder
self.scalers['autoencoder'] = scaler
return {
'train_anomalies': train_anomalies,
'test_anomalies': test_anomalies,
'train_mse': train_mse,
'test_mse': test_mse,
'threshold': threshold,
'model': autoencoder,
'scaler': scaler,
'training_history': history
}
def lstm_autoencoder_detection(self, sequence_data, sequence_length=24, encoding_dim=50):
"""
LSTM Autoencoder for temporal anomaly detection
"""
# Prepare sequence data
def create_sequences(data, seq_length):
sequences = []
for i in range(len(data) - seq_length + 1):
sequences.append(data[i:i + seq_length])
return np.array(sequences)
sequences = create_sequences(sequence_data, sequence_length)
# Split data
train_size = int(0.8 * len(sequences))
train_sequences = sequences[:train_size]
test_sequences = sequences[train_size:]
# Scale data
scaler = StandardScaler()
train_scaled = scaler.fit_transform(train_sequences.reshape(-1, train_sequences.shape[-1]))
train_scaled = train_scaled.reshape(train_sequences.shape)
test_scaled = scaler.transform(test_sequences.reshape(-1, test_sequences.shape[-1]))
test_scaled = test_scaled.reshape(test_sequences.shape)
# Build LSTM Autoencoder
input_layer = Input(shape=(sequence_length, train_scaled.shape[2]))
# Encoder
encoded = LSTM(encoding_dim, activation='relu')(input_layer)
# Repeat vector for decoder
repeated = RepeatVector(sequence_length)(encoded)
# Decoder
decoded = LSTM(encoding_dim, activation='relu', return_sequences=True)(repeated)
decoded = TimeDistributed(Dense(train_scaled.shape[2]))(decoded)
lstm_autoencoder = Model(input_layer, decoded)
lstm_autoencoder.compile(optimizer='adam', loss='mse')
# Train model
history = lstm_autoencoder.fit(
train_scaled, train_scaled,
epochs=50,
batch_size=32,
validation_split=0.1,
verbose=0
)
# Calculate reconstruction errors
train_pred = lstm_autoencoder.predict(train_scaled)
test_pred = lstm_autoencoder.predict(test_scaled)
train_mse = np.mean(np.power(train_scaled - train_pred, 2), axis=(1, 2))
test_mse = np.mean(np.power(test_scaled - test_pred, 2), axis=(1, 2))
# Set threshold
threshold = np.percentile(train_mse, 95)
train_anomalies = train_mse > threshold
test_anomalies = test_mse > threshold
return {
'train_anomalies': train_anomalies,
'test_anomalies': test_anomalies,
'train_mse': train_mse,
'test_mse': test_mse,
'threshold': threshold,
'model': lstm_autoencoder,
'scaler': scaler
}
def ensemble_detection(self, train_data, test_data, methods=['isolation_forest', 'autoencoder', 'one_class_svm']):
"""
Ensemble approach combining multiple detection methods
"""
results = {}
for method in methods:
if method == 'isolation_forest':
results[method] = self.isolation_forest_detection(train_data, test_data)
elif method == 'autoencoder':
results[method] = self.autoencoder_detection(train_data, test_data)
elif method == 'one_class_svm':
results[method] = self.one_class_svm_detection(train_data, test_data)
elif method == 'local_outlier_factor':
# LOF needs combined data
combined_data = np.vstack([train_data, test_data])
lof_result = self.local_outlier_factor_detection(combined_data)
train_size = len(train_data)
results[method] = {
'train_anomalies': lof_result['anomalies'][:train_size],
'test_anomalies': lof_result['anomalies'][train_size:]
}
# Combine predictions using voting
test_predictions = []
for method in methods:
if method in results:
test_predictions.append(results[method]['test_anomalies'])
if test_predictions:
# Majority voting
test_predictions_array = np.array(test_predictions)
ensemble_test_anomalies = np.sum(test_predictions_array, axis=0) >= len(methods) / 2
# Calculate confidence based on agreement
confidence = np.mean(test_predictions_array, axis=0)
results['ensemble'] = {
'test_anomalies': ensemble_test_anomalies,
'confidence': confidence,
'individual_results': results
}
return results
def demonstrate_ml_detection():
"""
Demonstrate ML-based anomaly detection methods
"""
detector = MLAnomalyDetector()
# Generate synthetic traffic data
print("Generating traffic data with temporal patterns...")
# Create more realistic traffic data
n_days = 30
intervals_per_day = 96 # 15-minute intervals
n_samples = n_days * intervals_per_day
time_index = np.arange(n_samples)
hours = (time_index % intervals_per_day) / 4 # Convert to hours
days = time_index // intervals_per_day
# Base traffic pattern
daily_pattern = 50 + 40 * (np.sin((hours - 6) * np.pi / 12) ** 2)
weekly_pattern = 1.0 - 0.3 * ((days % 7) >= 5) # Lower on weekends
# Traffic volume
volume = daily_pattern * weekly_pattern + np.random.normal(0, 8, n_samples)
volume = np.maximum(volume, 5) # Minimum traffic
# Speed (inversely related to volume with some noise)
speed = 60 - 0.3 * volume + np.random.normal(0, 5, n_samples)
speed = np.clip(speed, 10, 70)
# Density
density = volume / speed
# Create DataFrame
traffic_df = pd.DataFrame({
'volume': volume,
'speed': speed,
'density': density,
'hour': hours,
'day_of_week': days % 7,
'month': 1 # Simplified
})
# Add anomalies
anomaly_indices = np.random.choice(n_samples, size=int(0.05 * n_samples), replace=False)
true_anomalies = np.zeros(n_samples, dtype=bool)
true_anomalies[anomaly_indices] = True
# Inject different types of anomalies
for idx in anomaly_indices:
anomaly_type = np.random.choice(['accident', 'congestion', 'sensor_error'])
if anomaly_type == 'accident':
# Sharp drop in speed, increase in density
traffic_df.loc[idx, 'speed'] *= 0.3
traffic_df.loc[idx, 'density'] *= 2.0
elif anomaly_type == 'congestion':
# Moderate speed reduction, volume increase
traffic_df.loc[idx, 'speed'] *= 0.6
traffic_df.loc[idx, 'volume'] *= 1.5
elif anomaly_type == 'sensor_error':
# Random extreme values
traffic_df.loc[idx, 'volume'] = np.random.uniform(0, 200)
traffic_df.loc[idx, 'speed'] = np.random.uniform(0, 100)
# Prepare features
feature_matrix, valid_rows = detector.prepare_features(traffic_df)
true_labels = true_anomalies[valid_rows]
# Split data
train_size = int(0.7 * len(feature_matrix))
train_data = feature_matrix[:train_size]
test_data = feature_matrix[train_size:]
train_labels = true_labels[:train_size]
test_labels = true_labels[train_size:]
print(f"Dataset prepared: {len(feature_matrix)} samples, {feature_matrix.shape[1]} features")
print(f"Training set: {len(train_data)} samples")
print(f"Test set: {len(test_data)} samples")
print(f"Anomaly rate: {np.mean(true_labels):.2%}")
# Test different methods
results = {}
# 1. Isolation Forest
print("\n1. Testing Isolation Forest...")
iso_result = detector.isolation_forest_detection(train_data, test_data, contamination=0.1)
iso_performance = calculate_performance_metrics(iso_result['test_anomalies'], test_labels)
results['isolation_forest'] = iso_performance
print(f"Isolation Forest - Precision: {iso_performance['precision']:.3f}, Recall: {iso_performance['recall']:.3f}, F1: {iso_performance['f1_score']:.3f}")
# 2. One-Class SVM
print("\n2. Testing One-Class SVM...")
svm_result = detector.one_class_svm_detection(train_data, test_data, nu=0.1)
svm_performance = calculate_performance_metrics(svm_result['test_anomalies'], test_labels)
results['one_class_svm'] = svm_performance
print(f"One-Class SVM - Precision: {svm_performance['precision']:.3f}, Recall: {svm_performance['recall']:.3f}, F1: {svm_performance['f1_score']:.3f}")
# 3. Autoencoder
print("\n3. Testing Autoencoder...")
ae_result = detector.autoencoder_detection(train_data, test_data, encoding_dim=8)
ae_performance = calculate_performance_metrics(ae_result['test_anomalies'], test_labels)
results['autoencoder'] = ae_performance
print(f"Autoencoder - Precision: {ae_performance['precision']:.3f}, Recall: {ae_performance['recall']:.3f}, F1: {ae_performance['f1_score']:.3f}")
# 4. Local Outlier Factor
print("\n4. Testing Local Outlier Factor...")
combined_data = np.vstack([train_data, test_data])
combined_labels = np.concatenate([train_labels, test_labels])
lof_result = detector.local_outlier_factor_detection(combined_data, contamination=0.1)
lof_performance = calculate_performance_metrics(lof_result['anomalies'][train_size:], test_labels)
results['local_outlier_factor'] = lof_performance
print(f"LOF - Precision: {lof_performance['precision']:.3f}, Recall: {lof_performance['recall']:.3f}, F1: {lof_performance['f1_score']:.3f}")
# 5. Ensemble Method
print("\n5. Testing Ensemble Method...")
ensemble_result = detector.ensemble_detection(train_data, test_data,
methods=['isolation_forest', 'autoencoder', 'one_class_svm'])
ensemble_performance = calculate_performance_metrics(ensemble_result['ensemble']['test_anomalies'], test_labels)
results['ensemble'] = ensemble_performance
print(f"Ensemble - Precision: {ensemble_performance['precision']:.3f}, Recall: {ensemble_performance['recall']:.3f}, F1: {ensemble_performance['f1_score']:.3f}")
# Visualization
plt.figure(figsize=(15, 12))
# Plot original traffic data
plt.subplot(3, 2, 1)
    test_traffic = traffic_df[valid_rows].iloc[train_size:train_size + len(test_data)]  # align with the NaN-filtered feature rows
plt.plot(test_traffic['volume'].values, 'b-', alpha=0.7, label='Volume')
anomaly_test_indices = np.where(test_labels)[0]
if len(anomaly_test_indices) > 0:
plt.scatter(anomaly_test_indices, test_traffic['volume'].values[anomaly_test_indices],
color='red', s=50, label='True Anomalies')
plt.title('Traffic Volume - True Anomalies')
plt.legend()
plt.grid(True, alpha=0.3)
# Plot detection results for different methods
methods_to_plot = ['isolation_forest', 'one_class_svm', 'autoencoder', 'ensemble']
method_results = [iso_result, svm_result, ae_result, ensemble_result]
for i, (method, method_result) in enumerate(zip(methods_to_plot, method_results)):
plt.subplot(3, 2, i + 2)
plt.plot(test_traffic['volume'].values, 'b-', alpha=0.7, label='Volume')
if method == 'ensemble':
detected_anomalies = np.where(method_result['ensemble']['test_anomalies'])[0]
else:
detected_anomalies = np.where(method_result['test_anomalies'])[0]
if len(detected_anomalies) > 0:
plt.scatter(detected_anomalies, test_traffic['volume'].values[detected_anomalies],
color='orange', s=30, label='Detected')
if len(anomaly_test_indices) > 0:
plt.scatter(anomaly_test_indices, test_traffic['volume'].values[anomaly_test_indices],
color='red', s=50, alpha=0.7, label='True')
performance = results[method]
plt.title(f'{method.replace("_", " ").title()}\nF1: {performance["f1_score"]:.3f}')
plt.legend()
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
return results
def calculate_performance_metrics(predictions, true_labels):
"""
Calculate performance metrics for anomaly detection
"""
tp = np.sum(predictions & true_labels)
fp = np.sum(predictions & ~true_labels)
tn = np.sum(~predictions & ~true_labels)
fn = np.sum(~predictions & true_labels)
precision = tp / (tp + fp) if (tp + fp) > 0 else 0
recall = tp / (tp + fn) if (tp + fn) > 0 else 0
f1_score = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0
return {
'precision': precision,
'recall': recall,
'f1_score': f1_score,
'true_positives': tp,
'false_positives': fp,
'true_negatives': tn,
'false_negatives': fn
}
# Run ML demonstration
ml_results = demonstrate_ml_detection()
```
## Real-Time Implementation and System Integration
Building a production-ready anomaly detection system requires careful consideration of real-time processing requirements, system integration, and operational workflows.
```python
import asyncio
import json
import logging
import time
from collections import deque
from datetime import datetime, timedelta
from typing import Dict, List, Optional
import redis
import websockets
class RealTimeAnomalyDetectionSystem:
"""
Real-time traffic anomaly detection and alert system
"""
def __init__(self, config_path: str = None):
self.config = self.load_config(config_path)
self.setup_logging()
# Detection models
self.spc_detector = TrafficSPCDetector()
self.ml_detector = MLAnomalyDetector()
# Data buffers
self.data_buffer = deque(maxlen=self.config.get('buffer_size', 1000))
self.alert_history = deque(maxlen=self.config.get('alert_history_size', 500))
# Real-time processing
self.processing_queue = asyncio.Queue()
self.alert_subscribers = set()
# Performance monitoring
self.performance_metrics = {
'total_processed': 0,
'anomalies_detected': 0,
'processing_times': deque(maxlen=100),
'false_alarm_rate': 0.0
}
        # Redis for caching and coordination
        try:
            self.redis_client = redis.Redis(
                host=self.config.get('redis_host', 'localhost'),
                port=self.config.get('redis_port', 6379),
                decode_responses=True
            )
            self.redis_client.ping()  # the client connects lazily; ping to verify the server is reachable
            self.redis_available = True
        except redis.RedisError:
            self.redis_available = False
            self.logger.warning("Redis not available - using in-memory storage only")
def load_config(self, config_path: str) -> Dict:
"""Load system configuration"""
default_config = {
'detection_methods': ['spc', 'isolation_forest', 'autoencoder'],
'alert_thresholds': {
'critical': 0.9,
'high': 0.7,
'medium': 0.5
},
'processing_interval': 1.0, # seconds
'buffer_size': 1000,
'alert_history_size': 500,
'performance_logging_interval': 60, # seconds
'redis_host': 'localhost',
'redis_port': 6379
}
if config_path:
try:
with open(config_path, 'r') as f:
user_config = json.load(f)
default_config.update(user_config)
except FileNotFoundError:
pass
return default_config
def setup_logging(self):
"""Setup logging configuration"""
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('anomaly_detection.log'),
logging.StreamHandler()
]
)
self.logger = logging.getLogger(__name__)
async def process_traffic_data(self, data_point: Dict):
"""
Process incoming traffic data point for anomaly detection
"""
start_time = time.time()
try:
# Add to processing queue
await self.processing_queue.put(data_point)
# Update buffer
self.data_buffer.append(data_point)
# Perform anomaly detection
detection_results = await self.detect_anomalies(data_point)
# Generate alerts if necessary
if detection_results['is_anomaly']:
await self.generate_alert(data_point, detection_results)
# Update performance metrics
processing_time = time.time() - start_time
self.performance_metrics['total_processed'] += 1
self.performance_metrics['processing_times'].append(processing_time)
if detection_results['is_anomaly']:
self.performance_metrics['anomalies_detected'] += 1
# Cache results
if self.redis_available:
await self.cache_detection_result(data_point, detection_results)
return detection_results
except Exception as e:
self.logger.error(f"Error processing traffic data: {str(e)}")
return {'is_anomaly': False, 'error': str(e)}
async def detect_anomalies(self, data_point: Dict) -> Dict:
"""
Run anomaly detection using configured methods
"""
detection_results = {
'timestamp': data_point.get('timestamp', datetime.now().isoformat()),
'location_id': data_point.get('location_id'),
'is_anomaly': False,
'confidence': 0.0,
'anomaly_type': None,
'severity': 'normal',
'detection_methods': {}
}
try:
# Extract numeric values
volume = data_point.get('volume', 0)
speed = data_point.get('speed', 0)
density = data_point.get('density', 0)
method_scores = []
# SPC-based detection
if 'spc' in self.config['detection_methods'] and len(self.data_buffer) >= 20:
spc_result = self.run_spc_detection(volume)
detection_results['detection_methods']['spc'] = spc_result
if spc_result['is_anomaly']:
method_scores.append(spc_result.get('confidence', 0.5))
# ML-based detection
if len(self.data_buffer) >= 50: # Need sufficient data for ML methods
ml_result = await self.run_ml_detection(data_point)
detection_results['detection_methods']['ml'] = ml_result
if ml_result['is_anomaly']:
method_scores.append(ml_result.get('confidence', 0.5))
# Combine results
if method_scores:
detection_results['is_anomaly'] = True
detection_results['confidence'] = np.mean(method_scores)
detection_results['severity'] = self.determine_severity(detection_results['confidence'])
detection_results['anomaly_type'] = self.classify_anomaly_type(data_point)
except Exception as e:
self.logger.error(f"Error in anomaly detection: {str(e)}")
detection_results['error'] = str(e)
return detection_results
def run_spc_detection(self, value: float) -> Dict:
"""
Run statistical process control detection
"""
try:
# Use recent data to establish control limits
recent_values = [dp.get('volume', 0) for dp in list(self.data_buffer)[-50:]]
control_limits = self.spc_detector.initialize_control_limits(recent_values)
# Detect anomaly
result = self.spc_detector.xbar_chart_detection(value, control_limits)
return {
'is_anomaly': result['is_anomaly'],
'confidence': min(result.get('deviation', 0) / 3.0, 1.0),
'method': 'spc_xbar',
'details': result
}
except Exception as e:
self.logger.error(f"SPC detection error: {str(e)}")
return {'is_anomaly': False, 'error': str(e)}
async def run_ml_detection(self, data_point: Dict) -> Dict:
"""
Run machine learning based detection
"""
try:
# Prepare recent data for ML detection
recent_data = list(self.data_buffer)[-100:] # Last 100 points
# Extract features
feature_matrix = []
for dp in recent_data:
features = [
dp.get('volume', 0),
dp.get('speed', 0),
dp.get('density', 0),
dp.get('hour', datetime.now().hour),
dp.get('day_of_week', datetime.now().weekday())
]
feature_matrix.append(features)
feature_matrix = np.array(feature_matrix)
# Run isolation forest on recent data
if len(feature_matrix) >= 20:
iso_forest = IsolationForest(contamination=0.1, random_state=42)
iso_forest.fit(feature_matrix[:-1]) # Train on all but last point
current_features = feature_matrix[-1].reshape(1, -1)
prediction = iso_forest.predict(current_features)[0]
score = iso_forest.decision_function(current_features)[0]
is_anomaly = prediction == -1
confidence = abs(score) if is_anomaly else 0
return {
'is_anomaly': is_anomaly,
'confidence': min(confidence, 1.0),
'method': 'isolation_forest',
'score': score
}
else:
return {'is_anomaly': False, 'reason': 'insufficient_data'}
except Exception as e:
self.logger.error(f"ML detection error: {str(e)}")
return {'is_anomaly': False, 'error': str(e)}
def determine_severity(self, confidence: float) -> str:
"""
Determine anomaly severity based on confidence score
"""
thresholds = self.config['alert_thresholds']
if confidence >= thresholds['critical']:
return 'critical'
elif confidence >= thresholds['high']:
return 'high'
elif confidence >= thresholds['medium']:
return 'medium'
else:
return 'low'
def classify_anomaly_type(self, data_point: Dict) -> str:
"""
Classify the type of anomaly based on traffic characteristics
"""
volume = data_point.get('volume', 0)
speed = data_point.get('speed', 0)
density = data_point.get('density', 0)
# Simple rule-based classification
if speed < 20 and density > 50:
return 'severe_congestion'
elif volume > 100 and speed < 30:
return 'moderate_congestion'
elif volume < 5 and speed > 70:
return 'unusual_free_flow'
elif density > 80:
return 'potential_incident'
else:
return 'general_anomaly'
async def generate_alert(self, data_point: Dict, detection_results: Dict):
"""
Generate and distribute alerts for detected anomalies
"""
alert = {
'alert_id': f"alert_{int(time.time())}_{data_point.get('location_id', 'unknown')}",
'timestamp': datetime.now().isoformat(),
'location_id': data_point.get('location_id'),
'anomaly_type': detection_results.get('anomaly_type'),
'severity': detection_results.get('severity'),
'confidence': detection_results.get('confidence'),
'traffic_data': data_point,
'detection_methods': detection_results.get('detection_methods', {}),
'recommended_actions': self.get_recommended_actions(detection_results)
}
# Add to alert history
self.alert_history.append(alert)
# Log alert
self.logger.warning(f"ANOMALY ALERT: {alert['alert_id']} - {alert['anomaly_type']} "
f"at {alert['location_id']} (Severity: {alert['severity']}, "
f"Confidence: {alert['confidence']:.2f})")
# Distribute to subscribers
await self.distribute_alert(alert)
# Store in Redis if available
if self.redis_available:
try:
alert_key = f"alert:{alert['alert_id']}"
self.redis_client.setex(alert_key, 3600, json.dumps(alert)) # 1 hour expiry
except Exception as e:
self.logger.error(f"Error storing alert in Redis: {str(e)}")
def get_recommended_actions(self, detection_results: Dict) -> List[str]:
"""
Generate recommended actions based on anomaly type and severity
"""
severity = detection_results.get('severity')
anomaly_type = detection_results.get('anomaly_type')
actions = []
if severity == 'critical':
actions.extend([
"Immediate field verification required",
"Consider emergency response deployment",
"Activate incident management protocols"
])
if anomaly_type in ['severe_congestion', 'potential_incident']:
actions.extend([
"Deploy traffic management team",
"Activate variable message signs",
"Consider alternate route recommendations"
])
if severity in ['high', 'critical']:
actions.extend([
"Notify traffic control center",
"Update real-time traffic information systems",
"Monitor upstream and downstream conditions"
])
return actions
async def distribute_alert(self, alert: Dict):
"""
Distribute alert to all subscribers
"""
if self.alert_subscribers:
alert_message = json.dumps(alert)
# Send to all WebSocket subscribers
disconnected = set()
for websocket in self.alert_subscribers:
try:
await websocket.send(alert_message)
except websockets.exceptions.ConnectionClosed:
disconnected.add(websocket)
except Exception as e:
self.logger.error(f"Error sending alert: {str(e)}")
disconnected.add(websocket)
# Remove disconnected subscribers
self.alert_subscribers -= disconnected
async def cache_detection_result(self, data_point: Dict, detection_results: Dict):
"""
Cache detection results for analysis and debugging
"""
try:
cache_key = f"detection:{data_point.get('location_id')}:{int(time.time())}"
cache_data = {
'data_point': data_point,
'detection_results': detection_results
}
self.redis_client.setex(cache_key, 1800, json.dumps(cache_data)) # 30 minutes
except Exception as e:
self.logger.error(f"Error caching detection result: {str(e)}")
def get_system_status(self) -> Dict:
"""
Get current system status and performance metrics
"""
avg_processing_time = np.mean(self.performance_metrics['processing_times']) if self.performance_metrics['processing_times'] else 0
status = {
'system_status': 'operational',
'total_processed': self.performance_metrics['total_processed'],
'anomalies_detected': self.performance_metrics['anomalies_detected'],
'detection_rate': self.performance_metrics['anomalies_detected'] / max(1, self.performance_metrics['total_processed']),
'avg_processing_time': avg_processing_time,
'buffer_size': len(self.data_buffer),
'alert_subscribers': len(self.alert_subscribers),
'recent_alerts': len([a for a in self.alert_history if
datetime.fromisoformat(a['timestamp']) > datetime.now() - timedelta(hours=1)]),
'uptime': time.time() - getattr(self, 'start_time', time.time()),
'redis_status': 'connected' if self.redis_available else 'disconnected'
}
return status
# WebSocket server for real-time alerts
async def alert_websocket_handler(websocket, path, detection_system):
"""
WebSocket handler for real-time alert distribution
"""
detection_system.alert_subscribers.add(websocket)
try:
await websocket.wait_closed()
finally:
detection_system.alert_subscribers.discard(websocket)
# Example usage
async def simulate_real_time_detection():
"""
Simulate real-time traffic anomaly detection
"""
system = RealTimeAnomalyDetectionSystem()
system.start_time = time.time()
print("Starting real-time anomaly detection simulation...")
# Simulate incoming traffic data
for i in range(100):
# Generate realistic traffic data
hour = (i % 24)
is_weekend = (i // 24) % 7 >= 5
# Base traffic pattern
if 7 <= hour <= 9 or 17 <= hour <= 19: # Rush hours
base_volume = np.random.normal(80, 15)
elif 22 <= hour or hour <= 6: # Night hours
base_volume = np.random.normal(20, 5)
else:
base_volume = np.random.normal(50, 10)
if is_weekend:
base_volume *= 0.7
# Occasionally inject anomalies
if np.random.random() < 0.1: # 10% chance of anomaly
if np.random.random() < 0.5:
base_volume *= 0.3 # Accident scenario
else:
base_volume *= 2.0 # Unusual congestion
volume = max(5, base_volume + np.random.normal(0, 5))
speed = max(10, 60 - 0.4 * volume + np.random.normal(0, 8))
density = volume / speed
data_point = {
'timestamp': datetime.now().isoformat(),
'location_id': f'sensor_{i % 10:03d}',
'volume': volume,
'speed': speed,
'density': density,
'hour': hour,
'day_of_week': (i // 24) % 7
}
# Process data point
result = await system.process_traffic_data(data_point)
if result.get('is_anomaly'):
print(f"ANOMALY DETECTED at {data_point['location_id']}: "
f"Type: {result.get('anomaly_type')}, "
f"Severity: {result.get('severity')}, "
f"Confidence: {result.get('confidence', 0):.2f}")
# Small delay to simulate real-time processing
await asyncio.sleep(0.1)
# Print system status
status = system.get_system_status()
print(f"\nSystem Status:")
print(f"Total Processed: {status['total_processed']}")
print(f"Anomalies Detected: {status['anomalies_detected']}")
print(f"Detection Rate: {status['detection_rate']:.2%}")
print(f"Avg Processing Time: {status['avg_processing_time']:.4f}s")
# Run simulation
# asyncio.run(simulate_real_time_detection())
```
## Emergency Response Integration and Alert Management
A critical component of any traffic anomaly detection system is its integration with emergency response protocols and traffic management centers. This section explores the operational workflows and system integrations necessary for effective incident response.
```python
import logging
import smtplib
import time
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from datetime import datetime, timedelta
from enum import Enum
from typing import Dict, List, Optional
import requests
class AlertSeverity(Enum):
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
CRITICAL = "critical"
class AlertStatus(Enum):
ACTIVE = "active"
ACKNOWLEDGED = "acknowledged"
INVESTIGATING = "investigating"
RESOLVED = "resolved"
FALSE_ALARM = "false_alarm"
class EmergencyResponseIntegration:
"""
Integration system for emergency response and traffic management
"""
def __init__(self, config: Dict):
self.config = config
self.setup_logging()
self.active_incidents = {}
self.response_teams = {}
self.escalation_rules = self.load_escalation_rules()
def setup_logging(self):
"""Setup logging for emergency response"""
self.logger = logging.getLogger('emergency_response')
def load_escalation_rules(self) -> Dict:
"""Load escalation rules for different alert types"""
return {
AlertSeverity.CRITICAL: {
'max_response_time': 300, # 5 minutes
'auto_escalate_after': 600, # 10 minutes
'notification_channels': ['sms', 'email', 'radio', 'dashboard'],
'required_response_teams': ['traffic_control', 'emergency_services']
},
AlertSeverity.HIGH: {
'max_response_time': 900, # 15 minutes
'auto_escalate_after': 1800, # 30 minutes
'notification_channels': ['email', 'dashboard'],
'required_response_teams': ['traffic_control']
},
AlertSeverity.MEDIUM: {
'max_response_time': 1800, # 30 minutes
'auto_escalate_after': 3600, # 1 hour
'notification_channels': ['email', 'dashboard'],
'required_response_teams': ['traffic_control']
},
AlertSeverity.LOW: {
'max_response_time': 3600, # 1 hour
'auto_escalate_after': None,
'notification_channels': ['dashboard'],
'required_response_teams': []
}
}
async def process_alert(self, alert: Dict) -> Dict:
"""
Process incoming alert and initiate appropriate response
"""
alert_id = alert['alert_id']
severity = AlertSeverity(alert['severity'])
# Create incident record
incident = {
'incident_id': f"INC_{int(time.time())}",
'alert_id': alert_id,
'created_at': datetime.now(),
'location_id': alert['location_id'],
'severity': severity,
'status': AlertStatus.ACTIVE,
'assigned_teams': [],
'response_actions': [],
'estimated_resolution_time': None,
'actual_resolution_time': None
}
self.active_incidents[incident['incident_id']] = incident
# Apply escalation rules
escalation_rule = self.escalation_rules[severity]
# Send notifications
await self.send_notifications(alert, incident, escalation_rule['notification_channels'])
# Assign response teams
await self.assign_response_teams(incident, escalation_rule['required_response_teams'])
# Set up automatic escalation if configured
if escalation_rule['auto_escalate_after']:
await self.schedule_escalation(incident, escalation_rule['auto_escalate_after'])
# Log incident creation
self.logger.info(f"Incident {incident['incident_id']} created for alert {alert_id}")
return incident
async def send_notifications(self, alert: Dict, incident: Dict, channels: List[str]):
"""
Send notifications through configured channels
"""
message_content = self.format_alert_message(alert, incident)
for channel in channels:
try:
if channel == 'email':
await self.send_email_notification(alert, message_content)
elif channel == 'sms':
await self.send_sms_notification(alert, message_content)
elif channel == 'radio':
await self.send_radio_notification(alert, message_content)
elif channel == 'dashboard':
await self.update_dashboard(alert, incident)
except Exception as e:
self.logger.error(f"Failed to send {channel} notification: {str(e)}")
def format_alert_message(self, alert: Dict, incident: Dict) -> str:
"""
Format alert message for human consumption
"""
return f"""
TRAFFIC ANOMALY ALERT - {alert['severity'].upper()}
Incident ID: {incident['incident_id']}
Location: {alert['location_id']}
Time: {alert['timestamp']}
Type: {alert['anomaly_type']}
Confidence: {alert['confidence']:.1%}
Traffic Conditions:
- Volume: {alert['traffic_data'].get('volume', 'N/A')}
- Speed: {alert['traffic_data'].get('speed', 'N/A')} mph
- Density: {alert['traffic_data'].get('density', 'N/A')}
Recommended Actions:
{chr(10).join('- ' + action for action in alert.get('recommended_actions', []))}
Detection Methods: {', '.join(alert.get('detection_methods', {}).keys())}
""".strip()
async def send_email_notification(self, alert: Dict, message: str):
"""
Send email notification to configured recipients
"""
try:
smtp_config = self.config.get('email', {})
if not smtp_config:
return
msg = MIMEMultipart()
msg['From'] = smtp_config.get('from_address')
msg['Subject'] = f"Traffic Alert - {alert['severity'].upper()} - {alert['location_id']}"
# Determine recipients based on severity
if alert['severity'] == AlertSeverity.CRITICAL.value:
recipients = smtp_config.get('critical_recipients', [])
elif alert['severity'] == AlertSeverity.HIGH.value:
recipients = smtp_config.get('high_recipients', [])
else:
recipients = smtp_config.get('general_recipients', [])
msg['To'] = ', '.join(recipients)
msg.attach(MIMEText(message, 'plain'))
# Send email
server = smtplib.SMTP(smtp_config.get('smtp_server'), smtp_config.get('smtp_port', 587))
server.starttls()
server.login(smtp_config.get('username'), smtp_config.get('password'))
server.send_message(msg)
server.quit()
self.logger.info(f"Email notification sent for alert {alert['alert_id']}")
except Exception as e:
self.logger.error(f"Failed to send email notification: {str(e)}")
async def send_sms_notification(self, alert: Dict, message: str):
"""
Send SMS notification using configured service
"""
try:
sms_config = self.config.get('sms', {})
if not sms_config:
return
# Truncate message for SMS
sms_message = message[:160] + "..." if len(message) > 160 else message
# Use SMS service API (example with Twilio-like service)
api_url = sms_config.get('api_url')
api_key = sms_config.get('api_key')
if alert['severity'] in [AlertSeverity.CRITICAL.value, AlertSeverity.HIGH.value]:
phone_numbers = sms_config.get('emergency_numbers', [])
else:
phone_numbers = sms_config.get('general_numbers', [])
for phone_number in phone_numbers:
payload = {
'to': phone_number,
'message': sms_message,
'from': sms_config.get('from_number')
}
headers = {'Authorization': f'Bearer {api_key}'}
response = requests.post(api_url, json=payload, headers=headers)
if response.status_code == 200:
self.logger.info(f"SMS sent to {phone_number} for alert {alert['alert_id']}")
else:
self.logger.error(f"Failed to send SMS to {phone_number}: {response.text}")
except Exception as e:
self.logger.error(f"Failed to send SMS notification: {str(e)}")
async def assign_response_teams(self, incident: Dict, required_teams: List[str]):
"""
Assign appropriate response teams to incident
"""
try:
for team_type in required_teams:
available_team = self.find_available_team(team_type, incident['location_id'])
if available_team:
# Assign team to incident
incident['assigned_teams'].append({
'team_id': available_team['team_id'],
'team_type': team_type,
'assigned_at': datetime.now(),
'status': 'assigned'
})
# Notify team
await self.notify_response_team(available_team, incident)
self.logger.info(f"Assigned {team_type} team {available_team['team_id']} to incident {incident['incident_id']}")
else:
self.logger.warning(f"No available {team_type} team found for incident {incident['incident_id']}")
except Exception as e:
self.logger.error(f"Failed to assign response teams: {str(e)}")
def find_available_team(self, team_type: str, location_id: str) -> Optional[Dict]:
"""
Find available response team closest to incident location
"""
# This would integrate with actual team management system
# For now, return mock team data
teams = self.config.get('response_teams', {}).get(team_type, [])
for team in teams:
if team.get('status') == 'available':
return team
return None
async def notify_response_team(self, team: Dict, incident: Dict):
"""
Notify assigned response team about incident
"""
try:
# Format team notification
notification = {
'incident_id': incident['incident_id'],
'location': incident['location_id'],
'severity': incident['severity'].value,
'assignment_time': datetime.now().isoformat(),
'expected_response_time': self.calculate_expected_response_time(team, incident)
}
# Send to team's communication channel
team_channel = team.get('communication_channel')
if team_channel == 'radio':
await self.send_radio_dispatch(team, notification)
elif team_channel == 'mobile_app':
await self.send_mobile_notification(team, notification)
elif team_channel == 'email':
await self.send_team_email(team, notification)
except Exception as e:
self.logger.error(f"Failed to notify response team: {str(e)}")
def calculate_expected_response_time(self, team: Dict, incident: Dict) -> int:
"""
Calculate expected response time based on team location and incident severity
"""
        # Simplified calculation - in reality this would use routing and live traffic data
        base_response_time = team.get('base_response_time', 15)  # minutes
        if incident['severity'] == AlertSeverity.CRITICAL:
            return round(base_response_time * 0.7)  # Expedited response
        elif incident['severity'] == AlertSeverity.HIGH:
            return base_response_time
        else:
            return round(base_response_time * 1.5)
    async def update_incident_status(self, incident_id: str, new_status: AlertStatus,
                                     notes: Optional[str] = None) -> Dict:
"""
Update incident status and track response metrics
"""
if incident_id not in self.active_incidents:
raise ValueError(f"Incident {incident_id} not found")
incident = self.active_incidents[incident_id]
old_status = incident['status']
incident['status'] = new_status
incident['last_updated'] = datetime.now()
# Add status change to response actions
status_change = {
'action_type': 'status_change',
'timestamp': datetime.now(),
'from_status': old_status.value,
'to_status': new_status.value,
'notes': notes
}
incident['response_actions'].append(status_change)
# Calculate resolution time if resolved
if new_status in [AlertStatus.RESOLVED, AlertStatus.FALSE_ALARM]:
incident['actual_resolution_time'] = datetime.now()
resolution_duration = (incident['actual_resolution_time'] - incident['created_at']).total_seconds()
incident['resolution_duration_minutes'] = resolution_duration / 60
            # Record stays in active_incidents so reports remain available; a production system would archive it here
self.logger.info(f"Incident {incident_id} resolved in {incident['resolution_duration_minutes']:.1f} minutes")
# Log status change
self.logger.info(f"Incident {incident_id} status changed from {old_status.value} to {new_status.value}")
return incident
def generate_incident_report(self, incident_id: str) -> Dict:
"""
Generate comprehensive incident report
"""
incident = self.active_incidents.get(incident_id)
if not incident:
raise ValueError(f"Incident {incident_id} not found")
report = {
'incident_summary': {
'incident_id': incident['incident_id'],
'alert_id': incident['alert_id'],
'location': incident['location_id'],
'severity': incident['severity'].value,
'created_at': incident['created_at'].isoformat(),
'status': incident['status'].value
},
'response_metrics': {
'total_response_teams': len(incident['assigned_teams']),
'response_actions_count': len(incident['response_actions']),
'resolution_time_minutes': incident.get('resolution_duration_minutes')
},
'timeline': self.build_incident_timeline(incident),
'lessons_learned': self.extract_lessons_learned(incident),
'performance_analysis': self.analyze_response_performance(incident)
}
return report
def build_incident_timeline(self, incident: Dict) -> List[Dict]:
"""
Build chronological timeline of incident events
"""
timeline = []
# Add incident creation
timeline.append({
'timestamp': incident['created_at'],
'event_type': 'incident_created',
'description': f"Incident created with severity {incident['severity'].value}"
})
# Add team assignments
for team_assignment in incident['assigned_teams']:
timeline.append({
'timestamp': team_assignment['assigned_at'],
'event_type': 'team_assigned',
'description': f"{team_assignment['team_type']} team {team_assignment['team_id']} assigned"
})
# Add response actions
for action in incident['response_actions']:
timeline.append({
'timestamp': action['timestamp'],
'event_type': action['action_type'],
'description': action.get('notes', f"Action: {action['action_type']}")
})
return sorted(timeline, key=lambda x: x['timestamp'])
def extract_lessons_learned(self, incident: Dict) -> List[str]:
"""
Extract lessons learned from incident response
"""
lessons = []
# Analyze response time
if incident.get('resolution_duration_minutes'):
expected_resolution = self.escalation_rules[incident['severity']]['max_response_time'] / 60
if incident['resolution_duration_minutes'] > expected_resolution:
lessons.append(f"Response time exceeded target by {incident['resolution_duration_minutes'] - expected_resolution:.1f} minutes")
else:
lessons.append("Response time met or exceeded performance targets")
# Analyze team coordination
if len(incident['assigned_teams']) > 1:
lessons.append("Multi-team coordination required - review communication protocols")
# Analyze false alarm rate
if incident['status'] == AlertStatus.FALSE_ALARM:
lessons.append("False alarm detected - review detection algorithm sensitivity")
return lessons
def analyze_response_performance(self, incident: Dict) -> Dict:
"""
Analyze incident response performance metrics
"""
analysis = {
'response_time_performance': 'unknown',
'team_utilization': len(incident['assigned_teams']),
'communication_effectiveness': 'unknown',
'overall_rating': 'pending'
}
# Analyze response time
if incident.get('resolution_duration_minutes'):
target_time = self.escalation_rules[incident['severity']]['max_response_time'] / 60
if incident['resolution_duration_minutes'] <= target_time:
analysis['response_time_performance'] = 'excellent'
elif incident['resolution_duration_minutes'] <= target_time * 1.5:
analysis['response_time_performance'] = 'good'
else:
analysis['response_time_performance'] = 'needs_improvement'
# Overall rating
if analysis['response_time_performance'] == 'excellent':
analysis['overall_rating'] = 'successful'
elif analysis['response_time_performance'] in ['good', 'unknown']:
analysis['overall_rating'] = 'satisfactory'
else:
analysis['overall_rating'] = 'needs_improvement'
return analysis
# Example usage
emergency_config = {
'email': {
'smtp_server': 'smtp.example.com',
'smtp_port': 587,
'username': 'alerts@traffic.gov',
        'password': 'secure_password',  # placeholder - load real credentials from a secrets manager in production
'from_address': 'alerts@traffic.gov',
'critical_recipients': ['emergency@traffic.gov', 'supervisor@traffic.gov'],
'high_recipients': ['control@traffic.gov'],
'general_recipients': ['monitoring@traffic.gov']
},
'response_teams': {
'traffic_control': [
{
'team_id': 'TC001',
'status': 'available',
'location': 'downtown',
'communication_channel': 'radio',
'base_response_time': 15
}
],
'emergency_services': [
{
'team_id': 'ES001',
'status': 'available',
'location': 'central',
'communication_channel': 'radio',
'base_response_time': 8
}
]
}
}
# Integration example
emergency_system = EmergencyResponseIntegration(emergency_config)
print("Emergency Response Integration System initialized successfully!")
Conclusion and Best Practices
Real-time traffic anomaly detection systems represent a critical infrastructure component for modern transportation management. The successful implementation of such systems requires careful consideration of multiple technical, operational, and organizational factors.
Key Technical Considerations
Multi-Method Approach: No single detection method provides optimal performance across all scenarios. Successful systems combine statistical process control for baseline monitoring, machine learning for complex pattern recognition, and domain-specific rules for known incident types.
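As an illustration of how these layers can be combined, the sketch below (thresholds, feature layout, and the minimum-vote rule are assumptions) votes across a z-score check against a learned baseline, an unsupervised IsolationForest score, and a simple domain rule, flagging a reading only when at least two layers agree.
import numpy as np
from sklearn.ensemble import IsolationForest

class MultiMethodDetector:
    """Illustrative ensemble: statistical baseline + ML model + domain rule."""

    def __init__(self, z_threshold: float = 3.0, min_votes: int = 2):
        self.z_threshold = z_threshold            # hypothetical SPC threshold
        self.min_votes = min_votes                # layers that must agree
        self.model = IsolationForest(contamination=0.01, random_state=42)

    def fit(self, history: np.ndarray):
        # history: rows of [speed_kmh, volume_vph] observed under normal conditions
        self.mean = history.mean(axis=0)
        self.std = history.std(axis=0) + 1e-6
        self.model.fit(history)

    def is_anomalous(self, reading: np.ndarray) -> bool:
        votes = 0
        # 1. Statistical process control: z-score against the rolling baseline
        if np.any(np.abs((reading - self.mean) / self.std) > self.z_threshold):
            votes += 1
        # 2. Machine learning: IsolationForest labels outliers as -1
        if self.model.predict(reading.reshape(1, -1))[0] == -1:
            votes += 1
        # 3. Domain rule: near-zero speed with non-trivial volume suggests a blockage
        speed, volume = reading
        if speed < 10 and volume > 100:
            votes += 1
        return votes >= self.min_votes
Requiring agreement between layers trades a small amount of sensitivity for a substantial reduction in false positives, which is usually the right trade for operator-facing alerts.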
Real-Time Performance: Production systems must balance detection accuracy with processing speed. Implement efficient algorithms, use appropriate data structures, and consider edge computing for latency-critical applications.
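One concrete way to keep per-reading cost constant is to hold baseline statistics in a fixed-size rolling window. The sketch below (window size and naming are illustrative) updates running sums in O(1) per sample instead of recomputing statistics over the full history.
from collections import deque
import math

class RollingBaseline:
    """Constant-time rolling mean/std for a single sensor (illustrative only)."""

    def __init__(self, window: int = 288):       # e.g. one day of 5-minute samples
        self.window = window
        self.values = deque()
        self.total = 0.0
        self.total_sq = 0.0

    def update(self, value: float):
        self.values.append(value)
        self.total += value
        self.total_sq += value * value
        if len(self.values) > self.window:
            old = self.values.popleft()
            self.total -= old
            self.total_sq -= old * old

    def z_score(self, value: float) -> float:
        n = len(self.values)
        if n < 2:
            return 0.0
        mean = self.total / n
        var = max(self.total_sq / n - mean * mean, 0.0)
        return (value - mean) / math.sqrt(var + 1e-9)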
Scalability Architecture: Design systems to handle growing data volumes and geographic coverage. Use distributed processing, efficient caching, and modular component architectures.
Data Quality Management: Implement robust data validation, sensor health monitoring, and graceful degradation when data sources are compromised.
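A minimal validation layer might look like the sketch below; the field names, bounds, and failure threshold are illustrative assumptions rather than standards, but the pattern of rejecting implausible or stale readings and counting failures per sensor instead of alerting on them captures the graceful-degradation idea.
from datetime import datetime, timedelta
from typing import Optional

def validate_reading(reading: dict, max_age_seconds: int = 120) -> Optional[str]:
    """Return a rejection reason, or None if the reading looks usable."""
    if reading.get('speed_kmh') is None or reading.get('volume_vph') is None:
        return 'missing_fields'
    if not (0 <= reading['speed_kmh'] <= 200):
        return 'speed_out_of_range'
    if not (0 <= reading['volume_vph'] <= 12000):
        return 'volume_out_of_range'
    if datetime.now() - reading['timestamp'] > timedelta(seconds=max_age_seconds):
        return 'stale_reading'
    return None

def process_reading(reading: dict, sensor_health: dict):
    sensor_id = reading['sensor_id']
    problem = validate_reading(reading)
    if problem:
        # Degrade gracefully: count the failure instead of raising a traffic alert
        sensor_health[sensor_id] = sensor_health.get(sensor_id, 0) + 1
        if sensor_health[sensor_id] > 10:
            print(f"Sensor {sensor_id} flagged for maintenance ({problem})")
        return None
    sensor_health[sensor_id] = 0           # a healthy reading resets the counter
    return reading                         # pass on to the detection pipeline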
Operational Best Practices
Alert Fatigue Prevention: Carefully tune detection thresholds to minimize false positives while maintaining sensitivity to real incidents. Implement alert suppression and correlation mechanisms.
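A simple suppression mechanism is a per-location, per-type cooldown, sketched below with an assumed 15-minute window; repeat detections inside the window are folded into the existing alert rather than emitted as new ones.
from datetime import datetime, timedelta

class AlertSuppressor:
    """Drop repeat alerts for the same location and anomaly type within a cooldown window."""

    def __init__(self, cooldown_minutes: int = 15):
        self.cooldown = timedelta(minutes=cooldown_minutes)
        self.last_alert = {}                 # (location_id, anomaly_type) -> datetime

    def should_emit(self, location_id: str, anomaly_type: str) -> bool:
        key = (location_id, anomaly_type)
        now = datetime.now()
        previous = self.last_alert.get(key)
        if previous and now - previous < self.cooldown:
            return False                     # correlate with the existing alert instead
        self.last_alert[key] = now
        return True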
Response Integration: Ensure seamless integration with existing traffic management and emergency response workflows. Provide clear, actionable information to human operators.
Continuous Improvement: Implement feedback loops to learn from incident outcomes and false alarms. Regular model retraining and threshold adjustment are essential.
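As one example of such a feedback loop, the heuristic below (target rate, step size, and minimum sample count are assumptions) nudges a detection threshold up when operators mark too many alerts as false alarms and down when nearly everything is confirmed.
def adjust_threshold(current_threshold: float, confirmed: int, false_alarms: int,
                     target_false_rate: float = 0.2, step: float = 0.1) -> float:
    """Nudge a detection threshold based on operator feedback (illustrative heuristic)."""
    total = confirmed + false_alarms
    if total < 20:                           # not enough feedback to act on yet
        return current_threshold
    false_rate = false_alarms / total
    if false_rate > target_false_rate:
        return current_threshold + step      # less sensitive: fewer false positives
    if false_rate < target_false_rate / 2:
        return current_threshold - step      # more sensitive: catch more incidents
    return current_threshold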
Performance Monitoring: Track key metrics including detection latency, false positive rates, missed incidents, and response times. Use this data to continuously optimize system performance.
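These metrics can be rolled up from closed incident records; the sketch below assumes a simple record schema with a detection latency, a resolution duration, and a false-alarm flag.
from statistics import mean

def summarize_performance(incidents: list) -> dict:
    """Aggregate monitoring metrics from closed incident records (illustrative schema)."""
    resolved = [i for i in incidents if i.get('resolution_duration_minutes') is not None]
    false_alarms = [i for i in incidents if i.get('is_false_alarm')]
    latencies = [i['detection_latency_seconds'] for i in incidents
                 if i.get('detection_latency_seconds') is not None]
    return {
        'incident_count': len(incidents),
        'false_positive_rate': len(false_alarms) / len(incidents) if incidents else 0.0,
        'mean_detection_latency_s': mean(latencies) if latencies else None,
        'mean_resolution_minutes': mean(i['resolution_duration_minutes'] for i in resolved)
                                   if resolved else None,
    }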
Future Directions
The field of traffic anomaly detection continues to evolve with advances in artificial intelligence, sensor technology, and communication networks. Future developments will likely include:
- Enhanced Spatial-Temporal Models: Graph neural networks and attention mechanisms for better understanding of traffic network dynamics
- Federated Learning: Privacy-preserving collaborative learning across multiple transportation agencies
- Predictive Anomaly Detection: Systems that can forecast potential incidents before they occur
- Autonomous Response: Integration with automated traffic management systems for immediate response to detected anomalies
Implementation Roadmap
For organizations planning to implement traffic anomaly detection systems:
- Start with Baseline Methods: Begin with statistical process control and simple threshold-based detection
- Integrate Gradually: Add machine learning components incrementally while maintaining operational systems
- Focus on Integration: Ensure seamless integration with existing traffic management infrastructure
- Invest in Training: Provide comprehensive training for operators and maintenance staff
- Plan for Evolution: Design flexible architectures that can accommodate future technological advances
Traffic anomaly detection systems, when properly designed and implemented, provide significant value in reducing incident impact, improving response times, and enhancing overall transportation system reliability. The investment in such systems pays dividends through reduced congestion, improved safety, and more efficient use of transportation infrastructure.