Fix jwt-token
This commit is contained in:
@@ -18,11 +18,63 @@ warnings.filterwarnings('ignore', message='Unverified HTTPS request')
|
||||
API_BASE_URL = os.getenv('API_BASE_URL', 'http://localhost:3002/api')
|
||||
BASE_PATH = os.getenv('VITE_BASE_PATH', '').rstrip('/')
|
||||
|
||||
# Authentication configuration - Optional for local testing
|
||||
USERNAME = os.getenv('TEST_USERNAME', 'admin')
|
||||
PASSWORD = os.getenv('TEST_PASSWORD', 'admin123')
|
||||
SKIP_AUTH = os.getenv('SKIP_AUTH', 'false').lower() == 'true'
|
||||
|
||||
# If BASE_PATH is set, construct the full URL
|
||||
if BASE_PATH and not API_BASE_URL.endswith('/api'):
|
||||
domain = API_BASE_URL.replace('/api', '').replace('/drones/api', '').replace('/uggla/api', '')
|
||||
API_BASE_URL = f"{domain}{BASE_PATH}/api"
|
||||
|
||||
# Global variable to store authentication token
|
||||
AUTH_TOKEN = None
|
||||
|
||||
def authenticate():
|
||||
"""Authenticate with the API and get access token"""
|
||||
global AUTH_TOKEN
|
||||
|
||||
if SKIP_AUTH:
|
||||
print("🔓 Authentication skipped (SKIP_AUTH=true)")
|
||||
return True
|
||||
|
||||
try:
|
||||
login_data = {
|
||||
"username": USERNAME,
|
||||
"password": PASSWORD
|
||||
}
|
||||
|
||||
response = requests.post(f"{API_BASE_URL}/users/login", json=login_data, verify=False, timeout=10)
|
||||
|
||||
if response.status_code == 200:
|
||||
data = response.json()
|
||||
if data.get('success') and 'data' in data and 'token' in data['data']:
|
||||
AUTH_TOKEN = data['data']['token']
|
||||
print(f"✅ Authentication successful")
|
||||
return True
|
||||
else:
|
||||
print(f"❌ Invalid login response: {data}")
|
||||
return False
|
||||
else:
|
||||
print(f"❌ Authentication failed: HTTP {response.status_code}")
|
||||
print(f" Response: {response.text}")
|
||||
return False
|
||||
except Exception as e:
|
||||
print(f"❌ Authentication error: {e}")
|
||||
return False
|
||||
|
||||
def get_auth_headers():
|
||||
"""Get headers with authentication token"""
|
||||
headers = {
|
||||
'Content-Type': 'application/json'
|
||||
}
|
||||
|
||||
if not SKIP_AUTH and AUTH_TOKEN:
|
||||
headers['Authorization'] = f'Bearer {AUTH_TOKEN}'
|
||||
|
||||
return headers
|
||||
|
||||
def test_device_health_status():
|
||||
"""Test device health monitoring status"""
|
||||
print("🔍 Testing Device Health Monitoring System")
|
||||
@@ -91,7 +143,7 @@ def test_device_list():
|
||||
print("-" * 40)
|
||||
|
||||
try:
|
||||
response = requests.get(f"{API_BASE_URL}/devices", verify=False, timeout=10)
|
||||
response = requests.get(f"{API_BASE_URL}/devices", headers=get_auth_headers(), verify=False, timeout=10)
|
||||
|
||||
if response.status_code == 200:
|
||||
data = response.json()
|
||||
@@ -137,7 +189,7 @@ def test_alert_rules():
|
||||
print("-" * 40)
|
||||
|
||||
try:
|
||||
response = requests.get(f"{API_BASE_URL}/alerts/rules", verify=False, timeout=10)
|
||||
response = requests.get(f"{API_BASE_URL}/alerts/rules", headers=get_auth_headers(), verify=False, timeout=10)
|
||||
|
||||
if response.status_code == 200:
|
||||
data = response.json()
|
||||
@@ -181,6 +233,12 @@ def main():
|
||||
print(f"⏰ Test Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
|
||||
print()
|
||||
|
||||
# Authenticate first
|
||||
print("🔐 Authenticating...")
|
||||
if not authenticate():
|
||||
print("❌ Authentication failed, some tests may not work")
|
||||
print()
|
||||
|
||||
# Run all tests
|
||||
test_device_health_status()
|
||||
test_device_list()
|
||||
|
||||
Reference in New Issue
Block a user