#!/usr/bin/env python3 """ Health Check Probe Simulator Continuously sends heartbeat probes from all devices to test offline detection """ import requests import time import random import os import json from datetime import datetime from typing import List, Dict # Disable SSL warnings for self-signed certificates import warnings warnings.filterwarnings('ignore', message='Unverified HTTPS request') class DeviceProbeSimulator: def __init__(self): # Configuration from environment variables # Tests default to localhost:3002 for local development self.api_base_url = os.getenv('API_BASE_URL', 'http://localhost:3002/api') base_path = os.getenv('VITE_BASE_PATH', '').rstrip('/') # If BASE_PATH is set, construct the full URL if base_path and not self.api_base_url.endswith('/api'): # Extract domain from API_BASE_URL and add base path domain = self.api_base_url.replace('/api', '').replace('/drones/api', '').replace('/uggla/api', '') self.api_base_url = f"{domain}{base_path}/api" self.probe_interval = int(os.getenv('PROBE_INTERVAL_SECONDS', '60')) # 1 minute default self.probe_failrate = int(os.getenv('PROBE_FAILRATE', '30')) # 30% failure rate # Authentication configuration - Optional for local testing self.username = os.getenv('TEST_USERNAME', 'admin') self.password = os.getenv('TEST_PASSWORD', 'admin123') self.skip_auth = os.getenv('SKIP_AUTH', 'false').lower() == 'true' self.devices = [] self.session = requests.Session() self.session.headers.update({ 'Content-Type': 'application/json', 'User-Agent': 'HealthProbeSimulator/1.0' }) self.auth_token = None print(f"šŸ”§ Health Probe Simulator Configuration:") print(f" API URL: {self.api_base_url}") print(f" Probe Interval: {self.probe_interval} seconds") print(f" Failure Rate: {self.probe_failrate}%") print(f" Authentication: {'Disabled' if self.skip_auth else 'Enabled'}") print() def authenticate(self) -> bool: """Authenticate with the API and get a token""" if self.skip_auth: print("šŸ”“ Authentication skipped (SKIP_AUTH=true)") return True try: login_data = { "username": self.username, "password": self.password } response = self.session.post( f"{self.api_base_url}/users/login", json=login_data, verify=False, timeout=10 ) if response.status_code == 200: data = response.json() if data.get('success') and 'data' in data and 'token' in data['data']: self.auth_token = data['data']['token'] self.session.headers.update({ 'Authorization': f'Bearer {self.auth_token}' }) print(f"āœ… Authentication successful") return True else: print(f"āŒ Invalid login response format: {data}") return False else: print(f"āŒ Authentication failed: HTTP {response.status_code}") print(f" Response: {response.text}") return False except Exception as e: print(f"āŒ Authentication error: {e}") return False def fetch_devices(self) -> bool: """Fetch all devices from the API""" try: response = self.session.get( f"{self.api_base_url}/devices", verify=False, timeout=10 ) if response.status_code == 200: data = response.json() if data.get('success') and 'data' in data: self.devices = data['data'] print(f"šŸ“” Fetched {len(self.devices)} devices for health monitoring") for device in self.devices: print(f" - Device {device['id']}: {device.get('name', 'Unnamed')} ({device.get('location_description', 'No location')})") return True else: print(f"āŒ API returned invalid data structure: {data}") return False else: print(f"āŒ Failed to fetch devices: HTTP {response.status_code}") print(f" Response: {response.text}") return False except Exception as e: print(f"āŒ Error fetching devices: {e}") return False def send_heartbeat(self, device: Dict) -> bool: """Send heartbeat for a specific device""" device_id = device['id'] # Simulate probe failure based on failure rate if random.randint(1, 100) <= self.probe_failrate: print(f"šŸ’” Simulating probe failure for Device {device_id} ({self.probe_failrate}% failure rate)") return False try: # ONLY send the exact format requested: {type:"heartbeat", key:"device_id"} payload = { 'type': 'heartbeat', 'key': str(device_id) # Use device_id as the key } print(f"šŸ“” Sending heartbeat to {self.api_base_url}/detectors") print(f" Payload: {payload}") print(f" Headers: {dict(self.session.headers)}") response = self.session.post( f"{self.api_base_url}/detectors", # Sending to /detectors endpoint json=payload, verify=False, timeout=10 ) if response.status_code in [200, 201]: device_name = device.get('name', f'Device {device_id}') location = device.get('location_description', 'Unknown location') print(f"āœ… Device {device_id}: Heartbeat sent successfully ({device_name} - {location})") return True else: print(f"āŒ Failed to send heartbeat for Device {device_id}: HTTP {response.status_code}") print(f" Response headers: {dict(response.headers)}") print(f" Response body: {response.text}") return False except requests.exceptions.Timeout: print(f"ā° Heartbeat timeout for Device {device_id}") return False except Exception as e: print(f"āŒ Error sending heartbeat for Device {device_id}: {e}") return False def run_probe_cycle(self): """Run one complete probe cycle for all devices""" if not self.devices: print("āš ļø No devices to probe, fetching device list...") if not self.fetch_devices(): print("āŒ Cannot continue without device list") return print(f"\nšŸ”„ Starting probe cycle at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") print(f" Probing {len(self.devices)} devices...") successful_probes = 0 failed_probes = 0 for device in self.devices: if self.send_heartbeat(device): successful_probes += 1 else: failed_probes += 1 # Small delay between probes to avoid overwhelming the server time.sleep(0.5) success_rate = (successful_probes / len(self.devices)) * 100 if self.devices else 0 print(f"\nšŸ“Š Probe Cycle Summary:") print(f" āœ… Successful: {successful_probes}") print(f" āŒ Failed: {failed_probes}") print(f" šŸ“ˆ Success Rate: {success_rate:.1f}%") print(f" šŸŽÆ Target Failure Rate: {self.probe_failrate}%") print(f" ā° Next cycle in {self.probe_interval} seconds") def run_continuous(self): """Run continuous health check probes""" print("šŸš€ Starting continuous health probe simulator...") print(" Press Ctrl+C to stop") print() # Authenticate first if not self.authenticate(): print("āŒ Failed to authenticate, exiting") return # Initial device fetch if not self.fetch_devices(): print("āŒ Failed to fetch initial device list, exiting") return try: while True: self.run_probe_cycle() # Re-fetch devices every 10 cycles to catch new devices if hasattr(self, 'cycle_count'): self.cycle_count += 1 else: self.cycle_count = 1 if self.cycle_count % 10 == 0: print(f"\nšŸ”„ Refreshing device list (cycle {self.cycle_count})...") self.fetch_devices() # Wait for next cycle time.sleep(self.probe_interval) except KeyboardInterrupt: print(f"\n\nšŸ›‘ Health probe simulator stopped by user") except Exception as e: print(f"\n\nāŒ Unexpected error in probe simulator: {e}") def main(): print("=" * 60) print("šŸ„ DRONE DETECTION SYSTEM - HEALTH PROBE SIMULATOR") print("=" * 60) print() # Load environment variables with localhost defaults probe_failrate = os.getenv('PROBE_FAILRATE', '30') probe_interval = os.getenv('PROBE_INTERVAL_SECONDS', '60') api_url = os.getenv('API_BASE_URL', 'http://localhost:3002/api') base_path = os.getenv('VITE_BASE_PATH', '') skip_auth = os.getenv('SKIP_AUTH', 'false').lower() == 'true' print(f"šŸ“‹ Configuration:") print(f" PROBE_FAILRATE: {probe_failrate}%") print(f" PROBE_INTERVAL_SECONDS: {probe_interval}s") print(f" API_BASE_URL: {api_url}") print(f" VITE_BASE_PATH: {base_path}") print(f" SKIP_AUTH: {skip_auth}") print() simulator = DeviceProbeSimulator() simulator.run_continuous() if __name__ == "__main__": main()