#!/usr/bin/env python3
"""
Run Clean Trading Dashboard with Full Training Pipeline

Integrated system with both training loop and clean web dashboard: an asyncio
training loop runs in a daemon thread while the web dashboard server blocks the
main thread, with a TensorBoard server started alongside.
"""

import os

# Fix OpenMP library conflicts before importing other modules.
# KMP_DUPLICATE_LIB_OK is always forced; OMP_NUM_THREADS is only a default so it
# can still be overridden from the environment.
os.environ['KMP_DUPLICATE_LIB_OK'] = 'TRUE'
os.environ.setdefault('OMP_NUM_THREADS', '4')

# Fix matplotlib backend issue - select the non-interactive Agg backend before
# any other module gets a chance to import pyplot.
import matplotlib
matplotlib.use('Agg')

import asyncio
import logging
import sys
import platform
from safe_logging import setup_safe_logging
import threading
import time
from pathlib import Path

# Windows-specific async event loop configuration
if platform.system() == "Windows":
    # Use ProactorEventLoop on Windows for better I/O handling (it is also the
    # default there since Python 3.8; this makes the choice explicit).
    asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())

# Add project root (this file's directory) to the import path
project_root = Path(__file__).parent
sys.path.insert(0, str(project_root))

from core.config import get_config, setup_logging
from core.data_provider import DataProvider

# Import checkpoint management
from utils.checkpoint_manager import get_checkpoint_manager
from utils.training_integration import get_training_integration

# Setup logging
setup_safe_logging()
logger = logging.getLogger(__name__)


async def start_training_pipeline(orchestrator, trading_executor):
    """Run the training loop forever on the current event loop.

    Starts the orchestrator's optional real-time processing and COB
    integration (each only if the orchestrator exposes the method), then
    repeatedly asks the orchestrator for a trading decision on every symbol,
    sleeping 100 ms between iterations. Errors for a single symbol or a single
    iteration are logged and the loop keeps going, so under normal operation
    this coroutine does not return.

    Args:
        orchestrator: Trading orchestrator. Must provide an async
            ``make_trading_decision(symbol)``; ``symbols``,
            ``start_realtime_processing`` and ``start_cob_integration`` are
            used only when present.
        trading_executor: Accepted for interface compatibility; not used by
            this function.
    """
    logger.info("=" * 70)
    logger.info("STARTING TRAINING PIPELINE WITH CLEAN DASHBOARD")
    logger.info("=" * 70)

    def handle_async_exception(loop, context):
        """Log exceptions/errors reported to the event loop's exception handler."""
        exception = context.get('exception')
        if exception:
            # exc_info makes the logger emit the traceback, not just str(exception)
            logger.error(f"Uncaught async exception: {exception}", exc_info=exception)
            logger.error(f"Context: {context}")
        else:
            logger.error(f"Async error: {context.get('message', 'Unknown error')}")

    # Install the handler on the loop this coroutine is running on
    loop = asyncio.get_running_loop()
    loop.set_exception_handler(handle_async_exception)

    # Initialize checkpoint management. The results are kept bound for the
    # lifetime of the pipeline even though this function does not use them
    # directly (NOTE(review): presumably these are shared singletons - confirm).
    checkpoint_manager = get_checkpoint_manager()
    training_integration = get_training_integration()

    # Training statistics (local to this coroutine)
    training_stats = {
        'iteration_count': 0,
        'total_decisions': 0,
        'successful_trades': 0,
        'best_performance': 0.0,
        'last_checkpoint_iteration': 0
    }

    try:
        # Start real-time processing; a failure here is logged, not fatal
        try:
            if hasattr(orchestrator, 'start_realtime_processing'):
                await orchestrator.start_realtime_processing()
                logger.info("Real-time processing started")
        except Exception as e:
            logger.error(f"Error starting real-time processing: {e}")

        # Start COB integration; a failure here is logged, not fatal
        try:
            if hasattr(orchestrator, 'start_cob_integration'):
                await orchestrator.start_cob_integration()
                logger.info("COB integration started - 5-minute data matrix active")
            else:
                logger.info("COB integration not available")
        except Exception as e:
            logger.error(f"Error starting COB integration: {e}")

        # Main training loop
        iteration = 0
        last_status_time = time.time()

        while True:
            try:
                iteration += 1
                training_stats['iteration_count'] = iteration

                # Read every iteration (not hoisted) so changes to
                # orchestrator.symbols take effect; fall back to ETH/USDT.
                symbols = getattr(orchestrator, 'symbols', ['ETH/USDT'])

                for symbol in symbols:
                    try:
                        # Make trading decision (this triggers model training)
                        decision = await orchestrator.make_trading_decision(symbol)
                        if decision:
                            training_stats['total_decisions'] += 1
                            logger.debug(f"[{symbol}] Decision: {decision.action} @ {decision.confidence:.1%}")
                    except Exception as e:
                        # One failing symbol must not stop the others
                        logger.warning(f"Error processing {symbol}: {e}")

                # Status logging every 100 iterations
                if iteration % 100 == 0:
                    current_time = time.time()
                    elapsed = current_time - last_status_time
                    logger.info(f"[TRAINING] Iteration {iteration}, Decisions: {training_stats['total_decisions']}, Time: {elapsed:.1f}s")

                    # Models will save their own checkpoints when performance improves
                    training_stats['last_checkpoint_iteration'] = iteration
                    last_status_time = current_time

                # Brief pause to prevent overwhelming the system
                await asyncio.sleep(0.1)  # 100ms between iterations

            except Exception as e:
                logger.error(f"Training loop error: {e}")
                await asyncio.sleep(5)  # Back off longer after an iteration-level error

    except Exception as e:
        logger.exception(f"Training pipeline error: {e}")


def _log_startup_banner(dashboard_port, tensorboard_port):
    """Log the static startup banner, including the dashboard and TensorBoard URLs."""
    logger.info("=" * 80)
    logger.info("CLEAN TRADING DASHBOARD + FULL TRAINING PIPELINE")
    logger.info("=" * 80)
    logger.info("Features: Real-time Training, COB Integration, Clean UI")
    logger.info("GPU Training: ENABLED")
    logger.info("TensorBoard Integration: ENABLED")
    logger.info("Multi-symbol: ETH/USDT, BTC/USDT")
    logger.info(f"Dashboard: http://127.0.0.1:{dashboard_port}")
    logger.info(f"TensorBoard: http://127.0.0.1:{tensorboard_port}")
    logger.info("=" * 80)


def _start_tensorboard(port):
    """Start TensorBoard on *port* with log dir ``runs`` and return the integration object.

    A failed start is only logged as a warning; the integration object is
    returned either way so the caller can still attempt a stop on shutdown.
    """
    from web.tensorboard_integration import get_tensorboard_integration

    integration = get_tensorboard_integration(log_dir="runs", port=port)
    if integration.start_tensorboard(open_browser=False):
        logger.info(f"TensorBoard started at {integration.get_tensorboard_url()}")
    else:
        logger.warning("Failed to start TensorBoard - training metrics will not be visualized")
    return integration


def _stop_tensorboard(tensorboard_integration):
    """Best-effort stop of a TensorBoard integration; a ``None`` argument is a no-op.

    Errors are logged instead of raised so shutdown never masks the original
    reason the dashboard exited.
    """
    if tensorboard_integration is None:
        return
    try:
        tensorboard_integration.stop_tensorboard()
    except Exception as e:
        logger.warning(f"Error stopping TensorBoard: {e}")


def start_clean_dashboard_with_training():
    """Start clean dashboard with full training pipeline.

    Builds the data provider, orchestrator, trading executor, system monitor
    and dashboard. It then launches the training loop in a daemon thread,
    starts TensorBoard, and blocks in the dashboard server until it exits.

    Ctrl+C is logged and the function returns normally. Any other exception is
    logged with its traceback and terminates the process via ``sys.exit(1)``.
    TensorBoard, once created, is stopped on every exit path.

    Environment variables read: DASHBOARD_PORT (default 8051),
    TENSORBOARD_PORT (default 6006), ENABLE_UNIVERSAL_DATA_STREAM,
    ENABLE_NN_DECISION_FUSION, ENABLE_COB_INTEGRATION (each '1' by default;
    the flags are currently only logged).
    """
    # Bound before the try so the finally clause can always reference it
    tensorboard_integration = None
    try:
        # Get ports from environment or use defaults
        dashboard_port = int(os.environ.get('DASHBOARD_PORT', '8051'))
        tensorboard_port = int(os.environ.get('TENSORBOARD_PORT', '6006'))
        _log_startup_banner(dashboard_port, tensorboard_port)

        # Feature flags: logged from the environment rather than hard-coded so
        # the log never claims a feature is ENABLED when it was disabled.
        enable_universal_stream = os.environ.get('ENABLE_UNIVERSAL_DATA_STREAM', '1') == '1'
        enable_nn_fusion = os.environ.get('ENABLE_NN_DECISION_FUSION', '1') == '1'
        enable_cob = os.environ.get('ENABLE_COB_INTEGRATION', '1') == '1'

        logger.info(f"Universal Data Stream: {'ENABLED' if enable_universal_stream else 'DISABLED'}")
        logger.info(f"Neural Decision Fusion: {'ENABLED' if enable_nn_fusion else 'DISABLED'}")
        logger.info(f"COB Integration: {'ENABLED' if enable_cob else 'DISABLED'}")

        # Get configuration (result not used here; the call is kept as in the
        # original in case loading the config has side effects)
        config = get_config()

        # Initialize core components with standardized versions
        from core.standardized_data_provider import StandardizedDataProvider
        from core.orchestrator import TradingOrchestrator
        from core.trading_executor import TradingExecutor

        data_provider = StandardizedDataProvider()
        logger.info("StandardizedDataProvider created with BaseDataInput support")

        orchestrator = TradingOrchestrator(data_provider, enhanced_rl_training=True)
        logger.info("Enhanced Trading Orchestrator created with COB integration")

        trading_executor = TradingExecutor(config_path="config.yaml")
        logger.info(f"Trading executor created with {trading_executor.primary_name} configuration")

        # Connect trading executor to orchestrator
        orchestrator.trading_executor = trading_executor
        logger.info("Trading Executor connected to Orchestrator")

        # Initialize system resource monitoring
        from utils.system_monitor import start_system_monitoring
        system_monitor = start_system_monitoring()

        def cleanup_callback():
            """Trim orchestrator decision history and let the data provider drop old data."""
            try:
                if hasattr(orchestrator, 'recent_decisions'):
                    # Only values are reassigned (no keys added/removed), so
                    # iterating the dict while doing so is safe. Trimming to 25
                    # once past 50 presumably avoids re-trimming on every call.
                    # NOTE(review): assumes the per-symbol values support
                    # slicing (e.g. lists, not deques) - confirm.
                    for symbol in orchestrator.recent_decisions:
                        if len(orchestrator.recent_decisions[symbol]) > 50:
                            orchestrator.recent_decisions[symbol] = orchestrator.recent_decisions[symbol][-25:]

                if hasattr(data_provider, 'clear_old_data'):
                    data_provider.clear_old_data()

                logger.info("Custom memory cleanup completed")
            except Exception as e:
                logger.error(f"Error in custom cleanup: {e}")

        system_monitor.set_callbacks(cleanup=cleanup_callback)
        logger.info("System resource monitoring started with memory cleanup")

        from web.clean_dashboard import create_clean_dashboard

        logger.info("Creating clean dashboard...")
        dashboard = create_clean_dashboard(data_provider, orchestrator, trading_executor)
        logger.info("Clean Trading Dashboard created")

        def cleanup_dashboard_memory():
            """Keep only the most recent entries of the dashboard's in-memory caches."""
            # NOTE(review): assumes these attributes are sliceable sequences
            # (a deque would raise TypeError here, which is only logged) - confirm.
            try:
                if hasattr(dashboard, 'recent_decisions'):
                    dashboard.recent_decisions = dashboard.recent_decisions[-50:]  # Keep last 50
                if hasattr(dashboard, 'closed_trades'):
                    dashboard.closed_trades = dashboard.closed_trades[-100:]  # Keep last 100
                if hasattr(dashboard, 'tick_cache'):
                    dashboard.tick_cache = dashboard.tick_cache[-1000:]  # Keep last 1000
                logger.debug("Dashboard memory cleanup completed")
            except Exception as e:
                logger.error(f"Error in dashboard memory cleanup: {e}")

        # Expose the cleanup hook on the dashboard object
        dashboard.cleanup_memory = cleanup_dashboard_memory

        def training_worker():
            """Run the async training pipeline on a fresh event loop in this thread."""
            try:
                asyncio.run(start_training_pipeline(orchestrator, trading_executor))
            except KeyboardInterrupt:
                logger.info("Training worker stopped by user")
            except Exception as e:
                # Don't exit - the dashboard in the main thread keeps running
                logger.exception(f"Training worker error: {e}")

        # Daemon thread: it will not keep the process alive once the server exits
        training_thread = threading.Thread(target=training_worker, daemon=True)
        training_thread.start()
        logger.info("Training pipeline started in background with error handling")

        # Wait a moment for training to initialize
        time.sleep(3)

        tensorboard_integration = _start_tensorboard(tensorboard_port)

        # Start dashboard server (this blocks until the server exits)
        logger.info("Starting Clean Dashboard Server with error handling...")
        try:
            dashboard.run_server(host='127.0.0.1', port=dashboard_port, debug=False)
        except Exception as e:
            logger.exception(f"Dashboard server error: {e}")
            raise  # Re-raise to trigger the outer error handling

    except KeyboardInterrupt:
        logger.info("System stopped by user")
    except Exception as e:
        logger.exception(f"Error running clean dashboard with training: {e}")
        sys.exit(1)  # the finally clause still runs before SystemExit propagates
    finally:
        _stop_tensorboard(tensorboard_integration)


def main():
    """Entry point: run the dashboard; exit 0 on Ctrl+C and 1 on an unexpected error."""
    try:
        start_clean_dashboard_with_training()
    except KeyboardInterrupt:
        logger.info("Dashboard stopped by user (Ctrl+C)")
        sys.exit(0)
    except Exception as e:
        logger.exception(f"Critical error in main: {e}")
        sys.exit(1)


if __name__ == "__main__":
    # No manual atexit hook is needed: the logging module registers
    # logging.shutdown with atexit itself, which flushes all handlers.
    main()