Dobromir Popov
2025-10-01 10:49:35 +03:00
parent d49a473ed6
commit aee61e2e53
2 changed files with 9 additions and 7 deletions

View File

@@ -1,7 +1,7 @@
# Design Document

## Overview

The Multi-Exchange Data Aggregation System is a comprehensive data collection and processing subsystem designed to serve as the foundational data layer for the trading orchestrator. The system will collect real-time order book and OHLCV data from the top 10 cryptocurrency exchanges, aggregate it into standardized formats, store it in a TimescaleDB time-series database, and provide both live data feeds and historical replay capabilities.

The system follows a microservices architecture with containerized components, ensuring scalability, maintainability, and seamless integration with the existing trading infrastructure.
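To make the "standardized formats" mentioned above concrete, the sketch below shows one plausible shape for the normalized records an aggregation layer like this could write to TimescaleDB. It is only an illustration: the class and field names (`OrderBookSnapshot`, `OHLCVBar`) are assumptions and do not come from this repository.

```python
from dataclasses import dataclass, field
from typing import List, Tuple


@dataclass
class OrderBookSnapshot:
    """One normalized order book snapshot from a single exchange (hypothetical schema)."""
    exchange: str                                   # e.g. "binance"
    symbol: str                                     # e.g. "ETH/USDT"
    timestamp: float                                # epoch seconds, UTC
    bids: List[Tuple[float, float]] = field(default_factory=list)  # (price, size), best bid first
    asks: List[Tuple[float, float]] = field(default_factory=list)  # (price, size), best ask first


@dataclass
class OHLCVBar:
    """One normalized OHLCV candle, ready to insert into a time-series table (hypothetical schema)."""
    exchange: str
    symbol: str
    timestamp: float    # bar open time, epoch seconds
    open: float
    high: float
    low: float
    close: float
    volume: float
```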

View File

@@ -293,7 +293,9 @@ class CleanTradingDashboard:
        # WebSocket streaming
        self.ws_price_cache: dict = {}
        self.is_streaming = False
-        self.tick_cache: list = []
+        # Use bounded deque to avoid unbounded growth
+        from collections import deque as _deque
+        self.tick_cache: _deque = _deque(maxlen=3600)  # keep last ~1h of 1s ticks

        # COB data cache - enhanced with price buckets and memory system
        self.cob_cache: dict = {
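The switch from an unbounded list to `deque(maxlen=3600)` relies on the deque's eviction behaviour: once the cap is reached, each append silently drops the oldest entry, so a roughly one-tick-per-second stream stays near an hour of history with no manual trimming. A minimal standalone demonstration (the 3600 cap mirrors the commit; the loop is illustrative only):

```python
from collections import deque

tick_cache = deque(maxlen=3600)      # same cap as the commit: ~1h of 1s ticks

for tick_id in range(10_000):        # simulate a stream longer than the cap
    tick_cache.append(tick_id)       # oldest entries are evicted automatically

print(len(tick_cache))               # 3600
print(tick_cache[0])                 # 6400 -- oldest tick still retained
```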
@@ -301,14 +303,14 @@ class CleanTradingDashboard:
                'last_update': 0,
                'data': None,
                'updates_count': 0,
-                'update_times': [],
+                'update_times': _deque(maxlen=7200),  # cap ~2h of 1s timestamps
                'update_rate': 0.0
            },
            'BTC/USDT': {
                'last_update': 0,
                'data': None,
                'updates_count': 0,
-                'update_times': [],
+                'update_times': _deque(maxlen=7200),
                'update_rate': 0.0
            }
        }
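Capping `update_times` at 7200 entries bounds memory while presumably leaving enough recent timestamps to derive the adjacent `update_rate` field. The helper below is a hypothetical sketch, not code from the dashboard, showing how a rate could be computed from such a bounded deque:

```python
import time
from collections import deque


def update_rate_per_second(update_times: deque, window_s: float = 60.0) -> float:
    """Hypothetical helper: updates per second over the trailing window."""
    now = time.time()
    recent = [t for t in update_times if now - t <= window_s]
    return len(recent) / window_s


cache = {'update_times': deque(maxlen=7200), 'update_rate': 0.0}
cache['update_times'].append(time.time())                     # record one update
cache['update_rate'] = update_rate_per_second(cache['update_times'])
```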
@@ -5795,9 +5797,9 @@ class CleanTradingDashboard:
        # Add to recent decisions for display
        self.recent_decisions.append(signal)

-        # Keep more decisions for longer history - extend to 200 decisions
+        # Keep decisions bounded to prevent growth
        if len(self.recent_decisions) > 200:
-            self.recent_decisions = self.recent_decisions[-200:]
+            del self.recent_decisions[:-200]

        # Train ALL models on EVERY prediction result (not just executed ones)
        # This ensures models learn from all predictions, not just successful trades
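The replacement of slice reassignment with `del self.recent_decisions[:-200]` trims the existing list object in place rather than rebinding the name to a new list, so any other reference to that list keeps seeing the bounded version; it also avoids the repeated element shifting of a `pop(0)` loop. A small standalone comparison (illustrative variable names, not project code):

```python
# Illustrative comparison: capping a history list at 200 entries.
history = list(range(1000))
other_reference = history            # e.g. a reference held elsewhere in the app

# Slice reassignment builds a new list and rebinds only this name;
# the other reference still points at the old, unbounded list.
history = history[-200:]
assert len(other_reference) == 1000

# del with a slice trims the original list object in place,
# so every reference sees the bounded list, in a single operation.
history = other_reference
del history[:-200]
assert len(other_reference) == 200
```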
@@ -8866,7 +8868,7 @@ class CleanTradingDashboard:
        # Add to recent decisions
        self.recent_decisions.append(signal)
        if len(self.recent_decisions) > 200:
-            self.recent_decisions.pop(0)
+            del self.recent_decisions[:-200]

        logger.info(f"COB SIGNAL: {symbol} {signal['action']} signal generated - imbalance: {imbalance:.3f}, confidence: {signal['confidence']:.3f}")