Files
drone-detector/health_probe_simulator.py
2025-09-07 12:29:07 +02:00

258 lines
10 KiB
Python

#!/usr/bin/env python3
"""
Health Check Probe Simulator
Continuously sends heartbeat probes from all devices to test offline detection
"""
import requests
import time
import random
import os
import json
from datetime import datetime
from typing import List, Dict
# Disable SSL warnings for self-signed certificates
import warnings
warnings.filterwarnings('ignore', message='Unverified HTTPS request')
class DeviceProbeSimulator:
def __init__(self):
# Configuration from environment variables
# Tests default to localhost:3002 for local development
self.api_base_url = os.getenv('API_BASE_URL', 'http://localhost:3002/api')
base_path = os.getenv('VITE_BASE_PATH', '').rstrip('/')
# If BASE_PATH is set, construct the full URL
if base_path and not self.api_base_url.endswith('/api'):
# Extract domain from API_BASE_URL and add base path
domain = self.api_base_url.replace('/api', '').replace('/drones/api', '').replace('/uggla/api', '')
self.api_base_url = f"{domain}{base_path}/api"
self.probe_interval = int(os.getenv('PROBE_INTERVAL_SECONDS', '60')) # 1 minute default
self.probe_failrate = int(os.getenv('PROBE_FAILRATE', '30')) # 30% failure rate
# Authentication configuration - Optional for local testing
self.username = os.getenv('TEST_USERNAME', 'admin')
self.password = os.getenv('TEST_PASSWORD', 'admin123')
self.skip_auth = os.getenv('SKIP_AUTH', 'false').lower() == 'true'
self.devices = []
self.session = requests.Session()
self.session.headers.update({
'Content-Type': 'application/json',
'User-Agent': 'HealthProbeSimulator/1.0'
})
self.auth_token = None
print(f"🔧 Health Probe Simulator Configuration:")
print(f" API URL: {self.api_base_url}")
print(f" Probe Interval: {self.probe_interval} seconds")
print(f" Failure Rate: {self.probe_failrate}%")
print(f" Authentication: {'Disabled' if self.skip_auth else 'Enabled'}")
print()
def authenticate(self) -> bool:
"""Authenticate with the API and get a token"""
if self.skip_auth:
print("🔓 Authentication skipped (SKIP_AUTH=true)")
return True
try:
login_data = {
"username": self.username,
"password": self.password
}
response = self.session.post(
f"{self.api_base_url}/users/login",
json=login_data,
verify=False,
timeout=10
)
if response.status_code == 200:
data = response.json()
if data.get('success') and 'data' in data and 'token' in data['data']:
self.auth_token = data['data']['token']
self.session.headers.update({
'Authorization': f'Bearer {self.auth_token}'
})
print(f"✅ Authentication successful")
return True
else:
print(f"❌ Invalid login response format: {data}")
return False
else:
print(f"❌ Authentication failed: HTTP {response.status_code}")
print(f" Response: {response.text}")
return False
except Exception as e:
print(f"❌ Authentication error: {e}")
return False
def fetch_devices(self) -> bool:
"""Fetch all devices from the API"""
try:
response = self.session.get(
f"{self.api_base_url}/devices",
verify=False,
timeout=10
)
if response.status_code == 200:
data = response.json()
if data.get('success') and 'data' in data:
self.devices = data['data']
print(f"📡 Fetched {len(self.devices)} devices for health monitoring")
for device in self.devices:
print(f" - Device {device['id']}: {device.get('name', 'Unnamed')} ({device.get('location_description', 'No location')})")
return True
else:
print(f"❌ API returned invalid data structure: {data}")
return False
else:
print(f"❌ Failed to fetch devices: HTTP {response.status_code}")
print(f" Response: {response.text}")
return False
except Exception as e:
print(f"❌ Error fetching devices: {e}")
return False
def send_heartbeat(self, device: Dict) -> bool:
"""Send heartbeat for a specific device"""
device_id = device['id']
# Simulate probe failure based on failure rate
if random.randint(1, 100) <= self.probe_failrate:
print(f"💔 Simulating probe failure for Device {device_id} ({self.probe_failrate}% failure rate)")
return False
try:
# ONLY send the exact format requested: {type:"heartbeat", key:"device_id"}
payload = {
'type': 'heartbeat',
'key': str(device_id) # Use device_id as the key
}
print(f"📡 Sending heartbeat: {payload}")
response = self.session.post(
f"{self.api_base_url}/detectors", # Updated to use detectors endpoint
json=payload,
verify=False,
timeout=10
)
if response.status_code in [200, 201]:
device_name = device.get('name', f'Device {device_id}')
location = device.get('location_description', 'Unknown location')
print(f"💓 Device {device_id}: Heartbeat sent successfully ({device_name} - {location})")
return True
else:
print(f"❌ Failed to send heartbeat for Device {device_id}: HTTP {response.status_code}")
print(f" Response: {response.text}")
return False
except requests.exceptions.Timeout:
print(f"⏰ Heartbeat timeout for Device {device_id}")
return False
except Exception as e:
print(f"❌ Error sending heartbeat for Device {device_id}: {e}")
return False
def run_probe_cycle(self):
"""Run one complete probe cycle for all devices"""
if not self.devices:
print("⚠️ No devices to probe, fetching device list...")
if not self.fetch_devices():
print("❌ Cannot continue without device list")
return
print(f"\n🔄 Starting probe cycle at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print(f" Probing {len(self.devices)} devices...")
successful_probes = 0
failed_probes = 0
for device in self.devices:
if self.send_heartbeat(device):
successful_probes += 1
else:
failed_probes += 1
# Small delay between probes to avoid overwhelming the server
time.sleep(0.5)
success_rate = (successful_probes / len(self.devices)) * 100 if self.devices else 0
print(f"\n📊 Probe Cycle Summary:")
print(f" ✅ Successful: {successful_probes}")
print(f" ❌ Failed: {failed_probes}")
print(f" 📈 Success Rate: {success_rate:.1f}%")
print(f" 🎯 Target Failure Rate: {self.probe_failrate}%")
print(f" ⏰ Next cycle in {self.probe_interval} seconds")
def run_continuous(self):
"""Run continuous health check probes"""
print("🚀 Starting continuous health probe simulator...")
print(" Press Ctrl+C to stop")
print()
# Authenticate first
if not self.authenticate():
print("❌ Failed to authenticate, exiting")
return
# Initial device fetch
if not self.fetch_devices():
print("❌ Failed to fetch initial device list, exiting")
return
try:
while True:
self.run_probe_cycle()
# Re-fetch devices every 10 cycles to catch new devices
if hasattr(self, 'cycle_count'):
self.cycle_count += 1
else:
self.cycle_count = 1
if self.cycle_count % 10 == 0:
print(f"\n🔄 Refreshing device list (cycle {self.cycle_count})...")
self.fetch_devices()
# Wait for next cycle
time.sleep(self.probe_interval)
except KeyboardInterrupt:
print(f"\n\n🛑 Health probe simulator stopped by user")
except Exception as e:
print(f"\n\n❌ Unexpected error in probe simulator: {e}")
def main():
print("=" * 60)
print("🏥 DRONE DETECTION SYSTEM - HEALTH PROBE SIMULATOR")
print("=" * 60)
print()
# Load environment variables with localhost defaults
probe_failrate = os.getenv('PROBE_FAILRATE', '30')
probe_interval = os.getenv('PROBE_INTERVAL_SECONDS', '60')
api_url = os.getenv('API_BASE_URL', 'http://localhost:3002/api')
base_path = os.getenv('VITE_BASE_PATH', '')
skip_auth = os.getenv('SKIP_AUTH', 'false').lower() == 'true'
print(f"📋 Configuration:")
print(f" PROBE_FAILRATE: {probe_failrate}%")
print(f" PROBE_INTERVAL_SECONDS: {probe_interval}s")
print(f" API_BASE_URL: {api_url}")
print(f" VITE_BASE_PATH: {base_path}")
print(f" SKIP_AUTH: {skip_auth}")
print()
simulator = DeviceProbeSimulator()
simulator.run_continuous()
if __name__ == "__main__":
main()