diff --git a/config.yaml b/config.yaml index ba530d4..1ddf8de 100644 --- a/config.yaml +++ b/config.yaml @@ -1,5 +1,11 @@ # Enhanced Multi-Modal Trading System Configuration +# System Settings +system: + timezone: "Europe/Sofia" # Configurable timezone for all timestamps + log_level: "INFO" # DEBUG, INFO, WARNING, ERROR + session_timeout: 3600 # Session timeout in seconds + # Trading Symbols (extendable/configurable) symbols: - "ETH/USDC" # MEXC supports ETHUSDC for API trading @@ -161,7 +167,7 @@ mexc_trading: # Risk management max_daily_loss_usd: 5.0 # Stop trading if daily loss exceeds $5 max_concurrent_positions: 3 # Only 1 position at a time for testing - max_trades_per_hour: 60 # Maximum 60 trades per hour + max_trades_per_hour: 600 # Maximum 60 trades per hour min_trade_interval_seconds: 30 # Minimum between trades # Order configuration diff --git a/debug_dashboard_issue.py b/debug_dashboard_issue.py new file mode 100644 index 0000000..aad2700 --- /dev/null +++ b/debug_dashboard_issue.py @@ -0,0 +1,142 @@ +#!/usr/bin/env python3 +""" +Debug Dashboard Data Flow + +Check if the dashboard is receiving data and updating properly. +""" + +import sys +import logging +import time +import requests +import json +from pathlib import Path + +# Add project root to path +project_root = Path(__file__).parent +sys.path.insert(0, str(project_root)) + +from core.config import get_config, setup_logging +from core.data_provider import DataProvider + +# Setup logging +setup_logging() +logger = logging.getLogger(__name__) + +def test_data_provider(): + """Test if data provider is working""" + logger.info("=== TESTING DATA PROVIDER ===") + + try: + # Test data provider + data_provider = DataProvider() + + # Test current price + logger.info("Testing current price retrieval...") + current_price = data_provider.get_current_price('ETH/USDT') + logger.info(f"Current ETH/USDT price: ${current_price}") + + # Test historical data + logger.info("Testing historical data retrieval...") + df = data_provider.get_historical_data('ETH/USDT', '1m', limit=5, refresh=True) + if df is not None and not df.empty: + logger.info(f"Historical data: {len(df)} rows") + logger.info(f"Latest price: ${df['close'].iloc[-1]:.2f}") + logger.info(f"Latest timestamp: {df.index[-1]}") + else: + logger.error("No historical data available!") + + return True + + except Exception as e: + logger.error(f"Data provider test failed: {e}") + return False + +def test_dashboard_api(): + """Test if dashboard API is responding""" + logger.info("=== TESTING DASHBOARD API ===") + + try: + # Test main dashboard page + response = requests.get("http://127.0.0.1:8050", timeout=5) + logger.info(f"Dashboard main page status: {response.status_code}") + + if response.status_code == 200: + logger.info("Dashboard is responding") + + # Check if there are any JavaScript errors in the page + content = response.text + if 'error' in content.lower(): + logger.warning("Possible errors found in dashboard HTML") + + return True + else: + logger.error(f"Dashboard returned status {response.status_code}") + return False + + except Exception as e: + logger.error(f"Dashboard API test failed: {e}") + return False + +def test_dashboard_callbacks(): + """Test dashboard callback updates""" + logger.info("=== TESTING DASHBOARD CALLBACKS ===") + + try: + # Test the callback endpoint (this would need to be exposed) + # For now, just check if the dashboard is serving content + + # Wait a bit and check again + time.sleep(2) + + response = requests.get("http://127.0.0.1:8050", timeout=5) + if response.status_code == 200: + logger.info("Dashboard callbacks appear to be working") + return True + else: + logger.error("Dashboard callbacks may be stuck") + return False + + except Exception as e: + logger.error(f"Dashboard callback test failed: {e}") + return False + +def main(): + """Run all diagnostic tests""" + logger.info("DASHBOARD DIAGNOSTIC TOOL") + logger.info("=" * 50) + + results = { + 'data_provider': test_data_provider(), + 'dashboard_api': test_dashboard_api(), + 'dashboard_callbacks': test_dashboard_callbacks() + } + + logger.info("=" * 50) + logger.info("DIAGNOSTIC RESULTS:") + + for test_name, result in results.items(): + status = "PASS" if result else "FAIL" + logger.info(f" {test_name}: {status}") + + if all(results.values()): + logger.info("All tests passed - issue may be browser-side") + logger.info("Try refreshing the dashboard at http://127.0.0.1:8050") + else: + logger.error("Issues detected - check logs above") + logger.info("Recommendations:") + + if not results['data_provider']: + logger.info(" - Check internet connection") + logger.info(" - Verify Binance API is accessible") + + if not results['dashboard_api']: + logger.info(" - Restart the dashboard") + logger.info(" - Check if port 8050 is blocked") + + if not results['dashboard_callbacks']: + logger.info(" - Dashboard may be frozen") + logger.info(" - Consider restarting") + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/main_clean.py b/main_clean.py index 1c44e6b..40fbfcb 100644 --- a/main_clean.py +++ b/main_clean.py @@ -286,7 +286,7 @@ def run_web_dashboard(): data_provider = DataProvider() # Verify we have real data connection - logger.info("🔍 Verifying REAL data connection...") + logger.info("[DATA] Verifying REAL data connection...") test_data = data_provider.get_historical_data('ETH/USDT', '1m', limit=10, refresh=True) if test_data is None or test_data.empty: logger.warning("⚠️ No fresh data available - trying cached data...") diff --git a/web/dashboard.py b/web/dashboard.py index c71366b..ccb21b2 100644 --- a/web/dashboard.py +++ b/web/dashboard.py @@ -10,49 +10,57 @@ This module provides a modern, responsive web dashboard for the trading system: """ import asyncio -import json -import logging -import time -from datetime import datetime, timedelta, timezone -from threading import Thread -from typing import Dict, List, Optional, Any, Tuple -from collections import deque - -# Optional WebSocket support -try: - import websocket - import threading - WEBSOCKET_AVAILABLE = True -except ImportError: - WEBSOCKET_AVAILABLE = False - logger = logging.getLogger(__name__) - logger.warning("websocket-client not available. Install with: pip install websocket-client") - import dash -from dash import dcc, html, Input, Output, State, callback_context +from dash import dcc, html, Input, Output import plotly.graph_objects as go -import plotly.express as px from plotly.subplots import make_subplots +import plotly.express as px import pandas as pd import numpy as np +from datetime import datetime, timedelta, timezone +import pytz +import logging +import json +import time +import threading +from threading import Thread, Lock +from collections import deque +import warnings +from typing import Dict, List, Optional, Any, Union, Tuple +import websocket +# Setup logger immediately after logging import +logger = logging.getLogger(__name__) + +# WebSocket availability check +try: + import websocket + WEBSOCKET_AVAILABLE = True + logger.info("WebSocket client available") +except ImportError: + WEBSOCKET_AVAILABLE = False + logger.warning("websocket-client not available. Real-time data will use API fallback.") + +# Import trading system components from core.config import get_config from core.data_provider import DataProvider from core.orchestrator import TradingOrchestrator, TradingDecision from core.trading_executor import TradingExecutor +from core.trading_action import TradingAction +from models import get_model_registry -# Enhanced RL Training Integration + +# Import enhanced RL components if available try: + from core.enhanced_orchestrator import EnhancedTradingOrchestrator + from core.universal_data_adapter import UniversalDataAdapter from core.unified_data_stream import UnifiedDataStream, TrainingDataPacket, UIDataPacket - from core.enhanced_orchestrator import EnhancedTradingOrchestrator, MarketState, TradingAction - from training.enhanced_rl_trainer import EnhancedRLTrainer ENHANCED_RL_AVAILABLE = True - logger = logging.getLogger(__name__) logger.info("Enhanced RL training components available") except ImportError as e: + logger.warning(f"Enhanced RL components not available: {e}") ENHANCED_RL_AVAILABLE = False - logger = logging.getLogger(__name__) - logger.warning(f"Enhanced RL training not available: {e}") + # Fallback classes class UnifiedDataStream: def __init__(self, *args, **kwargs): pass @@ -68,36 +76,6 @@ except ImportError as e: class UIDataPacket: def __init__(self, *args, **kwargs): pass -# Try to import model registry, fallback if not available -try: - from models import get_model_registry -except ImportError: - logger.warning("Models module not available, creating fallback registry") - - class FallbackModelRegistry: - def __init__(self): - self.total_memory_limit_mb = 8192 # 8GB - self.models = {} - - def get_memory_stats(self): - return { - 'utilization_percent': 0, - 'total_used_mb': 0, - 'total_limit_mb': self.total_memory_limit_mb, - 'models': {} - } - - def get_models_by_type(self, model_type: str): - """Get models by type - fallback implementation returns empty dict""" - return {} - - def register_model(self, model, weight=1.0): - return True - - def get_model_registry(): - return FallbackModelRegistry() - -logger = logging.getLogger(__name__) class AdaptiveThresholdLearner: """Learn optimal confidence thresholds based on real trade outcomes""" @@ -190,11 +168,17 @@ class AdaptiveThresholdLearner: return {'error': str(e)} class TradingDashboard: - """Modern trading dashboard with real-time updates and enhanced RL training integration""" + """Enhanced Trading Dashboard with Williams pivot points and unified timezone handling""" def __init__(self, data_provider: DataProvider = None, orchestrator: TradingOrchestrator = None, trading_executor: TradingExecutor = None): """Initialize the dashboard with unified data stream and enhanced RL training""" self.config = get_config() + + # Initialize timezone from config + timezone_name = self.config.get('system', {}).get('timezone', 'Europe/Sofia') + self.timezone = pytz.timezone(timezone_name) + logger.info(f"Dashboard timezone set to: {timezone_name}") + self.data_provider = data_provider or DataProvider() # Enhanced orchestrator support @@ -314,16 +298,60 @@ class TradingDashboard: logger.info(f"Enhanced RL enabled: {self.enhanced_rl_training_enabled}") logger.info(f"Stream consumer ID: {self.stream_consumer_id}") + def _to_local_timezone(self, dt: datetime) -> datetime: + """Convert datetime to configured local timezone""" + try: + if dt is None: + return None + + # If datetime is naive, assume it's UTC + if dt.tzinfo is None: + dt = pytz.UTC.localize(dt) + + # Convert to local timezone + return dt.astimezone(self.timezone) + except Exception as e: + logger.warning(f"Error converting timezone: {e}") + return dt + + def _now_local(self) -> datetime: + """Get current time in configured local timezone""" + return datetime.now(self.timezone) + + def _ensure_timezone_consistency(self, df: pd.DataFrame) -> pd.DataFrame: + """Ensure DataFrame index is in consistent timezone""" + try: + if hasattr(df.index, 'tz'): + if df.index.tz is None: + # Assume UTC if no timezone + df.index = df.index.tz_localize('UTC') + + # Convert to local timezone + df.index = df.index.tz_convert(self.timezone) + + return df + except Exception as e: + logger.warning(f"Error ensuring timezone consistency: {e}") + return df + def _initialize_streaming(self): """Initialize unified data streaming and WebSocket fallback""" try: - if ENHANCED_RL_AVAILABLE: - # Start unified data stream - asyncio.run(self.unified_stream.start_streaming()) - logger.info("Unified data stream started") - - # Start WebSocket as backup/additional data source + # Start WebSocket first (non-blocking) self._start_websocket_stream() + logger.info("WebSocket streaming initialized") + + if ENHANCED_RL_AVAILABLE: + # Start unified data stream in background + def start_unified_stream(): + try: + asyncio.run(self.unified_stream.start_streaming()) + logger.info("Unified data stream started") + except Exception as e: + logger.error(f"Error starting unified stream: {e}") + + unified_thread = Thread(target=start_unified_stream, daemon=True) + unified_thread.start() # Start background data collection self._start_enhanced_training_data_collection() @@ -332,7 +360,7 @@ class TradingDashboard: except Exception as e: logger.error(f"Error initializing streaming: {e}") - # Fallback to WebSocket only + # Ensure WebSocket is started as fallback self._start_websocket_stream() def _start_enhanced_training_data_collection(self): @@ -855,20 +883,32 @@ class TradingDashboard: current_threshold = self.adaptive_learner.get_current_threshold() should_execute = signal['confidence'] >= current_threshold - if should_execute: + # Check position limits before execution + can_execute = self._can_execute_new_position(signal['action']) + + if should_execute and can_execute: signal['signal_type'] = 'EXECUTED' signal['threshold_used'] = current_threshold # Track threshold for learning signal['reason'] = f"ADAPTIVE EXECUTE (≥{current_threshold:.2%}): {signal['reason']}" logger.debug(f"[EXECUTE] {signal['action']} signal @ ${signal['price']:.2f} (confidence: {signal['confidence']:.1%} ≥ {current_threshold:.1%})") self._process_trading_decision(signal) + elif should_execute and not can_execute: + # Signal meets confidence but we're at position limit + signal['signal_type'] = 'NOT_EXECUTED_POSITION_LIMIT' + signal['threshold_used'] = current_threshold + signal['reason'] = f"BLOCKED BY POSITION LIMIT (≥{current_threshold:.2%}): {signal['reason']} [Positions: {self._count_open_positions()}/{self.config.get('trading', {}).get('max_concurrent_positions', 3)}]" + logger.info(f"[BLOCKED] {signal['action']} signal @ ${signal['price']:.2f} - Position limit reached ({self._count_open_positions()}/{self.config.get('trading', {}).get('max_concurrent_positions', 3)})") + + # Still add to training queue for RL learning + self._queue_signal_for_training(signal, current_price, symbol) else: - signal['signal_type'] = 'IGNORED' - signal['reason'] = f"ADAPTIVE IGNORE (<{current_threshold:.2%}): {signal['reason']}" - logger.debug(f"[IGNORE] {signal['action']} signal @ ${signal['price']:.2f} (confidence: {signal['confidence']:.1%} < {current_threshold:.1%})") - # Add to recent decisions for display but don't execute trade - self.recent_decisions.append(signal) - if len(self.recent_decisions) > 500: # Keep last 500 decisions - self.recent_decisions = self.recent_decisions[-500:] + signal['signal_type'] = 'NOT_EXECUTED_LOW_CONFIDENCE' + signal['threshold_used'] = current_threshold + signal['reason'] = f"LOW CONFIDENCE (<{current_threshold:.2%}): {signal['reason']}" + logger.debug(f"[SKIP] {signal['action']} signal @ ${signal['price']:.2f} (confidence: {signal['confidence']:.1%} < {current_threshold:.1%})") + + # Still add to training queue for RL learning + self._queue_signal_for_training(signal, current_price, symbol) else: # Fallback: Add a simple monitoring update if n_intervals % 10 == 0 and current_price: # Every 10 seconds @@ -1100,7 +1140,7 @@ class TradingDashboard: return fig def _create_price_chart(self, symbol: str) -> go.Figure: - """Create enhanced 1-second price chart with volume from WebSocket stream""" + """Create enhanced 1-second price chart with volume and Williams pivot points from WebSocket stream""" try: # Get 1-second bars from WebSocket stream df = self.get_one_second_bars(count=300) # Last 5 minutes of 1s bars @@ -1111,6 +1151,8 @@ class TradingDashboard: try: df = self.data_provider.get_historical_data(symbol, '1m', limit=50, refresh=True) if df is not None and not df.empty: + # Ensure timezone consistency for fallback data + df = self._ensure_timezone_consistency(df) # Add volume column if missing if 'volume' not in df.columns: df['volume'] = 100 # Default volume for demo @@ -1127,15 +1169,17 @@ class TradingDashboard: f"Chart Error: {str(e)}" ) else: + # Ensure timezone consistency for WebSocket data + df = self._ensure_timezone_consistency(df) actual_timeframe = '1s' - logger.debug(f"[CHART] Using {len(df)} 1s bars from WebSocket stream") + logger.debug(f"[CHART] Using {len(df)} 1s bars from WebSocket stream in {self.timezone}") # Create subplot with secondary y-axis for volume fig = make_subplots( rows=2, cols=1, shared_xaxes=True, vertical_spacing=0.1, - subplot_titles=(f'{symbol} Price ({actual_timeframe.upper()})', 'Volume'), + subplot_titles=(f'{symbol} Price ({actual_timeframe.upper()}) with Williams Pivot Points', 'Volume'), row_heights=[0.7, 0.3] ) @@ -1152,6 +1196,14 @@ class TradingDashboard: row=1, col=1 ) + # Add Williams Market Structure pivot points + try: + pivot_points = self._get_williams_pivot_points_for_chart(df) + if pivot_points: + self._add_williams_pivot_points_to_chart(fig, pivot_points, row=1) + except Exception as e: + logger.debug(f"Error adding Williams pivot points to chart: {e}") + # Add moving averages if we have enough data if len(df) >= 20: # 20-period SMA @@ -1240,12 +1292,12 @@ class TradingDashboard: # Add BUY markers with different styles for executed vs ignored executed_buys = [d[0] for d in buy_decisions if d[1] == 'EXECUTED'] - ignored_buys = [d[0] for d in buy_decisions if d[1] == 'IGNORED'] + ignored_buys = [d[0] for d in buy_decisions if d[1] in ['NOT_EXECUTED_POSITION_LIMIT', 'NOT_EXECUTED_LOW_CONFIDENCE']] if executed_buys: fig.add_trace( go.Scatter( - x=[d['timestamp'] for d in executed_buys], + x=[self._to_local_timezone(d['timestamp']) for d in executed_buys], y=[d['price'] for d in executed_buys], mode='markers', marker=dict( @@ -1256,7 +1308,8 @@ class TradingDashboard: ), name="BUY (Executed)", showlegend=True, - hovertemplate="BUY EXECUTED
Price: $%{y:.2f}
Time: %{x}
" + hovertemplate="BUY EXECUTED
Price: $%{y:.2f}
Time: %{x}
Confidence: %{customdata:.1%}", + customdata=[d.get('confidence', 0) for d in executed_buys] ), row=1, col=1 ) @@ -1264,7 +1317,7 @@ class TradingDashboard: if ignored_buys: fig.add_trace( go.Scatter( - x=[d['timestamp'] for d in ignored_buys], + x=[self._to_local_timezone(d['timestamp']) for d in ignored_buys], y=[d['price'] for d in ignored_buys], mode='markers', marker=dict( @@ -1273,21 +1326,22 @@ class TradingDashboard: symbol='triangle-up-open', line=dict(color='#00ff88', width=2) ), - name="BUY (Ignored)", + name="BUY (Blocked)", showlegend=True, - hovertemplate="BUY IGNORED
Price: $%{y:.2f}
Time: %{x}
" + hovertemplate="BUY BLOCKED
Price: $%{y:.2f}
Time: %{x}
Confidence: %{customdata:.1%}", + customdata=[d.get('confidence', 0) for d in ignored_buys] ), row=1, col=1 ) # Add SELL markers with different styles for executed vs ignored executed_sells = [d[0] for d in sell_decisions if d[1] == 'EXECUTED'] - ignored_sells = [d[0] for d in sell_decisions if d[1] == 'IGNORED'] + ignored_sells = [d[0] for d in sell_decisions if d[1] in ['NOT_EXECUTED_POSITION_LIMIT', 'NOT_EXECUTED_LOW_CONFIDENCE']] if executed_sells: fig.add_trace( go.Scatter( - x=[d['timestamp'] for d in executed_sells], + x=[self._to_local_timezone(d['timestamp']) for d in executed_sells], y=[d['price'] for d in executed_sells], mode='markers', marker=dict( @@ -1298,7 +1352,8 @@ class TradingDashboard: ), name="SELL (Executed)", showlegend=True, - hovertemplate="SELL EXECUTED
Price: $%{y:.2f}
Time: %{x}
" + hovertemplate="SELL EXECUTED
Price: $%{y:.2f}
Time: %{x}
Confidence: %{customdata:.1%}", + customdata=[d.get('confidence', 0) for d in executed_sells] ), row=1, col=1 ) @@ -1306,7 +1361,7 @@ class TradingDashboard: if ignored_sells: fig.add_trace( go.Scatter( - x=[d['timestamp'] for d in ignored_sells], + x=[self._to_local_timezone(d['timestamp']) for d in ignored_sells], y=[d['price'] for d in ignored_sells], mode='markers', marker=dict( @@ -1315,9 +1370,10 @@ class TradingDashboard: symbol='triangle-down-open', line=dict(color='#ff6b6b', width=2) ), - name="SELL (Ignored)", + name="SELL (Blocked)", showlegend=True, - hovertemplate="SELL IGNORED
Price: $%{y:.2f}
Time: %{x}
" + hovertemplate="SELL BLOCKED
Price: $%{y:.2f}
Time: %{x}
Confidence: %{customdata:.1%}", + customdata=[d.get('confidence', 0) for d in ignored_sells] ), row=1, col=1 ) @@ -2801,14 +2857,15 @@ class TradingDashboard: self.is_streaming = False def _process_tick_data(self, tick_data: Dict): - """Process incoming tick data and update 1-second bars""" + """Process incoming tick data and update 1-second bars with consistent timezone""" try: # Extract price and volume from Binance ticker data price = float(tick_data.get('c', 0)) # Current price volume = float(tick_data.get('v', 0)) # 24h volume - timestamp = datetime.now(timezone.utc) + # Use configured timezone instead of UTC for consistency + timestamp = self._now_local() - # Add to tick cache + # Add to tick cache with consistent timezone tick = { 'timestamp': timestamp, 'price': price, @@ -2821,7 +2878,7 @@ class TradingDashboard: self.tick_cache.append(tick) - # Update current second bar + # Update current second bar using local timezone current_second = timestamp.replace(microsecond=0) if self.current_second_data['timestamp'] != current_second: @@ -2849,6 +2906,8 @@ class TradingDashboard: # Update current price for dashboard self.current_prices[tick_data.get('s', 'ETHUSDT')] = price + logger.debug(f"[TICK] Processed tick at {timestamp.strftime('%H:%M:%S')} {self.timezone.zone}: ${price:.2f}") + except Exception as e: logger.warning(f"Error processing tick data: {e}") @@ -2889,7 +2948,7 @@ class TradingDashboard: return [] def get_one_second_bars(self, count: int = 300) -> pd.DataFrame: - """Get recent 1-second bars as DataFrame""" + """Get recent 1-second bars as DataFrame with consistent timezone""" try: if len(self.one_second_bars) == 0: return pd.DataFrame() @@ -2902,6 +2961,11 @@ class TradingDashboard: if not df.empty: df.set_index('timestamp', inplace=True) df.sort_index(inplace=True) + + # Ensure timezone consistency + df = self._ensure_timezone_consistency(df) + + logger.debug(f"[BARS] Retrieved {len(df)} 1s bars in {self.timezone.zone}") return df @@ -4274,16 +4338,16 @@ class TradingDashboard: logger.warning(f"Error calculating Williams pivot points: {e}") state_features.extend([0.0] * 250) # Default features - # Add multi-timeframe OHLCV features (300 features) + # Add multi-timeframe OHLCV features (200 features: ETH 1s/1m/1d + BTC 1s) try: multi_tf_features = self._get_multi_timeframe_features(training_episode.get('symbol', 'ETH/USDT')) if multi_tf_features: state_features.extend(multi_tf_features) else: - state_features.extend([0.0] * 300) # Default if calculation fails + state_features.extend([0.0] * 200) # Default if calculation fails except Exception as e: logger.warning(f"Error calculating multi-timeframe features: {e}") - state_features.extend([0.0] * 300) # Default features + state_features.extend([0.0] * 200) # Default features # Add trade-specific context entry_price = training_episode['entry_price'] @@ -4458,7 +4522,7 @@ class TradingDashboard: return None ohlcv_array = np.array([ - [df.index[i].timestamp() if hasattr(df.index[i], 'timestamp') else time.time(), + [self._to_local_timezone(df.index[i]).timestamp() if hasattr(df.index[i], 'timestamp') else time.time(), df['open'].iloc[i], df['high'].iloc[i], df['low'].iloc[i], df['close'].iloc[i], df['volume'].iloc[i]] for i in range(len(df)) @@ -4479,20 +4543,44 @@ class TradingDashboard: return None def _get_multi_timeframe_features(self, symbol: str) -> Optional[List[float]]: - """Get multi-timeframe OHLCV features for comprehensive market context""" + """Get multi-timeframe OHLCV features for ETH and BTC only (focused timeframes)""" try: features = [] - timeframes = ['1m', '5m', '15m', '1h', '4h', '1d'] # 6 timeframes + + # Focus only on key timeframes for ETH and BTC + if symbol.startswith('ETH'): + timeframes = ['1s', '1m', '1d'] # ETH: 3 key timeframes + target_symbol = 'ETH/USDT' + elif symbol.startswith('BTC'): + timeframes = ['1s'] # BTC: only 1s for reference + target_symbol = 'BTC/USDT' + else: + # Default to ETH if unknown symbol + timeframes = ['1s', '1m', '1d'] + target_symbol = 'ETH/USDT' for timeframe in timeframes: try: # Get data for this timeframe - df = self.data_provider.get_historical_data( - symbol=symbol, - timeframe=timeframe, - limit=50, # Last 50 bars - refresh=True - ) + if timeframe == '1s': + # For 1s data, use our tick aggregation + df = self.get_one_second_bars(count=60) # Last 60 seconds + if df.empty: + # Fallback to 1m data + df = self.data_provider.get_historical_data( + symbol=target_symbol, + timeframe='1m', + limit=50, + refresh=True + ) + else: + # Get historical data for other timeframes + df = self.data_provider.get_historical_data( + symbol=target_symbol, + timeframe=timeframe, + limit=50, # Last 50 bars + refresh=True + ) if df is not None and not df.empty and len(df) >= 10: # Calculate normalized features for this timeframe @@ -4503,14 +4591,40 @@ class TradingDashboard: features.extend([0.0] * 50) # 50 features per timeframe except Exception as e: - logger.debug(f"Error getting {timeframe} data: {e}") + logger.debug(f"Error getting {timeframe} data for {target_symbol}: {e}") features.extend([0.0] * 50) # 50 features per timeframe - # Total: 6 timeframes * 50 features = 300 features - return features[:300] + # Pad to ensure consistent feature count + # ETH: 3 timeframes * 50 = 150 features + # BTC: 1 timeframe * 50 = 50 features + # Total expected: 200 features (150 ETH + 50 BTC) + + # Add BTC 1s data if we're processing ETH (for correlation analysis) + if symbol.startswith('ETH'): + try: + btc_1s_df = self.get_one_second_bars(count=60, symbol='BTC/USDT') + if btc_1s_df.empty: + btc_1s_df = self.data_provider.get_historical_data( + symbol='BTC/USDT', + timeframe='1m', + limit=50, + refresh=True + ) + + if btc_1s_df is not None and not btc_1s_df.empty and len(btc_1s_df) >= 10: + btc_features = self._extract_timeframe_features(btc_1s_df, '1s_btc') + features.extend(btc_features) + else: + features.extend([0.0] * 50) # BTC features + except Exception as e: + logger.debug(f"Error getting BTC correlation data: {e}") + features.extend([0.0] * 50) # BTC features + + # Total: ETH(150) + BTC(50) = 200 features (reduced from 300) + return features[:200] except Exception as e: - logger.warning(f"Error calculating multi-timeframe features: {e}") + logger.warning(f"Error calculating focused multi-timeframe features: {e}") return None def _extract_timeframe_features(self, df: pd.DataFrame, timeframe: str) -> List[float]: @@ -4647,6 +4761,276 @@ class TradingDashboard: logger.warning(f"Error extracting features for {timeframe}: {e}") return [0.0] * 50 + def _get_williams_pivot_points_for_chart(self, df: pd.DataFrame) -> Optional[Dict]: + """Calculate Williams pivot points specifically for chart visualization with consistent timezone""" + try: + # Import Williams Market Structure + try: + from training.williams_market_structure import WilliamsMarketStructure + except ImportError: + logger.warning("Williams Market Structure not available for chart") + return None + + # Need at least 50 bars for meaningful pivot calculation + if len(df) < 50: + logger.debug(f"[WILLIAMS] Insufficient data for pivot calculation: {len(df)} bars") + return None + + # Ensure timezone consistency for the chart data + df = self._ensure_timezone_consistency(df) + + # Convert DataFrame to numpy array for Williams calculation with proper timezone handling + ohlcv_array = [] + for i in range(len(df)): + timestamp = df.index[i] + + # Convert timestamp to local timezone and then to Unix timestamp + if hasattr(timestamp, 'timestamp'): + local_time = self._to_local_timezone(timestamp) + unix_timestamp = local_time.timestamp() + else: + unix_timestamp = time.time() + + ohlcv_array.append([ + unix_timestamp, + df['open'].iloc[i], + df['high'].iloc[i], + df['low'].iloc[i], + df['close'].iloc[i], + df['volume'].iloc[i] + ]) + + ohlcv_array = np.array(ohlcv_array) + + # Calculate Williams pivot points + williams = WilliamsMarketStructure() + structure_levels = williams.calculate_recursive_pivot_points(ohlcv_array) + + # Extract pivot points for chart display + chart_pivots = {} + level_colors = ['#FF6B6B', '#4ECDC4', '#45B7D1', '#96CEB4', '#FFEAA7'] # Different colors per level + level_sizes = [8, 7, 6, 5, 4] # Different sizes for each level + + total_pivots = 0 + for level in range(5): + level_key = f'level_{level}' + if level_key in structure_levels: + level_data = structure_levels[level_key] + swing_points = level_data.swing_points + + if swing_points: + # Log swing point details for validation + highs = [s for s in swing_points if s.swing_type.name == 'SWING_HIGH'] + lows = [s for s in swing_points if s.swing_type.name == 'SWING_LOW'] + logger.debug(f"[WILLIAMS] Level {level}: {len(highs)} highs, {len(lows)} lows, total: {len(swing_points)}") + + # Convert swing points to chart format + chart_pivots[f'level_{level}'] = { + 'swing_points': swing_points, + 'color': level_colors[level], + 'name': f'L{level + 1} Pivots', # Shorter name + 'size': level_sizes[level], # Different sizes for validation + 'opacity': max(0.9 - (level * 0.15), 0.4) # High opacity for validation + } + total_pivots += len(swing_points) + + logger.info(f"[WILLIAMS] Calculated {total_pivots} total pivot points across {len(chart_pivots)} levels") + return chart_pivots + + except Exception as e: + logger.warning(f"Error calculating Williams pivot points for chart: {e}") + return None + + def _add_williams_pivot_points_to_chart(self, fig, pivot_points: Dict, row: int = 1): + """Add Williams pivot points as small triangles to the chart with proper timezone conversion""" + try: + for level_key, level_data in pivot_points.items(): + swing_points = level_data['swing_points'] + if not swing_points: + continue + + # Validate swing alternation (shouldn't have consecutive highs or lows) + self._validate_swing_alternation(swing_points, level_key) + + # Separate swing highs and lows + swing_highs_x = [] + swing_highs_y = [] + swing_lows_x = [] + swing_lows_y = [] + + for swing in swing_points: + # Ensure proper timezone conversion for swing point timestamps + if hasattr(swing, 'timestamp'): + timestamp = swing.timestamp + + # Convert swing timestamp to local timezone + if isinstance(timestamp, datetime): + # Williams Market Structure creates naive datetimes that are actually in local time + # but without timezone info, so we need to localize them to our configured timezone + if timestamp.tzinfo is None: + # Williams creates timestamps in local time (Europe/Sofia), so localize directly + local_timestamp = self.timezone.localize(timestamp) + else: + # If it has timezone info, convert to local timezone + local_timestamp = timestamp.astimezone(self.timezone) + else: + # Fallback if timestamp is not a datetime + local_timestamp = self._now_local() + else: + local_timestamp = self._now_local() + + price = swing.price + + if swing.swing_type.name == 'SWING_HIGH': + swing_highs_x.append(local_timestamp) + swing_highs_y.append(price) + elif swing.swing_type.name == 'SWING_LOW': + swing_lows_x.append(local_timestamp) + swing_lows_y.append(price) + + # Add swing highs (triangle-up above the price) + if swing_highs_x: + fig.add_trace( + go.Scatter( + x=swing_highs_x, + y=swing_highs_y, + mode='markers', + name=f"{level_data['name']} (Highs)", + marker=dict( + color=level_data['color'], + size=max(level_data['size'] - 2, 4), # Smaller triangles + symbol='triangle-up', # Triangle pointing up for highs + line=dict(color='white', width=1), + opacity=level_data['opacity'] + ), + hovertemplate=f'Swing High
Price: $%{{y:.2f}}
%{{x}}
{level_data["name"]}
Strength: {swing_points[0].strength if swing_points else "N/A"}', + showlegend=True + ), + row=row, col=1 + ) + + # Add swing lows (triangle-down below the price) + if swing_lows_x: + fig.add_trace( + go.Scatter( + x=swing_lows_x, + y=swing_lows_y, + mode='markers', + name=f"{level_data['name']} (Lows)", + marker=dict( + color=level_data['color'], + size=max(level_data['size'] - 2, 4), # Smaller triangles + symbol='triangle-down', # Triangle pointing down for lows + line=dict(color='white', width=1), + opacity=level_data['opacity'] + ), + hovertemplate=f'Swing Low
Price: $%{{y:.2f}}
%{{x}}
{level_data["name"]}
Strength: {swing_points[0].strength if swing_points else "N/A"}', + showlegend=True, + legendgroup=level_data['name'] # Group with highs in legend + ), + row=row, col=1 + ) + + logger.debug(f"[CHART] Added Williams pivot points as triangles with proper timezone conversion") + + except Exception as e: + logger.warning(f"Error adding Williams pivot points to chart: {e}") + + def _validate_swing_alternation(self, swing_points: List, level_key: str): + """Validate that swing points alternate correctly between highs and lows""" + try: + if len(swing_points) < 2: + return + + # Sort by index to check chronological order + sorted_swings = sorted(swing_points, key=lambda x: x.index) + + consecutive_issues = 0 + last_type = None + + for i, swing in enumerate(sorted_swings): + current_type = swing.swing_type.name + + if last_type and last_type == current_type: + consecutive_issues += 1 + logger.debug(f"[WILLIAMS_VALIDATION] {level_key}: Consecutive {current_type} at index {swing.index}") + + last_type = current_type + + if consecutive_issues > 0: + logger.warning(f"[WILLIAMS_VALIDATION] {level_key}: Found {consecutive_issues} consecutive swing issues") + else: + logger.debug(f"[WILLIAMS_VALIDATION] {level_key}: Swing alternation is correct ({len(sorted_swings)} swings)") + + except Exception as e: + logger.warning(f"Error validating swing alternation: {e}") + + def _can_execute_new_position(self, action): + """Check if a new position can be executed based on current position limits""" + try: + # Get max concurrent positions from config + max_positions = self.config.get('trading', {}).get('max_concurrent_positions', 3) + current_open_positions = self._count_open_positions() + + # Check if we can open a new position + if current_open_positions >= max_positions: + logger.debug(f"[POSITION_LIMIT] Cannot execute {action} - at max positions ({current_open_positions}/{max_positions})") + return False + + # Additional check: if we have a current position, only allow closing trades + if self.current_position: + current_side = self.current_position['side'] + if current_side == 'LONG' and action == 'BUY': + return False # Already long, can't buy more + elif current_side == 'SHORT' and action == 'SELL': + return False # Already short, can't sell more + + return True + + except Exception as e: + logger.error(f"Error checking position limits: {e}") + return False + + def _count_open_positions(self): + """Count current open positions""" + try: + # Simple count: 1 if we have a current position, 0 otherwise + return 1 if self.current_position else 0 + except Exception as e: + logger.error(f"Error counting open positions: {e}") + return 0 + + def _queue_signal_for_training(self, signal, current_price, symbol): + """Add a signal to training queue for RL learning (even if not executed)""" + try: + # Add to recent decisions for display + signal['timestamp'] = datetime.now() + self.recent_decisions.append(signal.copy()) + if len(self.recent_decisions) > 500: + self.recent_decisions = self.recent_decisions[-500:] + + # Create synthetic trade for RL training + training_trade = { + 'trade_id': f"training_{len(self.closed_trades) + 1}", + 'symbol': symbol, + 'side': 'LONG' if signal['action'] == 'BUY' else 'SHORT', + 'entry_price': current_price, + 'exit_price': current_price, # Immediate close for training + 'size': 0.01, # Small size for training + 'net_pnl': 0.0, # Neutral outcome for blocked signals + 'fees': 0.001, + 'duration': timedelta(seconds=1), + 'timestamp': datetime.now(), + 'mexc_executed': False + } + + # Trigger RL training with this synthetic trade + self._trigger_rl_training_on_closed_trade(training_trade) + + logger.debug(f"[TRAINING] Queued {signal['action']} signal for RL learning") + + except Exception as e: + logger.warning(f"Error queuing signal for training: {e}") def create_dashboard(data_provider: DataProvider = None, orchestrator: TradingOrchestrator = None, trading_executor: TradingExecutor = None) -> TradingDashboard: """Factory function to create a trading dashboard"""