Files
drone-detector/drone_simulator.py
2025-09-07 15:14:28 +02:00

359 lines
15 KiB
Python

#!/usr/bin/env python3
"""
Swedish Drone Detection Simulator
This script simulates drone detection data for testing the security monitoring system.
It generates realistic detection data with Swedish coordinates for government sites,
water facilities, and sensitive areas.
Usage:
python drone_simulator.py --help
python drone_simulator.py --devices 5 --detection-interval 30 --duration 3600
"""
import requests
import time
import random
import json
import argparse
import threading
from datetime import datetime, timedelta
from dataclasses import dataclass
from typing import List, Tuple
import math
# Sensitive sites that simulated detector devices can be placed at.
# Layout: category key -> list of {"name", "lat", "lon"} records, where lat/lon
# are decimal degrees (presumably WGS84 -- confirm against the backend).
SWEDISH_LOCATIONS = {
    # Central government and security services, all in Stockholm.
    "government_sites": [
        {"name": "Riksdag Stockholm", "lat": 59.3275, "lon": 18.0644},
        {"name": "Government Offices Stockholm", "lat": 59.3320, "lon": 18.0648},
        {"name": "Swedish Armed Forces HQ", "lat": 59.3242, "lon": 18.0969},
        {"name": "SÄPO Headquarters", "lat": 59.3370, "lon": 18.0585},
        {"name": "Royal Palace Stockholm", "lat": 59.3267, "lon": 18.0717},
    ],
    # Drinking-water treatment plants.
    "water_facilities": [
        {"name": "Norsborg Water Treatment", "lat": 59.2766, "lon": 17.8217},
        {"name": "Lovö Water Treatment", "lat": 59.3508, "lon": 17.8367},
        {"name": "Göteborg Water Works", "lat": 57.6950, "lon": 11.9850},
        {"name": "Malmö Water Treatment", "lat": 55.5833, "lon": 12.9833},
        {"name": "Uppsala Water Plant", "lat": 59.8586, "lon": 17.6389},
    ],
    # Nuclear power plants.
    "nuclear_facilities": [
        {"name": "Forsmark Nuclear Plant", "lat": 60.4017, "lon": 18.1753},
        {"name": "Oskarshamn Nuclear Plant", "lat": 57.4167, "lon": 16.6667},
        {"name": "Ringhals Nuclear Plant", "lat": 57.2603, "lon": 12.1086},
    ],
    # Airports; note that Kastrup is labelled as Copenhagen, i.e. outside Sweden.
    "airports": [
        {"name": "Arlanda Airport", "lat": 59.6519, "lon": 17.9186},
        {"name": "Landvetter Airport", "lat": 57.6628, "lon": 12.2944},
        {"name": "Kastrup Airport (Copenhagen)", "lat": 55.6181, "lon": 12.6563},
        {"name": "Sturup Airport", "lat": 55.5264, "lon": 13.3761},
    ],
    # Military installations.
    "military_bases": [
        {"name": "Karlsborg Fortress", "lat": 58.5342, "lon": 14.5219},
        {"name": "Boden Fortress", "lat": 65.8250, "lon": 21.6889},
        {"name": "Gotland Regiment", "lat": 57.6364, "lon": 18.2944},
        {"name": "Amf 1 Livgardet", "lat": 59.4017, "lon": 17.9439},
    ],
}
@dataclass
class DroneDevice:
    """Represents a drone detection device.

    One instance is created per simulated detector by
    ``SwedishDroneSimulator.generate_devices``. The first six fields are
    required; the remaining fields carry defaults, some of which
    ``generate_devices`` overrides with randomised values.
    """
    # Numeric identifier sent as "device_id" in detections and, stringified,
    # as "key" in heartbeats.
    device_id: int
    # Human-readable unit name, e.g. "SecureGuard-001".
    name: str
    # Name of the monitored site (the "name" of a SWEDISH_LOCATIONS record).
    location: str
    # Site coordinates copied from the SWEDISH_LOCATIONS record.
    lat: float
    lon: float
    # SWEDISH_LOCATIONS category key the site belongs to.
    category: str
    # time.time() of the last successful heartbeat; 0 means none sent yet.
    last_heartbeat: float = 0
    # Presumably a percentage; generate_devices draws it from 75..100.
    battery_level: int = 100
    # Presumably dBm; generate_devices draws it from -60..-30.
    signal_strength: int = -45
    # Presumably degrees Celsius; generate_devices draws it from 15..30.
    temperature: float = 20.0
    # Free-form status string; nothing in this file changes it from "active".
    status: str = "active"
@dataclass
class DroneDetection:
    """Represents a drone detection event.

    Field names match the JSON keys that
    ``SwedishDroneSimulator.send_detection`` posts to the API.
    """
    # Identifier of the DroneDevice that reported the detection.
    device_id: int
    # Reported position; the simulator fills these with the device's own
    # coordinates.
    geo_lat: float
    geo_lon: float
    # Detection time as whole seconds from time.time().
    device_timestamp: int
    # Integer drone-type code; the simulator picks it per threat level.
    # NOTE(review): the code-to-model mapping lives in the backend -- confirm.
    drone_type: int
    # Received signal strength as measured by the device (printed as dBm).
    rssi: int
    # Frequency value chosen from a fixed list; units are not stated in this
    # file -- TODO confirm with the backend.
    freq: int
    # Identifier for the detected drone; the simulator uses a random
    # four-digit number.
    drone_id: int
class SwedishDroneSimulator:
    """Simulates drone detection events for Swedish security installations.

    The simulator owns a list of virtual detector devices and posts two kinds
    of JSON messages to ``<api_base_url>/detectors``: heartbeats
    (``"type": "heartbeat"``) and detections (``"type": "detection"``).

    Attributes:
        api_base_url: Base URL that ``/detectors`` is appended to.
        devices: Devices created by :meth:`generate_devices`.
        running: True only while :meth:`run_simulation` is looping; setting it
            to False from elsewhere makes the loop exit on its next pass.
        threat_scenarios: Per threat level, the selection probability and the
            inclusive RSSI range used when generating a detection.
    """

    def __init__(self, api_base_url: str = "http://localhost:3002/api"):
        """Create a simulator targeting *api_base_url*; no devices exist yet."""
        self.api_base_url = api_base_url
        self.devices: "List[DroneDevice]" = []
        self.running = False
        self.threat_scenarios = {
            "low_threat": {"probability": 0.4, "rssi_range": (-90, -70)},
            "medium_threat": {"probability": 0.3, "rssi_range": (-70, -55)},
            "high_threat": {"probability": 0.2, "rssi_range": (-55, -40)},
            "critical_threat": {"probability": 0.1, "rssi_range": (-40, -25)},
        }

    def generate_devices(self, num_devices: int) -> "List[DroneDevice]":
        """Generate drone detection devices at Swedish sensitive locations.

        Locations are drawn without replacement from ``SWEDISH_LOCATIONS``, so
        at most one device is created per known site.

        Args:
            num_devices: Requested number of devices. Values above the number
                of known locations are capped; values below zero are treated
                as zero instead of raising inside ``random.sample``.

        Returns:
            The newly created devices, which also replace ``self.devices``.
        """
        device_id_base = 1941875380  # Use the same IDs as in database setup
        all_locations = [
            (category, loc)
            for category, locations in SWEDISH_LOCATIONS.items()
            for loc in locations
        ]
        # Clamp into [0, len(all_locations)]: random.sample rejects both a
        # negative size and a size larger than the population.
        count = max(0, min(num_devices, len(all_locations)))
        selected_locations = random.sample(all_locations, count)

        devices = []
        for i, (category, location) in enumerate(selected_locations):
            devices.append(
                DroneDevice(
                    device_id=device_id_base + i + 1,
                    name=f"SecureGuard-{i+1:03d}",
                    location=location["name"],
                    lat=location["lat"],
                    lon=location["lon"],
                    category=category,
                    battery_level=random.randint(75, 100),
                    signal_strength=random.randint(-60, -30),
                    temperature=random.uniform(15, 30),
                )
            )
        self.devices = devices
        return devices

    def generate_detection_near_device(self, device: "DroneDevice", threat_level: str = "low_threat") -> "DroneDetection":
        """Generate a drone detection with realistic RSSI and drone type.

        Args:
            device: Device that reports the detection; its id and coordinates
                are copied into the event.
            threat_level: Key of ``self.threat_scenarios`` selecting the RSSI
                range and the pool of drone-type codes.

        Returns:
            A detection stamped with the current time in whole seconds.

        Raises:
            KeyError: If *threat_level* is not a key of ``threat_scenarios``.
        """
        threat_config = self.threat_scenarios[threat_level]
        # Generate RSSI (what the device actually measures)
        rssi_min, rssi_max = threat_config["rssi_range"]
        rssi = random.randint(rssi_min, rssi_max)

        # Drone type based on threat level and realistic scenarios
        if threat_level == "critical_threat":
            # Critical threats: Military drones (Orlan, Zala, Lancet variants, Supercam)
            drone_type = random.choice([2, 3, 5, 6, 14])
        elif threat_level == "high_threat":
            # High threats: FPV drones, maybe Orlan/Zala/Lancet
            drone_type = random.choice([7, 8, 9, 10, 11])
        else:
            # Low/medium threats: Unknown, DJI, various commercial types
            drone_type = random.choice([1, 13, 16, 17, 18])

        return DroneDetection(
            device_id=device.device_id,
            # Use device location - backend will handle positioning
            geo_lat=device.lat,
            geo_lon=device.lon,
            device_timestamp=int(time.time()),
            drone_type=drone_type,
            rssi=rssi,
            freq=random.choice([20, 22, 24, 25, 27, 58, 915, 2400, 2450, 5800]),  # Common drone frequencies
            drone_id=random.randint(1000, 9999),
        )

    def select_threat_level(self) -> str:
        """Select a threat level using the configured probability weights.

        Walks ``threat_scenarios`` in insertion order, accumulating the
        probabilities until the running total reaches a uniform random draw.

        Returns:
            A key of ``threat_scenarios``; ``"low_threat"`` if rounding leaves
            the draw above the accumulated total.
        """
        rand = random.random()
        cumulative = 0
        for threat_level, config in self.threat_scenarios.items():
            cumulative += config["probability"]
            if rand <= cumulative:
                return threat_level
        return "low_threat"

    @staticmethod
    def _threat_label(rssi: int) -> str:
        """Map an RSSI reading to the label printed for a sent detection.

        The thresholds are the upper RSSI bounds in ``threat_scenarios``. As
        those ranges are inclusive and share their end points, a reading
        exactly on a boundary (e.g. -70) gets the higher label.
        """
        if rssi >= -40:
            return "CRITICAL"
        if rssi >= -55:
            return "HIGH"
        if rssi >= -70:
            return "MEDIUM"
        return "LOW"

    @staticmethod
    def _estimate_distance_m(rssi: int) -> int:
        """Return the rounded distance estimate shown in the console output.

        Solves ``rssi = -30 - 10 * 3 * log10(d)`` for ``d``; the reference
        level (-30) and exponent (3) are fixed display constants.
        """
        return round(10 ** ((-30 - rssi) / (10 * 3)))

    def _post_to_detectors(self, payload: dict):
        """POST *payload* as JSON to ``/detectors`` and return the response."""
        return requests.post(
            f"{self.api_base_url}/detectors",
            json=payload,
            headers={"Content-Type": "application/json"},
            timeout=10,
        )

    def _select_detection_devices(self) -> "List[DroneDevice]":
        """Pick the devices that report a detection in the current round.

        Normally one device is chosen; with 30% probability, and only when at
        least two devices exist, between 2 and ``min(4, len(devices))``
        distinct devices are chosen to exercise simultaneous alarms.

        Returns:
            The chosen devices, or an empty list when there are no devices.
        """
        if not self.devices:
            return []
        num_detections = 1
        max_simultaneous = min(4, len(self.devices))
        # random.randint(2, max_simultaneous) raises ValueError when fewer
        # than two devices exist, so multi-detections need max_simultaneous >= 2.
        if max_simultaneous >= 2 and random.random() < 0.3:
            num_detections = random.randint(2, max_simultaneous)
        # Sampling without replacement selects different devices.
        return random.sample(self.devices, num_detections)

    def send_detection(self, detection: "DroneDetection") -> bool:
        """Send drone detection to the API.

        Args:
            detection: Event to post.

        Returns:
            True if the API answered 201 (a summary is then printed); False on
            any other status code or on a request-level error.
        """
        payload = {
            "type": "detection",  # Add type field for detection
            "device_id": detection.device_id,
            "geo_lat": detection.geo_lat,
            "geo_lon": detection.geo_lon,
            "device_timestamp": detection.device_timestamp,
            "drone_type": detection.drone_type,
            "rssi": detection.rssi,
            "freq": detection.freq,
            "drone_id": detection.drone_id,
        }
        # Only the network call is guarded, so programming errors elsewhere
        # in this method are not reported as send failures.
        try:
            response = self._post_to_detectors(payload)
        except requests.RequestException as e:
            print(f"❌ Error sending detection: {e}")
            return False

        if response.status_code != 201:
            print(f"❌ Failed to send detection: {response.status_code} - {response.text}")
            return False

        threat_level = self._threat_label(detection.rssi)
        device = next((d for d in self.devices if d.device_id == detection.device_id), None)
        location_name = device.location if device else "Unknown"
        print(f"🚨 {threat_level} THREAT DETECTION:")
        print(f" Location: {location_name}")
        print(f" Device: {detection.device_id}")
        print(f" RSSI: {detection.rssi} dBm")
        print(f" Drone Type: {detection.drone_type}")
        print(f" Est. Distance: ~{self._estimate_distance_m(detection.rssi)}m")
        print(f" Coordinates: {detection.geo_lat}, {detection.geo_lon}")
        print()
        return True

    def send_heartbeat(self, device: "DroneDevice") -> bool:
        """Send device heartbeat to the API.

        Args:
            device: Device whose heartbeat is posted; on success its
                ``last_heartbeat`` is set to the current time.

        Returns:
            True if the API answered 200; False on any other status code or
            on a request-level error.
        """
        payload = {
            "type": "heartbeat",
            "key": str(device.device_id),  # Just the device ID as key
        }
        try:
            response = self._post_to_detectors(payload)
        except requests.RequestException as e:
            print(f"❌ Error sending heartbeat for device {device.device_id}: {e}")
            return False

        if response.status_code != 200:
            print(f"❌ Failed to send heartbeat for device {device.device_id}: {response.status_code}")
            return False

        device.last_heartbeat = time.time()
        print(f"🟢 Heartbeat: {device.name} ({device.location})")
        return True

    def run_simulation(self, detection_interval: int = 60, heartbeat_interval: int = 30, duration: int = 3600):
        """Run the complete simulation.

        Prints a summary, then loops about once per second until *duration*
        has elapsed, ``self.running`` is cleared, or Ctrl+C is pressed.
        Heartbeats and detections are both attempted on the first pass.

        Args:
            detection_interval: Minimum seconds between detection rounds; each
                round fires with 70% probability.
            heartbeat_interval: Minimum seconds between heartbeat rounds, in
                which every device sends one heartbeat.
            duration: Total run time in seconds.
        """
        print("🇸🇪 Swedish Drone Detection Security Simulator")
        print("=" * 50)
        print("📊 Simulation Parameters:")
        print(f" • Devices: {len(self.devices)}")
        print(f" • Detection interval: {detection_interval}s")
        print(f" • Heartbeat interval: {heartbeat_interval}s")
        print(f" • Duration: {duration}s ({duration//60} minutes)")
        print(f" • API Base URL: {self.api_base_url}")
        print()

        # Display device locations
        print("📍 Monitoring Locations:")
        for device in self.devices:
            print(f"{device.name}: {device.location} ({device.category.replace('_', ' ').title()})")
        print()

        self.running = True
        start_time = time.time()
        # Zero timestamps make both rounds due on the very first iteration.
        last_detection_time = 0
        last_heartbeat_time = 0
        print("🚀 Starting simulation...\n")
        try:
            while self.running and (time.time() - start_time) < duration:
                current_time = time.time()

                # Send heartbeats
                if current_time - last_heartbeat_time >= heartbeat_interval:
                    for device in self.devices:
                        self.send_heartbeat(device)
                    last_heartbeat_time = current_time
                    print()

                # Generate detections
                if current_time - last_detection_time >= detection_interval:
                    # Higher chance of detection for testing (70% chance)
                    if random.random() < 0.7:
                        selected_devices = self._select_detection_devices()
                        if selected_devices:
                            num_detections = len(selected_devices)
                            print(f"🎯 Generating {num_detections} simultaneous detection{'s' if num_detections > 1 else ''}...")
                        for device in selected_devices:
                            threat_level = self.select_threat_level()
                            detection = self.generate_detection_near_device(device, threat_level)
                            self.send_detection(detection)
                            # Small delay between simultaneous detections for realism
                            time.sleep(0.5)
                    last_detection_time = current_time

                # Sleep for a short interval
                time.sleep(1)
        except KeyboardInterrupt:
            print("\n🛑 Simulation stopped by user")
        finally:
            # Do not leave the flag stuck at True once the loop has ended.
            self.running = False
        print(f"\n✅ Simulation completed. Duration: {int(time.time() - start_time)}s")
def main():
    """Parse command-line options and run the simulator.

    With ``--list-locations`` the known sites are printed and the function
    returns. Otherwise devices are generated, API connectivity is probed
    (a failure only produces a warning) and the simulation is started.

    Exits with status 2 via ``parser.error`` when ``--devices`` is below 1.
    """
    parser = argparse.ArgumentParser(description="Swedish Drone Detection Security Simulator")
    parser.add_argument("--devices", type=int, default=5, help="Number of devices to simulate (default: 5)")
    parser.add_argument("--detection-interval", type=int, default=60, help="Detection interval in seconds (default: 60)")
    parser.add_argument("--heartbeat-interval", type=int, default=30, help="Heartbeat interval in seconds (default: 30)")
    parser.add_argument("--duration", type=int, default=3600, help="Simulation duration in seconds (default: 3600)")
    parser.add_argument("--api-url", default="http://localhost:3002/api", help="API base URL (default: http://localhost:3002/api)")
    parser.add_argument("--list-locations", action="store_true", help="List all available Swedish locations and exit")
    args = parser.parse_args()

    if args.list_locations:
        print("🇸🇪 Available Swedish Security Monitoring Locations:")
        print("=" * 60)
        for category, locations in SWEDISH_LOCATIONS.items():
            print(f"\n{category.replace('_', ' ').title()}:")
            for i, loc in enumerate(locations, 1):
                print(f" {i}. {loc['name']} ({loc['lat']}, {loc['lon']})")
        return

    # A simulation needs at least one device; reject bad counts up front
    # instead of failing later inside random sampling.
    if args.devices < 1:
        parser.error("--devices must be at least 1")

    # Initialize simulator
    simulator = SwedishDroneSimulator(args.api_url)

    # Generate devices; the count is capped at the number of known locations,
    # so tell the user when fewer devices than requested were created.
    devices = simulator.generate_devices(args.devices)
    if len(devices) < args.devices:
        print(f"⚠️ Warning: requested {args.devices} devices but only {len(devices)} locations are available.")

    # Test API connectivity (best effort: a failure never aborts the run)
    try:
        response = requests.get(f"{args.api_url}/dashboard/stats", timeout=5)
    except requests.RequestException as e:
        print(f"⚠️ Warning: Could not connect to API at {args.api_url}")
        print(" Make sure the backend server is running!")
        print(f" Error: {e}")
        print()
    else:
        if response.status_code != 200:
            print(f"⚠️ Warning: API test failed ({response.status_code}). Proceeding anyway...")

    # Run simulation
    simulator.run_simulation(
        detection_interval=args.detection_interval,
        heartbeat_interval=args.heartbeat_interval,
        duration=args.duration,
    )
# Run the command-line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()