405 lines
16 KiB
Python
405 lines
16 KiB
Python
#!/usr/bin/env python3
|
|
"""
Realistic Drone Detection Test Script

Simulates realistic drone scenarios with persistent tracking, RSSI changes,
and real-world movement patterns. Fetches the existing devices from the API
and simulates drone detections around them.
"""
|
|
|
|
import requests
|
|
import json
|
|
import time
|
|
import random
|
|
import math
|
|
from datetime import datetime, timedelta
|
|
|
|
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------

# Root URL of the detection API.
# For local testing use "http://localhost:3002/api" instead.
API_BASE_URL = "http://selfservice.cqers.com/drones/api"

# Detector devices loaded from the API at start-up (filled by fetch_devices).
DEVICES = []

# Catalogue of simulated drone models, keyed by the numeric drone type.
#   name         - label used in console output
#   max_speed    - top speed in km/h (scaled down when a drone is spawned)
#   typical_rssi - baseline RSSI in dBm fed to the path-loss model
#   freq         - candidate frequencies; one is picked per detection
DRONE_TYPES = {
    1: {
        "name": "DJI Mavic",
        "max_speed": 65,
        "typical_rssi": -60,
        "freq": [2400, 2450],
    },
    2: {
        "name": "Racing Drone",
        "max_speed": 120,
        "typical_rssi": -55,
        "freq": [2400, 5800],
    },
    3: {
        "name": "DJI Phantom",
        "max_speed": 50,
        "typical_rssi": -65,
        "freq": [2400, 2450],
    },
    4: {
        "name": "Fixed Wing",
        "max_speed": 80,
        "typical_rssi": -70,
        "freq": [900, 2400],
    },
    5: {
        "name": "Surveillance",
        "max_speed": 40,
        "typical_rssi": -75,
        "freq": [2400, 5800],
    },
}
|
|
|
|
def fetch_devices():
    """Fetch active devices from the API and store them in ``DEVICES``.

    Each API record is converted to the simulator's format
    (``id``, ``name``, ``lat``, ``lon``, ``coverage_radius``). Records that
    lack a required field or carry non-numeric coordinates are skipped with
    a warning instead of aborting the whole fetch.

    ``DEVICES`` is only replaced once the response has been parsed, so a
    failed call never leaves it half-populated.

    Returns:
        True when the API answered 200 with a parseable body, False on any
        network, HTTP or parsing failure (the reason is printed).
    """
    global DEVICES

    try:
        # Bounded wait, like every other request in this script; without a
        # timeout an unresponsive server would hang the simulator forever.
        response = requests.get(f"{API_BASE_URL}/devices/map", timeout=10)
    except requests.exceptions.RequestException as e:
        print(f"❌ Error fetching devices: {e}")
        return False

    if response.status_code != 200:
        print(f"❌ Failed to fetch devices: {response.status_code}")
        return False

    try:
        api_devices = list(response.json().get('data', []))
    except (ValueError, TypeError, AttributeError) as e:
        # ValueError: body is not JSON; the others: JSON of an unexpected shape.
        print(f"❌ Error fetching devices: {e}")
        return False

    # Convert API device format to simulator format
    devices = []
    for device in api_devices:
        try:
            devices.append({
                "id": device["id"],
                "name": device["name"],
                "lat": float(device["geo_lat"]),
                "lon": float(device["geo_lon"]),
                "coverage_radius": 5.0,  # Default 5km coverage radius
            })
        except (KeyError, TypeError, ValueError) as e:
            print(f"⚠️ Skipping malformed device record: {e!r}")

    DEVICES = devices

    print(f"✅ Fetched {len(DEVICES)} active devices from API")
    for device in DEVICES:
        print(f" - {device['name']} at ({device['lat']:.4f}, {device['lon']:.4f})")
    return True
|
|
|
|
class DroneSimulator:
    """Keep track of simulated drones and produce detection payloads for them.

    Drones are plain dicts (see ``create_new_drone`` for the keys). Positions
    are latitude/longitude in degrees; short-range movement uses a flat-earth
    approximation of ``KM_PER_DEGREE`` kilometres per degree of latitude, with
    longitude scaled by cos(latitude).
    """

    EARTH_RADIUS_KM = 6371
    KM_PER_DEGREE = 111.0

    def __init__(self):
        self.active_drones = {}  # Track persistent drones, keyed by drone id
        self.drone_counter = 1000

    @staticmethod
    def _lon_scale(lat_deg):
        """Return cos(latitude), floored so longitude conversions never divide by zero."""
        return max(math.cos(math.radians(lat_deg)), 1e-6)

    def _offset_m(self, from_lat, from_lon, to_lat, to_lon):
        """Return the (north, east) offset in metres from one point to another."""
        metres_per_degree = self.KM_PER_DEGREE * 1000
        north_m = (to_lat - from_lat) * metres_per_degree
        east_m = (to_lon - from_lon) * metres_per_degree * self._lon_scale(from_lat)
        return north_m, east_m

    def _shift_m(self, drone, north_m, east_m):
        """Move ``drone`` in place by the given north/east offsets in metres."""
        metres_per_degree = self.KM_PER_DEGREE * 1000
        # Scale is taken at the current latitude, before the latitude changes,
        # so it matches the scale used by _offset_m for the same point.
        lon_scale = self._lon_scale(drone["lat"])
        drone["lat"] += north_m / metres_per_degree
        drone["lon"] += east_m / (metres_per_degree * lon_scale)

    def calculate_rssi(self, distance_km, base_rssi=-60):
        """Calculate a realistic RSSI (dBm) for a drone at ``distance_km``.

        Uses a simplified free-space path-loss model: ``base_rssi`` is the
        level at 1 km and the signal drops 20 dB per tenfold increase in
        distance, plus up to +/-5 dB of random jitter. The result is clamped
        to the range -100..-30 dBm.

        Closer than 0.1 km the model is bypassed (this also keeps ``log10``
        away from zero) and the stronger of ``base_rssi + 20`` and -30 is
        returned without jitter.
        """
        if distance_km < 0.1:  # Very close
            return max(base_rssi + 20, -30)

        # Simplified FSPL: RSSI decreases ~20dB per decade of distance
        rssi = base_rssi - (20 * math.log10(distance_km))

        # Add some realistic variation
        variation = random.uniform(-5, 5)
        final_rssi = int(rssi + variation)

        # Clamp to realistic values
        return max(min(final_rssi, -30), -100)

    def calculate_distance(self, lat1, lon1, lat2, lon2):
        """Return the great-circle (haversine) distance between two points in km."""
        lat1_rad = math.radians(lat1)
        lat2_rad = math.radians(lat2)
        delta_lat = math.radians(lat2 - lat1)
        delta_lon = math.radians(lon2 - lon1)

        a = (math.sin(delta_lat / 2) ** 2 +
             math.cos(lat1_rad) * math.cos(lat2_rad) * math.sin(delta_lon / 2) ** 2)
        # Rounding can push `a` a hair outside [0, 1] (e.g. antipodal points),
        # which would make sqrt(1 - a) raise a math domain error.
        a = min(1.0, max(0.0, a))
        c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))

        return self.EARTH_RADIUS_KM * c

    def create_new_drone(self, device):
        """Create a new drone near ``device`` and return it as a dict.

        The drone gets the next sequential id, a random type from
        ``DRONE_TYPES``, a start position at 80-100% of the device's coverage
        radius in a random direction, a random movement pattern and a speed of
        30-80% of its type's ``max_speed``.

        ``target_lat``/``target_lon`` is the reference point of the pattern:
        the device itself for "approaching" and "departing" drones (the point
        to fly towards / away from), and the spawn point for "patrolling" and
        "hovering" drones. The drone is NOT added to ``active_drones``.
        """
        self.drone_counter += 1
        drone_type = random.choice(list(DRONE_TYPES.keys()))

        # Start at edge of coverage area
        angle = random.uniform(0, 2 * math.pi)
        start_distance = device["coverage_radius"] * random.uniform(0.8, 1.0)

        start_lat = device["lat"] + (start_distance / self.KM_PER_DEGREE) * math.cos(angle)
        start_lon = device["lon"] + (
            start_distance / (self.KM_PER_DEGREE * self._lon_scale(device["lat"]))
        ) * math.sin(angle)

        # Movement pattern
        patterns = ["approaching", "patrolling", "departing", "hovering"]
        movement_pattern = random.choice(patterns)

        # A departing drone needs the device as its reference point too;
        # anchoring it on its own spawn point gives a zero-length "away"
        # vector and the drone never moves.
        anchored_on_device = movement_pattern in ("approaching", "departing")

        drone = {
            "id": self.drone_counter,
            "type": drone_type,
            "lat": start_lat,
            "lon": start_lon,
            "target_lat": device["lat"] if anchored_on_device else start_lat,
            "target_lon": device["lon"] if anchored_on_device else start_lon,
            "speed_kmh": DRONE_TYPES[drone_type]["max_speed"] * random.uniform(0.3, 0.8),
            "pattern": movement_pattern,
            "created_at": time.time(),
            "last_detection": 0,
            "detection_count": 0,
            "is_active": True,
        }

        return drone

    def update_drone_position(self, drone, time_delta_seconds):
        """Advance ``drone`` in place according to its movement pattern.

        * approaching - fly straight at the target; within 100 m of it the
          pattern switches to "hovering". A step never overshoots the target.
        * hovering    - jitter by up to 0.0001 degrees on each axis.
        * patrolling  - circle the target with a 0.002 degree radius, the
          angle driven by wall-clock time since ``created_at``.
        * departing   - fly straight away from the target.

        Inactive drones (``is_active`` false) are left untouched.
        """
        if not drone["is_active"]:
            return

        speed_ms = drone["speed_kmh"] * 1000 / 3600  # Convert to m/s
        distance_m = speed_ms * time_delta_seconds
        pattern = drone["pattern"]

        if pattern == "approaching":
            # Move towards the target
            current_distance = self.calculate_distance(
                drone["lat"], drone["lon"],
                drone["target_lat"], drone["target_lon"]
            ) * 1000  # Convert to meters

            if current_distance > 100:  # Still approaching
                north_m, east_m = self._offset_m(
                    drone["lat"], drone["lon"],
                    drone["target_lat"], drone["target_lon"]
                )
                length_m = math.hypot(north_m, east_m)
                if length_m > 0:
                    # Clamp so a long tick lands on the target instead of
                    # flying past it.
                    step_m = min(distance_m, length_m)
                    self._shift_m(
                        drone,
                        north_m / length_m * step_m,
                        east_m / length_m * step_m,
                    )
            else:
                # Reached target, switch to hovering
                drone["pattern"] = "hovering"

        elif pattern == "hovering":
            # Small random movements
            drone["lat"] += random.uniform(-0.0001, 0.0001)
            drone["lon"] += random.uniform(-0.0001, 0.0001)

        elif pattern == "patrolling":
            # Circular movement around the target
            t = (time.time() - drone["created_at"]) / 100  # Slow circular motion
            radius = 0.002  # Small patrol radius (degrees)
            drone["lat"] = drone["target_lat"] + radius * math.sin(t)
            drone["lon"] = drone["target_lon"] + radius * math.cos(t)

        elif pattern == "departing":
            # Move directly away from the target
            north_m, east_m = self._offset_m(
                drone["target_lat"], drone["target_lon"],
                drone["lat"], drone["lon"]
            )
            length_m = math.hypot(north_m, east_m)
            if length_m > 0:
                self._shift_m(
                    drone,
                    north_m / length_m * distance_m,
                    east_m / length_m * distance_m,
                )

    def should_detect_drone(self, drone, device):
        """Decide randomly whether ``device`` detects ``drone`` this cycle.

        Outside the device's coverage radius the answer is always no. Inside,
        the probability falls linearly from 1.0 at the device to 0.3 at the
        edge, and is cut to a tenth if the drone was detected less than two
        seconds ago.

        Returns:
            ``(detected, distance_km)``.
        """
        distance_km = self.calculate_distance(
            drone["lat"], drone["lon"],
            device["lat"], device["lon"]
        )

        # Detection probability decreases with distance
        if distance_km > device["coverage_radius"]:
            return False, distance_km

        # Higher chance of detection when closer
        detection_prob = 1.0 - (distance_km / device["coverage_radius"]) * 0.7

        # Factor in time since last detection (avoid spam)
        time_since_last = time.time() - drone.get("last_detection", 0)
        if time_since_last < 2:  # Minimum 2 seconds between detections
            detection_prob *= 0.1

        return random.random() < detection_prob, distance_km

    def generate_detection(self, drone, device, distance_km):
        """Build the detection payload for ``drone`` as seen by ``device``.

        RSSI comes from ``calculate_rssi`` using the drone type's typical
        RSSI, the frequency is picked from the type's list, confidence is
        derived from distance and RSSI and clamped to 0.5..0.99, and the
        signal duration is at least 500 (longer on average when hovering).

        Side effect: stamps ``drone["last_detection"]`` with the current time
        and increments ``drone["detection_count"]``.
        """
        drone_info = DRONE_TYPES[drone["type"]]

        # Calculate RSSI based on distance
        rssi = self.calculate_rssi(distance_km, drone_info["typical_rssi"])

        # Select frequency
        freq = random.choice(drone_info["freq"])

        # Confidence decreases with distance and lower RSSI
        base_confidence = 0.95 - (distance_km / 10.0) * 0.3
        rssi_factor = (rssi + 100) / 70  # Normalize RSSI (-100..-30) to 0-1
        confidence = max(0.5, min(0.99, base_confidence * rssi_factor))

        # Signal duration varies by movement pattern
        duration_base = 2000 if drone["pattern"] == "hovering" else 1000
        duration = duration_base + random.randint(-500, 1500)

        detection = {
            "device_id": device["id"],
            "drone_id": drone["id"],
            "drone_type": drone["type"],
            "rssi": rssi,
            "freq": freq,
            "geo_lat": drone["lat"],
            "geo_lon": drone["lon"],
            "device_timestamp": int(time.time() * 1000),  # epoch milliseconds
            "confidence_level": round(confidence, 2),
            "signal_duration": max(500, duration),
        }

        # Update drone tracking
        drone["last_detection"] = time.time()
        drone["detection_count"] += 1

        return detection
|
|
|
|
def send_detection(detection_data):
    """Send detection data to the API.

    Keys starting with an underscore (such as ``_distance``) are treated as
    local annotations: they are used for the console log line but are
    stripped from the JSON body that is posted.

    Args:
        detection_data: detection dict as built by
            ``DroneSimulator.generate_detection``.

    Returns:
        True if the API answered 201, False on any other status or on a
        network error (the reason is printed).
    """
    url = f"{API_BASE_URL}/detections"
    headers = {
        "Content-Type": "application/json"
    }

    # Do not leak logging-only fields into the API payload.
    payload = {
        key: value
        for key, value in detection_data.items()
        if not str(key).startswith("_")
    }

    try:
        response = requests.post(url, json=payload, headers=headers, timeout=10)
    except requests.exceptions.RequestException as e:
        print(f"❌ Network error: {e}")
        return False

    if response.status_code == 201:
        print(f"✅ Device {detection_data['device_id']}: Drone {detection_data['drone_id']} "
              f"(RSSI: {detection_data['rssi']}dBm, Dist: ~{detection_data.get('_distance', 'N/A')}km)")
        return True

    print(f"❌ Failed: {response.status_code} - {response.text}")
    return False
|
|
|
|
def simulate_realistic_scenario(duration_minutes=10):
    """Simulate a realistic drone scenario with persistent tracking.

    Runs a 3-second cycle for ``duration_minutes`` (or until Ctrl+C). Each
    cycle may spawn a drone near a random device (15% chance, at most 8
    active), moves every active drone, lets every device in ``DEVICES`` try
    to detect it and posts successful detections via ``send_detection``.

    A drone is retired when it is older than 15 minutes, has produced more
    than 50 detections, or has moved beyond 1.5x the coverage radius of
    every device. A status line is printed every two minutes and a summary
    at the end.
    """
    spawn_probability = 0.15
    max_active_drones = 8
    cycle_seconds = 3
    status_interval_seconds = 120
    out_of_range_factor = 1.5  # multiples of a device's coverage radius

    if not DEVICES:
        # random.choice() on an empty list would crash the first spawn.
        print("❌ No devices available for simulation")
        return

    print(f"🚁 Starting realistic drone simulation for {duration_minutes} minutes...")
    print("📊 Scenario: Multiple drones with persistent tracking, RSSI changes, movement patterns")
    print("=" * 80)

    simulator = DroneSimulator()
    start_time = time.time()
    end_time = start_time + (duration_minutes * 60)

    detection_count = 0
    last_update = start_time
    last_status = start_time

    try:
        while time.time() < end_time:
            current_time = time.time()
            time_delta = current_time - last_update
            last_update = current_time

            # Randomly spawn new drones (15% chance per cycle)
            if random.random() < spawn_probability and len(simulator.active_drones) < max_active_drones:
                device = random.choice(DEVICES)
                new_drone = simulator.create_new_drone(device)
                simulator.active_drones[new_drone["id"]] = new_drone
                print(f"🆕 New {DRONE_TYPES[new_drone['type']]['name']} spawned near {device['name']}")

            # Update all active drones; removals are deferred so the dict is
            # not mutated while it is being iterated.
            drones_to_remove = []
            for drone_id, drone in simulator.active_drones.items():
                simulator.update_drone_position(drone, time_delta)

                # Check if drone should be removed (timeout or detection budget)
                age_minutes = (current_time - drone["created_at"]) / 60
                if age_minutes > 15 or drone["detection_count"] > 50:
                    drones_to_remove.append(drone_id)
                    continue

                # Check detection for each device
                near_any_device = False
                for device in DEVICES:
                    should_detect, distance = simulator.should_detect_drone(drone, device)

                    if distance <= device["coverage_radius"] * out_of_range_factor:
                        near_any_device = True

                    if should_detect:
                        detection = simulator.generate_detection(drone, device, distance)
                        detection["_distance"] = f"{distance:.1f}"  # For logging only

                        if send_detection(detection):
                            detection_count += 1

                # Too far from every device: it can no longer be detected and
                # would only occupy one of the active-drone slots.
                if not near_any_device:
                    drones_to_remove.append(drone_id)

            # Remove expired drones
            for drone_id in drones_to_remove:
                drone = simulator.active_drones.pop(drone_id)
                print(f"🛬 {DRONE_TYPES[drone['type']]['name']} {drone_id} finished ({drone['detection_count']} detections)")

            # Status update, once per interval rather than once per cycle
            if current_time - last_status >= status_interval_seconds:
                last_status = current_time
                active_count = len(simulator.active_drones)
                if active_count > 0:
                    print(f"📍 Status: {active_count} active drones, {detection_count} total detections")

            time.sleep(cycle_seconds)  # 3 second cycle time

    except KeyboardInterrupt:
        print("\n⏹️ Simulation stopped by user")

    elapsed = time.time() - start_time
    elapsed_minutes = elapsed / 60
    # Guard against a zero-length run (e.g. duration 0 on a coarse clock).
    rate = detection_count / elapsed_minutes if elapsed_minutes > 0 else 0.0

    print("\n📈 Simulation Summary:")
    print(f" • Duration: {elapsed_minutes:.1f} minutes")
    print(f" • Total detections: {detection_count}")
    print(f" • Average rate: {rate:.1f} detections/minute")
    print(f" • Active drones at end: {len(simulator.active_drones)}")
|
|
|
|
def test_api_health():
    """Probe the API's /health endpoint.

    Returns True when the endpoint answers with HTTP 200; otherwise prints
    the reason (unexpected status or network failure) and returns False.
    """
    health_url = f"{API_BASE_URL}/health"

    try:
        status = requests.get(health_url, timeout=5).status_code
    except requests.exceptions.RequestException as err:
        print(f"❌ Cannot reach API: {err}")
        return False

    if status != 200:
        print(f"❌ API health check failed: {status}")
        return False

    print("✅ API health check passed")
    return True
|
|
|
|
def _prompt_duration(default_minutes):
    """Ask for a run length in minutes; raise ValueError if it is not a positive integer."""
    raw = input(f"Duration in minutes (default: {default_minutes}): ").strip()
    minutes = int(raw) if raw else default_minutes
    if minutes <= 0:
        raise ValueError("duration must be positive")
    return minutes


def _simulate_single_drone(pattern, duration_minutes):
    """Fly one drone towards ("approaching") or away from ("departing") the first device."""
    cycle_seconds = 3
    device = DEVICES[0]
    simulator = DroneSimulator()

    drone = simulator.create_new_drone(device)
    # Override whatever pattern was drawn at random and anchor the drone on
    # the device so "towards" / "away from" are well defined.
    drone["pattern"] = pattern
    drone["target_lat"] = device["lat"]
    drone["target_lon"] = device["lon"]
    if pattern == "departing":
        # Pull the spawn point in to 10% of its offset so the drone starts
        # close to the device and the weakening signal is observable.
        drone["lat"] = device["lat"] + (drone["lat"] - device["lat"]) * 0.1
        drone["lon"] = device["lon"] + (drone["lon"] - device["lon"]) * 0.1

    print(f"🆕 {DRONE_TYPES[drone['type']]['name']} {drone['id']} {pattern} near {device['name']}")

    sent = 0
    last_update = time.time()
    deadline = last_update + duration_minutes * 60

    try:
        while time.time() < deadline:
            now = time.time()
            simulator.update_drone_position(drone, now - last_update)
            last_update = now

            should_detect, distance = simulator.should_detect_drone(drone, device)

            if pattern == "departing" and distance > device["coverage_radius"]:
                print("🛬 Drone left the coverage area")
                break

            if should_detect:
                detection = simulator.generate_detection(drone, device, distance)
                detection["_distance"] = f"{distance:.1f}"  # For logging only
                if send_detection(detection):
                    sent += 1

            time.sleep(cycle_seconds)
    except KeyboardInterrupt:
        print("\n⏹️ Simulation stopped by user")

    print(f"\n📈 {sent} detections sent")


def main():
    """Interactive entry point: load devices, check the API, run the chosen simulation."""
    print("🚁 Realistic Drone Detection Simulator")
    print("=" * 50)

    # Fetch devices from API first
    print("📡 Fetching active devices from API...")
    if not fetch_devices():
        print("❌ Cannot fetch devices from API. Please check if the system is running.")
        return

    if not DEVICES:
        print("❌ No active devices found. Please add some detector devices first.")
        return

    # Test API connectivity
    if not test_api_health():
        print("Cannot connect to API. Please check if the system is running.")
        return

    print("\nChoose simulation type:")
    print("1. Realistic scenario (persistent drones, RSSI changes, movement)")
    print("2. Single approaching drone (watch RSSI strengthen)")
    print("3. Departing drone (watch RSSI weaken)")

    try:
        choice = input("\nEnter choice (1-3): ").strip()

        if choice == "1":
            simulate_realistic_scenario(_prompt_duration(10))

        elif choice == "2":
            duration = _prompt_duration(5)
            print("🎯 Simulating approaching drone (RSSI will strengthen)...")
            _simulate_single_drone("approaching", duration)

        elif choice == "3":
            duration = _prompt_duration(5)
            print("🛫 Simulating departing drone (RSSI will weaken)...")
            _simulate_single_drone("departing", duration)

        else:
            print("Invalid choice")

    except KeyboardInterrupt:
        print("\n👋 Goodbye!")
    except ValueError:
        # Non-numeric or non-positive duration.
        print("Invalid input")
|
|
# Start the interactive simulator only when this file is run as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
|