Files
drone-detector/test_orlan_detection.py
2025-09-07 12:21:17 +02:00

435 lines
16 KiB
Python
Raw Blame History

#!/usr/bin/env python3
"""
Orlan Detection Test Script
Tests the critical alert system for Orlan military drones by simulating
a long-distance approach from undetectable range to directly overhead.
Test Scenario:
- Starts 50km away (beyond detection range)
- Slowly approaches target device
- Triggers critical alerts as print(f"🛫 {drone_name} Starting Distance: {float(start_distance):.1f}km from device")
print(f"📍 Device Location: {target_lat:.6f}, {target_lon:.6f}")
# Verify starting position is undetectable
if is_detectable(start_distance):
print("⚠️ WARNING: Starting position is within detection range!")
else:
print("✅ Starting position confirmed undetectable")
print("\n" + "=" * 70)
print(f"STARTING {drone_name.upper()} APPROACH SIMULATION")
print("=" * 70)etection range
- Ends with drone hovering directly above target
"""
import requests
import json
import time
import math
import os
from datetime import datetime
# Disable SSL warnings for self-signed certificates
import warnings
warnings.filterwarnings('ignore', message='Unverified HTTPS request')
# Configuration from environment variables
# Tests default to localhost:3002 for local development
API_BASE_URL = os.getenv('API_BASE_URL', 'http://localhost:3002/api')
BASE_PATH = os.getenv('VITE_BASE_PATH', '').rstrip('/')
# Authentication configuration - Optional for local testing
USERNAME = os.getenv('TEST_USERNAME', 'admin')
PASSWORD = os.getenv('TEST_PASSWORD', 'admin123')
SKIP_AUTH = os.getenv('SKIP_AUTH', 'false').lower() == 'true' # Set to 'true' to skip authentication
# If BASE_PATH is set, construct the full URL
if BASE_PATH and not API_BASE_URL.endswith('/api'):
# Extract domain from API_BASE_URL and add base path
domain = API_BASE_URL.replace('/api', '').replace('/drones/api', '').replace('/uggla/api', '')
API_BASE_URL = f"{domain}{BASE_PATH}/api"
print(f"🔗 Using API Base URL: {API_BASE_URL}")
# Global variable to store authentication token
AUTH_TOKEN = None
def authenticate():
"""Authenticate with the API and get access token"""
global AUTH_TOKEN
login_data = {
"username": USERNAME,
"password": PASSWORD
}
try:
print(f"🔐 Authenticating as user: {USERNAME}")
response = requests.post(f"{API_BASE_URL}/users/login", json=login_data, verify=False)
if response.status_code == 200:
data = response.json()
AUTH_TOKEN = data.get('data', {}).get('token')
if AUTH_TOKEN:
print("✅ Authentication successful")
return True
else:
print("❌ No token received in response")
print(f"Response data: {data}")
return False
else:
print(f"❌ Authentication failed: {response.status_code}")
print(f"Response: {response.text}")
return False
except Exception as e:
print(f"❌ Authentication error: {e}")
return False
def get_auth_headers():
"""Get headers with authentication token"""
if SKIP_AUTH:
return {"Content-Type": "application/json"}
if AUTH_TOKEN:
return {
"Authorization": f"Bearer {AUTH_TOKEN}",
"Content-Type": "application/json"
}
return {"Content-Type": "application/json"}
# Detection range parameters (approximate)
MAX_DETECTION_RANGE_KM = 25.0 # Maximum range for drone detection
MIN_RSSI_THRESHOLD = -95 # Minimum RSSI for detection
# Drone types mapping (matches GuessedDroneType enum)
DRONE_TYPES = {
0: "None",
1: "Unknown",
2: "Orlan",
3: "Zala",
4: "Eleron",
5: "ZalaLancet",
6: "Lancet",
7: "FPV_CrossFire",
8: "FPV_ELRS",
9: "MaybeOrlan",
10: "MaybeZala",
11: "MaybeLancet",
12: "MaybeEleron",
13: "DJI",
14: "Supercam",
15: "MaybeSupercam",
16: "REB",
17: "CryptoOrlan",
18: "DJIe"
}
def select_drone_type():
"""Allow user to select drone type for simulation"""
print("\n" + "=" * 50)
print("🚁 DRONE TYPE SELECTION")
print("=" * 50)
print("Available drone types:")
# Group by category for better display
military_types = [(k, v) for k, v in DRONE_TYPES.items() if v in ["Orlan", "Zala", "Eleron", "ZalaLancet", "Lancet", "CryptoOrlan"]]
maybe_types = [(k, v) for k, v in DRONE_TYPES.items() if v.startswith("Maybe")]
fpv_types = [(k, v) for k, v in DRONE_TYPES.items() if v.startswith("FPV_")]
commercial_types = [(k, v) for k, v in DRONE_TYPES.items() if v in ["DJI", "DJIe", "Supercam", "MaybeSupercam"]]
other_types = [(k, v) for k, v in DRONE_TYPES.items() if v in ["None", "Unknown", "REB"]]
print("\n🎖️ MILITARY DRONES:")
for drone_id, name in military_types:
print(f" {drone_id:2d}: {name}")
print("\n🏢 COMMERCIAL DRONES:")
for drone_id, name in commercial_types:
print(f" {drone_id:2d}: {name}")
print("\n🎮 FPV DRONES:")
for drone_id, name in fpv_types:
print(f" {drone_id:2d}: {name}")
print("\n❓ UNCERTAIN TYPES:")
for drone_id, name in maybe_types:
print(f" {drone_id:2d}: {name}")
print("\n🔧 OTHER:")
for drone_id, name in other_types:
print(f" {drone_id:2d}: {name}")
print("\n" + "=" * 50)
while True:
try:
choice = input(f"Select drone type (0-{max(DRONE_TYPES.keys())}) [default: 2-Orlan]: ").strip()
if choice == "":
return 2 # Default to Orlan
drone_type = int(choice)
if drone_type in DRONE_TYPES:
print(f"✅ Selected: {DRONE_TYPES[drone_type]} (Type {drone_type})")
return drone_type
else:
print(f"❌ Invalid choice. Please select 0-{max(DRONE_TYPES.keys())}")
except ValueError:
print("❌ Please enter a valid number")
except KeyboardInterrupt:
print("\n⚠️ Using default: Orlan (Type 2)")
return 2
def haversine_distance(lat1, lon1, lat2, lon2):
"""Calculate distance between two points in kilometers"""
R = 6371 # Earth's radius in kilometers
dlat = math.radians(lat2 - lat1)
dlon = math.radians(lon2 - lon1)
a = (math.sin(dlat/2) * math.sin(dlat/2) +
math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) *
math.sin(dlon/2) * math.sin(dlon/2))
c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a))
return R * c
def rssi_from_distance(distance_km):
"""Calculate RSSI based on distance for Orlan drone"""
# Ensure distance_km is a float
distance_km = float(distance_km)
# Orlan drones have powerful military-grade transmission systems
# Base RSSI at 1km = -55 dBm (stronger than civilian drones)
base_rssi = -55
path_loss_exponent = 2.3 # Lower path loss due to military equipment
if distance_km < 0.001: # Less than 1 meter
return -25
rssi = base_rssi - (20 * path_loss_exponent * math.log10(distance_km))
return max(rssi, -100) # Cap at -100 dBm
def is_detectable(distance_km):
"""Check if drone is within detectable range"""
rssi = rssi_from_distance(distance_km)
return distance_km <= MAX_DETECTION_RANGE_KM and rssi >= MIN_RSSI_THRESHOLD
def fetch_devices():
"""Fetch devices from API"""
try:
if SKIP_AUTH:
# Try without authentication first for local testing
response = requests.get(f"{API_BASE_URL}/devices/map", verify=False)
else:
response = requests.get(f"{API_BASE_URL}/devices", headers=get_auth_headers(), verify=False)
if response.status_code == 200:
data = response.json()
return data.get('data', [])
else:
print(f"Failed to fetch devices: {response.status_code}")
print(f"Response: {response.text}")
except Exception as e:
print(f"Error fetching devices: {e}")
return []
def send_detection(device, distance_km, step, total_steps, drone_type=2):
"""Send a detection to the API using EXACT standard payload format"""
# Ensure distance_km is a float
distance_km = float(distance_km)
rssi = rssi_from_distance(distance_km)
# Use the device's coordinates as the detection location
# The RSSI indicates how close/far the drone is from this device
detection_data = {
"device_id": device["id"],
"geo_lat": float(device["geo_lat"]), # Device location
"geo_lon": float(device["geo_lon"]), # Device location
"device_timestamp": int(time.time() * 1000), # Current timestamp in milliseconds
"drone_type": drone_type, # Selected drone type
"rssi": int(rssi), # RSSI indicates distance from device
"freq": 24, # Most drones operate on 2.4 GHz (24 = 2400 MHz)
"drone_id": 2000 # Consistent drone ID to track the same drone
}
try:
response = requests.post(f"{API_BASE_URL}/detectors", json=detection_data, verify=False)
if response.status_code == 201:
status = "🚨 CRITICAL ALERT" if distance_km <= 5 else "⚠️ DETECTED" if is_detectable(distance_km) else "📡 MONITORING"
print(f"{status} - Step {step}/{total_steps}: Distance={float(distance_km):.1f}km, RSSI={float(rssi):.0f}dBm")
return True
else:
print(f"❌ Failed to send detection: {response.status_code}")
print(f"Response: {response.text}")
return False
except Exception as e:
print(f"❌ Error sending detection: {e}")
return False
def run_orlan_detection_test():
"""Run the comprehensive drone detection test"""
print("=" * 70)
print("<EFBFBD> DRONE DETECTION SIMULATION TEST")
print("=" * 70)
print("Test Scenario:")
print("• Starting position: 50km away (undetectable)")
print("• Approach pattern: Gradual approach to device")
print("• Detection threshold: ~25km range")
print("• Critical alerts: <5km from device")
print("• End position: Directly overhead (0m)")
print("=" * 70)
print(f"🔗 API Endpoint: {API_BASE_URL}")
if SKIP_AUTH:
print("⚠️ AUTHENTICATION DISABLED - Running in local test mode")
print()
# Select drone type for simulation
selected_drone_type = select_drone_type()
drone_name = DRONE_TYPES[selected_drone_type]
# Authenticate first (unless skipped)
if not SKIP_AUTH and not authenticate():
print("❌ Authentication failed. Cannot proceed with test.")
print("Please check:")
print("1. Is the server running?")
print("2. Are the credentials correct?")
print(f" Username: {USERNAME}")
print("3. Set TEST_USERNAME and TEST_PASSWORD environment variables if needed")
print("4. Or set SKIP_AUTH=true to skip authentication for local testing")
return
# Test API connectivity first
print("🔍 Testing API connectivity...")
try:
response = requests.get(f"{API_BASE_URL}/health")
if response.status_code == 200:
print("✅ API is accessible")
else:
print(f"❌ API health check failed: {response.status_code}")
return
except Exception as e:
print(f"❌ Cannot connect to API: {e}")
print("Please check:")
print("1. Is the server running?")
print("2. Is the API_BASE_URL correct?")
print(f" Current: {API_BASE_URL}")
return
# Fetch devices
print("📡 Fetching devices...")
devices = fetch_devices()
if not devices:
print("❌ No devices found!")
print("Make sure at least one device is registered in the system.")
return
print(f"✅ Found {len(devices)} device(s)")
# Use the first device as target
target_device = devices[0]
print(f"🎯 Target Device: {target_device['name']}")
# Ensure coordinates are floats
target_lat = float(target_device['geo_lat'])
target_lon = float(target_device['geo_lon'])
print(f"📍 Target Location: {target_lat:.6f}, {target_lon:.6f}")
# Calculate starting position 50km away for distance simulation
start_distance = 50.0 # km
print(f"🛫 Orlan Starting Distance: {float(start_distance):.1f}km from device")
print(f"<EFBFBD> Device Location: {target_lat:.6f}, {target_lon:.6f}")
# Verify starting position is undetectable
if is_detectable(start_distance):
print("⚠️ WARNING: Starting position is within detection range!")
else:
print("✅ Starting position confirmed undetectable")
print("\n" + "=" * 70)
print("STARTING ORLAN APPROACH SIMULATION")
print("=" * 70)
# Simulation parameters
total_steps = 50 # More steps for gradual approach
final_distance = 0.0 # Directly overhead
detection_started = False
critical_alerts_started = False
for step in range(1, total_steps + 1):
# Calculate current distance using exponential approach for realistic acceleration
progress = step / total_steps
# Use exponential curve for realistic approach - slower at distance, faster when close
distance_km = start_distance * (1 - progress) ** 1.8 + final_distance * progress ** 1.8
# Check detection status
detectable = is_detectable(distance_km)
# Send detection only if within detectable range
if detectable:
if not detection_started:
print(f"\n🔍 FIRST DETECTION at {float(distance_km):.1f}km - {drone_name} has entered detection range!")
detection_started = True
success = send_detection(target_device, distance_km, step, total_steps, selected_drone_type)
if not success:
print(f"❌ Failed to send detection at step {step}")
continue
# Show escalation messages
if distance_km <= 5 and not critical_alerts_started:
print(f"🚨 CRITICAL ALERT THRESHOLD REACHED at {float(distance_km):.1f}km!")
critical_alerts_started = True
elif distance_km <= 1:
print(f"🔥 IMMEDIATE THREAT: {drone_name} within {float(distance_km):.1f}km!")
elif distance_km <= 0.1:
print(f"💥 DIRECTLY OVERHEAD: {drone_name} at {float(distance_km)*1000:.0f}m altitude!")
else:
# Outside detection range
print(f"📡 Step {step}/{total_steps}: Distance={float(distance_km):.1f}km (undetectable, RSSI={float(rssi_from_distance(distance_km)):.0f}dBm)")
# Variable delay based on distance
if distance_km > 30:
delay = 2.0 # 2 seconds for very long range
elif distance_km > 15:
delay = 1.5 # 1.5 seconds for long range
elif distance_km > 5:
delay = 1.0 # 1 second for medium range
else:
delay = 0.8 # Faster updates for critical proximity
time.sleep(delay)
print("\n" + "=" * 70)
print(f"<EFBFBD> {drone_name.upper()} DETECTION TEST COMPLETED")
print("=" * 70)
print("Test Summary:")
print(f"• Starting distance: {float(start_distance):.1f}km (undetectable)")
print(f"• Detection range entered: ~{float(MAX_DETECTION_RANGE_KM):.1f}km")
print(f"• Critical alerts triggered: <5km")
print(f"• Final position: Directly overhead")
print(f"• Target device: {target_device['name']}")
print(f"• Drone type tested: {drone_name} (Type {selected_drone_type})")
print(f"• Total simulation steps: {total_steps}")
print("")
print("Expected Results:")
print("✅ No detections sent while >25km away")
print("✅ First detection when entering ~25km range")
print(f"✅ Critical alerts triggered for {drone_name} type (drone_type={selected_drone_type})")
print("✅ All alerts escalated regardless of distance/RSSI")
print("✅ Real-time tracking visible on dashboard")
print("=" * 70)
if __name__ == "__main__":
try:
run_orlan_detection_test()
except KeyboardInterrupt:
print("\n\n⚠️ Drone detection test interrupted by user")
except Exception as e:
print(f"\n❌ Error during drone detection test: {e}")