#!/usr/bin/env python3 """ Run Clean Trading Dashboard with Full Training Pipeline Integrated system with both training loop and clean web dashboard """ import os # Fix OpenMP library conflicts before importing other modules os.environ['KMP_DUPLICATE_LIB_OK'] = 'TRUE' os.environ['OMP_NUM_THREADS'] = '4' import asyncio import logging import sys import threading import time from pathlib import Path # Add project root to path project_root = Path(__file__).parent sys.path.insert(0, str(project_root)) from core.config import get_config, setup_logging from core.data_provider import DataProvider # Import checkpoint management from utils.checkpoint_manager import get_checkpoint_manager from utils.training_integration import get_training_integration # Setup logging setup_logging() logger = logging.getLogger(__name__) async def start_training_pipeline(orchestrator, trading_executor): """Start the training pipeline in the background""" logger.info("=" * 70) logger.info("STARTING TRAINING PIPELINE WITH CLEAN DASHBOARD") logger.info("=" * 70) # Initialize checkpoint management checkpoint_manager = get_checkpoint_manager() training_integration = get_training_integration() # Training statistics training_stats = { 'iteration_count': 0, 'total_decisions': 0, 'successful_trades': 0, 'best_performance': 0.0, 'last_checkpoint_iteration': 0 } try: # Start real-time processing await orchestrator.start_realtime_processing() logger.info("✅ Real-time processing started") # Start COB integration if hasattr(orchestrator, 'start_cob_integration'): await orchestrator.start_cob_integration() logger.info("✅ COB integration started") # Main training loop iteration = 0 last_checkpoint_time = time.time() while True: try: iteration += 1 training_stats['iteration_count'] = iteration # Get symbols to process symbols = orchestrator.symbols if hasattr(orchestrator, 'symbols') else ['ETH/USDT'] # Process each symbol for symbol in symbols: try: # Make trading decision (this triggers model training) decision = await orchestrator.make_trading_decision(symbol) if decision: training_stats['total_decisions'] += 1 logger.debug(f"[{symbol}] Decision: {decision.action} @ {decision.confidence:.1%}") except Exception as e: logger.warning(f"Error processing {symbol}: {e}") # Status logging every 100 iterations if iteration % 100 == 0: current_time = time.time() elapsed = current_time - last_checkpoint_time logger.info(f"[TRAINING] Iteration {iteration}, Decisions: {training_stats['total_decisions']}, Time: {elapsed:.1f}s") # Models will save their own checkpoints when performance improves training_stats['last_checkpoint_iteration'] = iteration last_checkpoint_time = current_time # Brief pause to prevent overwhelming the system await asyncio.sleep(0.1) # 100ms between iterations except Exception as e: logger.error(f"Training loop error: {e}") await asyncio.sleep(5) # Wait longer on error except Exception as e: logger.error(f"Training pipeline error: {e}") import traceback logger.error(traceback.format_exc()) def start_clean_dashboard_with_training(): """Start clean dashboard with full training pipeline""" try: logger.info("=" * 80) logger.info("CLEAN TRADING DASHBOARD + FULL TRAINING PIPELINE") logger.info("=" * 80) logger.info("Features: Real-time Training, COB Integration, Clean UI") logger.info("Universal Data Stream: ENABLED") logger.info("Neural Decision Fusion: ENABLED") logger.info("COB Integration: ENABLED") logger.info("GPU Training: ENABLED") logger.info("Multi-symbol: ETH/USDT, BTC/USDT") # Get port from environment or use default dashboard_port = int(os.environ.get('DASHBOARD_PORT', '8051')) logger.info(f"Dashboard: http://127.0.0.1:{dashboard_port}") logger.info("=" * 80) # Check environment variables enable_universal_stream = os.environ.get('ENABLE_UNIVERSAL_DATA_STREAM', '1') == '1' enable_nn_fusion = os.environ.get('ENABLE_NN_DECISION_FUSION', '1') == '1' enable_cob = os.environ.get('ENABLE_COB_INTEGRATION', '1') == '1' logger.info(f"Universal Data Stream: {'ENABLED' if enable_universal_stream else 'DISABLED'}") logger.info(f"Neural Decision Fusion: {'ENABLED' if enable_nn_fusion else 'DISABLED'}") logger.info(f"COB Integration: {'ENABLED' if enable_cob else 'DISABLED'}") # Get configuration config = get_config() # Initialize core components from core.data_provider import DataProvider from core.enhanced_orchestrator import EnhancedTradingOrchestrator from core.trading_executor import TradingExecutor # Create data provider data_provider = DataProvider() # Create enhanced orchestrator with full training capabilities orchestrator = EnhancedTradingOrchestrator( data_provider=data_provider, symbols=['ETH/USDT', 'BTC/USDT'], enhanced_rl_training=True, # Enable RL training model_registry={} ) logger.info("✅ Enhanced Trading Orchestrator created with training enabled") # Create trading executor trading_executor = TradingExecutor() # Import clean dashboard from web.clean_dashboard import create_clean_dashboard # Create clean dashboard dashboard = create_clean_dashboard( data_provider=data_provider, orchestrator=orchestrator, trading_executor=trading_executor ) logger.info("✅ Clean Trading Dashboard created") # Start training pipeline in background thread def training_worker(): """Run training pipeline in background""" try: asyncio.run(start_training_pipeline(orchestrator, trading_executor)) except Exception as e: logger.error(f"Training worker error: {e}") training_thread = threading.Thread(target=training_worker, daemon=True) training_thread.start() logger.info("✅ Training pipeline started in background") # Wait a moment for training to initialize time.sleep(3) # Start dashboard server (this blocks) logger.info("🚀 Starting Clean Dashboard Server...") dashboard.run_server(host='127.0.0.1', port=dashboard_port, debug=False) except KeyboardInterrupt: logger.info("System stopped by user") except Exception as e: logger.error(f"Error running clean dashboard with training: {e}") import traceback traceback.print_exc() sys.exit(1) def main(): """Main function""" start_clean_dashboard_with_training() if __name__ == "__main__": main()