diff --git a/NN/models/saved/checkpoint_metadata.json b/NN/models/saved/checkpoint_metadata.json index 51ec246..c053ddb 100644 --- a/NN/models/saved/checkpoint_metadata.json +++ b/NN/models/saved/checkpoint_metadata.json @@ -271,15 +271,15 @@ ], "decision": [ { - "checkpoint_id": "decision_20250702_013257", + "checkpoint_id": "decision_20250702_020007", "model_name": "decision", "model_type": "decision_fusion", - "file_path": "NN\\models\\saved\\decision\\decision_20250702_013257.pt", - "created_at": "2025-07-02T01:32:57.057698", + "file_path": "NN\\models\\saved\\decision\\decision_20250702_020007.pt", + "created_at": "2025-07-02T02:00:07.439094", "file_size_mb": 0.06720924377441406, - "performance_score": 9.99999352005137, + "performance_score": 9.999997759969705, "accuracy": null, - "loss": 6.479948628599987e-06, + "loss": 2.240030294586859e-06, "val_accuracy": null, "val_loss": null, "reward": null, @@ -291,15 +291,15 @@ "wandb_artifact_name": null }, { - "checkpoint_id": "decision_20250702_013256", + "checkpoint_id": "decision_20250702_020007", "model_name": "decision", "model_type": "decision_fusion", - "file_path": "NN\\models\\saved\\decision\\decision_20250702_013256.pt", - "created_at": "2025-07-02T01:32:56.667169", + "file_path": "NN\\models\\saved\\decision\\decision_20250702_020007.pt", + "created_at": "2025-07-02T02:00:07.707012", "file_size_mb": 0.06720924377441406, - "performance_score": 9.999993471487318, + "performance_score": 9.999997758801166, "accuracy": null, - "loss": 6.528512681061979e-06, + "loss": 2.2411988334327916e-06, "val_accuracy": null, "val_loss": null, "reward": null, @@ -311,15 +311,15 @@ "wandb_artifact_name": null }, { - "checkpoint_id": "decision_20250702_013255", + "checkpoint_id": "decision_20250702_020007", "model_name": "decision", "model_type": "decision_fusion", - "file_path": "NN\\models\\saved\\decision\\decision_20250702_013255.pt", - "created_at": "2025-07-02T01:32:55.915359", + "file_path": "NN\\models\\saved\\decision\\decision_20250702_020007.pt", + "created_at": "2025-07-02T02:00:07.570949", "file_size_mb": 0.06720924377441406, - "performance_score": 9.999993469737547, + "performance_score": 9.999997757764104, "accuracy": null, - "loss": 6.5302624539599814e-06, + "loss": 2.2422358958193754e-06, "val_accuracy": null, "val_loss": null, "reward": null, @@ -331,15 +331,15 @@ "wandb_artifact_name": null }, { - "checkpoint_id": "decision_20250702_013255", + "checkpoint_id": "decision_20250702_020007", "model_name": "decision", "model_type": "decision_fusion", - "file_path": "NN\\models\\saved\\decision\\decision_20250702_013255.pt", - "created_at": "2025-07-02T01:32:55.774316", + "file_path": "NN\\models\\saved\\decision\\decision_20250702_020007.pt", + "created_at": "2025-07-02T02:00:07.867047", "file_size_mb": 0.06720924377441406, - "performance_score": 9.99999346914947, + "performance_score": 9.999997757753505, "accuracy": null, - "loss": 6.530850530594989e-06, + "loss": 2.2422464945511442e-06, "val_accuracy": null, "val_loss": null, "reward": null, @@ -351,15 +351,15 @@ "wandb_artifact_name": null }, { - "checkpoint_id": "decision_20250702_013255", + "checkpoint_id": "decision_20250702_020007", "model_name": "decision", "model_type": "decision_fusion", - "file_path": "NN\\models\\saved\\decision\\decision_20250702_013255.pt", - "created_at": "2025-07-02T01:32:55.646001", + "file_path": "NN\\models\\saved\\decision\\decision_20250702_020007.pt", + "created_at": "2025-07-02T02:00:07.302999", "file_size_mb": 0.06720924377441406, - "performance_score": 9.99999346889822, + "performance_score": 9.999997754320662, "accuracy": null, - "loss": 6.531101780155828e-06, + "loss": 2.245679338091438e-06, "val_accuracy": null, "val_loss": null, "reward": null, diff --git a/web/templated_dashboard.py b/web/templated_dashboard.py index 63928f6..12d35eb 100644 --- a/web/templated_dashboard.py +++ b/web/templated_dashboard.py @@ -3,20 +3,33 @@ Template-based Trading Dashboard Uses MVC architecture with HTML templates and data models """ import logging -from typing import Optional, Any, Dict, List -from datetime import datetime +import sys +import os +from typing import Optional, Any, Dict, List, Deque +from datetime import datetime, timedelta import pandas as pd +import pytz +import time +import threading +from collections import deque +from dataclasses import asdict import dash from dash import dcc, html, Input, Output, State, callback_context import plotly.graph_objects as go import plotly.express as px -from .dashboard_model import DashboardModel, DashboardDataBuilder, create_sample_dashboard_data -from .template_renderer import DashboardTemplateRenderer from core.data_provider import DataProvider from core.orchestrator import TradingOrchestrator from core.trading_executor import TradingExecutor +from core.config import get_config +from core.unified_data_stream import UnifiedDataStream +from web.dashboard_model import DashboardModel, DashboardDataBuilder, create_sample_dashboard_data +from web.template_renderer import DashboardTemplateRenderer +from web.component_manager import DashboardComponentManager +from web.layout_manager import DashboardLayoutManager +from utils.checkpoint_manager import save_checkpoint, load_best_checkpoint +from NN.models.advanced_transformer_trading import create_trading_transformer, TradingTransformerConfig # Configure logging logger = logging.getLogger(__name__) @@ -29,29 +42,125 @@ class TemplatedTradingDashboard: orchestrator: Optional[TradingOrchestrator] = None, trading_executor: Optional[TradingExecutor] = None): """Initialize the templated dashboard""" - self.data_provider = data_provider - self.orchestrator = orchestrator - self.trading_executor = trading_executor + self.config = get_config() + + # Initialize components + self.data_provider = data_provider or DataProvider() + self.trading_executor = trading_executor or TradingExecutor() # Initialize template renderer self.renderer = DashboardTemplateRenderer() - # Initialize Dash app + # Initialize unified orchestrator with full ML capabilities + if orchestrator is None: + self.orchestrator = TradingOrchestrator( + data_provider=self.data_provider, + enhanced_rl_training=True, + model_registry={} + ) + logger.info("TEMPLATED DASHBOARD: Using unified Trading Orchestrator with full ML capabilities") + else: + self.orchestrator = orchestrator + + # Initialize enhanced training system for predictions + self.training_system = None + self._initialize_enhanced_training_system() + + # Initialize layout and component managers + self.layout_manager = DashboardLayoutManager( + starting_balance=self._get_initial_balance(), + trading_executor=self.trading_executor + ) + self.component_manager = DashboardComponentManager() + + # Initialize Universal Data Stream for the 5 timeseries architecture + self.unified_stream = UnifiedDataStream(self.data_provider, self.orchestrator) + self.stream_consumer_id = self.unified_stream.register_consumer( + consumer_name="TemplatedTradingDashboard", + callback=self._handle_unified_stream_data, + data_types=['ticks', 'ohlcv', 'training_data', 'ui_data'] + ) + logger.info(f"TEMPLATED DASHBOARD: Universal Data Stream initialized with consumer ID: {self.stream_consumer_id}") + logger.info("TEMPLATED DASHBOARD: Subscribed to Universal 5 Timeseries: ETH(ticks,1m,1h,1d) + BTC(ticks)") + + # Dashboard state + self.recent_decisions: list = [] + self.closed_trades: list = [] + self.current_prices: dict = {} + self.session_pnl = 0.0 + self.total_fees = 0.0 + self.current_position: Optional[float] = 0.0 + self.session_trades: list = [] + + # Model control toggles - separate inference and training + self.dqn_inference_enabled = True # Default: enabled + self.dqn_training_enabled = True # Default: enabled + self.cnn_inference_enabled = True + self.cnn_training_enabled = True + + # Leverage management - adjustable x1 to x100 + self.current_leverage = 50 # Default x50 leverage + self.min_leverage = 1 + self.max_leverage = 100 + self.pending_trade_case_id = None # For tracking opening trades until closure + + # WebSocket streaming + self.ws_price_cache: dict = {} + self.is_streaming = False + self.tick_cache: list = [] + + # COB data cache - enhanced with price buckets and memory system + self.cob_cache: dict = { + 'ETH/USDT': {'last_update': 0, 'data': None, 'updates_count': 0}, + 'BTC/USDT': {'last_update': 0, 'data': None, 'updates_count': 0} + } + self.latest_cob_data: dict = {} # Cache for COB integration data + self.cob_predictions: dict = {} # Cache for COB predictions (both ETH and BTC for display) + + # COB High-frequency data handling (50-100 updates/sec) + self.cob_data_buffer: dict = {} # Buffer for high-freq data + self.cob_memory: dict = {} # Memory system like GPT - keeps last N snapshots + self.cob_price_buckets: dict = {} # Price bucket cache + self.cob_update_count = 0 + self.last_cob_broadcast: Dict[str, Optional[float]] = {'ETH/USDT': None, 'BTC/USDT': None} # Rate limiting for UI updates, updated type + self.cob_data_history: Dict[str, Deque[Any]] = { + 'ETH/USDT': deque(maxlen=61), # Store ~60 seconds of 1s snapshots + 'BTC/USDT': deque(maxlen=61) + } + + # Initialize timezone + timezone_name = self.config.get('system', {}).get('timezone', 'Europe/Sofia') + self.timezone = pytz.timezone(timezone_name) + + # Create Dash app self.app = dash.Dash(__name__, external_stylesheets=[ - 'https://cdn.jsdelivr.net/npm/bootstrap@5.1.3/dist/css/bootstrap.min.css' + 'https://cdn.jsdelivr.net/npm/bootstrap@5.1.3/dist/css/bootstrap.min.css', + 'https://cdnjs.cloudflare.com/ajax/libs/font-awesome/6.0.0/css/all.min.css' ]) - # Session data - self.session_start_time = datetime.now() - self.session_trades = [] - self.session_pnl = 0.0 - self.current_position = 0.0 + # Suppress Dash development mode logging + self.app.enable_dev_tools(debug=False, dev_tools_silence_routes_logging=True) # Setup layout and callbacks self._setup_layout() self._setup_callbacks() - logger.info("TEMPLATED DASHBOARD: Initialized with MVC architecture") + # Start data streams + self._initialize_streaming() + + # Connect to orchestrator for real trading signals + self._connect_to_orchestrator() + + # Initialize COB integration with high-frequency data handling + self._initialize_cob_integration() + + # Start signal generation loop to ensure continuous trading signals + self._start_signal_generation_loop() + + # Start training sessions if models are showing FRESH status + threading.Thread(target=self._delayed_training_check, daemon=True).start() + + logger.info("TEMPLATED DASHBOARD: Initialized with HIGH-FREQUENCY COB integration and signal generation") def _setup_layout(self): """Setup the dashboard layout using templates""" @@ -65,7 +174,16 @@ class TemplatedTradingDashboard: self.app.layout = layout - + def _get_initial_balance(self) -> float: + """Get initial balance from trading executor or default""" + try: + if self.trading_executor and hasattr(self.trading_executor, 'starting_balance'): + balance = getattr(self.trading_executor, 'starting_balance', None) + if balance and balance > 0: + return balance + except Exception as e: + logger.warning(f"Error getting balance: {e}") + return 100.0 # Default balance def _setup_callbacks(self): """Setup dashboard callbacks""" @@ -162,7 +280,8 @@ class TemplatedTradingDashboard: def update_closed_trades(n): """Update closed trades table""" try: - return self._render_closed_trades() + # Return the table wrapped in a Div + return html.Div(self._render_closed_trades()) except Exception as e: logger.error(f"Error updating closed trades: {e}") return html.Div("No trades") @@ -465,44 +584,32 @@ class TemplatedTradingDashboard: ]) ]) - def _render_closed_trades(self) -> html.Table: + def _render_closed_trades(self) -> html.Div: """Render closed trades table""" - return html.Table([ - html.Thead([ - html.Tr([ - html.Th("Time"), - html.Th("Symbol"), - html.Th("Side"), - html.Th("Size"), - html.Th("Entry"), - html.Th("Exit"), - html.Th("PnL"), - html.Th("Duration") - ]) - ]), - html.Tbody([ - html.Tr([ - html.Td("14:23:45"), - html.Td("ETH/USDT"), - html.Td([html.Span("BUY", className="badge bg-success")]), - html.Td("1.5"), - html.Td("$3420.45"), - html.Td("$3428.12"), - html.Td("$11.51", className="trade-profit"), - html.Td("2m 34s") - ]), - html.Tr([ - html.Td("14:21:12"), - html.Td("BTC/USDT"), - html.Td([html.Span("SELL", className="badge bg-danger")]), - html.Td("0.1"), - html.Td("$45150.23"), - html.Td("$45142.67"), - html.Td("-$0.76", className="trade-loss"), - html.Td("1m 12s") - ]) - ]) - ], className="table table-sm") + if not self.closed_trades: + return html.Div("No closed trades yet.", className="alert alert-info mt-3") + + # Create a DataFrame from closed trades + df_trades = pd.DataFrame(self.closed_trades) + + # Format columns for display + df_trades['timestamp'] = pd.to_datetime(df_trades['timestamp']).dt.strftime('%Y-%m-%d %H:%M:%S') + df_trades['entry_price'] = df_trades['entry_price'].apply(lambda x: f"${x:,.2f}") + df_trades['exit_price'] = df_trades['exit_price'].apply(lambda x: f"${x:,.2f}") + df_trades['pnl'] = df_trades['pnl'].apply(lambda x: f"${x:,.2f}") + df_trades['profit_percentage'] = df_trades['profit_percentage'].apply(lambda x: f"{x:,.2f}%") + df_trades['size'] = df_trades['size'].apply(lambda x: f"{x:,.4f}") + df_trades['fees'] = df_trades['fees'].apply(lambda x: f"${x:,.2f}") + + table_header = [html.Thead(html.Tr([html.Th(col) for col in df_trades.columns]))] + table_body = [html.Tbody([ + html.Tr([html.Td(df_trades.iloc[i][col]) for col in df_trades.columns]) for i in range(len(df_trades)) + ])] + + return html.Div( + html.Table(table_header + table_body, className="table table-striped table-hover table-sm"), + className="table-responsive" + ) def _execute_manual_trade(self, action: str): """Execute manual trade""" @@ -531,7 +638,583 @@ class TemplatedTradingDashboard: """Run the dashboard server""" logger.info(f"TEMPLATED DASHBOARD: Starting at http://{host}:{port}") self.app.run(host=host, port=port, debug=debug) + + def _handle_unified_stream_data(self, data): + """Placeholder for unified stream data handling.""" + logger.debug(f"Received data from unified stream: {data}") + def _delayed_training_check(self): + """Check and start training after a delay to allow initialization""" + try: + time.sleep(10) # Wait 10 seconds for initialization + logger.info("Checking if models need training activation...") + self._start_actual_training_if_needed() + except Exception as e: + logger.error(f"Error in delayed training check: {e}") + + def _initialize_enhanced_training_system(self): + """Initialize enhanced training system for model predictions""" + try: + # Try to import and initialize enhanced training system + from enhanced_realtime_training import EnhancedRealtimeTrainingSystem + + self.training_system = EnhancedRealtimeTrainingSystem( + orchestrator=self.orchestrator, + data_provider=self.data_provider, + dashboard=self + ) + + # Initialize prediction storage + if not hasattr(self.orchestrator, 'recent_dqn_predictions'): + self.orchestrator.recent_dqn_predictions = {} + if not hasattr(self.orchestrator, 'recent_cnn_predictions'): + self.orchestrator.recent_cnn_predictions = {} + + logger.info("TEMPLATED DASHBOARD: Enhanced training system initialized for model predictions") + + except ImportError: + logger.warning("TEMPLATED DASHBOARD: Enhanced training system not available - using mock predictions") + self.training_system = None + except Exception as e: + logger.error(f"TEMPLATED DASHBOARD: Error initializing enhanced training system: {e}") + self.training_system = None + + def _initialize_streaming(self): + """Initialize data streaming""" + try: + self._start_websocket_streaming() + self._start_data_collection() + logger.info("TEMPLATED DASHBOARD: Data streaming initialized") + except Exception as e: + logger.error(f"TEMPLATED DASHBOARD: Error initializing streaming: {e}") + + def _start_websocket_streaming(self): + """Start WebSocket streaming for real-time data.""" + ws_thread = threading.Thread(target=self._ws_worker, daemon=True) + ws_thread.start() + + def _ws_worker(self): + try: + import websocket + import json # Added import + def on_message(ws, message): + try: + data = json.loads(message) + if 'k' in data: + kline = data['k'] + tick_record = { + 'symbol': 'ETHUSDT', + 'datetime': datetime.fromtimestamp(int(kline['t']) / 1000), + 'open': float(kline['o']), + 'high': float(kline['h']), + 'low': float(kline['l']), + 'close': float(kline['c']), + 'price': float(kline['c']), + 'volume': float(kline['v']), + } + self.ws_price_cache['ETHUSDT'] = tick_record['price'] + self.current_prices['ETH/USDT'] = tick_record['price'] + self.tick_cache.append(tick_record) + if len(self.tick_cache) > 1000: + self.tick_cache.pop(0) + except Exception as e: + logger.warning(f"TEMPLATED DASHBOARD: WebSocket message error: {e}") + def on_error(ws, error): + logger.error(f"TEMPLATED DASHBOARD: WebSocket error: {error}") + self.is_streaming = False + def on_close(ws, close_status_code, close_msg): + logger.warning("TEMPLATED DASHBOARD: WebSocket connection closed") + self.is_streaming = False + def on_open(ws): + logger.info("TEMPLATED DASHBOARD: WebSocket connected") + self.is_streaming = True + ws_url = "wss://stream.binance.com:9443/ws/ethusdt@kline_1s" + ws = websocket.WebSocketApp(ws_url, on_message=on_message, on_error=on_error, on_close=on_close, on_open=on_open) + ws.run_forever() + except Exception as e: + logger.error(f"TEMPLATED DASHBOARD: WebSocket worker error: {e}") + self.is_streaming = False + + def _start_data_collection(self): + """Start background data collection""" + data_thread = threading.Thread(target=self._data_worker, daemon=True) + data_thread.start() + + def _data_worker(self): + while True: + try: + self._update_session_metrics() + time.sleep(5) + except Exception as e: + logger.warning(f"TEMPLATED DASHBOARD: Data collection error: {e}") + time.sleep(10) + + def _update_session_metrics(self): + """Update session P&L and total fees from closed trades.""" + try: + closed_trades = [] + if self.trading_executor and hasattr(self.trading_executor, 'get_closed_trades'): + closed_trades = self.trading_executor.get_closed_trades() + self.closed_trades = closed_trades + if closed_trades: + self.session_pnl = sum(trade.get('pnl', 0) for trade in closed_trades) + self.total_fees = sum(trade.get('fees', 0) for trade in closed_trades) + else: + self.session_pnl = 0.0 + self.total_fees = 0.0 + except Exception as e: + logger.error(f"TEMPLATED DASHBOARD: Error updating session metrics: {e}") + + def _connect_to_orchestrator(self): + """Connect to orchestrator for real trading signals""" + try: + if self.orchestrator and hasattr(self.orchestrator, 'add_decision_callback'): + import asyncio # Added import + # from dataclasses import asdict # Moved asdict to top-level import + + def connect_worker(): + try: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + # No need to run_until_complete here, just register the callback + self.orchestrator.add_decision_callback(self._on_trading_decision) + logger.info("TEMPLATED DASHBOARD: Successfully connected to orchestrator for trading signals.") + except Exception as e: + logger.error(f"TEMPLATED DASHBOARD: Orchestrator connection worker failed: {e}") + thread = threading.Thread(target=connect_worker, daemon=True) + thread.start() + else: + logger.warning("TEMPLATED DASHBOARD: Orchestrator not available or doesn\'t support callbacks") + except Exception as e: + logger.error(f"TEMPLATED DASHBOARD: Error initiating orchestrator connection: {e}") + + async def _on_trading_decision(self, decision): + """Handle trading decision from orchestrator.""" + try: + action = getattr(decision, 'action', decision.get('action')) + if action == 'HOLD': + return + symbol = getattr(decision, 'symbol', decision.get('symbol', 'ETH/USDT')) + if 'ETH' not in symbol.upper(): + return + dashboard_decision = asdict(decision) if not isinstance(decision, dict) else decision.copy() + dashboard_decision['timestamp'] = datetime.now() + dashboard_decision['executed'] = False + self.recent_decisions.append(dashboard_decision) + if len(self.recent_decisions) > 200: + self.recent_decisions.pop(0) + logger.info(f"TEMPLATED DASHBOARD: [ORCHESTRATOR SIGNAL] Received: {action} for {symbol}") + except Exception as e: + logger.error(f"TEMPLATED DASHBOARD: Error handling trading decision: {e}") + + def _initialize_cob_integration(self): + """Initialize simple COB integration that works without async event loops""" + try: + logger.info("TEMPLATED DASHBOARD: Initializing simple COB integration for model feeding") + + # Initialize COB data storage + self.cob_bucketed_data = { + 'ETH/USDT': {}, + 'BTC/USDT': {} + } + self.cob_last_update: Dict[str, Optional[float]] = { + 'ETH/USDT': None, + 'BTC/USDT': None + } # Corrected type hint + + # Start simple COB data collection + self._start_simple_cob_collection() + + logger.info("TEMPLATED DASHBOARD: Simple COB integration initialized successfully") + + except Exception as e: + logger.error(f"TEMPLATED DASHBOARD: Error initializing COB integration: {e}") + self.cob_integration = None + + def _start_simple_cob_collection(self): + """Start simple COB data collection using REST APIs (no async required)""" + try: + # threading and time already imported + + def cob_collector(): + """Collect COB data using simple REST API calls""" + while True: + try: + # Collect data for both symbols + for symbol in ['ETH/USDT', 'BTC/USDT']: + self._collect_simple_cob_data(symbol) + + # Sleep for 1 second between collections + time.sleep(1) + except Exception as e: + logger.debug(f"TEMPLATED DASHBOARD: Error in COB collection: {e}") + time.sleep(5) # Wait longer on error + + # Start collector in background thread + cob_thread = threading.Thread(target=cob_collector, daemon=True) + cob_thread.start() + + logger.info("TEMPLATED DASHBOARD: Simple COB data collection started") + + except Exception as e: + logger.error(f"TEMPLATED DASHBOARD: Error starting COB collection: {e}") + + def _collect_simple_cob_data(self, symbol: str): + """Collect simple COB data using Binance REST API""" + try: + import requests # Added import + # time already imported + + # Use Binance REST API for order book data + binance_symbol = symbol.replace('/', '') + url = f"https://api.binance.com/api/v3/depth?symbol={binance_symbol}&limit=500" + + response = requests.get(url, timeout=5) + if response.status_code == 200: + data = response.json() + + # Process order book data + bids = [] + asks = [] + + # Process bids (buy orders) + for bid in data['bids'][:100]: # Top 100 levels + price = float(bid[0]) + size = float(bid[1]) + bids.append({ + 'price': price, + 'size': size, + 'total': price * size + }) + + # Process asks (sell orders) + for ask in data['asks'][:100]: # Top 100 levels + price = float(ask[0]) + size = float(ask[1]) + asks.append({ + 'price': price, + 'size': size, + 'total': price * size + }) + + # Calculate statistics + if bids and asks: + best_bid = max(bids, key=lambda x: x['price']) + best_ask = min(asks, key=lambda x: x['price']) + mid_price = (best_bid['price'] + best_ask['price']) / 2 + spread_bps = ((best_ask['price'] - best_bid['price']) / mid_price) * 10000 if mid_price > 0 else 0 + + total_bid_liquidity = sum(bid['total'] for bid in bids[:20]) + total_ask_liquidity = sum(ask['total'] for ask in asks[:20]) + total_liquidity = total_bid_liquidity + total_ask_liquidity + imbalance = (total_bid_liquidity - total_ask_liquidity) / total_liquidity if total_liquidity > 0 else 0 + + # Create COB snapshot + cob_snapshot = { + 'symbol': symbol, + 'timestamp': time.time(), + 'bids': bids, + 'asks': asks, + 'stats': { + 'mid_price': mid_price, + 'spread_bps': spread_bps, + 'total_bid_liquidity': total_bid_liquidity, + 'total_ask_liquidity': total_ask_liquidity, + 'imbalance': imbalance, + 'exchanges_active': ['Binance'] + } + } + + # Store in history (keep last 15 seconds) + self.cob_data_history[symbol].append(cob_snapshot) + if len(self.cob_data_history[symbol]) > 15: # Keep 15 seconds + # Use slicing to remove old elements from deque to ensure correct behavior + while len(self.cob_data_history[symbol]) > 15: + self.cob_data_history[symbol].popleft() + + # Update latest data + self.latest_cob_data[symbol] = cob_snapshot + self.cob_last_update[symbol] = time.time() + + # Generate bucketed data for models + self._generate_bucketed_cob_data(symbol, cob_snapshot) + + logger.debug(f"TEMPLATED DASHBOARD: COB data collected for {symbol}: {len(bids)} bids, {len(asks)} asks") + + except Exception as e: + logger.debug(f"TEMPLATED DASHBOARD: Error collecting COB data for {symbol}: {e}") + + def _generate_bucketed_cob_data(self, symbol: str, cob_snapshot: dict): + """Generate bucketed COB data for model feeding""" + try: + # Create price buckets (1 basis point granularity) + bucket_size_bps = 1.0 + mid_price = cob_snapshot['stats']['mid_price'] + + # Initialize buckets + buckets = {} + + # Process bids into buckets + for bid in cob_snapshot['bids']: + price_offset_bps = ((bid['price'] - mid_price) / mid_price) * 10000 + bucket_key = int(price_offset_bps / bucket_size_bps) + + if bucket_key not in buckets: + buckets[bucket_key] = {'bid_volume': 0, 'ask_volume': 0} + + buckets[bucket_key]['bid_volume'] += bid['total'] + + # Process asks into buckets + for ask in cob_snapshot['asks']: + price_offset_bps = ((ask['price'] - mid_price) / mid_price) * 10000 + bucket_key = int(price_offset_bps / bucket_size_bps) + + if bucket_key not in buckets: + buckets[bucket_key] = {'bid_volume': 0, 'ask_volume': 0} + + buckets[bucket_key]['ask_volume'] += ask['total'] + + # Store bucketed data + self.cob_bucketed_data[symbol] = { + 'timestamp': cob_snapshot['timestamp'], + 'mid_price': mid_price, + 'buckets': buckets, + 'bucket_size_bps': bucket_size_bps + } + + # Feed to models + self._feed_cob_data_to_models(symbol, cob_snapshot) + + except Exception as e: + logger.debug(f"TEMPLATED DASHBOARD: Error generating bucketed COB data: {e}") + + def _calculate_cumulative_imbalance(self, symbol: str) -> Dict[str, float]: + """Calculate average imbalance over multiple time windows.""" + stats = {} + now = time.time() + history = self.cob_data_history.get(symbol) + + if not history: + return {'1s': 0.0, '5s': 0.0, '15s': 0.0, '60s': 0.0} + + periods = {'1s': 1, '5s': 5, '15s': 15, '60s': 60} + + for name, duration in periods.items(): + recent_imbalances = [] + for snap in history: + # Check if snap is a valid dict with timestamp and stats + if isinstance(snap, dict) and 'timestamp' in snap and (now - snap['timestamp'] <= duration) and 'stats' in snap and snap['stats']: + imbalance = snap['stats'].get('imbalance') + if imbalance is not None: + recent_imbalances.append(imbalance) + + if recent_imbalances: + stats[name] = sum(recent_imbalances) / len(recent_imbalances) + else: + stats[name] = 0.0 + + # Debug logging to verify cumulative imbalance calculation + if any(value != 0.0 for value in stats.values()): + logger.debug(f"TEMPLATED DASHBOARD: [CUMULATIVE-IMBALANCE] {symbol}: {stats}") + + return stats + + def _feed_cob_data_to_models(self, symbol: str, cob_snapshot: dict): + """Feed COB data to models for training and inference""" + try: + # Calculate cumulative imbalance for model feeding + cumulative_imbalance = self._calculate_cumulative_imbalance(symbol) # Assumes _calculate_cumulative_imbalance is available + + history_data = { + 'symbol': symbol, + 'current_snapshot': cob_snapshot, + 'history': list(self.cob_data_history[symbol]), # Convert deque to list for consistent slicing + 'bucketed_data': self.cob_bucketed_data[symbol], + 'cumulative_imbalance': cumulative_imbalance, # Add cumulative imbalance + 'timestamp': cob_snapshot['timestamp'] + } + + # Pass to orchestrator for model feeding + if self.orchestrator and hasattr(self.orchestrator, 'feed_cob_data'): + self.orchestrator.feed_cob_data(symbol, history_data) # Assumes feed_cob_data exists in orchestrator + + except Exception as e: + logger.debug(f"TEMPLATED DASHBOARD: Error feeding COB data to models: {e}") + + def _is_signal_generation_active(self) -> bool: + """Check if signal generation is active (e.g., models are loaded and running)""" + # For now, return true to always generate signals + # In a real system, this would check model loading status, training status, etc. + return True # Simplified for initial integration + + def _start_signal_generation_loop(self): + """Start signal generation loop to ensure continuous trading signals""" + try: + def signal_worker(): + logger.info("TEMPLATED DASHBOARD: Signal generation worker started") + while True: + try: + # Ensure signal generation is active before processing + if self._is_signal_generation_active(): + symbol = 'ETH/USDT' # Focus on ETH for now + current_price = self._get_current_price(symbol) + if current_price: + # Generate a momentum signal (simplified for demo) + signal = self._generate_momentum_signal(symbol, current_price) # Assumes _generate_momentum_signal is available + if signal: + self._process_dashboard_signal(signal) # Assumes _process_dashboard_signal is available + + # Generate a DQN signal if enabled + if self.dqn_inference_enabled: + dqn_signal = self._generate_dqn_signal(symbol, current_price) # Assumes _generate_dqn_signal is available + if dqn_signal: + self._process_dashboard_signal(dqn_signal) + + # Generate a CNN pivot signal if enabled + if self.cnn_inference_enabled: + cnn_signal = self._get_cnn_pivot_prediction() # Assumes _get_cnn_pivot_prediction is available + if cnn_signal: + self._process_dashboard_signal(cnn_signal) + + # Update session metrics every 1 second interval to reflect new trades + self._update_session_metrics() + + time.sleep(1) # Run every second for signal generation + + except Exception as e: + logger.error(f"TEMPLATED DASHBOARD: Error in signal worker: {e}") + time.sleep(5) # Longer sleep on error + + signal_thread = threading.Thread(target=signal_worker, daemon=True) + signal_thread.start() + logger.info("TEMPLATED DASHBOARD: Signal generation loop started") + + except Exception as e: + logger.error(f"TEMPLATED DASHBOARD: Error starting signal generation loop: {e}") + + def _start_actual_training_if_needed(self): + """Start actual model training with real data collection and training loops""" + try: + if not self.orchestrator: + logger.warning("TEMPLATED DASHBOARD: No orchestrator available for training") + return + logger.info("TEMPLATED DASHBOARD: TRAINING: Starting actual training system with real data collection") + self._start_real_training_system() + except Exception as e: + logger.error(f"TEMPLATED DASHBOARD: Error starting comprehensive training system: {e}") + + def _start_real_training_system(self): + """Start real training system with data collection and actual model training""" + try: + # Training performance metrics + self.training_performance = { + 'decision': {'inference_times': [], 'training_times': [], 'total_calls': 0}, + 'cob_rl': {'inference_times': [], 'training_times': [], 'total_calls': 0}, + 'dqn': {'inference_times': [], 'training_times': [], 'total_calls': 0}, + 'cnn': {'inference_times': [], 'training_times': [], 'total_calls': 0}, + 'transformer': {'inference_times': [], 'training_times': [], 'total_calls': 0} # Added for transformer + } + + def training_coordinator(): + logger.info("TEMPLATED DASHBOARD: TRAINING: High-frequency training coordinator started") + training_iteration = 0 + last_dqn_training = 0 + last_cnn_training = 0 + last_decision_training = 0 + last_cob_rl_training = 0 + last_transformer_training = 0 # For transformer + + while True: + try: + training_iteration += 1 + current_time = time.time() + market_data = self._collect_training_data() # Assumes _collect_training_data is available + + if market_data: + logger.debug(f"TEMPLATED DASHBOARD: TRAINING: Collected {len(market_data)} market data points for training") + + # High-frequency training for split-second decisions + # Train decision fusion and COB RL as fast as hardware allows + if current_time - last_decision_training > 0.1: # Every 100ms + start_time = time.time() + self._perform_real_decision_training(market_data) # Assumes _perform_real_decision_training is available + training_time = time.time() - start_time + self.training_performance['decision']['training_times'].append(training_time) + self.training_performance['decision']['total_calls'] += 1 + last_decision_training = current_time + + # Keep only last 100 measurements + if len(self.training_performance['decision']['training_times']) > 100: + self.training_performance['decision']['training_times'] = self.training_performance['decision']['training_times'][-100:] + + # Advanced Transformer Training (every 200ms for comprehensive features) + if current_time - last_transformer_training > 0.2: # Every 200ms for transformer + start_time = time.time() + self._perform_real_transformer_training(market_data) # Assumes _perform_real_transformer_training is available + training_time = time.time() - start_time + self.training_performance['transformer']['training_times'].append(training_time) + self.training_performance['transformer']['total_calls'] += 1 + last_transformer_training = current_time # Update last training time + + # Keep only last 100 measurements + if len(self.training_performance['transformer']['training_times']) > 100: + self.training_performance['transformer']['training_times'] = self.training_performance['transformer']['training_times'][-100:] + + if current_time - last_cob_rl_training > 0.1: # Every 100ms + start_time = time.time() + self._perform_real_cob_rl_training(market_data) # Assumes _perform_real_cob_rl_training is available + training_time = time.time() - start_time + self.training_performance['cob_rl']['training_times'].append(training_time) + self.training_performance['cob_rl']['total_calls'] += 1 + last_cob_rl_training = current_time + + # Keep only last 100 measurements + if len(self.training_performance['cob_rl']['training_times']) > 100: + self.training_performance['cob_rl']['training_times'] = self.training_performance['cob_rl']['training_times'][-100:] + + # Standard frequency for larger models + if current_time - last_dqn_training > 30: + start_time = time.time() + self._perform_real_dqn_training(market_data) # Assumes _perform_real_dqn_training is available + training_time = time.time() - start_time + self.training_performance['dqn']['training_times'].append(training_time) + self.training_performance['dqn']['total_calls'] += 1 + last_dqn_training = current_time + + if len(self.training_performance['dqn']['training_times']) > 50: + self.training_performance['dqn']['training_times'] = self.training_performance['dqn']['training_times'][-50:] + + if current_time - last_cnn_training > 45: + start_time = time.time() + self._perform_real_cnn_training(market_data) # Assumes _perform_real_cnn_training is available + training_time = time.time() - start_time + self.training_performance['cnn']['training_times'].append(training_time) + self.training_performance['cnn']['total_calls'] += 1 + last_cnn_training = current_time + + if len(self.training_performance['cnn']['training_times']) > 50: + self.training_performance['cnn']['training_times'] = self.training_performance['cnn']['training_times'][-50:] + + self._update_training_progress(training_iteration) # Assumes _update_training_progress is available + + # Log performance metrics every 100 iterations + if training_iteration % 100 == 0: + self._log_training_performance() # Assumes _log_training_performance is available + logger.info(f"TEMPLATED DASHBOARD: TRAINING: Iteration {training_iteration} - High-frequency training active") + + # Minimal sleep for maximum responsiveness + time.sleep(0.05) # 50ms sleep for 20Hz training loop + + except Exception as e: + logger.error(f"TEMPLATED DASHBOARD: TRAINING: Error in training iteration {training_iteration}: {e}") + time.sleep(1) # Shorter error recovery + + training_thread = threading.Thread(target=training_coordinator, daemon=True) + training_thread.start() + logger.info("TEMPLATED DASHBOARD: Real training system started") + + except Exception as e: + logger.error(f"TEMPLATED DASHBOARD: Error starting real training system: {e}") def create_templated_dashboard(data_provider: Optional[DataProvider] = None, orchestrator: Optional[TradingOrchestrator] = None,