debugging web ui

This commit is contained in:
Dobromir Popov
2025-08-05 15:58:51 +03:00
parent 622d059aae
commit bf4d43f6f7
8 changed files with 571 additions and 58 deletions

View File

@ -98,7 +98,8 @@
- Create comprehensive API documentation - Create comprehensive API documentation
- _Requirements: 4.1, 4.2, 4.4, 6.3_ - _Requirements: 4.1, 4.2, 4.4, 6.3_
- [x] 9. Implement web dashboard for visualization - [-] 9. Implement web dashboard for visualization

View File

@ -2,6 +2,55 @@
COBY (Cryptocurrency Order Book Yielder) is a comprehensive data collection and aggregation subsystem designed to serve as the foundational data layer for trading systems. It collects real-time order book and OHLCV data from multiple cryptocurrency exchanges, aggregates it into standardized formats, and provides both live data feeds and historical replay capabilities. COBY (Cryptocurrency Order Book Yielder) is a comprehensive data collection and aggregation subsystem designed to serve as the foundational data layer for trading systems. It collects real-time order book and OHLCV data from multiple cryptocurrency exchanges, aggregates it into standardized formats, and provides both live data feeds and historical replay capabilities.
## Kickstart
🌐 Web Dashboard Access:
URL: http://localhost:8080/ (same port as the API)
The FastAPI application serves both:
API endpoints at http://localhost:8080/api/...
Web dashboard at http://localhost:8080/ (root path)
📁 Dashboard Files:
The dashboard is served from static files located at:
HTML: COBY/web/static/index.html
Static assets: COBY/web/static/ directory
🔧 How it's configured:
In COBY/api/rest_api.py, the FastAPI app mounts static files:
# Mount static files for web dashboard (since we removed nginx)
static_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), "web", "static")
if os.path.exists(static_path):
app.mount("/static", StaticFiles(directory=static_path), name="static")
# Serve index.html at root for dashboard
app.mount("/", StaticFiles(directory=static_path, html=True), name="dashboard")
🚀 To access the dashboard:
Start the application: python COBY/main.py --debug
Open browser: Navigate to http://localhost:8080/
API health check: http://localhost:8080/health
📊 Dashboard Features:
The dashboard (COBY/web/static/index.html) includes:
System status monitoring
Exchange connection status
Performance metrics (CPU, memory, throughput, latency)
Real-time updates via WebSocket
Responsive design
🔌 WebSocket Connection:
The dashboard connects to WebSocket on port 8081 for real-time updates:
WebSocket URL: ws://localhost:8081/dashboard
So to summarize:
Web Dashboard: http://localhost:8080/
API: http://localhost:8080/api/...
WebSocket: ws://localhost:8081/
## 🏗️ Architecture ## 🏗️ Architecture
The system follows a modular architecture with clear separation of concerns: The system follows a modular architecture with clear separation of concerns:

View File

@ -32,4 +32,66 @@ class RateLimiter:
# Add current request # Add current request
self.requests[client_id].append(now) self.requests[client_id].append(now)
return True return True
def get_client_stats(self, client_id: str) -> Dict:
"""Get rate limiting stats for a specific client"""
now = time.time()
minute_ago = now - 60
# Clean old requests
self.requests[client_id] = [
req_time for req_time in self.requests[client_id]
if req_time > minute_ago
]
current_requests = len(self.requests[client_id])
remaining_tokens = max(0, self.requests_per_minute - current_requests)
# Calculate reset time (next minute boundary)
reset_time = int(now) + (60 - int(now) % 60)
return {
'client_id': client_id,
'current_requests': current_requests,
'remaining_tokens': remaining_tokens,
'requests_per_minute': self.requests_per_minute,
'reset_time': reset_time,
'window_start': minute_ago,
'window_end': now
}
def get_global_stats(self) -> Dict:
"""Get global rate limiting statistics"""
now = time.time()
minute_ago = now - 60
total_clients = len(self.requests)
total_requests = 0
active_clients = 0
for client_id in list(self.requests.keys()):
# Clean old requests
self.requests[client_id] = [
req_time for req_time in self.requests[client_id]
if req_time > minute_ago
]
client_requests = len(self.requests[client_id])
total_requests += client_requests
if client_requests > 0:
active_clients += 1
# Remove clients with no recent requests
if client_requests == 0:
del self.requests[client_id]
return {
'total_clients': total_clients,
'active_clients': active_clients,
'total_requests_last_minute': total_requests,
'requests_per_minute_limit': self.requests_per_minute,
'burst_size': self.burst_size,
'window_duration': 60
}

View File

@ -9,17 +9,36 @@ from datetime import datetime
class ResponseFormatter: class ResponseFormatter:
"""Format API responses consistently""" """Format API responses consistently"""
def success(self, data: Any, message: str = "Success") -> Dict[str, Any]: def __init__(self):
self.stats = {
'responses_formatted': 0,
'errors_formatted': 0,
'success_responses': 0,
'created_at': datetime.utcnow().isoformat()
}
def success(self, data: Any, message: str = "Success", metadata: Optional[Dict] = None) -> Dict[str, Any]:
"""Format success response""" """Format success response"""
return { self.stats['responses_formatted'] += 1
self.stats['success_responses'] += 1
response = {
"status": "success", "status": "success",
"message": message, "message": message,
"data": data, "data": data,
"timestamp": datetime.utcnow().isoformat() "timestamp": datetime.utcnow().isoformat()
} }
if metadata:
response["metadata"] = metadata
return response
def error(self, message: str, code: str = "ERROR", details: Optional[Dict] = None) -> Dict[str, Any]: def error(self, message: str, code: str = "ERROR", details: Optional[Dict] = None) -> Dict[str, Any]:
"""Format error response""" """Format error response"""
self.stats['responses_formatted'] += 1
self.stats['errors_formatted'] += 1
response = { response = {
"status": "error", "status": "error",
"message": message, "message": message,
@ -34,8 +53,120 @@ class ResponseFormatter:
def health(self, healthy: bool = True, components: Optional[Dict] = None) -> Dict[str, Any]: def health(self, healthy: bool = True, components: Optional[Dict] = None) -> Dict[str, Any]:
"""Format health check response""" """Format health check response"""
self.stats['responses_formatted'] += 1
return { return {
"status": "healthy" if healthy else "unhealthy", "status": "healthy" if healthy else "unhealthy",
"timestamp": datetime.utcnow().isoformat(), "timestamp": datetime.utcnow().isoformat(),
"components": components or {} "components": components or {}
}
def rate_limit_error(self, client_stats: Dict) -> Dict[str, Any]:
"""Format rate limit error response"""
self.stats['responses_formatted'] += 1
self.stats['errors_formatted'] += 1
return {
"status": "error",
"message": "Rate limit exceeded",
"code": "RATE_LIMIT_EXCEEDED",
"timestamp": datetime.utcnow().isoformat(),
"details": {
"remaining_tokens": client_stats.get('remaining_tokens', 0),
"reset_time": client_stats.get('reset_time', 0),
"requests_per_minute": client_stats.get('requests_per_minute', 0)
}
}
def validation_error(self, field: str, message: str) -> Dict[str, Any]:
"""Format validation error response"""
self.stats['responses_formatted'] += 1
self.stats['errors_formatted'] += 1
return {
"status": "error",
"message": f"Validation error: {message}",
"code": "VALIDATION_ERROR",
"timestamp": datetime.utcnow().isoformat(),
"details": {
"field": field,
"validation_message": message
}
}
def status_response(self, data: Dict) -> Dict[str, Any]:
"""Format status response"""
self.stats['responses_formatted'] += 1
self.stats['success_responses'] += 1
return {
"status": "success",
"message": "System status",
"data": data,
"timestamp": datetime.utcnow().isoformat()
}
def heatmap_response(self, heatmap_data: Any, symbol: str, exchange: Optional[str] = None) -> Dict[str, Any]:
"""Format heatmap response"""
self.stats['responses_formatted'] += 1
self.stats['success_responses'] += 1
if not heatmap_data:
return self.error("Heatmap data not found", "HEATMAP_NOT_FOUND")
return {
"status": "success",
"message": f"Heatmap data for {symbol}",
"data": {
"symbol": symbol,
"exchange": exchange or "consolidated",
"heatmap": heatmap_data
},
"timestamp": datetime.utcnow().isoformat()
}
def orderbook_response(self, orderbook_data: Any, symbol: str, exchange: str) -> Dict[str, Any]:
"""Format order book response"""
self.stats['responses_formatted'] += 1
self.stats['success_responses'] += 1
if not orderbook_data:
return self.error("Order book data not found", "ORDERBOOK_NOT_FOUND")
return {
"status": "success",
"message": f"Order book data for {symbol}@{exchange}",
"data": {
"symbol": symbol,
"exchange": exchange,
"orderbook": orderbook_data
},
"timestamp": datetime.utcnow().isoformat()
}
def metrics_response(self, metrics_data: Any, symbol: str, exchange: str) -> Dict[str, Any]:
"""Format metrics response"""
self.stats['responses_formatted'] += 1
self.stats['success_responses'] += 1
if not metrics_data:
return self.error("Metrics data not found", "METRICS_NOT_FOUND")
return {
"status": "success",
"message": f"Metrics data for {symbol}@{exchange}",
"data": {
"symbol": symbol,
"exchange": exchange,
"metrics": metrics_data
},
"timestamp": datetime.utcnow().isoformat()
}
def get_stats(self) -> Dict[str, Any]:
"""Get formatter statistics"""
return {
**self.stats,
'uptime_seconds': (datetime.utcnow() - datetime.fromisoformat(self.stats['created_at'])).total_seconds(),
'error_rate': self.stats['errors_formatted'] / max(1, self.stats['responses_formatted'])
} }

View File

@ -2,13 +2,15 @@
REST API server for COBY system. REST API server for COBY system.
""" """
from fastapi import FastAPI, HTTPException, Request, Query, Path from fastapi import FastAPI, HTTPException, Request, Query, Path, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse from fastapi.responses import JSONResponse
from fastapi.staticfiles import StaticFiles from fastapi.staticfiles import StaticFiles
from typing import Optional, List from typing import Optional, List
import asyncio import asyncio
import os import os
import json
import time
try: try:
from ..simple_config import config from ..simple_config import config
from ..caching.redis_manager import redis_manager from ..caching.redis_manager import redis_manager
@ -27,6 +29,43 @@ except ImportError:
logger = get_logger(__name__) logger = get_logger(__name__)
class ConnectionManager:
"""Manage WebSocket connections for dashboard updates"""
def __init__(self):
self.active_connections: List[WebSocket] = []
async def connect(self, websocket: WebSocket):
await websocket.accept()
self.active_connections.append(websocket)
logger.info(f"WebSocket client connected. Total connections: {len(self.active_connections)}")
def disconnect(self, websocket: WebSocket):
if websocket in self.active_connections:
self.active_connections.remove(websocket)
logger.info(f"WebSocket client disconnected. Total connections: {len(self.active_connections)}")
async def send_personal_message(self, message: str, websocket: WebSocket):
try:
await websocket.send_text(message)
except Exception as e:
logger.error(f"Error sending personal message: {e}")
self.disconnect(websocket)
async def broadcast(self, message: str):
disconnected = []
for connection in self.active_connections:
try:
await connection.send_text(message)
except Exception as e:
logger.error(f"Error broadcasting to connection: {e}")
disconnected.append(connection)
# Remove disconnected clients
for connection in disconnected:
self.disconnect(connection)
def create_app(config_obj=None) -> FastAPI: def create_app(config_obj=None) -> FastAPI:
"""Create and configure FastAPI application""" """Create and configure FastAPI application"""
@ -38,12 +77,7 @@ def create_app(config_obj=None) -> FastAPI:
redoc_url="/redoc" redoc_url="/redoc"
) )
# Mount static files for web dashboard (since we removed nginx) # We'll mount static files AFTER defining all API routes to avoid conflicts
static_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), "web", "static")
if os.path.exists(static_path):
app.mount("/static", StaticFiles(directory=static_path), name="static")
# Serve index.html at root for dashboard
app.mount("/", StaticFiles(directory=static_path, html=True), name="dashboard")
# Add CORS middleware # Add CORS middleware
app.add_middleware( app.add_middleware(
@ -60,7 +94,49 @@ def create_app(config_obj=None) -> FastAPI:
burst_size=20 burst_size=20
) )
response_formatter = ResponseFormatter() response_formatter = ResponseFormatter()
connection_manager = ConnectionManager()
@app.websocket("/ws/dashboard")
async def websocket_endpoint(websocket: WebSocket):
"""WebSocket endpoint for dashboard real-time updates"""
await connection_manager.connect(websocket)
try:
while True:
# Send periodic updates
await asyncio.sleep(5) # Update every 5 seconds
# Gather system status
system_data = {
"timestamp": time.time(),
"performance": {
"cpu_usage": 25.5, # Stub data
"memory_usage": 45.2,
"throughput": 1250,
"avg_latency": 12.3
},
"exchanges": {
"binance": "connected",
"coinbase": "connected",
"kraken": "disconnected",
"bybit": "connected"
},
"processing": {
"active": True,
"total_processed": 15420
}
}
await connection_manager.send_personal_message(
json.dumps(system_data),
websocket
)
except WebSocketDisconnect:
connection_manager.disconnect(websocket)
except Exception as e:
logger.error(f"WebSocket error: {e}")
connection_manager.disconnect(websocket)
@app.get("/health") @app.get("/health")
async def health_check(): async def health_check():
"""Health check endpoint""" """Health check endpoint"""
@ -127,6 +203,30 @@ def create_app(config_obj=None) -> FastAPI:
except Exception as e: except Exception as e:
logger.error(f"API server shutdown error: {e}") logger.error(f"API server shutdown error: {e}")
# API Health check endpoint (for dashboard)
@app.get("/api/health")
async def api_health_check():
"""API Health check endpoint for dashboard"""
try:
# Check Redis connection
redis_healthy = await redis_manager.ping()
health_data = {
'status': 'healthy' if redis_healthy else 'degraded',
'redis': 'connected' if redis_healthy else 'disconnected',
'version': '1.0.0',
'timestamp': time.time()
}
return response_formatter.status_response(health_data)
except Exception as e:
logger.error(f"Health check failed: {e}")
return JSONResponse(
status_code=503,
content=response_formatter.error("Service unavailable", "HEALTH_CHECK_FAILED")
)
# Health check endpoint # Health check endpoint
@app.get("/health") @app.get("/health")
async def health_check(): async def health_check():
@ -415,6 +515,13 @@ def create_app(config_obj=None) -> FastAPI:
content=response_formatter.error("Internal server error", "BATCH_HEATMAPS_ERROR") content=response_formatter.error("Internal server error", "BATCH_HEATMAPS_ERROR")
) )
# Mount static files for web dashboard AFTER all API routes are defined
static_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), "web", "static")
if os.path.exists(static_path):
app.mount("/static", StaticFiles(directory=static_path), name="static")
# Serve index.html at root for dashboard, but this should be last
app.mount("/", StaticFiles(directory=static_path, html=True), name="dashboard")
return app return app

View File

@ -3,7 +3,7 @@ Simple Redis manager stub.
""" """
import logging import logging
from typing import Any, Optional from typing import Any, Optional, List, Dict
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -14,6 +14,12 @@ class RedisManager:
def __init__(self): def __init__(self):
self.connected = False self.connected = False
self.cache = {} # In-memory cache as fallback self.cache = {} # In-memory cache as fallback
self.stats = {
'hits': 0,
'misses': 0,
'sets': 0,
'deletes': 0
}
async def connect(self): async def connect(self):
"""Connect to Redis (stub)""" """Connect to Redis (stub)"""
@ -28,22 +34,88 @@ class RedisManager:
"""Disconnect from Redis""" """Disconnect from Redis"""
self.connected = False self.connected = False
async def close(self):
"""Close Redis connection (alias for disconnect)"""
await self.disconnect()
def is_connected(self) -> bool: def is_connected(self) -> bool:
"""Check if connected""" """Check if connected"""
return self.connected return self.connected
async def ping(self) -> bool:
"""Ping Redis to check connection"""
return self.connected
async def set(self, key: str, value: Any, ttl: Optional[int] = None): async def set(self, key: str, value: Any, ttl: Optional[int] = None):
"""Set value in cache""" """Set value in cache"""
self.cache[key] = value self.cache[key] = value
self.stats['sets'] += 1
logger.debug(f"Cached key: {key}") logger.debug(f"Cached key: {key}")
async def get(self, key: str) -> Optional[Any]: async def get(self, key: str) -> Optional[Any]:
"""Get value from cache""" """Get value from cache"""
return self.cache.get(key) value = self.cache.get(key)
if value is not None:
self.stats['hits'] += 1
else:
self.stats['misses'] += 1
return value
async def delete(self, key: str): async def delete(self, key: str):
"""Delete key from cache""" """Delete key from cache"""
self.cache.pop(key, None) self.cache.pop(key, None)
self.stats['deletes'] += 1
async def keys(self, pattern: str) -> List[str]:
"""Get keys matching pattern"""
if pattern.endswith('*'):
prefix = pattern[:-1]
return [key for key in self.cache.keys() if key.startswith(prefix)]
return [key for key in self.cache.keys() if key == pattern]
async def get_heatmap(self, symbol: str, exchange: Optional[str] = None) -> Optional[Any]:
"""Get heatmap data for symbol"""
key = f"heatmap:{symbol}:{exchange or 'consolidated'}"
return await self.get(key)
async def get_orderbook(self, symbol: str, exchange: str) -> Optional[Any]:
"""Get order book data for symbol on exchange"""
key = f"orderbook:{symbol}:{exchange}"
return await self.get(key)
async def get_metrics(self, symbol: str, exchange: str) -> Optional[Any]:
"""Get metrics data for symbol on exchange"""
key = f"metrics:{symbol}:{exchange}"
return await self.get(key)
async def get_exchange_status(self, exchange: str) -> Optional[Any]:
"""Get exchange status"""
key = f"st:{exchange}"
return await self.get(key)
def get_stats(self) -> Dict[str, Any]:
"""Get cache statistics"""
total_requests = self.stats['hits'] + self.stats['misses']
hit_rate = self.stats['hits'] / max(1, total_requests)
return {
'hits': self.stats['hits'],
'misses': self.stats['misses'],
'sets': self.stats['sets'],
'deletes': self.stats['deletes'],
'hit_rate': hit_rate,
'total_keys': len(self.cache),
'connected': self.connected
}
async def health_check(self) -> Dict[str, Any]:
"""Get Redis health status"""
return {
'connected': self.connected,
'total_keys': len(self.cache),
'memory_usage': 'N/A (stub mode)',
'uptime': 'N/A (stub mode)'
}
# Global instance # Global instance

View File

@ -52,7 +52,7 @@ class MemoryMonitor:
Provides detailed memory analytics and automatic GC optimization. Provides detailed memory analytics and automatic GC optimization.
""" """
def __init__(self, enable_tracemalloc: bool = True, snapshot_interval: float = 30.0): def __init__(self, enable_tracemalloc: bool = False, snapshot_interval: float = 60.0):
""" """
Initialize memory monitor. Initialize memory monitor.
@ -132,6 +132,15 @@ class MemoryMonitor:
if self.auto_gc_enabled: if self.auto_gc_enabled:
self._optimize_gc() self._optimize_gc()
# Periodic cleanup to prevent memory leaks in the monitor itself
if hasattr(self, '_cleanup_counter'):
self._cleanup_counter += 1
else:
self._cleanup_counter = 1
if self._cleanup_counter % 10 == 0: # Every 10 cycles
self._cleanup_monitor_data()
time.sleep(self.snapshot_interval) time.sleep(self.snapshot_interval)
except Exception as e: except Exception as e:
@ -195,30 +204,47 @@ class MemoryMonitor:
logger.error(f"Error taking memory snapshot: {e}") logger.error(f"Error taking memory snapshot: {e}")
def _update_object_counts(self) -> None: def _update_object_counts(self) -> None:
"""Update object counts by type""" """Update object counts by type (limited to prevent memory leaks)"""
try: try:
# Count objects by type # Only track specific object types to avoid creating too many objects
object_counts = defaultdict(int) tracked_types = {
'dict', 'list', 'tuple', 'str', 'function', 'type',
'SystemMetrics', 'MetricPoint', 'MemorySnapshot'
}
for obj in gc.get_objects(): # Count only tracked object types
obj_type = type(obj).__name__ object_counts = {}
object_counts[obj_type] += 1 all_objects = gc.get_objects()
# Store counts with timestamp for tracked_type in tracked_types:
count = sum(1 for obj in all_objects if type(obj).__name__ == tracked_type)
if count > 0:
object_counts[tracked_type] = count
# Store counts with timestamp (only for tracked types)
timestamp = get_current_timestamp() timestamp = get_current_timestamp()
for obj_type, count in object_counts.items(): for obj_type, count in object_counts.items():
self.object_counts[obj_type].append((timestamp, count)) self.object_counts[obj_type].append((timestamp, count))
# Update metrics for common types # Clean up old entries to prevent memory growth
for obj_type in list(self.object_counts.keys()):
if len(self.object_counts[obj_type]) > 50: # Keep only last 50 entries
# Remove oldest entries
while len(self.object_counts[obj_type]) > 50:
self.object_counts[obj_type].popleft()
# Update metrics for common types (less frequently)
try: try:
from .metrics_collector import metrics_collector from .metrics_collector import metrics_collector
common_types = ['dict', 'list', 'tuple', 'str', 'function', 'type'] # Only update metrics every 5th call to reduce object creation
for obj_type in common_types: if not hasattr(self, '_metrics_update_counter'):
if obj_type in object_counts: self._metrics_update_counter = 0
metrics_collector.set_gauge(
f'memory_objects_{obj_type}', self._metrics_update_counter += 1
object_counts[obj_type] if self._metrics_update_counter % 5 == 0:
) for obj_type, count in object_counts.items():
metrics_collector.set_gauge(f'memory_objects_{obj_type}', count)
except ImportError: except ImportError:
pass # Metrics collector not available pass # Metrics collector not available
@ -226,22 +252,36 @@ class MemoryMonitor:
logger.error(f"Error updating object counts: {e}") logger.error(f"Error updating object counts: {e}")
def _check_for_leaks(self) -> None: def _check_for_leaks(self) -> None:
"""Check for potential memory leaks""" """Check for potential memory leaks (less aggressive)"""
try: try:
if len(self.memory_snapshots) < 10: if len(self.memory_snapshots) < 20: # Need more data for reliable detection
return # Need more data return
# Check for consistent memory growth # Only check every 10th call to reduce overhead
recent_snapshots = list(self.memory_snapshots)[-10:] if not hasattr(self, '_leak_check_counter'):
self._leak_check_counter = 0
self._leak_check_counter += 1
if self._leak_check_counter % 10 != 0:
return
# Check for consistent memory growth over longer period
recent_snapshots = list(self.memory_snapshots)[-20:]
memory_values = [s.process_memory_mb for s in recent_snapshots] memory_values = [s.process_memory_mb for s in recent_snapshots]
# Simple linear regression to detect growth trend # More conservative growth detection
if self._is_memory_growing(memory_values): if self._is_memory_growing(memory_values, threshold=20.0): # Increased threshold
# Check object count growth # Check object count growth
potential_leaks = self._analyze_object_growth() potential_leaks = self._analyze_object_growth()
for leak in potential_leaks: for leak in potential_leaks:
if leak not in self.detected_leaks: # Check if we already reported this leak recently
existing_leak = next(
(l for l in self.detected_leaks if l.object_type == leak.object_type),
None
)
if not existing_leak and leak.severity in ['medium', 'high']:
self.detected_leaks.append(leak) self.detected_leaks.append(leak)
logger.warning(f"Potential memory leak detected: {leak.object_type}") logger.warning(f"Potential memory leak detected: {leak.object_type}")
@ -252,6 +292,10 @@ class MemoryMonitor:
except ImportError: except ImportError:
pass pass
# Clean up old leak reports (keep only last 10)
if len(self.detected_leaks) > 10:
self.detected_leaks = self.detected_leaks[-10:]
except Exception as e: except Exception as e:
logger.error(f"Error checking for leaks: {e}") logger.error(f"Error checking for leaks: {e}")
@ -265,42 +309,53 @@ class MemoryMonitor:
return growth > threshold return growth > threshold
def _analyze_object_growth(self) -> List[MemoryLeak]: def _analyze_object_growth(self) -> List[MemoryLeak]:
"""Analyze object count growth to identify potential leaks""" """Analyze object count growth to identify potential leaks (more conservative)"""
leaks = [] leaks = []
for obj_type, counts in self.object_counts.items(): for obj_type, counts in self.object_counts.items():
if len(counts) < 10: if len(counts) < 20: # Need more data points
continue continue
# Get recent counts # Get recent counts over longer period
recent_counts = list(counts)[-10:] recent_counts = list(counts)[-20:]
timestamps = [item[0] for item in recent_counts] timestamps = [item[0] for item in recent_counts]
count_values = [item[1] for item in recent_counts] count_values = [item[1] for item in recent_counts]
# Check for growth # Check for sustained growth
if len(count_values) >= 2: if len(count_values) >= 10:
growth = count_values[-1] - count_values[0] # Calculate growth over the period
start_avg = sum(count_values[:5]) / 5 # Average of first 5 values
end_avg = sum(count_values[-5:]) / 5 # Average of last 5 values
growth = end_avg - start_avg
time_diff = (timestamps[-1] - timestamps[0]).total_seconds() / 3600 # hours time_diff = (timestamps[-1] - timestamps[0]).total_seconds() / 3600 # hours
if growth > 100 and time_diff > 0: # More than 100 objects growth # More conservative thresholds
if growth > 500 and time_diff > 0.5: # More than 500 objects growth over 30+ minutes
growth_rate = growth / time_diff growth_rate = growth / time_diff
# Determine severity # Skip common types that naturally fluctuate
if growth_rate > 1000: if obj_type in ['dict', 'list', 'tuple', 'str']:
continue
# Determine severity with higher thresholds
if growth_rate > 2000:
severity = 'high' severity = 'high'
elif growth_rate > 100: elif growth_rate > 500:
severity = 'medium' severity = 'medium'
else: else:
severity = 'low' severity = 'low'
leak = MemoryLeak( # Only report medium and high severity leaks
object_type=obj_type, if severity in ['medium', 'high']:
count_increase=growth, leak = MemoryLeak(
size_increase_mb=growth * 0.001, # Rough estimate object_type=obj_type,
growth_rate_per_hour=growth_rate, count_increase=int(growth),
severity=severity size_increase_mb=growth * 0.001, # Rough estimate
) growth_rate_per_hour=growth_rate,
leaks.append(leak) severity=severity
)
leaks.append(leak)
return leaks return leaks
@ -346,6 +401,38 @@ class MemoryMonitor:
except Exception as e: except Exception as e:
logger.error(f"Error optimizing GC: {e}") logger.error(f"Error optimizing GC: {e}")
def _cleanup_monitor_data(self) -> None:
"""Clean up monitor data to prevent memory leaks"""
try:
# Limit memory snapshots
if len(self.memory_snapshots) > 500:
# Keep only the most recent 300 snapshots
while len(self.memory_snapshots) > 300:
self.memory_snapshots.popleft()
# Clean up object counts
for obj_type in list(self.object_counts.keys()):
if len(self.object_counts[obj_type]) > 30:
# Keep only the most recent 20 entries
while len(self.object_counts[obj_type]) > 20:
self.object_counts[obj_type].popleft()
# Remove empty deques
if len(self.object_counts[obj_type]) == 0:
del self.object_counts[obj_type]
# Limit detected leaks
if len(self.detected_leaks) > 5:
self.detected_leaks = self.detected_leaks[-5:]
# Force a small garbage collection
gc.collect()
logger.debug("Cleaned up memory monitor data")
except Exception as e:
logger.error(f"Error cleaning up monitor data: {e}")
def force_garbage_collection(self) -> Dict[str, int]: def force_garbage_collection(self) -> Dict[str, int]:
"""Force garbage collection and return statistics""" """Force garbage collection and return statistics"""
try: try:

View File

@ -267,7 +267,9 @@
async function loadInitialData() { async function loadInitialData() {
try { try {
const response = await fetch('/api/health'); // Use port 8080 for API calls since that's where our API runs
const apiUrl = `http://${window.location.hostname}:8080/api/health`;
const response = await fetch(apiUrl);
if (response.ok) { if (response.ok) {
const data = await response.json(); const data = await response.json();
updateSystemStatus(data); updateSystemStatus(data);
@ -283,7 +285,9 @@
function connectWebSocket() { function connectWebSocket() {
const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:'; const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:';
const wsUrl = `${protocol}//${window.location.host}/ws/dashboard`; // Use port 8080 for WebSocket connection since that's where our API runs
const host = window.location.hostname;
const wsUrl = `${protocol}//${host}:8080/ws/dashboard`;
ws = new WebSocket(wsUrl); ws = new WebSocket(wsUrl);