Files
drone-detector/health_probe_simulator.py
2025-08-20 06:36:56 +02:00

183 lines
7.0 KiB
Python

#!/usr/bin/env python3
"""
Health Check Probe Simulator
Continuously sends heartbeat probes from all devices to test offline detection
"""
import requests
import time
import random
import os
import json
from datetime import datetime
from typing import List, Dict
class DeviceProbeSimulator:
    """Simulate periodic heartbeat probes for every device known to the API.

    Configuration is read from environment variables at construction time:

    * ``API_BASE_URL`` -- base URL of the API.
    * ``PROBE_INTERVAL_SECONDS`` -- pause between probe cycles (default 60).
    * ``PROBE_FAILRATE`` -- percentage (0-100) of probes that are deliberately
      skipped to simulate a device going silent (default 30).
    """

    # Timeout (seconds) applied to every HTTP request so that a stalled
    # server cannot block the simulator indefinitely.
    REQUEST_TIMEOUT_SECONDS = 10

    # Pause (seconds) after each probe, to avoid overwhelming the server.
    PROBE_DELAY_SECONDS = 0.5

    # The device list is re-fetched every this many probe cycles.
    DEVICE_REFRESH_CYCLES = 10

    def __init__(self):
        """Read configuration from the environment and open an HTTP session."""
        self.api_base_url = os.getenv('API_BASE_URL', 'https://selfservice.cqers.com/drones/api')
        self.probe_interval = int(os.getenv('PROBE_INTERVAL_SECONDS', '60'))  # 1 minute default
        self.probe_failrate = int(os.getenv('PROBE_FAILRATE', '30'))  # 30% failure rate
        self.devices = []
        # Number of completed probe cycles; drives the periodic device refresh.
        self.cycle_count = 0
        self.session = requests.Session()
        self.session.headers.update({
            'Content-Type': 'application/json',
            'User-Agent': 'HealthProbeSimulator/1.0'
        })
        print(f"🔧 Health Probe Simulator Configuration:")
        print(f" API URL: {self.api_base_url}")
        print(f" Probe Interval: {self.probe_interval} seconds")
        print(f" Failure Rate: {self.probe_failrate}%")
        print()

    def fetch_devices(self) -> bool:
        """Fetch all devices from the API and store them in ``self.devices``.

        ``self.devices`` is only replaced when the whole response has been
        validated, so a failed refresh keeps the previously known list.

        Returns:
            True if the device list was fetched and stored, False otherwise.
        """
        try:
            response = self.session.get(
                f"{self.api_base_url}/devices",
                timeout=self.REQUEST_TIMEOUT_SECONDS,
            )
            if response.status_code != 200:
                print(f"❌ Failed to fetch devices: HTTP {response.status_code}")
                print(f" Response: {response.text}")
                return False

            data = response.json()
            payload_ok = (
                isinstance(data, dict)
                and data.get('success')
                and isinstance(data.get('data'), list)
            )
            if not payload_ok:
                print(f"❌ API returned invalid data structure: {data}")
                return False

            devices = data['data']
            print(f"📡 Fetched {len(devices)} devices for health monitoring")
            for device in devices:
                print(f" - Device {device.get('id', '?')}: {device.get('name', 'Unnamed')} ({device.get('location_description', 'No location')})")
            # Commit only after every entry could be read without error.
            self.devices = devices
            return True
        except Exception as e:
            # Best-effort: network, JSON and malformed-entry errors are
            # reported and signalled through the return value.
            print(f"❌ Error fetching devices: {e}")
            return False

    def send_heartbeat(self, device: Dict) -> bool:
        """Send a heartbeat for a single device.

        A probe is randomly dropped (no request is sent) with a probability
        of ``self.probe_failrate`` percent to simulate an offline device.

        Args:
            device: Device record as returned by the API; must carry an 'id'.

        Returns:
            True if the API accepted the heartbeat (HTTP 200/201); False if
            the probe was dropped, the device has no id, or the request failed.
        """
        device_id = device.get('id')
        if device_id is None:
            # A malformed entry must not abort the whole probe loop.
            print(f"❌ Cannot send heartbeat: device entry has no 'id': {device}")
            return False

        # Simulate probe failure based on failure rate
        if random.randint(1, 100) <= self.probe_failrate:
            print(f"💔 Simulating probe failure for Device {device_id} ({self.probe_failrate}% failure rate)")
            return False

        try:
            # ONLY send the exact format requested: {type:"heartbeat", key:"device_id"}
            payload = {
                'type': 'heartbeat',
                'key': str(device_id)  # Use device_id as the key
            }
            print(f"📡 Sending heartbeat: {payload}")
            response = self.session.post(
                f"{self.api_base_url}/heartbeat",
                json=payload,
                timeout=self.REQUEST_TIMEOUT_SECONDS
            )
            if response.status_code in (200, 201):
                device_name = device.get('name', f'Device {device_id}')
                location = device.get('location_description', 'Unknown location')
                print(f"💓 Device {device_id}: Heartbeat sent successfully ({device_name} - {location})")
                return True

            print(f"❌ Failed to send heartbeat for Device {device_id}: HTTP {response.status_code}")
            print(f" Response: {response.text}")
            return False
        except requests.exceptions.Timeout:
            print(f"⏰ Heartbeat timeout for Device {device_id}")
            return False
        except Exception as e:
            # Best-effort: any other failure is reported, not propagated.
            print(f"❌ Error sending heartbeat for Device {device_id}: {e}")
            return False

    def run_probe_cycle(self):
        """Run one complete probe cycle over all known devices and print a summary.

        If no devices are known yet, the device list is fetched first; the
        cycle is skipped when that fetch fails.
        """
        if not self.devices:
            print("⚠️ No devices to probe, fetching device list...")
            if not self.fetch_devices():
                print("❌ Cannot continue without device list")
                return

        print(f"\n🔄 Starting probe cycle at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        print(f" Probing {len(self.devices)} devices...")
        successful_probes = 0
        failed_probes = 0
        for device in self.devices:
            if self.send_heartbeat(device):
                successful_probes += 1
            else:
                failed_probes += 1
            # Small delay between probes to avoid overwhelming the server
            time.sleep(self.PROBE_DELAY_SECONDS)

        # Guard against division by zero when the fetched list is empty.
        success_rate = (successful_probes / len(self.devices)) * 100 if self.devices else 0
        print(f"\n📊 Probe Cycle Summary:")
        print(f" ✅ Successful: {successful_probes}")
        print(f" ❌ Failed: {failed_probes}")
        print(f" 📈 Success Rate: {success_rate:.1f}%")
        print(f" 🎯 Target Failure Rate: {self.probe_failrate}%")
        print(f" ⏰ Next cycle in {self.probe_interval} seconds")

    def run_continuous(self):
        """Run probe cycles forever, until interrupted with Ctrl+C.

        The device list is fetched once up front (the method returns if that
        fails) and refreshed every ``DEVICE_REFRESH_CYCLES`` cycles.
        """
        print("🚀 Starting continuous health probe simulator...")
        print(" Press Ctrl+C to stop")
        print()
        # Initial device fetch
        if not self.fetch_devices():
            print("❌ Failed to fetch initial device list, exiting")
            return

        try:
            while True:
                self.run_probe_cycle()
                # Re-fetch devices periodically to catch new devices
                self.cycle_count += 1
                if self.cycle_count % self.DEVICE_REFRESH_CYCLES == 0:
                    print(f"\n🔄 Refreshing device list (cycle {self.cycle_count})...")
                    self.fetch_devices()
                # Wait for next cycle
                time.sleep(self.probe_interval)
        except KeyboardInterrupt:
            print(f"\n\n🛑 Health probe simulator stopped by user")
        except Exception as e:
            # Top-level boundary: report anything unexpected and stop.
            print(f"\n\n❌ Unexpected error in probe simulator: {e}")
def main():
    """Print the banner and the effective environment settings, then start the simulator.

    The settings are echoed as raw strings (with their defaults) purely for
    the operator's benefit; the simulator reads the environment itself.
    """
    rule = "=" * 60
    print(rule)
    print("🏥 DRONE DETECTION SYSTEM - HEALTH PROBE SIMULATOR")
    print(rule)
    print()

    # Raw environment values, falling back to the documented defaults
    env = os.environ
    failrate_setting = env.get('PROBE_FAILRATE', '30')
    interval_setting = env.get('PROBE_INTERVAL_SECONDS', '60')
    base_url_setting = env.get('API_BASE_URL', 'https://selfservice.cqers.com/drones/api')

    print(f"📋 Configuration:")
    print(f" PROBE_FAILRATE: {failrate_setting}%")
    print(f" PROBE_INTERVAL_SECONDS: {interval_setting}s")
    print(f" API_BASE_URL: {base_url_setting}")
    print()

    # Runs until interrupted (Ctrl+C) or the initial device fetch fails
    DeviceProbeSimulator().run_continuous()


if __name__ == "__main__":
    main()