Files
drone-detector/test_drone_data.py
2025-08-17 08:41:16 +02:00

213 lines
6.7 KiB
Python

#!/usr/bin/env python3
"""
Drone Detection Test Script
Sends simulated drone detection data to the API for testing purposes.
"""
import requests
import json
import time
import random
from datetime import datetime
# --- Configuration ----------------------------------------------------------

# Base URL of the detection API.
# Alternative for local testing: "http://localhost:3002/api"
API_BASE_URL = "http://selfservice.cqers.com/drones/api"

# Simulated detector devices: numeric id, display name, and the device's
# latitude/longitude.
DEVICES = [
    {"id": 1, "name": "Stockholm Central Detector", "lat": 59.3293, "lon": 18.0686},
    {"id": 2, "name": "Arlanda Airport Detector", "lat": 59.6519, "lon": 17.9186},
    {"id": 3, "name": "Göteborg Harbor Detector", "lat": 57.7089, "lon": 11.9746},
]

# Human-readable label for each numeric drone type code.
DRONE_TYPES = {
    0: "Unknown",
    1: "Commercial DJI",
    2: "Racing Drone",
    3: "Fixed Wing",
    4: "Military/Surveillance",
    5: "Custom Build",
}
def generate_detection_data(device):
    """Build one randomized detection payload positioned near ``device``.

    Args:
        device: Mapping that provides the ``"id"``, ``"lat"`` and ``"lon"``
            keys of the reporting detector.

    Returns:
        A dict with the keys ``device_id``, ``drone_id``, ``drone_type``,
        ``rssi``, ``freq``, ``geo_lat``, ``geo_lon``, ``device_timestamp``,
        ``confidence_level`` and ``signal_duration``, in that order.
    """
    # Jitter the reported position around the device; both ranges are meant
    # to be roughly 5 km.
    d_lat = random.uniform(-0.045, 0.045)
    d_lon = random.uniform(-0.09, 0.09)

    drone_id = random.randint(1000, 9999)
    type_code = random.randint(0, 5)
    rssi_dbm = random.randint(-90, -30)  # signal strength in dBm
    freq_mhz = random.choice([2400, 2450, 5800, 5850])  # common drone frequencies
    now_ms = int(time.time() * 1000)  # Unix timestamp in milliseconds
    confidence = round(random.uniform(0.6, 1.0), 2)
    duration_ms = random.randint(500, 5000)  # duration in milliseconds

    return {
        "device_id": device["id"],
        "drone_id": drone_id,
        "drone_type": type_code,
        "rssi": rssi_dbm,
        "freq": freq_mhz,
        "geo_lat": device["lat"] + d_lat,
        "geo_lon": device["lon"] + d_lon,
        "device_timestamp": now_ms,
        "confidence_level": confidence,
        "signal_duration": duration_ms,
    }
def send_detection(detection_data):
    """POST one detection payload to the API's ``/detections`` endpoint.

    Args:
        detection_data: Dict sent as the JSON body. Its ``device_id``,
            ``drone_id`` and ``rssi`` entries are echoed in the success line.

    Returns:
        True when the server answers with HTTP 201; False for any other
        status code or when the request raises a ``requests`` exception.
    """
    endpoint = f"{API_BASE_URL}/detections"
    try:
        reply = requests.post(
            endpoint,
            json=detection_data,
            headers={"Content-Type": "application/json"},
            timeout=10,
        )
        # Anything other than "201 Created" is reported as a failure.
        if reply.status_code != 201:
            print(f"❌ Failed to send detection: {reply.status_code} - {reply.text}")
            return False

        print(f"✅ Detection sent successfully: Device {detection_data['device_id']}, "
              f"Drone {detection_data['drone_id']}, RSSI {detection_data['rssi']}dBm")
        return True
    except requests.exceptions.RequestException as err:
        print(f"❌ Network error: {err}")
        return False
def test_single_detection():
    """Generate one detection for a random device, print it, and send it.

    Returns:
        Whatever ``send_detection`` returns for the generated detection.
    """
    chosen = random.choice(DEVICES)
    sample = generate_detection_data(chosen)

    summary = (
        "🚁 Sending single test detection...",
        f"📍 Device: {chosen['name']}",
        f"🛰️ Drone ID: {sample['drone_id']}",
        f"📡 RSSI: {sample['rssi']}dBm",
        f"📊 Frequency: {sample['freq']}MHz",
        f"🎯 Confidence: {sample['confidence_level']}",
    )
    for line in summary:
        print(line)

    return send_detection(sample)
def simulate_continuous_detections(duration_minutes=5, interval_seconds=10):
    """Send batches of simulated detections until the time budget runs out.

    Each cycle sends 1-3 detections, every one from a randomly chosen device
    (the same device may be picked more than once in a cycle), pausing 0.5 s
    after each send. The function then waits ``interval_seconds`` before the
    next cycle, but never sleeps past the end of the requested duration.
    Ctrl+C stops the run early; the summary is printed either way.

    Args:
        duration_minutes: Total time budget for the simulation, in minutes.
        interval_seconds: Pause between cycles, in seconds.
    """
    print(f"🔄 Starting continuous simulation for {duration_minutes} minutes...")
    print(f"⏱️ Sending detection every {interval_seconds} seconds")
    print("Press Ctrl+C to stop\n")

    start_time = time.time()
    end_time = start_time + (duration_minutes * 60)
    detection_count = 0

    try:
        while time.time() < end_time:
            # Send a small batch of 1-3 detections in quick succession.
            for _ in range(random.randint(1, 3)):
                device = random.choice(DEVICES)
                if send_detection(generate_detection_data(device)):
                    detection_count += 1
                # Small delay between the detections of one batch.
                time.sleep(0.5)

            # Cap the pause at the time left so the run does not overshoot
            # the requested duration by up to a full interval.
            remaining = end_time - time.time()
            if remaining <= 0:
                break
            time.sleep(min(interval_seconds, remaining))
    except KeyboardInterrupt:
        print("\n⏹️ Simulation stopped by user")

    elapsed = time.time() - start_time
    # A zero-length run on a coarse clock can report zero elapsed time;
    # avoid dividing by zero when computing the rate.
    rate_per_minute = detection_count / (elapsed / 60) if elapsed > 0 else 0.0

    print("\n📈 Simulation completed:")
    print(f" • Duration: {elapsed/60:.1f} minutes")
    print(f" • Detections sent: {detection_count}")
    print(f" • Average rate: {rate_per_minute:.1f} detections/minute")
def test_api_health():
    """Probe the API's ``/health`` endpoint and report the outcome.

    Returns:
        True when the endpoint answers with HTTP 200; False for any other
        status code or when the request raises a ``requests`` exception.
    """
    health_url = f"{API_BASE_URL}/health"
    try:
        status = requests.get(health_url, timeout=5).status_code
    except requests.exceptions.RequestException as err:
        print(f"❌ Cannot reach API: {err}")
        return False

    if status != 200:
        print(f"❌ API health check failed: {status}")
        return False

    print("✅ API health check passed")
    return True
def main():
    """Run the interactive test menu.

    Checks API health first and stops if the check fails. Otherwise prompts
    for one of three modes: a single detection, a continuous simulation
    (with optional duration and interval), or a burst of 10 detections sent
    one second apart. Ctrl+C and non-integer numeric input are reported
    with a short message instead of a traceback.
    """
    print("🚁 Drone Detection System - Test Script")
    print("=" * 50)

    # Nothing else makes sense if the API cannot be reached.
    if not test_api_health():
        print("Cannot connect to API. Please check if the system is running.")
        return

    menu = (
        "\nChoose test mode:",
        "1. Send single test detection",
        "2. Simulate continuous detections",
        "3. Send burst of 10 detections",
    )
    for entry in menu:
        print(entry)

    try:
        selection = input("\nEnter choice (1-3): ").strip()

        if selection not in ("1", "2", "3"):
            print("Invalid choice")
            return

        if selection == "1":
            test_single_detection()
            return

        if selection == "2":
            # Read both answers before converting, so a bad first value is
            # only reported after the second prompt has been shown.
            minutes_text = input("Duration in minutes (default: 5): ").strip()
            seconds_text = input("Interval in seconds (default: 10): ").strip()
            minutes = int(minutes_text) if minutes_text else 5
            seconds = int(seconds_text) if seconds_text else 10
            simulate_continuous_detections(minutes, seconds)
            return

        # selection == "3": fixed burst of ten detections.
        print("🚀 Sending burst of 10 detections...")
        sent_ok = 0
        for _ in range(10):
            payload = generate_detection_data(random.choice(DEVICES))
            if send_detection(payload):
                sent_ok += 1
            time.sleep(1)  # 1 second between detections
        print(f"\n📊 Burst completed: {sent_ok}/10 detections sent successfully")
    except KeyboardInterrupt:
        print("\n👋 Goodbye!")
    except ValueError:
        print("Invalid input")
# Start the interactive menu only when run as a script, not when imported.
if __name__ == "__main__":
    main()