/** * Data Retention Service * Standalone microservice for automated data cleanup */ const cron = require('node-cron'); const { Op } = require('sequelize'); const http = require('http'); const url = require('url'); // Initialize database connection const { initializeDatabase, getModels } = require('./database'); class DataRetentionService { constructor() { this.isRunning = false; this.lastCleanup = null; this.cleanupStats = { totalRuns: 0, totalDetectionsDeleted: 0, totalHeartbeatsDeleted: 0, totalLogsDeleted: 0, lastRunDuration: 0, errors: [] }; } /** * Start the data retention cleanup service */ async start() { console.log('๐Ÿ—‚๏ธ Starting Data Retention Service...'); console.log(`๐Ÿ“… Environment: ${process.env.NODE_ENV || 'development'}`); console.log(`๐Ÿ’พ Database: ${process.env.DB_HOST}:${process.env.DB_PORT}/${process.env.DB_NAME}`); try { // Initialize database connection await initializeDatabase(); console.log('โœ… Database connection established'); // Schedule daily cleanup at 2:00 AM UTC cron.schedule('0 2 * * *', async () => { await this.performCleanup(); }, { scheduled: true, timezone: "UTC" }); console.log('โฐ Scheduled cleanup: Daily at 2:00 AM UTC'); // Start metrics HTTP server this.startMetricsServer(); // Run immediate cleanup in development or if IMMEDIATE_CLEANUP is set if (process.env.NODE_ENV === 'development' || process.env.IMMEDIATE_CLEANUP === 'true') { console.log('๐Ÿงน Running immediate cleanup...'); setTimeout(() => this.performCleanup(), 5000); } // Health check endpoint simulation setInterval(() => { this.logHealthStatus(); }, 60000); // Every minute console.log('โœ… Data Retention Service started successfully'); } catch (error) { console.error('โŒ Failed to start Data Retention Service:', error); process.exit(1); } } /** * Perform cleanup for all tenants */ async performCleanup() { if (this.isRunning) { console.log('โณ Data retention cleanup already running, skipping...'); return; } this.isRunning = true; const startTime = Date.now(); try { console.log('๐Ÿงน Starting data retention cleanup...'); console.log(`โฐ Cleanup started at: ${new Date().toISOString()}`); const { Tenant, DroneDetection, Heartbeat, SecurityLog } = await getModels(); // Get all active tenants with their retention policies const tenants = await Tenant.findAll({ attributes: ['id', 'slug', 'features'], where: { is_active: true } }); console.log(`๐Ÿข Found ${tenants.length} active tenants to process`); let totalDetectionsDeleted = 0; let totalHeartbeatsDeleted = 0; let totalLogsDeleted = 0; let errors = []; for (const tenant of tenants) { try { const result = await this.cleanupTenant(tenant); totalDetectionsDeleted += result.detections; totalHeartbeatsDeleted += result.heartbeats; totalLogsDeleted += result.logs; } catch (error) { console.error(`โŒ Error cleaning tenant ${tenant.slug}:`, error); errors.push({ tenantSlug: tenant.slug, error: error.message, timestamp: new Date().toISOString() }); } } const duration = Date.now() - startTime; this.lastCleanup = new Date(); this.cleanupStats.totalRuns++; this.cleanupStats.totalDetectionsDeleted += totalDetectionsDeleted; this.cleanupStats.totalHeartbeatsDeleted += totalHeartbeatsDeleted; this.cleanupStats.totalLogsDeleted += totalLogsDeleted; this.cleanupStats.lastRunDuration = duration; this.cleanupStats.errors = errors; console.log('โœ… Data retention cleanup completed'); console.log(`โฑ๏ธ Duration: ${duration}ms`); console.log(`๐Ÿ“Š Deleted: ${totalDetectionsDeleted} detections, ${totalHeartbeatsDeleted} heartbeats, ${totalLogsDeleted} logs`); if (errors.length > 0) { console.log(`โš ๏ธ Errors encountered: ${errors.length}`); errors.forEach(err => console.log(` - ${err.tenantSlug}: ${err.error}`)); } } catch (error) { console.error('โŒ Data retention cleanup failed:', error); this.cleanupStats.errors.push({ error: error.message, timestamp: new Date().toISOString() }); } finally { this.isRunning = false; } } /** * Clean up data for a specific tenant */ async cleanupTenant(tenant) { const retentionDays = tenant.features?.data_retention_days; // Skip if unlimited retention (-1) if (retentionDays === -1) { console.log(`โญ๏ธ Skipping tenant ${tenant.slug} - unlimited retention`); return { detections: 0, heartbeats: 0, logs: 0 }; } // Default to 90 days if not specified const effectiveRetentionDays = retentionDays || 90; const cutoffDate = new Date(); cutoffDate.setDate(cutoffDate.getDate() - effectiveRetentionDays); console.log(`๐Ÿงน Cleaning tenant ${tenant.slug} - removing data older than ${effectiveRetentionDays} days (before ${cutoffDate.toISOString()})`); const { DroneDetection, Heartbeat, SecurityLog } = await getModels(); // Clean up drone detections const deletedDetections = await DroneDetection.destroy({ where: { tenant_id: tenant.id, timestamp: { [Op.lt]: cutoffDate } } }); // Clean up heartbeats const deletedHeartbeats = await Heartbeat.destroy({ where: { tenant_id: tenant.id, timestamp: { [Op.lt]: cutoffDate } } }); // Clean up security logs (if they have tenant_id) let deletedLogs = 0; try { deletedLogs = await SecurityLog.destroy({ where: { tenant_id: tenant.id, timestamp: { [Op.lt]: cutoffDate } } }); } catch (error) { // SecurityLog might not have tenant_id field console.log(`โš ๏ธ Skipping security logs for tenant ${tenant.slug}: ${error.message}`); } console.log(`โœ… Tenant ${tenant.slug}: Deleted ${deletedDetections} detections, ${deletedHeartbeats} heartbeats, ${deletedLogs} logs`); return { detections: deletedDetections, heartbeats: deletedHeartbeats, logs: deletedLogs }; } /** * Log health status */ logHealthStatus() { const memUsage = process.memoryUsage(); const uptime = process.uptime(); console.log(`๐Ÿ’š Health Check - Uptime: ${Math.floor(uptime)}s, Memory: ${Math.round(memUsage.heapUsed / 1024 / 1024)}MB, Last Cleanup: ${this.lastCleanup ? this.lastCleanup.toISOString() : 'Never'}`); } /** * Get service statistics */ getStats() { return { ...this.cleanupStats, isRunning: this.isRunning, lastCleanup: this.lastCleanup, uptime: process.uptime(), memoryUsage: process.memoryUsage(), nextScheduledRun: '2:00 AM UTC daily' }; } /** * Get detailed metrics for dashboard */ getMetrics() { const uptime = Math.floor(process.uptime()); const memoryUsage = process.memoryUsage(); return { service: { name: 'data-retention-service', version: '1.0.0', status: 'running', uptime: uptime, uptimeFormatted: this.formatUptime(uptime) }, performance: { memoryUsage: { heapUsed: Math.round(memoryUsage.heapUsed / 1024 / 1024), heapTotal: Math.round(memoryUsage.heapTotal / 1024 / 1024), external: Math.round(memoryUsage.external / 1024 / 1024), rss: Math.round(memoryUsage.rss / 1024 / 1024) }, cpuUsage: process.cpuUsage() }, cleanup: { lastRun: this.lastCleanup, lastRunFormatted: this.lastCleanup ? new Date(this.lastCleanup).toLocaleString() : null, isCurrentlyRunning: this.isRunning, nextScheduledRun: '2:00 AM UTC daily', stats: this.cleanupStats }, schedule: { cronExpression: '0 2 * * *', timezone: 'UTC', description: 'Daily cleanup at 2:00 AM UTC' } }; } /** * Format uptime in human readable format */ formatUptime(seconds) { const days = Math.floor(seconds / 86400); const hours = Math.floor((seconds % 86400) / 3600); const minutes = Math.floor((seconds % 3600) / 60); const secs = seconds % 60; if (days > 0) { return `${days}d ${hours}h ${minutes}m ${secs}s`; } else if (hours > 0) { return `${hours}h ${minutes}m ${secs}s`; } else if (minutes > 0) { return `${minutes}m ${secs}s`; } else { return `${secs}s`; } } /** * Start HTTP server for metrics endpoint */ startMetricsServer() { const port = process.env.METRICS_PORT || 3001; const server = http.createServer((req, res) => { const parsedUrl = url.parse(req.url, true); // Set CORS headers res.setHeader('Access-Control-Allow-Origin', '*'); res.setHeader('Access-Control-Allow-Methods', 'GET, OPTIONS'); res.setHeader('Access-Control-Allow-Headers', 'Content-Type'); res.setHeader('Content-Type', 'application/json'); if (req.method === 'OPTIONS') { res.writeHead(200); res.end(); return; } if (req.method === 'GET') { if (parsedUrl.pathname === '/metrics') { // Detailed metrics for dashboard res.writeHead(200); res.end(JSON.stringify(this.getMetrics(), null, 2)); } else if (parsedUrl.pathname === '/health') { // Simple health check res.writeHead(200); res.end(JSON.stringify({ status: 'healthy', uptime: Math.floor(process.uptime()), lastCleanup: this.lastCleanup, isRunning: this.isRunning }, null, 2)); } else if (parsedUrl.pathname === '/stats') { // Basic stats res.writeHead(200); res.end(JSON.stringify(this.getStats(), null, 2)); } else { res.writeHead(404); res.end(JSON.stringify({ error: 'Not found' })); } } else { res.writeHead(405); res.end(JSON.stringify({ error: 'Method not allowed' })); } }); server.listen(port, '0.0.0.0', () => { console.log(`๐Ÿ“Š Metrics server listening on internal port ${port}`); console.log(`๐Ÿ“Š Endpoints: /health, /metrics, /stats`); console.log(`๐Ÿ”’ Access restricted to Docker internal network only`); }); return server; } /** * Graceful shutdown */ async shutdown() { console.log('๐Ÿ”„ Graceful shutdown initiated...'); // Wait for current cleanup to finish while (this.isRunning) { console.log('โณ Waiting for cleanup to finish...'); await new Promise(resolve => setTimeout(resolve, 1000)); } console.log('โœ… Data Retention Service stopped'); process.exit(0); } } // Initialize and start the service const service = new DataRetentionService(); // Handle graceful shutdown process.on('SIGTERM', () => service.shutdown()); process.on('SIGINT', () => service.shutdown()); // Start the service service.start().catch(error => { console.error('Failed to start service:', error); process.exit(1); }); module.exports = DataRetentionService;