cob ma data for models

This commit is contained in:
Dobromir Popov
2025-09-09 02:07:04 +03:00
parent 317c703ea0
commit 729e0bccb1
3 changed files with 132 additions and 11 deletions

View File

@@ -220,6 +220,7 @@ class TradingOrchestrator:
# Load historical data for models and RL training # Load historical data for models and RL training
self._load_historical_data_for_models() self._load_historical_data_for_models()
# SINGLE-USE FUNCTION - Called only once in codebase
def _initialize_ml_models(self): def _initialize_ml_models(self):
"""Initialize ML models for enhanced trading""" """Initialize ML models for enhanced trading"""
try: try:
@@ -592,6 +593,7 @@ class TradingOrchestrator:
logger.error(f"Error in extrema trainer prediction: {e}") logger.error(f"Error in extrema trainer prediction: {e}")
return None return None
# UNUSED FUNCTION - Not called anywhere in codebase
def get_memory_usage(self) -> float: def get_memory_usage(self) -> float:
return 30.0 # MB return 30.0 # MB
@@ -623,6 +625,7 @@ class TradingOrchestrator:
logger.error(f"Error in transformer prediction: {e}") logger.error(f"Error in transformer prediction: {e}")
return None return None
# UNUSED FUNCTION - Not called anywhere in codebase
def get_memory_usage(self) -> float: def get_memory_usage(self) -> float:
return 60.0 # MB estimate for transformer return 60.0 # MB estimate for transformer
@@ -649,6 +652,7 @@ class TradingOrchestrator:
logger.error(f"Error in decision model prediction: {e}") logger.error(f"Error in decision model prediction: {e}")
return None return None
# UNUSED FUNCTION - Not called anywhere in codebase
def get_memory_usage(self) -> float: def get_memory_usage(self) -> float:
return 40.0 # MB estimate for decision model return 40.0 # MB estimate for decision model
@@ -666,6 +670,7 @@ class TradingOrchestrator:
except Exception as e: except Exception as e:
logger.error(f"Error initializing ML models: {e}") logger.error(f"Error initializing ML models: {e}")
# UNUSED FUNCTION - Not called anywhere in codebase
def update_model_loss(self, model_name: str, current_loss: float, best_loss: float = None): def update_model_loss(self, model_name: str, current_loss: float, best_loss: float = None):
"""Update model loss and potentially best loss""" """Update model loss and potentially best loss"""
if model_name in self.model_states: if model_name in self.model_states:
@@ -676,6 +681,7 @@ class TradingOrchestrator:
self.model_states[model_name]['best_loss'] = current_loss self.model_states[model_name]['best_loss'] = current_loss
logger.debug(f"Updated {model_name} loss: current={current_loss:.4f}, best={self.model_states[model_name]['best_loss']:.4f}") logger.debug(f"Updated {model_name} loss: current={current_loss:.4f}, best={self.model_states[model_name]['best_loss']:.4f}")
# UNUSED FUNCTION - Not called anywhere in codebase
def checkpoint_saved(self, model_name: str, checkpoint_data: Dict[str, Any]): def checkpoint_saved(self, model_name: str, checkpoint_data: Dict[str, Any]):
"""Callback when a model checkpoint is saved""" """Callback when a model checkpoint is saved"""
if model_name in self.model_states: if model_name in self.model_states:
@@ -689,6 +695,7 @@ class TradingOrchestrator:
self.model_states[model_name]['best_loss'] = saved_loss self.model_states[model_name]['best_loss'] = saved_loss
logger.info(f"New best loss for {model_name}: {saved_loss:.4f}") logger.info(f"New best loss for {model_name}: {saved_loss:.4f}")
# UNUSED FUNCTION - Not called anywhere in codebase
def get_recent_predictions(self, limit: int = 10) -> List[Dict[str, Any]]: def get_recent_predictions(self, limit: int = 10) -> List[Dict[str, Any]]:
"""Get recent predictions from all models for data streaming""" """Get recent predictions from all models for data streaming"""
try: try:
@@ -728,6 +735,7 @@ class TradingOrchestrator:
logger.debug(f"Error getting recent predictions: {e}") logger.debug(f"Error getting recent predictions: {e}")
return [] return []
# UNUSED FUNCTION - Not called anywhere in codebase
def _save_orchestrator_state(self): def _save_orchestrator_state(self):
"""Save the current state of the orchestrator, including model states.""" """Save the current state of the orchestrator, including model states."""
state = { state = {
@@ -742,6 +750,7 @@ class TradingOrchestrator:
json.dump(state, f, indent=4) json.dump(state, f, indent=4)
logger.info(f"Orchestrator state saved to {save_path}") logger.info(f"Orchestrator state saved to {save_path}")
# UNUSED FUNCTION - Not called anywhere in codebase
def _load_orchestrator_state(self): def _load_orchestrator_state(self):
"""Load the orchestrator state from a saved file.""" """Load the orchestrator state from a saved file."""
save_path = os.path.join(self.config.paths.get('checkpoint_dir', './models/saved'), 'orchestrator_state.json') save_path = os.path.join(self.config.paths.get('checkpoint_dir', './models/saved'), 'orchestrator_state.json')
@@ -777,6 +786,7 @@ class TradingOrchestrator:
self.trade_loop_task = asyncio.create_task(self._trading_decision_loop()) self.trade_loop_task = asyncio.create_task(self._trading_decision_loop())
logger.info("Continuous trading loop initiated.") logger.info("Continuous trading loop initiated.")
# UNUSED FUNCTION - Not called anywhere in codebase
def _initialize_cob_integration(self): def _initialize_cob_integration(self):
"""Initialize COB integration for real-time market microstructure data""" """Initialize COB integration for real-time market microstructure data"""
if COB_INTEGRATION_AVAILABLE: if COB_INTEGRATION_AVAILABLE:
@@ -807,12 +817,14 @@ class TradingOrchestrator:
else: else:
logger.warning("COB Integration not initialized. Cannot start streaming.") logger.warning("COB Integration not initialized. Cannot start streaming.")
# UNUSED FUNCTION - Not called anywhere in codebase
def _start_cob_matrix_worker(self): def _start_cob_matrix_worker(self):
"""Start a background worker to continuously update COB matrices for models""" """Start a background worker to continuously update COB matrices for models"""
if not self.cob_integration: if not self.cob_integration:
logger.warning("COB Integration not available, cannot start COB matrix worker.") logger.warning("COB Integration not available, cannot start COB matrix worker.")
return return
# UNUSED FUNCTION - Not called anywhere in codebase
def matrix_worker(): def matrix_worker():
logger.info("COB Matrix Worker started.") logger.info("COB Matrix Worker started.")
while self.realtime_processing: while self.realtime_processing:
@@ -851,6 +863,7 @@ class TradingOrchestrator:
matrix_thread = threading.Thread(target=matrix_worker, daemon=True) matrix_thread = threading.Thread(target=matrix_worker, daemon=True)
matrix_thread.start() matrix_thread.start()
# UNUSED FUNCTION - Not called anywhere in codebase
def _update_cob_matrix_for_symbol(self, symbol: str): def _update_cob_matrix_for_symbol(self, symbol: str):
"""Updates the COB matrix and features for a specific symbol.""" """Updates the COB matrix and features for a specific symbol."""
if not self.cob_integration: if not self.cob_integration:
@@ -967,6 +980,7 @@ class TradingOrchestrator:
logger.error(f"Error generating COB DQN features for {symbol}: {e}") logger.error(f"Error generating COB DQN features for {symbol}: {e}")
return None return None
# UNUSED FUNCTION - Not called anywhere in codebase
def _on_cob_cnn_features(self, symbol: str, cob_data: Dict): def _on_cob_cnn_features(self, symbol: str, cob_data: Dict):
"""Callback for when new COB CNN features are available""" """Callback for when new COB CNN features are available"""
if not self.realtime_processing: if not self.realtime_processing:
@@ -984,6 +998,7 @@ class TradingOrchestrator:
except Exception as e: except Exception as e:
logger.error(f"Error in _on_cob_cnn_features for {symbol}: {e}") logger.error(f"Error in _on_cob_cnn_features for {symbol}: {e}")
# UNUSED FUNCTION - Not called anywhere in codebase
def _on_cob_dqn_features(self, symbol: str, cob_data: Dict): def _on_cob_dqn_features(self, symbol: str, cob_data: Dict):
"""Callback for when new COB DQN features are available""" """Callback for when new COB DQN features are available"""
if not self.realtime_processing: if not self.realtime_processing:
@@ -1001,6 +1016,7 @@ class TradingOrchestrator:
except Exception as e: except Exception as e:
logger.error(f"Error in _on_cob_dqn_features for {symbol}: {e}") logger.error(f"Error in _on_cob_dqn_features for {symbol}: {e}")
# UNUSED FUNCTION - Not called anywhere in codebase
def _on_cob_dashboard_data(self, symbol: str, cob_data: Dict): def _on_cob_dashboard_data(self, symbol: str, cob_data: Dict):
"""Callback for when new COB data is available for the dashboard""" """Callback for when new COB data is available for the dashboard"""
if not self.realtime_processing: if not self.realtime_processing:
@@ -1013,20 +1029,24 @@ class TradingOrchestrator:
except Exception as e: except Exception as e:
logger.error(f"Error in _on_cob_dashboard_data for {symbol}: {e}") logger.error(f"Error in _on_cob_dashboard_data for {symbol}: {e}")
# UNUSED FUNCTION - Not called anywhere in codebase
def get_cob_features(self, symbol: str) -> Optional[np.ndarray]: def get_cob_features(self, symbol: str) -> Optional[np.ndarray]:
"""Get the latest COB features for CNN model""" """Get the latest COB features for CNN model"""
return self.latest_cob_features.get(symbol) return self.latest_cob_features.get(symbol)
# UNUSED FUNCTION - Not called anywhere in codebase
def get_cob_state(self, symbol: str) -> Optional[np.ndarray]: def get_cob_state(self, symbol: str) -> Optional[np.ndarray]:
"""Get the latest COB state for DQN model""" """Get the latest COB state for DQN model"""
return self.latest_cob_state.get(symbol) return self.latest_cob_state.get(symbol)
# SINGLE-USE FUNCTION - Called only once in codebase
def get_cob_snapshot(self, symbol: str) -> Optional[COBSnapshot]: def get_cob_snapshot(self, symbol: str) -> Optional[COBSnapshot]:
"""Get the latest raw COB snapshot for a symbol""" """Get the latest raw COB snapshot for a symbol"""
if self.cob_integration: if self.cob_integration:
return self.cob_integration.get_latest_cob_snapshot(symbol) return self.cob_integration.get_latest_cob_snapshot(symbol)
return None return None
# SINGLE-USE FUNCTION - Called only once in codebase
def get_cob_feature_matrix(self, symbol: str, sequence_length: int = 60) -> Optional[np.ndarray]: def get_cob_feature_matrix(self, symbol: str, sequence_length: int = 60) -> Optional[np.ndarray]:
"""Get a sequence of COB CNN features for sequence models""" """Get a sequence of COB CNN features for sequence models"""
if symbol not in self.cob_feature_history or not self.cob_feature_history[symbol]: if symbol not in self.cob_feature_history or not self.cob_feature_history[symbol]:
@@ -1059,6 +1079,7 @@ class TradingOrchestrator:
# Weight normalization removed - handled by ModelManager # Weight normalization removed - handled by ModelManager
# UNUSED FUNCTION - Not called anywhere in codebase
def add_decision_callback(self, callback): def add_decision_callback(self, callback):
"""Add a callback function to be called when decisions are made""" """Add a callback function to be called when decisions are made"""
self.decision_callbacks.append(callback) self.decision_callbacks.append(callback)
@@ -1322,6 +1343,7 @@ class TradingOrchestrator:
logger.debug(f"Error building RL state for {symbol}: {e}") logger.debug(f"Error building RL state for {symbol}: {e}")
return None return None
# SINGLE-USE FUNCTION - Called only once in codebase
def _get_cob_state(self, symbol: str) -> Optional[np.ndarray]: def _get_cob_state(self, symbol: str) -> Optional[np.ndarray]:
"""Build COB state vector for COB RL agent""" """Build COB state vector for COB RL agent"""
try: try:
@@ -1478,6 +1500,7 @@ class TradingOrchestrator:
logger.error(f"Error creating RL state for {symbol}: {e}") logger.error(f"Error creating RL state for {symbol}: {e}")
return None return None
# SINGLE-USE FUNCTION - Called only once in codebase
def _combine_predictions(self, symbol: str, price: float, def _combine_predictions(self, symbol: str, price: float,
predictions: List[Prediction], predictions: List[Prediction],
timestamp: datetime) -> TradingDecision: timestamp: datetime) -> TradingDecision:
@@ -1593,6 +1616,7 @@ class TradingOrchestrator:
current_position_pnl=0.0 current_position_pnl=0.0
) )
# SINGLE-USE FUNCTION - Called only once in codebase
def _get_timeframe_weight(self, timeframe: str) -> float: def _get_timeframe_weight(self, timeframe: str) -> float:
"""Get importance weight for a timeframe""" """Get importance weight for a timeframe"""
# Higher timeframes get more weight in decision making # Higher timeframes get more weight in decision making
@@ -1605,12 +1629,14 @@ class TradingOrchestrator:
# Model performance and weight adaptation removed - handled by ModelManager # Model performance and weight adaptation removed - handled by ModelManager
# Use self.model_manager for all model performance tracking # Use self.model_manager for all model performance tracking
# UNUSED FUNCTION - Not called anywhere in codebase
def get_recent_decisions(self, symbol: str, limit: int = 10) -> List[TradingDecision]: def get_recent_decisions(self, symbol: str, limit: int = 10) -> List[TradingDecision]:
"""Get recent decisions for a symbol""" """Get recent decisions for a symbol"""
if symbol in self.recent_decisions: if symbol in self.recent_decisions:
return self.recent_decisions[symbol][-limit:] return self.recent_decisions[symbol][-limit:]
return [] return []
# UNUSED FUNCTION - Not called anywhere in codebase
def get_performance_metrics(self) -> Dict[str, Any]: def get_performance_metrics(self) -> Dict[str, Any]:
"""Get performance metrics for the orchestrator""" """Get performance metrics for the orchestrator"""
return { return {
@@ -1625,6 +1651,7 @@ class TradingOrchestrator:
} }
} }
# UNUSED FUNCTION - Not called anywhere in codebase
def get_model_states(self) -> Dict[str, Dict]: def get_model_states(self) -> Dict[str, Dict]:
"""Get current model states with REAL checkpoint data - SSOT for dashboard""" """Get current model states with REAL checkpoint data - SSOT for dashboard"""
try: try:
@@ -1749,6 +1776,7 @@ class TradingOrchestrator:
'extrema_trainer': {'initial_loss': None, 'current_loss': None, 'best_loss': None, 'checkpoint_loaded': False} 'extrema_trainer': {'initial_loss': None, 'current_loss': None, 'best_loss': None, 'checkpoint_loaded': False}
} }
# SINGLE-USE FUNCTION - Called only once in codebase
def _initialize_decision_fusion(self): def _initialize_decision_fusion(self):
"""Initialize the decision fusion neural network for learning model effectiveness""" """Initialize the decision fusion neural network for learning model effectiveness"""
try: try:
@@ -1767,6 +1795,7 @@ class TradingOrchestrator:
self.fc3 = nn.Linear(hidden_size, 3) # BUY, SELL, HOLD self.fc3 = nn.Linear(hidden_size, 3) # BUY, SELL, HOLD
self.dropout = nn.Dropout(0.2) self.dropout = nn.Dropout(0.2)
# UNUSED FUNCTION - Not called anywhere in codebase
def forward(self, x): def forward(self, x):
x = torch.relu(self.fc1(x)) x = torch.relu(self.fc1(x))
x = self.dropout(x) x = self.dropout(x)
@@ -1781,6 +1810,7 @@ class TradingOrchestrator:
logger.warning(f"Decision fusion initialization failed: {e}") logger.warning(f"Decision fusion initialization failed: {e}")
self.decision_fusion_enabled = False self.decision_fusion_enabled = False
# SINGLE-USE FUNCTION - Called only once in codebase
def _initialize_enhanced_training_system(self): def _initialize_enhanced_training_system(self):
"""Initialize the enhanced real-time training system""" """Initialize the enhanced real-time training system"""
try: try:
@@ -1825,6 +1855,7 @@ class TradingOrchestrator:
self.training_enabled = False self.training_enabled = False
self.enhanced_training_system = None self.enhanced_training_system = None
# SINGLE-USE FUNCTION - Called only once in codebase
def start_enhanced_training(self): def start_enhanced_training(self):
"""Start the enhanced real-time training system""" """Start the enhanced real-time training system"""
try: try:
@@ -1845,6 +1876,7 @@ class TradingOrchestrator:
logger.error(f"Error starting enhanced training: {e}") logger.error(f"Error starting enhanced training: {e}")
return False return False
# UNUSED FUNCTION - Not called anywhere in codebase
def stop_enhanced_training(self): def stop_enhanced_training(self):
"""Stop the enhanced real-time training system""" """Stop the enhanced real-time training system"""
try: try:
@@ -1858,6 +1890,7 @@ class TradingOrchestrator:
logger.error(f"Error stopping enhanced training: {e}") logger.error(f"Error stopping enhanced training: {e}")
return False return False
# UNUSED FUNCTION - Not called anywhere in codebase
def get_enhanced_training_stats(self) -> Dict[str, Any]: def get_enhanced_training_stats(self) -> Dict[str, Any]:
"""Get enhanced training system statistics with orchestrator integration""" """Get enhanced training system statistics with orchestrator integration"""
try: try:
@@ -1954,6 +1987,7 @@ class TradingOrchestrator:
'error': str(e) 'error': str(e)
} }
# UNUSED FUNCTION - Not called anywhere in codebase
def set_training_dashboard(self, dashboard): def set_training_dashboard(self, dashboard):
"""Set the dashboard reference for the training system""" """Set the dashboard reference for the training system"""
try: try:
@@ -1972,6 +2006,7 @@ class TradingOrchestrator:
logger.error(f"Error getting universal data stream: {e}") logger.error(f"Error getting universal data stream: {e}")
return None return None
# UNUSED FUNCTION - Not called anywhere in codebase
def get_universal_data_for_model(self, model_type: str = 'cnn') -> Optional[Dict[str, Any]]: def get_universal_data_for_model(self, model_type: str = 'cnn') -> Optional[Dict[str, Any]]:
"""Get formatted universal data for specific model types""" """Get formatted universal data for specific model types"""
try: try:
@@ -2014,6 +2049,7 @@ class TradingOrchestrator:
except Exception: except Exception:
return False return False
# SINGLE-USE FUNCTION - Called only once in codebase
def _calculate_aggressiveness_thresholds(self, current_pnl: float, symbol: str) -> tuple: def _calculate_aggressiveness_thresholds(self, current_pnl: float, symbol: str) -> tuple:
"""Calculate confidence thresholds based on aggressiveness settings""" """Calculate confidence thresholds based on aggressiveness settings"""
# Base thresholds # Base thresholds
@@ -2036,6 +2072,7 @@ class TradingOrchestrator:
return entry_threshold, exit_threshold return entry_threshold, exit_threshold
# SINGLE-USE FUNCTION - Called only once in codebase
def _apply_pnl_feedback(self, action: str, confidence: float, current_pnl: float, def _apply_pnl_feedback(self, action: str, confidence: float, current_pnl: float,
symbol: str, reasoning: dict) -> tuple: symbol: str, reasoning: dict) -> tuple:
"""Apply P&L-based feedback to decision making""" """Apply P&L-based feedback to decision making"""
@@ -2069,6 +2106,7 @@ class TradingOrchestrator:
logger.debug(f"Error applying P&L feedback: {e}") logger.debug(f"Error applying P&L feedback: {e}")
return action, confidence return action, confidence
# SINGLE-USE FUNCTION - Called only once in codebase
def _calculate_dynamic_entry_aggressiveness(self, symbol: str) -> float: def _calculate_dynamic_entry_aggressiveness(self, symbol: str) -> float:
"""Calculate dynamic entry aggressiveness based on recent performance""" """Calculate dynamic entry aggressiveness based on recent performance"""
try: try:
@@ -2097,6 +2135,7 @@ class TradingOrchestrator:
logger.debug(f"Error calculating dynamic entry aggressiveness: {e}") logger.debug(f"Error calculating dynamic entry aggressiveness: {e}")
return 0.5 return 0.5
# SINGLE-USE FUNCTION - Called only once in codebase
def _calculate_dynamic_exit_aggressiveness(self, symbol: str, current_pnl: float) -> float: def _calculate_dynamic_exit_aggressiveness(self, symbol: str, current_pnl: float) -> float:
"""Calculate dynamic exit aggressiveness based on P&L and market conditions""" """Calculate dynamic exit aggressiveness based on P&L and market conditions"""
try: try:
@@ -2119,11 +2158,13 @@ class TradingOrchestrator:
logger.debug(f"Error calculating dynamic exit aggressiveness: {e}") logger.debug(f"Error calculating dynamic exit aggressiveness: {e}")
return 0.5 return 0.5
# UNUSED FUNCTION - Not called anywhere in codebase
def set_trading_executor(self, trading_executor): def set_trading_executor(self, trading_executor):
"""Set the trading executor for position tracking""" """Set the trading executor for position tracking"""
self.trading_executor = trading_executor self.trading_executor = trading_executor
logger.info("Trading executor set for position tracking and P&L feedback") logger.info("Trading executor set for position tracking and P&L feedback")
# SINGLE-USE FUNCTION - Called only once in codebase
def _get_current_price(self, symbol: str) -> float: def _get_current_price(self, symbol: str) -> float:
"""Get current price for symbol""" """Get current price for symbol"""
try: try:
@@ -2169,6 +2210,7 @@ class TradingOrchestrator:
else: else:
return 1000.0 return 1000.0
# SINGLE-USE FUNCTION - Called only once in codebase
def _generate_fallback_prediction(self, symbol: str) -> Dict[str, Any]: def _generate_fallback_prediction(self, symbol: str) -> Dict[str, Any]:
"""Generate fallback prediction when models fail""" """Generate fallback prediction when models fail"""
try: try:
@@ -2189,6 +2231,7 @@ class TradingOrchestrator:
'model': 'fallback' 'model': 'fallback'
} }
# UNUSED FUNCTION - Not called anywhere in codebase
def capture_dqn_prediction(self, symbol: str, action_idx: int, confidence: float, price: float, q_values: List[float] = None): def capture_dqn_prediction(self, symbol: str, action_idx: int, confidence: float, price: float, q_values: List[float] = None):
"""Capture DQN prediction for dashboard visualization""" """Capture DQN prediction for dashboard visualization"""
try: try:
@@ -2205,6 +2248,7 @@ class TradingOrchestrator:
except Exception as e: except Exception as e:
logger.debug(f"Error capturing DQN prediction: {e}") logger.debug(f"Error capturing DQN prediction: {e}")
# UNUSED FUNCTION - Not called anywhere in codebase
def capture_cnn_prediction(self, symbol: str, direction: int, confidence: float, current_price: float, predicted_price: float): def capture_cnn_prediction(self, symbol: str, direction: int, confidence: float, current_price: float, predicted_price: float):
"""Capture CNN prediction for dashboard visualization""" """Capture CNN prediction for dashboard visualization"""
try: try:
@@ -2270,6 +2314,7 @@ class TradingOrchestrator:
logger.warning(f"Data stream monitor initialization failed: {e}") logger.warning(f"Data stream monitor initialization failed: {e}")
self.data_stream_monitor = None self.data_stream_monitor = None
# UNUSED FUNCTION - Not called anywhere in codebase
def start_data_stream(self) -> bool: def start_data_stream(self) -> bool:
"""Start data streaming if not already active.""" """Start data streaming if not already active."""
try: try:
@@ -2282,6 +2327,7 @@ class TradingOrchestrator:
logger.error(f"Failed to start data stream: {e}") logger.error(f"Failed to start data stream: {e}")
return False return False
# UNUSED FUNCTION - Not called anywhere in codebase
def stop_data_stream(self) -> bool: def stop_data_stream(self) -> bool:
"""Stop data streaming if active.""" """Stop data streaming if active."""
try: try:
@@ -2292,6 +2338,7 @@ class TradingOrchestrator:
logger.error(f"Failed to stop data stream: {e}") logger.error(f"Failed to stop data stream: {e}")
return False return False
# SINGLE-USE FUNCTION - Called only once in codebase
def get_data_stream_status(self) -> Dict[str, any]: def get_data_stream_status(self) -> Dict[str, any]:
"""Return current data stream status and buffer sizes.""" """Return current data stream status and buffer sizes."""
status = { status = {
@@ -2310,6 +2357,7 @@ class TradingOrchestrator:
pass pass
return status return status
# UNUSED FUNCTION - Not called anywhere in codebase
def save_data_snapshot(self, filepath: str = None) -> str: def save_data_snapshot(self, filepath: str = None) -> str:
"""Save a snapshot of current data stream buffers to a file. """Save a snapshot of current data stream buffers to a file.
@@ -2337,6 +2385,7 @@ class TradingOrchestrator:
logger.error(f"Failed to save data snapshot: {e}") logger.error(f"Failed to save data snapshot: {e}")
raise raise
# UNUSED FUNCTION - Not called anywhere in codebase
def get_stream_summary(self) -> Dict[str, any]: def get_stream_summary(self) -> Dict[str, any]:
"""Get a summary of current data stream activity.""" """Get a summary of current data stream activity."""
status = self.get_data_stream_status() status = self.get_data_stream_status()
@@ -2360,6 +2409,7 @@ class TradingOrchestrator:
return summary return summary
# UNUSED FUNCTION - Not called anywhere in codebase
def get_cob_data(self, symbol: str, limit: int = 300) -> List: def get_cob_data(self, symbol: str, limit: int = 300) -> List:
"""Get COB data for a symbol with specified limit.""" """Get COB data for a symbol with specified limit."""
try: try:
@@ -2370,6 +2420,7 @@ class TradingOrchestrator:
logger.error(f"Error getting COB data: {e}") logger.error(f"Error getting COB data: {e}")
return [] return []
# SINGLE-USE FUNCTION - Called only once in codebase
def _load_historical_data_for_models(self): def _load_historical_data_for_models(self):
"""Load 300 historical candles for all required timeframes and symbols for model training""" """Load 300 historical candles for all required timeframes and symbols for model training"""
logger.info("Loading 300 historical candles for model training and RL context...") logger.info("Loading 300 historical candles for model training and RL context...")
@@ -2425,6 +2476,7 @@ class TradingOrchestrator:
except Exception as e: except Exception as e:
logger.error(f"Error in historical data loading: {e}") logger.error(f"Error in historical data loading: {e}")
# SINGLE-USE FUNCTION - Called only once in codebase
def _initialize_models_with_historical_data(self, symbols_timeframes: List[Tuple[str, str]]): def _initialize_models_with_historical_data(self, symbols_timeframes: List[Tuple[str, str]]):
"""Initialize all NN models with historical data using data provider's normalized methods""" """Initialize all NN models with historical data using data provider's normalized methods"""
try: try:
@@ -2458,6 +2510,7 @@ class TradingOrchestrator:
except Exception as e: except Exception as e:
logger.error(f"Error initializing models with historical data: {e}") logger.error(f"Error initializing models with historical data: {e}")
# SINGLE-USE FUNCTION - Called only once in codebase
def _initialize_cnn_with_provider_data(self): def _initialize_cnn_with_provider_data(self):
"""Initialize CNN using data provider's normalized feature extraction""" """Initialize CNN using data provider's normalized feature extraction"""
try: try:
@@ -2488,6 +2541,7 @@ class TradingOrchestrator:
except Exception as e: except Exception as e:
logger.error(f"Error initializing CNN with provider data: {e}") logger.error(f"Error initializing CNN with provider data: {e}")
# SINGLE-USE FUNCTION - Called only once in codebase
def _initialize_dqn_with_provider_data(self, symbols_timeframes: List[Tuple[str, str]]): def _initialize_dqn_with_provider_data(self, symbols_timeframes: List[Tuple[str, str]]):
"""Initialize DQN using data provider's normalized state vector creation""" """Initialize DQN using data provider's normalized state vector creation"""
try: try:
@@ -2505,6 +2559,7 @@ class TradingOrchestrator:
except Exception as e: except Exception as e:
logger.error(f"Error initializing DQN with provider data: {e}") logger.error(f"Error initializing DQN with provider data: {e}")
# SINGLE-USE FUNCTION - Called only once in codebase
def _initialize_transformer_with_provider_data(self, symbols_timeframes: List[Tuple[str, str]]): def _initialize_transformer_with_provider_data(self, symbols_timeframes: List[Tuple[str, str]]):
"""Initialize Transformer using data provider's normalized sequence creation""" """Initialize Transformer using data provider's normalized sequence creation"""
try: try:
@@ -2522,6 +2577,7 @@ class TradingOrchestrator:
except Exception as e: except Exception as e:
logger.error(f"Error initializing Transformer with provider data: {e}") logger.error(f"Error initializing Transformer with provider data: {e}")
# SINGLE-USE FUNCTION - Called only once in codebase
def _initialize_decision_with_provider_data(self, symbol_features: Dict[str, Dict[str, pd.DataFrame]]): def _initialize_decision_with_provider_data(self, symbol_features: Dict[str, Dict[str, pd.DataFrame]]):
"""Initialize Decision Fusion using data provider's feature aggregation""" """Initialize Decision Fusion using data provider's feature aggregation"""
try: try:
@@ -2551,6 +2607,7 @@ class TradingOrchestrator:
except Exception as e: except Exception as e:
logger.error(f"Error initializing Decision Fusion with provider data: {e}") logger.error(f"Error initializing Decision Fusion with provider data: {e}")
# UNUSED FUNCTION - Not called anywhere in codebase
def get_ohlcv_data(self, symbol: str, timeframe: str, limit: int = 300) -> List: def get_ohlcv_data(self, symbol: str, timeframe: str, limit: int = 300) -> List:
"""Get OHLCV data for a symbol with specified timeframe and limit.""" """Get OHLCV data for a symbol with specified timeframe and limit."""
try: try:

View File

@@ -166,8 +166,14 @@ class CleanTradingDashboard:
self.cob_update_count = 0 self.cob_update_count = 0
self.last_cob_broadcast: dict = {} # Rate limiting for UI updates self.last_cob_broadcast: dict = {} # Rate limiting for UI updates
self.cob_data_history: Dict[str, deque] = { self.cob_data_history: Dict[str, deque] = {
'ETH/USDT': deque(maxlen=61), # Store ~60 seconds of 1s snapshots 'ETH/USDT': deque(maxlen=120), # Store ~120 seconds of 1s snapshots for MA calculations
'BTC/USDT': deque(maxlen=61) 'BTC/USDT': deque(maxlen=120)
}
# COB imbalance moving averages for different timeframes
self.cob_imbalance_ma: Dict[str, Dict[str, float]] = {
'ETH/USDT': {},
'BTC/USDT': {}
} }
# Initialize timezone # Initialize timezone
@@ -314,9 +320,14 @@ class CleanTradingDashboard:
# Get COB data from orchestrator # Get COB data from orchestrator
cob_data = self._get_cob_data_with_buckets(symbol, limit) cob_data = self._get_cob_data_with_buckets(symbol, limit)
# Add COB imbalance moving averages
cob_imbalance_mas = self.cob_imbalance_ma.get(symbol, {})
return jsonify({ return jsonify({
'symbol': symbol, 'symbol': symbol,
'data': cob_data, 'data': cob_data,
'cob_imbalance_ma': cob_imbalance_mas,
'timestamp': datetime.now().isoformat() 'timestamp': datetime.now().isoformat()
}) })
except Exception as e: except Exception as e:
@@ -1183,8 +1194,12 @@ class CleanTradingDashboard:
# Determine COB data source mode # Determine COB data source mode
cob_mode = self._get_cob_mode() cob_mode = self._get_cob_mode()
eth_components = self.component_manager.format_cob_data(eth_snapshot, 'ETH/USDT', eth_imbalance_stats, cob_mode) # Get COB imbalance moving averages
btc_components = self.component_manager.format_cob_data(btc_snapshot, 'BTC/USDT', btc_imbalance_stats, cob_mode) eth_ma_data = self.cob_imbalance_ma.get('ETH/USDT', {})
btc_ma_data = self.cob_imbalance_ma.get('BTC/USDT', {})
eth_components = self.component_manager.format_cob_data(eth_snapshot, 'ETH/USDT', eth_imbalance_stats, cob_mode, eth_ma_data)
btc_components = self.component_manager.format_cob_data(btc_snapshot, 'BTC/USDT', btc_imbalance_stats, cob_mode, btc_ma_data)
return eth_components, btc_components return eth_components, btc_components
@@ -5170,10 +5185,11 @@ class CleanTradingDashboard:
} }
} }
# Store in history (keep last 15 seconds) # Store in history (keep last 120 seconds for MA calculations)
self.cob_data_history[symbol].append(cob_snapshot) self.cob_data_history[symbol].append(cob_snapshot)
if len(self.cob_data_history[symbol]) > 15: # Keep 15 seconds
self.cob_data_history[symbol] = self.cob_data_history[symbol][-15:] # Calculate COB imbalance moving averages for different timeframes
self._calculate_cob_imbalance_mas(symbol)
# Update latest data # Update latest data
self.latest_cob_data[symbol] = cob_snapshot self.latest_cob_data[symbol] = cob_snapshot
@@ -5189,7 +5205,41 @@ class CleanTradingDashboard:
except Exception as e: except Exception as e:
logger.debug(f"Error collecting COB data for {symbol}: {e}") logger.debug(f"Error collecting COB data for {symbol}: {e}")
def _calculate_cob_imbalance_mas(self, symbol: str):
"""Calculate COB imbalance moving averages for different timeframes"""
try:
history = self.cob_data_history[symbol]
if len(history) < 2:
return
# Extract imbalance values from history
imbalances = [snapshot['stats']['imbalance'] for snapshot in history if 'stats' in snapshot and 'imbalance' in snapshot['stats']]
if not imbalances:
return
# Calculate moving averages for different timeframes
timeframes = {
'10s': min(10, len(imbalances)), # 10 second MA
'30s': min(30, len(imbalances)), # 30 second MA
'60s': min(60, len(imbalances)), # 60 second MA
}
for timeframe, periods in timeframes.items():
if len(imbalances) >= periods:
# Calculate simple moving average
ma_value = sum(imbalances[-periods:]) / periods
self.cob_imbalance_ma[symbol][timeframe] = ma_value
else:
# If not enough data, use current imbalance
self.cob_imbalance_ma[symbol][timeframe] = imbalances[-1]
logger.debug(f"COB imbalance MAs for {symbol}: {self.cob_imbalance_ma[symbol]}")
except Exception as e:
logger.debug(f"Error calculating COB imbalance MAs for {symbol}: {e}")
def _generate_bucketed_cob_data(self, symbol: str, cob_snapshot: dict): def _generate_bucketed_cob_data(self, symbol: str, cob_snapshot: dict):
"""Generate bucketed COB data for model feeding""" """Generate bucketed COB data for model feeding"""
try: try:

View File

@@ -272,7 +272,7 @@ class DashboardComponentManager:
logger.error(f"Error formatting system status: {e}") logger.error(f"Error formatting system status: {e}")
return [html.P(f"Error: {str(e)}", className="text-danger small")] return [html.P(f"Error: {str(e)}", className="text-danger small")]
def format_cob_data(self, cob_snapshot, symbol, cumulative_imbalance_stats=None, cob_mode="Unknown"): def format_cob_data(self, cob_snapshot, symbol, cumulative_imbalance_stats=None, cob_mode="Unknown", imbalance_ma_data=None):
"""Format COB data into a split view with summary, imbalance stats, and a compact ladder.""" """Format COB data into a split view with summary, imbalance stats, and a compact ladder."""
try: try:
if not cob_snapshot: if not cob_snapshot:
@@ -317,7 +317,7 @@ class DashboardComponentManager:
} }
# --- Left Panel: Overview and Stats --- # --- Left Panel: Overview and Stats ---
overview_panel = self._create_cob_overview_panel(symbol, stats, cumulative_imbalance_stats, cob_mode) overview_panel = self._create_cob_overview_panel(symbol, stats, cumulative_imbalance_stats, cob_mode, imbalance_ma_data)
# --- Right Panel: Compact Ladder --- # --- Right Panel: Compact Ladder ---
ladder_panel = self._create_cob_ladder_panel(bids, asks, mid_price, symbol) ladder_panel = self._create_cob_ladder_panel(bids, asks, mid_price, symbol)
@@ -331,7 +331,7 @@ class DashboardComponentManager:
logger.error(f"Error formatting split COB data: {e}") logger.error(f"Error formatting split COB data: {e}")
return html.P(f"Error: {str(e)}", className="text-danger small") return html.P(f"Error: {str(e)}", className="text-danger small")
def _create_cob_overview_panel(self, symbol, stats, cumulative_imbalance_stats, cob_mode="Unknown"): def _create_cob_overview_panel(self, symbol, stats, cumulative_imbalance_stats, cob_mode="Unknown", imbalance_ma_data=None):
"""Creates the left panel with summary and imbalance stats.""" """Creates the left panel with summary and imbalance stats."""
mid_price = stats.get('mid_price', 0) mid_price = stats.get('mid_price', 0)
spread_bps = stats.get('spread_bps', 0) spread_bps = stats.get('spread_bps', 0)
@@ -373,6 +373,20 @@ class DashboardComponentManager:
html.Div(imbalance_stats_display), html.Div(imbalance_stats_display),
# COB Imbalance Moving Averages
ma_display = []
if imbalance_ma_data:
ma_display.append(html.H6("Imbalance MAs", className="mt-3 mb-2 small text-muted text-uppercase"))
for timeframe, ma_value in imbalance_ma_data.items():
ma_color = "text-success" if ma_value > 0 else "text-danger"
ma_text = f"MA {timeframe}: {ma_value:.3f}"
ma_display.append(html.Div([
html.Strong(f"{timeframe}: ", className="small"),
html.Span(ma_text, className=f"small {ma_color}")
], className="mb-1"))
html.Div(ma_display),
html.Hr(className="my-2"), html.Hr(className="my-2"),
html.Table([ html.Table([