Files
gogo2/ANNOTATE/core/live_pivot_trainer.py
Dobromir Popov f967f0a142 T predictions WIP
2025-11-22 01:02:19 +02:00

323 lines
12 KiB
Python

"""
Live Pivot Trainer - Automatic Training on L2 Pivot Points
This module monitors live 1s and 1m charts for L2 pivot points (peaks/troughs)
and automatically creates training samples when they occur.
Integrates with:
- Williams Market Structure for pivot detection
- Real Training Adapter for model training
- Data Provider for live market data
"""
import logging
import threading
import time
from typing import Dict, List, Optional, Tuple
from datetime import datetime, timezone
from collections import deque
import numpy as np
import pandas as pd
logger = logging.getLogger(__name__)
class LivePivotTrainer:
"""
Monitors live charts for L2 pivots and automatically trains models
Features:
- Detects L2 pivot points on 1s and 1m timeframes
- Creates training samples automatically
- Trains models in background without blocking inference
- Tracks training history to avoid duplicate training
"""
def __init__(self, orchestrator, data_provider, training_adapter):
"""
Initialize Live Pivot Trainer
Args:
orchestrator: TradingOrchestrator instance
data_provider: DataProvider for market data
training_adapter: RealTrainingAdapter for training
"""
self.orchestrator = orchestrator
self.data_provider = data_provider
self.training_adapter = training_adapter
# Tracking
self.running = False
self.trained_pivots = deque(maxlen=1000) # Track last 1000 trained pivots
self.pivot_history = {
'1s': deque(maxlen=100),
'1m': deque(maxlen=100)
}
# Configuration
self.check_interval = 5 # Check for new pivots every 5 seconds
self.min_pivot_spacing = 60 # Minimum 60 seconds between training on same timeframe
self.last_training_time = {
'1s': 0,
'1m': 0
}
# Williams Market Structure for pivot detection
try:
from core.williams_market_structure import WilliamsMarketStructure
# Fix: WilliamsMarketStructure.__init__ does not accept num_levels
# It defaults to 5 levels internally
self.williams_1s = WilliamsMarketStructure()
self.williams_1m = WilliamsMarketStructure()
logger.info("Williams Market Structure initialized for pivot detection")
except Exception as e:
logger.error(f"Failed to initialize Williams Market Structure: {e}")
self.williams_1s = None
self.williams_1m = None
logger.info("LivePivotTrainer initialized")
def start(self, symbol: str = 'ETH/USDT'):
"""Start monitoring for L2 pivots"""
if self.running:
logger.warning("LivePivotTrainer already running")
return
self.running = True
self.symbol = symbol
# Start monitoring thread
thread = threading.Thread(
target=self._monitoring_loop,
args=(symbol,),
daemon=True
)
thread.start()
logger.info(f"LivePivotTrainer started for {symbol}")
def stop(self):
"""Stop monitoring"""
self.running = False
logger.info("LivePivotTrainer stopped")
def _monitoring_loop(self, symbol: str):
"""Main monitoring loop - checks for new L2 pivots"""
logger.info(f"LivePivotTrainer monitoring loop started for {symbol}")
while self.running:
try:
# Check 1s timeframe
self._check_timeframe_for_pivots(symbol, '1s')
# Check 1m timeframe
self._check_timeframe_for_pivots(symbol, '1m')
# Sleep before next check
time.sleep(self.check_interval)
except Exception as e:
logger.error(f"Error in LivePivotTrainer monitoring loop: {e}")
time.sleep(10) # Wait longer on error
def _check_timeframe_for_pivots(self, symbol: str, timeframe: str):
"""
Check a specific timeframe for new L2 pivots
Args:
symbol: Trading symbol
timeframe: '1s' or '1m'
"""
try:
# Rate limiting - don't train too frequently on same timeframe
current_time = time.time()
if current_time - self.last_training_time[timeframe] < self.min_pivot_spacing:
return
# Get recent candles
candles = self.data_provider.get_historical_data(
symbol=symbol,
timeframe=timeframe,
limit=200 # Need enough candles to detect pivots
)
if candles is None or candles.empty:
logger.debug(f"No candles available for {symbol} {timeframe}")
return
# Detect pivots using Williams Market Structure
williams = self.williams_1s if timeframe == '1s' else self.williams_1m
if williams is None:
return
# Prepare data for Williams Market Structure
# Convert DataFrame to numpy array format
df = candles.copy()
ohlcv_array = df[['open', 'high', 'low', 'close', 'volume']].copy()
# Handle timestamp conversion based on index type
if isinstance(df.index, pd.DatetimeIndex):
# Convert ns to ms
timestamps = df.index.astype(np.int64) // 10**6
else:
# Assume it's already timestamp or handle accordingly
timestamps = df.index
ohlcv_array.insert(0, 'timestamp', timestamps)
ohlcv_array = ohlcv_array.to_numpy()
# Calculate pivots
pivot_levels = williams.calculate_recursive_pivot_points(ohlcv_array)
if not pivot_levels or 2 not in pivot_levels:
return
# Get Level 2 pivots
l2_trend_level = pivot_levels[2]
l2_pivots_objs = l2_trend_level.pivot_points
if not l2_pivots_objs:
return
# Check for new L2 pivots (not in history)
new_pivots = []
for p in l2_pivots_objs:
# Convert pivot object to dict for compatibility
pivot_dict = {
'timestamp': p.timestamp, # Keep as datetime object for compatibility
'price': p.price,
'type': p.pivot_type,
'strength': p.strength
}
pivot_id = f"{symbol}_{timeframe}_{pivot_dict['timestamp']}_{pivot_dict['type']}"
if pivot_id not in self.trained_pivots:
new_pivots.append(pivot_dict)
self.trained_pivots.append(pivot_id)
if new_pivots:
logger.info(f"Found {len(new_pivots)} new L2 pivots on {symbol} {timeframe}")
# Train on the most recent pivot
latest_pivot = new_pivots[-1]
self._train_on_pivot(symbol, timeframe, latest_pivot, candles)
self.last_training_time[timeframe] = current_time
except Exception as e:
logger.error(f"Error checking {timeframe} for pivots: {e}")
def _train_on_pivot(self, symbol: str, timeframe: str, pivot: Dict, candles):
"""
Create training sample from pivot and train model
Args:
symbol: Trading symbol
timeframe: Timeframe of pivot
pivot: Pivot point data
candles: DataFrame with OHLCV data
"""
try:
logger.info(f"Training on L2 {pivot['type']} pivot @ {pivot['price']} on {symbol} {timeframe}")
# Determine trade direction based on pivot type
if pivot['type'] == 'high':
# High pivot = potential SHORT entry
direction = 'SHORT'
action = 'SELL'
else:
# Low pivot = potential LONG entry
direction = 'LONG'
action = 'BUY'
# Create training sample
training_sample = {
'test_case_id': f"live_pivot_{symbol}_{timeframe}_{pivot['timestamp']}",
'symbol': symbol,
'timestamp': pivot['timestamp'],
'action': action,
'expected_outcome': {
'direction': direction,
'entry_price': pivot['price'],
'exit_price': None, # Will be determined by model
'profit_loss_pct': 0.0, # Unknown yet
'holding_period_seconds': 300 # 5 minutes default
},
'training_config': {
'timeframes': ['1s', '1m', '1h', '1d'],
'candles_per_timeframe': 200
},
'annotation_metadata': {
'source': 'live_pivot_detection',
'pivot_level': 'L2',
'pivot_type': pivot['type'],
'confidence': pivot.get('strength', 1.0)
}
}
# Train model in background (non-blocking)
thread = threading.Thread(
target=self._background_training,
args=(training_sample,),
daemon=True
)
thread.start()
logger.info(f"Started background training on L2 pivot")
except Exception as e:
logger.error(f"Error training on pivot: {e}")
def _background_training(self, training_sample: Dict):
"""
Execute training in background thread
Args:
training_sample: Training sample data
"""
try:
# Use Transformer model for live pivot training
model_name = 'Transformer'
logger.info(f"Background training started for {training_sample['test_case_id']}")
# Start training session
training_id = self.training_adapter.start_training(
model_name=model_name,
test_cases=[training_sample]
)
logger.info(f"Live pivot training session started: {training_id}")
# Monitor training (optional - could poll status)
# For now, just fire and forget
except Exception as e:
logger.error(f"Error in background training: {e}")
def get_stats(self) -> Dict:
"""Get training statistics"""
return {
'running': self.running,
'total_trained_pivots': len(self.trained_pivots),
'last_training_1s': self.last_training_time.get('1s', 0),
'last_training_1m': self.last_training_time.get('1m', 0),
'pivot_history_1s': len(self.pivot_history['1s']),
'pivot_history_1m': len(self.pivot_history['1m'])
}
# Global instance
_live_pivot_trainer = None
def get_live_pivot_trainer(orchestrator=None, data_provider=None, training_adapter=None):
"""Get or create global LivePivotTrainer instance"""
global _live_pivot_trainer
if _live_pivot_trainer is None and all([orchestrator, data_provider, training_adapter]):
_live_pivot_trainer = LivePivotTrainer(orchestrator, data_provider, training_adapter)
return _live_pivot_trainer