#!/usr/bin/env python3 """ Realistic Drone Detection Test Script Simulates realistic drone scenarios with persistent tracking, RSSI changes, and real-world patterns. Fetches existing devices from the API and simulates drone detections around them. """ import requests import json import time import random import math from datetime import datetime, timedelta # Configuration API_BASE_URL = "http://selfservice.cqers.com/drones/api" # Alternative for local testing: "http://localhost:3002/api" # Debug configuration DEBUG_MODE = False # Set to True to enable payload debugging # Global variable to store devices fetched from API DEVICES = [] # Realistic drone types with characteristics DRONE_TYPES = { 0: {"name": "DJI Mavic (Consumer)", "max_speed": 65, "typical_rssi": -60, "freq": [2400, 2450]}, 1: {"name": "Orlan Military Drone", "max_speed": 150, "typical_rssi": -50, "freq": [900, 2400]}, # High threat 2: {"name": "DJI Matrice (Professional)", "max_speed": 80, "typical_rssi": -55, "freq": [2400, 5800]}, 3: {"name": "Racing Drone", "max_speed": 120, "typical_rssi": -55, "freq": [2400, 5800]}, 4: {"name": "Unknown/Custom", "max_speed": 70, "typical_rssi": -65, "freq": [2400, 2450]} } def fetch_devices(): """Fetch active devices from the API""" global DEVICES try: response = requests.get(f"{API_BASE_URL}/devices/map") if response.status_code == 200: data = response.json() api_devices = data.get('data', []) # Convert API device format to simulator format DEVICES = [] for device in api_devices: DEVICES.append({ "id": device["id"], "name": device["name"], "lat": float(device["geo_lat"]), "lon": float(device["geo_lon"]), "coverage_radius": 5.0 # Default 5km coverage radius }) print(f"āœ… Fetched {len(DEVICES)} active devices from API") for device in DEVICES: print(f" - {device['name']} at ({device['lat']:.4f}, {device['lon']:.4f})") return True else: print(f"āŒ Failed to fetch devices: {response.status_code}") return False except Exception as e: print(f"āŒ Error fetching devices: {e}") return False class DroneSimulator: def __init__(self): self.active_drones = {} # Track persistent drones self.drone_counter = 1000 def calculate_rssi(self, distance_km, base_rssi=-60): """Calculate realistic RSSI based on distance (Free Space Path Loss)""" if distance_km < 0.1: # Very close return max(base_rssi + 20, -30) # Simplified FSPL: RSSI decreases ~20dB per decade of distance rssi = base_rssi - (20 * math.log10(distance_km)) # Add some realistic variation variation = random.uniform(-5, 5) final_rssi = int(rssi + variation) # Clamp to realistic values return max(min(final_rssi, -30), -100) def calculate_distance(self, lat1, lon1, lat2, lon2): """Calculate distance between two points in km""" R = 6371 # Earth radius in km lat1_rad = math.radians(lat1) lat2_rad = math.radians(lat2) delta_lat = math.radians(lat2 - lat1) delta_lon = math.radians(lon2 - lon1) a = (math.sin(delta_lat/2)**2 + math.cos(lat1_rad) * math.cos(lat2_rad) * math.sin(delta_lon/2)**2) c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a)) return R * c def create_new_drone(self, device): """Create a new drone with realistic starting position""" self.drone_counter += 1 # Drone type selection with weighted probabilities # Orlan drones are rare (2% chance), consumer drones common (50%), others distributed rand = random.random() if rand < 0.02: # 2% chance for Orlan (military) drone_type = 1 print(f"🚨 GENERATING ORLAN MILITARY DRONE (ID: {self.drone_counter}) - High threat simulation!") elif rand < 0.52: # 50% chance for consumer drone_type = 0 elif rand < 0.72: # 20% chance for professional drone_type = 2 elif rand < 0.92: # 20% chance for racing drone_type = 3 else: # 8% chance for unknown drone_type = 4 # Start at edge of coverage area angle = random.uniform(0, 2 * math.pi) start_distance = device["coverage_radius"] * random.uniform(0.8, 1.0) start_lat = device["lat"] + (start_distance / 111.0) * math.cos(angle) start_lon = device["lon"] + (start_distance / (111.0 * math.cos(math.radians(device["lat"])))) * math.sin(angle) # Movement pattern patterns = ["approaching", "patrolling", "departing", "hovering"] movement_pattern = random.choice(patterns) drone = { "id": self.drone_counter, "type": drone_type, "lat": start_lat, "lon": start_lon, "target_lat": device["lat"] if movement_pattern == "approaching" else start_lat, "target_lon": device["lon"] if movement_pattern == "approaching" else start_lon, "speed_kmh": DRONE_TYPES[drone_type]["max_speed"] * random.uniform(0.3, 0.8), "pattern": movement_pattern, "created_at": time.time(), "last_detection": 0, "detection_count": 0, "is_active": True } return drone def update_drone_position(self, drone, time_delta_seconds): """Update drone position based on movement pattern""" if not drone["is_active"]: return speed_ms = drone["speed_kmh"] * 1000 / 3600 # Convert to m/s distance_m = speed_ms * time_delta_seconds if drone["pattern"] == "approaching": # Move towards the device center current_distance = self.calculate_distance( drone["lat"], drone["lon"], drone["target_lat"], drone["target_lon"] ) * 1000 # Convert to meters if current_distance > 100: # Still approaching lat_diff = drone["target_lat"] - drone["lat"] lon_diff = drone["target_lon"] - drone["lon"] total_diff = math.sqrt(lat_diff**2 + lon_diff**2) if total_diff > 0: lat_step = (lat_diff / total_diff) * (distance_m / 111000) lon_step = (lon_diff / total_diff) * (distance_m / 111000) drone["lat"] += lat_step drone["lon"] += lon_step else: # Reached target, switch to hovering drone["pattern"] = "hovering" elif drone["pattern"] == "hovering": # Small random movements drone["lat"] += random.uniform(-0.0001, 0.0001) drone["lon"] += random.uniform(-0.0001, 0.0001) elif drone["pattern"] == "patrolling": # Circular or figure-8 movement t = (time.time() - drone["created_at"]) / 100 # Slow circular motion radius = 0.002 # Small patrol radius drone["lat"] = drone["target_lat"] + radius * math.sin(t) drone["lon"] = drone["target_lon"] + radius * math.cos(t) elif drone["pattern"] == "departing": # Move away from device lat_diff = drone["lat"] - drone["target_lat"] lon_diff = drone["lon"] - drone["target_lon"] total_diff = math.sqrt(lat_diff**2 + lon_diff**2) if total_diff > 0: lat_step = (lat_diff / total_diff) * (distance_m / 111000) lon_step = (lon_diff / total_diff) * (distance_m / 111000) drone["lat"] += lat_step drone["lon"] += lon_step def should_detect_drone(self, drone, device): """Determine if device should detect this drone""" distance_km = self.calculate_distance( drone["lat"], drone["lon"], device["lat"], device["lon"] ) # Detection probability decreases with distance if distance_km > device["coverage_radius"]: return False, distance_km # Higher chance of detection when closer detection_prob = 1.0 - (distance_km / device["coverage_radius"]) * 0.7 # Factor in time since last detection (avoid spam) time_since_last = time.time() - drone.get("last_detection", 0) if time_since_last < 2: # Minimum 2 seconds between detections detection_prob *= 0.1 return random.random() < detection_prob, distance_km def generate_detection(self, drone, device, distance_km): """Generate realistic detection data""" drone_info = DRONE_TYPES[drone["type"]] # Calculate RSSI based on distance rssi = self.calculate_rssi(distance_km, drone_info["typical_rssi"]) # Select frequency freq = random.choice(drone_info["freq"]) # Confidence decreases with distance and lower RSSI base_confidence = 0.95 - (distance_km / 10.0) * 0.3 rssi_factor = (rssi + 100) / 70 # Normalize RSSI to 0-1 confidence = max(0.5, min(0.99, base_confidence * rssi_factor)) # Signal duration varies by drone type and detection quality duration_base = 2000 if drone["pattern"] == "hovering" else 1000 duration = duration_base + random.randint(-500, 1500) detection = { "device_id": device["id"], "drone_id": drone["id"], "drone_type": drone["type"], "rssi": rssi, "freq": freq, "geo_lat": drone["lat"], "geo_lon": drone["lon"], "device_timestamp": int(time.time() * 1000), "confidence_level": round(confidence, 2), "signal_duration": max(500, duration) } # Update drone tracking drone["last_detection"] = time.time() drone["detection_count"] += 1 return detection def send_detection(detection_data): """Send detection data to the API""" url = f"{API_BASE_URL}/detections" headers = { "Content-Type": "application/json" } # Debug output - show complete payload being sent if DEBUG_MODE: print("\n" + "="*60) print("šŸ› DEBUG: Sending payload to backend") print("="*60) print(f"URL: {url}") print(f"Headers: {json.dumps(headers, indent=2)}") print("Payload:") print(json.dumps(detection_data, indent=2, sort_keys=True)) print("="*60 + "\n") try: response = requests.post(url, json=detection_data, headers=headers, timeout=10) if response.status_code == 201: print(f"āœ… Device {detection_data['device_id']}: Drone {detection_data['drone_id']} " f"(RSSI: {detection_data['rssi']}dBm, Dist: ~{detection_data.get('_distance', 'N/A')}km)") return True else: print(f"āŒ Failed: {response.status_code} - {response.text}") if DEBUG_MODE: print(f"šŸ› DEBUG: Response body: {response.text}") return False except requests.exceptions.RequestException as e: print(f"āŒ Network error: {e}") if DEBUG_MODE: print(f"šŸ› DEBUG: Exception details: {str(e)}") return False def simulate_orlan_detection_test(): """ Test Orlan military drone detection system Spawns an Orlan 50km from device, slowly approaches until hovering overhead Tests critical alert escalation for high-threat drones """ print("=" * 70) print("🚨 ORLAN MILITARY DRONE DETECTION TEST") print("=" * 70) print("Test Scenario:") print("• Starting position: 50km away (undetectable)") print("• Drone type: Orlan Military (type 1)") print("• Approach pattern: Straight line to target") print("• Detection threshold: ~25km range") print("• Critical alerts: Auto-escalated for Orlan type") print("• End position: Directly overhead (0m)") print("=" * 70) if not DEVICES: print("āŒ No devices available for testing!") return # Use the first device as target target_device = DEVICES[0] print(f"šŸŽÆ Target Device: {target_device['name']}") print(f"šŸ“ Target Location: {target_device['lat']:.6f}, {target_device['lon']:.6f}") # Calculate starting position 50km away (due north) start_distance = 50.0 # km start_lat = target_device['lat'] + (start_distance / 111.0) # ~111km per degree latitude start_lon = target_device['lon'] # Same longitude print(f"šŸ›« Orlan Starting Position: {start_lat:.6f}, {start_lon:.6f}") print(f"šŸ“ Initial Distance: {start_distance:.1f}km") # Detection range parameters max_detection_range = 25.0 # km min_rssi_threshold = -95 def is_detectable(distance): """Check if drone is within detectable range""" if distance > max_detection_range: return False # Calculate RSSI for Orlan (military grade) base_rssi = -50 # Stronger than civilian drones rssi = base_rssi - (20 * 2.3 * math.log10(max(distance, 0.001))) return rssi >= min_rssi_threshold def calculate_orlan_rssi(distance_km): """Calculate RSSI for Orlan military drone""" if distance_km < 0.001: return -25 base_rssi = -50 # Military grade transmission path_loss_exponent = 2.3 rssi = base_rssi - (20 * path_loss_exponent * math.log10(distance_km)) return max(int(rssi), -100) # Verify starting position is undetectable if is_detectable(start_distance): print("āš ļø WARNING: Starting position is within detection range!") else: print("āœ… Starting position confirmed undetectable") print("\n" + "=" * 70) print("STARTING ORLAN APPROACH SIMULATION") print("=" * 70) # Simulation parameters total_steps = 50 final_distance = 0.0 # Directly overhead drone_id = 3000 # Unique ID for Orlan test detection_started = False critical_alerts_started = False try: for step in range(1, total_steps + 1): # Calculate current distance (exponential approach) progress = step / total_steps distance_km = start_distance * (1 - progress) ** 1.8 + final_distance * progress ** 1.8 # Calculate current position current_lat = start_lat + (target_device['lat'] - start_lat) * progress current_lon = start_lon + (target_device['lon'] - start_lon) * progress # Check if detectable detectable = is_detectable(distance_km) rssi = calculate_orlan_rssi(distance_km) if detectable: if not detection_started: print(f"\nšŸ” FIRST DETECTION at {distance_km:.1f}km - Orlan entered detection range!") detection_started = True # Send detection using EXACT standard payload format detection_data = { "device_id": target_device["id"], "geo_lat": current_lat, "geo_lon": current_lon, "device_timestamp": int(time.time() * 1000), "drone_type": 1, # Orlan/Military type "rssi": rssi, "freq": 24, # 2.4 GHz "drone_id": drone_id, "_distance": distance_km # Helper field for logging } # Use centralized send_detection function for consistent debug output success = send_detection(detection_data) if success: status = "🚨 CRITICAL ALERT" if distance_km <= 5 else "āš ļø DETECTED" if not DEBUG_MODE: # Avoid duplicate output when debugging print(f"{status} - Step {step}/{total_steps}: Distance={distance_km:.1f}km, RSSI={rssi}dBm") # Show escalation messages if distance_km <= 5 and not critical_alerts_started: print(f"🚨 CRITICAL ALERT: Orlan within {distance_km:.1f}km - AUTO-ESCALATED!") critical_alerts_started = True elif distance_km <= 1: print(f"šŸ”„ IMMEDIATE THREAT: Orlan within facility perimeter!") elif distance_km <= 0.1: print(f"šŸ’„ DIRECTLY OVERHEAD: Orlan at {distance_km*1000:.0f}m altitude!") else: print(f"āŒ Failed to send Orlan detection at step {step}") continue print(f"āŒ Error sending detection: {e}") else: # Outside detection range print(f"šŸ“” Step {step}/{total_steps}: Distance={distance_km:.1f}km (undetectable, RSSI={rssi}dBm)") # Variable delay based on distance if distance_km > 30: delay = 2.0 elif distance_km > 15: delay = 1.5 elif distance_km > 5: delay = 1.0 else: delay = 0.8 time.sleep(delay) print("\n" + "=" * 70) print("🚨 ORLAN DETECTION TEST COMPLETED") print("=" * 70) print("Test Summary:") print(f"• Starting distance: {start_distance:.1f}km (undetectable)") print(f"• Detection range: ~{max_detection_range:.1f}km") print(f"• Final position: Directly overhead") print(f"• Target device: {target_device['name']}") print(f"• Total steps: {total_steps}") print("") print("Expected Results:") print("āœ… No detections sent while >25km away") print("āœ… First detection when entering detection range") print("āœ… Critical alerts auto-escalated for Orlan (type 1)") print("āœ… All Orlan alerts bypass distance/RSSI rules") print("āœ… Real-time tracking visible on dashboard") print("=" * 70) except KeyboardInterrupt: print("\n\nāš ļø Orlan detection test interrupted by user") except Exception as e: print(f"\nāŒ Error during Orlan detection test: {e}") def simulate_realistic_scenario(duration_minutes=10): """Simulate realistic drone scenario with persistent tracking""" print(f"🚁 Starting realistic drone simulation for {duration_minutes} minutes...") print("šŸ“Š Scenario: Multiple drones with persistent tracking, RSSI changes, movement patterns") print("=" * 80) simulator = DroneSimulator() start_time = time.time() end_time = start_time + (duration_minutes * 60) # Force create initial drone for testing if len(DEVICES) > 0: device = DEVICES[0] # Use first device initial_drone = simulator.create_new_drone(device) simulator.active_drones[initial_drone["id"]] = initial_drone print(f"šŸŽÆ Test drone created: {DRONE_TYPES[initial_drone['type']]['name']} near {device['name']}") detection_count = 0 last_update = start_time try: while time.time() < end_time: current_time = time.time() time_delta = current_time - last_update last_update = current_time # Randomly spawn new drones (1-15% chance per cycle) if random.random() < 0.15 and len(simulator.active_drones) < 8: device = random.choice(DEVICES) new_drone = simulator.create_new_drone(device) simulator.active_drones[new_drone["id"]] = new_drone print(f"šŸ†• New {DRONE_TYPES[new_drone['type']]['name']} spawned near {device['name']}") # Debug: Print active drones count if len(simulator.active_drones) == 0: print("āš ļø No active drones - waiting for spawns...") # Update all active drones drones_to_remove = [] for drone_id, drone in simulator.active_drones.items(): simulator.update_drone_position(drone, time_delta) # Check if drone should be removed (too far or timeout) age_minutes = (current_time - drone["created_at"]) / 60 if age_minutes > 15 or drone["detection_count"] > 50: drones_to_remove.append(drone_id) continue # Debug: Print drone position print(f"šŸ” Checking drone {drone_id} at ({drone['lat']:.4f}, {drone['lon']:.4f})") # Check detection for each device for device in DEVICES: should_detect, distance = simulator.should_detect_drone(drone, device) print(f" šŸ“” Device {device['name']}: distance={distance:.2f}km, should_detect={should_detect}") if should_detect: detection = simulator.generate_detection(drone, device, distance) detection["_distance"] = f"{distance:.1f}" # For logging only if send_detection(detection): detection_count += 1 # Remove expired drones for drone_id in drones_to_remove: drone = simulator.active_drones[drone_id] print(f"šŸ›¬ {DRONE_TYPES[drone['type']]['name']} {drone_id} finished ({drone['detection_count']} detections)") del simulator.active_drones[drone_id] # Status update elapsed_minutes = (current_time - start_time) / 60 if int(elapsed_minutes) % 2 == 0 and elapsed_minutes > 0: active_count = len(simulator.active_drones) if active_count > 0: print(f"šŸ“ Status: {active_count} active drones, {detection_count} total detections") time.sleep(3) # 3 second cycle time except KeyboardInterrupt: print("\nā¹ļø Simulation stopped by user") elapsed = time.time() - start_time print(f"\nšŸ“ˆ Simulation Summary:") print(f" • Duration: {elapsed/60:.1f} minutes") print(f" • Total detections: {detection_count}") print(f" • Average rate: {detection_count/(elapsed/60):.1f} detections/minute") print(f" • Active drones at end: {len(simulator.active_drones)}") def test_api_health(): """Test if the API is responding""" url = f"{API_BASE_URL}/health" try: response = requests.get(url, timeout=5) if response.status_code == 200: print("āœ… API health check passed") return True else: print(f"āŒ API health check failed: {response.status_code}") return False except requests.exceptions.RequestException as e: print(f"āŒ Cannot reach API: {e}") return False def main(): print("🚁 Realistic Drone Detection Simulator") print("=" * 50) # Fetch devices from API first print("šŸ“” Fetching active devices from API...") if not fetch_devices(): print("āŒ Cannot fetch devices from API. Please check if the system is running.") return if len(DEVICES) == 0: print("āŒ No active devices found. Please add some detector devices first.") return # Test API connectivity if not test_api_health(): print("Cannot connect to API. Please check if the system is running.") return print("\nChoose simulation type:") print("1. Realistic scenario (persistent drones, RSSI changes, movement)") print("2. Single approaching drone (watch RSSI strengthen)") print("3. Departing drone (watch RSSI weaken)") print("4. 🚨 Orlan military drone detection test (50km approach)") try: choice = input("\nEnter choice (1-4): ").strip() if choice == "1": duration = input("Duration in minutes (default: 10): ").strip() duration = int(duration) if duration else 10 simulate_realistic_scenario(duration) elif choice == "2": print("šŸŽÆ Simulating approaching drone (RSSI will strengthen)...") # Implementation for approaching drone elif choice == "3": print("šŸ›« Simulating departing drone (RSSI will weaken)...") # Implementation for departing drone elif choice == "4": simulate_orlan_detection_test() else: print("Invalid choice") except KeyboardInterrupt: print("\nšŸ‘‹ Goodbye!") except ValueError: print("Invalid input") if __name__ == "__main__": main()