#!/usr/bin/env python3
"""
Swedish Drone Detection Simulator

This script simulates drone detection data for testing the security monitoring system.
It generates realistic detection data with Swedish coordinates for government sites,
water facilities, and sensitive areas.

Sending data requires the third-party ``requests`` package; ``--help`` and
``--list-locations`` work without it.

Usage:
    python drone_simulator.py --help
    python drone_simulator.py --devices 5 --detection-interval 30 --duration 3600
"""

import argparse
import json
import math
import random
import threading
import time
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List, Tuple

try:
    import requests
except ImportError:
    # Keep the module importable (e.g. for --list-locations) without the HTTP
    # client; _require_requests() reports the problem when it is needed.
    requests = None

# Swedish coordinates for various sensitive locations
SWEDISH_LOCATIONS = {
    "government_sites": [
        {"name": "Riksdag Stockholm", "lat": 59.3275, "lon": 18.0644},
        {"name": "Government Offices Stockholm", "lat": 59.3320, "lon": 18.0648},
        {"name": "Swedish Armed Forces HQ", "lat": 59.3242, "lon": 18.0969},
        {"name": "SÄPO Headquarters", "lat": 59.3370, "lon": 18.0585},
        {"name": "Royal Palace Stockholm", "lat": 59.3267, "lon": 18.0717},
    ],
    "water_facilities": [
        {"name": "Norsborg Water Treatment", "lat": 59.2766, "lon": 17.8217},
        {"name": "Lovö Water Treatment", "lat": 59.3508, "lon": 17.8367},
        {"name": "Göteborg Water Works", "lat": 57.6950, "lon": 11.9850},
        {"name": "Malmö Water Treatment", "lat": 55.5833, "lon": 12.9833},
        {"name": "Uppsala Water Plant", "lat": 59.8586, "lon": 17.6389},
    ],
    "nuclear_facilities": [
        {"name": "Forsmark Nuclear Plant", "lat": 60.4017, "lon": 18.1753},
        {"name": "Oskarshamn Nuclear Plant", "lat": 57.4167, "lon": 16.6667},
        {"name": "Ringhals Nuclear Plant", "lat": 57.2603, "lon": 12.1086},
    ],
    "airports": [
        {"name": "Arlanda Airport", "lat": 59.6519, "lon": 17.9186},
        {"name": "Landvetter Airport", "lat": 57.6628, "lon": 12.2944},
        {"name": "Kastrup Airport (Copenhagen)", "lat": 55.6181, "lon": 12.6563},
        {"name": "Sturup Airport", "lat": 55.5264, "lon": 13.3761},
    ],
    "military_bases": [
        {"name": "Karlsborg Fortress", "lat": 58.5342, "lon": 14.5219},
        {"name": "Boden Fortress", "lat": 65.8250, "lon": 21.6889},
        {"name": "Gotland Regiment", "lat": 57.6364, "lon": 18.2944},
        {"name": "Amf 1 Livgardet", "lat": 59.4017, "lon": 17.9439},
    ],
}


def _require_requests():
    """Return the ``requests`` module, raising RuntimeError if it is not installed."""
    if requests is None:
        raise RuntimeError(
            "The 'requests' package is required to talk to the API "
            "(install it with: pip install requests)"
        )
    return requests


@dataclass
class DroneDevice:
    """Represents a drone detection device"""

    device_id: int
    name: str
    location: str
    lat: float
    lon: float
    category: str
    last_heartbeat: float = 0
    battery_level: int = 100
    signal_strength: int = -45
    temperature: float = 20.0
    status: str = "active"


@dataclass
class DroneDetection:
    """Represents a drone detection event"""

    device_id: int
    geo_lat: float
    geo_lon: float
    device_timestamp: int
    drone_type: int
    rssi: int
    freq: int
    drone_id: int


class SwedishDroneSimulator:
    """Simulates drone detection events for Swedish security installations"""

    def __init__(self, api_base_url: str = "http://localhost:3002/api"):
        """Create a simulator that posts to ``api_base_url`` (no I/O happens here)."""
        self.api_base_url = api_base_url
        self.devices: List[DroneDevice] = []
        self.running = False
        # Probabilities sum to 1.0; select_threat_level() walks them cumulatively
        # in insertion order.
        self.threat_scenarios = {
            "low_threat": {"probability": 0.7, "rssi_range": (-90, -70), "distance_km": (5, 15)},
            "medium_threat": {"probability": 0.2, "rssi_range": (-70, -55), "distance_km": (0.2, 5)},
            "high_threat": {"probability": 0.08, "rssi_range": (-55, -40), "distance_km": (0.05, 0.2)},
            "critical_threat": {"probability": 0.02, "rssi_range": (-40, -25), "distance_km": (0, 0.05)},
        }

    def generate_devices(self, num_devices: int) -> List[DroneDevice]:
        """Generate drone detection devices at Swedish sensitive locations.

        Args:
            num_devices: Requested number of devices. Values above the number
                of known locations are capped; zero or negative values yield
                an empty list.

        Returns:
            The generated devices, which are also stored in ``self.devices``.
        """
        # NOTE(review): ids are base + 1-based index, so the first device gets
        # 1002 rather than 1001. Kept as-is because the receiving API may
        # already know these ids.
        device_id_base = 1001

        all_locations = [
            (category, loc)
            for category, locations in SWEDISH_LOCATIONS.items()
            for loc in locations
        ]

        # random.sample() rejects negative sizes and sizes above the
        # population, so clamp on both sides.
        count = max(0, min(num_devices, len(all_locations)))
        selected_locations = random.sample(all_locations, count)

        devices = [
            DroneDevice(
                device_id=device_id_base + index,
                name=f"SecureGuard-{index:03d}",
                location=location["name"],
                lat=location["lat"],
                lon=location["lon"],
                category=category,
                battery_level=random.randint(75, 100),
                signal_strength=random.randint(-60, -30),
                temperature=random.uniform(15, 30),
            )
            for index, (category, location) in enumerate(selected_locations, start=1)
        ]

        self.devices = devices
        return devices

    def calculate_threat_based_rssi(self, distance_km: float, base_rssi: int = -30) -> int:
        """Calculate RSSI based on distance for realistic threat simulation.

        Uses a free-space-style path loss, ``20*log10(distance_m)``, plus a
        random 5-15 dB extra loss, subtracted from ``base_rssi``.

        Args:
            distance_km: Distance to the drone in kilometres. Anything closer
                than one metre (including negative input) is treated as 1 m.
            base_rssi: Signal level at the 1 m reference distance.

        Returns:
            The RSSI in dBm, clamped to the range [-100, -25].
        """
        # log10 is undefined at or below zero, so floor the distance at 1 m.
        distance_m = max(1, distance_km * 1000)

        path_loss = 20 * math.log10(distance_m) + random.randint(5, 15)
        rssi = base_rssi - int(path_loss)

        # Clamp to realistic RSSI range
        return max(-100, min(-25, rssi))

    def generate_detection_near_device(self, device: DroneDevice, threat_level: str = "low_threat") -> DroneDetection:
        """Generate a drone detection near a specific device with threat-based parameters.

        Args:
            device: The device that "sees" the drone; the detection is placed
                around its coordinates.
            threat_level: A key of ``self.threat_scenarios``.

        Returns:
            A detection stamped with the current time.

        Raises:
            KeyError: If ``threat_level`` is not a known scenario.
        """
        threat_config = self.threat_scenarios[threat_level]

        # Generate distance based on threat level
        min_dist, max_dist = threat_config["distance_km"]
        distance_km = random.uniform(min_dist, max_dist)

        # Distance-derived RSSI, then forced into the band that defines this
        # threat level so the reported level stays consistent with the scenario.
        rssi = self.calculate_threat_based_rssi(distance_km)
        rssi_min, rssi_max = threat_config["rssi_range"]
        rssi = max(rssi_min, min(rssi_max, rssi))

        # Random offset of up to +/- distance_km on each axis (~111 km per
        # degree of latitude; longitude degrees shrink with cos(latitude)).
        lat_offset = (random.random() - 0.5) * (distance_km / 111.0) * 2
        lon_offset = (random.random() - 0.5) * (distance_km / (111.0 * math.cos(math.radians(device.lat)))) * 2

        detection_lat = device.lat + lat_offset
        detection_lon = device.lon + lon_offset

        # Drone type based on threat level
        if threat_level == "critical_threat":
            drone_type = 1  # Professional/Military
        elif threat_level == "high_threat":
            drone_type = random.choice([1, 2])  # Professional or Racing
        else:
            drone_type = random.choice([0, 0, 0, 1, 2])  # Mostly consumer

        return DroneDetection(
            device_id=device.device_id,
            geo_lat=round(detection_lat, 6),
            geo_lon=round(detection_lon, 6),
            device_timestamp=int(time.time()),
            drone_type=drone_type,
            rssi=rssi,
            freq=random.choice([20, 22, 24, 25, 27, 58, 915, 2400, 2450, 5800]),  # Common drone frequencies
            drone_id=random.randint(1000, 9999),
        )

    def select_threat_level(self) -> str:
        """Select threat level based on probability weights for realistic scenarios.

        Returns:
            A key of ``self.threat_scenarios``; falls back to ``"low_threat"``
            if rounding leaves the draw above the cumulative total.
        """
        rand = random.random()
        cumulative = 0
        for threat_level, config in self.threat_scenarios.items():
            cumulative += config["probability"]
            if rand <= cumulative:
                return threat_level
        return "low_threat"

    @staticmethod
    def _threat_label(rssi: int) -> str:
        """Map an RSSI value to the label shown in the console report."""
        if rssi >= -40:
            return "CRITICAL"
        if rssi >= -55:
            return "HIGH"
        if rssi >= -70:
            return "MEDIUM"
        return "LOW"

    def send_detection(self, detection: DroneDetection) -> bool:
        """Send drone detection to the API.

        Args:
            detection: The detection event to POST to ``<api_base_url>/detectors``.

        Returns:
            True if the API answered 201; False on any other status or on a
            transport error (both are reported on stdout).

        Raises:
            RuntimeError: If the ``requests`` package is not installed.
        """
        http = _require_requests()
        payload = {
            "type": "detection",  # Add type field for detection
            "device_id": detection.device_id,
            "geo_lat": detection.geo_lat,
            "geo_lon": detection.geo_lon,
            "device_timestamp": detection.device_timestamp,
            "drone_type": detection.drone_type,
            "rssi": detection.rssi,
            "freq": detection.freq,
            "drone_id": detection.drone_id,
        }

        try:
            response = http.post(
                f"{self.api_base_url}/detectors",
                json=payload,
                headers={"Content-Type": "application/json"},
                timeout=10,
            )
        except http.RequestException as e:
            print(f"❌ Error sending detection: {e}")
            return False

        if response.status_code != 201:
            print(f"❌ Failed to send detection: {response.status_code} - {response.text}")
            return False

        threat_level = self._threat_label(detection.rssi)
        device = next((d for d in self.devices if d.device_id == detection.device_id), None)
        location_name = device.location if device else "Unknown"
        # Inverse log-distance model: -30 dBm reference, path-loss exponent 3.
        estimated_m = round(10 ** (((-30) - detection.rssi) / (10 * 3)))

        print(f"🚨 {threat_level} THREAT DETECTION:")
        print(f" Location: {location_name}")
        print(f" Device: {detection.device_id}")
        print(f" RSSI: {detection.rssi} dBm")
        print(f" Drone Type: {detection.drone_type}")
        print(f" Est. Distance: ~{estimated_m}m")
        print(f" Coordinates: {detection.geo_lat}, {detection.geo_lon}")
        print()
        return True

    def send_heartbeat(self, device: DroneDevice) -> bool:
        """Send device heartbeat to the API.

        With 5% probability the device's battery, signal strength and
        temperature are perturbed first (``device`` is mutated in place).

        Args:
            device: The device reporting in; ``last_heartbeat`` is updated on
                success.

        Returns:
            True if the API answered 200; False on any other status or on a
            transport error (both are reported on stdout).

        Raises:
            RuntimeError: If the ``requests`` package is not installed.
        """
        http = _require_requests()

        # Simulate device status changes
        if random.random() < 0.05:  # 5% chance of status change
            device.battery_level = max(10, device.battery_level - random.randint(1, 5))
            device.signal_strength = random.randint(-65, -25)
            device.temperature = random.uniform(10, 35)

        payload = {
            "type": "heartbeat",
            "key": str(device.device_id),  # Use device ID directly as key
            "battery_level": device.battery_level,
            "signal_strength": device.signal_strength,
            "temperature": device.temperature,
        }

        try:
            response = http.post(
                f"{self.api_base_url}/detectors",
                json=payload,
                headers={"Content-Type": "application/json"},
                timeout=10,
            )
        except http.RequestException as e:
            print(f"❌ Error sending heartbeat for device {device.device_id}: {e}")
            return False

        if response.status_code != 200:
            print(f"❌ Failed to send heartbeat for device {device.device_id}: {response.status_code}")
            return False

        device.last_heartbeat = time.time()
        if device.battery_level > 30:
            status_icon = "🟢"
        elif device.battery_level > 15:
            status_icon = "🟡"
        else:
            status_icon = "🔴"
        print(f"{status_icon} Heartbeat: {device.name} ({device.location}) - Battery: {device.battery_level}%")
        return True

    def run_simulation(self, detection_interval: int = 60, heartbeat_interval: int = 30, duration: int = 3600):
        """Run the complete simulation.

        Loops roughly once per second until ``duration`` elapses,
        ``self.running`` is cleared, or the user presses Ctrl+C. Returns
        immediately when no devices have been generated.

        Args:
            detection_interval: Seconds between detection attempts; each
                attempt produces a detection with 40% probability.
            heartbeat_interval: Seconds between heartbeat rounds (one
                heartbeat per device).
            duration: Total run time in seconds.
        """
        if not self.devices:
            # random.choice() cannot pick from an empty list, and there is
            # nothing to heartbeat either.
            print("⚠️ No devices configured; nothing to simulate.")
            self.running = False
            return

        print("🇸🇪 Swedish Drone Detection Security Simulator")
        print("=" * 50)
        print("📊 Simulation Parameters:")
        print(f" • Devices: {len(self.devices)}")
        print(f" • Detection interval: {detection_interval}s")
        print(f" • Heartbeat interval: {heartbeat_interval}s")
        print(f" • Duration: {duration}s ({duration//60} minutes)")
        print(f" • API Base URL: {self.api_base_url}")
        print()

        # Display device locations
        print("📍 Monitoring Locations:")
        for device in self.devices:
            print(f" • {device.name}: {device.location} ({device.category.replace('_', ' ').title()})")
        print()

        self.running = True
        # Monotonic clock: elapsed-time checks must not jump when the system
        # clock is adjusted. -inf makes both actions fire on the first pass.
        start_time = time.monotonic()
        last_detection_time = float("-inf")
        last_heartbeat_time = float("-inf")

        print("🚀 Starting simulation...\n")

        try:
            while self.running and (time.monotonic() - start_time) < duration:
                current_time = time.monotonic()

                # Send heartbeats
                if current_time - last_heartbeat_time >= heartbeat_interval:
                    for device in self.devices:
                        self.send_heartbeat(device)
                    last_heartbeat_time = current_time
                    print()

                # Generate detections
                if current_time - last_detection_time >= detection_interval:
                    # Simulate detection probability (not every interval)
                    if random.random() < 0.4:  # 40% chance of detection
                        device = random.choice(self.devices)
                        threat_level = self.select_threat_level()
                        detection = self.generate_detection_near_device(device, threat_level)
                        self.send_detection(detection)
                    last_detection_time = current_time

                # Sleep for a short interval
                time.sleep(1)
        except KeyboardInterrupt:
            print("\n🛑 Simulation stopped by user")
        finally:
            self.running = False

        print(f"\n✅ Simulation completed. Duration: {int(time.monotonic() - start_time)}s")


def main():
    """Parse command-line options, probe the API, and run the simulation."""
    parser = argparse.ArgumentParser(description="Swedish Drone Detection Security Simulator")
    parser.add_argument("--devices", type=int, default=5, help="Number of devices to simulate (default: 5)")
    parser.add_argument("--detection-interval", type=int, default=60, help="Detection interval in seconds (default: 60)")
    parser.add_argument("--heartbeat-interval", type=int, default=30, help="Heartbeat interval in seconds (default: 30)")
    parser.add_argument("--duration", type=int, default=3600, help="Simulation duration in seconds (default: 3600)")
    parser.add_argument("--api-url", default="http://localhost:3002/api", help="API base URL (default: http://localhost:3002/api)")
    parser.add_argument("--list-locations", action="store_true", help="List all available Swedish locations and exit")

    args = parser.parse_args()

    if args.list_locations:
        print("🇸🇪 Available Swedish Security Monitoring Locations:")
        print("=" * 60)
        for category, locations in SWEDISH_LOCATIONS.items():
            print(f"\n{category.replace('_', ' ').title()}:")
            for i, loc in enumerate(locations, 1):
                print(f" {i}. {loc['name']} ({loc['lat']}, {loc['lon']})")
        return

    if requests is None:
        parser.error("the 'requests' package is required to run the simulation (pip install requests)")

    # Initialize simulator
    simulator = SwedishDroneSimulator(args.api_url)

    # Generate devices (stored on the simulator)
    simulator.generate_devices(args.devices)

    # Test API connectivity; failure is only a warning so the run can proceed.
    try:
        response = requests.get(f"{args.api_url}/dashboard/stats", timeout=5)
    except requests.RequestException as e:
        print(f"⚠️ Warning: Could not connect to API at {args.api_url}")
        print(" Make sure the backend server is running!")
        print(f" Error: {e}")
        print()
    else:
        if response.status_code != 200:
            print(f"⚠️ Warning: API test failed ({response.status_code}). Proceeding anyway...")

    # Run simulation
    simulator.run_simulation(
        detection_interval=args.detection_interval,
        heartbeat_interval=args.heartbeat_interval,
        duration=args.duration,
    )


if __name__ == "__main__":
    main()