From 2ef7ed011d62d30524801472ab8ee91adf7dd088 Mon Sep 17 00:00:00 2001 From: Dobromir Popov Date: Thu, 19 Jun 2025 01:12:10 +0300 Subject: [PATCH] cob integration (wip) --- core/enhanced_orchestrator.py | 271 ++++++++++++++++++++++++++++++- main.py | 187 ++++++++++++++++----- run_enhanced_cob_training.py | 233 ++++++++++++++++++++++++++ run_main_dashboard.py | 79 +++++++++ test_dashboard_simple.py | 138 +++++++++++----- test_enhanced_cob_integration.py | 201 +++++++++++++++++++++++ 6 files changed, 1011 insertions(+), 98 deletions(-) create mode 100644 run_enhanced_cob_training.py create mode 100644 run_main_dashboard.py create mode 100644 test_enhanced_cob_integration.py diff --git a/core/enhanced_orchestrator.py b/core/enhanced_orchestrator.py index 1e56399..2fa14ba 100644 --- a/core/enhanced_orchestrator.py +++ b/core/enhanced_orchestrator.py @@ -8,6 +8,7 @@ This enhanced orchestrator implements: 4. Perfect move marking for CNN backpropagation training 5. Market environment adaptation through RL evaluation 6. Universal data format compliance (5 timeseries streams) +7. Consolidated Order Book (COB) integration for real-time market microstructure """ import asyncio @@ -32,6 +33,7 @@ from .trading_action import TradingAction from .negative_case_trainer import NegativeCaseTrainer from .trading_executor import TradingExecutor from .cnn_monitor import log_cnn_prediction, start_cnn_training_session +from .cob_integration import COBIntegration # Enhanced pivot RL trainer functionality integrated into orchestrator logger = logging.getLogger(__name__) @@ -71,7 +73,7 @@ class TradingAction: @dataclass class MarketState: - """Complete market state for RL evaluation with comprehensive data""" + """Complete market state for RL evaluation with comprehensive data including COB""" symbol: str timestamp: datetime prices: Dict[str, float] # {timeframe: current_price} @@ -90,6 +92,14 @@ class MarketState: cnn_predictions: Optional[Dict[str, np.ndarray]] = None # CNN predictions by timeframe pivot_points: Optional[Dict[str, Any]] = None # Williams market structure data market_microstructure: Dict[str, Any] = field(default_factory=dict) # Tick-level patterns + + # COB (Consolidated Order Book) data for market microstructure analysis + cob_features: Optional[np.ndarray] = None # COB CNN features (200 dimensions) + cob_state: Optional[np.ndarray] = None # COB DQN state features (50 dimensions) + order_book_imbalance: float = 0.0 # Bid/ask imbalance ratio + liquidity_depth: float = 0.0 # Total liquidity within 1% of mid price + exchange_diversity: float = 0.0 # Number of exchanges contributing to liquidity + market_impact_estimate: float = 0.0 # Estimated market impact for standard trade size @dataclass class PerfectMove: @@ -136,7 +146,7 @@ class EnhancedTradingOrchestrator: symbols: List[str] = None, enhanced_rl_training: bool = True, model_registry: Dict = None): - """Initialize the enhanced orchestrator with 2-action system""" + """Initialize the enhanced orchestrator with 2-action system and COB integration""" self.config = get_config() self.data_provider = data_provider or DataProvider() self.model_registry = model_registry or get_model_registry() @@ -155,6 +165,22 @@ class EnhancedTradingOrchestrator: if self.enhanced_rl_training: logger.info("Enhanced RL training enabled") + # Initialize COB Integration for real-time market microstructure + self.cob_integration = COBIntegration( + data_provider=self.data_provider, + symbols=self.symbols + ) + # Register COB callbacks for CNN and RL models + self.cob_integration.add_cnn_callback(self._on_cob_cnn_features) + self.cob_integration.add_dqn_callback(self._on_cob_dqn_state) + + # COB feature storage for model integration + self.latest_cob_features: Dict[str, np.ndarray] = {} + self.latest_cob_state: Dict[str, np.ndarray] = {} + self.cob_feature_history: Dict[str, deque] = {symbol: deque(maxlen=100) for symbol in self.symbols} + + logger.info("COB Integration initialized for real-time market microstructure") + # Position tracking for 2-action system self.current_positions = {} # symbol -> {'side': 'LONG'|'SHORT'|'FLAT', 'entry_price': float, 'timestamp': datetime} self.last_signals = {} # symbol -> {'action': 'BUY'|'SELL', 'timestamp': datetime, 'confidence': float} @@ -414,6 +440,46 @@ class EnhancedTradingOrchestrator: # Analyze market microstructure market_microstructure = self._analyze_market_microstructure(raw_ticks) + # Get COB (Consolidated Order Book) data if available + cob_features = self.latest_cob_features.get(symbol) + cob_state = self.latest_cob_state.get(symbol) + + # Get COB snapshot for additional metrics + cob_snapshot = None + order_book_imbalance = 0.0 + liquidity_depth = 0.0 + exchange_diversity = 0.0 + market_impact_estimate = 0.0 + + try: + if self.cob_integration: + cob_snapshot = self.cob_integration.get_cob_snapshot(symbol) + if cob_snapshot: + # Calculate order book imbalance + bid_liquidity = sum(level.total_volume_usd for level in cob_snapshot.consolidated_bids[:10]) + ask_liquidity = sum(level.total_volume_usd for level in cob_snapshot.consolidated_asks[:10]) + if ask_liquidity > 0: + order_book_imbalance = (bid_liquidity - ask_liquidity) / (bid_liquidity + ask_liquidity) + + # Calculate liquidity depth (within 1% of mid price) + mid_price = cob_snapshot.volume_weighted_mid + price_range = mid_price * 0.01 # 1% + depth_bids = [l for l in cob_snapshot.consolidated_bids if l.price >= mid_price - price_range] + depth_asks = [l for l in cob_snapshot.consolidated_asks if l.price <= mid_price + price_range] + liquidity_depth = sum(l.total_volume_usd for l in depth_bids + depth_asks) + + # Calculate exchange diversity + all_exchanges = set() + for level in cob_snapshot.consolidated_bids[:20] + cob_snapshot.consolidated_asks[:20]: + all_exchanges.update(level.exchange_breakdown.keys()) + exchange_diversity = len(all_exchanges) + + # Estimate market impact for 10k USD trade + market_impact_estimate = self._estimate_market_impact(cob_snapshot, 10000) + + except Exception as e: + logger.warning(f"Error calculating COB metrics for {symbol}: {e}") + # Create comprehensive market state market_state = MarketState( symbol=symbol, @@ -431,17 +497,51 @@ class EnhancedTradingOrchestrator: cnn_hidden_features=cnn_hidden_features, cnn_predictions=cnn_predictions, pivot_points=pivot_points, - market_microstructure=market_microstructure + market_microstructure=market_microstructure, + # COB data integration + cob_features=cob_features, + cob_state=cob_state, + order_book_imbalance=order_book_imbalance, + liquidity_depth=liquidity_depth, + exchange_diversity=exchange_diversity, + market_impact_estimate=market_impact_estimate ) market_states[symbol] = market_state - logger.debug(f"Created comprehensive market state for {symbol} with {len(raw_ticks)} ticks") + logger.debug(f"Created comprehensive market state for {symbol} with COB integration") except Exception as e: logger.error(f"Error creating market state for {symbol}: {e}") return market_states + def _estimate_market_impact(self, cob_snapshot, trade_size_usd: float) -> float: + """Estimate market impact for a given trade size""" + try: + # Simple market impact estimation based on order book depth + cumulative_volume = 0 + weighted_price = 0 + mid_price = cob_snapshot.volume_weighted_mid + + # For buy orders, walk through asks + for level in cob_snapshot.consolidated_asks: + if cumulative_volume >= trade_size_usd: + break + volume_needed = min(level.total_volume_usd, trade_size_usd - cumulative_volume) + weighted_price += level.price * volume_needed + cumulative_volume += volume_needed + + if cumulative_volume > 0: + avg_execution_price = weighted_price / cumulative_volume + impact = (avg_execution_price - mid_price) / mid_price + return abs(impact) + + return 0.0 + + except Exception as e: + logger.warning(f"Error estimating market impact: {e}") + return 0.0 + def _get_recent_tick_data_for_rl(self, symbol: str, seconds: int = 300) -> List[Dict[str, Any]]: """Get recent tick data for RL state building""" try: @@ -600,18 +700,130 @@ class EnhancedTradingOrchestrator: hidden_features[model_name] = model_hidden if model_pred is not None: predictions[model_name] = model_pred - except Exception as e: - logger.warning(f"Error getting features from CNN model {model_name}: {e}") + logger.warning(f"Error extracting CNN features from {model_name}: {e}") + + return (hidden_features if hidden_features else None, + predictions if predictions else None) - return hidden_features if hidden_features else None, predictions if predictions else None - return None, None except Exception as e: logger.warning(f"Error getting CNN features for {symbol}: {e}") return None, None + def _get_latest_price_from_universal(self, symbol: str, timeframe: str, universal_stream: UniversalDataStream) -> Optional[float]: + """Get latest price for symbol and timeframe from universal data stream""" + try: + if symbol == 'ETH/USDT': + if timeframe == '1s' and len(universal_stream.eth_ticks) > 0: + # Get latest tick price (close price is at index 4) + return float(universal_stream.eth_ticks[-1, 4]) # close price + elif timeframe == '1m' and len(universal_stream.eth_1m) > 0: + return float(universal_stream.eth_1m[-1, 4]) # close price + elif timeframe == '1h' and len(universal_stream.eth_1h) > 0: + return float(universal_stream.eth_1h[-1, 4]) # close price + elif timeframe == '1d' and len(universal_stream.eth_1d) > 0: + return float(universal_stream.eth_1d[-1, 4]) # close price + elif symbol == 'BTC/USDT': + if timeframe == '1s' and len(universal_stream.btc_ticks) > 0: + return float(universal_stream.btc_ticks[-1, 4]) # close price + + # Fallback to data provider + return self._get_latest_price_fallback(symbol, timeframe) + + except Exception as e: + logger.warning(f"Error getting latest price for {symbol} {timeframe}: {e}") + return self._get_latest_price_fallback(symbol, timeframe) + + def _get_latest_price_fallback(self, symbol: str, timeframe: str) -> Optional[float]: + """Fallback method to get latest price from data provider""" + try: + df = self.data_provider.get_historical_data(symbol, timeframe, limit=1) + if df is not None and not df.empty: + return float(df['close'].iloc[-1]) + return None + except Exception as e: + logger.warning(f"Error in price fallback for {symbol} {timeframe}: {e}") + return None + + def _calculate_volatility_from_universal(self, symbol: str, universal_stream: UniversalDataStream) -> float: + """Calculate volatility from universal data stream""" + try: + if symbol == 'ETH/USDT' and len(universal_stream.eth_1m) > 1: + # Calculate volatility from 1m candles + closes = universal_stream.eth_1m[:, 4] # close prices + if len(closes) > 1: + returns = np.diff(np.log(closes)) + return float(np.std(returns) * np.sqrt(1440)) # Daily volatility + elif symbol == 'BTC/USDT' and len(universal_stream.btc_ticks) > 1: + # Calculate volatility from tick data + closes = universal_stream.btc_ticks[:, 4] # close prices + if len(closes) > 1: + returns = np.diff(np.log(closes)) + return float(np.std(returns) * np.sqrt(86400)) # Daily volatility + return 0.0 + except Exception as e: + logger.warning(f"Error calculating volatility for {symbol}: {e}") + return 0.0 + + def _calculate_volume_from_universal(self, symbol: str, universal_stream: UniversalDataStream) -> float: + """Calculate volume from universal data stream""" + try: + if symbol == 'ETH/USDT' and len(universal_stream.eth_1m) > 0: + # Get latest volume from 1m candles + volumes = universal_stream.eth_1m[:, 5] # volume + return float(np.mean(volumes[-10:])) # Average of last 10 candles + elif symbol == 'BTC/USDT' and len(universal_stream.btc_ticks) > 0: + # Calculate volume from tick data + volumes = universal_stream.btc_ticks[:, 5] # volume + return float(np.sum(volumes[-100:])) # Sum of last 100 ticks + return 0.0 + except Exception as e: + logger.warning(f"Error calculating volume for {symbol}: {e}") + return 0.0 + + def _calculate_trend_strength_from_universal(self, symbol: str, universal_stream: UniversalDataStream) -> float: + """Calculate trend strength from universal data stream""" + try: + if symbol == 'ETH/USDT' and len(universal_stream.eth_1m) > 20: + # Calculate trend strength using 20-period moving average + closes = universal_stream.eth_1m[-20:, 4] # last 20 closes + if len(closes) >= 20: + sma = np.mean(closes) + current_price = closes[-1] + return float((current_price - sma) / sma) # Relative trend strength + elif symbol == 'BTC/USDT' and len(universal_stream.btc_ticks) > 100: + # Calculate trend from tick data + closes = universal_stream.btc_ticks[-100:, 4] # last 100 ticks + if len(closes) >= 100: + start_price = closes[0] + end_price = closes[-1] + return float((end_price - start_price) / start_price) + return 0.0 + except Exception as e: + logger.warning(f"Error calculating trend strength for {symbol}: {e}") + return 0.0 + + def _determine_market_regime(self, symbol: str, universal_stream: UniversalDataStream) -> str: + """Determine market regime from universal data stream""" + try: + # Calculate volatility and trend strength + volatility = self._calculate_volatility_from_universal(symbol, universal_stream) + trend_strength = abs(self._calculate_trend_strength_from_universal(symbol, universal_stream)) + + # Classify market regime + if volatility > 0.05: # High volatility threshold + return 'volatile' + elif trend_strength > 0.02: # Strong trend threshold + return 'trending' + else: + return 'ranging' + + except Exception as e: + logger.warning(f"Error determining market regime for {symbol}: {e}") + return 'unknown' + def _extract_cnn_features(self, model, feature_matrix: np.ndarray) -> Tuple[Optional[np.ndarray], Optional[np.ndarray]]: """Extract hidden features and predictions from CNN model""" try: @@ -2535,4 +2747,45 @@ class EnhancedTradingOrchestrator: 'positions': {sym: pos for sym, pos in self.current_positions.items()}, 'total_positions': len(self.current_positions), 'last_signals': self.last_signals - } \ No newline at end of file + } + + def _on_cob_cnn_features(self, symbol: str, cob_data: Dict): + """Handle COB features for CNN model integration""" + try: + if 'features' in cob_data: + features = cob_data['features'] + self.latest_cob_features[symbol] = features + self.cob_feature_history[symbol].append({ + 'timestamp': cob_data.get('timestamp', datetime.now()), + 'features': features + }) + logger.debug(f"COB CNN features updated for {symbol}: {features.shape}") + except Exception as e: + logger.error(f"Error processing COB CNN features for {symbol}: {e}") + + def _on_cob_dqn_state(self, symbol: str, cob_data: Dict): + """Handle COB state features for DQN model integration""" + try: + if 'state' in cob_data: + state = cob_data['state'] + self.latest_cob_state[symbol] = state + logger.debug(f"COB DQN state updated for {symbol}: {state.shape}") + except Exception as e: + logger.error(f"Error processing COB DQN state for {symbol}: {e}") + + async def start_cob_integration(self): + """Start COB integration for real-time data feed""" + try: + logger.info("Starting COB integration for real-time market microstructure...") + await self.cob_integration.start() + logger.info("COB integration started successfully") + except Exception as e: + logger.error(f"Error starting COB integration: {e}") + + async def stop_cob_integration(self): + """Stop COB integration""" + try: + await self.cob_integration.stop() + logger.info("COB integration stopped") + except Exception as e: + logger.error(f"Error stopping COB integration: {e}") \ No newline at end of file diff --git a/main.py b/main.py index 91ffecb..8efb1cc 100644 --- a/main.py +++ b/main.py @@ -1,9 +1,10 @@ #!/usr/bin/env python3 """ -Streamlined Trading System - Web Dashboard Only +Streamlined Trading System - Web Dashboard + Training -Simplified entry point with only the web dashboard mode: -- Streamlined Flow: Data -> Indicators/Pivots -> CNN -> RL -> Orchestrator -> Execution +Integrated system with both training loop and web dashboard: +- Training Pipeline: Data -> COB -> Indicators -> CNN -> RL -> Orchestrator -> Execution +- Web Dashboard: Real-time monitoring and control interface - 2-Action System: BUY/SELL with intelligent position management - Always invested approach with smart risk/reward setup detection @@ -11,6 +12,11 @@ Usage: python main.py [--symbol ETH/USDT] [--port 8050] """ +import os +# Fix OpenMP library conflicts before importing other modules +os.environ['KMP_DUPLICATE_LIB_OK'] = 'TRUE' +os.environ['OMP_NUM_THREADS'] = '4' + import asyncio import argparse import logging @@ -28,7 +34,7 @@ from core.data_provider import DataProvider logger = logging.getLogger(__name__) -def run_web_dashboard(): +async def run_web_dashboard(): """Run the streamlined web dashboard with 2-action system and always-invested approach""" try: logger.info("Starting Streamlined Trading Dashboard...") @@ -60,9 +66,9 @@ def run_web_dashboard(): # Load model registry for integrated pipeline try: - from core.model_registry import get_model_registry - model_registry = get_model_registry() - logger.info("[MODELS] Model registry loaded for integrated training") + from models import get_model_registry + model_registry = {} # Use simple dict for now + logger.info("[MODELS] Model registry initialized for training") except ImportError: model_registry = {} logger.warning("Model registry not available, using empty registry") @@ -77,56 +83,139 @@ def run_web_dashboard(): logger.info("Enhanced Trading Orchestrator with 2-Action System initialized") logger.info("Always Invested: Learning to spot high risk/reward setups") + # Start COB integration for real-time market microstructure + try: + # Create and start COB integration task + cob_task = asyncio.create_task(orchestrator.start_cob_integration()) + logger.info("COB Integration startup task created") + except Exception as e: + logger.warning(f"COB Integration startup failed (will retry): {e}") + # Create trading executor for live execution trading_executor = TradingExecutor() - # Import and create streamlined dashboard - from web.dashboard import TradingDashboard - dashboard = TradingDashboard( - data_provider=data_provider, - orchestrator=orchestrator, - trading_executor=trading_executor - ) - - # Start the integrated dashboard - port = config.get('web', {}).get('port', 8050) - host = config.get('web', {}).get('host', '127.0.0.1') - - logger.info(f"Starting Streamlined Dashboard at http://{host}:{port}") + # Start the training and monitoring loop + logger.info(f"Starting Enhanced Training Pipeline") logger.info("Live Data Processing: ENABLED") + logger.info("COB Integration: ENABLED (Real-time market microstructure)") logger.info("Integrated CNN Training: ENABLED") logger.info("Integrated RL Training: ENABLED") logger.info("Real-time Indicators & Pivots: ENABLED") logger.info("Live Trading Execution: ENABLED") logger.info("2-Action System: BUY/SELL with position intelligence") logger.info("Always Invested: Different thresholds for entry/exit") - logger.info("Pipeline: Data -> Indicators -> CNN -> RL -> Orchestrator -> Execution") - logger.info(f"Dashboard optimized: 300ms updates for sub-1s responsiveness") + logger.info("Pipeline: Data -> COB -> Indicators -> CNN -> RL -> Orchestrator -> Execution") + logger.info("Starting training loop...") - dashboard.run(host=host, port=port, debug=False) + # Start the training loop + await start_training_loop(orchestrator, trading_executor) except Exception as e: logger.error(f"Error in streamlined dashboard: {e}") - logger.error("Dashboard stopped - trying minimal fallback") + logger.error("Training stopped") + import traceback + logger.error(traceback.format_exc()) + +def start_web_ui(): + """Start the main TradingDashboard UI in a separate thread""" + try: + logger.info("=" * 50) + logger.info("Starting Main Trading Dashboard UI...") + logger.info("Trading Dashboard: http://127.0.0.1:8051") + logger.info("=" * 50) - try: - # Minimal fallback dashboard - from web.dashboard import TradingDashboard - from core.data_provider import DataProvider + # Import and create the main TradingDashboard (simplified approach) + from web.dashboard import TradingDashboard + from core.data_provider import DataProvider + from core.orchestrator import TradingOrchestrator + from core.trading_executor import TradingExecutor + + # Initialize components for the dashboard + config = get_config() + data_provider = DataProvider() + + # Create orchestrator for the dashboard (standard version for UI compatibility) + dashboard_orchestrator = TradingOrchestrator(data_provider=data_provider) + + trading_executor = TradingExecutor() + + # Create the main trading dashboard + dashboard = TradingDashboard( + data_provider=data_provider, + orchestrator=dashboard_orchestrator, + trading_executor=trading_executor + ) + + logger.info("Main TradingDashboard created successfully") + logger.info("Features: Live trading, RL training monitoring, Position management") + + # Run the dashboard server (simplified - no async loop) + dashboard.app.run(host='127.0.0.1', port=8051, debug=False, use_reloader=False) + + except Exception as e: + logger.error(f"Error starting main trading dashboard UI: {e}") + import traceback + logger.error(traceback.format_exc()) + +async def start_training_loop(orchestrator, trading_executor): + """Start the main training and monitoring loop""" + logger.info("=" * 70) + logger.info("STARTING ENHANCED TRAINING LOOP WITH COB INTEGRATION") + logger.info("=" * 70) + + try: + # Start real-time processing + await orchestrator.start_realtime_processing() + + # Main training loop + iteration = 0 + while True: + iteration += 1 - data_provider = DataProvider() - dashboard = TradingDashboard(data_provider) - logger.info("Using minimal fallback dashboard") - dashboard.run(host='127.0.0.1', port=8050, debug=False) - except Exception as fallback_error: - logger.error(f"Fallback dashboard failed: {fallback_error}") - logger.error(f"Fatal error: {e}") - import traceback - logger.error(traceback.format_exc()) + logger.info(f"Training iteration {iteration}") + + # Make coordinated decisions (this triggers CNN and RL training) + decisions = await orchestrator.make_coordinated_decisions() + + # Log decisions and performance + for symbol, decision in decisions.items(): + if decision: + logger.info(f"{symbol}: {decision.action} (confidence: {decision.confidence:.3f})") + + # Execute if confidence is high enough + if decision.confidence > 0.7: + logger.info(f"Executing {symbol}: {decision.action}") + # trading_executor.execute_action(decision) + + # Log performance metrics every 10 iterations + if iteration % 10 == 0: + metrics = orchestrator.get_performance_metrics() + logger.info(f"Performance metrics: {metrics}") + + # Log COB integration status + for symbol in orchestrator.symbols: + cob_features = orchestrator.latest_cob_features.get(symbol) + cob_state = orchestrator.latest_cob_state.get(symbol) + if cob_features is not None: + logger.info(f"{symbol} COB: CNN features {cob_features.shape}, DQN state {cob_state.shape if cob_state is not None else 'None'}") + + # Sleep between iterations + await asyncio.sleep(5) # 5 second intervals + + except KeyboardInterrupt: + logger.info("Training interrupted by user") + except Exception as e: + logger.error(f"Error in training loop: {e}") + import traceback + logger.error(traceback.format_exc()) + finally: + await orchestrator.stop_realtime_processing() + await orchestrator.stop_cob_integration() + logger.info("Training loop stopped") async def main(): - """Main entry point with streamlined web-only operation""" - parser = argparse.ArgumentParser(description='Streamlined Trading System - 2-Action Web Dashboard') + """Main entry point with both training loop and web dashboard""" + parser = argparse.ArgumentParser(description='Streamlined Trading System - Training + Web Dashboard') parser.add_argument('--symbol', type=str, default='ETH/USDT', help='Primary trading symbol (default: ETH/USDT)') parser.add_argument('--port', type=int, default=8050, @@ -141,16 +230,26 @@ async def main(): try: logger.info("=" * 70) - logger.info("STREAMLINED TRADING SYSTEM - 2-ACTION WEB DASHBOARD") + logger.info("STREAMLINED TRADING SYSTEM - TRAINING + MAIN DASHBOARD") logger.info(f"Primary Symbol: {args.symbol}") - logger.info(f"Web Port: {args.port}") + logger.info(f"Training Port: {args.port}") + logger.info(f"Main Trading Dashboard: http://127.0.0.1:8051") logger.info("2-Action System: BUY/SELL with intelligent position management") logger.info("Always Invested: Learning to spot high risk/reward setups") - logger.info("Flow: Data -> Indicators -> CNN -> RL -> Orchestrator -> Execution") + logger.info("Flow: Data -> COB -> Indicators -> CNN -> RL -> Orchestrator -> Execution") + logger.info("Main Dashboard: Live trading, RL monitoring, Position management") logger.info("=" * 70) - # Run the web dashboard - run_web_dashboard() + # Start main trading dashboard UI in a separate thread + web_thread = Thread(target=start_web_ui, daemon=True) + web_thread.start() + logger.info("Main trading dashboard UI thread started") + + # Give web UI time to start + await asyncio.sleep(2) + + # Run the training loop (this will run indefinitely) + await run_web_dashboard() logger.info("[SUCCESS] Operation completed successfully!") diff --git a/run_enhanced_cob_training.py b/run_enhanced_cob_training.py new file mode 100644 index 0000000..1a58f79 --- /dev/null +++ b/run_enhanced_cob_training.py @@ -0,0 +1,233 @@ +#!/usr/bin/env python3 +""" +Enhanced COB + ML Training Pipeline + +Runs the complete pipeline: +Data -> COB Integration -> CNN Features -> RL States -> Model Training -> Trading Decisions + +Real-time training with COB market microstructure integration. +""" + +import asyncio +import logging +import sys +from pathlib import Path +import time +from datetime import datetime + +# Add project root to path +project_root = Path(__file__).parent +sys.path.insert(0, str(project_root)) + +from core.config import setup_logging, get_config +from core.data_provider import DataProvider +from core.enhanced_orchestrator import EnhancedTradingOrchestrator +from core.trading_executor import TradingExecutor + +# Setup logging +setup_logging() +logger = logging.getLogger(__name__) + +class EnhancedCOBTrainer: + """Enhanced COB + ML Training Pipeline""" + + def __init__(self): + self.config = get_config() + self.symbols = ['BTC/USDT', 'ETH/USDT'] + self.data_provider = DataProvider() + self.orchestrator = None + self.trading_executor = None + self.running = False + + async def start_training(self): + """Start the enhanced training pipeline""" + logger.info("=" * 80) + logger.info("ENHANCED COB + ML TRAINING PIPELINE") + logger.info("=" * 80) + logger.info("Pipeline: Data -> COB -> CNN Features -> RL States -> Model Training") + logger.info(f"Symbols: {self.symbols}") + logger.info(f"Start time: {datetime.now()}") + logger.info("=" * 80) + + try: + # Initialize components + await self._initialize_components() + + # Start training loop + await self._run_training_loop() + + except KeyboardInterrupt: + logger.info("Training interrupted by user") + except Exception as e: + logger.error(f"Training error: {e}") + import traceback + logger.error(traceback.format_exc()) + finally: + await self._cleanup() + + async def _initialize_components(self): + """Initialize all training components""" + logger.info("1. Initializing Enhanced Trading Orchestrator...") + + self.orchestrator = EnhancedTradingOrchestrator( + data_provider=self.data_provider, + symbols=self.symbols, + enhanced_rl_training=True, + model_registry={} + ) + + logger.info("2. Starting COB Integration...") + await self.orchestrator.start_cob_integration() + + logger.info("3. Starting Real-time Processing...") + await self.orchestrator.start_realtime_processing() + + logger.info("4. Initializing Trading Executor...") + self.trading_executor = TradingExecutor() + + logger.info("āœ… All components initialized successfully") + + # Wait for initial data collection + logger.info("Collecting initial data...") + await asyncio.sleep(10) + + async def _run_training_loop(self): + """Main training loop with monitoring""" + logger.info("Starting main training loop...") + self.running = True + iteration = 0 + + while self.running: + iteration += 1 + start_time = time.time() + + try: + # Make coordinated decisions (triggers CNN and RL training) + decisions = await self.orchestrator.make_coordinated_decisions() + + # Process decisions + active_decisions = 0 + for symbol, decision in decisions.items(): + if decision and decision.action != 'HOLD': + active_decisions += 1 + logger.info(f"šŸŽÆ {symbol}: {decision.action} " + f"(confidence: {decision.confidence:.3f})") + + # Monitor every 5 iterations + if iteration % 5 == 0: + await self._log_training_status(iteration, active_decisions) + + # Detailed monitoring every 20 iterations + if iteration % 20 == 0: + await self._detailed_monitoring(iteration) + + # Sleep to maintain 5-second intervals + elapsed = time.time() - start_time + sleep_time = max(0, 5.0 - elapsed) + await asyncio.sleep(sleep_time) + + except Exception as e: + logger.error(f"Error in training iteration {iteration}: {e}") + await asyncio.sleep(5) + + async def _log_training_status(self, iteration, active_decisions): + """Log current training status""" + logger.info(f"šŸ“Š Iteration {iteration} - Active decisions: {active_decisions}") + + # Log COB integration status + for symbol in self.symbols: + cob_features = self.orchestrator.latest_cob_features.get(symbol) + cob_state = self.orchestrator.latest_cob_state.get(symbol) + + if cob_features is not None: + logger.info(f" {symbol}: COB CNN features: {cob_features.shape}") + if cob_state is not None: + logger.info(f" {symbol}: COB RL state: {cob_state.shape}") + + async def _detailed_monitoring(self, iteration): + """Detailed monitoring and metrics""" + logger.info("=" * 60) + logger.info(f"DETAILED MONITORING - Iteration {iteration}") + logger.info("=" * 60) + + # Performance metrics + try: + metrics = self.orchestrator.get_performance_metrics() + logger.info(f"šŸ“ˆ Performance Metrics:") + for key, value in metrics.items(): + logger.info(f" {key}: {value}") + except Exception as e: + logger.warning(f"Could not get performance metrics: {e}") + + # COB integration status + logger.info("šŸ”„ COB Integration Status:") + for symbol in self.symbols: + try: + # Check COB features + cob_features = self.orchestrator.latest_cob_features.get(symbol) + cob_state = self.orchestrator.latest_cob_state.get(symbol) + history_len = len(self.orchestrator.cob_feature_history[symbol]) + + logger.info(f" {symbol}:") + logger.info(f" CNN Features: {cob_features.shape if cob_features is not None else 'None'}") + logger.info(f" RL State: {cob_state.shape if cob_state is not None else 'None'}") + logger.info(f" History Length: {history_len}") + + # Get COB snapshot if available + if self.orchestrator.cob_integration: + snapshot = self.orchestrator.cob_integration.get_cob_snapshot(symbol) + if snapshot: + logger.info(f" Order Book: {len(snapshot.consolidated_bids)} bids, " + f"{len(snapshot.consolidated_asks)} asks") + logger.info(f" Mid Price: ${snapshot.volume_weighted_mid:.2f}") + + except Exception as e: + logger.warning(f"Error checking {symbol} status: {e}") + + # Model training status + logger.info("🧠 Model Training Status:") + # Add model-specific status here when available + + # Position status + try: + positions = self.orchestrator.get_position_status() + logger.info(f"šŸ’¼ Positions: {positions}") + except Exception as e: + logger.warning(f"Could not get position status: {e}") + + logger.info("=" * 60) + + async def _cleanup(self): + """Cleanup resources""" + logger.info("Cleaning up resources...") + + if self.orchestrator: + try: + await self.orchestrator.stop_realtime_processing() + logger.info("āœ… Real-time processing stopped") + except Exception as e: + logger.warning(f"Error stopping real-time processing: {e}") + + try: + await self.orchestrator.stop_cob_integration() + logger.info("āœ… COB integration stopped") + except Exception as e: + logger.warning(f"Error stopping COB integration: {e}") + + self.running = False + logger.info("šŸ Training pipeline stopped") + +async def main(): + """Main entry point""" + trainer = EnhancedCOBTrainer() + await trainer.start_training() + +if __name__ == "__main__": + try: + asyncio.run(main()) + except KeyboardInterrupt: + print("\nTraining interrupted by user") + except Exception as e: + print(f"Training failed: {e}") + import traceback + traceback.print_exc() \ No newline at end of file diff --git a/run_main_dashboard.py b/run_main_dashboard.py new file mode 100644 index 0000000..8b6b8d7 --- /dev/null +++ b/run_main_dashboard.py @@ -0,0 +1,79 @@ +#!/usr/bin/env python3 +""" +Run Main Trading Dashboard + +Dedicated script to run the main TradingDashboard with all trading controls, +RL training monitoring, and position management features. + +Usage: + python run_main_dashboard.py +""" + +import sys +import logging +from pathlib import Path + +# Add project root to path +project_root = Path(__file__).parent +sys.path.insert(0, str(project_root)) + +from core.config import setup_logging, get_config +from core.data_provider import DataProvider +from core.orchestrator import TradingOrchestrator +from core.trading_executor import TradingExecutor +from web.dashboard import TradingDashboard + +def main(): + """Run the main TradingDashboard""" + # Setup logging + setup_logging() + logger = logging.getLogger(__name__) + + try: + logger.info("=" * 60) + logger.info("STARTING MAIN TRADING DASHBOARD") + logger.info("=" * 60) + logger.info("Features:") + logger.info("- Live trading with BUY/SELL controls") + logger.info("- Real-time RL training monitoring") + logger.info("- Position management & P&L tracking") + logger.info("- Performance metrics & trade history") + logger.info("- Model accuracy & confidence tracking") + logger.info("=" * 60) + + # Get configuration + config = get_config() + + # Initialize components + data_provider = DataProvider() + orchestrator = TradingOrchestrator(data_provider=data_provider) + trading_executor = TradingExecutor() + + # Create the main trading dashboard + dashboard = TradingDashboard( + data_provider=data_provider, + orchestrator=orchestrator, + trading_executor=trading_executor + ) + + logger.info("TradingDashboard created successfully") + logger.info("Starting web server at http://127.0.0.1:8051") + logger.info("Open your browser to access the trading interface") + + # Run the dashboard + dashboard.app.run( + host='127.0.0.1', + port=8051, + debug=False, + use_reloader=False + ) + + except KeyboardInterrupt: + logger.info("Dashboard shutdown requested by user") + except Exception as e: + logger.error(f"Error running main trading dashboard: {e}") + import traceback + logger.error(traceback.format_exc()) + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/test_dashboard_simple.py b/test_dashboard_simple.py index e1fd580..d8e4715 100644 --- a/test_dashboard_simple.py +++ b/test_dashboard_simple.py @@ -1,55 +1,103 @@ #!/usr/bin/env python3 """ -Simple test for the scalping dashboard with dynamic throttling +Simple Dashboard Test - Isolate dashboard startup issues """ -import requests -import time -def test_dashboard(): - """Test dashboard basic functionality""" - base_url = "http://127.0.0.1:8051" - - print("Testing Scalping Dashboard with Dynamic Throttling...") - +import os +# Fix OpenMP library conflicts before importing other modules +os.environ['KMP_DUPLICATE_LIB_OK'] = 'TRUE' +os.environ['OMP_NUM_THREADS'] = '4' + +import sys +import logging +from pathlib import Path + +# Add project root to path +project_root = Path(__file__).parent +sys.path.insert(0, str(project_root)) + +# Setup basic logging +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger(__name__) + +def test_dashboard_startup(): + """Test dashboard creation and startup""" try: - # Test main page - response = requests.get(base_url, timeout=5) - print(f"Main page: {response.status_code}") + logger.info("=" * 50) + logger.info("TESTING DASHBOARD STARTUP") + logger.info("=" * 50) + + # Test imports first + logger.info("Step 1: Testing imports...") + from core.config import get_config, setup_logging + from core.data_provider import DataProvider + from core.orchestrator import TradingOrchestrator + from core.trading_executor import TradingExecutor + logger.info("āœ“ Core imports successful") + + from web.dashboard import TradingDashboard + logger.info("āœ“ Dashboard import successful") + + # Test configuration + logger.info("Step 2: Testing configuration...") + setup_logging() + config = get_config() + logger.info("āœ“ Configuration loaded") + + # Test core component creation + logger.info("Step 3: Testing core component creation...") + data_provider = DataProvider() + logger.info("āœ“ DataProvider created") + + orchestrator = TradingOrchestrator(data_provider=data_provider) + logger.info("āœ“ TradingOrchestrator created") + + trading_executor = TradingExecutor() + logger.info("āœ“ TradingExecutor created") + + # Test dashboard creation + logger.info("Step 4: Testing dashboard creation...") + dashboard = TradingDashboard( + data_provider=data_provider, + orchestrator=orchestrator, + trading_executor=trading_executor + ) + logger.info("āœ“ TradingDashboard created successfully") + + # Test dashboard startup + logger.info("Step 5: Testing dashboard server startup...") + logger.info("Dashboard will start on http://127.0.0.1:8052") + logger.info("Press Ctrl+C to stop the test") + + # Run the dashboard + dashboard.app.run( + host='127.0.0.1', + port=8052, + debug=False, + use_reloader=False + ) - if response.status_code == 200: - print("āœ… Dashboard is running successfully!") - print("āœ… Unicode encoding issues fixed") - print("āœ… Dynamic throttling implemented") - print("āœ… Charts should now display properly") - - print("\nDynamic Throttling Features:") - print("• Adaptive update frequency (500ms - 2000ms)") - print("• Performance-based throttling (0-5 levels)") - print("• Automatic optimization based on callback duration") - print("• Fallback to last known state when throttled") - print("• Real-time performance monitoring") - - return True - else: - print(f"āŒ Dashboard returned status {response.status_code}") - return False - - except requests.exceptions.ConnectionError: - print("āŒ Cannot connect to dashboard") - return False except Exception as e: - print(f"āŒ Error: {e}") + logger.error(f"āŒ Dashboard test failed: {e}") + import traceback + logger.error(traceback.format_exc()) return False + + return True if __name__ == "__main__": - success = test_dashboard() - if success: - print("\nšŸŽ‰ SCALPING DASHBOARD FIXED!") - print("The dashboard now has:") - print("1. Fixed Unicode encoding issues") - print("2. Proper Dash callback structure") - print("3. Dynamic throttling for optimal performance") - print("4. Adaptive update frequency") - print("5. Performance monitoring and optimization") - else: - print("\nāŒ Dashboard still has issues") \ No newline at end of file + try: + success = test_dashboard_startup() + if success: + logger.info("āœ“ Dashboard test completed successfully") + else: + logger.error("āŒ Dashboard test failed") + sys.exit(1) + except KeyboardInterrupt: + logger.info("Dashboard test interrupted by user") + except Exception as e: + logger.error(f"Fatal error in dashboard test: {e}") + sys.exit(1) \ No newline at end of file diff --git a/test_enhanced_cob_integration.py b/test_enhanced_cob_integration.py new file mode 100644 index 0000000..95cfb08 --- /dev/null +++ b/test_enhanced_cob_integration.py @@ -0,0 +1,201 @@ +#!/usr/bin/env python3 +""" +Test Enhanced COB Integration with RL and CNN Models + +This script tests the integration of Consolidated Order Book (COB) data +with the real-time RL and CNN training pipeline. +""" + +import asyncio +import logging +import sys +from pathlib import Path +import numpy as np +import time +from datetime import datetime + +# Add project root to path +project_root = Path(__file__).parent +sys.path.insert(0, str(project_root)) + +from core.config import setup_logging +from core.data_provider import DataProvider +from core.enhanced_orchestrator import EnhancedTradingOrchestrator +from core.cob_integration import COBIntegration + +# Setup logging +setup_logging() +logger = logging.getLogger(__name__) + +class COBMLIntegrationTester: + """Test COB integration with ML models""" + + def __init__(self): + self.symbols = ['BTC/USDT', 'ETH/USDT'] + self.data_provider = DataProvider() + self.test_results = {} + + async def test_cob_ml_integration(self): + """Test full COB integration with ML pipeline""" + logger.info("=" * 60) + logger.info("TESTING COB INTEGRATION WITH RL AND CNN MODELS") + logger.info("=" * 60) + + try: + # Initialize enhanced orchestrator with COB integration + logger.info("1. Initializing Enhanced Trading Orchestrator with COB...") + orchestrator = EnhancedTradingOrchestrator( + data_provider=self.data_provider, + symbols=self.symbols, + enhanced_rl_training=True, + model_registry={} + ) + + # Start COB integration + logger.info("2. Starting COB Integration...") + await orchestrator.start_cob_integration() + await asyncio.sleep(5) # Allow startup and data collection + + # Test COB feature generation + logger.info("3. Testing COB feature generation...") + await self._test_cob_features(orchestrator) + + # Test market state with COB data + logger.info("4. Testing market state with COB data...") + await self._test_market_state_cob(orchestrator) + + # Test real-time COB callbacks + logger.info("5. Testing real-time COB callbacks...") + await self._test_realtime_callbacks(orchestrator) + + # Stop COB integration + await orchestrator.stop_cob_integration() + + # Print results + self._print_test_results() + + except Exception as e: + logger.error(f"Error in COB ML integration test: {e}") + import traceback + logger.error(traceback.format_exc()) + + async def _test_cob_features(self, orchestrator): + """Test COB feature availability""" + try: + for symbol in self.symbols: + # Check if COB features are available + cob_features = orchestrator.latest_cob_features.get(symbol) + cob_state = orchestrator.latest_cob_state.get(symbol) + + if cob_features is not None: + logger.info(f"āœ… {symbol}: COB CNN features available - shape: {cob_features.shape}") + self.test_results[f'{symbol}_cob_cnn_features'] = True + else: + logger.warning(f"āš ļø {symbol}: COB CNN features not available") + self.test_results[f'{symbol}_cob_cnn_features'] = False + + if cob_state is not None: + logger.info(f"āœ… {symbol}: COB DQN state available - shape: {cob_state.shape}") + self.test_results[f'{symbol}_cob_dqn_state'] = True + else: + logger.warning(f"āš ļø {symbol}: COB DQN state not available") + self.test_results[f'{symbol}_cob_dqn_state'] = False + + except Exception as e: + logger.error(f"Error testing COB features: {e}") + + async def _test_market_state_cob(self, orchestrator): + """Test market state includes COB data""" + try: + # Generate market states with COB data + from core.universal_data_adapter import UniversalDataAdapter + adapter = UniversalDataAdapter(self.data_provider) + universal_stream = await adapter.get_universal_stream(['BTC/USDT', 'ETH/USDT']) + + market_states = await orchestrator._get_all_market_states_universal(universal_stream) + + for symbol in self.symbols: + if symbol in market_states: + state = market_states[symbol] + + # Check COB integration in market state + tests = [ + ('cob_features', state.cob_features is not None), + ('cob_state', state.cob_state is not None), + ('order_book_imbalance', hasattr(state, 'order_book_imbalance')), + ('liquidity_depth', hasattr(state, 'liquidity_depth')), + ('exchange_diversity', hasattr(state, 'exchange_diversity')), + ('market_impact_estimate', hasattr(state, 'market_impact_estimate')) + ] + + for test_name, passed in tests: + status = "āœ…" if passed else "āŒ" + logger.info(f"{status} {symbol}: {test_name} - {passed}") + self.test_results[f'{symbol}_market_state_{test_name}'] = passed + + # Log COB metrics if available + if hasattr(state, 'order_book_imbalance'): + logger.info(f"šŸ“Š {symbol} COB Metrics:") + logger.info(f" Order Book Imbalance: {state.order_book_imbalance:.4f}") + logger.info(f" Liquidity Depth: ${state.liquidity_depth:,.0f}") + logger.info(f" Exchange Diversity: {state.exchange_diversity}") + logger.info(f" Market Impact (10k): {state.market_impact_estimate:.4f}%") + + except Exception as e: + logger.error(f"Error testing market state COB: {e}") + + async def _test_realtime_callbacks(self, orchestrator): + """Test real-time COB callbacks""" + try: + # Monitor COB callbacks for 10 seconds + initial_features = {s: len(orchestrator.cob_feature_history[s]) for s in self.symbols} + + logger.info("Monitoring COB callbacks for 10 seconds...") + await asyncio.sleep(10) + + final_features = {s: len(orchestrator.cob_feature_history[s]) for s in self.symbols} + + for symbol in self.symbols: + updates = final_features[symbol] - initial_features[symbol] + if updates > 0: + logger.info(f"āœ… {symbol}: Received {updates} COB feature updates") + self.test_results[f'{symbol}_realtime_callbacks'] = True + else: + logger.warning(f"āš ļø {symbol}: No COB feature updates received") + self.test_results[f'{symbol}_realtime_callbacks'] = False + + except Exception as e: + logger.error(f"Error testing realtime callbacks: {e}") + + def _print_test_results(self): + """Print comprehensive test results""" + logger.info("=" * 60) + logger.info("COB ML INTEGRATION TEST RESULTS") + logger.info("=" * 60) + + passed = sum(1 for result in self.test_results.values() if result) + total = len(self.test_results) + + logger.info(f"Overall: {passed}/{total} tests passed ({passed/total*100:.1f}%)") + logger.info("") + + for test_name, result in self.test_results.items(): + status = "āœ… PASS" if result else "āŒ FAIL" + logger.info(f"{status}: {test_name}") + + logger.info("=" * 60) + + if passed == total: + logger.info("šŸŽ‰ ALL TESTS PASSED - COB ML INTEGRATION WORKING!") + elif passed > total * 0.8: + logger.info("āš ļø MOSTLY WORKING - Some minor issues detected") + else: + logger.warning("🚨 INTEGRATION ISSUES - Significant problems detected") + +async def main(): + """Run COB ML integration tests""" + tester = COBMLIntegrationTester() + await tester.test_cob_ml_integration() + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file