#!/usr/bin/env python3
"""
Enhanced Drone Detection Test Script

Simulates drone detection data with configurable drone ID and other parameters.
"""

import argparse
import json
import os
import sys
import time
from datetime import datetime

import requests

# Configuration: both values may be overridden through environment variables.
# Base URL of the API that receives the simulated detections.
API_BASE_URL = os.environ.get('API_BASE_URL', 'http://localhost:3002/api')
# Default detector ID; the environment value must parse as an integer.
DEVICE_ID = int(os.environ.get('DEVICE_ID', '1941875381'))

# Available drone types: numeric type code -> display name.
# The codes are sent verbatim in the "drone_type" field of a detection packet,
# so they are spelled out explicitly instead of being derived from position.
DRONE_TYPES = {
    0: "None",
    1: "Unknown",
    2: "Orlan",
    3: "Zala",
    4: "Eleron",
    5: "ZalaLancet",
    6: "Lancet",
    7: "FPV_CrossFire",
    8: "FPV_ELRS",
    9: "MaybeOrlan",
    10: "MaybeZala",
    11: "MaybeLancet",
    12: "MaybeEleron",
    13: "DJI",
    14: "Supercam",
    15: "MaybeSupercam",
    16: "REB",
    17: "CryptoOrlan",
    18: "DJIe",
}


def show_drone_types():
    """Print the available drone types grouped by category.

    Each listed type ID is looked up in ``DRONE_TYPES``; an ID that is not
    present in that table is skipped instead of raising.
    """
    print("\n==================================================")
    print("🚁 AVAILABLE DRONE TYPES")
    print("==================================================")

    # Every type ID appears in exactly one category so it is printed once.
    # 15 ("MaybeSupercam") is an uncertain classification and therefore lives
    # only under UNCERTAIN TYPES rather than also under COMMERCIAL DRONES.
    categories = {
        "🎖️ MILITARY DRONES": [2, 3, 4, 5, 6, 17],
        "🏢 COMMERCIAL DRONES": [13, 14, 18],
        "🎮 FPV DRONES": [7, 8],
        "❓ UNCERTAIN TYPES": [9, 10, 11, 12, 15],
        "🔧 OTHER": [0, 1, 16],
    }

    for category, type_ids in categories.items():
        print(f"\n{category}:")
        for type_id in type_ids:
            type_name = DRONE_TYPES.get(type_id)
            if type_name is not None:
                print(f" {type_id:2d}: {type_name}")


def _print_detection_response(response):
    """Print a human-readable summary of the API's reply to a detection POST."""
    if response.status_code not in (200, 201):
        print(f"❌ Detection failed: {response.status_code}")
        print(f" Response: {response.text}")
        return

    print(f"✅ Detection sent successfully (Status: {response.status_code})")
    try:
        data = response.json()
    except ValueError:
        # Body is not valid JSON; the decode errors raised by requests are
        # ValueError subclasses, so nothing else needs to be caught here.
        print(f" Raw response: {response.text}")
        return

    if not isinstance(data, dict):
        # Valid JSON but not an object (e.g. a list): show the body verbatim.
        print(f" Raw response: {response.text}")
        return

    if not data.get('success'):
        print(f" Response: {data}")
        return

    print(f" Response: {data.get('message', 'OK')}")
    details = data.get('data')
    # Only a JSON object can carry the optional id / threat_level fields.
    if isinstance(details, dict):
        if 'id' in details:
            print(f" Detection ID: {details['id']}")
        if 'threat_level' in details:
            print(f" Threat Level: {details['threat_level']}")


def send_detection(drone_type=2, drone_id=None, geo_lat=0, geo_lon=0, rssi=-45, freq=2400, device_id=None, show_response=True):
    """
    Send a drone detection packet to ``{API_BASE_URL}/detectors``.

    Args:
        drone_type: Type of drone (0-18)
        drone_id: Specific drone ID to simulate (auto-generated if None)
        geo_lat: Latitude coordinate
        geo_lon: Longitude coordinate
        rssi: Signal strength
        freq: Frequency
        device_id: Device ID to use (uses global DEVICE_ID if None). A string
            made only of decimal digits is converted to int; any other value
            is sent unchanged.
        show_response: Whether to print the request summary and the response

    Returns:
        True if the server answered with HTTP 200 or 201, False on any other
        status or when the request itself failed.
    """
    # Use provided device_id or fall back to global DEVICE_ID
    if device_id is None:
        device_id = DEVICE_ID
    elif isinstance(device_id, str) and device_id.isdecimal():
        # isdecimal() only accepts characters that int() can parse, so this
        # conversion cannot raise; non-numeric IDs are passed through as-is.
        device_id = int(device_id)

    # Generate drone_id if not specified
    if drone_id is None:
        drone_id = int(time.time()) % 100000  # Use timestamp for uniqueness

    # Create detection packet
    packet = {
        "device_id": device_id,
        "geo_lat": float(geo_lat),
        "geo_lon": float(geo_lon),
        "device_timestamp": int(time.time()),
        "drone_type": drone_type,
        "rssi": float(rssi),
        "freq": freq,
        "drone_id": drone_id,
    }

    if show_response:
        drone_name = DRONE_TYPES.get(drone_type, f"Unknown({drone_type})")
        print("📡 Sending detection:")
        print(f" Drone Type: {drone_type} ({drone_name})")
        print(f" Drone ID: {drone_id}")
        print(f" Device ID: {device_id}")
        print(f" Location: ({geo_lat}, {geo_lon})")
        print(f" RSSI: {rssi}, Freq: {freq}")

    try:
        response = requests.post(
            f"{API_BASE_URL}/detectors",
            headers={'Content-Type': 'application/json'},
            json=packet,
            timeout=10,
        )
    except requests.exceptions.RequestException as e:
        # Connection refused, timeout, DNS failure, ...: report and signal failure.
        if show_response:
            print(f"❌ Connection error: {e}")
        return False

    if show_response:
        _print_detection_response(response)

    return response.status_code in (200, 201)


def simulate_drone_approach(drone_type=2, drone_id=None, steps=10, device_id=None):
    """
    Simulate a drone approaching from distance

    Sends ``steps`` detections for the same drone, one second apart, starting
    at 50000 m and moving closer by ``50000 / steps`` metres each time (so the
    last detection is one step-width away, not at 0 m). The simulation stops
    at the first detection that fails to send.

    Args:
        drone_type: Type of drone to simulate
        drone_id: Specific drone ID (auto-generated if None)
        steps: Number of detection steps
        device_id: Device ID to report the detections from (uses global
            DEVICE_ID if None)

    Returns:
        True if every detection was sent successfully, False if the
        simulation was aborted after a failed send.
    """
    # One ID for the whole run so all steps belong to the same drone.
    if drone_id is None:
        drone_id = int(time.time()) % 100000

    drone_name = DRONE_TYPES.get(drone_type, f"Unknown({drone_type})")
    # Show the device ID that will actually be used for the packets.
    shown_device_id = DEVICE_ID if device_id is None else device_id

    print("\n" + "="*70)
    print("🎯 DRONE APPROACH SIMULATION")
    print("="*70)
    print(f"Drone Type: {drone_type} ({drone_name})")
    print(f"Drone ID: {drone_id}")
    print(f"Device ID: {shown_device_id}")
    print(f"Steps: {steps}")
    print("="*70)

    # Evenly spaced distances in metres, starting at 50 km and decreasing
    distances = [50000 - (i * 50000 / steps) for i in range(steps)]

    for i, distance in enumerate(distances):
        # Calculate RSSI based on distance (closer = stronger signal):
        # -80 at 50 km, rising by 1 per kilometre travelled.
        rssi = -80 + (50000 - distance) / 1000
        rssi = max(rssi, -100)  # Never report weaker than -100 (lower bound)

        # Calculate coordinates (simulate movement)
        lat = distance / 111000  # Rough conversion to degrees
        lon = 0

        print(f"\n📍 Step {i+1}/{steps}: Distance {distance:.0f}m")

        success = send_detection(
            drone_type=drone_type,
            drone_id=drone_id,
            geo_lat=lat,
            geo_lon=lon,
            rssi=rssi,
            freq=2400,
            device_id=device_id,
            show_response=True,
        )

        if not success:
            print("❌ Failed to send detection, stopping simulation")
            break

        # Wait between detections
        if i < len(distances) - 1:
            time.sleep(1)
    else:
        # Reached only when the loop was not aborted by a failed send.
        print("\n✅ Simulation completed!")
        return True

    return False


def main():
    """Parse the command line and run a single detection or a simulation.

    Prints a banner with the API URL and device ID, then either lists the
    drone types (``--list-types``), runs the approach simulation
    (``--simulate``) or sends one detection built from the remaining options.
    Exits with status 1 when the single detection could not be sent.
    """
    parser = argparse.ArgumentParser(description='Enhanced Drone Detection Test Script')
    # argparse rejects any value outside `choices`, so no further validation
    # of the drone type is needed after parsing.
    parser.add_argument('--drone-type', '-t', type=int, default=2, choices=list(DRONE_TYPES),
                        help='Drone type (0-18), default: 2 (Orlan)')
    parser.add_argument('--drone-id', '-d', type=int, default=None,
                        help='Specific drone ID to simulate (auto-generated if not specified)')
    # Default comes from the module configuration so the DEVICE_ID environment
    # variable is honoured when --device-id is not given.
    parser.add_argument('--device-id', type=str, default=str(DEVICE_ID),
                        help='Device ID to use for detection (default: %(default)s)')
    parser.add_argument('--simulate', '-s', action='store_true',
                        help='Run approach simulation instead of single detection')
    parser.add_argument('--steps', type=int, default=10,
                        help='Number of simulation steps (default: 10)')
    parser.add_argument('--list-types', '-l', action='store_true',
                        help='List available drone types and exit')
    parser.add_argument('--lat', type=float, default=0,
                        help='Latitude coordinate (default: 0)')
    parser.add_argument('--lon', type=float, default=0,
                        help='Longitude coordinate (default: 0)')
    parser.add_argument('--rssi', type=float, default=-45,
                        help='Signal strength (default: -45)')
    parser.add_argument('--freq', type=int, default=2400,
                        help='Frequency (default: 2400)')

    args = parser.parse_args()

    print("="*70)
    print("🎯 ENHANCED DRONE DETECTION TEST")
    print("="*70)
    print(f"API URL: {API_BASE_URL}")
    print(f"Device ID: {args.device_id}")
    print("="*70)

    if args.list_types:
        show_drone_types()
        return

    if args.simulate:
        simulate_drone_approach(
            drone_type=args.drone_type,
            drone_id=args.drone_id,
            device_id=args.device_id,
            steps=args.steps,
        )
        return

    print("\n🚁 Single Detection Test")
    print("-" * 30)
    success = send_detection(
        drone_type=args.drone_type,
        drone_id=args.drone_id,
        geo_lat=args.lat,
        geo_lon=args.lon,
        rssi=args.rssi,
        freq=args.freq,
        device_id=args.device_id,
    )
    if not success:
        # Let shells and CI jobs see the failure instead of discarding it.
        sys.exit(1)


# Script entry point: run the CLI only when executed directly, not on import.
if __name__ == "__main__":
    main()