739 lines
30 KiB
Python
739 lines
30 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
Realistic Drone Detection Test Script
|
||
Simulates realistic drone scenarios with persistent tracking, RSSI changes, and real-world patterns.
|
||
Fetches existing devices from the API and simulates drone detections around them.
|
||
"""
|
||
|
||
import requests
|
||
import json
|
||
import time
|
||
import random
|
||
import math
|
||
import argparse
|
||
from datetime import datetime, timedelta
|
||
|
||
# Configuration
|
||
API_BASE_URL = "http://selfservice.cqers.com/drones/api"
|
||
# Alternative for local testing: "http://localhost:3002/api"
|
||
|
||
# Debug configuration
|
||
DEBUG_MODE = False # Set to True to enable payload debugging
|
||
|
||
# Global variable to store devices fetched from API
|
||
DEVICES = []
|
||
|
||
# Realistic drone types with characteristics
|
||
DRONE_TYPES = {
|
||
0: {"name": "DJI Mavic (Consumer)", "max_speed": 65, "typical_rssi": -60, "freq": [2400, 2450]},
|
||
1: {"name": "Orlan Military Drone", "max_speed": 150, "typical_rssi": -50, "freq": [900, 2400]}, # High threat
|
||
2: {"name": "DJI Matrice (Professional)", "max_speed": 80, "typical_rssi": -55, "freq": [2400, 5800]},
|
||
3: {"name": "Racing Drone", "max_speed": 120, "typical_rssi": -55, "freq": [2400, 5800]},
|
||
4: {"name": "Unknown/Custom", "max_speed": 70, "typical_rssi": -65, "freq": [2400, 2450]}
|
||
}
|
||
|
||
def fetch_devices():
|
||
"""Fetch active devices from the API"""
|
||
global DEVICES
|
||
try:
|
||
response = requests.get(f"{API_BASE_URL}/devices/map")
|
||
if response.status_code == 200:
|
||
data = response.json()
|
||
api_devices = data.get('data', [])
|
||
|
||
# Convert API device format to simulator format
|
||
DEVICES = []
|
||
for device in api_devices:
|
||
DEVICES.append({
|
||
"id": device["id"],
|
||
"name": device["name"],
|
||
"lat": float(device["geo_lat"]),
|
||
"lon": float(device["geo_lon"]),
|
||
"coverage_radius": 5.0 # Default 5km coverage radius
|
||
})
|
||
|
||
print(f"✅ Fetched {len(DEVICES)} active devices from API")
|
||
for device in DEVICES:
|
||
print(f" - {device['name']} at ({device['lat']:.4f}, {device['lon']:.4f})")
|
||
return True
|
||
else:
|
||
print(f"❌ Failed to fetch devices: {response.status_code}")
|
||
return False
|
||
except Exception as e:
|
||
print(f"❌ Error fetching devices: {e}")
|
||
return False
|
||
|
||
class DroneSimulator:
|
||
def __init__(self):
|
||
self.active_drones = {} # Track persistent drones
|
||
self.drone_counter = 1000
|
||
|
||
def calculate_rssi(self, distance_km, base_rssi=-60):
|
||
"""Calculate realistic RSSI based on distance (Free Space Path Loss)"""
|
||
if distance_km < 0.1: # Very close
|
||
return max(base_rssi + 20, -30)
|
||
|
||
# Simplified FSPL: RSSI decreases ~20dB per decade of distance
|
||
rssi = base_rssi - (20 * math.log10(distance_km))
|
||
|
||
# Add some realistic variation
|
||
variation = random.uniform(-5, 5)
|
||
final_rssi = int(rssi + variation)
|
||
|
||
# Clamp to realistic values
|
||
return max(min(final_rssi, -30), -100)
|
||
|
||
def calculate_distance(self, lat1, lon1, lat2, lon2):
|
||
"""Calculate distance between two points in km"""
|
||
R = 6371 # Earth radius in km
|
||
|
||
lat1_rad = math.radians(lat1)
|
||
lat2_rad = math.radians(lat2)
|
||
delta_lat = math.radians(lat2 - lat1)
|
||
delta_lon = math.radians(lon2 - lon1)
|
||
|
||
a = (math.sin(delta_lat/2)**2 +
|
||
math.cos(lat1_rad) * math.cos(lat2_rad) * math.sin(delta_lon/2)**2)
|
||
c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a))
|
||
|
||
return R * c
|
||
|
||
def create_new_drone(self, device):
|
||
"""Create a new drone with realistic starting position"""
|
||
self.drone_counter += 1
|
||
|
||
# Drone type selection with weighted probabilities
|
||
# Orlan drones are rare (2% chance), consumer drones common (50%), others distributed
|
||
rand = random.random()
|
||
if rand < 0.02: # 2% chance for Orlan (military)
|
||
drone_type = 1
|
||
print(f"🚨 GENERATING ORLAN MILITARY DRONE (ID: {self.drone_counter}) - High threat simulation!")
|
||
elif rand < 0.52: # 50% chance for consumer
|
||
drone_type = 0
|
||
elif rand < 0.72: # 20% chance for professional
|
||
drone_type = 2
|
||
elif rand < 0.92: # 20% chance for racing
|
||
drone_type = 3
|
||
else: # 8% chance for unknown
|
||
drone_type = 4
|
||
|
||
# Start at edge of coverage area
|
||
angle = random.uniform(0, 2 * math.pi)
|
||
start_distance = device["coverage_radius"] * random.uniform(0.8, 1.0)
|
||
|
||
start_lat = device["lat"] + (start_distance / 111.0) * math.cos(angle)
|
||
start_lon = device["lon"] + (start_distance / (111.0 * math.cos(math.radians(device["lat"])))) * math.sin(angle)
|
||
|
||
# Movement pattern
|
||
patterns = ["approaching", "patrolling", "departing", "hovering"]
|
||
movement_pattern = random.choice(patterns)
|
||
|
||
drone = {
|
||
"id": self.drone_counter,
|
||
"type": drone_type,
|
||
"lat": start_lat,
|
||
"lon": start_lon,
|
||
"target_lat": device["lat"] if movement_pattern == "approaching" else start_lat,
|
||
"target_lon": device["lon"] if movement_pattern == "approaching" else start_lon,
|
||
"speed_kmh": DRONE_TYPES[drone_type]["max_speed"] * random.uniform(0.3, 0.8),
|
||
"pattern": movement_pattern,
|
||
"created_at": time.time(),
|
||
"last_detection": 0,
|
||
"detection_count": 0,
|
||
"is_active": True
|
||
}
|
||
|
||
return drone
|
||
|
||
def update_drone_position(self, drone, time_delta_seconds):
|
||
"""Update drone position based on movement pattern"""
|
||
if not drone["is_active"]:
|
||
return
|
||
|
||
speed_ms = drone["speed_kmh"] * 1000 / 3600 # Convert to m/s
|
||
distance_m = speed_ms * time_delta_seconds
|
||
|
||
if drone["pattern"] == "approaching":
|
||
# Move towards the device center
|
||
current_distance = self.calculate_distance(
|
||
drone["lat"], drone["lon"],
|
||
drone["target_lat"], drone["target_lon"]
|
||
) * 1000 # Convert to meters
|
||
|
||
if current_distance > 100: # Still approaching
|
||
lat_diff = drone["target_lat"] - drone["lat"]
|
||
lon_diff = drone["target_lon"] - drone["lon"]
|
||
total_diff = math.sqrt(lat_diff**2 + lon_diff**2)
|
||
|
||
if total_diff > 0:
|
||
lat_step = (lat_diff / total_diff) * (distance_m / 111000)
|
||
lon_step = (lon_diff / total_diff) * (distance_m / 111000)
|
||
|
||
drone["lat"] += lat_step
|
||
drone["lon"] += lon_step
|
||
else:
|
||
# Reached target, switch to hovering
|
||
drone["pattern"] = "hovering"
|
||
|
||
elif drone["pattern"] == "hovering":
|
||
# Small random movements
|
||
drone["lat"] += random.uniform(-0.0001, 0.0001)
|
||
drone["lon"] += random.uniform(-0.0001, 0.0001)
|
||
|
||
elif drone["pattern"] == "patrolling":
|
||
# Circular or figure-8 movement
|
||
t = (time.time() - drone["created_at"]) / 100 # Slow circular motion
|
||
radius = 0.002 # Small patrol radius
|
||
drone["lat"] = drone["target_lat"] + radius * math.sin(t)
|
||
drone["lon"] = drone["target_lon"] + radius * math.cos(t)
|
||
|
||
elif drone["pattern"] == "departing":
|
||
# Move away from device
|
||
lat_diff = drone["lat"] - drone["target_lat"]
|
||
lon_diff = drone["lon"] - drone["target_lon"]
|
||
total_diff = math.sqrt(lat_diff**2 + lon_diff**2)
|
||
|
||
if total_diff > 0:
|
||
lat_step = (lat_diff / total_diff) * (distance_m / 111000)
|
||
lon_step = (lon_diff / total_diff) * (distance_m / 111000)
|
||
|
||
drone["lat"] += lat_step
|
||
drone["lon"] += lon_step
|
||
|
||
def should_detect_drone(self, drone, device):
|
||
"""Determine if device should detect this drone"""
|
||
distance_km = self.calculate_distance(
|
||
drone["lat"], drone["lon"],
|
||
device["lat"], device["lon"]
|
||
)
|
||
|
||
# Detection probability decreases with distance
|
||
if distance_km > device["coverage_radius"]:
|
||
return False, distance_km
|
||
|
||
# Higher chance of detection when closer
|
||
detection_prob = 1.0 - (distance_km / device["coverage_radius"]) * 0.7
|
||
|
||
# Factor in time since last detection (avoid spam)
|
||
time_since_last = time.time() - drone.get("last_detection", 0)
|
||
if time_since_last < 2: # Minimum 2 seconds between detections
|
||
detection_prob *= 0.1
|
||
|
||
return random.random() < detection_prob, distance_km
|
||
|
||
def generate_detection(self, drone, device, distance_km):
|
||
"""Generate realistic detection data"""
|
||
drone_info = DRONE_TYPES[drone["type"]]
|
||
|
||
# Calculate RSSI based on distance
|
||
rssi = self.calculate_rssi(distance_km, drone_info["typical_rssi"])
|
||
|
||
# Select frequency
|
||
freq = random.choice(drone_info["freq"])
|
||
|
||
# Confidence decreases with distance and lower RSSI
|
||
base_confidence = 0.95 - (distance_km / 10.0) * 0.3
|
||
rssi_factor = (rssi + 100) / 70 # Normalize RSSI to 0-1
|
||
confidence = max(0.5, min(0.99, base_confidence * rssi_factor))
|
||
|
||
# Signal duration varies by drone type and detection quality
|
||
duration_base = 2000 if drone["pattern"] == "hovering" else 1000
|
||
duration = duration_base + random.randint(-500, 1500)
|
||
|
||
detection = {
|
||
"device_id": device["id"],
|
||
"geo_lat": device["lat"], # Device detector location, NOT drone location
|
||
"geo_lon": device["lon"], # Device detector location, NOT drone location
|
||
"device_timestamp": int(time.time() * 1000),
|
||
"drone_type": drone["type"],
|
||
"rssi": rssi,
|
||
"freq": freq,
|
||
"drone_id": drone["id"]
|
||
}
|
||
|
||
# Update drone tracking
|
||
drone["last_detection"] = time.time()
|
||
drone["detection_count"] += 1
|
||
|
||
return detection
|
||
|
||
def send_heartbeat(device_id):
|
||
"""Send heartbeat to keep device online"""
|
||
global DEBUG_MODE
|
||
url = f"{API_BASE_URL}/heartbeat"
|
||
headers = {
|
||
"Content-Type": "application/json"
|
||
}
|
||
|
||
heartbeat_data = {
|
||
"type": "heartbeat",
|
||
"key": f"device_{device_id}",
|
||
"device_id": device_id,
|
||
"signal_strength": random.randint(-70, -30), # Simulate signal strength
|
||
"battery_level": random.randint(70, 100), # Simulate battery level
|
||
"temperature": random.randint(20, 45), # Simulate temperature
|
||
"uptime": random.randint(3600, 86400), # Simulate uptime
|
||
"memory_usage": random.randint(30, 80), # Simulate memory usage
|
||
"firmware_version": "1.0.0"
|
||
}
|
||
|
||
# Debug output for heartbeat
|
||
if DEBUG_MODE:
|
||
print("\n" + "-"*40)
|
||
print("💓 DEBUG: Sending heartbeat to backend")
|
||
print("-"*40)
|
||
print(f"URL: {url}")
|
||
print("Heartbeat payload:")
|
||
print(json.dumps(heartbeat_data, indent=2, sort_keys=True))
|
||
print("-"*40 + "\n")
|
||
|
||
try:
|
||
response = requests.post(url, json=heartbeat_data, headers=headers, timeout=10)
|
||
|
||
if response.status_code == 201:
|
||
if DEBUG_MODE:
|
||
print(f"💓 Device {device_id}: Heartbeat sent successfully")
|
||
return True
|
||
else:
|
||
if DEBUG_MODE:
|
||
print(f"❌ Heartbeat failed: {response.status_code} - {response.text}")
|
||
return False
|
||
|
||
except requests.exceptions.RequestException as e:
|
||
if DEBUG_MODE:
|
||
print(f"❌ Heartbeat network error: {e}")
|
||
return False
|
||
|
||
def send_detection(detection_data):
|
||
"""Send detection data to the API"""
|
||
global DEBUG_MODE
|
||
url = f"{API_BASE_URL}/detections"
|
||
headers = {
|
||
"Content-Type": "application/json"
|
||
}
|
||
|
||
# Debug output - show complete payload being sent
|
||
if DEBUG_MODE:
|
||
print("\n" + "="*60)
|
||
print("🐛 DEBUG: Sending payload to backend")
|
||
print("="*60)
|
||
print(f"URL: {url}")
|
||
print(f"Headers: {json.dumps(headers, indent=2)}")
|
||
print("Payload:")
|
||
print(json.dumps(detection_data, indent=2, sort_keys=True))
|
||
print("="*60 + "\n")
|
||
|
||
try:
|
||
response = requests.post(url, json=detection_data, headers=headers, timeout=10)
|
||
|
||
if response.status_code == 201:
|
||
# Estimate distance from RSSI for display purposes only
|
||
estimated_distance = "N/A"
|
||
if 'rssi' in detection_data:
|
||
rssi = detection_data['rssi']
|
||
if rssi > -40:
|
||
estimated_distance = "<0.1km"
|
||
elif rssi > -60:
|
||
estimated_distance = "~0.5-2km"
|
||
elif rssi > -80:
|
||
estimated_distance = "~2-8km"
|
||
elif rssi > -90:
|
||
estimated_distance = "~8-15km"
|
||
else:
|
||
estimated_distance = ">15km"
|
||
|
||
print(f"✅ Device {detection_data['device_id']}: Drone {detection_data['drone_id']} "
|
||
f"(RSSI: {detection_data['rssi']}dBm, Est. distance: {estimated_distance})")
|
||
return True
|
||
else:
|
||
print(f"❌ Failed: {response.status_code} - {response.text}")
|
||
if DEBUG_MODE:
|
||
print(f"🐛 DEBUG: Response body: {response.text}")
|
||
return False
|
||
|
||
except requests.exceptions.RequestException as e:
|
||
print(f"❌ Network error: {e}")
|
||
if DEBUG_MODE:
|
||
print(f"🐛 DEBUG: Exception details: {str(e)}")
|
||
return False
|
||
|
||
def simulate_orlan_detection_test():
|
||
"""
|
||
Test Orlan military drone detection system
|
||
Spawns an Orlan 50km from device, slowly approaches until hovering overhead
|
||
Tests critical alert escalation for high-threat drones
|
||
"""
|
||
print("=" * 70)
|
||
print("🚨 ORLAN MILITARY DRONE DETECTION TEST")
|
||
print("=" * 70)
|
||
print("Test Scenario:")
|
||
print("• Starting position: 50km away (undetectable)")
|
||
print("• Drone type: Orlan Military (type 1)")
|
||
print("• Approach pattern: Straight line to target")
|
||
print("• Detection threshold: ~25km range")
|
||
print("• Critical alerts: Auto-escalated for Orlan type")
|
||
print("• End position: Directly overhead (0m)")
|
||
print("=" * 70)
|
||
|
||
if not DEVICES:
|
||
print("❌ No devices available for testing!")
|
||
return
|
||
|
||
# Use the first device as target
|
||
target_device = DEVICES[0]
|
||
print(f"🎯 Target Device: {target_device['name']}")
|
||
print(f"📍 Target Location: {target_device['lat']:.6f}, {target_device['lon']:.6f}")
|
||
|
||
# Calculate starting position 50km away (due north)
|
||
start_distance = 50.0 # km
|
||
start_lat = target_device['lat'] + (start_distance / 111.0) # ~111km per degree latitude
|
||
start_lon = target_device['lon'] # Same longitude
|
||
|
||
print(f"🛫 Orlan Starting Position: {start_lat:.6f}, {start_lon:.6f}")
|
||
print(f"📏 Initial Distance: {start_distance:.1f}km")
|
||
|
||
# Detection range parameters
|
||
max_detection_range = 25.0 # km
|
||
min_rssi_threshold = -95
|
||
|
||
def is_detectable(distance):
|
||
"""Check if drone is within detectable range"""
|
||
if distance > max_detection_range:
|
||
return False
|
||
# Calculate RSSI for Orlan (military grade)
|
||
base_rssi = -50 # Stronger than civilian drones
|
||
rssi = base_rssi - (20 * 2.3 * math.log10(max(distance, 0.001)))
|
||
return rssi >= min_rssi_threshold
|
||
|
||
def calculate_orlan_rssi(distance_km):
|
||
"""Calculate RSSI for Orlan military drone"""
|
||
if distance_km < 0.001:
|
||
return -25
|
||
base_rssi = -50 # Military grade transmission
|
||
path_loss_exponent = 2.3
|
||
rssi = base_rssi - (20 * path_loss_exponent * math.log10(distance_km))
|
||
return max(int(rssi), -100)
|
||
|
||
# Verify starting position is undetectable
|
||
if is_detectable(start_distance):
|
||
print("⚠️ WARNING: Starting position is within detection range!")
|
||
else:
|
||
print("✅ Starting position confirmed undetectable")
|
||
|
||
print("\n" + "=" * 70)
|
||
print("STARTING ORLAN APPROACH SIMULATION")
|
||
print("💓 Sending initial heartbeat to keep device online...")
|
||
print("=" * 70)
|
||
|
||
# Send initial heartbeat to target device
|
||
send_heartbeat(target_device['id'])
|
||
|
||
# Simulation parameters
|
||
total_steps = 50
|
||
final_distance = 0.0 # Directly overhead
|
||
drone_id = 3000 # Unique ID for Orlan test
|
||
last_heartbeat = time.time()
|
||
heartbeat_interval = 60 # Send heartbeat every 60 seconds
|
||
|
||
detection_started = False
|
||
critical_alerts_started = False
|
||
|
||
try:
|
||
for step in range(1, total_steps + 1):
|
||
current_time = time.time()
|
||
|
||
# Send periodic heartbeat to keep device online
|
||
if current_time - last_heartbeat >= heartbeat_interval:
|
||
print("💓 Sending heartbeat to maintain device status...")
|
||
send_heartbeat(target_device['id'])
|
||
last_heartbeat = current_time
|
||
|
||
# Calculate current distance (exponential approach)
|
||
progress = step / total_steps
|
||
distance_km = start_distance * (1 - progress) ** 1.8 + final_distance * progress ** 1.8
|
||
|
||
# Calculate current position
|
||
current_lat = start_lat + (target_device['lat'] - start_lat) * progress
|
||
current_lon = start_lon + (target_device['lon'] - start_lon) * progress
|
||
|
||
# Check if detectable
|
||
detectable = is_detectable(distance_km)
|
||
rssi = calculate_orlan_rssi(distance_km)
|
||
|
||
if detectable:
|
||
if not detection_started:
|
||
print(f"\n🔍 FIRST DETECTION at {distance_km:.1f}km - Orlan entered detection range!")
|
||
detection_started = True
|
||
|
||
# Send detection using EXACT standard payload format
|
||
detection_data = {
|
||
"device_id": target_device["id"],
|
||
"geo_lat": target_device["lat"], # Device detector location, NOT drone location
|
||
"geo_lon": target_device["lon"], # Device detector location, NOT drone location
|
||
"device_timestamp": int(time.time() * 1000),
|
||
"drone_type": 1, # Orlan/Military type
|
||
"rssi": rssi,
|
||
"freq": 24, # 2.4 GHz
|
||
"drone_id": drone_id
|
||
}
|
||
|
||
# Use centralized send_detection function for consistent debug output
|
||
success = send_detection(detection_data)
|
||
if success:
|
||
status = "🚨 CRITICAL ALERT" if rssi > -70 else "⚠️ DETECTED" # Use RSSI for criticality
|
||
if not DEBUG_MODE: # Avoid duplicate output when debugging
|
||
print(f"{status} - Step {step}/{total_steps}: RSSI={rssi}dBm")
|
||
|
||
# Show escalation messages based on RSSI strength
|
||
if rssi > -60 and not critical_alerts_started:
|
||
print(f"🚨 CRITICAL ALERT: Strong Orlan signal (RSSI {rssi}dBm) - AUTO-ESCALATED!")
|
||
critical_alerts_started = True
|
||
elif rssi > -40:
|
||
print(f"🔥 IMMEDIATE THREAT: Very strong Orlan signal - likely overhead!")
|
||
else:
|
||
print(f"❌ Failed to send Orlan detection at step {step}")
|
||
continue
|
||
print(f"❌ Error sending detection: {e}")
|
||
else:
|
||
# Outside detection range (RSSI too weak)
|
||
print(f"📡 Step {step}/{total_steps}: RSSI={rssi}dBm (signal too weak for detection)")
|
||
|
||
# Variable delay based on RSSI strength (stronger signal = faster updates)
|
||
if rssi < -90:
|
||
delay = 2.0 # Weak signal, slow updates
|
||
elif rssi < -80:
|
||
delay = 1.5
|
||
elif rssi < -60:
|
||
delay = 1.0
|
||
else:
|
||
delay = 0.8 # Strong signal, fast updates
|
||
|
||
time.sleep(delay)
|
||
|
||
print("\n" + "=" * 70)
|
||
print("🚨 ORLAN DETECTION TEST COMPLETED")
|
||
print("=" * 70)
|
||
print("Test Summary:")
|
||
print(f"• Starting distance: {start_distance:.1f}km (undetectable)")
|
||
print(f"• Detection range: ~{max_detection_range:.1f}km")
|
||
print(f"• Final position: Directly overhead")
|
||
print(f"• Target device: {target_device['name']}")
|
||
print(f"• Total steps: {total_steps}")
|
||
print("")
|
||
print("Expected Results:")
|
||
print("✅ No detections sent while >25km away")
|
||
print("✅ First detection when entering detection range")
|
||
print("✅ Critical alerts auto-escalated for Orlan (type 1)")
|
||
print("✅ All Orlan alerts bypass distance/RSSI rules")
|
||
print("✅ Real-time tracking visible on dashboard")
|
||
print("=" * 70)
|
||
|
||
except KeyboardInterrupt:
|
||
print("\n\n⚠️ Orlan detection test interrupted by user")
|
||
except Exception as e:
|
||
print(f"\n❌ Error during Orlan detection test: {e}")
|
||
|
||
def simulate_realistic_scenario(duration_minutes=10):
|
||
"""Simulate realistic drone scenario with persistent tracking"""
|
||
print(f"🚁 Starting realistic drone simulation for {duration_minutes} minutes...")
|
||
print("📊 Scenario: Multiple drones with persistent tracking, RSSI changes, movement patterns")
|
||
print("💓 Heartbeat system: Keeping devices online during test")
|
||
print("=" * 80)
|
||
|
||
simulator = DroneSimulator()
|
||
start_time = time.time()
|
||
end_time = start_time + (duration_minutes * 60)
|
||
last_heartbeat = start_time
|
||
heartbeat_interval = 60 # Send heartbeat every 60 seconds (well under 300s limit)
|
||
|
||
# Send initial heartbeats to all devices
|
||
print("💓 Sending initial heartbeats to all devices...")
|
||
for device in DEVICES:
|
||
send_heartbeat(device['id'])
|
||
|
||
# Force create initial drone for testing
|
||
if len(DEVICES) > 0:
|
||
device = DEVICES[0] # Use first device
|
||
initial_drone = simulator.create_new_drone(device)
|
||
simulator.active_drones[initial_drone["id"]] = initial_drone
|
||
print(f"🎯 Test drone created: {DRONE_TYPES[initial_drone['type']]['name']} near {device['name']}")
|
||
|
||
detection_count = 0
|
||
last_update = start_time
|
||
heartbeat_count = 0
|
||
|
||
try:
|
||
while time.time() < end_time:
|
||
current_time = time.time()
|
||
time_delta = current_time - last_update
|
||
last_update = current_time
|
||
|
||
# Send periodic heartbeats to keep devices online
|
||
if current_time - last_heartbeat >= heartbeat_interval:
|
||
print(f"💓 Sending heartbeats to {len(DEVICES)} devices...")
|
||
for device in DEVICES:
|
||
if send_heartbeat(device['id']):
|
||
heartbeat_count += 1
|
||
last_heartbeat = current_time
|
||
|
||
# Randomly spawn new drones (1-15% chance per cycle)
|
||
if random.random() < 0.15 and len(simulator.active_drones) < 8:
|
||
device = random.choice(DEVICES)
|
||
new_drone = simulator.create_new_drone(device)
|
||
simulator.active_drones[new_drone["id"]] = new_drone
|
||
print(f"🆕 New {DRONE_TYPES[new_drone['type']]['name']} spawned near {device['name']}")
|
||
|
||
# Debug: Print active drones count
|
||
if len(simulator.active_drones) == 0:
|
||
print("⚠️ No active drones - waiting for spawns...")
|
||
|
||
# Update all active drones
|
||
drones_to_remove = []
|
||
for drone_id, drone in simulator.active_drones.items():
|
||
simulator.update_drone_position(drone, time_delta)
|
||
|
||
# Check if drone should be removed (too far or timeout)
|
||
age_minutes = (current_time - drone["created_at"]) / 60
|
||
if age_minutes > 15 or drone["detection_count"] > 50:
|
||
drones_to_remove.append(drone_id)
|
||
continue
|
||
|
||
# Debug: Print drone position
|
||
print(f"🔍 Checking drone {drone_id} at ({drone['lat']:.4f}, {drone['lon']:.4f})")
|
||
|
||
# Check detection for each device
|
||
for device in DEVICES:
|
||
should_detect, distance = simulator.should_detect_drone(drone, device)
|
||
|
||
print(f" 📡 Device {device['name']}: distance={distance:.2f}km, should_detect={should_detect}")
|
||
|
||
if should_detect:
|
||
detection = simulator.generate_detection(drone, device, distance)
|
||
|
||
if send_detection(detection):
|
||
detection_count += 1
|
||
|
||
# Remove expired drones
|
||
for drone_id in drones_to_remove:
|
||
drone = simulator.active_drones[drone_id]
|
||
print(f"🛬 {DRONE_TYPES[drone['type']]['name']} {drone_id} finished ({drone['detection_count']} detections)")
|
||
del simulator.active_drones[drone_id]
|
||
|
||
# Status update
|
||
elapsed_minutes = (current_time - start_time) / 60
|
||
if int(elapsed_minutes) % 2 == 0 and elapsed_minutes > 0:
|
||
active_count = len(simulator.active_drones)
|
||
if active_count > 0:
|
||
print(f"📍 Status: {active_count} active drones, {detection_count} total detections, {heartbeat_count} heartbeats sent")
|
||
|
||
time.sleep(3) # 3 second cycle time
|
||
|
||
except KeyboardInterrupt:
|
||
print("\n⏹️ Simulation stopped by user")
|
||
|
||
elapsed = time.time() - start_time
|
||
print(f"\n📈 Simulation Summary:")
|
||
print(f" • Duration: {elapsed/60:.1f} minutes")
|
||
print(f" • Total detections: {detection_count}")
|
||
print(f" • Average rate: {detection_count/(elapsed/60):.1f} detections/minute")
|
||
print(f" • Active drones at end: {len(simulator.active_drones)}")
|
||
|
||
def test_api_health():
|
||
"""Test if the API is responding"""
|
||
url = f"{API_BASE_URL}/health"
|
||
|
||
try:
|
||
response = requests.get(url, timeout=5)
|
||
if response.status_code == 200:
|
||
print("✅ API health check passed")
|
||
return True
|
||
else:
|
||
print(f"❌ API health check failed: {response.status_code}")
|
||
return False
|
||
except requests.exceptions.RequestException as e:
|
||
print(f"❌ Cannot reach API: {e}")
|
||
return False
|
||
|
||
def main():
|
||
global DEBUG_MODE
|
||
|
||
print("🚁 Realistic Drone Detection Simulator")
|
||
print("=" * 50)
|
||
|
||
# Show current debug status
|
||
print(f"🐛 Debug mode status: {'ENABLED' if DEBUG_MODE else 'DISABLED'}")
|
||
|
||
# Fetch devices from API first
|
||
print("📡 Fetching active devices from API...")
|
||
if not fetch_devices():
|
||
print("❌ Cannot fetch devices from API. Please check if the system is running.")
|
||
return
|
||
|
||
if len(DEVICES) == 0:
|
||
print("❌ No active devices found. Please add some detector devices first.")
|
||
return
|
||
|
||
# Test API connectivity
|
||
if not test_api_health():
|
||
print("Cannot connect to API. Please check if the system is running.")
|
||
return
|
||
|
||
# Debug mode configuration
|
||
if not DEBUG_MODE: # Only ask if not already set via command line
|
||
debug_choice = input("\n🐛 Enable debug mode? (y/N): ").strip().lower()
|
||
if debug_choice in ['y', 'yes']:
|
||
DEBUG_MODE = True
|
||
print("✅ Debug mode ENABLED - Will show complete payloads being sent to backend")
|
||
else:
|
||
DEBUG_MODE = False
|
||
print("ℹ️ Debug mode disabled")
|
||
else:
|
||
print("✅ Debug mode ALREADY ENABLED - Will show complete payloads being sent to backend")
|
||
|
||
print("\nChoose simulation type:")
|
||
print("1. Realistic scenario (persistent drones, RSSI changes, movement)")
|
||
print("2. Single approaching drone (watch RSSI strengthen)")
|
||
print("3. Departing drone (watch RSSI weaken)")
|
||
print("4. 🚨 Orlan military drone detection test (50km approach)")
|
||
|
||
try:
|
||
choice = input("\nEnter choice (1-4): ").strip()
|
||
|
||
if choice == "1":
|
||
duration = input("Duration in minutes (default: 10): ").strip()
|
||
duration = int(duration) if duration else 10
|
||
simulate_realistic_scenario(duration)
|
||
|
||
elif choice == "2":
|
||
print("🎯 Simulating approaching drone (RSSI will strengthen)...")
|
||
# Implementation for approaching drone
|
||
|
||
elif choice == "3":
|
||
print("🛫 Simulating departing drone (RSSI will weaken)...")
|
||
# Implementation for departing drone
|
||
|
||
elif choice == "4":
|
||
simulate_orlan_detection_test()
|
||
|
||
else:
|
||
print("Invalid choice")
|
||
|
||
except KeyboardInterrupt:
|
||
print("\n👋 Goodbye!")
|
||
except ValueError:
|
||
print("Invalid input")
|
||
|
||
if __name__ == "__main__":
|
||
# Parse command line arguments
|
||
parser = argparse.ArgumentParser(description='Realistic Drone Detection Simulator')
|
||
parser.add_argument('--debug', '-d', action='store_true',
|
||
help='Enable debug mode to show complete payloads sent to backend')
|
||
args = parser.parse_args()
|
||
|
||
# Set debug mode from command line argument
|
||
if args.debug:
|
||
DEBUG_MODE = True
|
||
print("🐛 Debug mode enabled via command line argument")
|
||
|
||
main()
|