356 lines
13 KiB
Python
356 lines
13 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Orlan Detection Test Script
|
|
Tests the critical alert system for Orlan military drones by simulating
|
|
a long-distance approach from undetectable range to directly overhead.
|
|
|
|
Test Scenario:
|
|
- Starts 50km away (beyond detection range)
|
|
- Slowly approaches target device
|
|
- Triggers critical alerts as it enters detection range
|
|
- Ends with drone hovering directly above target
|
|
"""
|
|
|
|
import requests
|
|
import json
|
|
import time
|
|
import math
|
|
import os
|
|
from datetime import datetime
|
|
|
|
# Disable SSL warnings for self-signed certificates
|
|
import warnings
|
|
warnings.filterwarnings('ignore', message='Unverified HTTPS request')
|
|
|
|
# Configuration from environment variables
|
|
# Tests default to localhost:3002 for local development
|
|
API_BASE_URL = os.getenv('API_BASE_URL', 'http://localhost:3002/api')
|
|
BASE_PATH = os.getenv('VITE_BASE_PATH', '').rstrip('/')
|
|
|
|
# Authentication configuration - Optional for local testing
|
|
USERNAME = os.getenv('TEST_USERNAME', 'admin')
|
|
PASSWORD = os.getenv('TEST_PASSWORD', 'admin123')
|
|
SKIP_AUTH = os.getenv('SKIP_AUTH', 'false').lower() == 'true' # Set to 'true' to skip authentication
|
|
|
|
# If BASE_PATH is set, construct the full URL
|
|
if BASE_PATH and not API_BASE_URL.endswith('/api'):
|
|
# Extract domain from API_BASE_URL and add base path
|
|
domain = API_BASE_URL.replace('/api', '').replace('/drones/api', '').replace('/uggla/api', '')
|
|
API_BASE_URL = f"{domain}{BASE_PATH}/api"
|
|
|
|
print(f"🔗 Using API Base URL: {API_BASE_URL}")
|
|
|
|
# Global variable to store authentication token
|
|
AUTH_TOKEN = None
|
|
|
|
def authenticate():
|
|
"""Authenticate with the API and get access token"""
|
|
global AUTH_TOKEN
|
|
|
|
login_data = {
|
|
"username": USERNAME,
|
|
"password": PASSWORD
|
|
}
|
|
|
|
try:
|
|
print(f"🔐 Authenticating as user: {USERNAME}")
|
|
response = requests.post(f"{API_BASE_URL}/users/login", json=login_data, verify=False)
|
|
|
|
if response.status_code == 200:
|
|
data = response.json()
|
|
AUTH_TOKEN = data.get('data', {}).get('token')
|
|
if AUTH_TOKEN:
|
|
print("✅ Authentication successful")
|
|
return True
|
|
else:
|
|
print("❌ No token received in response")
|
|
print(f"Response data: {data}")
|
|
return False
|
|
else:
|
|
print(f"❌ Authentication failed: {response.status_code}")
|
|
print(f"Response: {response.text}")
|
|
return False
|
|
except Exception as e:
|
|
print(f"❌ Authentication error: {e}")
|
|
return False
|
|
|
|
def get_auth_headers():
|
|
"""Get headers with authentication token"""
|
|
if SKIP_AUTH:
|
|
return {"Content-Type": "application/json"}
|
|
|
|
if AUTH_TOKEN:
|
|
return {
|
|
"Authorization": f"Bearer {AUTH_TOKEN}",
|
|
"Content-Type": "application/json"
|
|
}
|
|
return {"Content-Type": "application/json"}
|
|
|
|
# Detection range parameters (approximate)
|
|
MAX_DETECTION_RANGE_KM = 25.0 # Maximum range for drone detection
|
|
MIN_RSSI_THRESHOLD = -95 # Minimum RSSI for detection
|
|
|
|
def haversine_distance(lat1, lon1, lat2, lon2):
|
|
"""Calculate distance between two points in kilometers"""
|
|
R = 6371 # Earth's radius in kilometers
|
|
|
|
dlat = math.radians(lat2 - lat1)
|
|
dlon = math.radians(lon2 - lon1)
|
|
|
|
a = (math.sin(dlat/2) * math.sin(dlat/2) +
|
|
math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) *
|
|
math.sin(dlon/2) * math.sin(dlon/2))
|
|
c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a))
|
|
|
|
return R * c
|
|
|
|
def rssi_from_distance(distance_km):
|
|
"""Calculate RSSI based on distance for Orlan drone"""
|
|
# Ensure distance_km is a float
|
|
distance_km = float(distance_km)
|
|
|
|
# Orlan drones have powerful military-grade transmission systems
|
|
# Base RSSI at 1km = -55 dBm (stronger than civilian drones)
|
|
base_rssi = -55
|
|
path_loss_exponent = 2.3 # Lower path loss due to military equipment
|
|
|
|
if distance_km < 0.001: # Less than 1 meter
|
|
return -25
|
|
|
|
rssi = base_rssi - (20 * path_loss_exponent * math.log10(distance_km))
|
|
return max(rssi, -100) # Cap at -100 dBm
|
|
|
|
def is_detectable(distance_km):
|
|
"""Check if drone is within detectable range"""
|
|
rssi = rssi_from_distance(distance_km)
|
|
return distance_km <= MAX_DETECTION_RANGE_KM and rssi >= MIN_RSSI_THRESHOLD
|
|
|
|
def fetch_devices():
|
|
"""Fetch devices from API"""
|
|
try:
|
|
if SKIP_AUTH:
|
|
# Try without authentication first for local testing
|
|
response = requests.get(f"{API_BASE_URL}/devices/map", verify=False)
|
|
else:
|
|
response = requests.get(f"{API_BASE_URL}/devices", headers=get_auth_headers(), verify=False)
|
|
|
|
if response.status_code == 200:
|
|
data = response.json()
|
|
return data.get('data', [])
|
|
else:
|
|
print(f"Failed to fetch devices: {response.status_code}")
|
|
print(f"Response: {response.text}")
|
|
except Exception as e:
|
|
print(f"Error fetching devices: {e}")
|
|
return []
|
|
|
|
def send_detection(device, drone_lat, drone_lon, distance_km, step, total_steps):
|
|
"""Send a detection to the API using EXACT standard payload format"""
|
|
# Ensure distance_km is a float
|
|
distance_km = float(distance_km)
|
|
rssi = rssi_from_distance(distance_km)
|
|
|
|
# Use the EXACT standard payload format - NEVER change this!
|
|
detection_data = {
|
|
"device_id": device["id"],
|
|
"geo_lat": drone_lat,
|
|
"geo_lon": drone_lon,
|
|
"device_timestamp": int(time.time() * 1000), # Current timestamp in milliseconds
|
|
"drone_type": 1, # Orlan/Military type
|
|
"rssi": int(rssi),
|
|
"freq": 24, # Orlan operates on 2.4 GHz (24 = 2400 MHz)
|
|
"drone_id": 2000 + step # Unique drone ID for tracking
|
|
}
|
|
|
|
try:
|
|
response = requests.post(f"{API_BASE_URL}/detectors", json=detection_data, verify=False)
|
|
if response.status_code == 201:
|
|
status = "🚨 CRITICAL ALERT" if distance_km <= 5 else "⚠️ DETECTED" if is_detectable(distance_km) else "📡 MONITORING"
|
|
print(f"{status} - Step {step}/{total_steps}: Distance={float(distance_km):.1f}km, RSSI={float(rssi):.0f}dBm")
|
|
return True
|
|
else:
|
|
print(f"❌ Failed to send detection: {response.status_code}")
|
|
print(f"Response: {response.text}")
|
|
return False
|
|
except Exception as e:
|
|
print(f"❌ Error sending detection: {e}")
|
|
return False
|
|
|
|
def calculate_position_along_path(start_lat, start_lon, end_lat, end_lon, progress):
|
|
"""Calculate position along great circle path"""
|
|
# Simple linear interpolation for this test
|
|
lat = start_lat + (end_lat - start_lat) * progress
|
|
lon = start_lon + (end_lon - start_lon) * progress
|
|
return lat, lon
|
|
|
|
def run_orlan_detection_test():
|
|
"""Run the comprehensive Orlan detection test"""
|
|
print("=" * 70)
|
|
print("🚨 ORLAN MILITARY DRONE DETECTION TEST")
|
|
print("=" * 70)
|
|
print("Test Scenario:")
|
|
print("• Starting position: 50km away (undetectable)")
|
|
print("• Approach pattern: Straight line to target")
|
|
print("• Detection threshold: ~25km range")
|
|
print("• Critical alerts: <5km from target")
|
|
print("• End position: Directly overhead (0m)")
|
|
print("=" * 70)
|
|
print(f"🔗 API Endpoint: {API_BASE_URL}")
|
|
if SKIP_AUTH:
|
|
print("⚠️ AUTHENTICATION DISABLED - Running in local test mode")
|
|
print()
|
|
|
|
# Authenticate first (unless skipped)
|
|
if not SKIP_AUTH and not authenticate():
|
|
print("❌ Authentication failed. Cannot proceed with test.")
|
|
print("Please check:")
|
|
print("1. Is the server running?")
|
|
print("2. Are the credentials correct?")
|
|
print(f" Username: {USERNAME}")
|
|
print("3. Set TEST_USERNAME and TEST_PASSWORD environment variables if needed")
|
|
print("4. Or set SKIP_AUTH=true to skip authentication for local testing")
|
|
return
|
|
|
|
# Test API connectivity first
|
|
print("🔍 Testing API connectivity...")
|
|
try:
|
|
response = requests.get(f"{API_BASE_URL}/health")
|
|
if response.status_code == 200:
|
|
print("✅ API is accessible")
|
|
else:
|
|
print(f"❌ API health check failed: {response.status_code}")
|
|
return
|
|
except Exception as e:
|
|
print(f"❌ Cannot connect to API: {e}")
|
|
print("Please check:")
|
|
print("1. Is the server running?")
|
|
print("2. Is the API_BASE_URL correct?")
|
|
print(f" Current: {API_BASE_URL}")
|
|
return
|
|
|
|
# Fetch devices
|
|
print("📡 Fetching devices...")
|
|
devices = fetch_devices()
|
|
if not devices:
|
|
print("❌ No devices found!")
|
|
print("Make sure at least one device is registered in the system.")
|
|
return
|
|
|
|
print(f"✅ Found {len(devices)} device(s)")
|
|
|
|
# Use the first device as target
|
|
target_device = devices[0]
|
|
print(f"🎯 Target Device: {target_device['name']}")
|
|
|
|
# Ensure coordinates are floats
|
|
target_lat = float(target_device['geo_lat'])
|
|
target_lon = float(target_device['geo_lon'])
|
|
|
|
print(f"📍 Target Location: {target_lat:.6f}, {target_lon:.6f}")
|
|
|
|
# Calculate starting position 50km away (due north for simplicity)
|
|
start_distance = 50.0 # km
|
|
# Move 50km north of target
|
|
start_lat = target_lat + (start_distance / 111.0) # ~111km per degree latitude
|
|
start_lon = target_lon # Same longitude
|
|
|
|
print(f"🛫 Orlan Starting Position: {start_lat:.6f}, {start_lon:.6f}")
|
|
print(f"📏 Initial Distance: {float(start_distance):.1f}km")
|
|
|
|
# Verify starting position is undetectable
|
|
if is_detectable(start_distance):
|
|
print("⚠️ WARNING: Starting position is within detection range!")
|
|
else:
|
|
print("✅ Starting position confirmed undetectable")
|
|
|
|
print("\n" + "=" * 70)
|
|
print("STARTING ORLAN APPROACH SIMULATION")
|
|
print("=" * 70)
|
|
|
|
# Simulation parameters
|
|
total_steps = 50 # More steps for gradual approach
|
|
final_distance = 0.0 # Directly overhead
|
|
|
|
detection_started = False
|
|
critical_alerts_started = False
|
|
|
|
for step in range(1, total_steps + 1):
|
|
# Calculate current distance using exponential approach for realistic acceleration
|
|
progress = step / total_steps
|
|
|
|
# Use exponential curve for realistic approach - slower at distance, faster when close
|
|
distance_km = start_distance * (1 - progress) ** 1.8 + final_distance * progress ** 1.8
|
|
|
|
# Calculate current position
|
|
current_lat, current_lon = calculate_position_along_path(
|
|
start_lat, start_lon,
|
|
target_lat, target_lon,
|
|
progress
|
|
)
|
|
|
|
# Check detection status
|
|
detectable = is_detectable(distance_km)
|
|
|
|
# Send detection only if within detectable range
|
|
if detectable:
|
|
if not detection_started:
|
|
print(f"\n🔍 FIRST DETECTION at {float(distance_km):.1f}km - Orlan has entered detection range!")
|
|
detection_started = True
|
|
|
|
success = send_detection(target_device, current_lat, current_lon, distance_km, step, total_steps)
|
|
|
|
if not success:
|
|
print(f"❌ Failed to send detection at step {step}")
|
|
continue
|
|
|
|
# Show escalation messages
|
|
if distance_km <= 5 and not critical_alerts_started:
|
|
print(f"🚨 CRITICAL ALERT THRESHOLD REACHED at {float(distance_km):.1f}km!")
|
|
critical_alerts_started = True
|
|
elif distance_km <= 1:
|
|
print(f"🔥 IMMEDIATE THREAT: Orlan within {float(distance_km):.1f}km!")
|
|
elif distance_km <= 0.1:
|
|
print(f"💥 DIRECTLY OVERHEAD: Orlan at {float(distance_km)*1000:.0f}m altitude!")
|
|
else:
|
|
# Outside detection range
|
|
print(f"📡 Step {step}/{total_steps}: Distance={float(distance_km):.1f}km (undetectable, RSSI={float(rssi_from_distance(distance_km)):.0f}dBm)")
|
|
|
|
# Variable delay based on distance
|
|
if distance_km > 30:
|
|
delay = 2.0 # 2 seconds for very long range
|
|
elif distance_km > 15:
|
|
delay = 1.5 # 1.5 seconds for long range
|
|
elif distance_km > 5:
|
|
delay = 1.0 # 1 second for medium range
|
|
else:
|
|
delay = 0.8 # Faster updates for critical proximity
|
|
|
|
time.sleep(delay)
|
|
|
|
print("\n" + "=" * 70)
|
|
print("🚨 ORLAN DETECTION TEST COMPLETED")
|
|
print("=" * 70)
|
|
print("Test Summary:")
|
|
print(f"• Starting distance: {float(start_distance):.1f}km (undetectable)")
|
|
print(f"• Detection range entered: ~{float(MAX_DETECTION_RANGE_KM):.1f}km")
|
|
print(f"• Critical alerts triggered: <5km")
|
|
print(f"• Final position: Directly overhead")
|
|
print(f"• Target device: {target_device['name']}")
|
|
print(f"• Total simulation steps: {total_steps}")
|
|
print("")
|
|
print("Expected Results:")
|
|
print("✅ No detections sent while >25km away")
|
|
print("✅ First detection when entering ~25km range")
|
|
print("✅ Critical alerts triggered for Orlan type (drone_type=1)")
|
|
print("✅ All alerts escalated regardless of distance/RSSI")
|
|
print("✅ Real-time tracking visible on dashboard")
|
|
print("=" * 70)
|
|
|
|
if __name__ == "__main__":
|
|
try:
|
|
run_orlan_detection_test()
|
|
except KeyboardInterrupt:
|
|
print("\n\n⚠️ Orlan detection test interrupted by user")
|
|
except Exception as e:
|
|
print(f"\n❌ Error during Orlan detection test: {e}")
|