pivot points

Dobromir Popov 2025-05-29 21:03:26 +03:00
parent 3e697acf08
commit 7e4b29fdc2
4 changed files with 642 additions and 110 deletions

View File

@@ -1,5 +1,11 @@
# Enhanced Multi-Modal Trading System Configuration

# System Settings
system:
  timezone: "Europe/Sofia"  # Configurable timezone for all timestamps
  log_level: "INFO"  # DEBUG, INFO, WARNING, ERROR
  session_timeout: 3600  # Session timeout in seconds

# Trading Symbols (extendable/configurable)
symbols:
  - "ETH/USDC"  # MEXC supports ETHUSDC for API trading
@@ -161,7 +167,7 @@ mexc_trading:
  # Risk management
  max_daily_loss_usd: 5.0  # Stop trading if daily loss exceeds $5
  max_concurrent_positions: 3  # Maximum concurrent open positions (testing)
  max_trades_per_hour: 60  # Maximum 60 trades per hour
  max_trades_per_hour: 600  # Maximum 600 trades per hour
  min_trade_interval_seconds: 30  # Minimum seconds between trades

  # Order configuration
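For context on how the two rate limits interact, here is a minimal sketch of an enforcement gate (a hypothetical helper, not part of this codebase; the names are assumptions):

import time
from collections import deque

class TradeRateLimiter:
    """Sketch: combine max_trades_per_hour with min_trade_interval_seconds."""
    def __init__(self, max_trades_per_hour=600, min_trade_interval_seconds=30):
        self.max_per_hour = max_trades_per_hour
        self.min_interval = min_trade_interval_seconds
        self.accepted = deque()  # timestamps of accepted trades

    def allow_trade(self, now=None):
        now = time.time() if now is None else now
        while self.accepted and now - self.accepted[0] > 3600:
            self.accepted.popleft()  # drop trades outside the 1-hour window
        if self.accepted and now - self.accepted[-1] < self.min_interval:
            return False  # too soon after the previous trade
        if len(self.accepted) >= self.max_per_hour:
            return False  # hourly cap reached
        self.accepted.append(now)
        return True

Note that a 30-second minimum interval already caps throughput at 120 trades per hour, so the 600-per-hour limit is the looser of the two constraints.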

debug_dashboard_issue.py (new file, 142 lines)
View File

@@ -0,0 +1,142 @@
#!/usr/bin/env python3
"""
Debug Dashboard Data Flow
Check if the dashboard is receiving data and updating properly.
"""
import sys
import logging
import time
import requests
import json
from pathlib import Path

# Add project root to path
project_root = Path(__file__).parent
sys.path.insert(0, str(project_root))

from core.config import get_config, setup_logging
from core.data_provider import DataProvider

# Setup logging
setup_logging()
logger = logging.getLogger(__name__)


def test_data_provider():
    """Test if the data provider is working"""
    logger.info("=== TESTING DATA PROVIDER ===")
    try:
        # Test data provider
        data_provider = DataProvider()

        # Test current price
        logger.info("Testing current price retrieval...")
        current_price = data_provider.get_current_price('ETH/USDT')
        logger.info(f"Current ETH/USDT price: ${current_price}")

        # Test historical data
        logger.info("Testing historical data retrieval...")
        df = data_provider.get_historical_data('ETH/USDT', '1m', limit=5, refresh=True)
        if df is not None and not df.empty:
            logger.info(f"Historical data: {len(df)} rows")
            logger.info(f"Latest price: ${df['close'].iloc[-1]:.2f}")
            logger.info(f"Latest timestamp: {df.index[-1]}")
            return True
        else:
            logger.error("No historical data available!")
            return False
    except Exception as e:
        logger.error(f"Data provider test failed: {e}")
        return False


def test_dashboard_api():
    """Test if the dashboard API is responding"""
    logger.info("=== TESTING DASHBOARD API ===")
    try:
        # Test main dashboard page
        response = requests.get("http://127.0.0.1:8050", timeout=5)
        logger.info(f"Dashboard main page status: {response.status_code}")
        if response.status_code == 200:
            logger.info("Dashboard is responding")
            # Check if there are any JavaScript errors in the page
            content = response.text
            if 'error' in content.lower():
                logger.warning("Possible errors found in dashboard HTML")
            return True
        else:
            logger.error(f"Dashboard returned status {response.status_code}")
            return False
    except Exception as e:
        logger.error(f"Dashboard API test failed: {e}")
        return False


def test_dashboard_callbacks():
    """Test dashboard callback updates"""
    logger.info("=== TESTING DASHBOARD CALLBACKS ===")
    try:
        # The callback endpoint would need to be exposed to test directly;
        # for now, just check that the dashboard is still serving content.
        # Wait a bit and check again
        time.sleep(2)
        response = requests.get("http://127.0.0.1:8050", timeout=5)
        if response.status_code == 200:
            logger.info("Dashboard callbacks appear to be working")
            return True
        else:
            logger.error("Dashboard callbacks may be stuck")
            return False
    except Exception as e:
        logger.error(f"Dashboard callback test failed: {e}")
        return False


def main():
    """Run all diagnostic tests"""
    logger.info("DASHBOARD DIAGNOSTIC TOOL")
    logger.info("=" * 50)

    results = {
        'data_provider': test_data_provider(),
        'dashboard_api': test_dashboard_api(),
        'dashboard_callbacks': test_dashboard_callbacks()
    }

    logger.info("=" * 50)
    logger.info("DIAGNOSTIC RESULTS:")
    for test_name, result in results.items():
        status = "PASS" if result else "FAIL"
        logger.info(f"  {test_name}: {status}")

    if all(results.values()):
        logger.info("All tests passed - issue may be browser-side")
        logger.info("Try refreshing the dashboard at http://127.0.0.1:8050")
    else:
        logger.error("Issues detected - check logs above")
        logger.info("Recommendations:")
        if not results['data_provider']:
            logger.info("  - Check internet connection")
            logger.info("  - Verify Binance API is accessible")
        if not results['dashboard_api']:
            logger.info("  - Restart the dashboard")
            logger.info("  - Check if port 8050 is blocked")
        if not results['dashboard_callbacks']:
            logger.info("  - Dashboard may be frozen")
            logger.info("  - Consider restarting")


if __name__ == "__main__":
    main()
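To use it, start the dashboard first, then run the script from the project root (python debug_dashboard_issue.py) and read the PASS/FAIL summary it logs.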

View File

@@ -286,7 +286,7 @@ def run_web_dashboard():
data_provider = DataProvider()
# Verify we have real data connection
logger.info("🔍 Verifying REAL data connection...")
logger.info("[DATA] Verifying REAL data connection...")
test_data = data_provider.get_historical_data('ETH/USDT', '1m', limit=10, refresh=True)
if test_data is None or test_data.empty:
logger.warning("⚠️ No fresh data available - trying cached data...")

View File

@@ -10,49 +10,57 @@ This module provides a modern, responsive web dashboard for the trading system:
"""
import asyncio
import json
import logging
import time
from datetime import datetime, timedelta, timezone
from threading import Thread
from typing import Dict, List, Optional, Any, Tuple
from collections import deque
# Optional WebSocket support
try:
import websocket
import threading
WEBSOCKET_AVAILABLE = True
except ImportError:
WEBSOCKET_AVAILABLE = False
logger = logging.getLogger(__name__)
logger.warning("websocket-client not available. Install with: pip install websocket-client")
import dash
from dash import dcc, html, Input, Output, State, callback_context
from dash import dcc, html, Input, Output
import plotly.graph_objects as go
import plotly.express as px
from plotly.subplots import make_subplots
import plotly.express as px
import pandas as pd
import numpy as np
from datetime import datetime, timedelta, timezone
import pytz
import logging
import json
import time
import threading
from threading import Thread, Lock
from collections import deque
import warnings
from typing import Dict, List, Optional, Any, Union, Tuple
import websocket
# Setup logger immediately after logging import
logger = logging.getLogger(__name__)
# WebSocket availability check
try:
import websocket
WEBSOCKET_AVAILABLE = True
logger.info("WebSocket client available")
except ImportError:
WEBSOCKET_AVAILABLE = False
logger.warning("websocket-client not available. Real-time data will use API fallback.")
# Import trading system components
from core.config import get_config
from core.data_provider import DataProvider
from core.orchestrator import TradingOrchestrator, TradingDecision
from core.trading_executor import TradingExecutor
from core.trading_action import TradingAction
from models import get_model_registry
# Enhanced RL Training Integration
# Import enhanced RL components if available
try:
from core.enhanced_orchestrator import EnhancedTradingOrchestrator
from core.universal_data_adapter import UniversalDataAdapter
from core.unified_data_stream import UnifiedDataStream, TrainingDataPacket, UIDataPacket
from core.enhanced_orchestrator import EnhancedTradingOrchestrator, MarketState, TradingAction
from training.enhanced_rl_trainer import EnhancedRLTrainer
ENHANCED_RL_AVAILABLE = True
logger = logging.getLogger(__name__)
logger.info("Enhanced RL training components available")
except ImportError as e:
logger.warning(f"Enhanced RL components not available: {e}")
ENHANCED_RL_AVAILABLE = False
logger = logging.getLogger(__name__)
logger.warning(f"Enhanced RL training not available: {e}")
# Fallback classes
class UnifiedDataStream:
def __init__(self, *args, **kwargs): pass
@@ -68,36 +76,6 @@ except ImportError as e:
class UIDataPacket:
def __init__(self, *args, **kwargs): pass
# Try to import model registry, fallback if not available
try:
from models import get_model_registry
except ImportError:
logger.warning("Models module not available, creating fallback registry")
class FallbackModelRegistry:
def __init__(self):
self.total_memory_limit_mb = 8192 # 8GB
self.models = {}
def get_memory_stats(self):
return {
'utilization_percent': 0,
'total_used_mb': 0,
'total_limit_mb': self.total_memory_limit_mb,
'models': {}
}
def get_models_by_type(self, model_type: str):
"""Get models by type - fallback implementation returns empty dict"""
return {}
def register_model(self, model, weight=1.0):
return True
def get_model_registry():
return FallbackModelRegistry()
logger = logging.getLogger(__name__)
class AdaptiveThresholdLearner:
"""Learn optimal confidence thresholds based on real trade outcomes"""
@@ -190,11 +168,17 @@ class AdaptiveThresholdLearner:
return {'error': str(e)}
class TradingDashboard:
"""Modern trading dashboard with real-time updates and enhanced RL training integration"""
"""Enhanced Trading Dashboard with Williams pivot points and unified timezone handling"""
def __init__(self, data_provider: DataProvider = None, orchestrator: TradingOrchestrator = None, trading_executor: TradingExecutor = None):
"""Initialize the dashboard with unified data stream and enhanced RL training"""
self.config = get_config()
# Initialize timezone from config
timezone_name = self.config.get('system', {}).get('timezone', 'Europe/Sofia')
self.timezone = pytz.timezone(timezone_name)
logger.info(f"Dashboard timezone set to: {timezone_name}")
self.data_provider = data_provider or DataProvider()
# Enhanced orchestrator support
@@ -314,16 +298,60 @@ class TradingDashboard:
logger.info(f"Enhanced RL enabled: {self.enhanced_rl_training_enabled}")
logger.info(f"Stream consumer ID: {self.stream_consumer_id}")
    def _to_local_timezone(self, dt: datetime) -> datetime:
        """Convert a datetime to the configured local timezone"""
        try:
            if dt is None:
                return None
            # If the datetime is naive, assume it's UTC
            if dt.tzinfo is None:
                dt = pytz.UTC.localize(dt)
            # Convert to the local timezone
            return dt.astimezone(self.timezone)
        except Exception as e:
            logger.warning(f"Error converting timezone: {e}")
            return dt

    def _now_local(self) -> datetime:
        """Get the current time in the configured local timezone"""
        return datetime.now(self.timezone)

    def _ensure_timezone_consistency(self, df: pd.DataFrame) -> pd.DataFrame:
        """Ensure the DataFrame index is in a consistent timezone"""
        try:
            if hasattr(df.index, 'tz'):
                if df.index.tz is None:
                    # Assume UTC if no timezone is set
                    df.index = df.index.tz_localize('UTC')
                # Convert to the local timezone
                df.index = df.index.tz_convert(self.timezone)
            return df
        except Exception as e:
            logger.warning(f"Error ensuring timezone consistency: {e}")
            return df
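For reference, a minimal standalone illustration of what these helpers do with a naive timestamp (pytz is already imported by this module): a naive datetime is treated as UTC, localized, then converted to the configured zone.

import pytz
from datetime import datetime

tz = pytz.timezone("Europe/Sofia")
naive = datetime(2025, 5, 29, 18, 3, 26)           # naive, assumed UTC
local = pytz.UTC.localize(naive).astimezone(tz)    # same steps as _to_local_timezone
print(local.isoformat())                           # 2025-05-29T21:03:26+03:00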
def _initialize_streaming(self):
"""Initialize unified data streaming and WebSocket fallback"""
try:
# Start WebSocket first (non-blocking)
self._start_websocket_stream()
logger.info("WebSocket streaming initialized")
if ENHANCED_RL_AVAILABLE:
# Start unified data stream
# Start unified data stream in background
def start_unified_stream():
try:
asyncio.run(self.unified_stream.start_streaming())
logger.info("Unified data stream started")
except Exception as e:
logger.error(f"Error starting unified stream: {e}")
# Start WebSocket as backup/additional data source
self._start_websocket_stream()
unified_thread = Thread(target=start_unified_stream, daemon=True)
unified_thread.start()
# Start background data collection
self._start_enhanced_training_data_collection()
@@ -332,7 +360,7 @@ class TradingDashboard:
except Exception as e:
logger.error(f"Error initializing streaming: {e}")
# Fallback to WebSocket only
# Ensure WebSocket is started as fallback
self._start_websocket_stream()
def _start_enhanced_training_data_collection(self):
@@ -855,20 +883,32 @@ class TradingDashboard:
current_threshold = self.adaptive_learner.get_current_threshold()
should_execute = signal['confidence'] >= current_threshold
if should_execute:
# Check position limits before execution
can_execute = self._can_execute_new_position(signal['action'])
if should_execute and can_execute:
signal['signal_type'] = 'EXECUTED'
signal['threshold_used'] = current_threshold # Track threshold for learning
signal['reason'] = f"ADAPTIVE EXECUTE (≥{current_threshold:.2%}): {signal['reason']}"
logger.debug(f"[EXECUTE] {signal['action']} signal @ ${signal['price']:.2f} (confidence: {signal['confidence']:.1%}{current_threshold:.1%})")
self._process_trading_decision(signal)
elif should_execute and not can_execute:
# Signal meets confidence but we're at position limit
signal['signal_type'] = 'NOT_EXECUTED_POSITION_LIMIT'
signal['threshold_used'] = current_threshold
signal['reason'] = f"BLOCKED BY POSITION LIMIT (≥{current_threshold:.2%}): {signal['reason']} [Positions: {self._count_open_positions()}/{self.config.get('trading', {}).get('max_concurrent_positions', 3)}]"
logger.info(f"[BLOCKED] {signal['action']} signal @ ${signal['price']:.2f} - Position limit reached ({self._count_open_positions()}/{self.config.get('trading', {}).get('max_concurrent_positions', 3)})")
# Still add to training queue for RL learning
self._queue_signal_for_training(signal, current_price, symbol)
else:
signal['signal_type'] = 'IGNORED'
signal['reason'] = f"ADAPTIVE IGNORE (<{current_threshold:.2%}): {signal['reason']}"
logger.debug(f"[IGNORE] {signal['action']} signal @ ${signal['price']:.2f} (confidence: {signal['confidence']:.1%} < {current_threshold:.1%})")
# Add to recent decisions for display but don't execute trade
self.recent_decisions.append(signal)
if len(self.recent_decisions) > 500: # Keep last 500 decisions
self.recent_decisions = self.recent_decisions[-500:]
signal['signal_type'] = 'NOT_EXECUTED_LOW_CONFIDENCE'
signal['threshold_used'] = current_threshold
signal['reason'] = f"LOW CONFIDENCE (<{current_threshold:.2%}): {signal['reason']}"
logger.debug(f"[SKIP] {signal['action']} signal @ ${signal['price']:.2f} (confidence: {signal['confidence']:.1%} < {current_threshold:.1%})")
# Still add to training queue for RL learning
self._queue_signal_for_training(signal, current_price, symbol)
else:
# Fallback: Add a simple monitoring update
if n_intervals % 10 == 0 and current_price: # Every 10 seconds
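Condensed, the routing above sorts every signal into one of three buckets (a sketch using the same labels as the handler):

def classify_signal(confidence, threshold, can_open_position):
    # Confident enough and within position limits: trade is executed
    if confidence >= threshold and can_open_position:
        return 'EXECUTED'
    # Confident enough, but blocked by the position limit
    if confidence >= threshold:
        return 'NOT_EXECUTED_POSITION_LIMIT'
    # Below the adaptive confidence threshold
    return 'NOT_EXECUTED_LOW_CONFIDENCE'

Blocked and low-confidence signals are still queued via _queue_signal_for_training, so the RL trainer learns from them even though no order is placed.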
@@ -1100,7 +1140,7 @@ class TradingDashboard:
return fig
def _create_price_chart(self, symbol: str) -> go.Figure:
"""Create enhanced 1-second price chart with volume from WebSocket stream"""
"""Create enhanced 1-second price chart with volume and Williams pivot points from WebSocket stream"""
try:
# Get 1-second bars from WebSocket stream
df = self.get_one_second_bars(count=300) # Last 5 minutes of 1s bars
@@ -1111,6 +1151,8 @@
try:
df = self.data_provider.get_historical_data(symbol, '1m', limit=50, refresh=True)
if df is not None and not df.empty:
# Ensure timezone consistency for fallback data
df = self._ensure_timezone_consistency(df)
# Add volume column if missing
if 'volume' not in df.columns:
df['volume'] = 100 # Default volume for demo
@@ -1127,15 +1169,17 @@
f"Chart Error: {str(e)}"
)
else:
# Ensure timezone consistency for WebSocket data
df = self._ensure_timezone_consistency(df)
actual_timeframe = '1s'
logger.debug(f"[CHART] Using {len(df)} 1s bars from WebSocket stream")
logger.debug(f"[CHART] Using {len(df)} 1s bars from WebSocket stream in {self.timezone}")
# Create subplot with secondary y-axis for volume
fig = make_subplots(
rows=2, cols=1,
shared_xaxes=True,
vertical_spacing=0.1,
subplot_titles=(f'{symbol} Price ({actual_timeframe.upper()})', 'Volume'),
subplot_titles=(f'{symbol} Price ({actual_timeframe.upper()}) with Williams Pivot Points', 'Volume'),
row_heights=[0.7, 0.3]
)
@@ -1152,6 +1196,14 @@
row=1, col=1
)
# Add Williams Market Structure pivot points
try:
pivot_points = self._get_williams_pivot_points_for_chart(df)
if pivot_points:
self._add_williams_pivot_points_to_chart(fig, pivot_points, row=1)
except Exception as e:
logger.debug(f"Error adding Williams pivot points to chart: {e}")
# Add moving averages if we have enough data
if len(df) >= 20:
# 20-period SMA
@@ -1240,12 +1292,12 @@
# Add BUY markers with different styles for executed vs blocked
executed_buys = [d[0] for d in buy_decisions if d[1] == 'EXECUTED']
ignored_buys = [d[0] for d in buy_decisions if d[1] == 'IGNORED']
ignored_buys = [d[0] for d in buy_decisions if d[1] in ['NOT_EXECUTED_POSITION_LIMIT', 'NOT_EXECUTED_LOW_CONFIDENCE']]
if executed_buys:
fig.add_trace(
go.Scatter(
x=[d['timestamp'] for d in executed_buys],
x=[self._to_local_timezone(d['timestamp']) for d in executed_buys],
y=[d['price'] for d in executed_buys],
mode='markers',
marker=dict(
@@ -1256,7 +1308,8 @@
),
name="BUY (Executed)",
showlegend=True,
hovertemplate="<b>BUY EXECUTED</b><br>Price: $%{y:.2f}<br>Time: %{x}<br><extra></extra>"
hovertemplate="<b>BUY EXECUTED</b><br>Price: $%{y:.2f}<br>Time: %{x}<br>Confidence: %{customdata:.1%}<extra></extra>",
customdata=[d.get('confidence', 0) for d in executed_buys]
),
row=1, col=1
)
@@ -1264,7 +1317,7 @@
if ignored_buys:
fig.add_trace(
go.Scatter(
x=[d['timestamp'] for d in ignored_buys],
x=[self._to_local_timezone(d['timestamp']) for d in ignored_buys],
y=[d['price'] for d in ignored_buys],
mode='markers',
marker=dict(
@@ -1273,21 +1326,22 @@
symbol='triangle-up-open',
line=dict(color='#00ff88', width=2)
),
name="BUY (Ignored)",
name="BUY (Blocked)",
showlegend=True,
hovertemplate="<b>BUY IGNORED</b><br>Price: $%{y:.2f}<br>Time: %{x}<br><extra></extra>"
hovertemplate="<b>BUY BLOCKED</b><br>Price: $%{y:.2f}<br>Time: %{x}<br>Confidence: %{customdata:.1%}<extra></extra>",
customdata=[d.get('confidence', 0) for d in ignored_buys]
),
row=1, col=1
)
# Add SELL markers with different styles for executed vs blocked
executed_sells = [d[0] for d in sell_decisions if d[1] == 'EXECUTED']
ignored_sells = [d[0] for d in sell_decisions if d[1] == 'IGNORED']
ignored_sells = [d[0] for d in sell_decisions if d[1] in ['NOT_EXECUTED_POSITION_LIMIT', 'NOT_EXECUTED_LOW_CONFIDENCE']]
if executed_sells:
fig.add_trace(
go.Scatter(
x=[d['timestamp'] for d in executed_sells],
x=[self._to_local_timezone(d['timestamp']) for d in executed_sells],
y=[d['price'] for d in executed_sells],
mode='markers',
marker=dict(
@@ -1298,7 +1352,8 @@
),
name="SELL (Executed)",
showlegend=True,
hovertemplate="<b>SELL EXECUTED</b><br>Price: $%{y:.2f}<br>Time: %{x}<br><extra></extra>"
hovertemplate="<b>SELL EXECUTED</b><br>Price: $%{y:.2f}<br>Time: %{x}<br>Confidence: %{customdata:.1%}<extra></extra>",
customdata=[d.get('confidence', 0) for d in executed_sells]
),
row=1, col=1
)
@@ -1306,7 +1361,7 @@
if ignored_sells:
fig.add_trace(
go.Scatter(
x=[d['timestamp'] for d in ignored_sells],
x=[self._to_local_timezone(d['timestamp']) for d in ignored_sells],
y=[d['price'] for d in ignored_sells],
mode='markers',
marker=dict(
@@ -1315,9 +1370,10 @@
symbol='triangle-down-open',
line=dict(color='#ff6b6b', width=2)
),
name="SELL (Ignored)",
name="SELL (Blocked)",
showlegend=True,
hovertemplate="<b>SELL IGNORED</b><br>Price: $%{y:.2f}<br>Time: %{x}<br><extra></extra>"
hovertemplate="<b>SELL BLOCKED</b><br>Price: $%{y:.2f}<br>Time: %{x}<br>Confidence: %{customdata:.1%}<extra></extra>",
customdata=[d.get('confidence', 0) for d in ignored_sells]
),
row=1, col=1
)
@@ -2801,14 +2857,15 @@ class TradingDashboard:
self.is_streaming = False
def _process_tick_data(self, tick_data: Dict):
"""Process incoming tick data and update 1-second bars"""
"""Process incoming tick data and update 1-second bars with consistent timezone"""
try:
# Extract price and volume from Binance ticker data
price = float(tick_data.get('c', 0)) # Current price
volume = float(tick_data.get('v', 0)) # 24h volume
timestamp = datetime.now(timezone.utc)
# Use configured timezone instead of UTC for consistency
timestamp = self._now_local()
# Add to tick cache
# Add to tick cache with consistent timezone
tick = {
'timestamp': timestamp,
'price': price,
@@ -2821,7 +2878,7 @@
self.tick_cache.append(tick)
# Update current second bar
# Update current second bar using local timezone
current_second = timestamp.replace(microsecond=0)
if self.current_second_data['timestamp'] != current_second:
@@ -2849,6 +2906,8 @@
# Update current price for dashboard
self.current_prices[tick_data.get('s', 'ETHUSDT')] = price
logger.debug(f"[TICK] Processed tick at {timestamp.strftime('%H:%M:%S')} {self.timezone.zone}: ${price:.2f}")
except Exception as e:
logger.warning(f"Error processing tick data: {e}")
@@ -2889,7 +2948,7 @@
return []
def get_one_second_bars(self, count: int = 300) -> pd.DataFrame:
"""Get recent 1-second bars as DataFrame"""
"""Get recent 1-second bars as DataFrame with consistent timezone"""
try:
if len(self.one_second_bars) == 0:
return pd.DataFrame()
@@ -2903,6 +2962,11 @@
df.set_index('timestamp', inplace=True)
df.sort_index(inplace=True)
# Ensure timezone consistency
df = self._ensure_timezone_consistency(df)
logger.debug(f"[BARS] Retrieved {len(df)} 1s bars in {self.timezone.zone}")
return df
except Exception as e:
@@ -4274,16 +4338,16 @@ class TradingDashboard:
logger.warning(f"Error calculating Williams pivot points: {e}")
state_features.extend([0.0] * 250) # Default features
# Add multi-timeframe OHLCV features (300 features)
# Add multi-timeframe OHLCV features (200 features: ETH 1s/1m/1d + BTC 1s)
try:
multi_tf_features = self._get_multi_timeframe_features(training_episode.get('symbol', 'ETH/USDT'))
if multi_tf_features:
state_features.extend(multi_tf_features)
else:
state_features.extend([0.0] * 300) # Default if calculation fails
state_features.extend([0.0] * 200) # Default if calculation fails
except Exception as e:
logger.warning(f"Error calculating multi-timeframe features: {e}")
state_features.extend([0.0] * 300) # Default features
state_features.extend([0.0] * 200) # Default features
# Add trade-specific context
entry_price = training_episode['entry_price']
@@ -4458,7 +4522,7 @@
return None
ohlcv_array = np.array([
[df.index[i].timestamp() if hasattr(df.index[i], 'timestamp') else time.time(),
[self._to_local_timezone(df.index[i]).timestamp() if hasattr(df.index[i], 'timestamp') else time.time(),
df['open'].iloc[i], df['high'].iloc[i], df['low'].iloc[i],
df['close'].iloc[i], df['volume'].iloc[i]]
for i in range(len(df))
@@ -4479,16 +4543,40 @@
return None
def _get_multi_timeframe_features(self, symbol: str) -> Optional[List[float]]:
"""Get multi-timeframe OHLCV features for comprehensive market context"""
"""Get multi-timeframe OHLCV features for ETH and BTC only (focused timeframes)"""
try:
features = []
timeframes = ['1m', '5m', '15m', '1h', '4h', '1d'] # 6 timeframes
# Focus only on key timeframes for ETH and BTC
if symbol.startswith('ETH'):
timeframes = ['1s', '1m', '1d'] # ETH: 3 key timeframes
target_symbol = 'ETH/USDT'
elif symbol.startswith('BTC'):
timeframes = ['1s'] # BTC: only 1s for reference
target_symbol = 'BTC/USDT'
else:
# Default to ETH if unknown symbol
timeframes = ['1s', '1m', '1d']
target_symbol = 'ETH/USDT'
for timeframe in timeframes:
try:
# Get data for this timeframe
if timeframe == '1s':
# For 1s data, use our tick aggregation
df = self.get_one_second_bars(count=60) # Last 60 seconds
if df.empty:
# Fallback to 1m data
df = self.data_provider.get_historical_data(
symbol=symbol,
symbol=target_symbol,
timeframe='1m',
limit=50,
refresh=True
)
else:
# Get historical data for other timeframes
df = self.data_provider.get_historical_data(
symbol=target_symbol,
timeframe=timeframe,
limit=50, # Last 50 bars
refresh=True
@@ -4503,14 +4591,40 @@
features.extend([0.0] * 50) # 50 features per timeframe
except Exception as e:
logger.debug(f"Error getting {timeframe} data: {e}")
logger.debug(f"Error getting {timeframe} data for {target_symbol}: {e}")
features.extend([0.0] * 50) # 50 features per timeframe
# Total: 6 timeframes * 50 features = 300 features
return features[:300]
# Cap to the expected feature count
# ETH path: 3 timeframes * 50 = 150 features, plus 50 BTC correlation features added below
# BTC path: 1 timeframe * 50 = 50 features
# Total expected for ETH: 200 features (150 ETH + 50 BTC)
# Add BTC 1s data if we're processing ETH (for correlation analysis)
if symbol.startswith('ETH'):
try:
# NOTE: assumes get_one_second_bars() accepts a symbol argument; if it
# raises instead, the except branch below fills the BTC slot with zeros
btc_1s_df = self.get_one_second_bars(count=60, symbol='BTC/USDT')
if btc_1s_df.empty:
btc_1s_df = self.data_provider.get_historical_data(
symbol='BTC/USDT',
timeframe='1m',
limit=50,
refresh=True
)
if btc_1s_df is not None and not btc_1s_df.empty and len(btc_1s_df) >= 10:
btc_features = self._extract_timeframe_features(btc_1s_df, '1s_btc')
features.extend(btc_features)
else:
features.extend([0.0] * 50) # BTC features
except Exception as e:
logger.debug(f"Error getting BTC correlation data: {e}")
features.extend([0.0] * 50) # BTC features
# Total: ETH(150) + BTC(50) = 200 features (reduced from 300)
return features[:200]
except Exception as e:
logger.warning(f"Error calculating multi-timeframe features: {e}")
logger.warning(f"Error calculating focused multi-timeframe features: {e}")
return None
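As a worked check of the arithmetic above (assuming each timeframe contributes exactly 50 features, as the extractor enforces):

eth_layout = {'ETH_1s': 50, 'ETH_1m': 50, 'ETH_1d': 50, 'BTC_1s': 50}  # ETH path
assert sum(eth_layout.values()) == 200
btc_layout = {'BTC_1s': 50}  # BTC path (reference only)
assert sum(btc_layout.values()) == 50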
def _extract_timeframe_features(self, df: pd.DataFrame, timeframe: str) -> List[float]:
@@ -4647,6 +4761,276 @@ class TradingDashboard:
logger.warning(f"Error extracting features for {timeframe}: {e}")
return [0.0] * 50
    def _get_williams_pivot_points_for_chart(self, df: pd.DataFrame) -> Optional[Dict]:
        """Calculate Williams pivot points specifically for chart visualization with consistent timezone"""
        try:
            # Import Williams Market Structure
            try:
                from training.williams_market_structure import WilliamsMarketStructure
            except ImportError:
                logger.warning("Williams Market Structure not available for chart")
                return None

            # Need at least 50 bars for a meaningful pivot calculation
            if len(df) < 50:
                logger.debug(f"[WILLIAMS] Insufficient data for pivot calculation: {len(df)} bars")
                return None

            # Ensure timezone consistency for the chart data
            df = self._ensure_timezone_consistency(df)

            # Convert DataFrame to numpy array for the Williams calculation with proper timezone handling
            ohlcv_array = []
            for i in range(len(df)):
                timestamp = df.index[i]
                # Convert timestamp to local timezone and then to a Unix timestamp
                if hasattr(timestamp, 'timestamp'):
                    local_time = self._to_local_timezone(timestamp)
                    unix_timestamp = local_time.timestamp()
                else:
                    unix_timestamp = time.time()
                ohlcv_array.append([
                    unix_timestamp,
                    df['open'].iloc[i],
                    df['high'].iloc[i],
                    df['low'].iloc[i],
                    df['close'].iloc[i],
                    df['volume'].iloc[i]
                ])
            ohlcv_array = np.array(ohlcv_array)

            # Calculate Williams pivot points
            williams = WilliamsMarketStructure()
            structure_levels = williams.calculate_recursive_pivot_points(ohlcv_array)

            # Extract pivot points for chart display
            chart_pivots = {}
            level_colors = ['#FF6B6B', '#4ECDC4', '#45B7D1', '#96CEB4', '#FFEAA7']  # Different color per level
            level_sizes = [8, 7, 6, 5, 4]  # Different size per level
            total_pivots = 0

            for level in range(5):
                level_key = f'level_{level}'
                if level_key in structure_levels:
                    level_data = structure_levels[level_key]
                    swing_points = level_data.swing_points
                    if swing_points:
                        # Log swing point details for validation
                        highs = [s for s in swing_points if s.swing_type.name == 'SWING_HIGH']
                        lows = [s for s in swing_points if s.swing_type.name == 'SWING_LOW']
                        logger.debug(f"[WILLIAMS] Level {level}: {len(highs)} highs, {len(lows)} lows, total: {len(swing_points)}")

                        # Convert swing points to chart format
                        chart_pivots[level_key] = {
                            'swing_points': swing_points,
                            'color': level_colors[level],
                            'name': f'L{level + 1} Pivots',  # Shorter name
                            'size': level_sizes[level],  # Different sizes for validation
                            'opacity': max(0.9 - (level * 0.15), 0.4)  # High opacity for validation
                        }
                        total_pivots += len(swing_points)

            logger.info(f"[WILLIAMS] Calculated {total_pivots} total pivot points across {len(chart_pivots)} levels")
            return chart_pivots

        except Exception as e:
            logger.warning(f"Error calculating Williams pivot points for chart: {e}")
            return None
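For orientation, the recursion the Williams structure performs can be pictured with a much simpler stand-in (illustrative only, not the actual WilliamsMarketStructure implementation): a bar is a swing high or low when it dominates strength bars on each side, and each higher level reruns the same detection on the swing prices of the level below.

def find_swings(highs, lows, strength=2):
    """Naive swing detection over parallel high/low sequences."""
    swings = []  # (bar index, 'HIGH' or 'LOW', price)
    for i in range(strength, len(highs) - strength):
        window = slice(i - strength, i + strength + 1)
        if highs[i] == max(highs[window]):
            swings.append((i, 'HIGH', highs[i]))
        elif lows[i] == min(lows[window]):
            swings.append((i, 'LOW', lows[i]))
    return swings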
    def _add_williams_pivot_points_to_chart(self, fig, pivot_points: Dict, row: int = 1):
        """Add Williams pivot points as small triangles to the chart with proper timezone conversion"""
        try:
            for level_key, level_data in pivot_points.items():
                swing_points = level_data['swing_points']
                if not swing_points:
                    continue

                # Validate swing alternation (there shouldn't be consecutive highs or lows)
                self._validate_swing_alternation(swing_points, level_key)

                # Separate swing highs and lows, tracking per-point strength for hover text
                swing_highs_x, swing_highs_y, swing_highs_strength = [], [], []
                swing_lows_x, swing_lows_y, swing_lows_strength = [], [], []

                for swing in swing_points:
                    # Ensure proper timezone conversion for swing point timestamps
                    if hasattr(swing, 'timestamp') and isinstance(swing.timestamp, datetime):
                        timestamp = swing.timestamp
                        if timestamp.tzinfo is None:
                            # Williams Market Structure creates naive datetimes that are already
                            # in local time (Europe/Sofia), so localize them directly
                            local_timestamp = self.timezone.localize(timestamp)
                        else:
                            # If it has timezone info, convert to the local timezone
                            local_timestamp = timestamp.astimezone(self.timezone)
                    else:
                        # Fallback if the swing has no usable datetime
                        local_timestamp = self._now_local()

                    if swing.swing_type.name == 'SWING_HIGH':
                        swing_highs_x.append(local_timestamp)
                        swing_highs_y.append(swing.price)
                        swing_highs_strength.append(swing.strength)
                    elif swing.swing_type.name == 'SWING_LOW':
                        swing_lows_x.append(local_timestamp)
                        swing_lows_y.append(swing.price)
                        swing_lows_strength.append(swing.strength)

                # Add swing highs (triangle-up markers above the price)
                if swing_highs_x:
                    fig.add_trace(
                        go.Scatter(
                            x=swing_highs_x,
                            y=swing_highs_y,
                            mode='markers',
                            name=f"{level_data['name']} (Highs)",
                            marker=dict(
                                color=level_data['color'],
                                size=max(level_data['size'] - 2, 4),  # Smaller triangles
                                symbol='triangle-up',  # Triangle pointing up for highs
                                line=dict(color='white', width=1),
                                opacity=level_data['opacity']
                            ),
                            customdata=swing_highs_strength,
                            hovertemplate=f'<b>Swing High</b><br>Price: $%{{y:.2f}}<br>%{{x}}<br>{level_data["name"]}<br>Strength: %{{customdata}}<extra></extra>',
                            showlegend=True,
                            legendgroup=level_data['name']  # Group with lows in legend
                        ),
                        row=row, col=1
                    )

                # Add swing lows (triangle-down markers below the price)
                if swing_lows_x:
                    fig.add_trace(
                        go.Scatter(
                            x=swing_lows_x,
                            y=swing_lows_y,
                            mode='markers',
                            name=f"{level_data['name']} (Lows)",
                            marker=dict(
                                color=level_data['color'],
                                size=max(level_data['size'] - 2, 4),  # Smaller triangles
                                symbol='triangle-down',  # Triangle pointing down for lows
                                line=dict(color='white', width=1),
                                opacity=level_data['opacity']
                            ),
                            customdata=swing_lows_strength,
                            hovertemplate=f'<b>Swing Low</b><br>Price: $%{{y:.2f}}<br>%{{x}}<br>{level_data["name"]}<br>Strength: %{{customdata}}<extra></extra>',
                            showlegend=True,
                            legendgroup=level_data['name']  # Group with highs in legend
                        ),
                        row=row, col=1
                    )

            logger.debug("[CHART] Added Williams pivot points as triangles with timezone-converted timestamps")

        except Exception as e:
            logger.warning(f"Error adding Williams pivot points to chart: {e}")
    def _validate_swing_alternation(self, swing_points: List, level_key: str):
        """Validate that swing points alternate correctly between highs and lows"""
        try:
            if len(swing_points) < 2:
                return

            # Sort by index to check chronological order
            sorted_swings = sorted(swing_points, key=lambda x: x.index)

            consecutive_issues = 0
            last_type = None
            for swing in sorted_swings:
                current_type = swing.swing_type.name
                if last_type and last_type == current_type:
                    consecutive_issues += 1
                    logger.debug(f"[WILLIAMS_VALIDATION] {level_key}: Consecutive {current_type} at index {swing.index}")
                last_type = current_type

            if consecutive_issues > 0:
                logger.warning(f"[WILLIAMS_VALIDATION] {level_key}: Found {consecutive_issues} consecutive swing issues")
            else:
                logger.debug(f"[WILLIAMS_VALIDATION] {level_key}: Swing alternation is correct ({len(sorted_swings)} swings)")
        except Exception as e:
            logger.warning(f"Error validating swing alternation: {e}")
    def _can_execute_new_position(self, action):
        """Check if a new position can be executed based on current position limits"""
        try:
            # Get max concurrent positions from config
            max_positions = self.config.get('trading', {}).get('max_concurrent_positions', 3)
            current_open_positions = self._count_open_positions()

            # Check if we can open a new position
            if current_open_positions >= max_positions:
                logger.debug(f"[POSITION_LIMIT] Cannot execute {action} - at max positions ({current_open_positions}/{max_positions})")
                return False

            # Additional check: if we already hold a position, only allow closing trades
            if self.current_position:
                current_side = self.current_position['side']
                if current_side == 'LONG' and action == 'BUY':
                    return False  # Already long, can't buy more
                elif current_side == 'SHORT' and action == 'SELL':
                    return False  # Already short, can't sell more

            return True
        except Exception as e:
            logger.error(f"Error checking position limits: {e}")
            return False
    def _count_open_positions(self):
        """Count current open positions"""
        try:
            # Simple count: 1 if we have a current position, 0 otherwise
            return 1 if self.current_position else 0
        except Exception as e:
            logger.error(f"Error counting open positions: {e}")
            return 0
    def _queue_signal_for_training(self, signal, current_price, symbol):
        """Add a signal to the training queue for RL learning (even if not executed)"""
        try:
            # Add to recent decisions for display, using the configured timezone
            signal['timestamp'] = self._now_local()
            self.recent_decisions.append(signal.copy())
            if len(self.recent_decisions) > 500:
                self.recent_decisions = self.recent_decisions[-500:]

            # Create a synthetic trade for RL training
            training_trade = {
                'trade_id': f"training_{len(self.closed_trades) + 1}",
                'symbol': symbol,
                'side': 'LONG' if signal['action'] == 'BUY' else 'SHORT',
                'entry_price': current_price,
                'exit_price': current_price,  # Immediate close for training
                'size': 0.01,  # Small size for training
                'net_pnl': 0.0,  # Neutral outcome for blocked signals
                'fees': 0.001,
                'duration': timedelta(seconds=1),
                'timestamp': self._now_local(),
                'mexc_executed': False
            }

            # Trigger RL training with this synthetic trade
            self._trigger_rl_training_on_closed_trade(training_trade)

            logger.debug(f"[TRAINING] Queued {signal['action']} signal for RL learning")
        except Exception as e:
            logger.warning(f"Error queuing signal for training: {e}")
def create_dashboard(data_provider: DataProvider = None, orchestrator: TradingOrchestrator = None, trading_executor: TradingExecutor = None) -> TradingDashboard:
"""Factory function to create a trading dashboard"""