304 lines
10 KiB
Python
304 lines
10 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Simplified Orlan Detection Test Script
|
|
Tests the drone detection system by sending drone detections directly to the API
|
|
without authentication. This simulates drone sensor devices sending detections.
|
|
|
|
Test Scenarios:
|
|
- Tests different drone types (Orlan, Zala, DJI, etc.)
|
|
- Tests different distances/RSSI values
|
|
- Tests critical vs normal threat levels
|
|
- Tests system response to detections
|
|
"""
|
|
|
|
import requests
|
|
import json
|
|
import time
|
|
import math
|
|
import os
|
|
from datetime import datetime
|
|
|
|
# Disable SSL warnings for self-signed certificates
|
|
import warnings
|
|
warnings.filterwarnings('ignore', message='Unverified HTTPS request')
|
|
|
|
# Configuration from environment variables
|
|
API_BASE_URL = os.getenv('API_BASE_URL', 'http://localhost:3002/api')
|
|
BASE_PATH = os.getenv('VITE_BASE_PATH', '').rstrip('/')
|
|
|
|
# If BASE_PATH is set, construct the full URL
|
|
if BASE_PATH and not API_BASE_URL.endswith('/api'):
|
|
domain = API_BASE_URL.replace('/api', '').replace('/drones/api', '').replace('/uggla/api', '')
|
|
API_BASE_URL = f"{domain}{BASE_PATH}/api"
|
|
|
|
print(f"🔗 Using API Base URL: {API_BASE_URL}")
|
|
|
|
# Drone types mapping (matches GuessedDroneType enum)
|
|
DRONE_TYPES = {
|
|
0: "None",
|
|
1: "Unknown",
|
|
2: "Orlan",
|
|
3: "Zala",
|
|
4: "Eleron",
|
|
5: "ZalaLancet",
|
|
6: "Lancet",
|
|
7: "FPV_CrossFire",
|
|
8: "FPV_ELRS",
|
|
9: "MaybeOrlan",
|
|
10: "MaybeZala",
|
|
11: "MaybeLancet",
|
|
12: "MaybeEleron",
|
|
13: "DJI",
|
|
14: "Supercam",
|
|
15: "MaybeSupercam",
|
|
16: "REB",
|
|
17: "CryptoOrlan",
|
|
18: "DJIe"
|
|
}
|
|
|
|
# Detection range parameters
|
|
MAX_DETECTION_RANGE_KM = 25.0 # Maximum range for drone detection
|
|
MIN_RSSI_THRESHOLD = -95 # Minimum RSSI for detection
|
|
|
|
def rssi_from_distance(distance_km):
|
|
"""Calculate RSSI based on distance for drone detection"""
|
|
distance_km = float(distance_km)
|
|
|
|
# If beyond detection range, return very weak signal
|
|
if distance_km > MAX_DETECTION_RANGE_KM:
|
|
return MIN_RSSI_THRESHOLD - 10 # Undetectable
|
|
|
|
# RSSI calculation based on free space path loss
|
|
# Closer distance = stronger signal (higher RSSI)
|
|
# Formula: RSSI = -40 - 20*log10(distance_km)
|
|
rssi = -40 - (20 * math.log10(max(distance_km, 0.1))) # Avoid log(0)
|
|
|
|
# Clamp to realistic range (-30 to -100 dBm)
|
|
return max(-100, min(-30, rssi))
|
|
|
|
def is_detectable(distance_km):
|
|
"""Check if drone is within detectable range"""
|
|
return distance_km <= MAX_DETECTION_RANGE_KM and rssi_from_distance(distance_km) >= MIN_RSSI_THRESHOLD
|
|
|
|
def get_threat_level(distance_km, drone_type):
|
|
"""Determine threat level based on distance and drone type"""
|
|
if distance_km <= 1:
|
|
return "critical"
|
|
elif distance_km <= 5:
|
|
return "high"
|
|
elif distance_km <= 15:
|
|
return "medium"
|
|
else:
|
|
return "low"
|
|
|
|
def send_drone_detection(device_id, drone_type, distance_km, drone_id=2000):
|
|
"""Send a drone detection to the API"""
|
|
distance_km = float(distance_km)
|
|
rssi = rssi_from_distance(distance_km)
|
|
|
|
# Sample device coordinates (Stockholm Central)
|
|
device_lat = 59.3293
|
|
device_lon = 18.0686
|
|
|
|
detection_data = {
|
|
"device_id": device_id,
|
|
"geo_lat": device_lat,
|
|
"geo_lon": device_lon,
|
|
"device_timestamp": int(time.time()), # Unix timestamp in seconds
|
|
"drone_type": drone_type,
|
|
"rssi": int(rssi),
|
|
"freq": 24, # 2.4 GHz frequency
|
|
"drone_id": drone_id
|
|
}
|
|
|
|
try:
|
|
print(f"📡 Sending detection: Device={device_id}, Drone={DRONE_TYPES.get(drone_type, 'Unknown')}, Distance={distance_km:.1f}km, RSSI={rssi:.0f}dBm")
|
|
|
|
response = requests.post(
|
|
f"{API_BASE_URL}/detectors",
|
|
json=detection_data,
|
|
headers={"Content-Type": "application/json"},
|
|
verify=False
|
|
)
|
|
|
|
if response.status_code == 201:
|
|
threat = get_threat_level(distance_km, drone_type)
|
|
status_icon = "🚨" if threat in ["critical", "high"] else "⚠️" if threat == "medium" else "📡"
|
|
print(f"{status_icon} SUCCESS: Detection sent successfully - Threat Level: {threat.upper()}")
|
|
return True
|
|
else:
|
|
print(f"❌ FAILED: {response.status_code} - {response.text}")
|
|
return False
|
|
|
|
except Exception as e:
|
|
print(f"❌ ERROR: {e}")
|
|
return False
|
|
|
|
def test_drone_approach_scenario(device_id, drone_type=2):
|
|
"""Test a drone approaching from distance to overhead"""
|
|
print(f"\n{'='*70}")
|
|
print(f"🚁 DRONE APPROACH SIMULATION")
|
|
print(f"Device ID: {device_id}")
|
|
print(f"Drone Type: {DRONE_TYPES.get(drone_type, 'Unknown')} (Type {drone_type})")
|
|
print(f"{'='*70}")
|
|
|
|
# Test distances: 30km -> 20km -> 10km -> 5km -> 1km -> 0.1km
|
|
test_distances = [30.0, 20.0, 10.0, 5.0, 1.0, 0.1]
|
|
drone_id = 2000 + drone_type # Unique drone ID per type
|
|
|
|
successful_detections = 0
|
|
|
|
for i, distance in enumerate(test_distances, 1):
|
|
print(f"\n📍 Step {i}/{len(test_distances)}: Testing {distance}km distance")
|
|
|
|
if is_detectable(distance):
|
|
success = send_drone_detection(device_id, drone_type, distance, drone_id)
|
|
if success:
|
|
successful_detections += 1
|
|
time.sleep(1) # Small delay between detections
|
|
else:
|
|
print(f"📡 Distance {distance}km is beyond detection range ({MAX_DETECTION_RANGE_KM}km)")
|
|
|
|
print(f"\n✅ Approach simulation complete: {successful_detections} successful detections")
|
|
return successful_detections
|
|
|
|
def test_multiple_drone_types(device_id):
|
|
"""Test different drone types at critical distance"""
|
|
print(f"\n{'='*70}")
|
|
print(f"🛩️ MULTIPLE DRONE TYPES TEST")
|
|
print(f"Device ID: {device_id}")
|
|
print(f"Testing at critical distance: 2km")
|
|
print(f"{'='*70}")
|
|
|
|
# Test important drone types
|
|
test_types = [2, 3, 6, 13, 17] # Orlan, Zala, Lancet, DJI, CryptoOrlan
|
|
distance = 2.0 # Critical distance
|
|
|
|
successful_detections = 0
|
|
|
|
for drone_type in test_types:
|
|
drone_name = DRONE_TYPES.get(drone_type, 'Unknown')
|
|
print(f"\n🚁 Testing {drone_name} (Type {drone_type})")
|
|
|
|
drone_id = 3000 + drone_type # Unique drone ID per type
|
|
success = send_drone_detection(device_id, drone_type, distance, drone_id)
|
|
if success:
|
|
successful_detections += 1
|
|
time.sleep(1)
|
|
|
|
print(f"\n✅ Multi-drone test complete: {successful_detections}/{len(test_types)} successful detections")
|
|
return successful_detections
|
|
|
|
def test_rapid_detections(device_id, drone_type=2):
|
|
"""Test rapid successive detections (like drone hovering)"""
|
|
print(f"\n{'='*70}")
|
|
print(f"⚡ RAPID DETECTION TEST")
|
|
print(f"Device ID: {device_id}")
|
|
print(f"Drone Type: {DRONE_TYPES.get(drone_type, 'Unknown')} (Type {drone_type})")
|
|
print(f"Simulating drone hovering at 0.5km distance")
|
|
print(f"{'='*70}")
|
|
|
|
distance = 0.5 # Very close - critical threat
|
|
drone_id = 4000 # Unique drone ID
|
|
detection_count = 5
|
|
successful_detections = 0
|
|
|
|
for i in range(1, detection_count + 1):
|
|
print(f"\n🔄 Rapid detection {i}/{detection_count}")
|
|
success = send_drone_detection(device_id, drone_type, distance, drone_id)
|
|
if success:
|
|
successful_detections += 1
|
|
time.sleep(0.5) # Rapid fire
|
|
|
|
print(f"\n✅ Rapid detection test complete: {successful_detections}/{detection_count} successful detections")
|
|
return successful_detections
|
|
|
|
def run_detection_tests():
|
|
"""Run comprehensive drone detection tests"""
|
|
print("🚀 DRONE DETECTION SYSTEM TEST")
|
|
print("="*70)
|
|
print("This script tests drone detection by sending detection data directly")
|
|
print("to the API without authentication (simulating sensor devices).")
|
|
print("="*70)
|
|
|
|
# Use known device IDs from the setup database
|
|
# These should exist if setup-database.js ran successfully
|
|
test_device_ids = [
|
|
"device-alpha-001",
|
|
"device-beta-002",
|
|
"device-gamma-003"
|
|
]
|
|
|
|
# Try each device until we find one that works
|
|
working_device = None
|
|
for device_id in test_device_ids:
|
|
print(f"\n🔍 Testing device: {device_id}")
|
|
success = send_drone_detection(device_id, 2, 5.0, 9999) # Test detection
|
|
if success:
|
|
working_device = device_id
|
|
print(f"✅ Device {device_id} is working and approved")
|
|
break
|
|
else:
|
|
print(f"❌ Device {device_id} failed (not found or not approved)")
|
|
|
|
if not working_device:
|
|
print("\n❌ No working devices found!")
|
|
print("Make sure the database setup script has run and devices are approved.")
|
|
return False
|
|
|
|
print(f"\n🎯 Using device: {working_device}")
|
|
|
|
# Run test scenarios
|
|
total_tests = 0
|
|
total_successful = 0
|
|
|
|
print(f"\n{'='*70}")
|
|
print("STARTING TEST SCENARIOS")
|
|
print(f"{'='*70}")
|
|
|
|
# Test 1: Drone approach scenario
|
|
successful = test_drone_approach_scenario(working_device, 2) # Orlan
|
|
total_tests += 6 # Number of distance steps
|
|
total_successful += successful
|
|
|
|
# Test 2: Multiple drone types
|
|
successful = test_multiple_drone_types(working_device)
|
|
total_tests += 5 # Number of drone types
|
|
total_successful += successful
|
|
|
|
# Test 3: Rapid detections
|
|
successful = test_rapid_detections(working_device, 6) # Lancet
|
|
total_tests += 5 # Number of rapid detections
|
|
total_successful += successful
|
|
|
|
# Summary
|
|
print(f"\n{'='*70}")
|
|
print("TEST SUMMARY")
|
|
print(f"{'='*70}")
|
|
print(f"Total Tests: {total_tests}")
|
|
print(f"Successful: {total_successful}")
|
|
print(f"Failed: {total_tests - total_successful}")
|
|
print(f"Success Rate: {(total_successful/total_tests)*100:.1f}%")
|
|
|
|
if total_successful == total_tests:
|
|
print("🎉 ALL TESTS PASSED! Detection system is working correctly.")
|
|
elif total_successful > 0:
|
|
print("⚠️ PARTIAL SUCCESS - Some detections failed.")
|
|
else:
|
|
print("❌ ALL TESTS FAILED - Detection system has issues.")
|
|
|
|
print(f"\n💡 Check the backend logs for detailed processing information.")
|
|
print(f"💡 Check the dashboard to see if detections appear in real-time.")
|
|
|
|
return total_successful == total_tests
|
|
|
|
if __name__ == "__main__":
|
|
try:
|
|
success = run_detection_tests()
|
|
exit(0 if success else 1)
|
|
except KeyboardInterrupt:
|
|
print("\n⚠️ Test interrupted by user")
|
|
exit(1)
|
|
except Exception as e:
|
|
print(f"\n❌ Test failed with error: {e}")
|
|
exit(1) |