2577 lines
130 KiB
Python
2577 lines
130 KiB
Python
# """
|
|
# OBSOLETE AND BROKEN. IGNORE THIS FILE FOR NOW.
|
|
|
|
# Ultra-Fast Real-Time Scalping Dashboard (500x Leverage) - Live Data Streaming
|
|
|
|
# Real-time WebSocket streaming dashboard with:
|
|
# - Main 1s ETH/USDT chart (full width) with live updates
|
|
# - 4 small charts: 1m ETH, 1h ETH, 1d ETH, 1s BTC
|
|
# - WebSocket price streaming for instant updates
|
|
# - Europe/Sofia timezone support
|
|
# - Throttled UI updates (2s interval by default; throttle bounds are 1s fastest, 10s slowest)
|
|
# - Live streaming for the ETH/BTC tick charts; the 1m/1h/1d ETH charts are built from cached data
|
|
# """
|
|
|
|
# import asyncio
|
|
# import json
|
|
# import logging
|
|
# import time
|
|
# import websockets
|
|
# import pytz
|
|
# from datetime import datetime, timedelta
|
|
# from threading import Thread, Lock
|
|
# from typing import Dict, List, Optional, Any
|
|
# from collections import deque
|
|
# import pandas as pd
|
|
# import numpy as np
|
|
# import requests
|
|
# import uuid
|
|
|
|
# import dash
|
|
# from dash import dcc, html, Input, Output
|
|
# import plotly.graph_objects as go
|
|
# import dash_bootstrap_components as dbc
|
|
|
|
# from core.config import get_config
|
|
# from core.data_provider import DataProvider, MarketTick
|
|
# from core.enhanced_orchestrator import EnhancedTradingOrchestrator, TradingAction
|
|
# from core.trading_executor import TradingExecutor, Position, TradeRecord
|
|
# from core.unified_data_stream import UnifiedDataStream, TrainingDataPacket, UIDataPacket
|
|
|
|
# logger = logging.getLogger(__name__)
|
|
|
|
# class TradingSession:
|
|
# """
|
|
# Session-based trading with MEXC integration
|
|
# Tracks P&L for each session but resets between sessions
|
|
# """
|
|
|
|
# def __init__(self, session_id: str = None, trading_executor: TradingExecutor = None):
|
|
# self.session_id = session_id or str(uuid.uuid4())[:8]
|
|
# self.start_time = datetime.now()
|
|
# self.starting_balance = 100.0 # $100 USD starting balance
|
|
# self.current_balance = self.starting_balance
|
|
# self.total_pnl = 0.0
|
|
# self.total_fees = 0.0 # Track total fees paid (opening + closing)
|
|
# self.total_trades = 0
|
|
# self.winning_trades = 0
|
|
# self.losing_trades = 0
|
|
# self.positions = {} # symbol -> {'size': float, 'entry_price': float, 'side': str, 'fees': float}
|
|
# self.trade_history = []
|
|
# self.last_action = None
|
|
# self.trading_executor = trading_executor
|
|
|
|
# # Fee configuration - MEXC spot trading fees
|
|
# self.fee_rate = 0.001 # 0.1% trading fee (typical for MEXC spot)
|
|
|
|
# logger.info(f"NEW TRADING SESSION STARTED WITH MEXC INTEGRATION")
|
|
# logger.info(f"Session ID: {self.session_id}")
|
|
# logger.info(f"Starting Balance: ${self.starting_balance:.2f}")
|
|
# logger.info(f"MEXC Trading: {'ENABLED' if trading_executor and trading_executor.trading_enabled else 'DISABLED'}")
|
|
# logger.info(f"Trading Fee Rate: {self.fee_rate*100:.1f}%")
|
|
# logger.info(f"Start Time: {self.start_time.strftime('%Y-%m-%d %H:%M:%S')}")
|
|
|
|
# def execute_trade(self, action: TradingAction, current_price: float):
|
|
# """Execute a trading action through MEXC and update P&L"""
|
|
# try:
|
|
# symbol = action.symbol
|
|
|
|
# # Execute trade through MEXC if available
|
|
# mexc_success = False
|
|
# if self.trading_executor and action.action != 'HOLD':
|
|
# try:
|
|
# mexc_success = self.trading_executor.execute_signal(
|
|
# symbol=symbol,
|
|
# action=action.action,
|
|
# confidence=action.confidence,
|
|
# current_price=current_price
|
|
# )
|
|
# if mexc_success:
|
|
# logger.info(f"MEXC: Trade executed successfully: {action.action} {symbol}")
|
|
# else:
|
|
# logger.warning(f"MEXC: Trade execution failed: {action.action} {symbol}")
|
|
# except Exception as e:
|
|
# logger.error(f"MEXC: Error executing trade: {e}")
|
|
|
|
# # Calculate position size based on confidence and leverage
|
|
# leverage = 500 # 500x leverage
|
|
# risk_per_trade = 0.02 # 2% risk per trade
|
|
# position_value = self.current_balance * risk_per_trade * leverage * action.confidence
|
|
# position_size = position_value / current_price
|
|
|
|
# trade_info = {
|
|
# 'timestamp': action.timestamp,
|
|
# 'symbol': symbol,
|
|
# 'action': action.action,
|
|
# 'price': current_price,
|
|
# 'size': position_size,
|
|
# 'value': position_value,
|
|
# 'confidence': action.confidence,
|
|
# 'mexc_executed': mexc_success
|
|
# }
|
|
|
|
# if action.action == 'BUY':
|
|
# # Close any existing short position
|
|
# if symbol in self.positions and self.positions[symbol]['side'] == 'SHORT':
|
|
# pnl = self._close_position(symbol, current_price, 'BUY')
|
|
# trade_info['pnl'] = pnl
|
|
|
|
# # Open new long position with opening fee
|
|
# opening_fee = current_price * position_size * self.fee_rate
|
|
# self.total_fees += opening_fee
|
|
|
|
# self.positions[symbol] = {
|
|
# 'size': position_size,
|
|
# 'entry_price': current_price,
|
|
# 'side': 'LONG',
|
|
# 'fees': opening_fee # Track opening fee
|
|
# }
|
|
# trade_info['opening_fee'] = opening_fee
|
|
# trade_info['pnl'] = 0 # No immediate P&L on entry
|
|
|
|
# elif action.action == 'SELL':
|
|
# # Close any existing long position
|
|
# if symbol in self.positions and self.positions[symbol]['side'] == 'LONG':
|
|
# pnl = self._close_position(symbol, current_price, 'SELL')
|
|
# trade_info['pnl'] = pnl
|
|
# else:
|
|
# # Open new short position with opening fee
|
|
# opening_fee = current_price * position_size * self.fee_rate
|
|
# self.total_fees += opening_fee
|
|
|
|
# self.positions[symbol] = {
|
|
# 'size': position_size,
|
|
# 'entry_price': current_price,
|
|
# 'side': 'SHORT',
|
|
# 'fees': opening_fee # Track opening fee
|
|
# }
|
|
# trade_info['opening_fee'] = opening_fee
|
|
# trade_info['pnl'] = 0
|
|
|
|
# elif action.action == 'HOLD':
|
|
# # No position change, just track
|
|
# trade_info['pnl'] = 0
|
|
# trade_info['size'] = 0
|
|
# trade_info['value'] = 0
|
|
|
|
# self.trade_history.append(trade_info)
|
|
# self.total_trades += 1
|
|
# self.last_action = f"{action.action} {symbol}"
|
|
|
|
# # Update current balance
|
|
# self.current_balance = self.starting_balance + self.total_pnl
|
|
|
|
# logger.info(f"TRADING: TRADE EXECUTED: {action.action} {symbol} @ ${current_price:.2f}")
|
|
# logger.info(f"MEXC: {'SUCCESS' if mexc_success else 'SIMULATION'}")
|
|
# logger.info(f"CHART: Position Size: {position_size:.6f} (${position_value:.2f})")
|
|
# logger.info(f"MONEY: Session P&L: ${self.total_pnl:+.2f} | Balance: ${self.current_balance:.2f}")
|
|
|
|
# return trade_info
|
|
|
|
# except Exception as e:
|
|
# logger.error(f"Error executing trade: {e}")
|
|
# return None
|
|
|
|
# def _close_position(self, symbol: str, exit_price: float, close_action: str) -> float:
|
|
# """Close an existing position and calculate P&L with fees"""
|
|
# if symbol not in self.positions:
|
|
# return 0.0
|
|
|
|
# position = self.positions[symbol]
|
|
# entry_price = position['entry_price']
|
|
# size = position['size']
|
|
# side = position['side']
|
|
# opening_fee = position.get('fees', 0.0)
|
|
|
|
# # Calculate closing fee
|
|
# closing_fee = exit_price * size * self.fee_rate
|
|
# total_fees = opening_fee + closing_fee
|
|
# self.total_fees += closing_fee
|
|
|
|
# # Calculate gross P&L
|
|
# if side == 'LONG':
|
|
# gross_pnl = (exit_price - entry_price) * size
|
|
# else: # SHORT
|
|
# gross_pnl = (entry_price - exit_price) * size
|
|
|
|
# # Calculate net P&L (after fees)
|
|
# net_pnl = gross_pnl - total_fees
|
|
|
|
# # Update session P&L
|
|
# self.total_pnl += net_pnl
|
|
|
|
# # Track win/loss based on net P&L
|
|
# if net_pnl > 0:
|
|
# self.winning_trades += 1
|
|
# else:
|
|
# self.losing_trades += 1
|
|
|
|
# # Remove position
|
|
# del self.positions[symbol]
|
|
|
|
# logger.info(f"CHART: POSITION CLOSED: {side} {symbol}")
|
|
# logger.info(f"CHART: Entry: ${entry_price:.2f} | Exit: ${exit_price:.2f}")
|
|
# logger.info(f"FEES: Opening: ${opening_fee:.4f} | Closing: ${closing_fee:.4f} | Total: ${total_fees:.4f}")
|
|
# logger.info(f"MONEY: Gross P&L: ${gross_pnl:+.2f} | Net P&L: ${net_pnl:+.2f}")
|
|
|
|
# return net_pnl
|
|
|
|
# def get_win_rate(self) -> float:
|
|
# """Calculate current win rate"""
|
|
# total_closed_trades = self.winning_trades + self.losing_trades
|
|
# if total_closed_trades == 0:
|
|
# return 0.78 # Default win rate
|
|
# return self.winning_trades / total_closed_trades
|
|
|
|
# def get_session_summary(self) -> dict:
|
|
# """Get complete session summary"""
|
|
# return {
|
|
# 'session_id': self.session_id,
|
|
# 'start_time': self.start_time,
|
|
# 'duration': datetime.now() - self.start_time,
|
|
# 'starting_balance': self.starting_balance,
|
|
# 'current_balance': self.current_balance,
|
|
# 'total_pnl': self.total_pnl,
|
|
# 'total_fees': self.total_fees,
|
|
# 'total_trades': self.total_trades,
|
|
# 'winning_trades': self.winning_trades,
|
|
# 'losing_trades': self.losing_trades,
|
|
# 'win_rate': self.get_win_rate(),
|
|
# 'open_positions': len(self.positions),
|
|
# 'trade_history': self.trade_history
|
|
# }
|
|
|
|
# class RealTimeScalpingDashboard:
|
|
# """Real-time scalping dashboard with WebSocket streaming and ultra-low latency"""
|
|
|
|
# def __init__(self, data_provider: DataProvider = None, orchestrator: EnhancedTradingOrchestrator = None, trading_executor: TradingExecutor = None):
|
|
# """Initialize the real-time scalping dashboard with unified data stream"""
|
|
# self.config = get_config()
|
|
# self.data_provider = data_provider or DataProvider()
|
|
# self.orchestrator = orchestrator
|
|
# self.trading_executor = trading_executor
|
|
|
|
# # Initialize timezone (Sofia timezone)
|
|
# import pytz
|
|
# self.timezone = pytz.timezone('Europe/Sofia')
|
|
|
|
# # Initialize unified data stream for centralized data distribution
|
|
# self.unified_stream = UnifiedDataStream(self.data_provider, self.orchestrator)
|
|
|
|
# # Register dashboard as data consumer
|
|
# self.stream_consumer_id = self.unified_stream.register_consumer(
|
|
# consumer_name="ScalpingDashboard",
|
|
# callback=self._handle_unified_stream_data,
|
|
# data_types=['ui_data', 'training_data', 'ticks', 'ohlcv']
|
|
# )
|
|
|
|
# # Dashboard data storage (updated from unified stream)
|
|
# self.tick_cache = deque(maxlen=2500)
|
|
# self.one_second_bars = deque(maxlen=900)
|
|
# self.current_prices = {}
|
|
# self.is_streaming = False
|
|
# self.training_data_available = False
|
|
|
|
# # Enhanced training integration
|
|
# self.latest_training_data: Optional[TrainingDataPacket] = None
|
|
# self.latest_ui_data: Optional[UIDataPacket] = None
|
|
|
|
# # Trading session with MEXC integration
|
|
# self.trading_session = TradingSession(trading_executor=trading_executor)
|
|
|
|
# # Dashboard state
|
|
# self.streaming = False
|
|
# self.app = dash.Dash(__name__, external_stylesheets=[dbc.themes.CYBORG])
|
|
|
|
# # Initialize missing attributes for callback functionality
|
|
# self.data_lock = Lock()
|
|
# self.live_prices = {'ETH/USDT': 0.0, 'BTC/USDT': 0.0}
|
|
# self.chart_data = {
|
|
# 'ETH/USDT': {'1s': pd.DataFrame(), '1m': pd.DataFrame(), '1h': pd.DataFrame(), '1d': pd.DataFrame()},
|
|
# 'BTC/USDT': {'1s': pd.DataFrame()}
|
|
# }
|
|
# self.recent_decisions = deque(maxlen=50)
|
|
# self.live_tick_buffer = {
|
|
# 'ETH/USDT': deque(maxlen=1000),
|
|
# 'BTC/USDT': deque(maxlen=1000)
|
|
# }
|
|
# self.max_tick_buffer_size = 1000
|
|
|
|
# # Performance tracking
|
|
# self.callback_performance = {
|
|
# 'total_calls': 0,
|
|
# 'successful_calls': 0,
|
|
# 'avg_duration': 0.0,
|
|
# 'last_update': datetime.now(),
|
|
# 'throttle_active': False,
|
|
# 'throttle_count': 0
|
|
# }
|
|
|
|
# # Throttling configuration
|
|
# self.throttle_threshold = 50 # Max callbacks per minute
|
|
# self.throttle_window = 60 # 1 minute window
|
|
# self.callback_times = deque(maxlen=self.throttle_threshold)
|
|
|
|
# # Initialize throttling attributes
|
|
# self.throttle_level = 0
|
|
# self.update_frequency = 2000 # Start with 2 seconds
|
|
# self.max_frequency = 1000 # Fastest update (1 second)
|
|
# self.min_frequency = 10000 # Slowest update (10 seconds)
|
|
# self.consecutive_fast_updates = 0
|
|
# self.consecutive_slow_updates = 0
|
|
# self.callback_duration_history = []
|
|
# self.last_callback_time = time.time()
|
|
# self.last_known_state = None
|
|
|
|
# # WebSocket threads tracking
|
|
# self.websocket_threads = []
|
|
|
|
# # Setup dashboard
|
|
# self._setup_layout()
|
|
# self._setup_callbacks()
|
|
|
|
# # Start streaming automatically
|
|
# self._initialize_streaming()
|
|
|
|
# logger.info("Real-Time Scalping Dashboard initialized with unified data stream")
|
|
# logger.info(f"Stream consumer ID: {self.stream_consumer_id}")
|
|
# logger.info(f"Enhanced RL training integration: {'ENABLED' if orchestrator else 'DISABLED'}")
|
|
# logger.info(f"MEXC trading: {'ENABLED' if trading_executor and trading_executor.trading_enabled else 'DISABLED'}")
|
|
|
|
# def _initialize_streaming(self):
|
|
# """Initialize streaming and populate initial data"""
|
|
# try:
|
|
# logger.info("Initializing dashboard streaming and data...")
|
|
|
|
# # Start unified data streaming
|
|
# self._start_real_time_streaming()
|
|
|
|
# # Initialize chart data with some basic data
|
|
# self._initialize_chart_data()
|
|
|
|
# # Start background data refresh
|
|
# self._start_background_data_refresh()
|
|
|
|
# logger.info("Dashboard streaming initialized successfully")
|
|
|
|
# except Exception as e:
|
|
# logger.error(f"Error initializing streaming: {e}")
|
|
|
|
# def _initialize_chart_data(self):
|
|
# """Initialize chart data with basic data to prevent empty charts"""
|
|
# try:
|
|
# logger.info("Initializing chart data...")
|
|
|
|
# # Get initial data for charts
|
|
# for symbol in ['ETH/USDT', 'BTC/USDT']:
|
|
# try:
|
|
# # Get current price
|
|
# current_price = self.data_provider.get_current_price(symbol)
|
|
# if current_price and current_price > 0:
|
|
# self.live_prices[symbol] = current_price
|
|
# logger.info(f"Initial price for {symbol}: ${current_price:.2f}")
|
|
|
|
# # Create initial tick data
|
|
# initial_tick = {
|
|
# 'timestamp': datetime.now(),
|
|
# 'price': current_price,
|
|
# 'volume': 0.0,
|
|
# 'quantity': 0.0,
|
|
# 'side': 'buy',
|
|
# 'open': current_price,
|
|
# 'high': current_price,
|
|
# 'low': current_price,
|
|
# 'close': current_price
|
|
# }
|
|
# self.live_tick_buffer[symbol].append(initial_tick)
|
|
|
|
# except Exception as e:
|
|
# logger.warning(f"Error getting initial price for {symbol}: {e}")
|
|
# # Set default price
|
|
# default_price = 3500.0 if 'ETH' in symbol else 70000.0
|
|
# self.live_prices[symbol] = default_price
|
|
|
|
# # Get initial historical data for charts
|
|
# for symbol in ['ETH/USDT', 'BTC/USDT']:
|
|
# timeframes = ['1s', '1m', '1h', '1d'] if symbol == 'ETH/USDT' else ['1s']
|
|
|
|
# for timeframe in timeframes:
|
|
# try:
|
|
# # Get historical data
|
|
# data = self.data_provider.get_historical_data(symbol, timeframe, limit=100)
|
|
# if data is not None and not data.empty:
|
|
# self.chart_data[symbol][timeframe] = data
|
|
# logger.info(f"Loaded {len(data)} candles for {symbol} {timeframe}")
|
|
# else:
|
|
# # Create empty DataFrame with proper structure
|
|
# self.chart_data[symbol][timeframe] = pd.DataFrame(columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
|
|
# logger.warning(f"No data available for {symbol} {timeframe}")
|
|
|
|
# except Exception as e:
|
|
# logger.warning(f"Error loading data for {symbol} {timeframe}: {e}")
|
|
# self.chart_data[symbol][timeframe] = pd.DataFrame(columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
|
|
|
|
# logger.info("Chart data initialization completed")
|
|
|
|
# except Exception as e:
|
|
# logger.error(f"Error initializing chart data: {e}")
|
|
|
|
# def _start_background_data_refresh(self):
|
|
# """Start background data refresh thread"""
|
|
# def background_refresh():
|
|
# logger.info("Background data refresh thread started")
|
|
|
|
# while True:
|
|
# try:
|
|
# # Refresh live prices
|
|
# for symbol in ['ETH/USDT', 'BTC/USDT']:
|
|
# try:
|
|
# current_price = self.data_provider.get_current_price(symbol)
|
|
# if current_price and current_price > 0:
|
|
# with self.data_lock:
|
|
# self.live_prices[symbol] = current_price
|
|
|
|
# # Add to tick buffer
|
|
# tick_data = {
|
|
# 'timestamp': datetime.now(),
|
|
# 'price': current_price,
|
|
# 'volume': 0.0,
|
|
# 'quantity': 0.0,
|
|
# 'side': 'buy',
|
|
# 'open': current_price,
|
|
# 'high': current_price,
|
|
# 'low': current_price,
|
|
# 'close': current_price
|
|
# }
|
|
# self.live_tick_buffer[symbol].append(tick_data)
|
|
|
|
# except Exception as e:
|
|
# logger.warning(f"Error refreshing price for {symbol}: {e}")
|
|
|
|
# # Sleep for 5 seconds
|
|
# time.sleep(5)
|
|
|
|
# except Exception as e:
|
|
# logger.error(f"Error in background refresh: {e}")
|
|
# time.sleep(10)
|
|
|
|
# # Start background thread
|
|
# refresh_thread = Thread(target=background_refresh, daemon=True)
|
|
# refresh_thread.start()
|
|
# logger.info("Background data refresh thread started")
|
|
|
|
# def _setup_layout(self):
|
|
# """Setup the ultra-fast real-time dashboard layout"""
|
|
# self.app.layout = html.Div([
|
|
# # Header with live metrics
|
|
# html.Div([
|
|
# html.H1("Enhanced Scalping Dashboard (500x Leverage) - WebSocket + AI",
|
|
# className="text-center mb-4 text-white"),
|
|
# html.P(f"WebSocket Streaming | Model Training | PnL Tracking | Session: ${self.trading_session.starting_balance:.0f} Starting Balance",
|
|
# className="text-center text-info"),
|
|
|
|
# # Session info row
|
|
# html.Div([
|
|
# html.Div([
|
|
# html.H4(f"Session: {self.trading_session.session_id}", className="text-warning"),
|
|
# html.P("Session ID", className="text-white")
|
|
# ], className="col-md-2 text-center"),
|
|
|
|
# html.Div([
|
|
# html.H4(f"${self.trading_session.starting_balance:.0f}", className="text-primary"),
|
|
# html.P("Starting Balance", className="text-white")
|
|
# ], className="col-md-2 text-center"),
|
|
|
|
# html.Div([
|
|
# html.H4(id="current-balance", className="text-success"),
|
|
# html.P("Current Balance", className="text-white"),
|
|
# html.Small(id="account-details", className="text-muted")
|
|
# ], className="col-md-3 text-center"), # Increased from col-md-2
|
|
|
|
# html.Div([
|
|
# html.H4(id="session-duration", className="text-info"),
|
|
# html.P("Session Time", className="text-white")
|
|
# ], className="col-md-3 text-center"), # Increased from col-md-2
|
|
|
|
# html.Div([
|
|
# html.Div(id="open-positions", className="text-warning"),
|
|
# html.P("Open Positions", className="text-white")
|
|
# ], className="col-md-3 text-center"), # Increased from col-md-2 to col-md-3 for more space
|
|
|
|
# html.Div([
|
|
# html.H4("500x", className="text-danger"),
|
|
# html.P("Leverage", className="text-white")
|
|
# ], className="col-md-2 text-center"),
|
|
|
|
# html.Div([
|
|
# html.H4(id="mexc-status", className="text-info"),
|
|
# html.P("MEXC API", className="text-white")
|
|
# ], className="col-md-2 text-center")
|
|
# ], className="row mb-3"),
|
|
|
|
# # Live metrics row (split layout)
|
|
# html.Div([
|
|
# # Left side - Key metrics (5 columns, 4/12 width)
|
|
# html.Div([
|
|
# html.Div([
|
|
# html.H3(id="live-pnl", className="text-success"),
|
|
# html.P("Session P&L", className="text-white")
|
|
# ], className="col-md-2 text-center"),
|
|
|
|
# html.Div([
|
|
# html.H3(id="total-fees", className="text-warning"),
|
|
# html.P("Total Fees", className="text-white")
|
|
# ], className="col-md-2 text-center"),
|
|
|
|
# html.Div([
|
|
# html.H3(id="win-rate", className="text-info"),
|
|
# html.P("Win Rate", className="text-white")
|
|
# ], className="col-md-2 text-center"),
|
|
|
|
# html.Div([
|
|
# html.H3(id="total-trades", className="text-primary"),
|
|
# html.P("Total Trades", className="text-white")
|
|
# ], className="col-md-2 text-center"),
|
|
|
|
# html.Div([
|
|
# html.H3(id="last-action", className="text-warning"),
|
|
# html.P("Last Action", className="text-white")
|
|
# ], className="col-md-4 text-center")
|
|
# ], className="col-md-4"),
|
|
|
|
# # Middle - Price displays (2 columns, 2/12 width)
|
|
# html.Div([
|
|
# html.Div([
|
|
# html.H3(id="eth-price", className="text-success"),
|
|
# html.P("ETH/USDT LIVE", className="text-white")
|
|
# ], className="col-md-6 text-center"),
|
|
|
|
# html.Div([
|
|
# html.H3(id="btc-price", className="text-success"),
|
|
# html.P("BTC/USDT LIVE", className="text-white")
|
|
# ], className="col-md-6 text-center")
|
|
# ], className="col-md-2"),
|
|
|
|
# # Right side - Recent Trading Actions (6/12 width)
|
|
# html.Div([
|
|
# html.H5("Recent Trading Signals & Executions", className="text-center mb-2 text-warning"),
|
|
# html.Div(id="actions-log", style={"height": "120px", "overflowY": "auto", "backgroundColor": "rgba(0,0,0,0.3)", "padding": "10px", "borderRadius": "5px"})
|
|
# ], className="col-md-6")
|
|
# ], className="row mb-4")
|
|
# ], className="bg-dark p-3 mb-3"),
|
|
|
|
# # Main 1s ETH/USDT chart (full width) - WebSocket Streaming
|
|
# html.Div([
|
|
# html.H4("ETH/USDT WebSocket Live Ticks (Ultra-Fast Updates)",
|
|
# className="text-center mb-3"),
|
|
# dcc.Graph(id="main-eth-1s-chart", style={"height": "600px"})
|
|
# ], className="mb-4"),
|
|
|
|
# # Row of 4 small charts - Mixed WebSocket and Cached
|
|
# html.Div([
|
|
# html.Div([
|
|
# html.H6("ETH/USDT 1m (Cached)", className="text-center"),
|
|
# dcc.Graph(id="eth-1m-chart", style={"height": "300px"})
|
|
# ], className="col-md-3"),
|
|
|
|
# html.Div([
|
|
# html.H6("ETH/USDT 1h (Cached)", className="text-center"),
|
|
# dcc.Graph(id="eth-1h-chart", style={"height": "300px"})
|
|
# ], className="col-md-3"),
|
|
|
|
# html.Div([
|
|
# html.H6("ETH/USDT 1d (Cached)", className="text-center"),
|
|
# dcc.Graph(id="eth-1d-chart", style={"height": "300px"})
|
|
# ], className="col-md-3"),
|
|
|
|
# html.Div([
|
|
# html.H6("BTC/USDT WebSocket Ticks", className="text-center"),
|
|
# dcc.Graph(id="btc-1s-chart", style={"height": "300px"})
|
|
# ], className="col-md-3")
|
|
# ], className="row mb-4"),
|
|
|
|
# # Model Training & Orchestrator Status
|
|
# html.Div([
|
|
# html.Div([
|
|
# html.H5("Model Training Progress", className="text-center mb-3 text-warning"),
|
|
# html.Div(id="model-training-status")
|
|
# ], className="col-md-6"),
|
|
|
|
# html.Div([
|
|
# html.H5("Orchestrator Data Flow", className="text-center mb-3 text-info"),
|
|
# html.Div(id="orchestrator-status")
|
|
# ], className="col-md-6")
|
|
# ], className="row mb-4"),
|
|
|
|
# # RL & CNN Events Log
|
|
# html.Div([
|
|
# html.H5("RL & CNN Training Events (Real-Time)", className="text-center mb-3 text-success"),
|
|
# html.Div(id="training-events-log")
|
|
# ], className="mb-4"),
|
|
|
|
|
|
|
|
# # Dynamic interval - adjusts based on system performance
|
|
# dcc.Interval(
|
|
# id='ultra-fast-interval',
|
|
# interval=2000, # Start with 2 seconds for stability
|
|
# n_intervals=0
|
|
# ),
|
|
|
|
# # Debug info panel (currently visible: style sets display to "block")
|
|
# html.Div([
|
|
# html.H6("Debug Info (Open Browser Console for detailed logs)", className="text-warning"),
|
|
# html.P("Use browser console commands:", className="text-muted"),
|
|
# html.P("- getDashDebugInfo() - Get all debug data", className="text-muted"),
|
|
# html.P("- clearDashLogs() - Clear debug logs", className="text-muted"),
|
|
# html.P("- window.dashLogs - View all logs", className="text-muted"),
|
|
# html.Div(id="debug-status", className="text-info")
|
|
# ], className="mt-4 p-3 border border-warning", style={"display": "block"})
|
|
# ], className="container-fluid bg-dark")
|
|
|
|
# def _setup_callbacks(self):
|
|
# """Setup ultra-fast callbacks with real-time streaming data"""
|
|
|
|
# # Store reference to self for callback access
|
|
# dashboard_instance = self
|
|
|
|
# # Initialize last known state
|
|
# self.last_known_state = None
|
|
|
|
# # Reset throttling to ensure fresh start
|
|
# self._reset_throttling()
|
|
|
|
# @self.app.callback(
|
|
# [
|
|
# Output('current-balance', 'children'),
|
|
# Output('account-details', 'children'),
|
|
# Output('session-duration', 'children'),
|
|
# Output('open-positions', 'children'),
|
|
# Output('live-pnl', 'children'),
|
|
# Output('total-fees', 'children'),
|
|
# Output('win-rate', 'children'),
|
|
# Output('total-trades', 'children'),
|
|
# Output('last-action', 'children'),
|
|
# Output('eth-price', 'children'),
|
|
# Output('btc-price', 'children'),
|
|
# Output('mexc-status', 'children'),
|
|
# Output('main-eth-1s-chart', 'figure'),
|
|
# Output('eth-1m-chart', 'figure'),
|
|
# Output('eth-1h-chart', 'figure'),
|
|
# Output('eth-1d-chart', 'figure'),
|
|
# Output('btc-1s-chart', 'figure'),
|
|
# Output('model-training-status', 'children'),
|
|
# Output('orchestrator-status', 'children'),
|
|
# Output('training-events-log', 'children'),
|
|
# Output('actions-log', 'children'),
|
|
# Output('debug-status', 'children')
|
|
# ],
|
|
# [Input('ultra-fast-interval', 'n_intervals')]
|
|
# )
|
|
# def update_real_time_dashboard(n_intervals):
|
|
# """Update all components with real-time streaming data with dynamic throttling"""
|
|
# start_time = time.time()
|
|
|
|
# try:
|
|
# # Dynamic throttling logic
|
|
# should_update, throttle_reason = dashboard_instance._should_update_now(n_intervals)
|
|
|
|
# if not should_update:
|
|
# logger.debug(f"Callback #{n_intervals} throttled: {throttle_reason}")
|
|
# # Return current state without processing
|
|
# return dashboard_instance._get_last_known_state()
|
|
|
|
# logger.info(f"Dashboard callback triggered, interval: {n_intervals} (freq: {dashboard_instance.update_frequency}ms, throttle: {dashboard_instance.throttle_level})")
|
|
|
|
# # Log the current state
|
|
# logger.info(f"Data lock acquired, processing update...")
|
|
# logger.info(f"Trading session: {dashboard_instance.trading_session.session_id}")
|
|
# logger.info(f"Live prices: ETH={dashboard_instance.live_prices.get('ETH/USDT', 0)}, BTC={dashboard_instance.live_prices.get('BTC/USDT', 0)}")
|
|
|
|
# with dashboard_instance.data_lock:
|
|
# # Calculate session duration
|
|
# duration = datetime.now() - dashboard_instance.trading_session.start_time
|
|
# duration_str = f"{int(duration.total_seconds()//3600):02d}:{int((duration.total_seconds()%3600)//60):02d}:{int(duration.total_seconds()%60):02d}"
|
|
|
|
# # Update session metrics
|
|
# current_balance = f"${dashboard_instance.trading_session.current_balance:.2f}"
|
|
|
|
# # Account details
|
|
# balance_change = dashboard_instance.trading_session.current_balance - dashboard_instance.trading_session.starting_balance
|
|
# balance_change_pct = (balance_change / dashboard_instance.trading_session.starting_balance) * 100
|
|
# account_details = f"Change: ${balance_change:+.2f} ({balance_change_pct:+.1f}%)"
|
|
|
|
# # Create color-coded position display
|
|
# positions = dashboard_instance.trading_session.positions
|
|
# if positions:
|
|
# position_displays = []
|
|
# for symbol, pos in positions.items():
|
|
# side = pos['side']
|
|
# size = pos['size']
|
|
# entry_price = pos['entry_price']
|
|
# current_price = dashboard_instance.live_prices.get(symbol, entry_price)
|
|
|
|
# # Calculate unrealized P&L
|
|
# if side == 'LONG':
|
|
# unrealized_pnl = (current_price - entry_price) * size
|
|
# color_class = "text-success" # Green for LONG
|
|
# side_display = "[LONG]"
|
|
# else: # SHORT
|
|
# unrealized_pnl = (entry_price - current_price) * size
|
|
# color_class = "text-danger" # Red for SHORT
|
|
# side_display = "[SHORT]"
|
|
|
|
# position_text = f"{side_display} {size:.3f} @ ${entry_price:.2f} | P&L: ${unrealized_pnl:+.2f}"
|
|
# position_displays.append(html.P(position_text, className=f"{color_class} mb-1"))
|
|
|
|
# open_positions = html.Div(position_displays)
|
|
# else:
|
|
# open_positions = html.P("No open positions", className="text-muted")
|
|
|
|
# pnl = f"${dashboard_instance.trading_session.total_pnl:+.2f}"
|
|
# total_fees = f"${dashboard_instance.trading_session.total_fees:.2f}"
|
|
# win_rate = f"{dashboard_instance.trading_session.get_win_rate()*100:.1f}%"
|
|
# total_trades = str(dashboard_instance.trading_session.total_trades)
|
|
# last_action = dashboard_instance.trading_session.last_action or "WAITING"
|
|
|
|
# # Live prices from WebSocket stream
|
|
# eth_price = f"${dashboard_instance.live_prices['ETH/USDT']:.2f}" if dashboard_instance.live_prices['ETH/USDT'] > 0 else "Loading..."
|
|
# btc_price = f"${dashboard_instance.live_prices['BTC/USDT']:.2f}" if dashboard_instance.live_prices['BTC/USDT'] > 0 else "Loading..."
|
|
|
|
# # MEXC status
|
|
# if dashboard_instance.trading_executor and dashboard_instance.trading_executor.trading_enabled:
|
|
# mexc_status = "LIVE"
|
|
# elif dashboard_instance.trading_executor and dashboard_instance.trading_executor.simulation_mode:
|
|
# mexc_status = f"{dashboard_instance.trading_executor.trading_mode.upper()} MODE"
|
|
# else:
|
|
# mexc_status = "OFFLINE"
|
|
|
|
# # Create real-time charts - use WebSocket tick buffer for main chart and BTC
|
|
# try:
|
|
# main_eth_chart = dashboard_instance._create_main_tick_chart('ETH/USDT')
|
|
# except Exception as e:
|
|
# logger.error(f"Error creating main ETH chart: {e}")
|
|
# main_eth_chart = dashboard_instance._create_empty_chart("ETH/USDT Main Chart Error")
|
|
|
|
# try:
|
|
# # Use cached data for 1m chart to reduce API calls
|
|
# eth_1m_chart = dashboard_instance._create_cached_chart('ETH/USDT', '1m')
|
|
# except Exception as e:
|
|
# logger.error(f"Error creating ETH 1m chart: {e}")
|
|
# eth_1m_chart = dashboard_instance._create_empty_chart("ETH/USDT 1m Chart Error")
|
|
|
|
# try:
|
|
# # Use cached data for 1h chart to reduce API calls
|
|
# eth_1h_chart = dashboard_instance._create_cached_chart('ETH/USDT', '1h')
|
|
# except Exception as e:
|
|
# logger.error(f"Error creating ETH 1h chart: {e}")
|
|
# eth_1h_chart = dashboard_instance._create_empty_chart("ETH/USDT 1h Chart Error")
|
|
|
|
# try:
|
|
# # Use cached data for 1d chart to reduce API calls
|
|
# eth_1d_chart = dashboard_instance._create_cached_chart('ETH/USDT', '1d')
|
|
# except Exception as e:
|
|
# logger.error(f"Error creating ETH 1d chart: {e}")
|
|
# eth_1d_chart = dashboard_instance._create_empty_chart("ETH/USDT 1d Chart Error")
|
|
|
|
# try:
|
|
# # Use WebSocket tick buffer for BTC chart
|
|
# btc_1s_chart = dashboard_instance._create_main_tick_chart('BTC/USDT')
|
|
# except Exception as e:
|
|
# logger.error(f"Error creating BTC 1s chart: {e}")
|
|
# btc_1s_chart = dashboard_instance._create_empty_chart("BTC/USDT 1s Chart Error")
|
|
|
|
# # Model training status
|
|
# model_training_status = dashboard_instance._create_model_training_status()
|
|
|
|
# # Orchestrator status
|
|
# orchestrator_status = dashboard_instance._create_orchestrator_status()
|
|
|
|
# # Training events log
|
|
# training_events_log = dashboard_instance._create_training_events_log()
|
|
|
|
# # Live actions log
|
|
# actions_log = dashboard_instance._create_live_actions_log()
|
|
|
|
# # Debug status
|
|
# debug_status = html.Div([
|
|
# html.P(f"Server Callback #{n_intervals} at {datetime.now().strftime('%H:%M:%S')}", className="text-success"),
|
|
# html.P(f"Session: {dashboard_instance.trading_session.session_id}", className="text-info"),
|
|
# html.P(f"Live Prices: ETH=${dashboard_instance.live_prices.get('ETH/USDT', 0):.2f}, BTC=${dashboard_instance.live_prices.get('BTC/USDT', 0):.2f}", className="text-info"),
|
|
# html.P(f"Chart Data: ETH/1s={len(dashboard_instance.chart_data.get('ETH/USDT', {}).get('1s', []))} candles", className="text-info")
|
|
# ])
|
|
|
|
# # Log what we're returning
|
|
# logger.info(f"Callback returning: balance={current_balance}, duration={duration_str}, positions={open_positions}")
|
|
# logger.info(f"Charts created: main_eth={type(main_eth_chart)}, eth_1m={type(eth_1m_chart)}")
|
|
|
|
# # Track performance and adjust throttling
|
|
# callback_duration = time.time() - start_time
|
|
# dashboard_instance._track_callback_performance(callback_duration, success=True)
|
|
|
|
# # Store last known state for throttling
|
|
# result = (
|
|
# current_balance, account_details, duration_str, open_positions, pnl, total_fees, win_rate, total_trades, last_action, eth_price, btc_price, mexc_status,
|
|
# main_eth_chart, eth_1m_chart, eth_1h_chart, eth_1d_chart, btc_1s_chart,
|
|
# model_training_status, orchestrator_status, training_events_log, actions_log, debug_status
|
|
# )
|
|
# dashboard_instance.last_known_state = result
|
|
|
|
# return result
|
|
|
|
# except Exception as e:
|
|
# logger.error(f"Error in real-time update: {e}")
|
|
# import traceback
|
|
# logger.error(f"Traceback: {traceback.format_exc()}")
|
|
|
|
# # Track error performance
|
|
# callback_duration = time.time() - start_time
|
|
# dashboard_instance._track_callback_performance(callback_duration, success=False)
|
|
|
|
# # Return safe fallback values
|
|
# empty_fig = {
|
|
# 'data': [],
|
|
# 'layout': {
|
|
# 'template': 'plotly_dark',
|
|
# 'title': 'Error loading chart',
|
|
# 'paper_bgcolor': '#1e1e1e',
|
|
# 'plot_bgcolor': '#1e1e1e'
|
|
# }
|
|
# }
|
|
|
|
# error_debug = html.Div([
|
|
# html.P(f"ERROR in callback #{n_intervals}", className="text-danger"),
|
|
# html.P(f"Error: {str(e)}", className="text-danger"),
|
|
# html.P(f"Throttle Level: {dashboard_instance.throttle_level}", className="text-warning"),
|
|
# html.P(f"Update Frequency: {dashboard_instance.update_frequency}ms", className="text-info")
|
|
# ])
|
|
|
|
# error_result = (
|
|
# "$100.00", "Change: $0.00 (0.0%)", "00:00:00", "0", "$0.00", "$0.00", "0%", "0", "INIT", "Loading...", "Loading...", "OFFLINE",
|
|
# empty_fig, empty_fig, empty_fig, empty_fig, empty_fig,
|
|
# "Initializing models...", "Starting orchestrator...", "Loading events...",
|
|
# "Waiting for data...", error_debug
|
|
# )
|
|
|
|
# # Store error state as last known state
|
|
# def _track_callback_performance(self, duration, success=True):
|
|
# """Track callback performance and adjust throttling dynamically"""
|
|
# self.last_callback_time = time.time()
|
|
# self.callback_duration_history.append(duration)
|
|
|
|
# # Keep only last 20 measurements
|
|
# if len(self.callback_duration_history) > 20:
|
|
# self.callback_duration_history.pop(0)
|
|
|
|
# # Calculate average performance
|
|
# avg_duration = sum(self.callback_duration_history) / len(self.callback_duration_history)
|
|
|
|
# # Define performance thresholds - more lenient
|
|
# fast_threshold = 1.0 # Under 1.0 seconds is fast
|
|
# slow_threshold = 3.0 # Over 3.0 seconds is slow
|
|
# critical_threshold = 8.0 # Over 8.0 seconds is critical
|
|
|
|
# # Adjust throttling based on performance
|
|
# if duration > critical_threshold or not success:
|
|
# # Critical performance issue - increase throttling significantly
|
|
# self.throttle_level = min(3, self.throttle_level + 1) # Max level 3, increase by 1
|
|
# self.update_frequency = min(self.min_frequency, self.update_frequency * 1.3)
|
|
# self.consecutive_slow_updates += 1
|
|
# self.consecutive_fast_updates = 0
|
|
# logger.warning(f"CRITICAL PERFORMANCE: {duration:.2f}s - Throttle level: {self.throttle_level}, Frequency: {self.update_frequency}ms")
|
|
|
|
# elif duration > slow_threshold or avg_duration > slow_threshold:
|
|
# # Slow performance - increase throttling moderately
|
|
# if self.consecutive_slow_updates >= 2: # Only throttle after 2 consecutive slow updates
|
|
# self.throttle_level = min(3, self.throttle_level + 1)
|
|
# self.update_frequency = min(self.min_frequency, self.update_frequency * 1.1)
|
|
# logger.info(f"SLOW PERFORMANCE: {duration:.2f}s (avg: {avg_duration:.2f}s) - Throttle level: {self.throttle_level}")
|
|
# self.consecutive_slow_updates += 1
|
|
# self.consecutive_fast_updates = 0
|
|
|
|
# elif duration < fast_threshold and avg_duration < fast_threshold:
|
|
# # Good performance - reduce throttling
|
|
# self.consecutive_fast_updates += 1
|
|
# self.consecutive_slow_updates = 0
|
|
|
|
# # Only reduce throttling after several consecutive fast updates
|
|
# if self.consecutive_fast_updates >= 3: # Reduced from 5 to 3
|
|
# if self.throttle_level > 0:
|
|
# self.throttle_level = max(0, self.throttle_level - 1)
|
|
# logger.info(f"GOOD PERFORMANCE: {duration:.2f}s - Reduced throttle level to: {self.throttle_level}")
|
|
|
|
# # Increase update frequency if throttle level is low
|
|
# if self.throttle_level == 0:
|
|
# self.update_frequency = max(self.max_frequency, self.update_frequency * 0.95)
|
|
# logger.info(f"OPTIMIZING: Increased frequency to {self.update_frequency}ms")
|
|
|
|
# self.consecutive_fast_updates = 0 # Reset counter
|
|
|
|
# # Log performance summary every 10 callbacks
|
|
# if len(self.callback_duration_history) % 10 == 0:
|
|
# logger.info(f"PERFORMANCE SUMMARY: Avg: {avg_duration:.2f}s, Throttle: {self.throttle_level}, Frequency: {self.update_frequency}ms")
|
|
|
|
# def _should_update_now(self, n_intervals):
|
|
# """Check if dashboard should update now based on throttling"""
|
|
# current_time = time.time()
|
|
|
|
# # Always allow first few updates
|
|
# if n_intervals <= 3:
|
|
# return True, "Initial updates"
|
|
|
|
# # Check if enough time has passed based on update frequency
|
|
# time_since_last = (current_time - self.last_callback_time) * 1000 # Convert to ms
|
|
# if time_since_last < self.update_frequency:
|
|
# return False, f"Throttled: {time_since_last:.0f}ms < {self.update_frequency}ms"
|
|
|
|
# # Check throttle level
|
|
# if self.throttle_level > 0:
|
|
# # Skip some updates based on throttle level
|
|
# if n_intervals % (self.throttle_level + 1) != 0:
|
|
# return False, f"Throttle level {self.throttle_level}: skipping interval {n_intervals}"
|
|
|
|
# return True, "Update allowed"
|
|
|
|
# def _get_last_known_state(self):
|
|
# """Get last known state for throttled updates"""
|
|
# if self.last_known_state:
|
|
# return self.last_known_state
|
|
|
|
# # Return safe default state
|
|
# empty_fig = {
|
|
# 'data': [],
|
|
# 'layout': {
|
|
# 'template': 'plotly_dark',
|
|
# 'title': 'Loading...',
|
|
# 'paper_bgcolor': '#1e1e1e',
|
|
# 'plot_bgcolor': '#1e1e1e'
|
|
# }
|
|
# }
|
|
|
|
# return (
|
|
# "$100.00", "Change: $0.00 (0.0%)", "00:00:00", "No positions", "$0.00", "$0.00", "0.0%", "0", "WAITING",
|
|
# "Loading...", "Loading...", "OFFLINE",
|
|
# empty_fig, empty_fig, empty_fig, empty_fig, empty_fig,
|
|
# "Initializing...", "Starting...", "Loading...", "Waiting...",
|
|
# html.P("Initializing dashboard...", className="text-info")
|
|
# )
|
|
|
|
# def _reset_throttling(self):
|
|
# """Reset throttling to optimal settings"""
|
|
# self.throttle_level = 0
|
|
# self.update_frequency = 2000 # Start conservative
|
|
# self.consecutive_fast_updates = 0
|
|
# self.consecutive_slow_updates = 0
|
|
# self.callback_duration_history = []
|
|
# logger.info(f"THROTTLING RESET: Level=0, Frequency={self.update_frequency}ms")
|
|
|
|
# def _start_real_time_streaming(self):
|
|
# """Start real-time streaming using unified data stream"""
|
|
# def start_streaming():
|
|
# try:
|
|
# logger.info("Starting unified data stream for dashboard")
|
|
|
|
# # Start unified data streaming
|
|
# asyncio.run(self.unified_stream.start_streaming())
|
|
|
|
# # Start orchestrator trading if available
|
|
# if self.orchestrator:
|
|
# self._start_orchestrator_trading()
|
|
|
|
# # Start enhanced training data collection
|
|
# self._start_training_data_collection()
|
|
|
|
# logger.info("Unified data streaming started successfully")
|
|
|
|
# except Exception as e:
|
|
# logger.error(f"Error starting unified data streaming: {e}")
|
|
|
|
# # Start streaming in background thread
|
|
# streaming_thread = Thread(target=start_streaming, daemon=True)
|
|
# streaming_thread.start()
|
|
|
|
# # Set streaming flag
|
|
# self.streaming = True
|
|
# logger.info("Real-time streaming initiated with unified data stream")
|
|
|
|
# def _handle_data_provider_tick(self, tick: MarketTick):
|
|
# """Handle tick data from DataProvider"""
|
|
# try:
|
|
# # Convert symbol format (ETHUSDT -> ETH/USDT)
|
|
# if '/' not in tick.symbol:
|
|
# formatted_symbol = f"{tick.symbol[:3]}/{tick.symbol[3:]}"
|
|
# else:
|
|
# formatted_symbol = tick.symbol
|
|
|
|
# with self.data_lock:
|
|
# # Update live prices
|
|
# self.live_prices[formatted_symbol] = tick.price
|
|
|
|
# # Add to tick buffer for real-time chart
|
|
# tick_entry = {
|
|
# 'timestamp': tick.timestamp,
|
|
# 'price': tick.price,
|
|
# 'volume': tick.volume,
|
|
# 'quantity': tick.quantity,
|
|
# 'side': tick.side,
|
|
# 'open': tick.price,
|
|
# 'high': tick.price,
|
|
# 'low': tick.price,
|
|
# 'close': tick.price,
|
|
# 'trade_id': tick.trade_id
|
|
# }
|
|
|
|
# # Add to buffer and maintain size
|
|
# self.live_tick_buffer[formatted_symbol].append(tick_entry)
|
|
# if len(self.live_tick_buffer[formatted_symbol]) > self.max_tick_buffer_size:
|
|
# self.live_tick_buffer[formatted_symbol].pop(0)
|
|
|
|
# # Log every 200th tick to avoid spam
|
|
# if len(self.live_tick_buffer[formatted_symbol]) % 200 == 0:
|
|
# logger.info(f"DATAPROVIDER TICK: {formatted_symbol}: ${tick.price:.2f} | Vol: ${tick.volume:.2f} | Buffer: {len(self.live_tick_buffer[formatted_symbol])} ticks")
|
|
|
|
# except Exception as e:
|
|
# logger.warning(f"Error processing DataProvider tick: {e}")
|
|
|
|
# def _background_data_updater(self):
|
|
# """Periodically refresh live data and process orchestrator decisions in the background"""
|
|
# logger.info("Background data updater thread started.")
|
|
# while self.streaming:
|
|
# try:
|
|
# self._refresh_live_data()
|
|
# # Orchestrator decisions are now handled by its own loop in _start_orchestrator_trading
|
|
# time.sleep(10) # Refresh data every 10 seconds
|
|
# except Exception as e:
|
|
# logger.error(f"Error in background data updater: {e}")
|
|
# time.sleep(5) # Wait before retrying on error
|
|
|
|
# def _http_price_polling(self):
|
|
# """HTTP polling for price updates and tick buffer population"""
|
|
# logger.info("Starting HTTP price polling for live data")
|
|
|
|
# while self.streaming:
|
|
# try:
|
|
# # Poll prices every 1 second for better responsiveness
|
|
# for symbol in ['ETH/USDT', 'BTC/USDT']:
|
|
# try:
|
|
# # Get current price via data provider
|
|
# current_price = self.data_provider.get_current_price(symbol)
|
|
# if current_price and current_price > 0:
|
|
# timestamp = datetime.now()
|
|
|
|
# with self.data_lock:
|
|
# # Update live prices
|
|
# self.live_prices[symbol] = current_price
|
|
|
|
# # Add to tick buffer for charts (HTTP polling data)
|
|
# tick_entry = {
|
|
# 'timestamp': timestamp,
|
|
# 'price': current_price,
|
|
# 'volume': 0.0, # No volume data from HTTP polling
|
|
# 'open': current_price,
|
|
# 'high': current_price,
|
|
# 'low': current_price,
|
|
# 'close': current_price
|
|
# }
|
|
|
|
# # Add to buffer and maintain size
|
|
# self.live_tick_buffer[symbol].append(tick_entry)
|
|
# if len(self.live_tick_buffer[symbol]) > self.max_tick_buffer_size:
|
|
# self.live_tick_buffer[symbol].pop(0)
|
|
|
|
# logger.debug(f"HTTP: {symbol}: ${current_price:.2f} (buffer: {len(self.live_tick_buffer[symbol])} ticks)")
|
|
# except Exception as e:
|
|
# logger.warning(f"Error fetching HTTP price for {symbol}: {e}")
|
|
|
|
# time.sleep(1) # Poll every 1 second for better responsiveness
|
|
|
|
# except Exception as e:
|
|
# logger.error(f"HTTP polling error: {e}")
|
|
# time.sleep(3)
|
|
|
|
# def _websocket_price_stream(self, symbol: str):
|
|
# """WebSocket stream for real-time tick data using trade stream for better granularity"""
|
|
# # Use trade stream instead of ticker for real tick data
|
|
# url = f"wss://stream.binance.com:9443/ws/{symbol.lower()}@trade"
|
|
|
|
# while self.streaming:
|
|
# try:
|
|
# # Use synchronous approach to avoid asyncio issues
|
|
# import websocket
|
|
|
|
# def on_message(ws, message):
|
|
# try:
|
|
# trade_data = json.loads(message)
|
|
|
|
# # Extract trade data (more granular than ticker)
|
|
# price = float(trade_data.get('p', 0)) # Trade price
|
|
# quantity = float(trade_data.get('q', 0)) # Trade quantity
|
|
# timestamp = datetime.fromtimestamp(int(trade_data.get('T', 0)) / 1000) # Trade time
|
|
# is_buyer_maker = trade_data.get('m', False) # True if buyer is market maker
|
|
|
|
# # Calculate volume in USDT
|
|
# volume_usdt = price * quantity
|
|
|
|
# # Update live prices and tick buffer
|
|
# with self.data_lock:
|
|
# formatted_symbol = f"{symbol[:3]}/{symbol[3:]}"
|
|
# self.live_prices[formatted_symbol] = price
|
|
|
|
# # Add to tick buffer for real-time chart with proper trade data
|
|
# tick_entry = {
|
|
# 'timestamp': timestamp,
|
|
# 'price': price,
|
|
# 'volume': volume_usdt,
|
|
# 'quantity': quantity,
|
|
# 'side': 'sell' if is_buyer_maker else 'buy', # Market taker side
|
|
# 'open': price, # For tick data, OHLC are same as current price
|
|
# 'high': price,
|
|
# 'low': price,
|
|
# 'close': price
|
|
# }
|
|
|
|
# # Add to buffer and maintain size
|
|
# self.live_tick_buffer[formatted_symbol].append(tick_entry)
|
|
# if len(self.live_tick_buffer[formatted_symbol]) > self.max_tick_buffer_size:
|
|
# self.live_tick_buffer[formatted_symbol].pop(0)
|
|
|
|
# # Log every 100th tick to avoid spam
|
|
# if len(self.live_tick_buffer[formatted_symbol]) % 100 == 0:
|
|
# logger.info(f"WS TRADE: {formatted_symbol}: ${price:.2f} | Vol: ${volume_usdt:.2f} | Buffer: {len(self.live_tick_buffer[formatted_symbol])} ticks")
|
|
|
|
# except Exception as e:
|
|
# logger.warning(f"Error processing WebSocket trade data for {symbol}: {e}")
|
|
|
|
# def on_error(ws, error):
|
|
# logger.warning(f"WebSocket trade stream error for {symbol}: {error}")
|
|
|
|
# def on_close(ws, close_status_code, close_msg):
|
|
# logger.info(f"WebSocket trade stream closed for {symbol}: {close_status_code}")
|
|
|
|
# def on_open(ws):
|
|
# logger.info(f"WebSocket trade stream connected for {symbol}")
|
|
|
|
# # Create WebSocket connection
|
|
# ws = websocket.WebSocketApp(url,
|
|
# on_message=on_message,
|
|
# on_error=on_error,
|
|
# on_close=on_close,
|
|
# on_open=on_open)
|
|
|
|
# # Run WebSocket with ping/pong for connection health
|
|
# ws.run_forever(ping_interval=20, ping_timeout=10)
|
|
|
|
# except Exception as e:
|
|
# logger.error(f"WebSocket trade stream connection error for {symbol}: {e}")
|
|
# if self.streaming:
|
|
# logger.info(f"Reconnecting WebSocket trade stream for {symbol} in 5 seconds...")
|
|
# time.sleep(5)
|
|
|
|
# def _refresh_live_data(self):
|
|
# """Refresh live data for all charts using proven working method"""
|
|
# logger.info("REFRESH: Refreshing LIVE data for all charts...")
|
|
|
|
# # Use the proven working approach - try multiple timeframes with fallbacks
|
|
# for symbol in ['ETH/USDT', 'BTC/USDT']:
|
|
# if symbol == 'ETH/USDT':
|
|
# timeframes = ['1s', '1m', '1h', '1d']
|
|
# else:
|
|
# timeframes = ['1s']
|
|
|
|
# for timeframe in timeframes:
|
|
# try:
|
|
# # Try fresh data first
|
|
# limit = 100 if timeframe == '1s' else 50 if timeframe == '1m' else 30
|
|
# fresh_data = self.data_provider.get_historical_data(symbol, timeframe, limit=limit, refresh=True)
|
|
|
|
# if fresh_data is not None and not fresh_data.empty and len(fresh_data) > 5:
|
|
# with self.data_lock:
|
|
# # Initialize structure if needed
|
|
# if symbol not in self.chart_data:
|
|
# self.chart_data[symbol] = {}
|
|
# self.chart_data[symbol][timeframe] = fresh_data
|
|
# logger.info(f"SUCCESS: Updated {symbol} {timeframe} with {len(fresh_data)} LIVE candles")
|
|
# else:
|
|
# # Fallback to cached data
|
|
# logger.warning(f"WARN: No fresh data for {symbol} {timeframe}, trying cached")
|
|
# cached_data = self.data_provider.get_historical_data(symbol, timeframe, limit=200, refresh=False)
|
|
|
|
# if cached_data is not None and not cached_data.empty:
|
|
# with self.data_lock:
|
|
# if symbol not in self.chart_data:
|
|
# self.chart_data[symbol] = {}
|
|
# self.chart_data[symbol][timeframe] = cached_data
|
|
# logger.info(f"CACHE: Using cached data for {symbol} {timeframe} ({len(cached_data)} candles)")
|
|
# else:
|
|
# # No data available - use empty DataFrame
|
|
# logger.warning(f"NO DATA: No data available for {symbol} {timeframe}")
|
|
# with self.data_lock:
|
|
# if symbol not in self.chart_data:
|
|
# self.chart_data[symbol] = {}
|
|
# self.chart_data[symbol][timeframe] = pd.DataFrame()
|
|
|
|
# except Exception as e:
|
|
# logger.error(f"ERROR: Failed to refresh {symbol} {timeframe}: {e}")
|
|
# # Use empty DataFrame as fallback
|
|
# with self.data_lock:
|
|
# if symbol not in self.chart_data:
|
|
# self.chart_data[symbol] = {}
|
|
# self.chart_data[symbol][timeframe] = pd.DataFrame()
|
|
|
|
# logger.info("REFRESH: LIVE data refresh complete")
|
|
|
|
# def _fetch_fresh_candles(self, symbol: str, timeframe: str, limit: int = 200) -> pd.DataFrame:
|
|
# """Fetch fresh candles with NO caching - always real data"""
|
|
# try:
|
|
# # Force fresh data fetch - NO CACHE
|
|
# df = self.data_provider.get_historical_data(
|
|
# symbol=symbol,
|
|
# timeframe=timeframe,
|
|
# limit=limit,
|
|
# refresh=True # Force fresh data - critical for real-time
|
|
# )
|
|
# if df is None or df.empty:
|
|
# logger.warning(f"No fresh data available for {symbol} {timeframe}")
|
|
# return pd.DataFrame()
|
|
|
|
# logger.info(f"Fetched {len(df)} fresh candles for {symbol} {timeframe}")
|
|
# return df.tail(limit)
|
|
# except Exception as e:
|
|
# logger.error(f"Error fetching fresh candles for {symbol} {timeframe}: {e}")
|
|
# return pd.DataFrame()
|
|
|
|
|
|
|
|
# def _create_live_chart(self, symbol: str, timeframe: str, main_chart: bool = False):
|
|
# """Create charts with real-time streaming data using proven working method"""
|
|
# try:
|
|
# # Simplified approach - get data with fallbacks
|
|
# data = None
|
|
|
|
# # Try cached data first (faster)
|
|
# try:
|
|
# with self.data_lock:
|
|
# if symbol in self.chart_data and timeframe in self.chart_data[symbol]:
|
|
# data = self.chart_data[symbol][timeframe].copy()
|
|
# if not data.empty and len(data) > 5:
|
|
# logger.debug(f"[CACHED] Using cached data for {symbol} {timeframe} ({len(data)} candles)")
|
|
# except Exception as e:
|
|
# logger.warning(f"[ERROR] Error getting cached data: {e}")
|
|
|
|
# # If no cached data, return empty chart
|
|
# if data is None or data.empty:
|
|
# logger.debug(f"NO DATA: No data available for {symbol} {timeframe}")
|
|
# return self._create_empty_chart(f"{symbol} {timeframe} - No Data Available")
|
|
|
|
# # Ensure we have valid data
|
|
# if data is None or data.empty:
|
|
# return self._create_empty_chart(f"{symbol} {timeframe} - No Data")
|
|
|
|
# # Create real-time chart using proven working method
|
|
# fig = go.Figure()
|
|
|
|
# # Get current price
|
|
# current_price = self.live_prices.get(symbol, data['close'].iloc[-1] if not data.empty else 0)
|
|
|
|
# if main_chart:
|
|
# # Main chart - use line chart for better compatibility (proven working method)
|
|
# fig.add_trace(go.Scatter(
|
|
# x=data['timestamp'] if 'timestamp' in data.columns else data.index,
|
|
# y=data['close'],
|
|
# mode='lines',
|
|
# name=f"{symbol} {timeframe.upper()}",
|
|
# line=dict(color='#00ff88', width=2),
|
|
# hovertemplate='<b>%{y:.2f}</b><br>%{x}<extra></extra>'
|
|
# ))
|
|
|
|
# # Add volume as bar chart on secondary y-axis
|
|
# if 'volume' in data.columns:
|
|
# fig.add_trace(go.Bar(
|
|
# x=data['timestamp'] if 'timestamp' in data.columns else data.index,
|
|
# y=data['volume'],
|
|
# name="Volume",
|
|
# yaxis='y2',
|
|
# opacity=0.4,
|
|
# marker_color='#4CAF50'
|
|
# ))
|
|
|
|
# # Add trading signals if available
|
|
# if self.recent_decisions:
|
|
# buy_decisions = []
|
|
# sell_decisions = []
|
|
|
|
# for decision in self.recent_decisions[-20:]: # Last 20 decisions
|
|
# if hasattr(decision, 'timestamp') and hasattr(decision, 'price') and hasattr(decision, 'action'):
|
|
# if decision.action == 'BUY':
|
|
# buy_decisions.append({'timestamp': decision.timestamp, 'price': decision.price})
|
|
# elif decision.action == 'SELL':
|
|
# sell_decisions.append({'timestamp': decision.timestamp, 'price': decision.price})
|
|
|
|
# # Add BUY markers
|
|
# if buy_decisions:
|
|
# fig.add_trace(go.Scatter(
|
|
# x=[d['timestamp'] for d in buy_decisions],
|
|
# y=[d['price'] for d in buy_decisions],
|
|
# mode='markers',
|
|
# marker=dict(color='#00ff88', size=12, symbol='triangle-up', line=dict(color='white', width=2)),
|
|
# name="BUY Signals",
|
|
# text=[f"BUY ${d['price']:.2f}" for d in buy_decisions],
|
|
# hoverinfo='text+x'
|
|
# ))
|
|
|
|
# # Add SELL markers
|
|
# if sell_decisions:
|
|
# fig.add_trace(go.Scatter(
|
|
# x=[d['timestamp'] for d in sell_decisions],
|
|
# y=[d['price'] for d in sell_decisions],
|
|
# mode='markers',
|
|
# marker=dict(color='#ff6b6b', size=12, symbol='triangle-down', line=dict(color='white', width=2)),
|
|
# name="SELL Signals",
|
|
# text=[f"SELL ${d['price']:.2f}" for d in sell_decisions],
|
|
# hoverinfo='text+x'
|
|
# ))
|
|
|
|
# # Current time and price info
|
|
# current_time = datetime.now().strftime("%H:%M:%S")
|
|
# latest_price = data['close'].iloc[-1] if not data.empty else current_price
|
|
|
|
# fig.update_layout(
|
|
# title=f"{symbol} LIVE CHART ({timeframe.upper()}) | ${latest_price:.2f} | {len(data)} candles | {current_time}",
|
|
# yaxis_title="Price (USDT)",
|
|
# yaxis2=dict(title="Volume", overlaying='y', side='right') if 'volume' in data.columns else None,
|
|
# template="plotly_dark",
|
|
# height=600,
|
|
# xaxis_rangeslider_visible=False,
|
|
# margin=dict(l=20, r=20, t=50, b=20),
|
|
# paper_bgcolor='#1e1e1e',
|
|
# plot_bgcolor='#1e1e1e',
|
|
# legend=dict(orientation="h", yanchor="bottom", y=1.02, xanchor="right", x=1)
|
|
# )
|
|
|
|
# else:
|
|
# # Small chart - use line chart for better compatibility (proven working method)
|
|
# fig.add_trace(go.Scatter(
|
|
# x=data['timestamp'] if 'timestamp' in data.columns else data.index,
|
|
# y=data['close'],
|
|
# mode='lines',
|
|
# name=f"{symbol} {timeframe}",
|
|
# line=dict(color='#00ff88', width=2),
|
|
# showlegend=False,
|
|
# hovertemplate='<b>%{y:.2f}</b><br>%{x}<extra></extra>'
|
|
# ))
|
|
|
|
# # Live price point
|
|
# if current_price > 0 and not data.empty:
|
|
# fig.add_trace(go.Scatter(
|
|
# x=[data['timestamp'].iloc[-1] if 'timestamp' in data.columns else data.index[-1]],
|
|
# y=[current_price],
|
|
# mode='markers',
|
|
# marker=dict(color='#FFD700', size=8),
|
|
# name="Live Price",
|
|
# showlegend=False
|
|
# ))
|
|
|
|
# fig.update_layout(
|
|
# template="plotly_dark",
|
|
# showlegend=False,
|
|
# margin=dict(l=10, r=10, t=40, b=10),
|
|
# height=300,
|
|
# title=f"{symbol} {timeframe.upper()} | ${current_price:.2f}",
|
|
# paper_bgcolor='#1e1e1e',
|
|
# plot_bgcolor='#1e1e1e'
|
|
# )
|
|
|
|
# return fig
|
|
|
|
# except Exception as e:
|
|
# logger.error(f"Error creating live chart for {symbol} {timeframe}: {e}")
|
|
# # Return error chart
|
|
# fig = go.Figure()
|
|
# fig.add_annotation(
|
|
# text=f"Error loading {symbol} {timeframe}",
|
|
# xref="paper", yref="paper",
|
|
# x=0.5, y=0.5, showarrow=False,
|
|
# font=dict(size=14, color="#ff4444")
|
|
# )
|
|
# fig.update_layout(
|
|
# template="plotly_dark",
|
|
# height=600 if main_chart else 300,
|
|
# paper_bgcolor='#1e1e1e',
|
|
# plot_bgcolor='#1e1e1e'
|
|
# )
|
|
# return fig
|
|
|
|
# def _create_empty_chart(self, title: str):
|
|
# """Create an empty chart with error message"""
|
|
# fig = go.Figure()
|
|
# fig.add_annotation(
|
|
# text=f"{title}<br><br>Chart data loading...",
|
|
# xref="paper", yref="paper",
|
|
# x=0.5, y=0.5, showarrow=False,
|
|
# font=dict(size=14, color="#00ff88")
|
|
# )
|
|
# fig.update_layout(
|
|
# title=title,
|
|
# template="plotly_dark",
|
|
# height=300,
|
|
# paper_bgcolor='#1e1e1e',
|
|
# plot_bgcolor='#1e1e1e'
|
|
# )
|
|
# return fig
|
|
|
|
# def _create_cached_chart(self, symbol: str, timeframe: str):
|
|
# """Create chart using cached data for better performance (no API calls during updates)"""
|
|
# try:
|
|
# # Use cached data to avoid API calls during frequent updates
|
|
# data = None
|
|
|
|
# # Try to get cached data first
|
|
# try:
|
|
# with self.data_lock:
|
|
# if symbol in self.chart_data and timeframe in self.chart_data[symbol]:
|
|
# data = self.chart_data[symbol][timeframe].copy()
|
|
# if not data.empty and len(data) > 5:
|
|
# logger.debug(f"Using cached data for {symbol} {timeframe} ({len(data)} candles)")
|
|
# except Exception as e:
|
|
# logger.warning(f"Error getting cached data: {e}")
|
|
|
|
# # If no cached data, return empty chart
|
|
# if data is None or data.empty:
|
|
# logger.debug(f"NO DATA: No data available for {symbol} {timeframe}")
|
|
# return self._create_empty_chart(f"{symbol} {timeframe} - No Data Available")
|
|
|
|
# # Ensure we have valid data
|
|
# if data is None or data.empty:
|
|
# return self._create_empty_chart(f"{symbol} {timeframe} - No Data")
|
|
|
|
# # Create chart using line chart for better compatibility
|
|
# fig = go.Figure()
|
|
|
|
# # Add line chart
|
|
# fig.add_trace(go.Scatter(
|
|
# x=data['timestamp'] if 'timestamp' in data.columns else data.index,
|
|
# y=data['close'],
|
|
# mode='lines',
|
|
# name=f"{symbol} {timeframe}",
|
|
# line=dict(color='#4CAF50', width=2),
|
|
# hovertemplate='<b>%{y:.2f}</b><br>%{x}<extra></extra>'
|
|
# ))
|
|
|
|
# # Get current price for live marker
|
|
# current_price = self.live_prices.get(symbol, data['close'].iloc[-1] if not data.empty else 0)
|
|
|
|
# # Add current price marker
|
|
# if current_price > 0 and not data.empty:
|
|
# fig.add_trace(go.Scatter(
|
|
# x=[data['timestamp'].iloc[-1] if 'timestamp' in data.columns else data.index[-1]],
|
|
# y=[current_price],
|
|
# mode='markers',
|
|
# marker=dict(color='#FFD700', size=8),
|
|
# name="Live Price",
|
|
# showlegend=False
|
|
# ))
|
|
|
|
# # Update layout
|
|
# fig.update_layout(
|
|
# title=f"{symbol} {timeframe.upper()} (Cached) | ${current_price:.2f}",
|
|
# template="plotly_dark",
|
|
# height=300,
|
|
# margin=dict(l=10, r=10, t=40, b=10),
|
|
# paper_bgcolor='#1e1e1e',
|
|
# plot_bgcolor='#1e1e1e',
|
|
# showlegend=False
|
|
# )
|
|
|
|
# return fig
|
|
|
|
# except Exception as e:
|
|
# logger.error(f"Error creating cached chart for {symbol} {timeframe}: {e}")
|
|
# return self._create_empty_chart(f"{symbol} {timeframe} - Cache Error")
|
|
|
|
# def _create_main_tick_chart(self, symbol: str):
|
|
# """Create main chart using real-time WebSocket tick buffer with enhanced trade visualization"""
|
|
# try:
|
|
# # Get tick buffer data
|
|
# tick_buffer = []
|
|
# current_price = 0
|
|
|
|
# try:
|
|
# with self.data_lock:
|
|
# tick_buffer = self.live_tick_buffer.get(symbol, []).copy()
|
|
# current_price = self.live_prices.get(symbol, 0)
|
|
# except Exception as e:
|
|
# logger.warning(f"Error accessing tick buffer: {e}")
|
|
|
|
# # If no tick data, use cached chart as fallback
|
|
# if not tick_buffer:
|
|
# logger.debug(f"No tick buffer for {symbol}, using cached chart")
|
|
# return self._create_cached_chart(symbol, '1s')
|
|
|
|
# # Convert tick buffer to DataFrame for plotting
|
|
# import pandas as pd
|
|
# df = pd.DataFrame(tick_buffer)
|
|
|
|
# # Create figure with enhanced tick data visualization
|
|
# fig = go.Figure()
|
|
|
|
# # Separate buy and sell trades for better visualization
|
|
# if 'side' in df.columns:
|
|
# buy_trades = df[df['side'] == 'buy']
|
|
# sell_trades = df[df['side'] == 'sell']
|
|
|
|
# # Add buy trades (green)
|
|
# if not buy_trades.empty:
|
|
# fig.add_trace(go.Scatter(
|
|
# x=buy_trades['timestamp'],
|
|
# y=buy_trades['price'],
|
|
# mode='markers',
|
|
# name=f"{symbol} Buy Trades",
|
|
# marker=dict(color='#00ff88', size=4, opacity=0.7),
|
|
# hovertemplate='<b>BUY $%{y:.2f}</b><br>%{x}<br>Vol: %{customdata:.2f}<extra></extra>',
|
|
# customdata=buy_trades['volume'] if 'volume' in buy_trades.columns else None
|
|
# ))
|
|
|
|
# # Add sell trades (red)
|
|
# if not sell_trades.empty:
|
|
# fig.add_trace(go.Scatter(
|
|
# x=sell_trades['timestamp'],
|
|
# y=sell_trades['price'],
|
|
# mode='markers',
|
|
# name=f"{symbol} Sell Trades",
|
|
# marker=dict(color='#ff6b6b', size=4, opacity=0.7),
|
|
# hovertemplate='<b>SELL $%{y:.2f}</b><br>%{x}<br>Vol: %{customdata:.2f}<extra></extra>',
|
|
# customdata=sell_trades['volume'] if 'volume' in sell_trades.columns else None
|
|
# ))
|
|
# else:
|
|
# # Fallback to simple line chart if no side data
|
|
# fig.add_trace(go.Scatter(
|
|
# x=df['timestamp'],
|
|
# y=df['price'],
|
|
# mode='lines+markers',
|
|
# name=f"{symbol} Live Trades",
|
|
# line=dict(color='#00ff88', width=1),
|
|
# marker=dict(size=3),
|
|
# hovertemplate='<b>$%{y:.2f}</b><br>%{x}<extra></extra>'
|
|
# ))
|
|
|
|
# # Add price trend line (moving average)
|
|
# if len(df) >= 20:
|
|
# df['ma_20'] = df['price'].rolling(window=20).mean()
|
|
# fig.add_trace(go.Scatter(
|
|
# x=df['timestamp'],
|
|
# y=df['ma_20'],
|
|
# mode='lines',
|
|
# name="20-Trade MA",
|
|
# line=dict(color='#FFD700', width=2, dash='dash'),
|
|
# opacity=0.8
|
|
# ))
|
|
|
|
# # Add current price marker
|
|
# if current_price > 0:
|
|
# fig.add_trace(go.Scatter(
|
|
# x=[df['timestamp'].iloc[-1]],
|
|
# y=[current_price],
|
|
# mode='markers',
|
|
# marker=dict(color='#FFD700', size=15, symbol='circle',
|
|
# line=dict(color='white', width=2)),
|
|
# name="Live Price",
|
|
# showlegend=False,
|
|
# hovertemplate=f'<b>LIVE: ${current_price:.2f}</b><extra></extra>'
|
|
# ))
|
|
|
|
# # Add volume bars on secondary y-axis
|
|
# if 'volume' in df.columns:
|
|
# fig.add_trace(go.Bar(
|
|
# x=df['timestamp'],
|
|
# y=df['volume'],
|
|
# name="Volume (USDT)",
|
|
# yaxis='y2',
|
|
# opacity=0.3,
|
|
# marker_color='#4CAF50',
|
|
# hovertemplate='<b>Vol: $%{y:.2f}</b><br>%{x}<extra></extra>'
|
|
# ))
|
|
|
|
# # Add trading signals if available
|
|
# if self.recent_decisions:
|
|
# buy_decisions = []
|
|
# sell_decisions = []
|
|
|
|
# for decision in self.recent_decisions[-10:]: # Last 10 decisions
|
|
# if hasattr(decision, 'timestamp') and hasattr(decision, 'price') and hasattr(decision, 'action'):
|
|
# if decision.action == 'BUY':
|
|
# buy_decisions.append({'timestamp': decision.timestamp, 'price': decision.price})
|
|
# elif decision.action == 'SELL':
|
|
# sell_decisions.append({'timestamp': decision.timestamp, 'price': decision.price})
|
|
|
|
# # Add BUY signals
|
|
# if buy_decisions:
|
|
# fig.add_trace(go.Scatter(
|
|
# x=[d['timestamp'] for d in buy_decisions],
|
|
# y=[d['price'] for d in buy_decisions],
|
|
# mode='markers',
|
|
# marker=dict(color='#00ff88', size=20, symbol='triangle-up',
|
|
# line=dict(color='white', width=3)),
|
|
# name="AI BUY Signals",
|
|
# text=[f"AI BUY ${d['price']:.2f}" for d in buy_decisions],
|
|
# hoverinfo='text+x'
|
|
# ))
|
|
|
|
# # Add SELL signals
|
|
# if sell_decisions:
|
|
# fig.add_trace(go.Scatter(
|
|
# x=[d['timestamp'] for d in sell_decisions],
|
|
# y=[d['price'] for d in sell_decisions],
|
|
# mode='markers',
|
|
# marker=dict(color='#ff6b6b', size=20, symbol='triangle-down',
|
|
# line=dict(color='white', width=3)),
|
|
# name="AI SELL Signals",
|
|
# text=[f"AI SELL ${d['price']:.2f}" for d in sell_decisions],
|
|
# hoverinfo='text+x'
|
|
# ))
|
|
|
|
# # Update layout with enhanced styling
|
|
# current_time = datetime.now().strftime("%H:%M:%S")
|
|
# tick_count = len(tick_buffer)
|
|
# latest_price = df['price'].iloc[-1] if not df.empty else current_price
|
|
# height = 600 if symbol == 'ETH/USDT' else 300
|
|
|
|
# # Calculate price change
|
|
# price_change = 0
|
|
# price_change_pct = 0
|
|
# if len(df) > 1:
|
|
# price_change = latest_price - df['price'].iloc[0]
|
|
# price_change_pct = (price_change / df['price'].iloc[0]) * 100
|
|
|
|
# # Color for price change
|
|
# change_color = '#00ff88' if price_change >= 0 else '#ff6b6b'
|
|
# change_symbol = '+' if price_change >= 0 else ''
|
|
|
|
# fig.update_layout(
|
|
# title=f"{symbol} Live Trade Stream | ${latest_price:.2f} ({change_symbol}{price_change_pct:+.2f}%) | {tick_count} trades | {current_time}",
|
|
# yaxis_title="Price (USDT)",
|
|
# yaxis2=dict(title="Volume (USDT)", overlaying='y', side='right') if 'volume' in df.columns else None,
|
|
# template="plotly_dark",
|
|
# height=height,
|
|
# xaxis_rangeslider_visible=False,
|
|
# margin=dict(l=20, r=20, t=50, b=20),
|
|
# paper_bgcolor='#1e1e1e',
|
|
# plot_bgcolor='#1e1e1e',
|
|
# showlegend=True,
|
|
# legend=dict(orientation="h", yanchor="bottom", y=1.02, xanchor="right", x=1),
|
|
# xaxis=dict(
|
|
# title="Time",
|
|
# type="date",
|
|
# tickformat="%H:%M:%S"
|
|
# ),
|
|
# # Add price change color to title
|
|
# title_font_color=change_color
|
|
# )
|
|
|
|
# return fig
|
|
|
|
# except Exception as e:
|
|
# logger.error(f"Error creating main tick chart for {symbol}: {e}")
|
|
# # Return error chart
|
|
# fig = go.Figure()
|
|
# fig.add_annotation(
|
|
# text=f"Error loading {symbol} WebSocket stream<br>{str(e)}",
|
|
# xref="paper", yref="paper",
|
|
# x=0.5, y=0.5, showarrow=False,
|
|
# font=dict(size=14, color="#ff4444")
|
|
# )
|
|
# fig.update_layout(
|
|
# template="plotly_dark",
|
|
# height=600 if symbol == 'ETH/USDT' else 300,
|
|
# paper_bgcolor='#1e1e1e',
|
|
# plot_bgcolor='#1e1e1e'
|
|
# )
|
|
# return fig
|
|
|
|
# def _create_model_training_status(self):
|
|
# """Create model training status display with enhanced extrema information"""
|
|
# try:
|
|
# # Get sensitivity learning info (now includes extrema stats)
|
|
# sensitivity_info = self._get_sensitivity_learning_info()
|
|
|
|
# # Get training status in the expected format
|
|
# training_status = self._get_model_training_status()
|
|
|
|
# # Training Data Stream Status
|
|
# tick_cache_size = len(getattr(self, 'tick_cache', []))
|
|
# bars_cache_size = len(getattr(self, 'one_second_bars', []))
|
|
|
|
# training_items = []
|
|
|
|
# # Training Data Stream
|
|
# training_items.append(
|
|
# html.Div([
|
|
# html.H6([
|
|
# html.I(className="fas fa-database me-2 text-info"),
|
|
# "Training Data Stream"
|
|
# ], className="mb-2"),
|
|
# html.Div([
|
|
# html.Small([
|
|
# html.Strong("Tick Cache: "),
|
|
# html.Span(f"{tick_cache_size:,} ticks", className="text-success" if tick_cache_size > 100 else "text-warning")
|
|
# ], className="d-block"),
|
|
# html.Small([
|
|
# html.Strong("1s Bars: "),
|
|
# html.Span(f"{bars_cache_size} bars", className="text-success" if bars_cache_size > 100 else "text-warning")
|
|
# ], className="d-block"),
|
|
# html.Small([
|
|
# html.Strong("Stream: "),
|
|
# html.Span("LIVE" if getattr(self, 'is_streaming', False) else "OFFLINE",
|
|
# className="text-success" if getattr(self, 'is_streaming', False) else "text-danger")
|
|
# ], className="d-block")
|
|
# ])
|
|
# ], className="mb-3 p-2 border border-info rounded")
|
|
# )
|
|
|
|
# # CNN Model Status
|
|
# training_items.append(
|
|
# html.Div([
|
|
# html.H6([
|
|
# html.I(className="fas fa-brain me-2 text-warning"),
|
|
# "CNN Model"
|
|
# ], className="mb-2"),
|
|
# html.Div([
|
|
# html.Small([
|
|
# html.Strong("Status: "),
|
|
# html.Span(training_status['cnn']['status'],
|
|
# className=f"text-{training_status['cnn']['status_color']}")
|
|
# ], className="d-block"),
|
|
# html.Small([
|
|
# html.Strong("Accuracy: "),
|
|
# html.Span(f"{training_status['cnn']['accuracy']:.1%}", className="text-info")
|
|
# ], className="d-block"),
|
|
# html.Small([
|
|
# html.Strong("Loss: "),
|
|
# html.Span(f"{training_status['cnn']['loss']:.4f}", className="text-muted")
|
|
# ], className="d-block"),
|
|
# html.Small([
|
|
# html.Strong("Epochs: "),
|
|
# html.Span(f"{training_status['cnn']['epochs']}", className="text-muted")
|
|
# ], className="d-block"),
|
|
# html.Small([
|
|
# html.Strong("Learning Rate: "),
|
|
# html.Span(f"{training_status['cnn']['learning_rate']:.6f}", className="text-muted")
|
|
# ], className="d-block")
|
|
# ])
|
|
# ], className="mb-3 p-2 border border-warning rounded")
|
|
# )
|
|
|
|
# # RL Agent Status
|
|
# training_items.append(
|
|
# html.Div([
|
|
# html.H6([
|
|
# html.I(className="fas fa-robot me-2 text-success"),
|
|
# "RL Agent (DQN)"
|
|
# ], className="mb-2"),
|
|
# html.Div([
|
|
# html.Small([
|
|
# html.Strong("Status: "),
|
|
# html.Span(training_status['rl']['status'],
|
|
# className=f"text-{training_status['rl']['status_color']}")
|
|
# ], className="d-block"),
|
|
# html.Small([
|
|
# html.Strong("Win Rate: "),
|
|
# html.Span(f"{training_status['rl']['win_rate']:.1%}", className="text-info")
|
|
# ], className="d-block"),
|
|
# html.Small([
|
|
# html.Strong("Avg Reward: "),
|
|
# html.Span(f"{training_status['rl']['avg_reward']:.2f}", className="text-muted")
|
|
# ], className="d-block"),
|
|
# html.Small([
|
|
# html.Strong("Episodes: "),
|
|
# html.Span(f"{training_status['rl']['episodes']}", className="text-muted")
|
|
# ], className="d-block"),
|
|
# html.Small([
|
|
# html.Strong("Epsilon: "),
|
|
# html.Span(f"{training_status['rl']['epsilon']:.3f}", className="text-muted")
|
|
# ], className="d-block"),
|
|
# html.Small([
|
|
# html.Strong("Memory: "),
|
|
# html.Span(f"{training_status['rl']['memory_size']:,}", className="text-muted")
|
|
# ], className="d-block")
|
|
# ])
|
|
# ], className="mb-3 p-2 border border-success rounded")
|
|
# )
|
|
|
|
# return html.Div(training_items)
|
|
|
|
# except Exception as e:
|
|
# logger.error(f"Error creating model training status: {e}")
|
|
# return html.Div([
|
|
# html.P("⚠️ Error loading training status", className="text-warning text-center"),
|
|
# html.P(f"Error: {str(e)}", className="text-muted text-center small")
|
|
# ], className="p-3")
|
|
|
|
# def _get_model_training_status(self) -> Dict:
|
|
# """Get current model training status and metrics"""
|
|
# try:
|
|
# # Initialize default status
|
|
# status = {
|
|
# 'cnn': {
|
|
# 'status': 'TRAINING',
|
|
# 'status_color': 'warning',
|
|
# 'accuracy': 0.0,
|
|
# 'loss': 0.0,
|
|
# 'epochs': 0,
|
|
# 'learning_rate': 0.001
|
|
# },
|
|
# 'rl': {
|
|
# 'status': 'TRAINING',
|
|
# 'status_color': 'success',
|
|
# 'win_rate': 0.0,
|
|
# 'avg_reward': 0.0,
|
|
# 'episodes': 0,
|
|
# 'epsilon': 1.0,
|
|
# 'memory_size': 0
|
|
# }
|
|
# }
|
|
|
|
# # Try to get real metrics from orchestrator
|
|
# if hasattr(self.orchestrator, 'get_performance_metrics'):
|
|
# try:
|
|
# perf_metrics = self.orchestrator.get_performance_metrics()
|
|
# if perf_metrics:
|
|
# # Update RL metrics from orchestrator performance
|
|
# status['rl']['win_rate'] = perf_metrics.get('win_rate', 0.0)
|
|
# status['rl']['episodes'] = perf_metrics.get('total_actions', 0)
|
|
|
|
# # Check if we have sensitivity learning data
|
|
# if hasattr(self.orchestrator, 'sensitivity_learning_queue'):
|
|
# status['rl']['memory_size'] = len(self.orchestrator.sensitivity_learning_queue)
|
|
# if status['rl']['memory_size'] > 0:
|
|
# status['rl']['status'] = 'LEARNING'
|
|
|
|
# # Check if we have extrema training data
|
|
# if hasattr(self.orchestrator, 'extrema_training_queue'):
|
|
# cnn_queue_size = len(self.orchestrator.extrema_training_queue)
|
|
# if cnn_queue_size > 0:
|
|
# status['cnn']['status'] = 'LEARNING'
|
|
# status['cnn']['epochs'] = min(cnn_queue_size // 10, 100) # Simulate epochs
|
|
|
|
# logger.debug("Updated training status from orchestrator metrics")
|
|
# except Exception as e:
|
|
# logger.warning(f"Error getting orchestrator metrics: {e}")
|
|
|
|
# # Try to get extrema stats for CNN training
|
|
# if hasattr(self.orchestrator, 'get_extrema_stats'):
|
|
# try:
|
|
# extrema_stats = self.orchestrator.get_extrema_stats()
|
|
# if extrema_stats:
|
|
# total_extrema = extrema_stats.get('total_extrema_detected', 0)
|
|
# if total_extrema > 0:
|
|
# status['cnn']['status'] = 'LEARNING'
|
|
# status['cnn']['epochs'] = min(total_extrema // 5, 200)
|
|
# # Simulate improving accuracy based on extrema detected
|
|
# status['cnn']['accuracy'] = min(0.85, total_extrema * 0.01)
|
|
# status['cnn']['loss'] = max(0.001, 1.0 - status['cnn']['accuracy'])
|
|
# except Exception as e:
|
|
# logger.warning(f"Error getting extrema stats: {e}")
|
|
|
|
# return status
|
|
|
|
# except Exception as e:
|
|
# logger.error(f"Error getting model training status: {e}")
|
|
# return {
|
|
# 'cnn': {
|
|
# 'status': 'ERROR',
|
|
# 'status_color': 'danger',
|
|
# 'accuracy': 0.0,
|
|
# 'loss': 0.0,
|
|
# 'epochs': 0,
|
|
# 'learning_rate': 0.001
|
|
# },
|
|
# 'rl': {
|
|
# 'status': 'ERROR',
|
|
# 'status_color': 'danger',
|
|
# 'win_rate': 0.0,
|
|
# 'avg_reward': 0.0,
|
|
# 'episodes': 0,
|
|
# 'epsilon': 1.0,
|
|
# 'memory_size': 0
|
|
# }
|
|
# }
|
|
|
|
# def _get_sensitivity_learning_info(self) -> Dict[str, Any]:
|
|
# """Get sensitivity learning information for dashboard display"""
|
|
# try:
|
|
# if hasattr(self.orchestrator, 'get_extrema_stats'):
|
|
# # Get extrema stats from orchestrator
|
|
# extrema_stats = self.orchestrator.get_extrema_stats()
|
|
|
|
# # Get sensitivity stats
|
|
# sensitivity_info = {
|
|
# 'current_level': getattr(self.orchestrator, 'current_sensitivity_level', 2),
|
|
# 'level_name': 'medium',
|
|
# 'open_threshold': getattr(self.orchestrator, 'confidence_threshold_open', 0.6),
|
|
# 'close_threshold': getattr(self.orchestrator, 'confidence_threshold_close', 0.25),
|
|
# 'learning_cases': len(getattr(self.orchestrator, 'sensitivity_learning_queue', [])),
|
|
# 'completed_trades': len(getattr(self.orchestrator, 'completed_trades', [])),
|
|
# 'active_trades': len(getattr(self.orchestrator, 'active_trades', {}))
|
|
# }
|
|
|
|
# # Get level name
|
|
# if hasattr(self.orchestrator, 'sensitivity_levels'):
|
|
# levels = self.orchestrator.sensitivity_levels
|
|
# current_level = sensitivity_info['current_level']
|
|
# if current_level in levels:
|
|
# sensitivity_info['level_name'] = levels[current_level]['name']
|
|
|
|
# # Combine with extrema stats
|
|
# combined_info = {
|
|
# 'sensitivity': sensitivity_info,
|
|
# 'extrema': extrema_stats,
|
|
# 'context_data': extrema_stats.get('context_data_status', {}),
|
|
# 'training_active': extrema_stats.get('training_queue_size', 0) > 0
|
|
# }
|
|
|
|
# return combined_info
|
|
# else:
|
|
# # Fallback for basic sensitivity info
|
|
# return {
|
|
# 'sensitivity': {
|
|
# 'current_level': 2,
|
|
# 'level_name': 'medium',
|
|
# 'open_threshold': 0.6,
|
|
# 'close_threshold': 0.25,
|
|
# 'learning_cases': 0,
|
|
# 'completed_trades': 0,
|
|
# 'active_trades': 0
|
|
# },
|
|
# 'extrema': {
|
|
# 'total_extrema_detected': 0,
|
|
# 'training_queue_size': 0,
|
|
# 'recent_extrema': {'bottoms': 0, 'tops': 0, 'avg_confidence': 0.0}
|
|
# },
|
|
# 'context_data': {},
|
|
# 'training_active': False
|
|
# }
|
|
|
|
# except Exception as e:
|
|
# logger.error(f"Error getting sensitivity learning info: {e}")
|
|
# return {
|
|
# 'sensitivity': {
|
|
# 'current_level': 2,
|
|
# 'level_name': 'medium',
|
|
# 'open_threshold': 0.6,
|
|
# 'close_threshold': 0.25,
|
|
# 'learning_cases': 0,
|
|
# 'completed_trades': 0,
|
|
# 'active_trades': 0
|
|
# },
|
|
# 'extrema': {
|
|
# 'total_extrema_detected': 0,
|
|
# 'training_queue_size': 0,
|
|
# 'recent_extrema': {'bottoms': 0, 'tops': 0, 'avg_confidence': 0.0}
|
|
# },
|
|
# 'context_data': {},
|
|
# 'training_active': False
|
|
# }
|
|
|
|
# def _create_orchestrator_status(self):
|
|
# """Create orchestrator data flow status"""
|
|
# try:
|
|
# # Get orchestrator status
|
|
# if hasattr(self.orchestrator, 'tick_processor') and self.orchestrator.tick_processor:
|
|
# tick_stats = self.orchestrator.tick_processor.get_processing_stats()
|
|
|
|
# return html.Div([
|
|
# html.Div([
|
|
# html.H6("Data Input", className="text-info"),
|
|
# html.P(f"Symbols: {tick_stats.get('symbols', [])}", className="text-white"),
|
|
# html.P(f"Streaming: {'ACTIVE' if tick_stats.get('streaming', False) else 'INACTIVE'}", className="text-white"),
|
|
# html.P(f"Subscribers: {tick_stats.get('subscribers', 0)}", className="text-white")
|
|
# ], className="col-md-6"),
|
|
|
|
# html.Div([
|
|
# html.H6("Processing", className="text-success"),
|
|
# html.P(f"Tick Counts: {tick_stats.get('tick_counts', {})}", className="text-white"),
|
|
# html.P(f"Buffer Sizes: {tick_stats.get('buffer_sizes', {})}", className="text-white"),
|
|
# html.P(f"Neural DPS: {'ACTIVE' if tick_stats.get('streaming', False) else 'INACTIVE'}", className="text-white")
|
|
# ], className="col-md-6")
|
|
# ], className="row")
|
|
# else:
|
|
# return html.Div([
|
|
# html.Div([
|
|
# html.H6("Universal Data Format", className="text-info"),
|
|
# html.P("OK ETH ticks, 1m, 1h, 1d", className="text-white"),
|
|
# html.P("OK BTC reference ticks", className="text-white"),
|
|
# html.P("OK 5-stream format active", className="text-white")
|
|
# ], className="col-md-6"),
|
|
|
|
# html.Div([
|
|
# html.H6("Model Integration", className="text-success"),
|
|
# html.P("OK CNN pipeline ready", className="text-white"),
|
|
# html.P("OK RL pipeline ready", className="text-white"),
|
|
# html.P("OK Neural DPS active", className="text-white")
|
|
# ], className="col-md-6")
|
|
# ], className="row")
|
|
|
|
# except Exception as e:
|
|
# logger.error(f"Error creating orchestrator status: {e}")
|
|
# return html.Div([
|
|
# html.P("Error loading orchestrator status", className="text-danger")
|
|
# ])
|
|
|
|
# def _create_training_events_log(self):
|
|
# """Create enhanced training events log with retrospective learning details"""
|
|
# try:
|
|
# # Get recent perfect moves and training events
|
|
# events = []
|
|
|
|
# if hasattr(self.orchestrator, 'perfect_moves') and self.orchestrator.perfect_moves:
|
|
# perfect_moves = list(self.orchestrator.perfect_moves)[-8:] # Last 8 perfect moves
|
|
|
|
# for move in perfect_moves:
|
|
# timestamp = move.timestamp.strftime('%H:%M:%S')
|
|
# outcome_pct = move.actual_outcome * 100
|
|
# confidence_gap = move.confidence_should_have_been - 0.6 # vs default threshold
|
|
|
|
# events.append({
|
|
# 'time': timestamp,
|
|
# 'type': 'CNN',
|
|
# 'event': f"Perfect {move.optimal_action} {move.symbol} ({outcome_pct:+.2f}%) - Retrospective Learning",
|
|
# 'confidence': move.confidence_should_have_been,
|
|
# 'color': 'text-warning',
|
|
# 'priority': 3 if abs(outcome_pct) > 2 else 2 # High priority for big moves
|
|
# })
|
|
|
|
# # Add confidence adjustment event
|
|
# if confidence_gap > 0.1:
|
|
# events.append({
|
|
# 'time': timestamp,
|
|
# 'type': 'TUNE',
|
|
# 'event': f"Confidence threshold adjustment needed: +{confidence_gap:.2f}",
|
|
# 'confidence': confidence_gap,
|
|
# 'color': 'text-info',
|
|
# 'priority': 2
|
|
# })
|
|
|
|
# # Add RL training events based on queue activity
|
|
# if hasattr(self.orchestrator, 'rl_evaluation_queue') and self.orchestrator.rl_evaluation_queue:
|
|
# queue_size = len(self.orchestrator.rl_evaluation_queue)
|
|
# current_time = datetime.now()
|
|
|
|
# if queue_size > 0:
|
|
# events.append({
|
|
# 'time': current_time.strftime('%H:%M:%S'),
|
|
# 'type': 'RL',
|
|
# 'event': f'Experience replay active (queue: {queue_size} actions)',
|
|
# 'confidence': min(1.0, queue_size / 10),
|
|
# 'color': 'text-success',
|
|
# 'priority': 3 if queue_size > 5 else 1
|
|
# })
|
|
|
|
# # Add tick processing events
|
|
# if hasattr(self.orchestrator, 'get_realtime_tick_stats'):
|
|
# tick_stats = self.orchestrator.get_realtime_tick_stats()
|
|
# patterns_detected = tick_stats.get('patterns_detected', 0)
|
|
|
|
# if patterns_detected > 0:
|
|
# events.append({
|
|
# 'time': datetime.now().strftime('%H:%M:%S'),
|
|
# 'type': 'TICK',
|
|
# 'event': f'Violent move patterns detected: {patterns_detected}',
|
|
# 'confidence': min(1.0, patterns_detected / 5),
|
|
# 'color': 'text-info',
|
|
# 'priority': 2
|
|
# })
|
|
|
|
# # Sort events in descending order: highest priority first, then latest time string first
|
|
# events.sort(key=lambda x: (x.get('priority', 1), x['time']), reverse=True)
|
|
|
|
# if not events:
|
|
# return html.Div([
|
|
# html.P("🤖 Models initializing... Waiting for perfect opportunities to learn from.",
|
|
# className="text-muted text-center"),
|
|
# html.P("💡 Retrospective learning will activate when significant price moves are detected.",
|
|
# className="text-muted text-center")
|
|
# ])
|
|
|
|
# log_items = []
|
|
# for event in events[:10]: # Show top 10 events
|
|
# icon = "🧠" if event['type'] == 'CNN' else "🤖" if event['type'] == 'RL' else "⚙️" if event['type'] == 'TUNE' else "⚡"
|
|
# confidence_display = f"{event['confidence']:.2f}" if event['confidence'] <= 1.0 else f"{event['confidence']:.3f}"
|
|
|
|
# log_items.append(
|
|
# html.P(f"{event['time']} {icon} [{event['type']}] {event['event']} (conf: {confidence_display})",
|
|
# className=f"{event['color']} mb-1")
|
|
# )
|
|
|
|
# return html.Div(log_items)
|
|
|
|
# except Exception as e:
|
|
# logger.error(f"Error creating training events log: {e}")
|
|
# return html.Div([
|
|
# html.P("Error loading training events", className="text-danger")
|
|
# ])
|
|
|
|
# def _create_live_actions_log(self):
|
|
# """Create live trading actions log with session information"""
|
|
# if not self.recent_decisions:
|
|
# return html.P("Waiting for live trading signals from session...",
|
|
# className="text-muted text-center")
|
|
|
|
# log_items = []
|
|
# for action in self.recent_decisions[-5:]:
|
|
# sofia_time = action.timestamp.astimezone(self.timezone).strftime("%H:%M:%S")
|
|
|
|
# # Find corresponding trade in session history for P&L info
|
|
# trade_pnl = ""
|
|
# for trade in reversed(self.trading_session.trade_history):
|
|
# if (trade['timestamp'].replace(tzinfo=None) - action.timestamp.replace(tzinfo=None)).total_seconds() < 5:
|
|
# if trade.get('pnl', 0) != 0:
|
|
# trade_pnl = f" | P&L: ${trade['pnl']:+.2f}"
|
|
# break
|
|
|
|
# log_items.append(
|
|
# html.P(
|
|
# f"ACTION: {sofia_time} | {action.action} {action.symbol} @ ${action.price:.2f} "
|
|
# f"(Confidence: {action.confidence:.1%}) | Session Trade{trade_pnl}",
|
|
# className="text-center mb-1 text-light"
|
|
# )
|
|
# )
|
|
|
|
# return html.Div(log_items)
|
|
|
|
# def add_trading_decision(self, decision: TradingAction):
|
|
# """Add trading decision with Sofia timezone and session tracking"""
|
|
# decision.timestamp = decision.timestamp.astimezone(self.timezone)
|
|
# self.recent_decisions.append(decision)
|
|
|
|
# if len(self.recent_decisions) > 50:
|
|
# self.recent_decisions.pop(0)
|
|
|
|
# # Update session last action (trade count is updated in execute_trade)
|
|
# self.trading_session.last_action = f"{decision.action} {decision.symbol}"
|
|
|
|
# sofia_time = decision.timestamp.strftime("%H:%M:%S %Z")
|
|
# logger.info(f"FIRE: {sofia_time} | Session trading decision: {decision.action} {decision.symbol} @ ${decision.price:.2f}")
|
|
|
|
# def stop_streaming(self):
|
|
# """Stop streaming and cleanup"""
|
|
# logger.info("Stopping dashboard streaming...")
|
|
|
|
# self.streaming = False
|
|
|
|
# # Stop unified data stream
|
|
# if hasattr(self, 'unified_stream'):
|
|
# asyncio.run(self.unified_stream.stop_streaming())
|
|
|
|
# # Unregister as consumer
|
|
# if hasattr(self, 'stream_consumer_id'):
|
|
# self.unified_stream.unregister_consumer(self.stream_consumer_id)
|
|
|
|
# # Wait up to 2 s for each remaining WebSocket thread to exit (join only; threads are not forcibly stopped)
|
|
# if hasattr(self, 'websocket_threads'):
|
|
# for thread in self.websocket_threads:
|
|
# if thread.is_alive():
|
|
# thread.join(timeout=2)
|
|
|
|
# logger.info("Dashboard streaming stopped")
|
|
|
|
# def run(self, host: str = '127.0.0.1', port: int = 8051, debug: bool = False):
|
|
# """Run the real-time dashboard"""
|
|
# try:
|
|
# logger.info(f"TRADING: Starting Live Scalping Dashboard (500x Leverage) at http://{host}:{port}")
|
|
# logger.info("START: SESSION TRADING FEATURES:")
|
|
# logger.info(f"Session ID: {self.trading_session.session_id}")
|
|
# logger.info(f"Starting Balance: ${self.trading_session.starting_balance:.2f}")
|
|
# logger.info(" - Session-based P&L tracking (resets each session)")
|
|
# logger.info(" - Real-time trade execution with 500x leverage")
|
|
# logger.info(" - Clean accounting logs for all trades")
|
|
# logger.info("STREAM: TECHNICAL FEATURES:")
|
|
# logger.info(" - WebSocket price streaming (1s updates)")
|
|
# logger.info(" - NO CACHED DATA - Always fresh API calls")
|
|
# logger.info(f" - Sofia timezone: {self.timezone}")
|
|
# logger.info(" - Real-time charts with throttling")
|
|
|
|
# self.app.run(host=host, port=port, debug=debug)
|
|
|
|
# except KeyboardInterrupt:
|
|
# logger.info("Shutting down session trading dashboard...")
|
|
# # Log final session summary
|
|
# summary = self.trading_session.get_session_summary()
|
|
# logger.info(f"FINAL SESSION SUMMARY:")
|
|
# logger.info(f"Session: {summary['session_id']}")
|
|
# logger.info(f"Duration: {summary['duration']}")
|
|
# logger.info(f"Final P&L: ${summary['total_pnl']:+.2f}")
|
|
# logger.info(f"Total Trades: {summary['total_trades']}")
|
|
# logger.info(f"Win Rate: {summary['win_rate']:.1%}")
|
|
# logger.info(f"Final Balance: ${summary['current_balance']:.2f}")
|
|
# finally:
|
|
# self.stop_streaming()
|
|
|
|
# def _process_orchestrator_decisions(self):
|
|
# """
|
|
# Process trading decisions from orchestrator and execute trades in the session
|
|
# """
|
|
# try:
|
|
# # Check if orchestrator has new decisions
|
|
# # This could be enhanced to use async calls, but for now we'll simulate based on market conditions
|
|
|
|
# # Get current prices for trade execution
|
|
# eth_price = self.live_prices.get('ETH/USDT', 0)
|
|
# btc_price = self.live_prices.get('BTC/USDT', 0)
|
|
|
|
# # Simple trading logic based on recent price movements (demo for session testing)
|
|
# if eth_price > 0 and len(self.chart_data['ETH/USDT']['1s']) > 0:
|
|
# recent_eth_data = self.chart_data['ETH/USDT']['1s'].tail(5)
|
|
# if not recent_eth_data.empty:
|
|
# price_change = (eth_price - recent_eth_data['close'].iloc[0]) / recent_eth_data['close'].iloc[0]
|
|
|
|
# # Generate trading signals every ~30 seconds based on price movement
|
|
# if len(self.trading_session.trade_history) == 0 or \
|
|
# (datetime.now() - self.trading_session.trade_history[-1]['timestamp']).total_seconds() > 30:
|
|
|
|
# if price_change > 0.001: # 0.1% price increase
|
|
# action = TradingAction(
|
|
# symbol='ETH/USDT',
|
|
# action='BUY',
|
|
# confidence=0.6 + min(abs(price_change) * 10, 0.3),
|
|
# timestamp=datetime.now(self.timezone),
|
|
# price=eth_price,
|
|
# quantity=0.01
|
|
# )
|
|
# self._execute_session_trade(action, eth_price)
|
|
|
|
# elif price_change < -0.001: # 0.1% price decrease
|
|
# action = TradingAction(
|
|
# symbol='ETH/USDT',
|
|
# action='SELL',
|
|
# confidence=0.6 + min(abs(price_change) * 10, 0.3),
|
|
# timestamp=datetime.now(self.timezone),
|
|
# price=eth_price,
|
|
# quantity=0.01
|
|
# )
|
|
# self._execute_session_trade(action, eth_price)
|
|
|
|
# # Similar logic for BTC (less frequent)
|
|
# if btc_price > 0 and len(self.chart_data['BTC/USDT']['1s']) > 0:
|
|
# recent_btc_data = self.chart_data['BTC/USDT']['1s'].tail(3)
|
|
# if not recent_btc_data.empty:
|
|
# price_change = (btc_price - recent_btc_data['close'].iloc[0]) / recent_btc_data['close'].iloc[0]
|
|
|
|
# # BTC trades less frequently
|
|
# btc_trades = [t for t in self.trading_session.trade_history if t['symbol'] == 'BTC/USDT']
|
|
# if len(btc_trades) == 0 or \
|
|
# (datetime.now() - btc_trades[-1]['timestamp']).total_seconds() > 60:
|
|
|
|
# if abs(price_change) > 0.002: # 0.2% price movement for BTC
|
|
# action_type = 'BUY' if price_change > 0 else 'SELL'
|
|
# action = TradingAction(
|
|
# symbol='BTC/USDT',
|
|
# action=action_type,
|
|
# confidence=0.7 + min(abs(price_change) * 5, 0.25),
|
|
# timestamp=datetime.now(self.timezone),
|
|
# price=btc_price,
|
|
# quantity=0.001
|
|
# )
|
|
# self._execute_session_trade(action, btc_price)
|
|
|
|
# except Exception as e:
|
|
# logger.error(f"Error processing orchestrator decisions: {e}")
|
|
|
|
# def _execute_session_trade(self, action: TradingAction, current_price: float):
|
|
# """
|
|
# Execute trade in the trading session and update all metrics
|
|
# """
|
|
# try:
|
|
# # Execute the trade in the session
|
|
# trade_info = self.trading_session.execute_trade(action, current_price)
|
|
|
|
# if trade_info:
|
|
# # Add to recent decisions for display
|
|
# self.add_trading_decision(action)
|
|
|
|
# # Log session trade
|
|
# logger.info(f"SESSION TRADE: {action.action} {action.symbol}")
|
|
# logger.info(f"Position Value: ${trade_info['value']:.2f}")
|
|
# logger.info(f"Confidence: {action.confidence:.1%}")
|
|
# logger.info(f"Session Balance: ${self.trading_session.current_balance:.2f}")
|
|
|
|
# # Log trade history for accounting
|
|
# self._log_trade_for_accounting(trade_info)
|
|
|
|
# except Exception as e:
|
|
# logger.error(f"Error executing session trade: {e}")
|
|
|
|
# def _log_trade_for_accounting(self, trade_info: dict):
|
|
# """
|
|
# Log trade for clean accounting purposes - this will be used even after broker API connection
|
|
# """
|
|
# try:
|
|
# # Create accounting log entry
|
|
# accounting_entry = {
|
|
# 'session_id': self.trading_session.session_id,
|
|
# 'timestamp': trade_info['timestamp'].isoformat(),
|
|
# 'symbol': trade_info['symbol'],
|
|
# 'action': trade_info['action'],
|
|
# 'price': trade_info['price'],
|
|
# 'size': trade_info['size'],
|
|
# 'value': trade_info['value'],
|
|
# 'confidence': trade_info['confidence'],
|
|
# 'pnl': trade_info.get('pnl', 0),
|
|
# 'session_balance': self.trading_session.current_balance,
|
|
# 'session_total_pnl': self.trading_session.total_pnl
|
|
# }
|
|
|
|
# # Write to trade log file (append mode)
|
|
# log_file = f"trade_logs/session_{self.trading_session.session_id}_{datetime.now().strftime('%Y%m%d')}.json"
|
|
|
|
# # Ensure trade_logs directory exists
|
|
# import os
|
|
# os.makedirs('trade_logs', exist_ok=True)
|
|
|
|
# # Append trade to log file
|
|
# import json
|
|
# with open(log_file, 'a') as f:
|
|
# f.write(json.dumps(accounting_entry) + '\n')
|
|
|
|
# logger.info(f"Trade logged for accounting: {log_file}")
|
|
|
|
# except Exception as e:
|
|
# logger.error(f"Error logging trade for accounting: {e}")
|
|
|
|
# def _start_orchestrator_trading(self):
|
|
# """Start orchestrator-based trading in background"""
|
|
# def orchestrator_loop():
|
|
# """Background orchestrator trading loop with retrospective learning"""
|
|
# logger.info("ORCHESTRATOR: Starting enhanced trading loop with retrospective learning")
|
|
|
|
# while self.streaming:
|
|
# try:
|
|
# # Process orchestrator decisions
|
|
# self._process_orchestrator_decisions()
|
|
|
|
# # Trigger retrospective learning analysis on every loop pass (the loop sleeps 30 s; no 5-minute gate is applied here)
|
|
# if hasattr(self.orchestrator, 'trigger_retrospective_learning'):
|
|
# asyncio.run(self.orchestrator.trigger_retrospective_learning())
|
|
|
|
# # Sleep for decision frequency
|
|
# time.sleep(30) # 30 second intervals for scalping
|
|
|
|
# except Exception as e:
|
|
# logger.error(f"Error in orchestrator loop: {e}")
|
|
# time.sleep(5) # Short sleep on error
|
|
|
|
# logger.info("ORCHESTRATOR: Trading loop stopped")
|
|
|
|
# # Start orchestrator in background thread
|
|
# orchestrator_thread = Thread(target=orchestrator_loop, daemon=True)
|
|
# orchestrator_thread.start()
|
|
# logger.info("ORCHESTRATOR: Enhanced trading loop started with retrospective learning")
|
|
|
|
# def _start_training_data_collection(self):
|
|
# """Start enhanced training data collection using unified stream"""
|
|
# def training_loop():
|
|
# try:
|
|
# logger.info("Enhanced training data collection started with unified stream")
|
|
|
|
# while True:
|
|
# try:
|
|
# # Get latest training data from unified stream
|
|
# training_data = self.unified_stream.get_latest_training_data()
|
|
|
|
# if training_data:
|
|
# # Send training data to enhanced RL pipeline
|
|
# self._send_training_data_to_enhanced_rl(training_data)
|
|
|
|
# # Update context data in orchestrator
|
|
# if hasattr(self.orchestrator, 'update_context_data'):
|
|
# self.orchestrator.update_context_data()
|
|
|
|
# # Initialize extrema trainer if not done
|
|
# if hasattr(self.orchestrator, 'extrema_trainer'):
|
|
# if not hasattr(self.orchestrator.extrema_trainer, '_initialized'):
|
|
# self.orchestrator.extrema_trainer.initialize_context_data()
|
|
# self.orchestrator.extrema_trainer._initialized = True
|
|
# logger.info("Extrema trainer context data initialized")
|
|
|
|
# # Run extrema detection with real data
|
|
# if hasattr(self.orchestrator, 'extrema_trainer'):
|
|
# for symbol in self.orchestrator.symbols:
|
|
# detected = self.orchestrator.extrema_trainer.detect_local_extrema(symbol)
|
|
# if detected:
|
|
# logger.info(f"Detected {len(detected)} extrema for {symbol}")
|
|
|
|
# time.sleep(30) # Update every 30 seconds
|
|
|
|
# except Exception as e:
|
|
# logger.error(f"Error in enhanced training loop: {e}")
|
|
# time.sleep(10) # Wait before retrying
|
|
|
|
# except Exception as e:
|
|
# logger.error(f"Enhanced training loop failed: {e}")
|
|
|
|
# # Start enhanced training thread
|
|
# training_thread = Thread(target=training_loop, daemon=True)
|
|
# training_thread.start()
|
|
# logger.info("Enhanced training data collection thread started")
|
|
|
|
# def _send_training_data_to_enhanced_rl(self, training_data: TrainingDataPacket):
|
|
# """Send training data to enhanced RL training pipeline"""
|
|
# try:
|
|
# if not self.orchestrator:
|
|
# return
|
|
|
|
# # Extract comprehensive training data
|
|
# market_state = training_data.market_state
|
|
# universal_stream = training_data.universal_stream
|
|
|
|
# if market_state and universal_stream:
|
|
# # Send to enhanced RL trainer if available
|
|
# if hasattr(self.orchestrator, 'enhanced_rl_trainer'):
|
|
# # Create RL training step with comprehensive data
|
|
# asyncio.run(self.orchestrator.enhanced_rl_trainer.training_step(universal_stream))
|
|
# logger.debug("Sent comprehensive data to enhanced RL trainer")
|
|
|
|
# # Pull extrema samples and perfect moves from the extrema trainer and log how many are available (nothing is sent here)
|
|
# if hasattr(self.orchestrator, 'extrema_trainer'):
|
|
# extrema_data = self.orchestrator.extrema_trainer.get_extrema_training_data(count=50)
|
|
# perfect_moves = self.orchestrator.extrema_trainer.get_perfect_moves_for_cnn(count=100)
|
|
|
|
# if extrema_data:
|
|
# logger.info(f"Enhanced RL: {len(extrema_data)} extrema training samples available")
|
|
|
|
# if perfect_moves:
|
|
# logger.info(f"Enhanced RL: {len(perfect_moves)} perfect moves for CNN training")
|
|
|
|
# # Log whether sensitivity-learning data is queued for DQN training (log only; nothing is sent here)
|
|
# if hasattr(self.orchestrator, 'sensitivity_learning_queue') and len(self.orchestrator.sensitivity_learning_queue) > 0:
|
|
# logger.info("Enhanced RL: Sensitivity learning data available for DQN training")
|
|
|
|
# # Get context features for models with real data
|
|
# if hasattr(self.orchestrator, 'extrema_trainer'):
|
|
# for symbol in self.orchestrator.symbols:
|
|
# context_features = self.orchestrator.extrema_trainer.get_context_features_for_model(symbol)
|
|
# if context_features is not None:
|
|
# logger.debug(f"Enhanced RL: Context features available for {symbol}: {context_features.shape}")
|
|
|
|
# # Log training data statistics
|
|
# logger.info(f"Enhanced RL Training Data:")
|
|
# logger.info(f" Tick cache: {len(training_data.tick_cache)} ticks")
|
|
# logger.info(f" 1s bars: {len(training_data.one_second_bars)} bars")
|
|
# logger.info(f" Multi-timeframe data: {len(training_data.multi_timeframe_data)} symbols")
|
|
# logger.info(f" CNN features: {'Available' if training_data.cnn_features else 'Not available'}")
|
|
# logger.info(f" CNN predictions: {'Available' if training_data.cnn_predictions else 'Not available'}")
|
|
# logger.info(f" Market state: {'Available' if training_data.market_state else 'Not available'}")
|
|
# logger.info(f" Universal stream: {'Available' if training_data.universal_stream else 'Not available'}")
|
|
|
|
# except Exception as e:
|
|
# logger.error(f"Error sending training data to enhanced RL: {e}")
|
|
|
|
# def _collect_training_ticks(self):
|
|
# """Collect real tick data for training cache from data provider"""
|
|
# try:
|
|
# # Get real tick data from data provider subscribers
|
|
# for symbol in ['ETH/USDT', 'BTC/USDT']:
|
|
# try:
|
|
# # Get recent ticks from data provider
|
|
# recent_ticks = self.data_provider.get_recent_ticks(symbol, count=10)
|
|
|
|
# for tick in recent_ticks:
|
|
# # Create tick data from real market data
|
|
# tick_data = {
|
|
# 'symbol': tick.symbol,
|
|
# 'price': tick.price,
|
|
# 'timestamp': tick.timestamp,
|
|
# 'volume': tick.volume
|
|
# }
|
|
|
|
# # Add to tick cache
|
|
# self.tick_cache.append(tick_data)
|
|
|
|
# # Create 1s bar data from real tick
|
|
# bar_data = {
|
|
# 'symbol': tick.symbol,
|
|
# 'open': tick.price,
|
|
# 'high': tick.price,
|
|
# 'low': tick.price,
|
|
# 'close': tick.price,
|
|
# 'volume': tick.volume,
|
|
# 'timestamp': tick.timestamp
|
|
# }
|
|
|
|
# # Add to 1s bars cache
|
|
# self.one_second_bars.append(bar_data)
|
|
|
|
# except Exception as e:
|
|
# logger.error(f"Error collecting real tick data for {symbol}: {e}")
|
|
|
|
# # Set streaming status based on real data availability
|
|
# self.is_streaming = len(self.tick_cache) > 0
|
|
|
|
# except Exception as e:
|
|
# logger.error(f"Error in real tick data collection: {e}")
|
|
|
|
# def _send_training_data_to_models(self):
|
|
# """Send training data to models for actual training"""
|
|
# try:
|
|
# # Get extrema training data from orchestrator
|
|
# if hasattr(self.orchestrator, 'extrema_trainer'):
|
|
# extrema_data = self.orchestrator.extrema_trainer.get_extrema_training_data(count=50)
|
|
# perfect_moves = self.orchestrator.extrema_trainer.get_perfect_moves_for_cnn(count=100)
|
|
|
|
# if extrema_data:
|
|
# logger.info(f"Sending {len(extrema_data)} extrema training samples to models")
|
|
|
|
# if perfect_moves:
|
|
# logger.info(f"Sending {len(perfect_moves)} perfect moves to CNN models")
|
|
|
|
# # Get context features for models
|
|
# if hasattr(self.orchestrator, 'extrema_trainer'):
|
|
# for symbol in self.orchestrator.symbols:
|
|
# context_features = self.orchestrator.extrema_trainer.get_context_features_for_model(symbol)
|
|
# if context_features is not None:
|
|
# logger.debug(f"Context features available for {symbol}: {context_features.shape}")
|
|
|
|
# # Simulate model training progress
|
|
# if hasattr(self.orchestrator, 'extrema_training_queue') and len(self.orchestrator.extrema_training_queue) > 0:
|
|
# logger.info("CNN model training in progress with extrema data")
|
|
|
|
# if hasattr(self.orchestrator, 'sensitivity_learning_queue') and len(self.orchestrator.sensitivity_learning_queue) > 0:
|
|
# logger.info("RL agent training in progress with sensitivity learning data")
|
|
|
|
# except Exception as e:
|
|
# logger.error(f"Error sending training data to models: {e}")
|
|
|
|
# def _handle_unified_stream_data(self, data_packet: Dict[str, Any]):
|
|
# """Handle data from unified stream"""
|
|
# try:
|
|
# # Extract UI data
|
|
# if 'ui_data' in data_packet:
|
|
# self.latest_ui_data = data_packet['ui_data']
|
|
# self.current_prices = self.latest_ui_data.current_prices
|
|
# self.is_streaming = self.latest_ui_data.streaming_status == 'LIVE'
|
|
# self.training_data_available = self.latest_ui_data.training_data_available
|
|
|
|
# # Extract training data
|
|
# if 'training_data' in data_packet:
|
|
# self.latest_training_data = data_packet['training_data']
|
|
|
|
# # Extract tick data
|
|
# if 'ticks' in data_packet:
|
|
# ticks = data_packet['ticks']
|
|
# for tick in ticks[-100:]: # Keep last 100 ticks
|
|
# self.tick_cache.append(tick)
|
|
|
|
# # Extract OHLCV data
|
|
# if 'one_second_bars' in data_packet:
|
|
# bars = data_packet['one_second_bars']
|
|
# for bar in bars[-100:]: # Keep last 100 bars
|
|
# self.one_second_bars.append(bar)
|
|
|
|
# except Exception as e:
|
|
# logger.error(f"Error handling unified stream data: {e}")
|
|
|
|
# def create_scalping_dashboard(data_provider=None, orchestrator=None, trading_executor=None):
|
|
# """Create real-time dashboard instance with MEXC integration"""
|
|
# return RealTimeScalpingDashboard(data_provider, orchestrator, trading_executor)
|
|
|
|
# # For backward compatibility
|
|
# ScalpingDashboard = RealTimeScalpingDashboard
|