381 lines
16 KiB
Python
381 lines
16 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Swedish Drone Detection Simulator

This script simulates drone detection data for testing the security monitoring system.
It generates realistic detection data with Swedish coordinates for government sites,
water facilities, and sensitive areas.

Usage:
    python drone_simulator.py --help
    python drone_simulator.py --devices 5 --detection-interval 30 --duration 3600
|
|
"""
|
|
|
|
import requests
|
|
import time
|
|
import random
|
|
import json
|
|
import argparse
|
|
import threading
|
|
from datetime import datetime, timedelta
|
|
from dataclasses import dataclass
|
|
from typing import List, Tuple
|
|
import math
|
|
|
|
# Sensitive sites that simulated detectors can be stationed at, grouped by
# category.  Each row is (name, latitude, longitude) in decimal degrees.
_SITES_BY_CATEGORY = (
    ("government_sites", (
        ("Riksdag Stockholm", 59.3275, 18.0644),
        ("Government Offices Stockholm", 59.3320, 18.0648),
        ("Swedish Armed Forces HQ", 59.3242, 18.0969),
        ("SÄPO Headquarters", 59.3370, 18.0585),
        ("Royal Palace Stockholm", 59.3267, 18.0717),
    )),
    ("water_facilities", (
        ("Norsborg Water Treatment", 59.2766, 17.8217),
        ("Lovö Water Treatment", 59.3508, 17.8367),
        ("Göteborg Water Works", 57.6950, 11.9850),
        ("Malmö Water Treatment", 55.5833, 12.9833),
        ("Uppsala Water Plant", 59.8586, 17.6389),
    )),
    ("nuclear_facilities", (
        ("Forsmark Nuclear Plant", 60.4017, 18.1753),
        ("Oskarshamn Nuclear Plant", 57.4167, 16.6667),
        ("Ringhals Nuclear Plant", 57.2603, 12.1086),
    )),
    ("airports", (
        ("Arlanda Airport", 59.6519, 17.9186),
        ("Landvetter Airport", 57.6628, 12.2944),
        ("Kastrup Airport (Copenhagen)", 55.6181, 12.6563),
        ("Sturup Airport", 55.5264, 13.3761),
    )),
    ("military_bases", (
        ("Karlsborg Fortress", 58.5342, 14.5219),
        ("Boden Fortress", 65.8250, 21.6889),
        ("Gotland Regiment", 57.6364, 18.2944),
        ("Amf 1 Livgardet", 59.4017, 17.9439),
    )),
)

# Public lookup: category key -> list of {"name", "lat", "lon"} dicts, in the
# order given in the table above.
SWEDISH_LOCATIONS = {
    category: [
        {"name": site_name, "lat": site_lat, "lon": site_lon}
        for site_name, site_lat, site_lon in sites
    ]
    for category, sites in _SITES_BY_CATEGORY
}
|
|
|
|
@dataclass
class DroneDevice:
    """State of one simulated drone-detector unit stationed at a monitored site."""

    # --- identity and placement -------------------------------------------
    device_id: int  # numeric identifier reported with every detection
    name: str  # human-readable unit name shown in console output
    location: str  # name of the site the unit is stationed at
    lat: float  # unit latitude, decimal degrees
    lon: float  # unit longitude, decimal degrees
    category: str  # site category key (e.g. "government_sites")

    # --- health telemetry reported in heartbeats --------------------------
    last_heartbeat: float = 0  # time.time() of the last accepted heartbeat; 0 = none yet
    battery_level: int = 100  # percent
    signal_strength: int = -45  # presumably dBm; the unit is not stated in this file
    temperature: float = 20.0  # presumably degrees Celsius; the unit is not stated
    status: str = "active"
|
|
|
|
@dataclass
class DroneDetection:
    """A single drone sighting as reported by one detector unit."""

    device_id: int  # identifier of the detector that made the sighting
    geo_lat: float  # latitude of the sighted drone, decimal degrees
    geo_lon: float  # longitude of the sighted drone, decimal degrees
    device_timestamp: int  # Unix time of the sighting, whole seconds
    drone_type: int  # numeric drone class code
    rssi: int  # received signal strength, dBm
    freq: int  # frequency code; NOTE(review): unit not established in this file
    drone_id: int  # identifier assigned to the sighted drone
|
|
|
|
class SwedishDroneSimulator:
    """Simulates drone detection events for Swedish security installations.

    The simulator keeps a list of virtual detector devices, periodically
    posts a heartbeat for each of them and, at random, posts a drone
    detection positioned near one of them.  Both kinds of message are sent
    to ``<api_base_url>/detectors``.
    """

    def __init__(self, api_base_url: str = "http://localhost:3001/api"):
        """Create a simulator with no devices that talks to *api_base_url*."""
        self.api_base_url = api_base_url
        self.devices: "List[DroneDevice]" = []
        self.running = False
        # Per threat level: selection probability, the RSSI window (dBm) the
        # reported signal is clamped into, and the range of drone-to-detector
        # distances in kilometres.
        self.threat_scenarios = {
            "low_threat": {"probability": 0.7, "rssi_range": (-90, -70), "distance_km": (5, 15)},
            "medium_threat": {"probability": 0.2, "rssi_range": (-70, -55), "distance_km": (0.2, 5)},
            "high_threat": {"probability": 0.08, "rssi_range": (-55, -40), "distance_km": (0.05, 0.2)},
            "critical_threat": {"probability": 0.02, "rssi_range": (-40, -25), "distance_km": (0, 0.05)},
        }

    def generate_devices(self, num_devices: int) -> "List[DroneDevice]":
        """Generate drone detection devices at Swedish sensitive locations.

        Picks distinct random sites from ``SWEDISH_LOCATIONS`` and creates one
        device per site.  The result replaces ``self.devices``.

        Args:
            num_devices: Number of devices wanted.  Values above the number
                of known sites are capped to that number; values below zero
                yield an empty list.

        Returns:
            The newly created devices (the same list stored on the instance).
        """
        device_id_base = 1941875380

        all_locations = [
            (category, loc)
            for category, locations in SWEDISH_LOCATIONS.items()
            for loc in locations
        ]

        # random.sample() raises ValueError for a negative or oversized
        # sample, so clamp the request into the valid range first.
        sample_size = max(0, min(num_devices, len(all_locations)))
        selected_locations = random.sample(all_locations, sample_size)

        devices = [
            DroneDevice(
                device_id=device_id_base + index,
                name=f"SecureGuard-{index:03d}",
                location=location["name"],
                lat=location["lat"],
                lon=location["lon"],
                category=category,
                battery_level=random.randint(75, 100),
                signal_strength=random.randint(-60, -30),
                temperature=random.uniform(15, 30),
            )
            for index, (category, location) in enumerate(selected_locations, start=1)
        ]

        self.devices = devices
        return devices

    def calculate_threat_based_rssi(self, distance_km: float, base_rssi: int = -30) -> int:
        """Calculate RSSI based on distance for realistic threat simulation.

        Uses a free-space-style path loss of ``20*log10(distance in metres)``
        plus 5-15 dB of random extra loss, subtracted from *base_rssi*.

        Args:
            distance_km: Drone-to-detector distance in kilometres.  Anything
                below one metre is treated as one metre.
            base_rssi: Signal level in dBm before path loss is applied.

        Returns:
            The resulting RSSI in dBm, clamped to the range -100..-25.
        """
        # Floor at 1 m so log10() never sees zero or a sub-metre value.
        distance_m = max(1, distance_km * 1000)

        path_loss = 20 * math.log10(distance_m) + random.randint(5, 15)
        rssi = base_rssi - int(path_loss)

        # Clamp to realistic RSSI range
        return max(-100, min(-25, rssi))

    @staticmethod
    def _offset_position(lat: float, lon: float, distance_km: float) -> Tuple[float, float]:
        """Return the point *distance_km* away from (lat, lon) at a random bearing."""
        bearing = random.uniform(0.0, 2.0 * math.pi)
        north_km = distance_km * math.cos(bearing)
        east_km = distance_km * math.sin(bearing)

        # ~111 km per degree of latitude; a degree of longitude shrinks with
        # the cosine of the latitude.
        km_per_deg_lat = 111.0
        km_per_deg_lon = 111.0 * math.cos(math.radians(lat))
        return lat + north_km / km_per_deg_lat, lon + east_km / km_per_deg_lon

    def generate_detection_near_device(self, device: "DroneDevice", threat_level: str = "low_threat") -> "DroneDetection":
        """Generate a drone detection near a specific device with threat-based parameters.

        A distance is drawn from the threat level's ``distance_km`` range, an
        RSSI is derived from it and clamped into the level's ``rssi_range``,
        and the drone is placed at exactly that distance from the device in a
        random direction, so the reported position agrees with the distance
        the signal strength was computed from.

        Args:
            device: The detector that reports the sighting.
            threat_level: A key of ``self.threat_scenarios``.

        Returns:
            The generated detection, with coordinates rounded to 6 decimals.

        Raises:
            KeyError: If *threat_level* is not a known scenario.
        """
        threat_config = self.threat_scenarios[threat_level]

        # Generate distance based on threat level
        min_dist, max_dist = threat_config["distance_km"]
        distance_km = random.uniform(min_dist, max_dist)

        # Distance-derived RSSI, then forced into the window of this level
        rssi_min, rssi_max = threat_config["rssi_range"]
        rssi = self.calculate_threat_based_rssi(distance_km)
        rssi = max(rssi_min, min(rssi_max, rssi))

        detection_lat, detection_lon = self._offset_position(device.lat, device.lon, distance_km)

        # Drone type based on threat level
        if threat_level == "critical_threat":
            drone_type = 1  # Professional/Military
        elif threat_level == "high_threat":
            drone_type = random.choice([1, 2])  # Professional or Racing
        else:
            drone_type = random.choice([0, 0, 0, 1, 2])  # Mostly consumer

        return DroneDetection(
            device_id=device.device_id,
            geo_lat=round(detection_lat, 6),
            geo_lon=round(detection_lon, 6),
            device_timestamp=int(time.time()),
            drone_type=drone_type,
            rssi=rssi,
            freq=random.choice([20, 22, 24, 25, 27, 58, 915, 2400, 2450, 5800]),  # Common drone frequencies
            drone_id=random.randint(1000, 9999),
        )

    def select_threat_level(self) -> str:
        """Select threat level based on probability weights for realistic scenarios.

        Returns:
            A key of ``self.threat_scenarios``, drawn according to each
            scenario's ``probability``.
        """
        rand = random.random()
        cumulative = 0

        for threat_level, config in self.threat_scenarios.items():
            cumulative += config["probability"]
            if rand <= cumulative:
                return threat_level

        # Only reached if the accumulated probabilities fall short of the draw.
        return "low_threat"

    @staticmethod
    def _threat_label_for_rssi(rssi: int) -> str:
        """Map an RSSI in dBm to the label used in the console report."""
        if rssi >= -40:
            return "CRITICAL"
        if rssi >= -55:
            return "HIGH"
        if rssi >= -70:
            return "MEDIUM"
        return "LOW"

    def send_detection(self, detection: "DroneDetection") -> bool:
        """Send drone detection to the API.

        Posts the detection as JSON to ``<api_base_url>/detectors`` and prints
        a short report on success or the reason on failure.

        Args:
            detection: The detection to report.

        Returns:
            True if the API answered 201, False on any other status or on a
            request error (connection failure, timeout, ...).
        """
        payload = {
            "type": "detection",  # Add type field for detection
            "device_id": detection.device_id,
            "geo_lat": detection.geo_lat,
            "geo_lon": detection.geo_lon,
            "device_timestamp": detection.device_timestamp,
            "drone_type": detection.drone_type,
            "rssi": detection.rssi,
            "freq": detection.freq,
            "drone_id": detection.drone_id,
        }

        # Only the network call is expected to fail; keep the try minimal.
        try:
            response = requests.post(
                f"{self.api_base_url}/detectors",
                json=payload,
                headers={"Content-Type": "application/json"},
                timeout=10,
            )
        except requests.RequestException as e:
            print(f"❌ Error sending detection: {e}")
            return False

        if response.status_code != 201:
            print(f"❌ Failed to send detection: {response.status_code} - {response.text}")
            return False

        threat_level = self._threat_label_for_rssi(detection.rssi)
        device = next((d for d in self.devices if d.device_id == detection.device_id), None)
        location_name = device.location if device else "Unknown"

        print(f"🚨 {threat_level} THREAT DETECTION:")
        print(f" Location: {location_name}")
        print(f" Device: {detection.device_id}")
        print(f" RSSI: {detection.rssi} dBm")
        print(f" Drone Type: {detection.drone_type}")
        # Rough log-distance estimate: -30 dBm reference, path-loss exponent 3.
        print(f" Est. Distance: ~{round(10**(((-30) - detection.rssi) / (10 * 3)))}m")
        print(f" Coordinates: {detection.geo_lat}, {detection.geo_lon}")
        print()
        return True

    def send_heartbeat(self, device: "DroneDevice") -> bool:
        """Send device heartbeat to the API.

        With 5% probability the device's battery, signal strength and
        temperature are perturbed first.  The heartbeat is then posted as
        JSON to ``<api_base_url>/detectors``; on success
        ``device.last_heartbeat`` is updated.

        Args:
            device: The device to report on (mutated in place).

        Returns:
            True if the API answered 200, False on any other status or on a
            request error.
        """
        # Simulate device status changes
        if random.random() < 0.05:  # 5% chance of status change
            device.battery_level = max(10, device.battery_level - random.randint(1, 5))
            device.signal_strength = random.randint(-65, -25)
            device.temperature = random.uniform(10, 35)

        payload = {
            "type": "heartbeat",
            "key": f"device_{device.device_id}_key",
            "battery_level": device.battery_level,
            "signal_strength": device.signal_strength,
            "temperature": device.temperature,
        }

        try:
            response = requests.post(
                f"{self.api_base_url}/detectors",
                json=payload,
                headers={"Content-Type": "application/json"},
                timeout=10,
            )
        except requests.RequestException as e:
            print(f"❌ Error sending heartbeat for device {device.device_id}: {e}")
            return False

        if response.status_code != 200:
            print(f"❌ Failed to send heartbeat for device {device.device_id}: {response.status_code}")
            return False

        device.last_heartbeat = time.time()
        status_icon = "🟢" if device.battery_level > 30 else "🟡" if device.battery_level > 15 else "🔴"
        print(f"{status_icon} Heartbeat: {device.name} ({device.location}) - Battery: {device.battery_level}%")
        return True

    def run_simulation(self, detection_interval: int = 60, heartbeat_interval: int = 30, duration: int = 3600):
        """Run the complete simulation.

        Loops once per second until *duration* has elapsed, ``self.running``
        is cleared, or the user presses Ctrl-C.  Heartbeats for all devices
        are sent every *heartbeat_interval* seconds; every
        *detection_interval* seconds there is a 40% chance that one random
        device reports a detection.  Both fire immediately on the first pass.

        Args:
            detection_interval: Seconds between detection opportunities.
            heartbeat_interval: Seconds between heartbeat rounds.
            duration: Total run time in seconds.
        """
        # Without devices there is nothing to heartbeat, and choosing a
        # reporting device would fail, so bail out before starting.
        if not self.devices:
            print("⚠️ No devices to simulate - call generate_devices() first.")
            return

        print("🇸🇪 Swedish Drone Detection Security Simulator")
        print("=" * 50)
        print("📊 Simulation Parameters:")
        print(f" • Devices: {len(self.devices)}")
        print(f" • Detection interval: {detection_interval}s")
        print(f" • Heartbeat interval: {heartbeat_interval}s")
        print(f" • Duration: {duration}s ({duration//60} minutes)")
        print(f" • API Base URL: {self.api_base_url}")
        print()

        # Display device locations
        print("📍 Monitoring Locations:")
        for device in self.devices:
            print(f" • {device.name}: {device.location} ({device.category.replace('_', ' ').title()})")
        print()

        self.running = True
        start_time = time.time()
        # Zero makes both timers "overdue", so the first pass fires them.
        last_detection_time = 0
        last_heartbeat_time = 0

        print("🚀 Starting simulation...\n")

        try:
            while self.running and (time.time() - start_time) < duration:
                current_time = time.time()

                # Send heartbeats
                if current_time - last_heartbeat_time >= heartbeat_interval:
                    for device in self.devices:
                        self.send_heartbeat(device)
                    last_heartbeat_time = current_time
                    print()

                # Generate detections
                if current_time - last_detection_time >= detection_interval:
                    # Simulate detection probability (not every interval)
                    if random.random() < 0.4:  # 40% chance of detection
                        device = random.choice(self.devices)
                        threat_level = self.select_threat_level()
                        detection = self.generate_detection_near_device(device, threat_level)
                        self.send_detection(detection)

                    last_detection_time = current_time

                # Sleep for a short interval
                time.sleep(1)

        except KeyboardInterrupt:
            print("\n🛑 Simulation stopped by user")
        finally:
            # Leave the flag accurate however the loop ended.
            self.running = False

        print(f"\n✅ Simulation completed. Duration: {int(time.time() - start_time)}s")
|
|
|
|
def main():
    """Command-line entry point: parse options, probe the API, run the simulator.

    With ``--list-locations`` the known sites are printed and nothing else
    happens.  Otherwise the requested number of devices is generated, the
    API's ``/dashboard/stats`` endpoint is probed (a failure only produces a
    warning), and the simulation is run with the given intervals and
    duration.
    """
    parser = argparse.ArgumentParser(description="Swedish Drone Detection Security Simulator")
    parser.add_argument("--devices", type=int, default=5, help="Number of devices to simulate (default: 5)")
    parser.add_argument("--detection-interval", type=int, default=60, help="Detection interval in seconds (default: 60)")
    parser.add_argument("--heartbeat-interval", type=int, default=30, help="Heartbeat interval in seconds (default: 30)")
    parser.add_argument("--duration", type=int, default=3600, help="Simulation duration in seconds (default: 3600)")
    parser.add_argument("--api-url", default="http://localhost:3001/api", help="API base URL (default: http://localhost:3001/api)")
    parser.add_argument("--list-locations", action="store_true", help="List all available Swedish locations and exit")

    args = parser.parse_args()

    if args.list_locations:
        print("🇸🇪 Available Swedish Security Monitoring Locations:")
        print("=" * 60)
        for category, locations in SWEDISH_LOCATIONS.items():
            print(f"\n{category.replace('_', ' ').title()}:")
            for i, loc in enumerate(locations, 1):
                print(f" {i}. {loc['name']} ({loc['lat']}, {loc['lon']})")
        return

    # A simulation needs at least one device to report from; reject bad
    # counts here with a usage error rather than failing mid-run.
    if args.devices < 1:
        parser.error("--devices must be at least 1")

    # Initialize simulator
    simulator = SwedishDroneSimulator(args.api_url)

    # Generate devices (stored on the simulator)
    simulator.generate_devices(args.devices)

    # Test API connectivity; problems are reported but never abort the run.
    try:
        response = requests.get(f"{args.api_url}/dashboard/stats", timeout=5)
    except requests.RequestException as e:
        print(f"⚠️ Warning: Could not connect to API at {args.api_url}")
        print(" Make sure the backend server is running!")
        print(f" Error: {e}")
        print()
    else:
        if response.status_code != 200:
            print(f"⚠️ Warning: API test failed ({response.status_code}). Proceeding anyway...")

    # Run simulation
    simulator.run_simulation(
        detection_interval=args.detection_interval,
        heartbeat_interval=args.heartbeat_interval,
        duration=args.duration,
    )
|
|
|
|
# Run the simulator only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()
|