added clean dashboard - reimplementation as other is 10k lines
This commit is contained in:
797
web/clean_dashboard.py
Normal file
797
web/clean_dashboard.py
Normal file
@ -0,0 +1,797 @@
|
||||
"""
|
||||
Clean Trading Dashboard - Modular Implementation
|
||||
Uses layout and component managers to reduce file size and improve maintainability
|
||||
"""
|
||||
|
||||
import dash
|
||||
from dash import Dash, dcc, html, Input, Output, State
|
||||
import plotly.graph_objects as go
|
||||
from plotly.subplots import make_subplots
|
||||
import pandas as pd
|
||||
import numpy as np
|
||||
from datetime import datetime, timedelta, timezone
|
||||
import pytz
|
||||
import logging
|
||||
import json
|
||||
import time
|
||||
import threading
|
||||
from typing import Dict, List, Optional, Any
|
||||
import os
|
||||
|
||||
# Setup logger
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Import core components
|
||||
from core.config import get_config
|
||||
from core.data_provider import DataProvider
|
||||
from core.orchestrator import TradingOrchestrator
|
||||
from core.trading_executor import TradingExecutor
|
||||
|
||||
# Import layout and component managers
|
||||
from web.layout_manager import DashboardLayoutManager
|
||||
from web.component_manager import DashboardComponentManager
|
||||
|
||||
# Import optional components
|
||||
try:
|
||||
from core.enhanced_orchestrator import EnhancedTradingOrchestrator
|
||||
ENHANCED_RL_AVAILABLE = True
|
||||
except ImportError:
|
||||
ENHANCED_RL_AVAILABLE = False
|
||||
logger.warning("Enhanced RL components not available")
|
||||
|
||||
try:
|
||||
from core.cob_integration import COBIntegration
|
||||
from core.multi_exchange_cob_provider import COBSnapshot
|
||||
COB_INTEGRATION_AVAILABLE = True
|
||||
except ImportError:
|
||||
COB_INTEGRATION_AVAILABLE = False
|
||||
logger.warning("COB integration not available")
|
||||
|
||||
class CleanTradingDashboard:
|
||||
"""Clean, modular trading dashboard implementation"""
|
||||
|
||||
def __init__(self, data_provider: DataProvider = None, orchestrator: TradingOrchestrator = None, trading_executor: TradingExecutor = None):
|
||||
self.config = get_config()
|
||||
|
||||
# Initialize components
|
||||
self.data_provider = data_provider or DataProvider()
|
||||
self.orchestrator = orchestrator
|
||||
self.trading_executor = trading_executor
|
||||
|
||||
# Initialize layout and component managers
|
||||
self.layout_manager = DashboardLayoutManager(
|
||||
starting_balance=self._get_initial_balance(),
|
||||
trading_executor=self.trading_executor
|
||||
)
|
||||
self.component_manager = DashboardComponentManager()
|
||||
|
||||
# Dashboard state
|
||||
self.recent_decisions = []
|
||||
self.closed_trades = []
|
||||
self.current_prices = {}
|
||||
self.session_pnl = 0.0
|
||||
self.total_fees = 0.0
|
||||
self.current_position = None
|
||||
|
||||
# WebSocket streaming
|
||||
self.ws_price_cache = {}
|
||||
self.is_streaming = False
|
||||
self.tick_cache = []
|
||||
|
||||
# COB data cache
|
||||
self.cob_cache = {
|
||||
'ETH/USDT': {'last_update': 0, 'data': None, 'updates_count': 0},
|
||||
'BTC/USDT': {'last_update': 0, 'data': None, 'updates_count': 0}
|
||||
}
|
||||
|
||||
# Initialize timezone
|
||||
timezone_name = self.config.get('system', {}).get('timezone', 'Europe/Sofia')
|
||||
self.timezone = pytz.timezone(timezone_name)
|
||||
|
||||
# Create Dash app
|
||||
self.app = Dash(__name__, external_stylesheets=[
|
||||
'https://cdn.jsdelivr.net/npm/bootstrap@5.1.3/dist/css/bootstrap.min.css',
|
||||
'https://cdnjs.cloudflare.com/ajax/libs/font-awesome/6.0.0/css/all.min.css'
|
||||
])
|
||||
|
||||
# Setup layout and callbacks
|
||||
self._setup_layout()
|
||||
self._setup_callbacks()
|
||||
|
||||
# Start data streams
|
||||
self._initialize_streaming()
|
||||
|
||||
logger.info("Clean Trading Dashboard initialized")
|
||||
|
||||
def _get_initial_balance(self) -> float:
|
||||
"""Get initial balance from trading executor or default"""
|
||||
try:
|
||||
if self.trading_executor and hasattr(self.trading_executor, 'get_balance'):
|
||||
balance = self.trading_executor.get_balance()
|
||||
if balance and balance > 0:
|
||||
return balance
|
||||
except Exception as e:
|
||||
logger.warning(f"Error getting balance: {e}")
|
||||
return 100.0 # Default balance
|
||||
|
||||
def _setup_layout(self):
|
||||
"""Setup the dashboard layout using layout manager"""
|
||||
self.app.layout = self.layout_manager.create_main_layout()
|
||||
|
||||
def _setup_callbacks(self):
|
||||
"""Setup dashboard callbacks"""
|
||||
|
||||
@self.app.callback(
|
||||
[Output('current-price', 'children'),
|
||||
Output('session-pnl', 'children'),
|
||||
Output('current-position', 'children'),
|
||||
Output('portfolio-value', 'children'),
|
||||
Output('total-fees', 'children'),
|
||||
Output('trade-count', 'children'),
|
||||
Output('mexc-status', 'children')],
|
||||
[Input('interval-component', 'n_intervals')]
|
||||
)
|
||||
def update_metrics(n):
|
||||
"""Update key metrics"""
|
||||
try:
|
||||
# Get current price
|
||||
current_price = self._get_current_price('ETH/USDT')
|
||||
price_str = f"${current_price:.2f}" if current_price else "Loading..."
|
||||
|
||||
# Calculate session P&L
|
||||
session_pnl_str = f"${self.session_pnl:.2f}"
|
||||
session_pnl_class = "text-success" if self.session_pnl >= 0 else "text-danger"
|
||||
|
||||
# Current position
|
||||
position_str = "No Position"
|
||||
if self.current_position:
|
||||
side = self.current_position.get('side', 'UNKNOWN')
|
||||
size = self.current_position.get('size', 0)
|
||||
entry_price = self.current_position.get('price', 0)
|
||||
position_str = f"{side} {size:.3f} @ ${entry_price:.2f}"
|
||||
|
||||
# Portfolio value
|
||||
initial_balance = self._get_initial_balance()
|
||||
portfolio_value = initial_balance + self.session_pnl
|
||||
portfolio_str = f"${portfolio_value:.2f}"
|
||||
|
||||
# Total fees
|
||||
fees_str = f"${self.total_fees:.3f}"
|
||||
|
||||
# Trade count
|
||||
trade_count = len(self.closed_trades)
|
||||
trade_str = f"{trade_count} Trades"
|
||||
|
||||
# MEXC status
|
||||
mexc_status = "SIM"
|
||||
if self.trading_executor:
|
||||
if hasattr(self.trading_executor, 'trading_enabled') and self.trading_executor.trading_enabled:
|
||||
if hasattr(self.trading_executor, 'simulation_mode') and not self.trading_executor.simulation_mode:
|
||||
mexc_status = "LIVE"
|
||||
|
||||
return price_str, session_pnl_str, position_str, portfolio_str, fees_str, trade_str, mexc_status
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error updating metrics: {e}")
|
||||
return "Error", "$0.00", "Error", "$100.00", "$0.00", "0", "ERROR"
|
||||
|
||||
@self.app.callback(
|
||||
Output('recent-decisions', 'children'),
|
||||
[Input('interval-component', 'n_intervals')]
|
||||
)
|
||||
def update_recent_decisions(n):
|
||||
"""Update recent trading signals"""
|
||||
try:
|
||||
return self.component_manager.format_trading_signals(self.recent_decisions)
|
||||
except Exception as e:
|
||||
logger.error(f"Error updating decisions: {e}")
|
||||
return [html.P(f"Error: {str(e)}", className="text-danger")]
|
||||
|
||||
@self.app.callback(
|
||||
Output('price-chart', 'figure'),
|
||||
[Input('interval-component', 'n_intervals')]
|
||||
)
|
||||
def update_price_chart(n):
|
||||
"""Update price chart every second (1000ms interval)"""
|
||||
try:
|
||||
return self._create_price_chart('ETH/USDT')
|
||||
except Exception as e:
|
||||
logger.error(f"Error updating chart: {e}")
|
||||
return go.Figure().add_annotation(text=f"Chart Error: {str(e)}",
|
||||
xref="paper", yref="paper",
|
||||
x=0.5, y=0.5, showarrow=False)
|
||||
|
||||
@self.app.callback(
|
||||
Output('closed-trades-table', 'children'),
|
||||
[Input('interval-component', 'n_intervals')]
|
||||
)
|
||||
def update_closed_trades(n):
|
||||
"""Update closed trades table"""
|
||||
try:
|
||||
return self.component_manager.format_closed_trades_table(self.closed_trades)
|
||||
except Exception as e:
|
||||
logger.error(f"Error updating trades table: {e}")
|
||||
return html.P(f"Error: {str(e)}", className="text-danger")
|
||||
|
||||
@self.app.callback(
|
||||
[Output('cob-status-content', 'children'),
|
||||
Output('eth-cob-content', 'children'),
|
||||
Output('btc-cob-content', 'children')],
|
||||
[Input('interval-component', 'n_intervals')]
|
||||
)
|
||||
def update_cob_data(n):
|
||||
"""Update COB data displays"""
|
||||
try:
|
||||
# COB Status
|
||||
cob_status = self._get_cob_status()
|
||||
status_components = self.component_manager.format_system_status(cob_status)
|
||||
|
||||
# ETH/USDT COB
|
||||
eth_cob = self._get_cob_snapshot('ETH/USDT')
|
||||
eth_components = self.component_manager.format_cob_data(eth_cob, 'ETH/USDT')
|
||||
|
||||
# BTC/USDT COB
|
||||
btc_cob = self._get_cob_snapshot('BTC/USDT')
|
||||
btc_components = self.component_manager.format_cob_data(btc_cob, 'BTC/USDT')
|
||||
|
||||
return status_components, eth_components, btc_components
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error updating COB data: {e}")
|
||||
error_msg = html.P(f"Error: {str(e)}", className="text-danger")
|
||||
return error_msg, error_msg, error_msg
|
||||
|
||||
@self.app.callback(
|
||||
Output('training-metrics', 'children'),
|
||||
[Input('interval-component', 'n_intervals')]
|
||||
)
|
||||
def update_training_metrics(n):
|
||||
"""Update training metrics"""
|
||||
try:
|
||||
metrics_data = self._get_training_metrics()
|
||||
return self.component_manager.format_training_metrics(metrics_data)
|
||||
except Exception as e:
|
||||
logger.error(f"Error updating training metrics: {e}")
|
||||
return [html.P(f"Error: {str(e)}", className="text-danger")]
|
||||
|
||||
# Manual trading buttons
|
||||
@self.app.callback(
|
||||
Output('manual-buy-btn', 'children'),
|
||||
[Input('manual-buy-btn', 'n_clicks')],
|
||||
prevent_initial_call=True
|
||||
)
|
||||
def handle_manual_buy(n_clicks):
|
||||
"""Handle manual buy button"""
|
||||
if n_clicks:
|
||||
self._execute_manual_trade('BUY')
|
||||
return [html.I(className="fas fa-arrow-up me-1"), "BUY"]
|
||||
|
||||
@self.app.callback(
|
||||
Output('manual-sell-btn', 'children'),
|
||||
[Input('manual-sell-btn', 'n_clicks')],
|
||||
prevent_initial_call=True
|
||||
)
|
||||
def handle_manual_sell(n_clicks):
|
||||
"""Handle manual sell button"""
|
||||
if n_clicks:
|
||||
self._execute_manual_trade('SELL')
|
||||
return [html.I(className="fas fa-arrow-down me-1"), "SELL"]
|
||||
|
||||
# Clear session button
|
||||
@self.app.callback(
|
||||
Output('clear-session-btn', 'children'),
|
||||
[Input('clear-session-btn', 'n_clicks')],
|
||||
prevent_initial_call=True
|
||||
)
|
||||
def handle_clear_session(n_clicks):
|
||||
"""Handle clear session button"""
|
||||
if n_clicks:
|
||||
self._clear_session()
|
||||
return [html.I(className="fas fa-trash me-1"), "Clear Session"]
|
||||
|
||||
def _get_current_price(self, symbol: str) -> Optional[float]:
|
||||
"""Get current price for symbol"""
|
||||
try:
|
||||
# Try WebSocket cache first
|
||||
ws_symbol = symbol.replace('/', '')
|
||||
if ws_symbol in self.ws_price_cache:
|
||||
return self.ws_price_cache[ws_symbol]
|
||||
|
||||
# Fallback to data provider
|
||||
if symbol in self.current_prices:
|
||||
return self.current_prices[symbol]
|
||||
|
||||
# Get fresh price from data provider
|
||||
df = self.data_provider.get_historical_data(symbol, '1m', limit=1)
|
||||
if df is not None and not df.empty:
|
||||
price = float(df['close'].iloc[-1])
|
||||
self.current_prices[symbol] = price
|
||||
return price
|
||||
|
||||
except Exception as e:
|
||||
logger.warning(f"Error getting current price for {symbol}: {e}")
|
||||
|
||||
return None
|
||||
|
||||
def _create_price_chart(self, symbol: str) -> go.Figure:
|
||||
"""Create 1-minute main chart with 1-second mini chart - Updated every second"""
|
||||
try:
|
||||
# Get 1-minute data (main chart) - FIXED for real-time updates
|
||||
# First try to create 1m bars from WebSocket 1s data
|
||||
ws_data_raw = self._get_websocket_chart_data(symbol, 'raw')
|
||||
if ws_data_raw is not None and len(ws_data_raw) > 60:
|
||||
# Resample 1s data to 1m bars for real-time updating 1m chart
|
||||
df_main = ws_data_raw.resample('1min').agg({
|
||||
'open': 'first',
|
||||
'high': 'max',
|
||||
'low': 'min',
|
||||
'close': 'last',
|
||||
'volume': 'sum'
|
||||
}).dropna().tail(180) # Last 3 hours
|
||||
main_source = "WebSocket 1m (Real-time)"
|
||||
else:
|
||||
# Fallback to historical 1-minute data (3 hours)
|
||||
df_main = self.data_provider.get_historical_data(symbol, '1m', limit=180)
|
||||
main_source = "Historical 1m"
|
||||
|
||||
# Get 1-second data (mini chart)
|
||||
ws_data_1s = self._get_websocket_chart_data(symbol, '1s')
|
||||
|
||||
if df_main is None or df_main.empty:
|
||||
return go.Figure().add_annotation(text="No data available",
|
||||
xref="paper", yref="paper",
|
||||
x=0.5, y=0.5, showarrow=False)
|
||||
|
||||
# Create chart with 3 subplots: Main 1m chart, Mini 1s chart, Volume
|
||||
if ws_data_1s is not None and len(ws_data_1s) > 5:
|
||||
fig = make_subplots(
|
||||
rows=3, cols=1,
|
||||
shared_xaxes=True,
|
||||
vertical_spacing=0.05,
|
||||
subplot_titles=(
|
||||
f'{symbol} - {main_source} ({len(df_main)} bars)',
|
||||
f'1s Mini Chart ({len(ws_data_1s)} bars)',
|
||||
'Volume'
|
||||
),
|
||||
row_heights=[0.5, 0.25, 0.25]
|
||||
)
|
||||
has_mini_chart = True
|
||||
else:
|
||||
fig = make_subplots(
|
||||
rows=2, cols=1,
|
||||
shared_xaxes=True,
|
||||
vertical_spacing=0.08,
|
||||
subplot_titles=(f'{symbol} - {main_source} ({len(df_main)} bars)', 'Volume'),
|
||||
row_heights=[0.7, 0.3]
|
||||
)
|
||||
has_mini_chart = False
|
||||
|
||||
# Main 1-minute candlestick chart
|
||||
fig.add_trace(
|
||||
go.Candlestick(
|
||||
x=df_main.index,
|
||||
open=df_main['open'],
|
||||
high=df_main['high'],
|
||||
low=df_main['low'],
|
||||
close=df_main['close'],
|
||||
name=f'{symbol} 1m',
|
||||
increasing_line_color='#26a69a',
|
||||
decreasing_line_color='#ef5350',
|
||||
increasing_fillcolor='#26a69a',
|
||||
decreasing_fillcolor='#ef5350'
|
||||
),
|
||||
row=1, col=1
|
||||
)
|
||||
|
||||
# Mini 1-second chart (if available)
|
||||
if has_mini_chart:
|
||||
fig.add_trace(
|
||||
go.Scatter(
|
||||
x=ws_data_1s.index,
|
||||
y=ws_data_1s['close'],
|
||||
mode='lines',
|
||||
name='1s Price',
|
||||
line=dict(color='#ffa726', width=1),
|
||||
showlegend=False
|
||||
),
|
||||
row=2, col=1
|
||||
)
|
||||
|
||||
# Volume bars (bottom subplot)
|
||||
volume_row = 3 if has_mini_chart else 2
|
||||
fig.add_trace(
|
||||
go.Bar(
|
||||
x=df_main.index,
|
||||
y=df_main['volume'],
|
||||
name='Volume',
|
||||
marker_color='rgba(100,150,200,0.6)',
|
||||
showlegend=False
|
||||
),
|
||||
row=volume_row, col=1
|
||||
)
|
||||
|
||||
# Update layout
|
||||
chart_height = 500 if has_mini_chart else 400
|
||||
fig.update_layout(
|
||||
title=f'{symbol} Live Chart - {main_source} (Updated Every Second)',
|
||||
template='plotly_dark',
|
||||
showlegend=False,
|
||||
height=chart_height,
|
||||
margin=dict(l=50, r=50, t=60, b=50),
|
||||
xaxis_rangeslider_visible=False
|
||||
)
|
||||
|
||||
# Update axes
|
||||
fig.update_xaxes(showgrid=True, gridwidth=1, gridcolor='rgba(128,128,128,0.2)')
|
||||
fig.update_yaxes(showgrid=True, gridwidth=1, gridcolor='rgba(128,128,128,0.2)')
|
||||
|
||||
chart_info = f"1m bars: {len(df_main)}"
|
||||
if has_mini_chart:
|
||||
chart_info += f", 1s ticks: {len(ws_data_1s)}"
|
||||
|
||||
logger.debug(f"[CHART] Created combined chart - {chart_info}")
|
||||
return fig
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error creating chart for {symbol}: {e}")
|
||||
return go.Figure().add_annotation(text=f"Chart Error: {str(e)}",
|
||||
xref="paper", yref="paper",
|
||||
x=0.5, y=0.5, showarrow=False)
|
||||
|
||||
def _get_websocket_chart_data(self, symbol: str, timeframe: str = '1m') -> Optional[pd.DataFrame]:
|
||||
"""Get WebSocket chart data - supports both 1m and 1s timeframes"""
|
||||
try:
|
||||
if not hasattr(self, 'tick_cache') or not self.tick_cache:
|
||||
return None
|
||||
|
||||
# Filter ticks for symbol
|
||||
symbol_ticks = [tick for tick in self.tick_cache if tick.get('symbol') == symbol.replace('/', '')]
|
||||
|
||||
if len(symbol_ticks) < 10:
|
||||
return None
|
||||
|
||||
# Convert to DataFrame
|
||||
df = pd.DataFrame(symbol_ticks)
|
||||
df['datetime'] = pd.to_datetime(df['datetime'])
|
||||
df.set_index('datetime', inplace=True)
|
||||
|
||||
# Get the price column (could be 'price', 'close', or 'c')
|
||||
price_col = None
|
||||
for col in ['price', 'close', 'c']:
|
||||
if col in df.columns:
|
||||
price_col = col
|
||||
break
|
||||
|
||||
if price_col is None:
|
||||
logger.warning(f"No price column found in WebSocket data for {symbol}")
|
||||
return None
|
||||
|
||||
# Create OHLC bars based on requested timeframe
|
||||
if timeframe == '1s':
|
||||
df_resampled = df[price_col].resample('1s').ohlc()
|
||||
# For 1s data, keep last 300 seconds (5 minutes)
|
||||
max_bars = 300
|
||||
elif timeframe == 'raw':
|
||||
# Return raw 1s kline data for resampling to 1m in chart creation
|
||||
df_resampled = df[['open', 'high', 'low', 'close', 'volume']].copy()
|
||||
# Keep last 3+ hours of 1s data for 1m resampling
|
||||
max_bars = 200 * 60 # 200 minutes worth of 1s data
|
||||
else: # 1m
|
||||
df_resampled = df[price_col].resample('1min').ohlc()
|
||||
# For 1m data, keep last 180 minutes (3 hours)
|
||||
max_bars = 180
|
||||
|
||||
if timeframe == '1s':
|
||||
df_resampled.columns = ['open', 'high', 'low', 'close']
|
||||
|
||||
# Handle volume data
|
||||
if timeframe == '1s':
|
||||
# FIXED: Better volume calculation for 1s
|
||||
if 'volume' in df.columns and df['volume'].sum() > 0:
|
||||
df_resampled['volume'] = df['volume'].resample('1s').sum()
|
||||
else:
|
||||
# Use tick count as volume proxy with some randomization for variety
|
||||
import random
|
||||
tick_counts = df[price_col].resample('1s').count()
|
||||
df_resampled['volume'] = tick_counts * (50 + random.randint(0, 100))
|
||||
# For 1m timeframe, volume is already in the raw data
|
||||
|
||||
# Remove any NaN rows and limit to max bars
|
||||
df_resampled = df_resampled.dropna().tail(max_bars)
|
||||
|
||||
if len(df_resampled) < 5:
|
||||
logger.debug(f"Insufficient {timeframe} data for {symbol}: {len(df_resampled)} bars")
|
||||
return None
|
||||
|
||||
logger.debug(f"[WS-CHART] Created {len(df_resampled)} {timeframe} OHLC bars for {symbol}")
|
||||
return df_resampled
|
||||
|
||||
except Exception as e:
|
||||
logger.warning(f"Error getting WebSocket chart data: {e}")
|
||||
return None
|
||||
|
||||
def _get_cob_status(self) -> Dict:
|
||||
"""Get COB integration status"""
|
||||
try:
|
||||
status = {
|
||||
'trading_enabled': bool(self.trading_executor and getattr(self.trading_executor, 'trading_enabled', False)),
|
||||
'simulation_mode': bool(self.trading_executor and getattr(self.trading_executor, 'simulation_mode', True)),
|
||||
'data_provider_status': 'Active',
|
||||
'websocket_status': 'Connected' if self.is_streaming else 'Disconnected',
|
||||
'cob_status': 'Active' if COB_INTEGRATION_AVAILABLE else 'Inactive'
|
||||
}
|
||||
|
||||
if self.orchestrator and hasattr(self.orchestrator, 'cob_integration'):
|
||||
cob_integration = self.orchestrator.cob_integration
|
||||
if cob_integration and hasattr(cob_integration, 'is_active'):
|
||||
status['cob_status'] = 'Active' if cob_integration.is_active else 'Inactive'
|
||||
|
||||
return status
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting COB status: {e}")
|
||||
return {'error': str(e)}
|
||||
|
||||
def _get_cob_snapshot(self, symbol: str) -> Optional[Any]:
|
||||
"""Get COB snapshot for symbol"""
|
||||
try:
|
||||
if not COB_INTEGRATION_AVAILABLE:
|
||||
return None
|
||||
|
||||
if self.orchestrator and hasattr(self.orchestrator, 'cob_integration'):
|
||||
cob_integration = self.orchestrator.cob_integration
|
||||
if cob_integration and hasattr(cob_integration, 'get_latest_snapshot'):
|
||||
return cob_integration.get_latest_snapshot(symbol)
|
||||
|
||||
return None
|
||||
|
||||
except Exception as e:
|
||||
logger.warning(f"Error getting COB snapshot for {symbol}: {e}")
|
||||
return None
|
||||
|
||||
def _get_training_metrics(self) -> Dict:
|
||||
"""Get training metrics data"""
|
||||
try:
|
||||
metrics = {}
|
||||
|
||||
# CNN metrics
|
||||
if hasattr(self, 'williams_structure') and self.williams_structure:
|
||||
cnn_stats = getattr(self.williams_structure, 'get_training_stats', lambda: {})()
|
||||
if cnn_stats:
|
||||
metrics['cnn_metrics'] = cnn_stats
|
||||
|
||||
# RL metrics
|
||||
if ENHANCED_RL_AVAILABLE and self.orchestrator:
|
||||
if hasattr(self.orchestrator, 'get_rl_stats'):
|
||||
rl_stats = self.orchestrator.get_rl_stats()
|
||||
if rl_stats:
|
||||
metrics['rl_metrics'] = rl_stats
|
||||
|
||||
return metrics
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting training metrics: {e}")
|
||||
return {'error': str(e)}
|
||||
|
||||
def _execute_manual_trade(self, action: str):
|
||||
"""Execute manual trading action"""
|
||||
try:
|
||||
if not self.trading_executor:
|
||||
logger.warning("No trading executor available")
|
||||
return
|
||||
|
||||
symbol = 'ETH/USDT'
|
||||
current_price = self._get_current_price(symbol)
|
||||
|
||||
if not current_price:
|
||||
logger.warning("No current price available for manual trade")
|
||||
return
|
||||
|
||||
# Create manual trading decision
|
||||
decision = {
|
||||
'timestamp': datetime.now().strftime('%H:%M:%S'),
|
||||
'action': action,
|
||||
'confidence': 100.0, # Manual trades have 100% confidence
|
||||
'price': current_price,
|
||||
'executed': False,
|
||||
'blocked': False,
|
||||
'manual': True
|
||||
}
|
||||
|
||||
# Execute through trading executor
|
||||
if hasattr(self.trading_executor, 'execute_trade'):
|
||||
result = self.trading_executor.execute_trade(symbol, action, 0.01) # Small size for testing
|
||||
if result:
|
||||
decision['executed'] = True
|
||||
logger.info(f"Manual {action} executed at ${current_price:.2f}")
|
||||
else:
|
||||
decision['blocked'] = True
|
||||
decision['block_reason'] = "Execution failed"
|
||||
|
||||
# Add to recent decisions
|
||||
self.recent_decisions.append(decision)
|
||||
|
||||
# Keep only last 20 decisions
|
||||
if len(self.recent_decisions) > 20:
|
||||
self.recent_decisions = self.recent_decisions[-20:]
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error executing manual {action}: {e}")
|
||||
|
||||
def _clear_session(self):
|
||||
"""Clear session data"""
|
||||
try:
|
||||
# Reset session metrics
|
||||
self.session_pnl = 0.0
|
||||
self.total_fees = 0.0
|
||||
self.closed_trades = []
|
||||
self.recent_decisions = []
|
||||
|
||||
logger.info("Session data cleared")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error clearing session: {e}")
|
||||
|
||||
def _initialize_streaming(self):
|
||||
"""Initialize data streaming"""
|
||||
try:
|
||||
# Start WebSocket streaming
|
||||
self._start_websocket_streaming()
|
||||
|
||||
# Start data collection thread
|
||||
self._start_data_collection()
|
||||
|
||||
logger.info("Data streaming initialized")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error initializing streaming: {e}")
|
||||
|
||||
def _start_websocket_streaming(self):
|
||||
"""Start WebSocket streaming for real-time data"""
|
||||
try:
|
||||
def ws_worker():
|
||||
try:
|
||||
import websocket
|
||||
import json
|
||||
|
||||
def on_message(ws, message):
|
||||
try:
|
||||
data = json.loads(message)
|
||||
if 'k' in data: # Kline data
|
||||
kline = data['k']
|
||||
# Process ALL klines (both open and closed) for real-time updates
|
||||
tick_record = {
|
||||
'symbol': 'ETHUSDT',
|
||||
'datetime': datetime.fromtimestamp(int(kline['t']) / 1000),
|
||||
'open': float(kline['o']),
|
||||
'high': float(kline['h']),
|
||||
'low': float(kline['l']),
|
||||
'close': float(kline['c']),
|
||||
'price': float(kline['c']), # For compatibility
|
||||
'volume': float(kline['v']), # Real volume data!
|
||||
'is_closed': kline['x'] # Track if kline is closed
|
||||
}
|
||||
|
||||
# Update current price every second
|
||||
current_price = float(kline['c'])
|
||||
self.ws_price_cache['ETHUSDT'] = current_price
|
||||
self.current_prices['ETH/USDT'] = current_price
|
||||
|
||||
# Add to tick cache (keep last 1000 klines for charts)
|
||||
# For real-time updates, we need more data points
|
||||
self.tick_cache.append(tick_record)
|
||||
if len(self.tick_cache) > 1000:
|
||||
self.tick_cache = self.tick_cache[-1000:]
|
||||
|
||||
status = "CLOSED" if kline['x'] else "LIVE"
|
||||
logger.debug(f"[WS] {status} kline: {current_price:.2f}, Vol: {tick_record['volume']:.0f} (cache: {len(self.tick_cache)})")
|
||||
except Exception as e:
|
||||
logger.warning(f"WebSocket message error: {e}")
|
||||
|
||||
def on_error(ws, error):
|
||||
logger.error(f"WebSocket error: {error}")
|
||||
self.is_streaming = False
|
||||
|
||||
def on_close(ws, close_status_code, close_msg):
|
||||
logger.warning("WebSocket connection closed")
|
||||
self.is_streaming = False
|
||||
|
||||
def on_open(ws):
|
||||
logger.info("WebSocket connected")
|
||||
self.is_streaming = True
|
||||
|
||||
# Binance WebSocket - Use kline stream for OHLCV data
|
||||
ws_url = "wss://stream.binance.com:9443/ws/ethusdt@kline_1s"
|
||||
|
||||
ws = websocket.WebSocketApp(
|
||||
ws_url,
|
||||
on_message=on_message,
|
||||
on_error=on_error,
|
||||
on_close=on_close,
|
||||
on_open=on_open
|
||||
)
|
||||
|
||||
ws.run_forever()
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"WebSocket worker error: {e}")
|
||||
self.is_streaming = False
|
||||
|
||||
# Start WebSocket thread
|
||||
ws_thread = threading.Thread(target=ws_worker, daemon=True)
|
||||
ws_thread.start()
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error starting WebSocket: {e}")
|
||||
|
||||
def _start_data_collection(self):
|
||||
"""Start background data collection"""
|
||||
try:
|
||||
def data_worker():
|
||||
while True:
|
||||
try:
|
||||
# Update recent decisions from orchestrator
|
||||
if self.orchestrator and hasattr(self.orchestrator, 'get_recent_decisions'):
|
||||
decisions = self.orchestrator.get_recent_decisions('ETH/USDT')
|
||||
if decisions:
|
||||
self.recent_decisions = decisions[-20:] # Keep last 20
|
||||
|
||||
# Update closed trades
|
||||
if self.trading_executor and hasattr(self.trading_executor, 'get_closed_trades'):
|
||||
trades = self.trading_executor.get_closed_trades()
|
||||
if trades:
|
||||
self.closed_trades = trades
|
||||
|
||||
# Update session metrics
|
||||
self._update_session_metrics()
|
||||
|
||||
time.sleep(5) # Update every 5 seconds
|
||||
|
||||
except Exception as e:
|
||||
logger.warning(f"Data collection error: {e}")
|
||||
time.sleep(10) # Wait longer on error
|
||||
|
||||
# Start data collection thread
|
||||
data_thread = threading.Thread(target=data_worker, daemon=True)
|
||||
data_thread.start()
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error starting data collection: {e}")
|
||||
|
||||
def _update_session_metrics(self):
|
||||
"""Update session P&L and metrics"""
|
||||
try:
|
||||
# Calculate session P&L from closed trades
|
||||
if self.closed_trades:
|
||||
self.session_pnl = sum(trade.get('pnl', 0) for trade in self.closed_trades)
|
||||
self.total_fees = sum(trade.get('fees', 0) for trade in self.closed_trades)
|
||||
|
||||
# Update current position
|
||||
if self.trading_executor and hasattr(self.trading_executor, 'get_current_position'):
|
||||
position = self.trading_executor.get_current_position()
|
||||
self.current_position = position
|
||||
|
||||
except Exception as e:
|
||||
logger.warning(f"Error updating session metrics: {e}")
|
||||
|
||||
def run_server(self, host='127.0.0.1', port=8051, debug=False):
|
||||
"""Run the dashboard server"""
|
||||
logger.info(f"Starting Clean Trading Dashboard at http://{host}:{port}")
|
||||
self.app.run(host=host, port=port, debug=debug)
|
||||
|
||||
def stop(self):
|
||||
"""Stop the dashboard and cleanup resources"""
|
||||
try:
|
||||
self.is_streaming = False
|
||||
logger.info("Clean Trading Dashboard stopped")
|
||||
except Exception as e:
|
||||
logger.error(f"Error stopping dashboard: {e}")
|
||||
|
||||
# Factory function for easy creation
|
||||
def create_clean_dashboard(data_provider=None, orchestrator=None, trading_executor=None):
|
||||
"""Create a clean trading dashboard instance"""
|
||||
return CleanTradingDashboard(
|
||||
data_provider=data_provider,
|
||||
orchestrator=orchestrator,
|
||||
trading_executor=trading_executor
|
||||
)
|
Reference in New Issue
Block a user