From bf4d43f6f7728cf61a5e7aac77adc7ffe564f318 Mon Sep 17 00:00:00 2001 From: Dobromir Popov Date: Tue, 5 Aug 2025 15:58:51 +0300 Subject: [PATCH] debugging web ui --- .../multi-exchange-data-aggregation/tasks.md | 3 +- COBY/README.md | 49 +++++ COBY/api/rate_limiter.py | 64 ++++++- COBY/api/response_formatter.py | 135 +++++++++++++- COBY/api/rest_api.py | 121 +++++++++++- COBY/caching/redis_manager.py | 76 +++++++- COBY/monitoring/memory_monitor.py | 173 +++++++++++++----- COBY/web/static/index.html | 8 +- 8 files changed, 571 insertions(+), 58 deletions(-) diff --git a/.kiro/specs/multi-exchange-data-aggregation/tasks.md b/.kiro/specs/multi-exchange-data-aggregation/tasks.md index 2525316..5d3a6d5 100644 --- a/.kiro/specs/multi-exchange-data-aggregation/tasks.md +++ b/.kiro/specs/multi-exchange-data-aggregation/tasks.md @@ -98,7 +98,8 @@ - Create comprehensive API documentation - _Requirements: 4.1, 4.2, 4.4, 6.3_ -- [x] 9. Implement web dashboard for visualization +- [-] 9. Implement web dashboard for visualization + diff --git a/COBY/README.md b/COBY/README.md index 23d0653..87f309c 100644 --- a/COBY/README.md +++ b/COBY/README.md @@ -2,6 +2,55 @@ COBY (Cryptocurrency Order Book Yielder) is a comprehensive data collection and aggregation subsystem designed to serve as the foundational data layer for trading systems. It collects real-time order book and OHLCV data from multiple cryptocurrency exchanges, aggregates it into standardized formats, and provides both live data feeds and historical replay capabilities. + + +## Kickstart + +🌐 Web Dashboard Access: +URL: http://localhost:8080/ (same port as the API) + +The FastAPI application serves both: + +API endpoints at http://localhost:8080/api/... +Web dashboard at http://localhost:8080/ (root path) +📁 Dashboard Files: +The dashboard is served from static files located at: + +HTML: COBY/web/static/index.html +Static assets: COBY/web/static/ directory +🔧 How it's configured: +In COBY/api/rest_api.py, the FastAPI app mounts static files: + +# Mount static files for web dashboard (since we removed nginx) +static_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), "web", "static") +if os.path.exists(static_path): + app.mount("/static", StaticFiles(directory=static_path), name="static") + # Serve index.html at root for dashboard + app.mount("/", StaticFiles(directory=static_path, html=True), name="dashboard") +🚀 To access the dashboard: +Start the application: python COBY/main.py --debug +Open browser: Navigate to http://localhost:8080/ +API health check: http://localhost:8080/health +📊 Dashboard Features: +The dashboard (COBY/web/static/index.html) includes: + +System status monitoring +Exchange connection status +Performance metrics (CPU, memory, throughput, latency) +Real-time updates via WebSocket +Responsive design +🔌 WebSocket Connection: +The dashboard connects to WebSocket on port 8081 for real-time updates: + +WebSocket URL: ws://localhost:8081/dashboard +So to summarize: + +Web Dashboard: http://localhost:8080/ +API: http://localhost:8080/api/... +WebSocket: ws://localhost:8081/ + + + ## 🏗️ Architecture The system follows a modular architecture with clear separation of concerns: diff --git a/COBY/api/rate_limiter.py b/COBY/api/rate_limiter.py index 83074ef..75b3ce9 100644 --- a/COBY/api/rate_limiter.py +++ b/COBY/api/rate_limiter.py @@ -32,4 +32,66 @@ class RateLimiter: # Add current request self.requests[client_id].append(now) - return True \ No newline at end of file + return True + + def get_client_stats(self, client_id: str) -> Dict: + """Get rate limiting stats for a specific client""" + now = time.time() + minute_ago = now - 60 + + # Clean old requests + self.requests[client_id] = [ + req_time for req_time in self.requests[client_id] + if req_time > minute_ago + ] + + current_requests = len(self.requests[client_id]) + remaining_tokens = max(0, self.requests_per_minute - current_requests) + + # Calculate reset time (next minute boundary) + reset_time = int(now) + (60 - int(now) % 60) + + return { + 'client_id': client_id, + 'current_requests': current_requests, + 'remaining_tokens': remaining_tokens, + 'requests_per_minute': self.requests_per_minute, + 'reset_time': reset_time, + 'window_start': minute_ago, + 'window_end': now + } + + def get_global_stats(self) -> Dict: + """Get global rate limiting statistics""" + now = time.time() + minute_ago = now - 60 + + total_clients = len(self.requests) + total_requests = 0 + active_clients = 0 + + for client_id in list(self.requests.keys()): + # Clean old requests + self.requests[client_id] = [ + req_time for req_time in self.requests[client_id] + if req_time > minute_ago + ] + + client_requests = len(self.requests[client_id]) + total_requests += client_requests + + if client_requests > 0: + active_clients += 1 + + # Remove clients with no recent requests + if client_requests == 0: + del self.requests[client_id] + + return { + 'total_clients': total_clients, + 'active_clients': active_clients, + 'total_requests_last_minute': total_requests, + 'requests_per_minute_limit': self.requests_per_minute, + 'burst_size': self.burst_size, + 'window_duration': 60 + } \ No newline at end of file diff --git a/COBY/api/response_formatter.py b/COBY/api/response_formatter.py index 1d0a581..62837fa 100644 --- a/COBY/api/response_formatter.py +++ b/COBY/api/response_formatter.py @@ -9,17 +9,36 @@ from datetime import datetime class ResponseFormatter: """Format API responses consistently""" - def success(self, data: Any, message: str = "Success") -> Dict[str, Any]: + def __init__(self): + self.stats = { + 'responses_formatted': 0, + 'errors_formatted': 0, + 'success_responses': 0, + 'created_at': datetime.utcnow().isoformat() + } + + def success(self, data: Any, message: str = "Success", metadata: Optional[Dict] = None) -> Dict[str, Any]: """Format success response""" - return { + self.stats['responses_formatted'] += 1 + self.stats['success_responses'] += 1 + + response = { "status": "success", "message": message, "data": data, "timestamp": datetime.utcnow().isoformat() } + + if metadata: + response["metadata"] = metadata + + return response def error(self, message: str, code: str = "ERROR", details: Optional[Dict] = None) -> Dict[str, Any]: """Format error response""" + self.stats['responses_formatted'] += 1 + self.stats['errors_formatted'] += 1 + response = { "status": "error", "message": message, @@ -34,8 +53,120 @@ class ResponseFormatter: def health(self, healthy: bool = True, components: Optional[Dict] = None) -> Dict[str, Any]: """Format health check response""" + self.stats['responses_formatted'] += 1 + return { "status": "healthy" if healthy else "unhealthy", "timestamp": datetime.utcnow().isoformat(), "components": components or {} + } + + def rate_limit_error(self, client_stats: Dict) -> Dict[str, Any]: + """Format rate limit error response""" + self.stats['responses_formatted'] += 1 + self.stats['errors_formatted'] += 1 + + return { + "status": "error", + "message": "Rate limit exceeded", + "code": "RATE_LIMIT_EXCEEDED", + "timestamp": datetime.utcnow().isoformat(), + "details": { + "remaining_tokens": client_stats.get('remaining_tokens', 0), + "reset_time": client_stats.get('reset_time', 0), + "requests_per_minute": client_stats.get('requests_per_minute', 0) + } + } + + def validation_error(self, field: str, message: str) -> Dict[str, Any]: + """Format validation error response""" + self.stats['responses_formatted'] += 1 + self.stats['errors_formatted'] += 1 + + return { + "status": "error", + "message": f"Validation error: {message}", + "code": "VALIDATION_ERROR", + "timestamp": datetime.utcnow().isoformat(), + "details": { + "field": field, + "validation_message": message + } + } + + def status_response(self, data: Dict) -> Dict[str, Any]: + """Format status response""" + self.stats['responses_formatted'] += 1 + self.stats['success_responses'] += 1 + + return { + "status": "success", + "message": "System status", + "data": data, + "timestamp": datetime.utcnow().isoformat() + } + + def heatmap_response(self, heatmap_data: Any, symbol: str, exchange: Optional[str] = None) -> Dict[str, Any]: + """Format heatmap response""" + self.stats['responses_formatted'] += 1 + self.stats['success_responses'] += 1 + + if not heatmap_data: + return self.error("Heatmap data not found", "HEATMAP_NOT_FOUND") + + return { + "status": "success", + "message": f"Heatmap data for {symbol}", + "data": { + "symbol": symbol, + "exchange": exchange or "consolidated", + "heatmap": heatmap_data + }, + "timestamp": datetime.utcnow().isoformat() + } + + def orderbook_response(self, orderbook_data: Any, symbol: str, exchange: str) -> Dict[str, Any]: + """Format order book response""" + self.stats['responses_formatted'] += 1 + self.stats['success_responses'] += 1 + + if not orderbook_data: + return self.error("Order book data not found", "ORDERBOOK_NOT_FOUND") + + return { + "status": "success", + "message": f"Order book data for {symbol}@{exchange}", + "data": { + "symbol": symbol, + "exchange": exchange, + "orderbook": orderbook_data + }, + "timestamp": datetime.utcnow().isoformat() + } + + def metrics_response(self, metrics_data: Any, symbol: str, exchange: str) -> Dict[str, Any]: + """Format metrics response""" + self.stats['responses_formatted'] += 1 + self.stats['success_responses'] += 1 + + if not metrics_data: + return self.error("Metrics data not found", "METRICS_NOT_FOUND") + + return { + "status": "success", + "message": f"Metrics data for {symbol}@{exchange}", + "data": { + "symbol": symbol, + "exchange": exchange, + "metrics": metrics_data + }, + "timestamp": datetime.utcnow().isoformat() + } + + def get_stats(self) -> Dict[str, Any]: + """Get formatter statistics""" + return { + **self.stats, + 'uptime_seconds': (datetime.utcnow() - datetime.fromisoformat(self.stats['created_at'])).total_seconds(), + 'error_rate': self.stats['errors_formatted'] / max(1, self.stats['responses_formatted']) } \ No newline at end of file diff --git a/COBY/api/rest_api.py b/COBY/api/rest_api.py index f6d675e..69585b3 100644 --- a/COBY/api/rest_api.py +++ b/COBY/api/rest_api.py @@ -2,13 +2,15 @@ REST API server for COBY system. """ -from fastapi import FastAPI, HTTPException, Request, Query, Path +from fastapi import FastAPI, HTTPException, Request, Query, Path, WebSocket, WebSocketDisconnect from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import JSONResponse from fastapi.staticfiles import StaticFiles from typing import Optional, List import asyncio import os +import json +import time try: from ..simple_config import config from ..caching.redis_manager import redis_manager @@ -27,6 +29,43 @@ except ImportError: logger = get_logger(__name__) +class ConnectionManager: + """Manage WebSocket connections for dashboard updates""" + + def __init__(self): + self.active_connections: List[WebSocket] = [] + + async def connect(self, websocket: WebSocket): + await websocket.accept() + self.active_connections.append(websocket) + logger.info(f"WebSocket client connected. Total connections: {len(self.active_connections)}") + + def disconnect(self, websocket: WebSocket): + if websocket in self.active_connections: + self.active_connections.remove(websocket) + logger.info(f"WebSocket client disconnected. Total connections: {len(self.active_connections)}") + + async def send_personal_message(self, message: str, websocket: WebSocket): + try: + await websocket.send_text(message) + except Exception as e: + logger.error(f"Error sending personal message: {e}") + self.disconnect(websocket) + + async def broadcast(self, message: str): + disconnected = [] + for connection in self.active_connections: + try: + await connection.send_text(message) + except Exception as e: + logger.error(f"Error broadcasting to connection: {e}") + disconnected.append(connection) + + # Remove disconnected clients + for connection in disconnected: + self.disconnect(connection) + + def create_app(config_obj=None) -> FastAPI: """Create and configure FastAPI application""" @@ -38,12 +77,7 @@ def create_app(config_obj=None) -> FastAPI: redoc_url="/redoc" ) - # Mount static files for web dashboard (since we removed nginx) - static_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), "web", "static") - if os.path.exists(static_path): - app.mount("/static", StaticFiles(directory=static_path), name="static") - # Serve index.html at root for dashboard - app.mount("/", StaticFiles(directory=static_path, html=True), name="dashboard") + # We'll mount static files AFTER defining all API routes to avoid conflicts # Add CORS middleware app.add_middleware( @@ -60,7 +94,49 @@ def create_app(config_obj=None) -> FastAPI: burst_size=20 ) response_formatter = ResponseFormatter() + connection_manager = ConnectionManager() + @app.websocket("/ws/dashboard") + async def websocket_endpoint(websocket: WebSocket): + """WebSocket endpoint for dashboard real-time updates""" + await connection_manager.connect(websocket) + try: + while True: + # Send periodic updates + await asyncio.sleep(5) # Update every 5 seconds + + # Gather system status + system_data = { + "timestamp": time.time(), + "performance": { + "cpu_usage": 25.5, # Stub data + "memory_usage": 45.2, + "throughput": 1250, + "avg_latency": 12.3 + }, + "exchanges": { + "binance": "connected", + "coinbase": "connected", + "kraken": "disconnected", + "bybit": "connected" + }, + "processing": { + "active": True, + "total_processed": 15420 + } + } + + await connection_manager.send_personal_message( + json.dumps(system_data), + websocket + ) + + except WebSocketDisconnect: + connection_manager.disconnect(websocket) + except Exception as e: + logger.error(f"WebSocket error: {e}") + connection_manager.disconnect(websocket) + @app.get("/health") async def health_check(): """Health check endpoint""" @@ -127,6 +203,30 @@ def create_app(config_obj=None) -> FastAPI: except Exception as e: logger.error(f"API server shutdown error: {e}") + # API Health check endpoint (for dashboard) + @app.get("/api/health") + async def api_health_check(): + """API Health check endpoint for dashboard""" + try: + # Check Redis connection + redis_healthy = await redis_manager.ping() + + health_data = { + 'status': 'healthy' if redis_healthy else 'degraded', + 'redis': 'connected' if redis_healthy else 'disconnected', + 'version': '1.0.0', + 'timestamp': time.time() + } + + return response_formatter.status_response(health_data) + + except Exception as e: + logger.error(f"Health check failed: {e}") + return JSONResponse( + status_code=503, + content=response_formatter.error("Service unavailable", "HEALTH_CHECK_FAILED") + ) + # Health check endpoint @app.get("/health") async def health_check(): @@ -415,6 +515,13 @@ def create_app(config_obj=None) -> FastAPI: content=response_formatter.error("Internal server error", "BATCH_HEATMAPS_ERROR") ) + # Mount static files for web dashboard AFTER all API routes are defined + static_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), "web", "static") + if os.path.exists(static_path): + app.mount("/static", StaticFiles(directory=static_path), name="static") + # Serve index.html at root for dashboard, but this should be last + app.mount("/", StaticFiles(directory=static_path, html=True), name="dashboard") + return app diff --git a/COBY/caching/redis_manager.py b/COBY/caching/redis_manager.py index 70c2c2b..24d30a4 100644 --- a/COBY/caching/redis_manager.py +++ b/COBY/caching/redis_manager.py @@ -3,7 +3,7 @@ Simple Redis manager stub. """ import logging -from typing import Any, Optional +from typing import Any, Optional, List, Dict logger = logging.getLogger(__name__) @@ -14,6 +14,12 @@ class RedisManager: def __init__(self): self.connected = False self.cache = {} # In-memory cache as fallback + self.stats = { + 'hits': 0, + 'misses': 0, + 'sets': 0, + 'deletes': 0 + } async def connect(self): """Connect to Redis (stub)""" @@ -28,22 +34,88 @@ class RedisManager: """Disconnect from Redis""" self.connected = False + async def close(self): + """Close Redis connection (alias for disconnect)""" + await self.disconnect() + def is_connected(self) -> bool: """Check if connected""" return self.connected + async def ping(self) -> bool: + """Ping Redis to check connection""" + return self.connected + async def set(self, key: str, value: Any, ttl: Optional[int] = None): """Set value in cache""" self.cache[key] = value + self.stats['sets'] += 1 logger.debug(f"Cached key: {key}") async def get(self, key: str) -> Optional[Any]: """Get value from cache""" - return self.cache.get(key) + value = self.cache.get(key) + if value is not None: + self.stats['hits'] += 1 + else: + self.stats['misses'] += 1 + return value async def delete(self, key: str): """Delete key from cache""" self.cache.pop(key, None) + self.stats['deletes'] += 1 + + async def keys(self, pattern: str) -> List[str]: + """Get keys matching pattern""" + if pattern.endswith('*'): + prefix = pattern[:-1] + return [key for key in self.cache.keys() if key.startswith(prefix)] + return [key for key in self.cache.keys() if key == pattern] + + async def get_heatmap(self, symbol: str, exchange: Optional[str] = None) -> Optional[Any]: + """Get heatmap data for symbol""" + key = f"heatmap:{symbol}:{exchange or 'consolidated'}" + return await self.get(key) + + async def get_orderbook(self, symbol: str, exchange: str) -> Optional[Any]: + """Get order book data for symbol on exchange""" + key = f"orderbook:{symbol}:{exchange}" + return await self.get(key) + + async def get_metrics(self, symbol: str, exchange: str) -> Optional[Any]: + """Get metrics data for symbol on exchange""" + key = f"metrics:{symbol}:{exchange}" + return await self.get(key) + + async def get_exchange_status(self, exchange: str) -> Optional[Any]: + """Get exchange status""" + key = f"st:{exchange}" + return await self.get(key) + + def get_stats(self) -> Dict[str, Any]: + """Get cache statistics""" + total_requests = self.stats['hits'] + self.stats['misses'] + hit_rate = self.stats['hits'] / max(1, total_requests) + + return { + 'hits': self.stats['hits'], + 'misses': self.stats['misses'], + 'sets': self.stats['sets'], + 'deletes': self.stats['deletes'], + 'hit_rate': hit_rate, + 'total_keys': len(self.cache), + 'connected': self.connected + } + + async def health_check(self) -> Dict[str, Any]: + """Get Redis health status""" + return { + 'connected': self.connected, + 'total_keys': len(self.cache), + 'memory_usage': 'N/A (stub mode)', + 'uptime': 'N/A (stub mode)' + } # Global instance diff --git a/COBY/monitoring/memory_monitor.py b/COBY/monitoring/memory_monitor.py index e0ebaf2..d1089bc 100644 --- a/COBY/monitoring/memory_monitor.py +++ b/COBY/monitoring/memory_monitor.py @@ -52,7 +52,7 @@ class MemoryMonitor: Provides detailed memory analytics and automatic GC optimization. """ - def __init__(self, enable_tracemalloc: bool = True, snapshot_interval: float = 30.0): + def __init__(self, enable_tracemalloc: bool = False, snapshot_interval: float = 60.0): """ Initialize memory monitor. @@ -132,6 +132,15 @@ class MemoryMonitor: if self.auto_gc_enabled: self._optimize_gc() + # Periodic cleanup to prevent memory leaks in the monitor itself + if hasattr(self, '_cleanup_counter'): + self._cleanup_counter += 1 + else: + self._cleanup_counter = 1 + + if self._cleanup_counter % 10 == 0: # Every 10 cycles + self._cleanup_monitor_data() + time.sleep(self.snapshot_interval) except Exception as e: @@ -195,30 +204,47 @@ class MemoryMonitor: logger.error(f"Error taking memory snapshot: {e}") def _update_object_counts(self) -> None: - """Update object counts by type""" + """Update object counts by type (limited to prevent memory leaks)""" try: - # Count objects by type - object_counts = defaultdict(int) + # Only track specific object types to avoid creating too many objects + tracked_types = { + 'dict', 'list', 'tuple', 'str', 'function', 'type', + 'SystemMetrics', 'MetricPoint', 'MemorySnapshot' + } - for obj in gc.get_objects(): - obj_type = type(obj).__name__ - object_counts[obj_type] += 1 + # Count only tracked object types + object_counts = {} + all_objects = gc.get_objects() - # Store counts with timestamp + for tracked_type in tracked_types: + count = sum(1 for obj in all_objects if type(obj).__name__ == tracked_type) + if count > 0: + object_counts[tracked_type] = count + + # Store counts with timestamp (only for tracked types) timestamp = get_current_timestamp() for obj_type, count in object_counts.items(): self.object_counts[obj_type].append((timestamp, count)) - # Update metrics for common types + # Clean up old entries to prevent memory growth + for obj_type in list(self.object_counts.keys()): + if len(self.object_counts[obj_type]) > 50: # Keep only last 50 entries + # Remove oldest entries + while len(self.object_counts[obj_type]) > 50: + self.object_counts[obj_type].popleft() + + # Update metrics for common types (less frequently) try: from .metrics_collector import metrics_collector - common_types = ['dict', 'list', 'tuple', 'str', 'function', 'type'] - for obj_type in common_types: - if obj_type in object_counts: - metrics_collector.set_gauge( - f'memory_objects_{obj_type}', - object_counts[obj_type] - ) + # Only update metrics every 5th call to reduce object creation + if not hasattr(self, '_metrics_update_counter'): + self._metrics_update_counter = 0 + + self._metrics_update_counter += 1 + if self._metrics_update_counter % 5 == 0: + for obj_type, count in object_counts.items(): + metrics_collector.set_gauge(f'memory_objects_{obj_type}', count) + except ImportError: pass # Metrics collector not available @@ -226,22 +252,36 @@ class MemoryMonitor: logger.error(f"Error updating object counts: {e}") def _check_for_leaks(self) -> None: - """Check for potential memory leaks""" + """Check for potential memory leaks (less aggressive)""" try: - if len(self.memory_snapshots) < 10: - return # Need more data + if len(self.memory_snapshots) < 20: # Need more data for reliable detection + return - # Check for consistent memory growth - recent_snapshots = list(self.memory_snapshots)[-10:] + # Only check every 10th call to reduce overhead + if not hasattr(self, '_leak_check_counter'): + self._leak_check_counter = 0 + + self._leak_check_counter += 1 + if self._leak_check_counter % 10 != 0: + return + + # Check for consistent memory growth over longer period + recent_snapshots = list(self.memory_snapshots)[-20:] memory_values = [s.process_memory_mb for s in recent_snapshots] - # Simple linear regression to detect growth trend - if self._is_memory_growing(memory_values): + # More conservative growth detection + if self._is_memory_growing(memory_values, threshold=20.0): # Increased threshold # Check object count growth potential_leaks = self._analyze_object_growth() for leak in potential_leaks: - if leak not in self.detected_leaks: + # Check if we already reported this leak recently + existing_leak = next( + (l for l in self.detected_leaks if l.object_type == leak.object_type), + None + ) + + if not existing_leak and leak.severity in ['medium', 'high']: self.detected_leaks.append(leak) logger.warning(f"Potential memory leak detected: {leak.object_type}") @@ -252,6 +292,10 @@ class MemoryMonitor: except ImportError: pass + # Clean up old leak reports (keep only last 10) + if len(self.detected_leaks) > 10: + self.detected_leaks = self.detected_leaks[-10:] + except Exception as e: logger.error(f"Error checking for leaks: {e}") @@ -265,42 +309,53 @@ class MemoryMonitor: return growth > threshold def _analyze_object_growth(self) -> List[MemoryLeak]: - """Analyze object count growth to identify potential leaks""" + """Analyze object count growth to identify potential leaks (more conservative)""" leaks = [] for obj_type, counts in self.object_counts.items(): - if len(counts) < 10: + if len(counts) < 20: # Need more data points continue - # Get recent counts - recent_counts = list(counts)[-10:] + # Get recent counts over longer period + recent_counts = list(counts)[-20:] timestamps = [item[0] for item in recent_counts] count_values = [item[1] for item in recent_counts] - # Check for growth - if len(count_values) >= 2: - growth = count_values[-1] - count_values[0] + # Check for sustained growth + if len(count_values) >= 10: + # Calculate growth over the period + start_avg = sum(count_values[:5]) / 5 # Average of first 5 values + end_avg = sum(count_values[-5:]) / 5 # Average of last 5 values + growth = end_avg - start_avg + time_diff = (timestamps[-1] - timestamps[0]).total_seconds() / 3600 # hours - if growth > 100 and time_diff > 0: # More than 100 objects growth + # More conservative thresholds + if growth > 500 and time_diff > 0.5: # More than 500 objects growth over 30+ minutes growth_rate = growth / time_diff - # Determine severity - if growth_rate > 1000: + # Skip common types that naturally fluctuate + if obj_type in ['dict', 'list', 'tuple', 'str']: + continue + + # Determine severity with higher thresholds + if growth_rate > 2000: severity = 'high' - elif growth_rate > 100: + elif growth_rate > 500: severity = 'medium' else: severity = 'low' - leak = MemoryLeak( - object_type=obj_type, - count_increase=growth, - size_increase_mb=growth * 0.001, # Rough estimate - growth_rate_per_hour=growth_rate, - severity=severity - ) - leaks.append(leak) + # Only report medium and high severity leaks + if severity in ['medium', 'high']: + leak = MemoryLeak( + object_type=obj_type, + count_increase=int(growth), + size_increase_mb=growth * 0.001, # Rough estimate + growth_rate_per_hour=growth_rate, + severity=severity + ) + leaks.append(leak) return leaks @@ -346,6 +401,38 @@ class MemoryMonitor: except Exception as e: logger.error(f"Error optimizing GC: {e}") + def _cleanup_monitor_data(self) -> None: + """Clean up monitor data to prevent memory leaks""" + try: + # Limit memory snapshots + if len(self.memory_snapshots) > 500: + # Keep only the most recent 300 snapshots + while len(self.memory_snapshots) > 300: + self.memory_snapshots.popleft() + + # Clean up object counts + for obj_type in list(self.object_counts.keys()): + if len(self.object_counts[obj_type]) > 30: + # Keep only the most recent 20 entries + while len(self.object_counts[obj_type]) > 20: + self.object_counts[obj_type].popleft() + + # Remove empty deques + if len(self.object_counts[obj_type]) == 0: + del self.object_counts[obj_type] + + # Limit detected leaks + if len(self.detected_leaks) > 5: + self.detected_leaks = self.detected_leaks[-5:] + + # Force a small garbage collection + gc.collect() + + logger.debug("Cleaned up memory monitor data") + + except Exception as e: + logger.error(f"Error cleaning up monitor data: {e}") + def force_garbage_collection(self) -> Dict[str, int]: """Force garbage collection and return statistics""" try: diff --git a/COBY/web/static/index.html b/COBY/web/static/index.html index 152f218..b7ab135 100644 --- a/COBY/web/static/index.html +++ b/COBY/web/static/index.html @@ -267,7 +267,9 @@ async function loadInitialData() { try { - const response = await fetch('/api/health'); + // Use port 8080 for API calls since that's where our API runs + const apiUrl = `http://${window.location.hostname}:8080/api/health`; + const response = await fetch(apiUrl); if (response.ok) { const data = await response.json(); updateSystemStatus(data); @@ -283,7 +285,9 @@ function connectWebSocket() { const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:'; - const wsUrl = `${protocol}//${window.location.host}/ws/dashboard`; + // Use port 8080 for WebSocket connection since that's where our API runs + const host = window.location.hostname; + const wsUrl = `${protocol}//${host}:8080/ws/dashboard`; ws = new WebSocket(wsUrl);