261 lines
10 KiB
Python
261 lines
10 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Health Check Probe Simulator
|
|
Continuously sends heartbeat probes from all devices to test offline detection
|
|
"""
|
|
|
|
import requests
|
|
import time
|
|
import random
|
|
import os
|
|
import json
|
|
from datetime import datetime
|
|
from typing import List, Dict
|
|
|
|
# Disable SSL warnings for self-signed certificates
|
|
import warnings
|
|
warnings.filterwarnings('ignore', message='Unverified HTTPS request')
|
|
|
|
class DeviceProbeSimulator:
|
|
def __init__(self):
|
|
# Configuration from environment variables
|
|
# Tests default to localhost:3002 for local development
|
|
self.api_base_url = os.getenv('API_BASE_URL', 'http://localhost:3002/api')
|
|
base_path = os.getenv('VITE_BASE_PATH', '').rstrip('/')
|
|
|
|
# If BASE_PATH is set, construct the full URL
|
|
if base_path and not self.api_base_url.endswith('/api'):
|
|
# Extract domain from API_BASE_URL and add base path
|
|
domain = self.api_base_url.replace('/api', '').replace('/drones/api', '').replace('/uggla/api', '')
|
|
self.api_base_url = f"{domain}{base_path}/api"
|
|
|
|
self.probe_interval = int(os.getenv('PROBE_INTERVAL_SECONDS', '60')) # 1 minute default
|
|
self.probe_failrate = int(os.getenv('PROBE_FAILRATE', '30')) # 30% failure rate
|
|
|
|
# Authentication configuration - Optional for local testing
|
|
self.username = os.getenv('TEST_USERNAME', 'admin')
|
|
self.password = os.getenv('TEST_PASSWORD', 'admin123')
|
|
self.skip_auth = os.getenv('SKIP_AUTH', 'false').lower() == 'true'
|
|
|
|
self.devices = []
|
|
self.session = requests.Session()
|
|
self.session.headers.update({
|
|
'Content-Type': 'application/json',
|
|
'User-Agent': 'HealthProbeSimulator/1.0'
|
|
})
|
|
self.auth_token = None
|
|
|
|
print(f"🔧 Health Probe Simulator Configuration:")
|
|
print(f" API URL: {self.api_base_url}")
|
|
print(f" Probe Interval: {self.probe_interval} seconds")
|
|
print(f" Failure Rate: {self.probe_failrate}%")
|
|
print(f" Authentication: {'Disabled' if self.skip_auth else 'Enabled'}")
|
|
print()
|
|
|
|
def authenticate(self) -> bool:
|
|
"""Authenticate with the API and get a token"""
|
|
if self.skip_auth:
|
|
print("🔓 Authentication skipped (SKIP_AUTH=true)")
|
|
return True
|
|
|
|
try:
|
|
login_data = {
|
|
"username": self.username,
|
|
"password": self.password
|
|
}
|
|
|
|
response = self.session.post(
|
|
f"{self.api_base_url}/users/login",
|
|
json=login_data,
|
|
verify=False,
|
|
timeout=10
|
|
)
|
|
|
|
if response.status_code == 200:
|
|
data = response.json()
|
|
if data.get('success') and 'data' in data and 'token' in data['data']:
|
|
self.auth_token = data['data']['token']
|
|
self.session.headers.update({
|
|
'Authorization': f'Bearer {self.auth_token}'
|
|
})
|
|
print(f"✅ Authentication successful")
|
|
return True
|
|
else:
|
|
print(f"❌ Invalid login response format: {data}")
|
|
return False
|
|
else:
|
|
print(f"❌ Authentication failed: HTTP {response.status_code}")
|
|
print(f" Response: {response.text}")
|
|
return False
|
|
except Exception as e:
|
|
print(f"❌ Authentication error: {e}")
|
|
return False
|
|
|
|
def fetch_devices(self) -> bool:
|
|
"""Fetch all devices from the API"""
|
|
try:
|
|
response = self.session.get(
|
|
f"{self.api_base_url}/devices",
|
|
verify=False,
|
|
timeout=10
|
|
)
|
|
if response.status_code == 200:
|
|
data = response.json()
|
|
if data.get('success') and 'data' in data:
|
|
self.devices = data['data']
|
|
print(f"📡 Fetched {len(self.devices)} devices for health monitoring")
|
|
for device in self.devices:
|
|
print(f" - Device {device['id']}: {device.get('name', 'Unnamed')} ({device.get('location_description', 'No location')})")
|
|
return True
|
|
else:
|
|
print(f"❌ API returned invalid data structure: {data}")
|
|
return False
|
|
else:
|
|
print(f"❌ Failed to fetch devices: HTTP {response.status_code}")
|
|
print(f" Response: {response.text}")
|
|
return False
|
|
except Exception as e:
|
|
print(f"❌ Error fetching devices: {e}")
|
|
return False
|
|
|
|
def send_heartbeat(self, device: Dict) -> bool:
|
|
"""Send heartbeat for a specific device"""
|
|
device_id = device['id']
|
|
|
|
# Simulate probe failure based on failure rate
|
|
if random.randint(1, 100) <= self.probe_failrate:
|
|
print(f"💔 Simulating probe failure for Device {device_id} ({self.probe_failrate}% failure rate)")
|
|
return False
|
|
|
|
try:
|
|
# ONLY send the exact format requested: {type:"heartbeat", key:"device_id"}
|
|
payload = {
|
|
'type': 'heartbeat',
|
|
'key': str(device_id) # Use device_id as the key
|
|
}
|
|
|
|
print(f"📡 Sending heartbeat to {self.api_base_url}/detectors")
|
|
print(f" Payload: {payload}")
|
|
print(f" Headers: {dict(self.session.headers)}")
|
|
|
|
response = self.session.post(
|
|
f"{self.api_base_url}/detectors", # Sending to /detectors endpoint
|
|
json=payload,
|
|
verify=False,
|
|
timeout=10
|
|
)
|
|
|
|
if response.status_code in [200, 201]:
|
|
device_name = device.get('name', f'Device {device_id}')
|
|
location = device.get('location_description', 'Unknown location')
|
|
print(f"✅ Device {device_id}: Heartbeat sent successfully ({device_name} - {location})")
|
|
return True
|
|
else:
|
|
print(f"❌ Failed to send heartbeat for Device {device_id}: HTTP {response.status_code}")
|
|
print(f" Response headers: {dict(response.headers)}")
|
|
print(f" Response body: {response.text}")
|
|
return False
|
|
|
|
except requests.exceptions.Timeout:
|
|
print(f"⏰ Heartbeat timeout for Device {device_id}")
|
|
return False
|
|
except Exception as e:
|
|
print(f"❌ Error sending heartbeat for Device {device_id}: {e}")
|
|
return False
|
|
|
|
def run_probe_cycle(self):
|
|
"""Run one complete probe cycle for all devices"""
|
|
if not self.devices:
|
|
print("⚠️ No devices to probe, fetching device list...")
|
|
if not self.fetch_devices():
|
|
print("❌ Cannot continue without device list")
|
|
return
|
|
|
|
print(f"\n🔄 Starting probe cycle at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
|
|
print(f" Probing {len(self.devices)} devices...")
|
|
|
|
successful_probes = 0
|
|
failed_probes = 0
|
|
|
|
for device in self.devices:
|
|
if self.send_heartbeat(device):
|
|
successful_probes += 1
|
|
else:
|
|
failed_probes += 1
|
|
|
|
# Small delay between probes to avoid overwhelming the server
|
|
time.sleep(0.5)
|
|
|
|
success_rate = (successful_probes / len(self.devices)) * 100 if self.devices else 0
|
|
print(f"\n📊 Probe Cycle Summary:")
|
|
print(f" ✅ Successful: {successful_probes}")
|
|
print(f" ❌ Failed: {failed_probes}")
|
|
print(f" 📈 Success Rate: {success_rate:.1f}%")
|
|
print(f" 🎯 Target Failure Rate: {self.probe_failrate}%")
|
|
print(f" ⏰ Next cycle in {self.probe_interval} seconds")
|
|
|
|
def run_continuous(self):
|
|
"""Run continuous health check probes"""
|
|
print("🚀 Starting continuous health probe simulator...")
|
|
print(" Press Ctrl+C to stop")
|
|
print()
|
|
|
|
# Authenticate first
|
|
if not self.authenticate():
|
|
print("❌ Failed to authenticate, exiting")
|
|
return
|
|
|
|
# Initial device fetch
|
|
if not self.fetch_devices():
|
|
print("❌ Failed to fetch initial device list, exiting")
|
|
return
|
|
|
|
try:
|
|
while True:
|
|
self.run_probe_cycle()
|
|
|
|
# Re-fetch devices every 10 cycles to catch new devices
|
|
if hasattr(self, 'cycle_count'):
|
|
self.cycle_count += 1
|
|
else:
|
|
self.cycle_count = 1
|
|
|
|
if self.cycle_count % 10 == 0:
|
|
print(f"\n🔄 Refreshing device list (cycle {self.cycle_count})...")
|
|
self.fetch_devices()
|
|
|
|
# Wait for next cycle
|
|
time.sleep(self.probe_interval)
|
|
|
|
except KeyboardInterrupt:
|
|
print(f"\n\n🛑 Health probe simulator stopped by user")
|
|
except Exception as e:
|
|
print(f"\n\n❌ Unexpected error in probe simulator: {e}")
|
|
|
|
def main():
|
|
print("=" * 60)
|
|
print("🏥 DRONE DETECTION SYSTEM - HEALTH PROBE SIMULATOR")
|
|
print("=" * 60)
|
|
print()
|
|
|
|
# Load environment variables with localhost defaults
|
|
probe_failrate = os.getenv('PROBE_FAILRATE', '30')
|
|
probe_interval = os.getenv('PROBE_INTERVAL_SECONDS', '60')
|
|
api_url = os.getenv('API_BASE_URL', 'http://localhost:3002/api')
|
|
base_path = os.getenv('VITE_BASE_PATH', '')
|
|
skip_auth = os.getenv('SKIP_AUTH', 'false').lower() == 'true'
|
|
|
|
print(f"📋 Configuration:")
|
|
print(f" PROBE_FAILRATE: {probe_failrate}%")
|
|
print(f" PROBE_INTERVAL_SECONDS: {probe_interval}s")
|
|
print(f" API_BASE_URL: {api_url}")
|
|
print(f" VITE_BASE_PATH: {base_path}")
|
|
print(f" SKIP_AUTH: {skip_auth}")
|
|
print()
|
|
|
|
simulator = DeviceProbeSimulator()
|
|
simulator.run_continuous()
|
|
|
|
if __name__ == "__main__":
|
|
main()
|