From 97d348d5172800fd1a90360e69e64c8ffb012f39 Mon Sep 17 00:00:00 2001 From: Dobromir Popov Date: Tue, 27 May 2025 01:03:40 +0300 Subject: [PATCH] different confidence for open/close position --- core/config.py | 1 + core/data_provider.py | 102 ++++++- core/enhanced_orchestrator.py | 325 +++++++++++++++++++- core/tick_aggregator.py | 560 ++++++++++++++++++++++++++++++++++ web/scalping_dashboard.py | 186 +++++++---- 5 files changed, 1097 insertions(+), 77 deletions(-) create mode 100644 core/tick_aggregator.py diff --git a/core/config.py b/core/config.py index eaceadf..f86b26f 100644 --- a/core/config.py +++ b/core/config.py @@ -79,6 +79,7 @@ class Config: 'cnn_weight': 0.7, 'rl_weight': 0.3, 'confidence_threshold': 0.5, + 'confidence_threshold_close': 0.25, 'decision_frequency': 60 }, 'trading': { diff --git a/core/data_provider.py b/core/data_provider.py index 7324c1a..04a3247 100644 --- a/core/data_provider.py +++ b/core/data_provider.py @@ -27,10 +27,9 @@ from dataclasses import dataclass, field import ta from threading import Thread, Lock from collections import deque -from dataclasses import dataclass, field -import uuid from .config import get_config +from .tick_aggregator import RealTimeTickAggregator, RawTick, OHLCVBar logger = logging.getLogger(__name__) @@ -88,13 +87,24 @@ class DataProvider: binance_symbol = symbol.replace('/', '').upper() self.tick_buffers[binance_symbol] = deque(maxlen=self.buffer_size) + # Initialize tick aggregator for raw tick processing + binance_symbols = [symbol.replace('/', '').upper() for symbol in self.symbols] + self.tick_aggregator = RealTimeTickAggregator(symbols=binance_symbols) + + # Raw tick and OHLCV bar callbacks + self.raw_tick_callbacks = [] + self.ohlcv_bar_callbacks = [] + # Performance tracking for subscribers self.distribution_stats = { 'total_ticks_received': 0, 'total_ticks_distributed': 0, 'distribution_errors': 0, 'last_tick_time': {}, - 'ticks_per_symbol': {symbol.replace('/', '').upper(): 0 for symbol in self.symbols} + 'ticks_per_symbol': {symbol.replace('/', '').upper(): 0 for symbol in self.symbols}, + 'raw_ticks_processed': 0, + 'ohlcv_bars_created': 0, + 'patterns_detected': 0 } # Data validation @@ -474,14 +484,52 @@ class DataProvider: logger.warning(f"Invalid tick data for {symbol}: price={price}, volume={volume_usdt}") return - # Create standardized tick + # Process raw tick through aggregator + side = 'sell' if is_buyer_maker else 'buy' + raw_tick, completed_bar = self.tick_aggregator.process_tick( + symbol=symbol, + timestamp=timestamp, + price=price, + volume=volume_usdt, + quantity=quantity, + side=side, + trade_id=str(trade_id) + ) + + # Update statistics + self.distribution_stats['total_ticks_received'] += 1 + self.distribution_stats['ticks_per_symbol'][symbol] += 1 + self.distribution_stats['last_tick_time'][symbol] = timestamp + self.last_prices[symbol] = price + + if raw_tick: + self.distribution_stats['raw_ticks_processed'] += 1 + + # Notify raw tick callbacks + for callback in self.raw_tick_callbacks: + try: + callback(raw_tick) + except Exception as e: + logger.error(f"Error in raw tick callback: {e}") + + if completed_bar: + self.distribution_stats['ohlcv_bars_created'] += 1 + + # Notify OHLCV bar callbacks + for callback in self.ohlcv_bar_callbacks: + try: + callback(completed_bar) + except Exception as e: + logger.error(f"Error in OHLCV bar callback: {e}") + + # Create standardized tick for legacy compatibility tick = MarketTick( symbol=symbol, timestamp=timestamp, price=price, volume=volume_usdt, quantity=quantity, - side='sell' if is_buyer_maker else 'buy', + side=side, trade_id=str(trade_id), is_buyer_maker=is_buyer_maker, raw_data=trade_data @@ -490,12 +538,6 @@ class DataProvider: # Add to buffer self.tick_buffers[symbol].append(tick) - # Update statistics - self.distribution_stats['total_ticks_received'] += 1 - self.distribution_stats['ticks_per_symbol'][symbol] += 1 - self.distribution_stats['last_tick_time'][symbol] = timestamp - self.last_prices[symbol] = price - # Update current prices and candles await self._process_tick(symbol, tick) @@ -1033,6 +1075,36 @@ class DataProvider: return list(self.tick_buffers[binance_symbol])[-count:] return [] + def subscribe_to_raw_ticks(self, callback: Callable[[RawTick], None]) -> str: + """Subscribe to raw tick data with timing information""" + subscriber_id = str(uuid.uuid4()) + self.raw_tick_callbacks.append(callback) + logger.info(f"Raw tick subscriber added: {subscriber_id}") + return subscriber_id + + def subscribe_to_ohlcv_bars(self, callback: Callable[[OHLCVBar], None]) -> str: + """Subscribe to 1s OHLCV bars calculated from ticks""" + subscriber_id = str(uuid.uuid4()) + self.ohlcv_bar_callbacks.append(callback) + logger.info(f"OHLCV bar subscriber added: {subscriber_id}") + return subscriber_id + + def get_raw_tick_features(self, symbol: str, window_size: int = 50) -> Optional[np.ndarray]: + """Get raw tick features for model consumption""" + return self.tick_aggregator.get_tick_features_for_model(symbol, window_size) + + def get_ohlcv_features(self, symbol: str, window_size: int = 60) -> Optional[np.ndarray]: + """Get 1s OHLCV features for model consumption""" + return self.tick_aggregator.get_ohlcv_features_for_model(symbol, window_size) + + def get_detected_patterns(self, symbol: str, count: int = 50) -> List: + """Get recently detected tick patterns""" + return self.tick_aggregator.get_detected_patterns(symbol, count) + + def get_tick_aggregator_stats(self) -> Dict[str, Any]: + """Get tick aggregator statistics""" + return self.tick_aggregator.get_statistics() + def get_subscriber_stats(self) -> Dict[str, Any]: """Get subscriber and distribution statistics""" with self.subscriber_lock: @@ -1048,10 +1120,16 @@ class DataProvider: for sid, s in self.subscribers.items() } + # Get tick aggregator stats + aggregator_stats = self.get_tick_aggregator_stats() + return { 'active_subscribers': active_subscribers, 'total_subscribers': len(self.subscribers), + 'raw_tick_callbacks': len(self.raw_tick_callbacks), + 'ohlcv_bar_callbacks': len(self.ohlcv_bar_callbacks), 'subscriber_details': subscriber_stats, 'distribution_stats': self.distribution_stats.copy(), - 'buffer_sizes': {symbol: len(buffer) for symbol, buffer in self.tick_buffers.items()} + 'buffer_sizes': {symbol: len(buffer) for symbol, buffer in self.tick_buffers.items()}, + 'tick_aggregator': aggregator_stats } \ No newline at end of file diff --git a/core/enhanced_orchestrator.py b/core/enhanced_orchestrator.py index e1d8e28..00bcdd7 100644 --- a/core/enhanced_orchestrator.py +++ b/core/enhanced_orchestrator.py @@ -22,7 +22,7 @@ from collections import deque import torch from .config import get_config -from .data_provider import DataProvider +from .data_provider import DataProvider, RawTick, OHLCVBar from .universal_data_adapter import UniversalDataAdapter, UniversalDataStream from .realtime_tick_processor import RealTimeTickProcessor, ProcessedTickFeatures, integrate_with_orchestrator from models import get_model_registry, ModelInterface, CNNModelInterface, RLAgentInterface @@ -112,8 +112,9 @@ class EnhancedTradingOrchestrator: self.symbols = self.config.symbols self.timeframes = self.config.timeframes - # Configuration - self.confidence_threshold = self.config.orchestrator.get('confidence_threshold', 0.6) + # Configuration with different thresholds for opening vs closing + self.confidence_threshold_open = self.config.orchestrator.get('confidence_threshold', 0.6) + self.confidence_threshold_close = self.config.orchestrator.get('confidence_threshold_close', 0.25) # Much lower for closing self.decision_frequency = self.config.orchestrator.get('decision_frequency', 30) # Enhanced weighting system @@ -125,9 +126,11 @@ class EnhancedTradingOrchestrator: self.recent_actions = {symbol: deque(maxlen=100) for symbol in self.symbols} self.market_states = {symbol: deque(maxlen=1000) for symbol in self.symbols} - # Perfect move tracking for CNN training + # Perfect move tracking for CNN training with enhanced retrospective learning self.perfect_moves = deque(maxlen=10000) self.performance_tracker = {} + self.retrospective_learning_active = False + self.last_retrospective_analysis = datetime.now() # RL feedback system self.rl_evaluation_queue = deque(maxlen=1000) @@ -140,12 +143,35 @@ class EnhancedTradingOrchestrator: # Integrate tick processor with orchestrator integrate_with_orchestrator(self, self.tick_processor) + # Subscribe to raw tick data and OHLCV bars from data provider + self.raw_tick_subscriber_id = self.data_provider.subscribe_to_raw_ticks(self._handle_raw_tick) + self.ohlcv_bar_subscriber_id = self.data_provider.subscribe_to_ohlcv_bars(self._handle_ohlcv_bar) + + # Raw tick and OHLCV data storage for models + self.raw_tick_buffers = {symbol: deque(maxlen=1000) for symbol in self.symbols} + self.ohlcv_bar_buffers = {symbol: deque(maxlen=3600) for symbol in self.symbols} # 1 hour of 1s bars + + # Pattern-based decision enhancement + self.pattern_weights = { + 'rapid_fire': 1.5, + 'volume_spike': 1.3, + 'price_acceleration': 1.4, + 'high_frequency_bar': 1.2, + 'volume_concentration': 1.1 + } + + # Current open positions tracking for closing logic + self.open_positions = {} # symbol -> {'side': str, 'entry_price': float, 'timestamp': datetime} + logger.info("Enhanced TradingOrchestrator initialized with Universal Data Format") logger.info(f"Symbols: {self.symbols}") logger.info(f"Timeframes: {self.timeframes}") logger.info(f"Universal format: ETH ticks, 1m, 1h, 1d + BTC reference ticks") - logger.info(f"Enhanced confidence threshold: {self.confidence_threshold}") + logger.info(f"Opening confidence threshold: {self.confidence_threshold_open}") + logger.info(f"Closing confidence threshold: {self.confidence_threshold_close}") logger.info("Real-time tick processor integrated for ultra-low latency processing") + logger.info("Raw tick and OHLCV bar processing enabled for pattern detection") + logger.info("Enhanced retrospective learning enabled for perfect opportunity detection") def _initialize_timeframe_weights(self) -> Dict[str, float]: """Initialize weights for different timeframes""" @@ -520,7 +546,7 @@ class EnhancedTradingOrchestrator: async def _make_coordinated_decision(self, symbol: str, predictions: List[EnhancedPrediction], all_predictions: Dict[str, List[EnhancedPrediction]], market_state: MarketState) -> Optional[TradingAction]: - """Make decision considering symbol correlations""" + """Make decision considering symbol correlations and different thresholds for opening/closing""" if not predictions: return None @@ -540,10 +566,21 @@ class EnhancedTradingOrchestrator: final_confidence *= 0.8 logger.info(f"Reduced confidence for {symbol} due to correlation disagreement") - # Apply confidence threshold - if final_confidence < self.confidence_threshold: + # Determine if this is an opening or closing action + has_open_position = symbol in self.open_positions + is_closing_action = self._is_closing_action(symbol, final_action) + + # Apply appropriate confidence threshold + if is_closing_action: + threshold = self.confidence_threshold_close + threshold_type = "closing" + else: + threshold = self.confidence_threshold_open + threshold_type = "opening" + + if final_confidence < threshold: final_action = 'HOLD' - logger.info(f"Action for {symbol} changed to HOLD due to low confidence: {final_confidence:.3f}") + logger.info(f"Action for {symbol} changed to HOLD due to low {threshold_type} confidence: {final_confidence:.3f} < {threshold:.3f}") # Create trading action if final_action != 'HOLD': @@ -562,11 +599,17 @@ class EnhancedTradingOrchestrator: 'timeframe_breakdown': [(tf.timeframe, tf.action, tf.confidence) for tf in primary_pred.timeframe_predictions], 'correlated_sentiment': correlated_sentiment, - 'market_regime': market_state.market_regime + 'market_regime': market_state.market_regime, + 'threshold_type': threshold_type, + 'threshold_used': threshold, + 'is_closing': is_closing_action }, timeframe_analysis=primary_pred.timeframe_predictions ) + # Update position tracking + self._update_position_tracking(symbol, action) + # Store recent action self.recent_actions[symbol].append(action) @@ -577,6 +620,131 @@ class EnhancedTradingOrchestrator: return None + def _is_closing_action(self, symbol: str, action: str) -> bool: + """Determine if an action would close an existing position""" + if symbol not in self.open_positions: + return False + + current_position = self.open_positions[symbol] + + # Closing logic: opposite action closes position + if current_position['side'] == 'LONG' and action == 'SELL': + return True + elif current_position['side'] == 'SHORT' and action == 'BUY': + return True + + return False + + def _update_position_tracking(self, symbol: str, action: TradingAction): + """Update internal position tracking for threshold logic""" + if action.action == 'BUY': + # Close any short position, open long position + if symbol in self.open_positions and self.open_positions[symbol]['side'] == 'SHORT': + del self.open_positions[symbol] + else: + self.open_positions[symbol] = { + 'side': 'LONG', + 'entry_price': action.price, + 'timestamp': action.timestamp + } + elif action.action == 'SELL': + # Close any long position, open short position + if symbol in self.open_positions and self.open_positions[symbol]['side'] == 'LONG': + del self.open_positions[symbol] + else: + self.open_positions[symbol] = { + 'side': 'SHORT', + 'entry_price': action.price, + 'timestamp': action.timestamp + } + + async def trigger_retrospective_learning(self): + """Trigger retrospective learning analysis on recent perfect opportunities""" + try: + current_time = datetime.now() + + # Only run retrospective analysis every 5 minutes to avoid overload + if (current_time - self.last_retrospective_analysis).total_seconds() < 300: + return + + self.last_retrospective_analysis = current_time + + # Analyze recent market moves for missed opportunities + await self._analyze_missed_opportunities() + + # Update model confidence thresholds based on recent performance + self._adjust_confidence_thresholds() + + # Mark retrospective learning as active + self.retrospective_learning_active = True + + logger.info("Retrospective learning analysis completed") + + except Exception as e: + logger.error(f"Error in retrospective learning: {e}") + + async def _analyze_missed_opportunities(self): + """Analyze recent price movements to identify missed perfect opportunities""" + try: + for symbol in self.symbols: + # Get recent price data + recent_data = self.data_provider.get_latest_candles(symbol, '1m', limit=60) + + if recent_data is None or len(recent_data) < 10: + continue + + # Look for significant price movements (>1% in 5 minutes) + for i in range(5, len(recent_data)): + price_change = (recent_data.iloc[i]['close'] - recent_data.iloc[i-5]['close']) / recent_data.iloc[i-5]['close'] + + if abs(price_change) > 0.01: # 1% move + # This was a perfect opportunity + optimal_action = 'BUY' if price_change > 0 else 'SELL' + + # Create perfect move for retrospective learning + perfect_move = PerfectMove( + symbol=symbol, + timeframe='1m', + timestamp=recent_data.iloc[i-5]['timestamp'], + optimal_action=optimal_action, + actual_outcome=price_change, + market_state_before=None, # Would need to reconstruct + market_state_after=None, # Would need to reconstruct + confidence_should_have_been=min(0.95, abs(price_change) * 20) # Higher confidence for bigger moves + ) + + self.perfect_moves.append(perfect_move) + + logger.info(f"Retrospective perfect opportunity identified: {optimal_action} {symbol} ({price_change*100:+.2f}%)") + + except Exception as e: + logger.error(f"Error analyzing missed opportunities: {e}") + + def _adjust_confidence_thresholds(self): + """Dynamically adjust confidence thresholds based on recent performance""" + try: + if len(self.perfect_moves) < 10: + return + + # Analyze recent perfect moves + recent_moves = list(self.perfect_moves)[-20:] + avg_confidence_needed = np.mean([move.confidence_should_have_been for move in recent_moves]) + avg_outcome = np.mean([abs(move.actual_outcome) for move in recent_moves]) + + # Adjust opening threshold based on missed opportunities + if avg_confidence_needed > self.confidence_threshold_open: + adjustment = min(0.1, (avg_confidence_needed - self.confidence_threshold_open) * 0.1) + self.confidence_threshold_open = max(0.3, self.confidence_threshold_open - adjustment) + logger.info(f"Adjusted opening confidence threshold to {self.confidence_threshold_open:.3f}") + + # Keep closing threshold very low for sensitivity + if avg_outcome > 0.02: # If we're seeing big moves + self.confidence_threshold_close = max(0.15, self.confidence_threshold_close * 0.9) + logger.info(f"Lowered closing confidence threshold to {self.confidence_threshold_close:.3f}") + + except Exception as e: + logger.error(f"Error adjusting confidence thresholds: {e}") + def _get_correlated_sentiment(self, symbol: str, all_predictions: Dict[str, List[EnhancedPrediction]]) -> Dict[str, Any]: """Get sentiment from correlated symbols""" @@ -993,7 +1161,7 @@ class EnhancedTradingOrchestrator: return self.tick_processor.get_processing_stats() def get_performance_metrics(self) -> Dict[str, Any]: - """Get performance metrics for dashboard compatibility""" + """Get enhanced performance metrics for dashboard compatibility""" total_actions = sum(len(actions) for actions in self.recent_actions.values()) perfect_moves_count = len(self.perfect_moves) @@ -1004,6 +1172,17 @@ class EnhancedTradingOrchestrator: # Add tick processing stats tick_stats = self.get_realtime_tick_stats() + # Calculate retrospective learning metrics + recent_perfect_moves = list(self.perfect_moves)[-10:] if self.perfect_moves else [] + avg_confidence_needed = np.mean([move.confidence_should_have_been for move in recent_perfect_moves]) if recent_perfect_moves else 0.6 + + # Pattern detection stats + patterns_detected = 0 + for symbol_buffer in self.ohlcv_bar_buffers.values(): + for bar in list(symbol_buffer)[-10:]: # Last 10 bars + if hasattr(bar, 'patterns') and bar.patterns: + patterns_detected += len(bar.patterns) + return { 'total_actions': total_actions, 'perfect_moves': perfect_moves_count, @@ -1011,11 +1190,28 @@ class EnhancedTradingOrchestrator: 'total_pnl': total_pnl, 'symbols_active': len(self.symbols), 'rl_queue_size': len(self.rl_evaluation_queue), - 'confidence_threshold': self.confidence_threshold, + 'confidence_threshold_open': self.confidence_threshold_open, + 'confidence_threshold_close': self.confidence_threshold_close, 'decision_frequency': self.decision_frequency, 'leverage': '500x', # Ultra-fast scalping 'primary_timeframe': '1s', # Main scalping timeframe - 'tick_processing': tick_stats # Real-time tick processing stats + 'tick_processing': tick_stats, # Real-time tick processing stats + 'retrospective_learning': { + 'active': self.retrospective_learning_active, + 'perfect_moves_recent': len(recent_perfect_moves), + 'avg_confidence_needed': avg_confidence_needed, + 'last_analysis': self.last_retrospective_analysis.isoformat(), + 'patterns_detected': patterns_detected + }, + 'position_tracking': { + 'open_positions': len(self.open_positions), + 'positions': {symbol: pos['side'] for symbol, pos in self.open_positions.items()} + }, + 'thresholds': { + 'opening': self.confidence_threshold_open, + 'closing': self.confidence_threshold_close, + 'adaptive': True + } } def analyze_market_conditions(self, symbol: str) -> Dict[str, Any]: @@ -1067,4 +1263,105 @@ class EnhancedTradingOrchestrator: 'symbol': symbol, 'error': str(e), 'analysis': f'Error analyzing {symbol}' - } \ No newline at end of file + } + + def _handle_raw_tick(self, raw_tick: RawTick): + """Handle incoming raw tick data for pattern detection and learning""" + try: + symbol = raw_tick.symbol if hasattr(raw_tick, 'symbol') else 'UNKNOWN' + + # Store raw tick for analysis + if symbol in self.raw_tick_buffers: + self.raw_tick_buffers[symbol].append(raw_tick) + + # Detect violent moves for retrospective learning + if raw_tick.time_since_last < 50 and abs(raw_tick.price_change) > 0: # Fast tick with price change + price_change_pct = abs(raw_tick.price_change) / raw_tick.price if raw_tick.price > 0 else 0 + + if price_change_pct > 0.001: # 0.1% price change in single tick + logger.info(f"Violent tick detected: {symbol} {raw_tick.price_change:+.2f} ({price_change_pct*100:.3f}%) in {raw_tick.time_since_last:.0f}ms") + + # Create perfect move for immediate learning + optimal_action = 'BUY' if raw_tick.price_change > 0 else 'SELL' + perfect_move = PerfectMove( + symbol=symbol, + timeframe='tick', + timestamp=raw_tick.timestamp, + optimal_action=optimal_action, + actual_outcome=price_change_pct, + market_state_before=None, + market_state_after=None, + confidence_should_have_been=min(0.95, price_change_pct * 50) + ) + + self.perfect_moves.append(perfect_move) + self.retrospective_learning_active = True + + except Exception as e: + logger.error(f"Error handling raw tick: {e}") + + def _handle_ohlcv_bar(self, ohlcv_bar: OHLCVBar): + """Handle incoming 1s OHLCV bar for pattern detection""" + try: + symbol = ohlcv_bar.symbol if hasattr(ohlcv_bar, 'symbol') else 'UNKNOWN' + + # Store OHLCV bar for analysis + if symbol in self.ohlcv_bar_buffers: + self.ohlcv_bar_buffers[symbol].append(ohlcv_bar) + + # Analyze bar patterns for learning opportunities + if ohlcv_bar.patterns: + for pattern in ohlcv_bar.patterns: + pattern_weight = self.pattern_weights.get(pattern.pattern_type, 1.0) + + if pattern.confidence > 0.7 and pattern_weight > 1.2: + logger.info(f"High-confidence pattern detected: {pattern.pattern_type} for {symbol} (conf: {pattern.confidence:.3f})") + + # Create learning opportunity based on pattern + if pattern.price_change != 0: + optimal_action = 'BUY' if pattern.price_change > 0 else 'SELL' + outcome = abs(pattern.price_change) / ohlcv_bar.close if ohlcv_bar.close > 0 else 0 + + perfect_move = PerfectMove( + symbol=symbol, + timeframe='1s', + timestamp=pattern.start_time, + optimal_action=optimal_action, + actual_outcome=outcome, + market_state_before=None, + market_state_after=None, + confidence_should_have_been=min(0.9, pattern.confidence * pattern_weight) + ) + + self.perfect_moves.append(perfect_move) + + # Check for significant 1s bar moves + if ohlcv_bar.high > 0 and ohlcv_bar.low > 0: + bar_range = (ohlcv_bar.high - ohlcv_bar.low) / ohlcv_bar.close + + if bar_range > 0.002: # 0.2% range in 1 second + logger.info(f"Significant 1s bar range: {symbol} {bar_range*100:.3f}% range") + + # Determine optimal action based on close vs open + if ohlcv_bar.close > ohlcv_bar.open: + optimal_action = 'BUY' + outcome = (ohlcv_bar.close - ohlcv_bar.open) / ohlcv_bar.open + else: + optimal_action = 'SELL' + outcome = (ohlcv_bar.open - ohlcv_bar.close) / ohlcv_bar.open + + perfect_move = PerfectMove( + symbol=symbol, + timeframe='1s', + timestamp=ohlcv_bar.timestamp, + optimal_action=optimal_action, + actual_outcome=outcome, + market_state_before=None, + market_state_after=None, + confidence_should_have_been=min(0.85, bar_range * 100) + ) + + self.perfect_moves.append(perfect_move) + + except Exception as e: + logger.error(f"Error handling OHLCV bar: {e}") \ No newline at end of file diff --git a/core/tick_aggregator.py b/core/tick_aggregator.py new file mode 100644 index 0000000..0522be8 --- /dev/null +++ b/core/tick_aggregator.py @@ -0,0 +1,560 @@ +""" +Real-Time Tick Aggregator for Raw Tick Processing + +This module processes raw tick data and calculates 1s OHLCV bars in real-time. +It preserves raw tick data for pattern detection while providing aggregated data +for traditional analysis. + +Features: +- Raw tick data preservation with time difference analysis +- Real-time 1s OHLCV calculation from ticks +- Tick pattern detection for violent moves +- Volume-weighted price calculations +- Microstructure analysis +""" + +import logging +import time +import numpy as np +import pandas as pd +from datetime import datetime, timedelta +from typing import Dict, List, Optional, Tuple, Any, Deque +from collections import deque +from threading import Lock +from dataclasses import dataclass, field + +logger = logging.getLogger(__name__) + +@dataclass +class RawTick: + """Raw tick data with timing information""" + timestamp: datetime + price: float + volume: float + quantity: float + side: str # 'buy' or 'sell' + trade_id: str + time_since_last: float # Milliseconds since last tick + price_change: float # Price change from last tick + volume_intensity: float # Volume relative to recent average + +@dataclass +class TickPattern: + """Detected tick pattern for violent moves""" + start_time: datetime + end_time: datetime + pattern_type: str # 'rapid_fire', 'volume_spike', 'price_acceleration' + tick_count: int + avg_time_between_ticks: float + price_change: float + volume_total: float + confidence: float + +@dataclass +class OHLCVBar: + """1-second OHLCV bar calculated from ticks""" + timestamp: datetime + open: float + high: float + low: float + close: float + volume: float + tick_count: int + buy_volume: float + sell_volume: float + vwap: float # Volume-weighted average price + patterns: List[TickPattern] = field(default_factory=list) + +class RealTimeTickAggregator: + """ + Real-time tick aggregator that processes raw ticks and calculates 1s OHLCV bars + while preserving tick-level data for pattern detection + """ + + def __init__(self, symbols: List[str] = None, tick_buffer_size: int = 10000): + """Initialize the tick aggregator""" + self.symbols = symbols or ['ETHUSDT', 'BTCUSDT'] + self.tick_buffer_size = tick_buffer_size + + # Raw tick storage (preserves all ticks with timing data) + self.raw_tick_buffers: Dict[str, Deque[RawTick]] = {} + + # 1s OHLCV bars calculated from ticks + self.ohlcv_bars: Dict[str, Deque[OHLCVBar]] = {} + + # Current incomplete bars being built + self.current_bars: Dict[str, Dict] = {} + + # Pattern detection storage + self.detected_patterns: Dict[str, Deque[TickPattern]] = {} + + # Initialize buffers for each symbol + for symbol in self.symbols: + self.raw_tick_buffers[symbol] = deque(maxlen=tick_buffer_size) + self.ohlcv_bars[symbol] = deque(maxlen=3600) # 1 hour of 1s bars + self.detected_patterns[symbol] = deque(maxlen=1000) + self.current_bars[symbol] = None + + # Pattern detection parameters + self.rapid_fire_threshold = 50 # ms between ticks for rapid fire + self.volume_spike_multiplier = 3.0 # Volume spike threshold + self.price_acceleration_threshold = 0.001 # 0.1% price change threshold + + # Statistics tracking + self.stats = { + 'total_ticks_processed': 0, + 'patterns_detected': 0, + 'bars_created': 0, + 'processing_times': deque(maxlen=1000) + } + + # Thread safety + self.data_lock = Lock() + + # Last tick tracking for time differences + self.last_tick_times: Dict[str, datetime] = {} + self.last_tick_prices: Dict[str, float] = {} + + # Volume tracking for intensity calculation + self.volume_history: Dict[str, Deque[float]] = {} + for symbol in self.symbols: + self.volume_history[symbol] = deque(maxlen=100) # Last 100 ticks for volume average + + logger.info(f"RealTimeTickAggregator initialized for symbols: {self.symbols}") + logger.info(f"Tick buffer size: {tick_buffer_size}") + logger.info("Pattern detection enabled for violent moves") + + def process_tick(self, symbol: str, timestamp: datetime, price: float, + volume: float, quantity: float, side: str, trade_id: str) -> Tuple[RawTick, Optional[OHLCVBar]]: + """ + Process a raw tick and return the tick data plus any completed 1s bar + + Returns: + Tuple[RawTick, Optional[OHLCVBar]]: The processed tick and completed bar (if any) + """ + start_time = time.time() + + try: + with self.data_lock: + # Calculate timing information + time_since_last = 0.0 + price_change = 0.0 + + if symbol in self.last_tick_times: + time_diff = (timestamp - self.last_tick_times[symbol]).total_seconds() * 1000 + time_since_last = max(0, time_diff) # Ensure non-negative + + if symbol in self.last_tick_prices: + price_change = price - self.last_tick_prices[symbol] + + # Calculate volume intensity + volume_intensity = self._calculate_volume_intensity(symbol, volume) + + # Create raw tick + raw_tick = RawTick( + timestamp=timestamp, + price=price, + volume=volume, + quantity=quantity, + side=side, + trade_id=trade_id, + time_since_last=time_since_last, + price_change=price_change, + volume_intensity=volume_intensity + ) + + # Add to raw tick buffer + self.raw_tick_buffers[symbol].append(raw_tick) + + # Update tracking + self.last_tick_times[symbol] = timestamp + self.last_tick_prices[symbol] = price + self.volume_history[symbol].append(volume) + + # Process for 1s OHLCV bar + completed_bar = self._update_ohlcv_bar(symbol, raw_tick) + + # Detect patterns + self._detect_tick_patterns(symbol, raw_tick) + + # Update statistics + self.stats['total_ticks_processed'] += 1 + processing_time = (time.time() - start_time) * 1000 + self.stats['processing_times'].append(processing_time) + + return raw_tick, completed_bar + + except Exception as e: + logger.error(f"Error processing tick for {symbol}: {e}") + return None, None + + def _calculate_volume_intensity(self, symbol: str, volume: float) -> float: + """Calculate volume intensity relative to recent average""" + try: + if symbol not in self.volume_history or len(self.volume_history[symbol]) < 10: + return 1.0 # Default intensity + + recent_volumes = list(self.volume_history[symbol]) + avg_volume = np.mean(recent_volumes) + + if avg_volume > 0: + return volume / avg_volume + return 1.0 + + except Exception as e: + logger.error(f"Error calculating volume intensity: {e}") + return 1.0 + + def _update_ohlcv_bar(self, symbol: str, tick: RawTick) -> Optional[OHLCVBar]: + """Update or create 1s OHLCV bar from tick data""" + try: + # Get the second timestamp (truncate to second) + bar_timestamp = tick.timestamp.replace(microsecond=0) + + # Check if we need a new bar + current_bar = self.current_bars[symbol] + + if current_bar is None or current_bar['timestamp'] != bar_timestamp: + # Complete the previous bar if it exists + completed_bar = None + if current_bar is not None: + completed_bar = self._finalize_bar(symbol, current_bar) + + # Start new bar + self.current_bars[symbol] = { + 'timestamp': bar_timestamp, + 'open': tick.price, + 'high': tick.price, + 'low': tick.price, + 'close': tick.price, + 'volume': tick.volume, + 'tick_count': 1, + 'buy_volume': tick.volume if tick.side == 'buy' else 0, + 'sell_volume': tick.volume if tick.side == 'sell' else 0, + 'volume_price_sum': tick.volume * tick.price, + 'ticks': [tick] + } + + return completed_bar + else: + # Update existing bar + current_bar['high'] = max(current_bar['high'], tick.price) + current_bar['low'] = min(current_bar['low'], tick.price) + current_bar['close'] = tick.price + current_bar['volume'] += tick.volume + current_bar['tick_count'] += 1 + current_bar['volume_price_sum'] += tick.volume * tick.price + current_bar['ticks'].append(tick) + + if tick.side == 'buy': + current_bar['buy_volume'] += tick.volume + else: + current_bar['sell_volume'] += tick.volume + + return None + + except Exception as e: + logger.error(f"Error updating OHLCV bar for {symbol}: {e}") + return None + + def _finalize_bar(self, symbol: str, bar_data: Dict) -> OHLCVBar: + """Finalize a 1s OHLCV bar and detect patterns""" + try: + # Calculate VWAP + vwap = bar_data['volume_price_sum'] / bar_data['volume'] if bar_data['volume'] > 0 else bar_data['close'] + + # Detect patterns in this bar + bar_patterns = self._detect_bar_patterns(symbol, bar_data['ticks']) + + # Create OHLCV bar + ohlcv_bar = OHLCVBar( + timestamp=bar_data['timestamp'], + open=bar_data['open'], + high=bar_data['high'], + low=bar_data['low'], + close=bar_data['close'], + volume=bar_data['volume'], + tick_count=bar_data['tick_count'], + buy_volume=bar_data['buy_volume'], + sell_volume=bar_data['sell_volume'], + vwap=vwap, + patterns=bar_patterns + ) + + # Add to buffer + self.ohlcv_bars[symbol].append(ohlcv_bar) + self.stats['bars_created'] += 1 + + return ohlcv_bar + + except Exception as e: + logger.error(f"Error finalizing bar for {symbol}: {e}") + return None + + def _detect_tick_patterns(self, symbol: str, tick: RawTick): + """Detect patterns in tick data for violent moves""" + try: + # Rapid fire detection (very fast ticks) + if tick.time_since_last > 0 and tick.time_since_last < self.rapid_fire_threshold: + self._check_rapid_fire_pattern(symbol, tick) + + # Volume spike detection + if tick.volume_intensity > self.volume_spike_multiplier: + self._check_volume_spike_pattern(symbol, tick) + + # Price acceleration detection + if abs(tick.price_change) > 0 and symbol in self.last_tick_prices: + price_change_pct = abs(tick.price_change) / self.last_tick_prices[symbol] + if price_change_pct > self.price_acceleration_threshold: + self._check_price_acceleration_pattern(symbol, tick) + + except Exception as e: + logger.error(f"Error detecting tick patterns for {symbol}: {e}") + + def _check_rapid_fire_pattern(self, symbol: str, tick: RawTick): + """Check for rapid fire tick pattern""" + try: + # Look at last 10 ticks to see if we have a rapid fire sequence + recent_ticks = list(self.raw_tick_buffers[symbol])[-10:] + + if len(recent_ticks) >= 5: + rapid_ticks = [t for t in recent_ticks if t.time_since_last < self.rapid_fire_threshold] + + if len(rapid_ticks) >= 5: + # We have a rapid fire pattern + pattern = TickPattern( + start_time=rapid_ticks[0].timestamp, + end_time=tick.timestamp, + pattern_type='rapid_fire', + tick_count=len(rapid_ticks), + avg_time_between_ticks=np.mean([t.time_since_last for t in rapid_ticks]), + price_change=tick.price - rapid_ticks[0].price, + volume_total=sum(t.volume for t in rapid_ticks), + confidence=min(1.0, len(rapid_ticks) / 10.0) + ) + + self.detected_patterns[symbol].append(pattern) + self.stats['patterns_detected'] += 1 + + logger.debug(f"RAPID FIRE pattern detected for {symbol}: {len(rapid_ticks)} ticks in {(tick.timestamp - rapid_ticks[0].timestamp).total_seconds():.3f}s") + + except Exception as e: + logger.error(f"Error checking rapid fire pattern: {e}") + + def _check_volume_spike_pattern(self, symbol: str, tick: RawTick): + """Check for volume spike pattern""" + try: + pattern = TickPattern( + start_time=tick.timestamp, + end_time=tick.timestamp, + pattern_type='volume_spike', + tick_count=1, + avg_time_between_ticks=tick.time_since_last, + price_change=tick.price_change, + volume_total=tick.volume, + confidence=min(1.0, tick.volume_intensity / self.volume_spike_multiplier) + ) + + self.detected_patterns[symbol].append(pattern) + self.stats['patterns_detected'] += 1 + + logger.debug(f"VOLUME SPIKE pattern detected for {symbol}: {tick.volume_intensity:.2f}x normal volume") + + except Exception as e: + logger.error(f"Error checking volume spike pattern: {e}") + + def _check_price_acceleration_pattern(self, symbol: str, tick: RawTick): + """Check for price acceleration pattern""" + try: + price_change_pct = abs(tick.price_change) / self.last_tick_prices[symbol] + + pattern = TickPattern( + start_time=tick.timestamp, + end_time=tick.timestamp, + pattern_type='price_acceleration', + tick_count=1, + avg_time_between_ticks=tick.time_since_last, + price_change=tick.price_change, + volume_total=tick.volume, + confidence=min(1.0, price_change_pct / self.price_acceleration_threshold) + ) + + self.detected_patterns[symbol].append(pattern) + self.stats['patterns_detected'] += 1 + + logger.debug(f"PRICE ACCELERATION pattern detected for {symbol}: {price_change_pct*100:.3f}% change") + + except Exception as e: + logger.error(f"Error checking price acceleration pattern: {e}") + + def _detect_bar_patterns(self, symbol: str, ticks: List[RawTick]) -> List[TickPattern]: + """Detect patterns within a completed 1s bar""" + patterns = [] + + try: + if len(ticks) < 2: + return patterns + + # Check for high-frequency trading within the bar + if len(ticks) > 20: # More than 20 ticks in 1 second + avg_time = np.mean([t.time_since_last for t in ticks[1:]]) + pattern = TickPattern( + start_time=ticks[0].timestamp, + end_time=ticks[-1].timestamp, + pattern_type='high_frequency_bar', + tick_count=len(ticks), + avg_time_between_ticks=avg_time, + price_change=ticks[-1].price - ticks[0].price, + volume_total=sum(t.volume for t in ticks), + confidence=min(1.0, len(ticks) / 50.0) + ) + patterns.append(pattern) + + # Check for volume concentration + total_volume = sum(t.volume for t in ticks) + max_tick_volume = max(t.volume for t in ticks) + if max_tick_volume > total_volume * 0.5: # Single tick has >50% of bar volume + pattern = TickPattern( + start_time=ticks[0].timestamp, + end_time=ticks[-1].timestamp, + pattern_type='volume_concentration', + tick_count=len(ticks), + avg_time_between_ticks=np.mean([t.time_since_last for t in ticks[1:]]), + price_change=ticks[-1].price - ticks[0].price, + volume_total=total_volume, + confidence=max_tick_volume / total_volume + ) + patterns.append(pattern) + + except Exception as e: + logger.error(f"Error detecting bar patterns: {e}") + + return patterns + + def get_raw_ticks(self, symbol: str, count: int = 100) -> List[RawTick]: + """Get recent raw ticks for a symbol""" + with self.data_lock: + if symbol in self.raw_tick_buffers: + return list(self.raw_tick_buffers[symbol])[-count:] + return [] + + def get_ohlcv_bars(self, symbol: str, count: int = 100) -> List[OHLCVBar]: + """Get recent 1s OHLCV bars for a symbol""" + with self.data_lock: + if symbol in self.ohlcv_bars: + return list(self.ohlcv_bars[symbol])[-count:] + return [] + + def get_detected_patterns(self, symbol: str, count: int = 50) -> List[TickPattern]: + """Get recently detected patterns for a symbol""" + with self.data_lock: + if symbol in self.detected_patterns: + return list(self.detected_patterns[symbol])[-count:] + return [] + + def get_tick_features_for_model(self, symbol: str, window_size: int = 50) -> Optional[np.ndarray]: + """ + Get tick features formatted for model consumption + + Returns: + np.ndarray: Shape (window_size, features) where features include: + [price, volume, time_since_last, price_change, volume_intensity, side_indicator] + """ + try: + with self.data_lock: + recent_ticks = self.get_raw_ticks(symbol, window_size) + + if len(recent_ticks) < window_size: + return None + + features = [] + for tick in recent_ticks: + tick_features = [ + tick.price, + tick.volume, + tick.time_since_last, + tick.price_change, + tick.volume_intensity, + 1.0 if tick.side == 'buy' else 0.0 # Buy/sell indicator + ] + features.append(tick_features) + + return np.array(features, dtype=np.float32) + + except Exception as e: + logger.error(f"Error getting tick features for model: {e}") + return None + + def get_ohlcv_features_for_model(self, symbol: str, window_size: int = 60) -> Optional[np.ndarray]: + """ + Get 1s OHLCV features formatted for model consumption + + Returns: + np.ndarray: Shape (window_size, 5) - standard OHLCV format + """ + try: + with self.data_lock: + recent_bars = self.get_ohlcv_bars(symbol, window_size) + + if len(recent_bars) < window_size: + return None + + features = [] + for bar in recent_bars: + ohlcv_features = [ + bar.open, + bar.high, + bar.low, + bar.close, + bar.volume + ] + features.append(ohlcv_features) + + return np.array(features, dtype=np.float32) + + except Exception as e: + logger.error(f"Error getting OHLCV features for model: {e}") + return None + + def get_statistics(self) -> Dict[str, Any]: + """Get aggregator statistics""" + with self.data_lock: + avg_processing_time = np.mean(list(self.stats['processing_times'])) if self.stats['processing_times'] else 0 + + return { + 'total_ticks_processed': self.stats['total_ticks_processed'], + 'patterns_detected': self.stats['patterns_detected'], + 'bars_created': self.stats['bars_created'], + 'avg_processing_time_ms': avg_processing_time, + 'symbols': self.symbols, + 'buffer_sizes': {symbol: len(self.raw_tick_buffers[symbol]) for symbol in self.symbols}, + 'bar_counts': {symbol: len(self.ohlcv_bars[symbol]) for symbol in self.symbols}, + 'pattern_counts': {symbol: len(self.detected_patterns[symbol]) for symbol in self.symbols} + } + + def clear_old_data(self, hours_to_keep: int = 1): + """Clear old data to manage memory usage""" + try: + cutoff_time = datetime.now() - timedelta(hours=hours_to_keep) + + with self.data_lock: + for symbol in self.symbols: + # Clear old ticks + while (self.raw_tick_buffers[symbol] and + self.raw_tick_buffers[symbol][0].timestamp < cutoff_time): + self.raw_tick_buffers[symbol].popleft() + + # Clear old bars + while (self.ohlcv_bars[symbol] and + self.ohlcv_bars[symbol][0].timestamp < cutoff_time): + self.ohlcv_bars[symbol].popleft() + + # Clear old patterns + while (self.detected_patterns[symbol] and + self.detected_patterns[symbol][0].start_time < cutoff_time): + self.detected_patterns[symbol].popleft() + + logger.info(f"Cleared data older than {hours_to_keep} hours") + + except Exception as e: + logger.error(f"Error clearing old data: {e}") \ No newline at end of file diff --git a/web/scalping_dashboard.py b/web/scalping_dashboard.py index 8ed8528..a578723 100644 --- a/web/scalping_dashboard.py +++ b/web/scalping_dashboard.py @@ -529,22 +529,22 @@ class RealTimeScalpingDashboard: html.Div([ html.H4(id="current-balance", className="text-success"), html.P("Current Balance", className="text-white") - ], className="col-md-2 text-center"), + ], className="col-md-3 text-center"), # Increased from col-md-2 html.Div([ html.H4(id="session-duration", className="text-info"), html.P("Session Time", className="text-white") - ], className="col-md-2 text-center"), + ], className="col-md-3 text-center"), # Increased from col-md-2 html.Div([ - html.H4(id="open-positions", className="text-warning"), + html.Div(id="open-positions", className="text-warning"), html.P("Open Positions", className="text-white") - ], className="col-md-2 text-center"), + ], className="col-md-3 text-center"), # Increased from col-md-2 to col-md-3 for more space html.Div([ html.H4("500x", className="text-danger"), html.P("Leverage", className="text-white") - ], className="col-md-2 text-center") + ], className="col-md-3 text-center") # Increased from col-md-2 ], className="row mb-3"), # Live metrics row @@ -717,7 +717,34 @@ class RealTimeScalpingDashboard: # Update session metrics current_balance = f"${dashboard_instance.trading_session.current_balance:.2f}" - open_positions = str(len(dashboard_instance.trading_session.positions)) + + # Create color-coded position display + positions = dashboard_instance.trading_session.positions + if positions: + position_displays = [] + for symbol, pos in positions.items(): + side = pos['side'] + size = pos['size'] + entry_price = pos['entry_price'] + current_price = dashboard_instance.live_prices.get(symbol, entry_price) + + # Calculate unrealized P&L + if side == 'LONG': + unrealized_pnl = (current_price - entry_price) * size + color_class = "text-success" # Green for LONG + side_display = "[LONG]" + else: # SHORT + unrealized_pnl = (entry_price - current_price) * size + color_class = "text-danger" # Red for SHORT + side_display = "[SHORT]" + + position_text = f"{side_display} {size:.3f} @ ${entry_price:.2f} | P&L: ${unrealized_pnl:+.2f}" + position_displays.append(html.P(position_text, className=f"{color_class} mb-1")) + + open_positions = html.Div(position_displays) + else: + open_positions = html.P("No open positions", className="text-muted") + pnl = f"${dashboard_instance.trading_session.total_pnl:+.2f}" win_rate = f"{dashboard_instance.trading_session.get_win_rate()*100:.1f}%" total_trades = str(dashboard_instance.trading_session.total_trades) @@ -1767,25 +1794,41 @@ class RealTimeScalpingDashboard: return fig def _create_model_training_status(self): - """Create model training progress display""" + """Create enhanced model training progress display with perfect opportunity detection""" try: # Get model training metrics from orchestrator if hasattr(self.orchestrator, 'get_performance_metrics'): metrics = self.orchestrator.get_performance_metrics() + # Get perfect moves for retrospective training + perfect_moves_count = metrics.get('perfect_moves', 0) + recent_perfect_moves = [] + if hasattr(self.orchestrator, 'get_recent_perfect_moves'): + recent_perfect_moves = self.orchestrator.get_recent_perfect_moves(limit=3) + + # Check if models are actively training + rl_queue_size = metrics.get('rl_queue_size', 0) + is_rl_training = rl_queue_size > 0 + is_cnn_training = perfect_moves_count > 0 + return html.Div([ html.Div([ - html.H6("RL Training", className="text-success"), - html.P(f"Queue Size: {metrics.get('rl_queue_size', 0)}", className="text-white"), + html.H6("RL Training", className="text-success" if is_rl_training else "text-warning"), + html.P(f"Status: {'ACTIVE' if is_rl_training else 'IDLE'}", + className="text-success" if is_rl_training else "text-warning"), + html.P(f"Queue Size: {rl_queue_size}", className="text-white"), html.P(f"Win Rate: {metrics.get('win_rate', 0)*100:.1f}%", className="text-white"), - html.P(f"Total Actions: {metrics.get('total_actions', 0)}", className="text-white") + html.P(f"Actions: {metrics.get('total_actions', 0)}", className="text-white") ], className="col-md-6"), html.Div([ - html.H6("CNN Training", className="text-warning"), - html.P(f"Perfect Moves: {metrics.get('perfect_moves', 0)}", className="text-white"), + html.H6("CNN Training", className="text-success" if is_cnn_training else "text-warning"), + html.P(f"Status: {'LEARNING' if is_cnn_training else 'IDLE'}", + className="text-success" if is_cnn_training else "text-warning"), + html.P(f"Perfect Moves: {perfect_moves_count}", className="text-white"), html.P(f"Confidence: {metrics.get('confidence_threshold', 0.6):.2f}", className="text-white"), - html.P(f"Frequency: {metrics.get('decision_frequency', 30)}s", className="text-white") + html.P(f"Retrospective: {'ON' if recent_perfect_moves else 'OFF'}", + className="text-success" if recent_perfect_moves else "text-muted") ], className="col-md-6") ], className="row") else: @@ -1845,54 +1888,87 @@ class RealTimeScalpingDashboard: ]) def _create_training_events_log(self): - """Create training events log""" + """Create enhanced training events log with retrospective learning details""" try: # Get recent perfect moves and training events events = [] if hasattr(self.orchestrator, 'perfect_moves') and self.orchestrator.perfect_moves: - perfect_moves = self.orchestrator.perfect_moves[-5:] # Last 5 perfect moves + perfect_moves = list(self.orchestrator.perfect_moves)[-8:] # Last 8 perfect moves for move in perfect_moves: timestamp = move.timestamp.strftime('%H:%M:%S') + outcome_pct = move.actual_outcome * 100 + confidence_gap = move.confidence_should_have_been - 0.6 # vs default threshold + events.append({ 'time': timestamp, 'type': 'CNN', - 'event': f"Perfect {move.optimal_action} detected for {move.symbol}", + 'event': f"Perfect {move.optimal_action} {move.symbol} ({outcome_pct:+.2f}%) - Retrospective Learning", 'confidence': move.confidence_should_have_been, - 'color': 'text-warning' + 'color': 'text-warning', + 'priority': 3 if abs(outcome_pct) > 2 else 2 # High priority for big moves + }) + + # Add confidence adjustment event + if confidence_gap > 0.1: + events.append({ + 'time': timestamp, + 'type': 'TUNE', + 'event': f"Confidence threshold adjustment needed: +{confidence_gap:.2f}", + 'confidence': confidence_gap, + 'color': 'text-info', + 'priority': 2 + }) + + # Add RL training events based on queue activity + if hasattr(self.orchestrator, 'rl_evaluation_queue') and self.orchestrator.rl_evaluation_queue: + queue_size = len(self.orchestrator.rl_evaluation_queue) + current_time = datetime.now() + + if queue_size > 0: + events.append({ + 'time': current_time.strftime('%H:%M:%S'), + 'type': 'RL', + 'event': f'Experience replay active (queue: {queue_size} actions)', + 'confidence': min(1.0, queue_size / 10), + 'color': 'text-success', + 'priority': 3 if queue_size > 5 else 1 }) - # Add RL training events (mock for now) - current_time = datetime.now() - events.extend([ - { - 'time': (current_time - timedelta(minutes=2)).strftime('%H:%M:%S'), - 'type': 'RL', - 'event': 'Experience replay completed (batch_size=128)', - 'confidence': 0.85, - 'color': 'text-success' - }, - { - 'time': (current_time - timedelta(minutes=5)).strftime('%H:%M:%S'), - 'type': 'TICK', - 'event': 'High-confidence tick features processed', - 'confidence': 0.92, - 'color': 'text-info' - } - ]) + # Add tick processing events + if hasattr(self.orchestrator, 'get_realtime_tick_stats'): + tick_stats = self.orchestrator.get_realtime_tick_stats() + patterns_detected = tick_stats.get('patterns_detected', 0) + + if patterns_detected > 0: + events.append({ + 'time': datetime.now().strftime('%H:%M:%S'), + 'type': 'TICK', + 'event': f'Violent move patterns detected: {patterns_detected}', + 'confidence': min(1.0, patterns_detected / 5), + 'color': 'text-info', + 'priority': 2 + }) + + # Sort events by priority and time + events.sort(key=lambda x: (x.get('priority', 1), x['time']), reverse=True) if not events: return html.Div([ - html.P("No training events yet. Models are initializing...", + html.P("🤖 Models initializing... Waiting for perfect opportunities to learn from.", + className="text-muted text-center"), + html.P("💡 Retrospective learning will activate when significant price moves are detected.", className="text-muted text-center") ]) log_items = [] - for event in events[-8:]: # Last 8 events - icon = "🧠" if event['type'] == 'CNN' else "🤖" if event['type'] == 'RL' else "⚡" + for event in events[:10]: # Show top 10 events + icon = "🧠" if event['type'] == 'CNN' else "🤖" if event['type'] == 'RL' else "⚙️" if event['type'] == 'TUNE' else "⚡" + confidence_display = f"{event['confidence']:.2f}" if event['confidence'] <= 1.0 else f"{event['confidence']:.3f}" + log_items.append( - html.P(f"{event['time']} {icon} [{event['type']}] {event['event']} (conf: {event['confidence']:.2f})", + html.P(f"{event['time']} {icon} [{event['type']}] {event['event']} (conf: {confidence_display})", className=f"{event['color']} mb-1") ) @@ -2121,25 +2197,33 @@ class RealTimeScalpingDashboard: logger.error(f"Error logging trade for accounting: {e}") def _start_orchestrator_trading(self): - """Start orchestrator trading thread""" + """Start orchestrator-based trading in background""" def orchestrator_loop(): - """Background thread for orchestrator trading decisions""" - logger.info("Orchestrator trading thread started") + """Background orchestrator trading loop with retrospective learning""" + logger.info("ORCHESTRATOR: Starting enhanced trading loop with retrospective learning") while self.streaming: try: # Process orchestrator decisions - self._process_orchestrator_decisions() - logger.debug("Processing orchestrator decisions...") - time.sleep(30) # Decision cycle every 30 seconds + self._process_orchestrator_decisions() + + # Trigger retrospective learning analysis every 5 minutes + if hasattr(self.orchestrator, 'trigger_retrospective_learning'): + asyncio.run(self.orchestrator.trigger_retrospective_learning()) + + # Sleep for decision frequency + time.sleep(30) # 30 second intervals for scalping + except Exception as e: - logger.error(f"Error in orchestrator trading loop: {e}") - time.sleep(5) + logger.error(f"Error in orchestrator loop: {e}") + time.sleep(5) # Short sleep on error + + logger.info("ORCHESTRATOR: Trading loop stopped") - # Start the thread - thread = Thread(target=orchestrator_loop, daemon=True) - thread.start() - logger.info("SUCCESS: Orchestrator trading thread running") + # Start orchestrator in background thread + orchestrator_thread = Thread(target=orchestrator_loop, daemon=True) + orchestrator_thread.start() + logger.info("ORCHESTRATOR: Enhanced trading loop started with retrospective learning") def create_scalping_dashboard(data_provider=None, orchestrator=None): """Create real-time dashboard instance"""