#!/usr/bin/env python3
"""
Health Check Probe Simulator
Continuously sends heartbeat probes from all devices to test offline detection
"""

import json
import os
import random
import time
from datetime import datetime
from typing import Dict, List, Optional

import requests

# Defaults shared by DeviceProbeSimulator and main() so they cannot drift apart.
DEFAULT_API_BASE_URL = 'https://selfservice.cqers.com/drones/api'
DEFAULT_PROBE_INTERVAL_SECONDS = 60  # 1 minute
DEFAULT_PROBE_FAILRATE = 30  # percent of probes that are deliberately skipped

# Upper bound for any single HTTP call; requests never times out on its own.
REQUEST_TIMEOUT_SECONDS = 10
# Pause after each probe to avoid overwhelming the server.
INTER_PROBE_DELAY_SECONDS = 0.5
# The device list is re-fetched every this many cycles to catch new devices.
DEVICE_REFRESH_CYCLES = 10


def _read_int_env(name: str, default: int, minimum: int = 0,
                  maximum: Optional[int] = None) -> int:
    """Read an integer environment variable, tolerating bad values.

    Args:
        name: Environment variable to read.
        default: Value used when the variable is unset or not an integer.
        minimum: Lowest accepted value; smaller values are raised to it.
        maximum: Highest accepted value, or None for no upper bound.

    Returns:
        The parsed value, clamped to ``[minimum, maximum]``.
    """
    raw = os.getenv(name)
    if raw is None:
        return default

    try:
        value = int(raw)
    except ValueError:
        print(f"⚠️ Invalid {name}={raw!r}, using default {default}")
        return default

    if value < minimum:
        print(f"⚠️ {name}={value} is below {minimum}, using {minimum}")
        return minimum
    if maximum is not None and value > maximum:
        print(f"⚠️ {name}={value} is above {maximum}, using {maximum}")
        return maximum
    return value


class DeviceProbeSimulator:
    """Sends simulated heartbeats for every device known to the API.

    Configuration is read from the environment:
        API_BASE_URL: Base URL of the API.
        PROBE_INTERVAL_SECONDS: Pause between probe cycles.
        PROBE_FAILRATE: Percentage (0-100) of probes that are skipped on
            purpose so the backend sees missing heartbeats.
    """

    def __init__(self):
        """Load configuration, create the HTTP session and print the settings."""
        # A trailing slash would otherwise produce '//devices' style URLs.
        self.api_base_url = os.getenv('API_BASE_URL', DEFAULT_API_BASE_URL).rstrip('/')
        self.probe_interval = _read_int_env(
            'PROBE_INTERVAL_SECONDS', DEFAULT_PROBE_INTERVAL_SECONDS, minimum=0)
        self.probe_failrate = _read_int_env(
            'PROBE_FAILRATE', DEFAULT_PROBE_FAILRATE, minimum=0, maximum=100)
        self.devices: List[Dict] = []
        # Number of completed probe cycles; drives the periodic device refresh.
        self.cycle_count = 0

        self.session = requests.Session()
        self.session.headers.update({
            'Content-Type': 'application/json',
            'User-Agent': 'HealthProbeSimulator/1.0',
        })

        print("🔧 Health Probe Simulator Configuration:")
        print(f" API URL: {self.api_base_url}")
        print(f" Probe Interval: {self.probe_interval} seconds")
        print(f" Failure Rate: {self.probe_failrate}%")
        print()

    def fetch_devices(self) -> bool:
        """Fetch all devices from the API.

        The current device list is only replaced after the response has been
        validated, so a failed or malformed refresh keeps the previous list.

        Returns:
            True if a valid device list was received and stored, else False.
        """
        try:
            response = self.session.get(
                f"{self.api_base_url}/devices",
                timeout=REQUEST_TIMEOUT_SECONDS,
            )
        except requests.exceptions.RequestException as e:
            print(f"❌ Error fetching devices: {e}")
            return False

        if response.status_code != 200:
            print(f"❌ Failed to fetch devices: HTTP {response.status_code}")
            print(f" Response: {response.text}")
            return False

        try:
            data = response.json()
        except ValueError as e:  # body was not valid JSON
            print(f"❌ Error fetching devices: {e}")
            return False

        is_valid = (
            isinstance(data, dict)
            and data.get('success')
            and isinstance(data.get('data'), list)
        )
        if not is_valid:
            print(f"❌ API returned invalid data structure: {data}")
            return False

        # send_heartbeat needs an 'id' on every entry; drop anything without one.
        entries = data['data']
        devices = [d for d in entries if isinstance(d, dict) and 'id' in d]
        skipped = len(entries) - len(devices)
        if skipped:
            print(f"⚠️ Skipped {skipped} device entries without an 'id'")

        self.devices = devices
        print(f"📡 Fetched {len(self.devices)} devices for health monitoring")
        for device in self.devices:
            print(f" - Device {device['id']}: {device.get('name', 'Unnamed')} "
                  f"({device.get('location_description', 'No location')})")
        return True

    def send_heartbeat(self, device: Dict) -> bool:
        """Send heartbeat for a specific device.

        Args:
            device: Device record from the API; must contain an 'id' key.

        Returns:
            True if the API accepted the heartbeat (HTTP 200/201). False if
            the probe was skipped to simulate a failure, the record has no
            'id', or the request failed.
        """
        if 'id' not in device:
            print("❌ Cannot send heartbeat: device entry has no 'id'")
            return False
        device_id = device['id']

        # Simulate probe failure based on failure rate
        if random.randint(1, 100) <= self.probe_failrate:
            print(f"💔 Simulating probe failure for Device {device_id} "
                  f"({self.probe_failrate}% failure rate)")
            return False

        payload = {
            'device_id': device_id,
            'timestamp': int(time.time()),
            'status': 'online',
            'system_info': {
                'cpu_usage': random.randint(20, 80),
                'memory_usage': random.randint(30, 90),
                'disk_usage': random.randint(40, 85),
                'temperature': random.randint(25, 65),
                'uptime': random.randint(3600, 86400),  # 1 hour to 1 day
            },
        }

        try:
            response = self.session.post(
                f"{self.api_base_url}/devices/heartbeat",
                json=payload,
                timeout=REQUEST_TIMEOUT_SECONDS,
            )
        except requests.exceptions.Timeout:
            print(f"⏰ Heartbeat timeout for Device {device_id}")
            return False
        except Exception as e:
            # Deliberately broad: one bad device must not abort the whole cycle.
            print(f"❌ Error sending heartbeat for Device {device_id}: {e}")
            return False

        if response.status_code in (200, 201):
            device_name = device.get('name', f'Device {device_id}')
            location = device.get('location_description', 'Unknown location')
            print(f"💓 Device {device_id}: Heartbeat sent successfully "
                  f"({device_name} - {location})")
            return True

        print(f"❌ Failed to send heartbeat for Device {device_id}: HTTP {response.status_code}")
        print(f" Response: {response.text}")
        return False

    def run_probe_cycle(self):
        """Run one complete probe cycle for all devices and print a summary."""
        if not self.devices:
            print("⚠️ No devices to probe, fetching device list...")
            if not self.fetch_devices():
                print("❌ Cannot continue without device list")
                return

        print(f"\n🔄 Starting probe cycle at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        print(f" Probing {len(self.devices)} devices...")

        successful_probes = 0
        failed_probes = 0

        for device in self.devices:
            if self.send_heartbeat(device):
                successful_probes += 1
            else:
                failed_probes += 1

            # Small delay between probes to avoid overwhelming the server
            time.sleep(INTER_PROBE_DELAY_SECONDS)

        # Guard against division by zero when the API reports no devices.
        success_rate = (successful_probes / len(self.devices)) * 100 if self.devices else 0

        print("\n📊 Probe Cycle Summary:")
        print(f" ✅ Successful: {successful_probes}")
        print(f" ❌ Failed: {failed_probes}")
        print(f" 📈 Success Rate: {success_rate:.1f}%")
        print(f" 🎯 Target Failure Rate: {self.probe_failrate}%")
        print(f" ⏰ Next cycle in {self.probe_interval} seconds")

    def run_continuous(self):
        """Run continuous health check probes until interrupted.

        Fetches the device list once up front, then loops: probe all devices,
        refresh the device list every DEVICE_REFRESH_CYCLES cycles, sleep for
        the configured interval. Ctrl+C stops the loop cleanly.
        """
        print("🚀 Starting continuous health probe simulator...")
        print(" Press Ctrl+C to stop")
        print()

        # Initial device fetch
        if not self.fetch_devices():
            print("❌ Failed to fetch initial device list, exiting")
            return

        try:
            while True:
                self.run_probe_cycle()

                # Re-fetch devices periodically to catch new devices. A failed
                # refresh keeps the previous list (see fetch_devices).
                self.cycle_count += 1
                if self.cycle_count % DEVICE_REFRESH_CYCLES == 0:
                    print(f"\n🔄 Refreshing device list (cycle {self.cycle_count})...")
                    self.fetch_devices()

                # Wait for next cycle
                time.sleep(self.probe_interval)

        except KeyboardInterrupt:
            print("\n\n🛑 Health probe simulator stopped by user")
        except Exception as e:
            # Top-level boundary: report and stop instead of dumping a traceback.
            print(f"\n\n❌ Unexpected error in probe simulator: {e}")
        finally:
            # Release pooled connections held by the session.
            self.session.close()


def main():
    """Print the banner and raw environment configuration, then run the simulator."""
    print("=" * 60)
    print("🏥 DRONE DETECTION SYSTEM - HEALTH PROBE SIMULATOR")
    print("=" * 60)
    print()

    # Load environment variables
    probe_failrate = os.getenv('PROBE_FAILRATE', str(DEFAULT_PROBE_FAILRATE))
    probe_interval = os.getenv('PROBE_INTERVAL_SECONDS', str(DEFAULT_PROBE_INTERVAL_SECONDS))
    api_url = os.getenv('API_BASE_URL', DEFAULT_API_BASE_URL)

    print("📋 Configuration:")
    print(f" PROBE_FAILRATE: {probe_failrate}%")
    print(f" PROBE_INTERVAL_SECONDS: {probe_interval}s")
    print(f" API_BASE_URL: {api_url}")
    print()

    simulator = DeviceProbeSimulator()
    simulator.run_continuous()


if __name__ == "__main__":
    main()