Files
drone-detector/test_drone_data.py
2025-08-18 06:04:28 +02:00

421 lines
17 KiB
Python

#!/usr/bin/env python3
"""
Realistic Drone Detection Test Script
Simulates realistic drone scenarios with persistent tracking, RSSI changes, and real-world patterns.
Fetches existing devices from the API and simulates drone detections around them.
"""
import requests
import json
import time
import random
import math
from datetime import datetime, timedelta
# --- Configuration ---------------------------------------------------------
# Base URL of the detection API; every endpoint path is appended to it.
# For local testing use "http://localhost:3002/api" instead.
API_BASE_URL = "http://selfservice.cqers.com/drones/api"

# Detector devices known to the simulator. Filled in by fetch_devices().
DEVICES = []

# Catalogue of simulated drone types, keyed by the numeric type id that is
# sent to the API as "drone_type".
#   max_speed    - upper speed bound; the simulator uses it as km/h
#   typical_rssi - reference signal level passed to the RSSI model
#   freq         - candidate frequencies; one is picked per detection
#                  (presumably MHz - confirm against the API schema)
DRONE_TYPES = {
    1: {
        "name": "DJI Mavic",
        "max_speed": 65,
        "typical_rssi": -60,
        "freq": [2400, 2450],
    },
    2: {
        "name": "Racing Drone",
        "max_speed": 120,
        "typical_rssi": -55,
        "freq": [2400, 5800],
    },
    3: {
        "name": "DJI Phantom",
        "max_speed": 50,
        "typical_rssi": -65,
        "freq": [2400, 2450],
    },
    4: {
        "name": "Fixed Wing",
        "max_speed": 80,
        "typical_rssi": -70,
        "freq": [900, 2400],
    },
    5: {
        "name": "Surveillance",
        "max_speed": 40,
        "typical_rssi": -75,
        "freq": [2400, 5800],
    },
}
def fetch_devices():
    """Fetch devices from the API and store them in the global ``DEVICES``.

    Issues ``GET {API_BASE_URL}/devices/map`` and converts every entry of the
    response's ``data`` list into the simulator's device format
    (``id``, ``name``, ``lat``, ``lon``, ``coverage_radius``).

    ``DEVICES`` is only replaced when the whole response was fetched and
    converted successfully; on any failure it keeps its previous content.

    Returns:
        bool: True on success (HTTP 200 and a convertible payload), False on
        a non-200 status, a network error or a malformed payload.
    """
    global DEVICES
    try:
        # Without a timeout, requests waits indefinitely on a stalled server.
        response = requests.get(f"{API_BASE_URL}/devices/map", timeout=10)
        if response.status_code != 200:
            print(f"❌ Failed to fetch devices: {response.status_code}")
            return False

        # "data" may be missing or null; treat both as "no devices".
        api_devices = response.json().get('data') or []

        # Convert into a local list first so that a malformed entry cannot
        # leave DEVICES half-populated.
        devices = [
            {
                "id": device["id"],
                "name": device["name"],
                "lat": float(device["geo_lat"]),
                "lon": float(device["geo_lon"]),
                "coverage_radius": 5.0,  # Default 5km coverage radius
            }
            for device in api_devices
        ]
    except Exception as e:
        # Top-level boundary of a best-effort fetch: network errors, invalid
        # JSON and missing/ill-typed fields are all reported the same way.
        print(f"❌ Error fetching devices: {e}")
        return False

    DEVICES = devices
    print(f"✅ Fetched {len(DEVICES)} active devices from API")
    for device in DEVICES:
        print(f" - {device['name']} at ({device['lat']:.4f}, {device['lon']:.4f})")
    return True
class DroneSimulator:
    """Simulate drones moving around detector devices and build detections.

    Drones are plain dicts produced by ``create_new_drone``. The caller is
    responsible for storing them in ``active_drones`` (keyed by drone id) and
    for calling ``update_drone_position`` / ``should_detect_drone`` /
    ``generate_detection`` on every simulation cycle.
    """

    # Earth radius used by the haversine formula, in km.
    EARTH_RADIUS_KM = 6371
    # Flat-earth approximation for small offsets: km per degree of latitude.
    KM_PER_DEGREE = 111.0

    def __init__(self):
        self.active_drones = {}  # Track persistent drones, keyed by drone id
        self.drone_counter = 1000  # Last id handed out; bumped per new drone

    def calculate_rssi(self, distance_km, base_rssi=-60):
        """Return a simulated RSSI for a drone ``distance_km`` away.

        Uses a simplified free-space path loss model: ``base_rssi`` is the
        level at 1 km and the signal drops 20 dB per decade of distance, plus
        a random variation of +/-5. The result is truncated to an int and
        clamped to the range [-100, -30].

        Args:
            distance_km: Distance between drone and device in km.
            base_rssi: Reference signal level at 1 km.

        Returns:
            The simulated RSSI value.
        """
        if distance_km < 0.1:  # Very close; also keeps log10 away from <= 0
            # NOTE(review): max() makes this branch return -30 or more, so
            # for base_rssi > -50 it exceeds the -30 ceiling applied below.
            # Left as-is because the intended near-range behavior is unclear.
            return max(base_rssi + 20, -30)
        # Simplified FSPL: RSSI decreases ~20dB per decade of distance
        rssi = base_rssi - (20 * math.log10(distance_km))
        # Add some realistic variation
        variation = random.uniform(-5, 5)
        final_rssi = int(rssi + variation)
        # Clamp to realistic values
        return max(min(final_rssi, -30), -100)

    def calculate_distance(self, lat1, lon1, lat2, lon2):
        """Return the great-circle (haversine) distance in km.

        Args:
            lat1, lon1: First point in decimal degrees.
            lat2, lon2: Second point in decimal degrees.
        """
        lat1_rad = math.radians(lat1)
        lat2_rad = math.radians(lat2)
        delta_lat = math.radians(lat2 - lat1)
        delta_lon = math.radians(lon2 - lon1)
        a = (math.sin(delta_lat / 2) ** 2 +
             math.cos(lat1_rad) * math.cos(lat2_rad) * math.sin(delta_lon / 2) ** 2)
        c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
        return self.EARTH_RADIUS_KM * c

    def create_new_drone(self, device):
        """Create a drone dict positioned near the edge of ``device``'s coverage.

        The drone gets the next id, a random type from ``DRONE_TYPES``, a
        random bearing at 80-100% of the coverage radius, a random movement
        pattern and a speed of 30-80% of its type's ``max_speed``.

        Args:
            device: Device dict with ``lat``, ``lon`` and ``coverage_radius``.

        Returns:
            dict: The new drone. It is NOT added to ``active_drones``.
        """
        self.drone_counter += 1
        drone_type = random.choice(list(DRONE_TYPES.keys()))

        # Start at edge of coverage area
        angle = random.uniform(0, 2 * math.pi)
        start_distance = device["coverage_radius"] * random.uniform(0.8, 1.0)
        km_per_lon_degree = self.KM_PER_DEGREE * math.cos(math.radians(device["lat"]))
        start_lat = device["lat"] + (start_distance / self.KM_PER_DEGREE) * math.cos(angle)
        start_lon = device["lon"] + (start_distance / km_per_lon_degree) * math.sin(angle)

        # Movement pattern
        patterns = ["approaching", "patrolling", "departing", "hovering"]
        movement_pattern = random.choice(patterns)

        # "approaching" and "departing" both steer relative to the device, so
        # their reference point must be the device itself. If a departing
        # drone used its own start position the direction vector would be
        # zero and it would never move. The remaining patterns are anchored
        # on the spawn point.
        steers_by_device = movement_pattern in ("approaching", "departing")

        drone = {
            "id": self.drone_counter,
            "type": drone_type,
            "lat": start_lat,
            "lon": start_lon,
            "target_lat": device["lat"] if steers_by_device else start_lat,
            "target_lon": device["lon"] if steers_by_device else start_lon,
            "speed_kmh": DRONE_TYPES[drone_type]["max_speed"] * random.uniform(0.3, 0.8),
            "pattern": movement_pattern,
            "created_at": time.time(),
            "last_detection": 0,
            "detection_count": 0,
            "is_active": True,
        }
        return drone

    def _advance(self, drone, lat_diff, lon_diff, distance_m):
        """Move ``drone`` ``distance_m`` metres along (lat_diff, lon_diff).

        The direction is given in degrees; it is converted to a local km
        frame so that a degree of longitude is weighted by cos(latitude) and
        the ground distance covered really equals ``distance_m``.
        """
        lon_scale = math.cos(math.radians(drone["lat"]))
        north_km = lat_diff * self.KM_PER_DEGREE
        east_km = lon_diff * self.KM_PER_DEGREE * lon_scale
        length_km = math.hypot(north_km, east_km)
        if length_km == 0 or abs(lon_scale) < 1e-12:
            return  # No direction to follow (or at a pole): stay put
        step_km = distance_m / 1000
        drone["lat"] += (north_km / length_km) * step_km / self.KM_PER_DEGREE
        drone["lon"] += (east_km / length_km) * step_km / (self.KM_PER_DEGREE * lon_scale)

    def update_drone_position(self, drone, time_delta_seconds):
        """Advance ``drone`` in place according to its movement pattern.

        Args:
            drone: Drone dict as created by ``create_new_drone``.
            time_delta_seconds: Time elapsed since the previous update.

        Patterns:
            approaching: move towards the target; within 100 m of it the
                pattern switches to "hovering".
            hovering: small random jitter around the current position.
            patrolling: circle around the target, driven by the drone's age.
            departing: move straight away from the target.
        """
        if not drone["is_active"]:
            return

        speed_ms = drone["speed_kmh"] * 1000 / 3600  # Convert to m/s
        distance_m = speed_ms * time_delta_seconds
        pattern = drone["pattern"]

        if pattern == "approaching":
            # Move towards the device center
            current_distance = self.calculate_distance(
                drone["lat"], drone["lon"],
                drone["target_lat"], drone["target_lon"]
            ) * 1000  # Convert to meters
            if current_distance > 100:  # Still approaching
                self._advance(
                    drone,
                    drone["target_lat"] - drone["lat"],
                    drone["target_lon"] - drone["lon"],
                    # Never step past the target after a long cycle.
                    min(distance_m, current_distance),
                )
            else:
                # Reached target, switch to hovering
                drone["pattern"] = "hovering"
        elif pattern == "hovering":
            # Small random movements
            drone["lat"] += random.uniform(-0.0001, 0.0001)
            drone["lon"] += random.uniform(-0.0001, 0.0001)
        elif pattern == "patrolling":
            # Slow circular motion around the target, phase derived from age
            t = (time.time() - drone["created_at"]) / 100
            radius = 0.002  # Small patrol radius, in degrees
            drone["lat"] = drone["target_lat"] + radius * math.sin(t)
            drone["lon"] = drone["target_lon"] + radius * math.cos(t)
        elif pattern == "departing":
            # Move away from the target (the device position)
            self._advance(
                drone,
                drone["lat"] - drone["target_lat"],
                drone["lon"] - drone["target_lon"],
                distance_m,
            )

    def should_detect_drone(self, drone, device):
        """Decide randomly whether ``device`` detects ``drone`` this cycle.

        Args:
            drone: Drone dict with ``lat``/``lon`` (and ``last_detection``).
            device: Device dict with ``lat``, ``lon``, ``coverage_radius``.

        Returns:
            tuple: ``(detected, distance_km)``. ``detected`` is always False
            outside the coverage radius; inside, the probability falls
            linearly from 1.0 at the device to 0.3 at the edge and is cut to
            a tenth when the drone was detected less than 2 seconds ago.
        """
        distance_km = self.calculate_distance(
            drone["lat"], drone["lon"],
            device["lat"], device["lon"]
        )
        coverage_km = device["coverage_radius"]
        # A device without coverage detects nothing; the check also avoids a
        # division by zero below.
        if coverage_km <= 0 or distance_km > coverage_km:
            return False, distance_km

        # Higher chance of detection when closer
        detection_prob = 1.0 - (distance_km / coverage_km) * 0.7

        # Factor in time since last detection (avoid spam)
        time_since_last = time.time() - drone.get("last_detection", 0)
        if time_since_last < 2:  # Minimum 2 seconds between detections
            detection_prob *= 0.1

        return random.random() < detection_prob, distance_km

    def generate_detection(self, drone, device, distance_km):
        """Build the detection payload for ``drone`` as seen by ``device``.

        Also updates the drone's ``last_detection`` timestamp and increments
        its ``detection_count``.

        Args:
            drone: Drone dict as created by ``create_new_drone``.
            device: Device dict; only ``id`` is read.
            distance_km: Distance between drone and device in km.

        Returns:
            dict: Payload with ``device_id``, ``drone_id``, ``drone_type``,
            ``rssi``, ``freq``, ``geo_lat``, ``geo_lon``,
            ``device_timestamp`` (epoch milliseconds), ``confidence_level``
            (0.5-0.99, two decimals) and ``signal_duration`` (at least 500).
        """
        drone_info = DRONE_TYPES[drone["type"]]

        # Calculate RSSI based on distance
        rssi = self.calculate_rssi(distance_km, drone_info["typical_rssi"])

        # Select frequency
        freq = random.choice(drone_info["freq"])

        # Confidence decreases with distance and lower RSSI
        base_confidence = 0.95 - (distance_km / 10.0) * 0.3
        rssi_factor = (rssi + 100) / 70  # Maps the [-100, -30] range to 0-1
        confidence = max(0.5, min(0.99, base_confidence * rssi_factor))

        # Hovering drones produce longer signals than moving ones
        duration_base = 2000 if drone["pattern"] == "hovering" else 1000
        duration = duration_base + random.randint(-500, 1500)

        detection = {
            "device_id": device["id"],
            "drone_id": drone["id"],
            "drone_type": drone["type"],
            "rssi": rssi,
            "freq": freq,
            "geo_lat": drone["lat"],
            "geo_lon": drone["lon"],
            "device_timestamp": int(time.time() * 1000),
            "confidence_level": round(confidence, 2),
            "signal_duration": max(500, duration),
        }

        # Update drone tracking
        drone["last_detection"] = time.time()
        drone["detection_count"] += 1
        return detection
def send_detection(detection_data):
    """POST one detection to ``{API_BASE_URL}/detections``.

    Keys starting with an underscore (for example ``_distance``, which the
    simulation loop attaches for logging only) are local annotations: they
    are used in the log line but stripped from the request body so the API
    only receives real detection fields.

    Args:
        detection_data: Detection dict as built by
            ``DroneSimulator.generate_detection``, optionally with
            underscore-prefixed logging fields.

    Returns:
        bool: True when the API answered 201, False on any other status or
        on a network error.
    """
    url = f"{API_BASE_URL}/detections"
    headers = {
        "Content-Type": "application/json"
    }
    payload = {
        key: value
        for key, value in detection_data.items()
        if not str(key).startswith("_")
    }
    try:
        response = requests.post(url, json=payload, headers=headers, timeout=10)
    except requests.exceptions.RequestException as e:
        print(f"❌ Network error: {e}")
        return False

    if response.status_code == 201:
        print(f"✅ Device {detection_data['device_id']}: Drone {detection_data['drone_id']} "
              f"(RSSI: {detection_data['rssi']}dBm, Dist: ~{detection_data.get('_distance', 'N/A')}km)")
        return True

    print(f"❌ Failed: {response.status_code} - {response.text}")
    return False
def simulate_realistic_scenario(duration_minutes=10):
    """Run the multi-drone simulation loop and post detections to the API.

    One drone is created next to the first device up front. Then, every
    3 seconds until ``duration_minutes`` have elapsed (or Ctrl+C):

    * with 15% probability a new drone is spawned near a random device
      (at most 8 drones are active at a time),
    * every active drone is moved, and retired once it is older than
      15 minutes or has been detected more than 50 times,
    * every device gets a chance to detect every remaining drone; each
      detection is sent through ``send_detection``.

    A status line is printed every 2 minutes and a summary at the end.

    Args:
        duration_minutes: How long to run the simulation, in minutes.
    """
    if not DEVICES:
        # Spawning picks a random device, which would raise IndexError on an
        # empty list, so there is nothing useful to simulate.
        print("❌ No devices available - fetch devices before starting the simulation.")
        return

    print(f"🚁 Starting realistic drone simulation for {duration_minutes} minutes...")
    print("📊 Scenario: Multiple drones with persistent tracking, RSSI changes, movement patterns")
    print("=" * 80)

    cycle_seconds = 3
    status_interval_seconds = 120

    simulator = DroneSimulator()
    start_time = time.time()
    end_time = start_time + (duration_minutes * 60)

    # Force create initial drone for testing
    device = DEVICES[0]  # Use first device
    initial_drone = simulator.create_new_drone(device)
    simulator.active_drones[initial_drone["id"]] = initial_drone
    print(f"🎯 Test drone created: {DRONE_TYPES[initial_drone['type']]['name']} near {device['name']}")

    detection_count = 0
    last_update = start_time
    last_status = start_time
    try:
        while time.time() < end_time:
            current_time = time.time()
            time_delta = current_time - last_update
            last_update = current_time

            # Randomly spawn new drones (15% chance per cycle)
            if random.random() < 0.15 and len(simulator.active_drones) < 8:
                device = random.choice(DEVICES)
                new_drone = simulator.create_new_drone(device)
                simulator.active_drones[new_drone["id"]] = new_drone
                print(f"🆕 New {DRONE_TYPES[new_drone['type']]['name']} spawned near {device['name']}")

            # Debug: Print active drones count
            if not simulator.active_drones:
                print("⚠️ No active drones - waiting for spawns...")

            # Update all active drones; removals are deferred so the dict is
            # not modified while it is being iterated.
            drones_to_remove = []
            for drone_id, drone in simulator.active_drones.items():
                simulator.update_drone_position(drone, time_delta)

                # Retire drones that are too old or have been seen enough
                age_minutes = (current_time - drone["created_at"]) / 60
                if age_minutes > 15 or drone["detection_count"] > 50:
                    drones_to_remove.append(drone_id)
                    continue

                # Debug: Print drone position
                print(f"🔍 Checking drone {drone_id} at ({drone['lat']:.4f}, {drone['lon']:.4f})")

                # Check detection for each device
                for device in DEVICES:
                    should_detect, distance = simulator.should_detect_drone(drone, device)
                    print(f" 📡 Device {device['name']}: distance={distance:.2f}km, should_detect={should_detect}")
                    if should_detect:
                        detection = simulator.generate_detection(drone, device, distance)
                        detection["_distance"] = f"{distance:.1f}"  # For logging only
                        if send_detection(detection):
                            detection_count += 1

            # Remove expired drones
            for drone_id in drones_to_remove:
                drone = simulator.active_drones.pop(drone_id)
                print(f"🛬 {DRONE_TYPES[drone['type']]['name']} {drone_id} finished ({drone['detection_count']} detections)")

            # Status update, once per interval rather than on every cycle
            if current_time - last_status >= status_interval_seconds:
                last_status = current_time
                active_count = len(simulator.active_drones)
                if active_count > 0:
                    print(f"📍 Status: {active_count} active drones, {detection_count} total detections")

            time.sleep(cycle_seconds)
    except KeyboardInterrupt:
        print("\n⏹️ Simulation stopped by user")

    elapsed = time.time() - start_time
    # Guard the rate against a zero-length run (e.g. duration_minutes <= 0
    # combined with a coarse clock).
    rate_per_minute = detection_count / (elapsed / 60) if elapsed > 0 else 0.0
    print(f"\n📈 Simulation Summary:")
    print(f" • Duration: {elapsed/60:.1f} minutes")
    print(f" • Total detections: {detection_count}")
    print(f" • Average rate: {rate_per_minute:.1f} detections/minute")
    print(f" • Active drones at end: {len(simulator.active_drones)}")
def test_api_health():
    """Probe ``{API_BASE_URL}/health`` and report whether it answers 200.

    Returns:
        bool: True for an HTTP 200 reply; False for any other status or when
        the request itself fails (connection error, timeout, ...).
    """
    health_url = f"{API_BASE_URL}/health"
    try:
        reply = requests.get(health_url, timeout=5)
    except requests.exceptions.RequestException as e:
        print(f"❌ Cannot reach API: {e}")
        return False

    healthy = reply.status_code == 200
    if healthy:
        print("✅ API health check passed")
    else:
        print(f"❌ API health check failed: {reply.status_code}")
    return healthy
def main():
    """Interactive entry point: check the API, then run the chosen scenario.

    Aborts early (with a message) when the device list cannot be fetched, is
    empty, or the API health check fails. Only menu option 1 runs a
    simulation; options 2 and 3 currently just print their banner.
    """
    print("🚁 Realistic Drone Detection Simulator")
    print("=" * 50)

    # Fetch devices from API first
    print("📡 Fetching active devices from API...")
    devices_loaded = fetch_devices()
    if not devices_loaded:
        print("❌ Cannot fetch devices from API. Please check if the system is running.")
        return
    if not DEVICES:
        print("❌ No active devices found. Please add some detector devices first.")
        return

    # Test API connectivity
    api_ok = test_api_health()
    if not api_ok:
        print("Cannot connect to API. Please check if the system is running.")
        return

    menu_lines = (
        "\nChoose simulation type:",
        "1. Realistic scenario (persistent drones, RSSI changes, movement)",
        "2. Single approaching drone (watch RSSI strengthen)",
        "3. Departing drone (watch RSSI weaken)",
    )
    for menu_line in menu_lines:
        print(menu_line)

    try:
        selection = input("\nEnter choice (1-3): ").strip()
        if selection == "1":
            raw_minutes = input("Duration in minutes (default: 10): ").strip()
            # An empty answer selects the default; int() raises ValueError on
            # anything non-numeric, which is reported below.
            minutes = 10 if not raw_minutes else int(raw_minutes)
            simulate_realistic_scenario(minutes)
        elif selection == "2":
            print("🎯 Simulating approaching drone (RSSI will strengthen)...")
            # Implementation for approaching drone
        elif selection == "3":
            print("🛫 Simulating departing drone (RSSI will weaken)...")
            # Implementation for departing drone
        else:
            print("Invalid choice")
    except KeyboardInterrupt:
        print("\n👋 Goodbye!")
    except ValueError:
        print("Invalid input")
# Start the interactive simulator only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == "__main__":
    main()