""" Live Pivot Trainer - Automatic Training on L2 Pivot Points This module monitors live 1s and 1m charts for L2 pivot points (peaks/troughs) and automatically creates training samples when they occur. Integrates with: - Williams Market Structure for pivot detection - Real Training Adapter for model training - Data Provider for live market data """ import logging import threading import time from typing import Dict, List, Optional, Tuple from datetime import datetime, timezone from collections import deque logger = logging.getLogger(__name__) class LivePivotTrainer: """ Monitors live charts for L2 pivots and automatically trains models Features: - Detects L2 pivot points on 1s and 1m timeframes - Creates training samples automatically - Trains models in background without blocking inference - Tracks training history to avoid duplicate training """ def __init__(self, orchestrator, data_provider, training_adapter): """ Initialize Live Pivot Trainer Args: orchestrator: TradingOrchestrator instance data_provider: DataProvider for market data training_adapter: RealTrainingAdapter for training """ self.orchestrator = orchestrator self.data_provider = data_provider self.training_adapter = training_adapter # Tracking self.running = False self.trained_pivots = deque(maxlen=1000) # Track last 1000 trained pivots self.pivot_history = { '1s': deque(maxlen=100), '1m': deque(maxlen=100) } # Configuration self.check_interval = 5 # Check for new pivots every 5 seconds self.min_pivot_spacing = 60 # Minimum 60 seconds between training on same timeframe self.last_training_time = { '1s': 0, '1m': 0 } # Williams Market Structure for pivot detection try: from core.williams_market_structure import WilliamsMarketStructure self.williams_1s = WilliamsMarketStructure(num_levels=5) self.williams_1m = WilliamsMarketStructure(num_levels=5) logger.info("Williams Market Structure initialized for pivot detection") except Exception as e: logger.error(f"Failed to initialize Williams Market Structure: {e}") self.williams_1s = None self.williams_1m = None logger.info("LivePivotTrainer initialized") def start(self, symbol: str = 'ETH/USDT'): """Start monitoring for L2 pivots""" if self.running: logger.warning("LivePivotTrainer already running") return self.running = True self.symbol = symbol # Start monitoring thread thread = threading.Thread( target=self._monitoring_loop, args=(symbol,), daemon=True ) thread.start() logger.info(f"LivePivotTrainer started for {symbol}") def stop(self): """Stop monitoring""" self.running = False logger.info("LivePivotTrainer stopped") def _monitoring_loop(self, symbol: str): """Main monitoring loop - checks for new L2 pivots""" logger.info(f"LivePivotTrainer monitoring loop started for {symbol}") while self.running: try: # Check 1s timeframe self._check_timeframe_for_pivots(symbol, '1s') # Check 1m timeframe self._check_timeframe_for_pivots(symbol, '1m') # Sleep before next check time.sleep(self.check_interval) except Exception as e: logger.error(f"Error in LivePivotTrainer monitoring loop: {e}") time.sleep(10) # Wait longer on error def _check_timeframe_for_pivots(self, symbol: str, timeframe: str): """ Check a specific timeframe for new L2 pivots Args: symbol: Trading symbol timeframe: '1s' or '1m' """ try: # Rate limiting - don't train too frequently on same timeframe current_time = time.time() if current_time - self.last_training_time[timeframe] < self.min_pivot_spacing: return # Get recent candles candles = self.data_provider.get_historical_data( symbol=symbol, timeframe=timeframe, limit=200 # Need enough candles to detect pivots ) if candles is None or candles.empty: logger.debug(f"No candles available for {symbol} {timeframe}") return # Detect pivots using Williams Market Structure williams = self.williams_1s if timeframe == '1s' else self.williams_1m if williams is None: return pivots = williams.calculate_pivots(candles) if not pivots or 'L2' not in pivots: return l2_pivots = pivots['L2'] # Check for new L2 pivots (not in history) new_pivots = [] for pivot in l2_pivots: pivot_id = f"{symbol}_{timeframe}_{pivot['timestamp']}_{pivot['type']}" if pivot_id not in self.trained_pivots: new_pivots.append(pivot) self.trained_pivots.append(pivot_id) if new_pivots: logger.info(f"Found {len(new_pivots)} new L2 pivots on {symbol} {timeframe}") # Train on the most recent pivot latest_pivot = new_pivots[-1] self._train_on_pivot(symbol, timeframe, latest_pivot, candles) self.last_training_time[timeframe] = current_time except Exception as e: logger.error(f"Error checking {timeframe} for pivots: {e}") def _train_on_pivot(self, symbol: str, timeframe: str, pivot: Dict, candles): """ Create training sample from pivot and train model Args: symbol: Trading symbol timeframe: Timeframe of pivot pivot: Pivot point data candles: DataFrame with OHLCV data """ try: logger.info(f"Training on L2 {pivot['type']} pivot @ {pivot['price']} on {symbol} {timeframe}") # Determine trade direction based on pivot type if pivot['type'] == 'high': # High pivot = potential SHORT entry direction = 'SHORT' action = 'SELL' else: # Low pivot = potential LONG entry direction = 'LONG' action = 'BUY' # Create training sample training_sample = { 'test_case_id': f"live_pivot_{symbol}_{timeframe}_{pivot['timestamp']}", 'symbol': symbol, 'timestamp': pivot['timestamp'], 'action': action, 'expected_outcome': { 'direction': direction, 'entry_price': pivot['price'], 'exit_price': None, # Will be determined by model 'profit_loss_pct': 0.0, # Unknown yet 'holding_period_seconds': 300 # 5 minutes default }, 'training_config': { 'timeframes': ['1s', '1m', '1h', '1d'], 'candles_per_timeframe': 200 }, 'annotation_metadata': { 'source': 'live_pivot_detection', 'pivot_level': 'L2', 'pivot_type': pivot['type'], 'confidence': pivot.get('strength', 1.0) } } # Train model in background (non-blocking) thread = threading.Thread( target=self._background_training, args=(training_sample,), daemon=True ) thread.start() logger.info(f"Started background training on L2 pivot") except Exception as e: logger.error(f"Error training on pivot: {e}") def _background_training(self, training_sample: Dict): """ Execute training in background thread Args: training_sample: Training sample data """ try: # Use Transformer model for live pivot training model_name = 'Transformer' logger.info(f"Background training started for {training_sample['test_case_id']}") # Start training session training_id = self.training_adapter.start_training( model_name=model_name, test_cases=[training_sample] ) logger.info(f"Live pivot training session started: {training_id}") # Monitor training (optional - could poll status) # For now, just fire and forget except Exception as e: logger.error(f"Error in background training: {e}") def get_stats(self) -> Dict: """Get training statistics""" return { 'running': self.running, 'total_trained_pivots': len(self.trained_pivots), 'last_training_1s': self.last_training_time.get('1s', 0), 'last_training_1m': self.last_training_time.get('1m', 0), 'pivot_history_1s': len(self.pivot_history['1s']), 'pivot_history_1m': len(self.pivot_history['1m']) } # Global instance _live_pivot_trainer = None def get_live_pivot_trainer(orchestrator=None, data_provider=None, training_adapter=None): """Get or create global LivePivotTrainer instance""" global _live_pivot_trainer if _live_pivot_trainer is None and all([orchestrator, data_provider, training_adapter]): _live_pivot_trainer = LivePivotTrainer(orchestrator, data_provider, training_adapter) return _live_pivot_trainer