Files
drone-detector/drone_simulator.py
2025-08-16 19:43:44 +02:00

380 lines
16 KiB
Python

#!/usr/bin/env python3
"""
Swedish Drone Detection Simulator
This script simulates drone detection data for testing the security monitoring system.
It generates realistic detection data with Swedish coordinates for government sites,
water facilities, and sensitive areas.
Usage:
python drone_simulator.py --help
python drone_simulator.py --devices 5 --detection-interval 30 --duration 3600
"""
import requests
import time
import random
import json
import argparse
import threading
from datetime import datetime, timedelta
from dataclasses import dataclass
from typing import List, Tuple
import math
# Candidate monitoring sites, grouped by category.
# Each entry carries a display name plus latitude/longitude in decimal degrees.
# Most sites are in Sweden; the airports list also includes Kastrup (Copenhagen).
SWEDISH_LOCATIONS = {
    "government_sites": [
        {"name": "Riksdag Stockholm", "lat": 59.3275, "lon": 18.0644},
        {"name": "Government Offices Stockholm", "lat": 59.3320, "lon": 18.0648},
        {"name": "Swedish Armed Forces HQ", "lat": 59.3242, "lon": 18.0969},
        {"name": "SÄPO Headquarters", "lat": 59.3370, "lon": 18.0585},
        {"name": "Royal Palace Stockholm", "lat": 59.3267, "lon": 18.0717},
    ],
    "water_facilities": [
        {"name": "Norsborg Water Treatment", "lat": 59.2766, "lon": 17.8217},
        {"name": "Lovö Water Treatment", "lat": 59.3508, "lon": 17.8367},
        {"name": "Göteborg Water Works", "lat": 57.6950, "lon": 11.9850},
        {"name": "Malmö Water Treatment", "lat": 55.5833, "lon": 12.9833},
        {"name": "Uppsala Water Plant", "lat": 59.8586, "lon": 17.6389},
    ],
    "nuclear_facilities": [
        {"name": "Forsmark Nuclear Plant", "lat": 60.4017, "lon": 18.1753},
        {"name": "Oskarshamn Nuclear Plant", "lat": 57.4167, "lon": 16.6667},
        {"name": "Ringhals Nuclear Plant", "lat": 57.2603, "lon": 12.1086},
    ],
    "airports": [
        {"name": "Arlanda Airport", "lat": 59.6519, "lon": 17.9186},
        {"name": "Landvetter Airport", "lat": 57.6628, "lon": 12.2944},
        {"name": "Kastrup Airport (Copenhagen)", "lat": 55.6181, "lon": 12.6563},
        {"name": "Sturup Airport", "lat": 55.5264, "lon": 13.3761},
    ],
    "military_bases": [
        {"name": "Karlsborg Fortress", "lat": 58.5342, "lon": 14.5219},
        {"name": "Boden Fortress", "lat": 65.8250, "lon": 21.6889},
        {"name": "Gotland Regiment", "lat": 57.6364, "lon": 18.2944},
        {"name": "Amf 1 Livgardet", "lat": 59.4017, "lon": 17.9439},
    ],
}
@dataclass
class DroneDevice:
    """Represents a drone detection device.

    Instances are mutable: the simulator updates the health fields and
    ``last_heartbeat`` in place while it runs.
    """

    device_id: int  # numeric identifier sent with every detection
    name: str  # short display name, e.g. "SecureGuard-001"
    location: str  # human-readable name of the monitored site
    lat: float  # latitude in decimal degrees
    lon: float  # longitude in decimal degrees
    category: str  # key of the location group the site came from
    last_heartbeat: float = 0.0  # time.time() of the last accepted heartbeat; 0.0 = none yet
    battery_level: int = 100  # percent
    signal_strength: int = -45  # presumably dBm -- TODO confirm against the API
    temperature: float = 20.0  # presumably degrees Celsius -- TODO confirm
    status: str = "active"
@dataclass
class DroneDetection:
    """Represents a drone detection event.

    The field names match the keys of the JSON payload posted to the
    ``/detections`` endpoint.
    """

    device_id: int  # ID of the device that reported the detection
    geo_lat: float  # latitude of the detected drone, decimal degrees
    geo_lon: float  # longitude of the detected drone, decimal degrees
    device_timestamp: int  # Unix time in whole seconds
    drone_type: int  # presumably 0 = consumer, 1 = professional/military, 2 = racing -- confirm against the API
    rssi: int  # received signal strength in dBm
    freq: int  # frequency code; units are not established here -- TODO confirm
    drone_id: int  # identifier of the detected drone
class SwedishDroneSimulator:
    """Simulates drone detection events for Swedish security installations.

    A simulator holds a list of virtual detector devices (created by
    ``generate_devices``) and, while ``run_simulation`` is active, POSTs
    device heartbeats and synthetic drone detections to an HTTP API.

    Annotations that name ``DroneDevice`` / ``DroneDetection`` are written as
    forward-reference strings so they are not evaluated when the class body
    is executed.
    """

    def __init__(self, api_base_url: str = "http://localhost:3001/api"):
        """Create a simulator that talks to the API rooted at *api_base_url*."""
        # Drop trailing slashes so f"{base}/detections" never contains "//".
        self.api_base_url = api_base_url.rstrip("/")
        self.devices: List[DroneDevice] = []
        self.running = False
        # Per threat level:
        #   probability  - selection weight used by select_threat_level()
        #   rssi_range   - (min, max) dBm band the reported RSSI is clamped to
        #   distance_km  - (min, max) range the drone distance is drawn from
        self.threat_scenarios = {
            "low_threat": {"probability": 0.7, "rssi_range": (-90, -70), "distance_km": (5, 15)},
            "medium_threat": {"probability": 0.2, "rssi_range": (-70, -55), "distance_km": (0.2, 5)},
            "high_threat": {"probability": 0.08, "rssi_range": (-55, -40), "distance_km": (0.05, 0.2)},
            "critical_threat": {"probability": 0.02, "rssi_range": (-40, -25), "distance_km": (0, 0.05)},
        }

    def generate_devices(self, num_devices: int) -> "List[DroneDevice]":
        """Generate drone detection devices at Swedish sensitive locations.

        Args:
            num_devices: Desired number of devices. The result is capped at
                the number of known locations; zero or negative values
                yield an empty list.

        Returns:
            The newly created devices, which are also stored in
            ``self.devices`` (replacing any previous list).
        """
        device_id_base = 1941875380
        all_locations = [
            (category, loc)
            for category, locations in SWEDISH_LOCATIONS.items()
            for loc in locations
        ]

        # random.sample raises ValueError for a negative or oversized count,
        # so clamp the request to what is actually available.
        count = max(0, min(num_devices, len(all_locations)))
        selected_locations = random.sample(all_locations, count)

        self.devices = [
            DroneDevice(
                device_id=device_id_base + index,
                name=f"SecureGuard-{index:03d}",
                location=location["name"],
                lat=location["lat"],
                lon=location["lon"],
                category=category,
                battery_level=random.randint(75, 100),
                signal_strength=random.randint(-60, -30),
                temperature=random.uniform(15, 30),
            )
            for index, (category, location) in enumerate(selected_locations, start=1)
        ]
        return self.devices

    def calculate_threat_based_rssi(self, distance_km: float, base_rssi: int = -30) -> int:
        """Calculate RSSI based on distance for realistic threat simulation.

        Uses ``base_rssi - (20*log10(distance_m) + jitter)`` where the jitter
        is a random integer in [5, 15].

        Args:
            distance_km: Distance to the drone in kilometres. Distances
                under one metre are treated as one metre.
            base_rssi: RSSI in dBm assumed at the one-metre reference.

        Returns:
            The RSSI in dBm, clamped to the range [-100, -25].
        """
        # log10 needs a positive argument; 1 m is the reference distance.
        distance_m = max(distance_km * 1000, 1)
        path_loss = 20 * math.log10(distance_m) + random.randint(5, 15)
        rssi = base_rssi - int(path_loss)
        return max(-100, min(-25, rssi))

    @staticmethod
    def _offset_position(lat: float, lon: float, distance_km: float, bearing_rad: float) -> Tuple[float, float]:
        """Return the (lat, lon) reached from a point after *distance_km* on a bearing.

        The bearing is in radians, 0 pointing north and pi/2 pointing east.
        A flat-earth approximation of ~111 km per degree is used.
        """
        km_per_degree = 111.0
        north_km = distance_km * math.cos(bearing_rad)
        east_km = distance_km * math.sin(bearing_rad)
        lat_offset = north_km / km_per_degree
        # A degree of longitude shrinks with the cosine of the latitude.
        lon_offset = east_km / (km_per_degree * math.cos(math.radians(lat)))
        return lat + lat_offset, lon + lon_offset

    def generate_detection_near_device(self, device: "DroneDevice", threat_level: str = "low_threat") -> "DroneDetection":
        """Generate a drone detection near a specific device.

        Args:
            device: Device that reports the detection.
            threat_level: Key of ``self.threat_scenarios`` selecting the
                distance and RSSI bands.

        Returns:
            A detection stamped with the current time, positioned
            ``distance_km`` away from the device on a random bearing.

        Raises:
            KeyError: If *threat_level* is not a known scenario.
        """
        threat_config = self.threat_scenarios[threat_level]

        min_dist, max_dist = threat_config["distance_km"]
        distance_km = random.uniform(min_dist, max_dist)

        # NOTE: with the default base_rssi of -30 dBm the path-loss estimate
        # for the low/medium/high distance bands is always below the band's
        # lower bound, so this clamp pins those detections to rssi_min.
        rssi_min, rssi_max = threat_config["rssi_range"]
        rssi = self.calculate_threat_based_rssi(distance_km)
        rssi = max(rssi_min, min(rssi_max, rssi))

        # Place the drone exactly distance_km away on a random bearing so the
        # reported position agrees with the distance used for the RSSI.
        bearing = random.uniform(0, 2 * math.pi)
        detection_lat, detection_lon = self._offset_position(
            device.lat, device.lon, distance_km, bearing
        )

        if threat_level == "critical_threat":
            drone_type = 1  # Professional/Military
        elif threat_level == "high_threat":
            drone_type = random.choice([1, 2])  # Professional or Racing
        else:
            drone_type = random.choice([0, 0, 0, 1, 2])  # Mostly consumer

        return DroneDetection(
            device_id=device.device_id,
            geo_lat=round(detection_lat, 6),
            geo_lon=round(detection_lon, 6),
            device_timestamp=int(time.time()),
            drone_type=drone_type,
            rssi=rssi,
            freq=random.choice([20, 22, 24, 25, 27, 58, 915, 2400, 2450, 5800]),  # Common drone frequencies
            drone_id=random.randint(1000, 9999),
        )

    def select_threat_level(self) -> str:
        """Select a threat level according to the configured probabilities.

        Returns:
            A key of ``self.threat_scenarios``. Falls back to
            ``"low_threat"`` if the random draw exceeds the accumulated
            probabilities (possible through float rounding).
        """
        rand = random.random()
        cumulative = 0
        for threat_level, config in self.threat_scenarios.items():
            cumulative += config["probability"]
            if rand <= cumulative:
                return threat_level
        return "low_threat"

    @staticmethod
    def _rssi_threat_label(rssi: int) -> str:
        """Map an RSSI in dBm to the label used in the console output."""
        if rssi >= -40:
            return "CRITICAL"
        if rssi >= -55:
            return "HIGH"
        if rssi >= -70:
            return "MEDIUM"
        return "LOW"

    def send_detection(self, detection: "DroneDetection") -> bool:
        """Send a drone detection to the API.

        Returns:
            True if the API answered with HTTP 201, False on any other
            status or on a network/HTTP client error.
        """
        payload = {
            "device_id": detection.device_id,
            "geo_lat": detection.geo_lat,
            "geo_lon": detection.geo_lon,
            "device_timestamp": detection.device_timestamp,
            "drone_type": detection.drone_type,
            "rssi": detection.rssi,
            "freq": detection.freq,
            "drone_id": detection.drone_id,
        }
        try:
            response = requests.post(
                f"{self.api_base_url}/detections",
                json=payload,
                headers={"Content-Type": "application/json"},
                timeout=10,
            )
        except requests.RequestException as e:
            # An unreachable backend must not stop the simulation.
            print(f"❌ Error sending detection: {e}")
            return False

        if response.status_code != 201:
            print(f"❌ Failed to send detection: {response.status_code} - {response.text}")
            return False

        threat_level = self._rssi_threat_label(detection.rssi)
        device = next((d for d in self.devices if d.device_id == detection.device_id), None)
        location_name = device.location if device else "Unknown"
        # Rough display-only estimate: -30 dBm reference, path-loss exponent 3.
        est_distance_m = round(10 ** ((-30 - detection.rssi) / (10 * 3)))

        print(f"🚨 {threat_level} THREAT DETECTION:")
        print(f" Location: {location_name}")
        print(f" Device: {detection.device_id}")
        print(f" RSSI: {detection.rssi} dBm")
        print(f" Drone Type: {detection.drone_type}")
        print(f" Est. Distance: ~{est_distance_m}m")
        print(f" Coordinates: {detection.geo_lat}, {detection.geo_lon}")
        print()
        return True

    def send_heartbeat(self, device: "DroneDevice") -> bool:
        """Send a device heartbeat to the API.

        With 5% probability the device's battery, signal strength and
        temperature are changed first. On HTTP 200 the device's
        ``last_heartbeat`` is updated.

        Returns:
            True if the API answered with HTTP 200, False on any other
            status or on a network/HTTP client error.
        """
        # Simulate occasional device status changes.
        if random.random() < 0.05:
            device.battery_level = max(10, device.battery_level - random.randint(1, 5))
            device.signal_strength = random.randint(-65, -25)
            device.temperature = random.uniform(10, 35)

        payload = {
            "type": "heartbeat",
            "key": f"device_{device.device_id}_key",
            "battery_level": device.battery_level,
            "signal_strength": device.signal_strength,
            "temperature": device.temperature,
        }
        try:
            response = requests.post(
                f"{self.api_base_url}/heartbeat",
                json=payload,
                headers={"Content-Type": "application/json"},
                timeout=10,
            )
        except requests.RequestException as e:
            # An unreachable backend must not stop the simulation.
            print(f"❌ Error sending heartbeat for device {device.device_id}: {e}")
            return False

        if response.status_code != 200:
            print(f"❌ Failed to send heartbeat for device {device.device_id}: {response.status_code}")
            return False

        device.last_heartbeat = time.time()
        if device.battery_level > 30:
            status_icon = "🟢"
        elif device.battery_level > 15:
            status_icon = "🟡"
        else:
            status_icon = "🔴"
        print(f"{status_icon} Heartbeat: {device.name} ({device.location}) - Battery: {device.battery_level}%")
        return True

    def run_simulation(self, detection_interval: int = 60, heartbeat_interval: int = 30, duration: int = 3600):
        """Run the complete simulation.

        Loops once per second until *duration* seconds have elapsed,
        ``self.running`` is cleared, or the user presses Ctrl+C.

        Args:
            detection_interval: Seconds between detection attempts; each
                attempt produces a detection with 40% probability.
            heartbeat_interval: Seconds between heartbeat rounds (one
                heartbeat per device).
            duration: Total run time in seconds.
        """
        if not self.devices:
            # random.choice() on an empty list would raise IndexError below.
            print("⚠️ No devices configured; call generate_devices() first. Nothing to simulate.")
            return

        print("🇸🇪 Swedish Drone Detection Security Simulator")
        print("=" * 50)
        print("📊 Simulation Parameters:")
        print(f" • Devices: {len(self.devices)}")
        print(f" • Detection interval: {detection_interval}s")
        print(f" • Heartbeat interval: {heartbeat_interval}s")
        print(f" • Duration: {duration}s ({duration//60} minutes)")
        print(f" • API Base URL: {self.api_base_url}")
        print()

        # Display device locations
        print("📍 Monitoring Locations:")
        for device in self.devices:
            print(f"{device.name}: {device.location} ({device.category.replace('_', ' ').title()})")
        print()

        self.running = True
        start_time = time.time()
        # Zero makes both actions due on the very first loop iteration.
        last_detection_time = 0
        last_heartbeat_time = 0
        print("🚀 Starting simulation...\n")

        try:
            while self.running and (time.time() - start_time) < duration:
                current_time = time.time()

                # Send heartbeats
                if current_time - last_heartbeat_time >= heartbeat_interval:
                    for device in self.devices:
                        self.send_heartbeat(device)
                    last_heartbeat_time = current_time
                    print()

                # Generate detections
                if current_time - last_detection_time >= detection_interval:
                    # Not every interval produces a detection (40% chance).
                    if random.random() < 0.4:
                        device = random.choice(self.devices)
                        threat_level = self.select_threat_level()
                        detection = self.generate_detection_near_device(device, threat_level)
                        self.send_detection(detection)
                    last_detection_time = current_time

                # Sleep for a short interval
                time.sleep(1)
        except KeyboardInterrupt:
            print("\n🛑 Simulation stopped by user")
        finally:
            # Leave the flag consistent however the loop ended.
            self.running = False

        print(f"\n✅ Simulation completed. Duration: {int(time.time() - start_time)}s")
def main():
    """Parse the command line and run the simulator.

    With ``--list-locations`` the known sites are printed and the function
    returns. Otherwise devices are generated, API connectivity is probed
    (failures only produce a warning) and the simulation is started.
    """
    parser = argparse.ArgumentParser(description="Swedish Drone Detection Security Simulator")
    parser.add_argument("--devices", type=int, default=5, help="Number of devices to simulate (default: 5)")
    parser.add_argument("--detection-interval", type=int, default=60, help="Detection interval in seconds (default: 60)")
    parser.add_argument("--heartbeat-interval", type=int, default=30, help="Heartbeat interval in seconds (default: 30)")
    parser.add_argument("--duration", type=int, default=3600, help="Simulation duration in seconds (default: 3600)")
    parser.add_argument("--api-url", default="http://localhost:3001/api", help="API base URL (default: http://localhost:3001/api)")
    parser.add_argument("--list-locations", action="store_true", help="List all available Swedish locations and exit")
    args = parser.parse_args()

    if args.list_locations:
        print("🇸🇪 Available Swedish Security Monitoring Locations:")
        print("=" * 60)
        for category, locations in SWEDISH_LOCATIONS.items():
            print(f"\n{category.replace('_', ' ').title()}:")
            for i, loc in enumerate(locations, 1):
                print(f" {i}. {loc['name']} ({loc['lat']}, {loc['lon']})")
        return

    # Without at least one device there is nothing to pick detections for.
    if args.devices < 1:
        parser.error("--devices must be at least 1")

    # Initialize simulator and create its devices
    simulator = SwedishDroneSimulator(args.api_url)
    simulator.generate_devices(args.devices)

    # Test API connectivity; a failure is reported but does not abort the run.
    try:
        response = requests.get(f"{args.api_url}/dashboard/stats", timeout=5)
    except requests.RequestException as e:
        print(f"⚠️ Warning: Could not connect to API at {args.api_url}")
        print(" Make sure the backend server is running!")
        print(f" Error: {e}")
        print()
    else:
        if response.status_code != 200:
            print(f"⚠️ Warning: API test failed ({response.status_code}). Proceeding anyway...")

    # Run simulation
    simulator.run_simulation(
        detection_interval=args.detection_interval,
        heartbeat_interval=args.heartbeat_interval,
        duration=args.duration,
    )
# Run the command-line entry point only when executed as a script.
if __name__ == "__main__":
    main()