""" Live Pivot Trainer - Automatic Training on L2 Pivot Points This module monitors live 1s and 1m charts for L2 pivot points (peaks/troughs) and automatically creates training samples when they occur. Integrates with: - Williams Market Structure for pivot detection - Real Training Adapter for model training - Data Provider for live market data """ import logging import threading import time from typing import Dict, List, Optional, Tuple from datetime import datetime, timezone from collections import deque import numpy as np import pandas as pd logger = logging.getLogger(__name__) class LivePivotTrainer: """ Monitors live charts for L2 pivots and automatically trains models Features: - Detects L2 pivot points on 1s and 1m timeframes - Creates training samples automatically - Trains models in background without blocking inference - Tracks training history to avoid duplicate training """ def __init__(self, orchestrator, data_provider, training_adapter): """ Initialize Live Pivot Trainer Args: orchestrator: TradingOrchestrator instance data_provider: DataProvider for market data training_adapter: RealTrainingAdapter for training """ self.orchestrator = orchestrator self.data_provider = data_provider self.training_adapter = training_adapter # Tracking self.running = False self.trained_pivots = deque(maxlen=1000) # Track last 1000 trained pivots self.pivot_history = { '1s': deque(maxlen=100), '1m': deque(maxlen=100) } # Configuration self.check_interval = 5 # Check for new pivots every 5 seconds self.min_pivot_spacing = 60 # Minimum 60 seconds between training on same timeframe self.last_training_time = { '1s': 0, '1m': 0 } # Williams Market Structure for pivot detection try: from core.williams_market_structure import WilliamsMarketStructure self.williams_1s = WilliamsMarketStructure(num_levels=5) self.williams_1m = WilliamsMarketStructure(num_levels=5) logger.info("Williams Market Structure initialized for pivot detection") except Exception as e: logger.error(f"Failed to initialize Williams Market Structure: {e}") self.williams_1s = None self.williams_1m = None logger.info("LivePivotTrainer initialized") def start(self, symbol: str = 'ETH/USDT'): """Start monitoring for L2 pivots""" if self.running: logger.warning("LivePivotTrainer already running") return self.running = True self.symbol = symbol # Start monitoring thread thread = threading.Thread( target=self._monitoring_loop, args=(symbol,), daemon=True ) thread.start() logger.info(f"LivePivotTrainer started for {symbol}") def stop(self): """Stop monitoring""" self.running = False logger.info("LivePivotTrainer stopped") def _monitoring_loop(self, symbol: str): """Main monitoring loop - checks for new L2 pivots""" logger.info(f"LivePivotTrainer monitoring loop started for {symbol}") while self.running: try: # Check 1s timeframe self._check_timeframe_for_pivots(symbol, '1s') # Check 1m timeframe self._check_timeframe_for_pivots(symbol, '1m') # Sleep before next check time.sleep(self.check_interval) except Exception as e: logger.error(f"Error in LivePivotTrainer monitoring loop: {e}") time.sleep(10) # Wait longer on error def _check_timeframe_for_pivots(self, symbol: str, timeframe: str): """ Check a specific timeframe for new L2 pivots Args: symbol: Trading symbol timeframe: '1s' or '1m' """ try: # Rate limiting - don't train too frequently on same timeframe current_time = time.time() if current_time - self.last_training_time[timeframe] < self.min_pivot_spacing: return # Get recent candles candles = self.data_provider.get_historical_data( symbol=symbol, timeframe=timeframe, limit=200 # Need enough candles to detect pivots ) if candles is None or candles.empty: logger.debug(f"No candles available for {symbol} {timeframe}") return # Detect pivots using Williams Market Structure williams = self.williams_1s if timeframe == '1s' else self.williams_1m if williams is None: return # Prepare data for Williams Market Structure # Convert DataFrame to numpy array format df = candles.copy() ohlcv_array = df[['open', 'high', 'low', 'close', 'volume']].copy() # Handle timestamp conversion based on index type if isinstance(df.index, pd.DatetimeIndex): # Convert ns to ms timestamps = df.index.astype(np.int64) // 10**6 else: # Assume it's already timestamp or handle accordingly timestamps = df.index ohlcv_array.insert(0, 'timestamp', timestamps) ohlcv_array = ohlcv_array.to_numpy() # Calculate pivots pivot_levels = williams.calculate_recursive_pivot_points(ohlcv_array) if not pivot_levels or 2 not in pivot_levels: return # Get Level 2 pivots l2_trend_level = pivot_levels[2] l2_pivots_objs = l2_trend_level.pivot_points if not l2_pivots_objs: return # Check for new L2 pivots (not in history) new_pivots = [] for p in l2_pivots_objs: # Convert pivot object to dict for compatibility pivot_dict = { 'timestamp': p.timestamp, # Keep as datetime object for compatibility 'price': p.price, 'type': p.pivot_type, 'strength': p.strength } pivot_id = f"{symbol}_{timeframe}_{pivot_dict['timestamp']}_{pivot_dict['type']}" if pivot_id not in self.trained_pivots: new_pivots.append(pivot_dict) self.trained_pivots.append(pivot_id) if new_pivots: logger.info(f"Found {len(new_pivots)} new L2 pivots on {symbol} {timeframe}") # Train on the most recent pivot latest_pivot = new_pivots[-1] self._train_on_pivot(symbol, timeframe, latest_pivot, candles) self.last_training_time[timeframe] = current_time except Exception as e: logger.error(f"Error checking {timeframe} for pivots: {e}") def _train_on_pivot(self, symbol: str, timeframe: str, pivot: Dict, candles): """ Create training sample from pivot and train model Args: symbol: Trading symbol timeframe: Timeframe of pivot pivot: Pivot point data candles: DataFrame with OHLCV data """ try: logger.info(f"Training on L2 {pivot['type']} pivot @ {pivot['price']} on {symbol} {timeframe}") # Determine trade direction based on pivot type if pivot['type'] == 'high': # High pivot = potential SHORT entry direction = 'SHORT' action = 'SELL' else: # Low pivot = potential LONG entry direction = 'LONG' action = 'BUY' # Create training sample training_sample = { 'test_case_id': f"live_pivot_{symbol}_{timeframe}_{pivot['timestamp']}", 'symbol': symbol, 'timestamp': pivot['timestamp'], 'action': action, 'expected_outcome': { 'direction': direction, 'entry_price': pivot['price'], 'exit_price': None, # Will be determined by model 'profit_loss_pct': 0.0, # Unknown yet 'holding_period_seconds': 300 # 5 minutes default }, 'training_config': { 'timeframes': ['1s', '1m', '1h', '1d'], 'candles_per_timeframe': 200 }, 'annotation_metadata': { 'source': 'live_pivot_detection', 'pivot_level': 'L2', 'pivot_type': pivot['type'], 'confidence': pivot.get('strength', 1.0) } } # Train model in background (non-blocking) thread = threading.Thread( target=self._background_training, args=(training_sample,), daemon=True ) thread.start() logger.info(f"Started background training on L2 pivot") except Exception as e: logger.error(f"Error training on pivot: {e}") def _background_training(self, training_sample: Dict): """ Execute training in background thread Args: training_sample: Training sample data """ try: # Use Transformer model for live pivot training model_name = 'Transformer' logger.info(f"Background training started for {training_sample['test_case_id']}") # Start training session training_id = self.training_adapter.start_training( model_name=model_name, test_cases=[training_sample] ) logger.info(f"Live pivot training session started: {training_id}") # Monitor training (optional - could poll status) # For now, just fire and forget except Exception as e: logger.error(f"Error in background training: {e}") def get_stats(self) -> Dict: """Get training statistics""" return { 'running': self.running, 'total_trained_pivots': len(self.trained_pivots), 'last_training_1s': self.last_training_time.get('1s', 0), 'last_training_1m': self.last_training_time.get('1m', 0), 'pivot_history_1s': len(self.pivot_history['1s']), 'pivot_history_1m': len(self.pivot_history['1m']) } # Global instance _live_pivot_trainer = None def get_live_pivot_trainer(orchestrator=None, data_provider=None, training_adapter=None): """Get or create global LivePivotTrainer instance""" global _live_pivot_trainer if _live_pivot_trainer is None and all([orchestrator, data_provider, training_adapter]): _live_pivot_trainer = LivePivotTrainer(orchestrator, data_provider, training_adapter) return _live_pivot_trainer