#!/usr/bin/env python3
"""
Realistic Drone Detection Test Script

Simulates realistic drone scenarios with persistent tracking, RSSI changes,
and real-world patterns. Fetches existing devices from the API and simulates
drone detections around them.
"""

import json
import math
import random
import time
from datetime import datetime, timedelta

import requests

# Configuration
API_BASE_URL = "http://selfservice.cqers.com/drones/api"
# Alternative for local testing: "http://localhost:3002/api"

# Global variable to store devices fetched from API
DEVICES = []

# Realistic drone types with characteristics
DRONE_TYPES = {
    0: {"name": "DJI Mavic (Consumer)", "max_speed": 65, "typical_rssi": -60, "freq": [2400, 2450]},
    1: {"name": "Orlan Military Drone", "max_speed": 150, "typical_rssi": -50, "freq": [900, 2400]},  # High threat
    2: {"name": "DJI Matrice (Professional)", "max_speed": 80, "typical_rssi": -55, "freq": [2400, 5800]},
    3: {"name": "Racing Drone", "max_speed": 120, "typical_rssi": -55, "freq": [2400, 5800]},
    4: {"name": "Unknown/Custom", "max_speed": 70, "typical_rssi": -65, "freq": [2400, 2450]},
}


def fetch_devices():
    """Fetch active devices from the API and store them in ``DEVICES``.

    Each API record is converted to the simulator format (``id``, ``name``,
    ``lat``, ``lon`` and a default 5 km ``coverage_radius``).

    Returns:
        True when the device list was fetched and parsed, False on a non-200
        response, a network error, or a malformed payload. ``DEVICES`` is only
        replaced on success, so a failed call never leaves it half-populated.
    """
    global DEVICES
    try:
        # A timeout keeps the script from hanging forever on a dead endpoint.
        response = requests.get(f"{API_BASE_URL}/devices/map", timeout=10)
        if response.status_code != 200:
            print(f"āŒ Failed to fetch devices: {response.status_code}")
            return False

        api_devices = response.json().get('data', [])

        # Convert API device format to simulator format
        devices = [
            {
                "id": device["id"],
                "name": device["name"],
                "lat": float(device["geo_lat"]),
                "lon": float(device["geo_lon"]),
                "coverage_radius": 5.0,  # Default 5km coverage radius
            }
            for device in api_devices
        ]
    except (requests.exceptions.RequestException,
            AttributeError, KeyError, TypeError, ValueError) as e:
        # Network failures plus every way a malformed payload can break the
        # conversion above (bad JSON, missing keys, null coordinates).
        print(f"āŒ Error fetching devices: {e}")
        return False

    DEVICES = devices
    print(f"āœ… Fetched {len(DEVICES)} active devices from API")
    for device in DEVICES:
        print(f" - {device['name']} at ({device['lat']:.4f}, {device['lon']:.4f})")
    return True


class DroneSimulator:
    """Keeps track of simulated drones and produces detections for them."""

    def __init__(self):
        self.active_drones = {}  # Track persistent drones, keyed by drone id
        self.drone_counter = 1000

    def calculate_rssi(self, distance_km, base_rssi=-60):
        """Calculate realistic RSSI based on distance (Free Space Path Loss).

        Args:
            distance_km: Distance between drone and device in kilometres.
            base_rssi: RSSI in dBm used as the reference level at 1 km.

        Returns:
            Integer RSSI in dBm, never stronger than -30 and, on the regular
            path, never weaker than -100.
        """
        if distance_km < 0.1:
            # Very close: use the value the path-loss formula gives at 100 m
            # (base + 20 dB) and cap it at the same -30 dBm ceiling as below.
            # This also keeps log10() away from a zero distance.
            return min(base_rssi + 20, -30)

        # Simplified FSPL: RSSI decreases ~20dB per decade of distance
        rssi = base_rssi - (20 * math.log10(distance_km))

        # Add some realistic variation
        variation = random.uniform(-5, 5)
        final_rssi = int(rssi + variation)

        # Clamp to realistic values
        return max(min(final_rssi, -30), -100)

    def calculate_distance(self, lat1, lon1, lat2, lon2):
        """Calculate the great-circle distance between two points in km.

        Uses the haversine formula with an Earth radius of 6371 km; all
        arguments are in decimal degrees.
        """
        R = 6371  # Earth radius in km
        lat1_rad = math.radians(lat1)
        lat2_rad = math.radians(lat2)
        delta_lat = math.radians(lat2 - lat1)
        delta_lon = math.radians(lon2 - lon1)

        a = (math.sin(delta_lat / 2) ** 2 +
             math.cos(lat1_rad) * math.cos(lat2_rad) * math.sin(delta_lon / 2) ** 2)
        c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
        return R * c

    def create_new_drone(self, device):
        """Create a new drone with a realistic starting position.

        The drone starts at 80-100% of the device's coverage radius, in a
        random direction, with a random type, speed and movement pattern.

        Args:
            device: Device dict with ``lat``, ``lon`` and ``coverage_radius``.

        Returns:
            The new drone dict. It is NOT added to ``active_drones``; the
            caller does that.
        """
        self.drone_counter += 1

        # Drone type selection with weighted probabilities
        # Orlan drones are rare (2% chance), consumer drones common (50%), others distributed
        rand = random.random()
        if rand < 0.02:  # 2% chance for Orlan (military)
            drone_type = 1
            print(f"🚨 GENERATING ORLAN MILITARY DRONE (ID: {self.drone_counter}) - High threat simulation!")
        elif rand < 0.52:  # 50% chance for consumer
            drone_type = 0
        elif rand < 0.72:  # 20% chance for professional
            drone_type = 2
        elif rand < 0.92:  # 20% chance for racing
            drone_type = 3
        else:  # 8% chance for unknown
            drone_type = 4

        # Start at edge of coverage area (1 degree of latitude ~ 111 km;
        # longitude degrees shrink with cos(latitude)).
        angle = random.uniform(0, 2 * math.pi)
        start_distance = device["coverage_radius"] * random.uniform(0.8, 1.0)
        start_lat = device["lat"] + (start_distance / 111.0) * math.cos(angle)
        start_lon = device["lon"] + (
            start_distance / (111.0 * math.cos(math.radians(device["lat"])))
        ) * math.sin(angle)

        # Movement pattern
        patterns = ["approaching", "patrolling", "departing", "hovering"]
        movement_pattern = random.choice(patterns)

        # The target is the reference point of the movement: the device for
        # drones flying towards or away from it, the spawn point otherwise.
        # Departing drones need the device here; with the spawn point the
        # "away" direction would be a zero vector and they would never move.
        if movement_pattern in ("approaching", "departing"):
            target_lat, target_lon = device["lat"], device["lon"]
        else:
            target_lat, target_lon = start_lat, start_lon

        drone = {
            "id": self.drone_counter,
            "type": drone_type,
            "lat": start_lat,
            "lon": start_lon,
            "target_lat": target_lat,
            "target_lon": target_lon,
            "speed_kmh": DRONE_TYPES[drone_type]["max_speed"] * random.uniform(0.3, 0.8),
            "pattern": movement_pattern,
            "created_at": time.time(),
            "last_detection": 0,
            "detection_count": 0,
            "is_active": True,
        }
        return drone

    @staticmethod
    def _advance(drone, lat_dir, lon_dir, distance_m):
        """Move ``drone`` by ``distance_m`` metres along a degree-space direction.

        The step treats one degree as 111 km on both axes, which is a flat
        approximation that is good enough for the short hops simulated here.
        """
        norm = math.hypot(lat_dir, lon_dir)
        if norm > 0:
            step_deg = distance_m / 111000
            drone["lat"] += (lat_dir / norm) * step_deg
            drone["lon"] += (lon_dir / norm) * step_deg

    def update_drone_position(self, drone, time_delta_seconds):
        """Update drone position in place based on its movement pattern.

        Args:
            drone: Drone dict created by ``create_new_drone``.
            time_delta_seconds: Seconds elapsed since the previous update.
        """
        if not drone["is_active"]:
            return

        speed_ms = drone["speed_kmh"] * 1000 / 3600  # Convert to m/s
        distance_m = speed_ms * time_delta_seconds
        pattern = drone["pattern"]

        if pattern == "approaching":
            # Move towards the device center
            current_distance = self.calculate_distance(
                drone["lat"], drone["lon"],
                drone["target_lat"], drone["target_lon"]
            ) * 1000  # Convert to meters

            if current_distance > 100:  # Still approaching
                # Never step further than the remaining distance, otherwise a
                # long cycle (e.g. slow API calls) makes the drone overshoot.
                self._advance(
                    drone,
                    drone["target_lat"] - drone["lat"],
                    drone["target_lon"] - drone["lon"],
                    min(distance_m, current_distance),
                )
            else:
                # Reached target, switch to hovering
                drone["pattern"] = "hovering"

        elif pattern == "hovering":
            # Small random movements
            drone["lat"] += random.uniform(-0.0001, 0.0001)
            drone["lon"] += random.uniform(-0.0001, 0.0001)

        elif pattern == "patrolling":
            # Slow circular motion around the spawn point, driven by drone age
            t = (time.time() - drone["created_at"]) / 100
            radius = 0.002  # Small patrol radius (degrees)
            drone["lat"] = drone["target_lat"] + radius * math.sin(t)
            drone["lon"] = drone["target_lon"] + radius * math.cos(t)

        elif pattern == "departing":
            # Move away from device: direction points from target to drone
            self._advance(
                drone,
                drone["lat"] - drone["target_lat"],
                drone["lon"] - drone["target_lon"],
                distance_m,
            )

    def should_detect_drone(self, drone, device):
        """Determine if device should detect this drone.

        Returns:
            A ``(detected, distance_km)`` tuple. Drones outside the coverage
            radius are never detected; inside it the probability falls
            linearly from 1.0 at the device to 0.3 at the edge.
        """
        distance_km = self.calculate_distance(
            drone["lat"], drone["lon"],
            device["lat"], device["lon"]
        )

        # Detection probability decreases with distance
        if distance_km > device["coverage_radius"]:
            return False, distance_km

        # Higher chance of detection when closer
        detection_prob = 1.0 - (distance_km / device["coverage_radius"]) * 0.7

        # Factor in time since last detection (avoid spam)
        time_since_last = time.time() - drone.get("last_detection", 0)
        if time_since_last < 2:  # Minimum 2 seconds between detections
            detection_prob *= 0.1

        return random.random() < detection_prob, distance_km

    def generate_detection(self, drone, device, distance_km):
        """Generate realistic detection data and update the drone's counters.

        Args:
            drone: Drone dict being detected.
            device: Device dict that made the detection.
            distance_km: Distance between the two, in kilometres.

        Returns:
            The detection payload dict. As a side effect ``last_detection``
            and ``detection_count`` of ``drone`` are updated.
        """
        drone_info = DRONE_TYPES[drone["type"]]

        # Calculate RSSI based on distance
        rssi = self.calculate_rssi(distance_km, drone_info["typical_rssi"])

        # Select frequency
        freq = random.choice(drone_info["freq"])

        # Confidence decreases with distance and lower RSSI
        base_confidence = 0.95 - (distance_km / 10.0) * 0.3
        rssi_factor = (rssi + 100) / 70  # Normalize RSSI (-100..-30) to 0-1
        confidence = max(0.5, min(0.99, base_confidence * rssi_factor))

        # Signal duration varies with movement pattern, plus random jitter
        duration_base = 2000 if drone["pattern"] == "hovering" else 1000
        duration = duration_base + random.randint(-500, 1500)

        detection = {
            "device_id": device["id"],
            "drone_id": drone["id"],
            "drone_type": drone["type"],
            "rssi": rssi,
            "freq": freq,
            "geo_lat": drone["lat"],
            "geo_lon": drone["lon"],
            "device_timestamp": int(time.time() * 1000),
            "confidence_level": round(confidence, 2),
            "signal_duration": max(500, duration),
        }

        # Update drone tracking
        drone["last_detection"] = time.time()
        drone["detection_count"] += 1

        return detection


def send_detection(detection_data):
    """Send detection data to the API.

    Keys starting with an underscore (such as ``_distance``) are local
    annotations used only for the log line; they are removed from the payload
    before it is posted.

    Returns:
        True when the API answered 201, False on any other status or on a
        network error.
    """
    url = f"{API_BASE_URL}/detections"
    headers = {
        "Content-Type": "application/json"
    }
    payload = {
        key: value
        for key, value in detection_data.items()
        if not str(key).startswith("_")
    }

    try:
        response = requests.post(url, json=payload, headers=headers, timeout=10)

        if response.status_code == 201:
            print(f"āœ… Device {detection_data['device_id']}: Drone {detection_data['drone_id']} "
                  f"(RSSI: {detection_data['rssi']}dBm, Dist: ~{detection_data.get('_distance', 'N/A')}km)")
            return True
        else:
            print(f"āŒ Failed: {response.status_code} - {response.text}")
            return False
    except requests.exceptions.RequestException as e:
        print(f"āŒ Network error: {e}")
        return False


def simulate_realistic_scenario(duration_minutes=10):
    """Simulate realistic drone scenario with persistent tracking.

    Runs a 3-second cycle for ``duration_minutes``: spawns drones, moves them,
    lets every device in ``DEVICES`` try to detect them and posts the
    resulting detections. Ctrl+C stops the loop early; a summary is printed
    either way.

    Args:
        duration_minutes: How long to run the simulation.
    """
    if not DEVICES:
        # Without devices there is nothing to spawn drones around, and
        # random.choice() below would raise IndexError.
        print("No devices loaded - call fetch_devices() before starting a simulation.")
        return

    print(f"🚁 Starting realistic drone simulation for {duration_minutes} minutes...")
    print("šŸ“Š Scenario: Multiple drones with persistent tracking, RSSI changes, movement patterns")
    print("=" * 80)

    simulator = DroneSimulator()
    start_time = time.time()
    end_time = start_time + (duration_minutes * 60)

    # Force create initial drone for testing
    device = DEVICES[0]  # Use first device
    initial_drone = simulator.create_new_drone(device)
    simulator.active_drones[initial_drone["id"]] = initial_drone
    print(f"šŸŽÆ Test drone created: {DRONE_TYPES[initial_drone['type']]['name']} near {device['name']}")

    detection_count = 0
    last_update = start_time
    status_interval = 120  # seconds between status lines
    next_status_at = start_time + status_interval

    try:
        while time.time() < end_time:
            current_time = time.time()
            time_delta = current_time - last_update
            last_update = current_time

            # Randomly spawn new drones (15% chance per cycle, at most 8 alive)
            if random.random() < 0.15 and len(simulator.active_drones) < 8:
                device = random.choice(DEVICES)
                new_drone = simulator.create_new_drone(device)
                simulator.active_drones[new_drone["id"]] = new_drone
                print(f"šŸ†• New {DRONE_TYPES[new_drone['type']]['name']} spawned near {device['name']}")

            # Debug: Print active drones count
            if not simulator.active_drones:
                print("āš ļø No active drones - waiting for spawns...")

            # Update all active drones; removals are deferred so the dict is
            # not modified while it is being iterated.
            drones_to_remove = []

            for drone_id, drone in simulator.active_drones.items():
                simulator.update_drone_position(drone, time_delta)

                # Check if drone should be removed (older than 15 minutes or
                # already detected more than 50 times)
                age_minutes = (current_time - drone["created_at"]) / 60
                if age_minutes > 15 or drone["detection_count"] > 50:
                    drones_to_remove.append(drone_id)
                    continue

                # Debug: Print drone position
                print(f"šŸ” Checking drone {drone_id} at ({drone['lat']:.4f}, {drone['lon']:.4f})")

                # Check detection for each device
                for device in DEVICES:
                    should_detect, distance = simulator.should_detect_drone(drone, device)
                    print(f" šŸ“” Device {device['name']}: distance={distance:.2f}km, should_detect={should_detect}")

                    if should_detect:
                        detection = simulator.generate_detection(drone, device, distance)
                        detection["_distance"] = f"{distance:.1f}"  # For logging only

                        if send_detection(detection):
                            detection_count += 1

            # Remove expired drones
            for drone_id in drones_to_remove:
                drone = simulator.active_drones.pop(drone_id)
                print(f"šŸ›¬ {DRONE_TYPES[drone['type']]['name']} {drone_id} finished ({drone['detection_count']} detections)")

            # Status update, once per interval rather than on every cycle
            if current_time >= next_status_at:
                next_status_at += status_interval
                active_count = len(simulator.active_drones)
                if active_count > 0:
                    print(f"šŸ“ Status: {active_count} active drones, {detection_count} total detections")

            time.sleep(3)  # 3 second cycle time

    except KeyboardInterrupt:
        print("\nā¹ļø Simulation stopped by user")

    elapsed = time.time() - start_time
    # Guard the rate against a zero-length run (coarse clocks, duration 0).
    rate = detection_count / (elapsed / 60) if elapsed > 0 else 0.0
    print(f"\nšŸ“ˆ Simulation Summary:")
    print(f" • Duration: {elapsed/60:.1f} minutes")
    print(f" • Total detections: {detection_count}")
    print(f" • Average rate: {rate:.1f} detections/minute")
    print(f" • Active drones at end: {len(simulator.active_drones)}")


def test_api_health():
    """Test if the API is responding.

    Returns:
        True when ``/health`` answers 200 within 5 seconds, False otherwise.
    """
    url = f"{API_BASE_URL}/health"
    try:
        response = requests.get(url, timeout=5)
        if response.status_code == 200:
            print("āœ… API health check passed")
            return True
        else:
            print(f"āŒ API health check failed: {response.status_code}")
            return False
    except requests.exceptions.RequestException as e:
        print(f"āŒ Cannot reach API: {e}")
        return False


def main():
    """Interactive entry point: load devices, check the API, run a scenario."""
    print("🚁 Realistic Drone Detection Simulator")
    print("=" * 50)

    # Fetch devices from API first
    print("šŸ“” Fetching active devices from API...")
    if not fetch_devices():
        print("āŒ Cannot fetch devices from API. Please check if the system is running.")
        return

    if not DEVICES:
        print("āŒ No active devices found. Please add some detector devices first.")
        return

    # Test API connectivity
    if not test_api_health():
        print("Cannot connect to API. Please check if the system is running.")
        return

    print("\nChoose simulation type:")
    print("1. Realistic scenario (persistent drones, RSSI changes, movement)")
    print("2. Single approaching drone (watch RSSI strengthen)")
    print("3. Departing drone (watch RSSI weaken)")

    try:
        choice = input("\nEnter choice (1-3): ").strip()

        if choice == "1":
            raw_duration = input("Duration in minutes (default: 10): ").strip()
            duration = int(raw_duration) if raw_duration else 10
            if duration <= 0:
                # Reported through the ValueError handler below.
                raise ValueError("duration must be a positive number of minutes")
            simulate_realistic_scenario(duration)

        elif choice == "2":
            print("šŸŽÆ Simulating approaching drone (RSSI will strengthen)...")
            # TODO: implementation for approaching drone
            print("Not implemented yet.")

        elif choice == "3":
            print("šŸ›« Simulating departing drone (RSSI will weaken)...")
            # TODO: implementation for departing drone
            print("Not implemented yet.")

        else:
            print("Invalid choice")

    except (KeyboardInterrupt, EOFError):
        # EOFError: stdin was closed (e.g. the script is run non-interactively).
        print("\nšŸ‘‹ Goodbye!")
    except ValueError:
        print("Invalid input")


if __name__ == "__main__":
    main()