diff --git a/test_orlan_detection.py b/test_orlan_detection.py index 3110548..7bdaa17 100644 --- a/test_orlan_detection.py +++ b/test_orlan_detection.py @@ -1,25 +1,14 @@ #!/usr/bin/env python3 """ -Orlan Detection Test Script -Tests the critical alert system for Orlan military drones by simulating -a long-distance approach from undetectable range to directly overhead. +Simplified Orlan Detection Test Script +Tests the drone detection system by sending drone detections directly to the API +without authentication. This simulates drone sensor devices sending detections. -Test Scenario: -- Starts 50km away (beyond detection range) -- Slowly approaches target device -- Triggers critical alerts as print(f"šŸ›« {drone_name} Starting Distance: {float(start_distance):.1f}km from device") - print(f"šŸ“ Device Location: {target_lat:.6f}, {target_lon:.6f}") - - # Verify starting position is undetectable - if is_detectable(start_distance): - print("āš ļø WARNING: Starting position is within detection range!") - else: - print("āœ… Starting position confirmed undetectable") - - print("\n" + "=" * 70) - print(f"STARTING {drone_name.upper()} APPROACH SIMULATION") - print("=" * 70)etection range -- Ends with drone hovering directly above target +Test Scenarios: +- Tests different drone types (Orlan, Zala, DJI, etc.) +- Tests different distances/RSSI values +- Tests critical vs normal threat levels +- Tests system response to detections """ import requests @@ -34,73 +23,16 @@ import warnings warnings.filterwarnings('ignore', message='Unverified HTTPS request') # Configuration from environment variables -# Tests default to localhost:3002 for local development API_BASE_URL = os.getenv('API_BASE_URL', 'http://localhost:3002/api') BASE_PATH = os.getenv('VITE_BASE_PATH', '').rstrip('/') -# Authentication configuration - Optional for local testing -USERNAME = os.getenv('TEST_USERNAME', 'admin') -PASSWORD = os.getenv('TEST_PASSWORD', 'admin123') -SKIP_AUTH = os.getenv('SKIP_AUTH', 'false').lower() == 'true' # Set to 'true' to skip authentication - # If BASE_PATH is set, construct the full URL if BASE_PATH and not API_BASE_URL.endswith('/api'): - # Extract domain from API_BASE_URL and add base path domain = API_BASE_URL.replace('/api', '').replace('/drones/api', '').replace('/uggla/api', '') API_BASE_URL = f"{domain}{BASE_PATH}/api" print(f"šŸ”— Using API Base URL: {API_BASE_URL}") -# Global variable to store authentication token -AUTH_TOKEN = None - -def authenticate(): - """Authenticate with the API and get access token""" - global AUTH_TOKEN - - login_data = { - "username": USERNAME, - "password": PASSWORD - } - - try: - print(f"šŸ” Authenticating as user: {USERNAME}") - response = requests.post(f"{API_BASE_URL}/users/login", json=login_data, verify=False) - - if response.status_code == 200: - data = response.json() - AUTH_TOKEN = data.get('data', {}).get('token') - if AUTH_TOKEN: - print("āœ… Authentication successful") - return True - else: - print("āŒ No token received in response") - print(f"Response data: {data}") - return False - else: - print(f"āŒ Authentication failed: {response.status_code}") - print(f"Response: {response.text}") - return False - except Exception as e: - print(f"āŒ Authentication error: {e}") - return False - -def get_auth_headers(): - """Get headers with authentication token""" - if SKIP_AUTH: - return {"Content-Type": "application/json"} - - if AUTH_TOKEN: - return { - "Authorization": f"Bearer {AUTH_TOKEN}", - "Content-Type": "application/json" - } - return {"Content-Type": "application/json"} - -# Detection range parameters (approximate) -MAX_DETECTION_RANGE_KM = 25.0 # Maximum range for drone detection -MIN_RSSI_THRESHOLD = -95 # Minimum RSSI for detection - # Drone types mapping (matches GuessedDroneType enum) DRONE_TYPES = { 0: "None", @@ -124,311 +56,249 @@ DRONE_TYPES = { 18: "DJIe" } -def select_drone_type(): - """Allow user to select drone type for simulation""" - print("\n" + "=" * 50) - print("🚁 DRONE TYPE SELECTION") - print("=" * 50) - print("Available drone types:") - - # Group by category for better display - military_types = [(k, v) for k, v in DRONE_TYPES.items() if v in ["Orlan", "Zala", "Eleron", "ZalaLancet", "Lancet", "CryptoOrlan"]] - maybe_types = [(k, v) for k, v in DRONE_TYPES.items() if v.startswith("Maybe")] - fpv_types = [(k, v) for k, v in DRONE_TYPES.items() if v.startswith("FPV_")] - commercial_types = [(k, v) for k, v in DRONE_TYPES.items() if v in ["DJI", "DJIe", "Supercam", "MaybeSupercam"]] - other_types = [(k, v) for k, v in DRONE_TYPES.items() if v in ["None", "Unknown", "REB"]] - - print("\nšŸŽ–ļø MILITARY DRONES:") - for drone_id, name in military_types: - print(f" {drone_id:2d}: {name}") - - print("\nšŸ¢ COMMERCIAL DRONES:") - for drone_id, name in commercial_types: - print(f" {drone_id:2d}: {name}") - - print("\nšŸŽ® FPV DRONES:") - for drone_id, name in fpv_types: - print(f" {drone_id:2d}: {name}") - - print("\nā“ UNCERTAIN TYPES:") - for drone_id, name in maybe_types: - print(f" {drone_id:2d}: {name}") - - print("\nšŸ”§ OTHER:") - for drone_id, name in other_types: - print(f" {drone_id:2d}: {name}") - - print("\n" + "=" * 50) - - while True: - try: - choice = input(f"Select drone type (0-{max(DRONE_TYPES.keys())}) [default: 2-Orlan]: ").strip() - - if choice == "": - return 2 # Default to Orlan - - drone_type = int(choice) - if drone_type in DRONE_TYPES: - print(f"āœ… Selected: {DRONE_TYPES[drone_type]} (Type {drone_type})") - return drone_type - else: - print(f"āŒ Invalid choice. Please select 0-{max(DRONE_TYPES.keys())}") - - except ValueError: - print("āŒ Please enter a valid number") - except KeyboardInterrupt: - print("\nāš ļø Using default: Orlan (Type 2)") - return 2 - -def haversine_distance(lat1, lon1, lat2, lon2): - """Calculate distance between two points in kilometers""" - R = 6371 # Earth's radius in kilometers - - dlat = math.radians(lat2 - lat1) - dlon = math.radians(lon2 - lon1) - - a = (math.sin(dlat/2) * math.sin(dlat/2) + - math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) * - math.sin(dlon/2) * math.sin(dlon/2)) - c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a)) - - return R * c +# Detection range parameters +MAX_DETECTION_RANGE_KM = 25.0 # Maximum range for drone detection +MIN_RSSI_THRESHOLD = -95 # Minimum RSSI for detection def rssi_from_distance(distance_km): - """Calculate RSSI based on distance for Orlan drone""" - # Ensure distance_km is a float + """Calculate RSSI based on distance for drone detection""" distance_km = float(distance_km) - # Orlan drones have powerful military-grade transmission systems - # Base RSSI at 1km = -55 dBm (stronger than civilian drones) - base_rssi = -55 - path_loss_exponent = 2.3 # Lower path loss due to military equipment + # If beyond detection range, return very weak signal + if distance_km > MAX_DETECTION_RANGE_KM: + return MIN_RSSI_THRESHOLD - 10 # Undetectable - if distance_km < 0.001: # Less than 1 meter - return -25 + # RSSI calculation based on free space path loss + # Closer distance = stronger signal (higher RSSI) + # Formula: RSSI = -40 - 20*log10(distance_km) + rssi = -40 - (20 * math.log10(max(distance_km, 0.1))) # Avoid log(0) - rssi = base_rssi - (20 * path_loss_exponent * math.log10(distance_km)) - return max(rssi, -100) # Cap at -100 dBm + # Clamp to realistic range (-30 to -100 dBm) + return max(-100, min(-30, rssi)) def is_detectable(distance_km): """Check if drone is within detectable range""" - rssi = rssi_from_distance(distance_km) - return distance_km <= MAX_DETECTION_RANGE_KM and rssi >= MIN_RSSI_THRESHOLD + return distance_km <= MAX_DETECTION_RANGE_KM and rssi_from_distance(distance_km) >= MIN_RSSI_THRESHOLD -def fetch_devices(): - """Fetch devices from API""" - try: - if SKIP_AUTH: - # Try without authentication first for local testing - response = requests.get(f"{API_BASE_URL}/devices/map", verify=False) - else: - response = requests.get(f"{API_BASE_URL}/devices", headers=get_auth_headers(), verify=False) - - if response.status_code == 200: - data = response.json() - return data.get('data', []) - else: - print(f"Failed to fetch devices: {response.status_code}") - print(f"Response: {response.text}") - except Exception as e: - print(f"Error fetching devices: {e}") - return [] +def get_threat_level(distance_km, drone_type): + """Determine threat level based on distance and drone type""" + if distance_km <= 1: + return "critical" + elif distance_km <= 5: + return "high" + elif distance_km <= 15: + return "medium" + else: + return "low" -def send_detection(device, distance_km, step, total_steps, drone_type=2): - """Send a detection to the API using EXACT standard payload format""" - # Ensure distance_km is a float +def send_drone_detection(device_id, drone_type, distance_km, drone_id=2000): + """Send a drone detection to the API""" distance_km = float(distance_km) rssi = rssi_from_distance(distance_km) - # Use the device's coordinates as the detection location - # The RSSI indicates how close/far the drone is from this device + # Sample device coordinates (Stockholm Central) + device_lat = 59.3293 + device_lon = 18.0686 + detection_data = { - "device_id": device["id"], - "geo_lat": float(device["geo_lat"]), # Device location - "geo_lon": float(device["geo_lon"]), # Device location - "device_timestamp": int(time.time() * 1000), # Current timestamp in milliseconds - "drone_type": drone_type, # Selected drone type - "rssi": int(rssi), # RSSI indicates distance from device - "freq": 24, # Most drones operate on 2.4 GHz (24 = 2400 MHz) - "drone_id": 2000 # Consistent drone ID to track the same drone + "device_id": device_id, + "geo_lat": device_lat, + "geo_lon": device_lon, + "device_timestamp": int(time.time()), # Unix timestamp in seconds + "drone_type": drone_type, + "rssi": int(rssi), + "freq": 24, # 2.4 GHz frequency + "drone_id": drone_id } try: - response = requests.post(f"{API_BASE_URL}/detectors", json=detection_data, verify=False) + print(f"šŸ“” Sending detection: Device={device_id}, Drone={DRONE_TYPES.get(drone_type, 'Unknown')}, Distance={distance_km:.1f}km, RSSI={rssi:.0f}dBm") + + response = requests.post( + f"{API_BASE_URL}/detectors", + json=detection_data, + headers={"Content-Type": "application/json"}, + verify=False + ) + if response.status_code == 201: - status = "🚨 CRITICAL ALERT" if distance_km <= 5 else "āš ļø DETECTED" if is_detectable(distance_km) else "šŸ“” MONITORING" - print(f"{status} - Step {step}/{total_steps}: Distance={float(distance_km):.1f}km, RSSI={float(rssi):.0f}dBm") + threat = get_threat_level(distance_km, drone_type) + status_icon = "🚨" if threat in ["critical", "high"] else "āš ļø" if threat == "medium" else "šŸ“”" + print(f"{status_icon} SUCCESS: Detection sent successfully - Threat Level: {threat.upper()}") return True else: - print(f"āŒ Failed to send detection: {response.status_code}") - print(f"Response: {response.text}") + print(f"āŒ FAILED: {response.status_code} - {response.text}") return False + except Exception as e: - print(f"āŒ Error sending detection: {e}") + print(f"āŒ ERROR: {e}") return False -def run_orlan_detection_test(): - """Run the comprehensive drone detection test""" - print("=" * 70) - print("ļæ½ DRONE DETECTION SIMULATION TEST") - print("=" * 70) - print("Test Scenario:") - print("• Starting position: 50km away (undetectable)") - print("• Approach pattern: Gradual approach to device") - print("• Detection threshold: ~25km range") - print("• Critical alerts: <5km from device") - print("• End position: Directly overhead (0m)") - print("=" * 70) - print(f"šŸ”— API Endpoint: {API_BASE_URL}") - if SKIP_AUTH: - print("āš ļø AUTHENTICATION DISABLED - Running in local test mode") - print() +def test_drone_approach_scenario(device_id, drone_type=2): + """Test a drone approaching from distance to overhead""" + print(f"\n{'='*70}") + print(f"🚁 DRONE APPROACH SIMULATION") + print(f"Device ID: {device_id}") + print(f"Drone Type: {DRONE_TYPES.get(drone_type, 'Unknown')} (Type {drone_type})") + print(f"{'='*70}") - # Select drone type for simulation - selected_drone_type = select_drone_type() - drone_name = DRONE_TYPES[selected_drone_type] + # Test distances: 30km -> 20km -> 10km -> 5km -> 1km -> 0.1km + test_distances = [30.0, 20.0, 10.0, 5.0, 1.0, 0.1] + drone_id = 2000 + drone_type # Unique drone ID per type - # Authenticate first (unless skipped) - if not SKIP_AUTH and not authenticate(): - print("āŒ Authentication failed. Cannot proceed with test.") - print("Please check:") - print("1. Is the server running?") - print("2. Are the credentials correct?") - print(f" Username: {USERNAME}") - print("3. Set TEST_USERNAME and TEST_PASSWORD environment variables if needed") - print("4. Or set SKIP_AUTH=true to skip authentication for local testing") - return + successful_detections = 0 - # Test API connectivity first - print("šŸ” Testing API connectivity...") - try: - response = requests.get(f"{API_BASE_URL}/health") - if response.status_code == 200: - print("āœ… API is accessible") + for i, distance in enumerate(test_distances, 1): + print(f"\nšŸ“ Step {i}/{len(test_distances)}: Testing {distance}km distance") + + if is_detectable(distance): + success = send_drone_detection(device_id, drone_type, distance, drone_id) + if success: + successful_detections += 1 + time.sleep(1) # Small delay between detections else: - print(f"āŒ API health check failed: {response.status_code}") - return - except Exception as e: - print(f"āŒ Cannot connect to API: {e}") - print("Please check:") - print("1. Is the server running?") - print("2. Is the API_BASE_URL correct?") - print(f" Current: {API_BASE_URL}") - return + print(f"šŸ“” Distance {distance}km is beyond detection range ({MAX_DETECTION_RANGE_KM}km)") - # Fetch devices - print("šŸ“” Fetching devices...") - devices = fetch_devices() - if not devices: - print("āŒ No devices found!") - print("Make sure at least one device is registered in the system.") - return + print(f"\nāœ… Approach simulation complete: {successful_detections} successful detections") + return successful_detections + +def test_multiple_drone_types(device_id): + """Test different drone types at critical distance""" + print(f"\n{'='*70}") + print(f"šŸ›©ļø MULTIPLE DRONE TYPES TEST") + print(f"Device ID: {device_id}") + print(f"Testing at critical distance: 2km") + print(f"{'='*70}") - print(f"āœ… Found {len(devices)} device(s)") + # Test important drone types + test_types = [2, 3, 6, 13, 17] # Orlan, Zala, Lancet, DJI, CryptoOrlan + distance = 2.0 # Critical distance - # Use the first device as target - target_device = devices[0] - print(f"šŸŽÆ Target Device: {target_device['name']}") + successful_detections = 0 - # Ensure coordinates are floats - target_lat = float(target_device['geo_lat']) - target_lon = float(target_device['geo_lon']) + for drone_type in test_types: + drone_name = DRONE_TYPES.get(drone_type, 'Unknown') + print(f"\n🚁 Testing {drone_name} (Type {drone_type})") + + drone_id = 3000 + drone_type # Unique drone ID per type + success = send_drone_detection(device_id, drone_type, distance, drone_id) + if success: + successful_detections += 1 + time.sleep(1) - print(f"šŸ“ Target Location: {target_lat:.6f}, {target_lon:.6f}") + print(f"\nāœ… Multi-drone test complete: {successful_detections}/{len(test_types)} successful detections") + return successful_detections + +def test_rapid_detections(device_id, drone_type=2): + """Test rapid successive detections (like drone hovering)""" + print(f"\n{'='*70}") + print(f"⚔ RAPID DETECTION TEST") + print(f"Device ID: {device_id}") + print(f"Drone Type: {DRONE_TYPES.get(drone_type, 'Unknown')} (Type {drone_type})") + print(f"Simulating drone hovering at 0.5km distance") + print(f"{'='*70}") - # Calculate starting position 50km away for distance simulation - start_distance = 50.0 # km + distance = 0.5 # Very close - critical threat + drone_id = 4000 # Unique drone ID + detection_count = 5 + successful_detections = 0 - print(f"šŸ›« Orlan Starting Distance: {float(start_distance):.1f}km from device") - print(f"ļæ½ Device Location: {target_lat:.6f}, {target_lon:.6f}") + for i in range(1, detection_count + 1): + print(f"\nšŸ”„ Rapid detection {i}/{detection_count}") + success = send_drone_detection(device_id, drone_type, distance, drone_id) + if success: + successful_detections += 1 + time.sleep(0.5) # Rapid fire - # Verify starting position is undetectable - if is_detectable(start_distance): - print("āš ļø WARNING: Starting position is within detection range!") + print(f"\nāœ… Rapid detection test complete: {successful_detections}/{detection_count} successful detections") + return successful_detections + +def run_detection_tests(): + """Run comprehensive drone detection tests""" + print("šŸš€ DRONE DETECTION SYSTEM TEST") + print("="*70) + print("This script tests drone detection by sending detection data directly") + print("to the API without authentication (simulating sensor devices).") + print("="*70) + + # Use known device IDs from the setup database + # These should exist if setup-database.js ran successfully + test_device_ids = [ + "device-alpha-001", + "device-beta-002", + "device-gamma-003" + ] + + # Try each device until we find one that works + working_device = None + for device_id in test_device_ids: + print(f"\nšŸ” Testing device: {device_id}") + success = send_drone_detection(device_id, 2, 5.0, 9999) # Test detection + if success: + working_device = device_id + print(f"āœ… Device {device_id} is working and approved") + break + else: + print(f"āŒ Device {device_id} failed (not found or not approved)") + + if not working_device: + print("\nāŒ No working devices found!") + print("Make sure the database setup script has run and devices are approved.") + return False + + print(f"\nšŸŽÆ Using device: {working_device}") + + # Run test scenarios + total_tests = 0 + total_successful = 0 + + print(f"\n{'='*70}") + print("STARTING TEST SCENARIOS") + print(f"{'='*70}") + + # Test 1: Drone approach scenario + successful = test_drone_approach_scenario(working_device, 2) # Orlan + total_tests += 6 # Number of distance steps + total_successful += successful + + # Test 2: Multiple drone types + successful = test_multiple_drone_types(working_device) + total_tests += 5 # Number of drone types + total_successful += successful + + # Test 3: Rapid detections + successful = test_rapid_detections(working_device, 6) # Lancet + total_tests += 5 # Number of rapid detections + total_successful += successful + + # Summary + print(f"\n{'='*70}") + print("TEST SUMMARY") + print(f"{'='*70}") + print(f"Total Tests: {total_tests}") + print(f"Successful: {total_successful}") + print(f"Failed: {total_tests - total_successful}") + print(f"Success Rate: {(total_successful/total_tests)*100:.1f}%") + + if total_successful == total_tests: + print("šŸŽ‰ ALL TESTS PASSED! Detection system is working correctly.") + elif total_successful > 0: + print("āš ļø PARTIAL SUCCESS - Some detections failed.") else: - print("āœ… Starting position confirmed undetectable") + print("āŒ ALL TESTS FAILED - Detection system has issues.") - print("\n" + "=" * 70) - print("STARTING ORLAN APPROACH SIMULATION") - print("=" * 70) + print(f"\nšŸ’” Check the backend logs for detailed processing information.") + print(f"šŸ’” Check the dashboard to see if detections appear in real-time.") - # Simulation parameters - total_steps = 50 # More steps for gradual approach - final_distance = 0.0 # Directly overhead - - detection_started = False - critical_alerts_started = False - - for step in range(1, total_steps + 1): - # Calculate current distance using exponential approach for realistic acceleration - progress = step / total_steps - - # Use exponential curve for realistic approach - slower at distance, faster when close - distance_km = start_distance * (1 - progress) ** 1.8 + final_distance * progress ** 1.8 - - # Check detection status - detectable = is_detectable(distance_km) - - # Send detection only if within detectable range - if detectable: - if not detection_started: - print(f"\nšŸ” FIRST DETECTION at {float(distance_km):.1f}km - {drone_name} has entered detection range!") - detection_started = True - - success = send_detection(target_device, distance_km, step, total_steps, selected_drone_type) - - if not success: - print(f"āŒ Failed to send detection at step {step}") - continue - - # Show escalation messages - if distance_km <= 5 and not critical_alerts_started: - print(f"🚨 CRITICAL ALERT THRESHOLD REACHED at {float(distance_km):.1f}km!") - critical_alerts_started = True - elif distance_km <= 1: - print(f"šŸ”„ IMMEDIATE THREAT: {drone_name} within {float(distance_km):.1f}km!") - elif distance_km <= 0.1: - print(f"šŸ’„ DIRECTLY OVERHEAD: {drone_name} at {float(distance_km)*1000:.0f}m altitude!") - else: - # Outside detection range - print(f"šŸ“” Step {step}/{total_steps}: Distance={float(distance_km):.1f}km (undetectable, RSSI={float(rssi_from_distance(distance_km)):.0f}dBm)") - - # Variable delay based on distance - if distance_km > 30: - delay = 2.0 # 2 seconds for very long range - elif distance_km > 15: - delay = 1.5 # 1.5 seconds for long range - elif distance_km > 5: - delay = 1.0 # 1 second for medium range - else: - delay = 0.8 # Faster updates for critical proximity - - time.sleep(delay) - - print("\n" + "=" * 70) - print(f"ļæ½ {drone_name.upper()} DETECTION TEST COMPLETED") - print("=" * 70) - print("Test Summary:") - print(f"• Starting distance: {float(start_distance):.1f}km (undetectable)") - print(f"• Detection range entered: ~{float(MAX_DETECTION_RANGE_KM):.1f}km") - print(f"• Critical alerts triggered: <5km") - print(f"• Final position: Directly overhead") - print(f"• Target device: {target_device['name']}") - print(f"• Drone type tested: {drone_name} (Type {selected_drone_type})") - print(f"• Total simulation steps: {total_steps}") - print("") - print("Expected Results:") - print("āœ… No detections sent while >25km away") - print("āœ… First detection when entering ~25km range") - print(f"āœ… Critical alerts triggered for {drone_name} type (drone_type={selected_drone_type})") - print("āœ… All alerts escalated regardless of distance/RSSI") - print("āœ… Real-time tracking visible on dashboard") - print("=" * 70) + return total_successful == total_tests if __name__ == "__main__": try: - run_orlan_detection_test() + success = run_detection_tests() + exit(0 if success else 1) except KeyboardInterrupt: - print("\n\nāš ļø Drone detection test interrupted by user") + print("\nāš ļø Test interrupted by user") + exit(1) except Exception as e: - print(f"\nāŒ Error during drone detection test: {e}") + print(f"\nāŒ Test failed with error: {e}") + exit(1) \ No newline at end of file