dash fixes
This commit is contained in:
235
web/dashboard.py
235
web/dashboard.py
@ -1204,20 +1204,25 @@ class TradingDashboard:
|
||||
# OPTIMIZED CHART - Using new optimized version with trade caching
|
||||
if is_chart_update:
|
||||
try:
|
||||
if hasattr(self, '_cached_chart_data_time'):
|
||||
cache_time = self._cached_chart_data_time
|
||||
if time.time() - cache_time < 3: # Use cached chart if < 3s old for faster updates
|
||||
price_chart = getattr(self, '_cached_price_chart', None)
|
||||
else:
|
||||
price_chart = self._create_price_chart(symbol)
|
||||
self._cached_price_chart = price_chart
|
||||
self._cached_chart_data_time = time.time()
|
||||
else:
|
||||
price_chart = self._create_price_chart(symbol)
|
||||
# Always try to create fresh chart for real-time updates
|
||||
# Only use cache as emergency fallback
|
||||
price_chart = self._create_price_chart(symbol)
|
||||
|
||||
# Cache the successful chart for emergency fallback
|
||||
if price_chart is not None:
|
||||
self._cached_price_chart = price_chart
|
||||
self._cached_chart_data_time = time.time()
|
||||
else:
|
||||
# If chart creation failed, try cached version
|
||||
if hasattr(self, '_cached_price_chart'):
|
||||
price_chart = self._cached_price_chart
|
||||
logger.debug("Using cached chart due to creation failure")
|
||||
else:
|
||||
price_chart = self._create_empty_chart("Chart Loading", "Initializing chart data...")
|
||||
|
||||
except Exception as e:
|
||||
logger.debug(f"Chart error: {e}")
|
||||
# Try cached chart first, then empty chart
|
||||
price_chart = getattr(self, '_cached_price_chart',
|
||||
self._create_empty_chart("Chart Error", "Chart temporarily unavailable"))
|
||||
else:
|
||||
@ -1554,27 +1559,41 @@ class TradingDashboard:
|
||||
def _create_price_chart(self, symbol: str) -> go.Figure:
|
||||
"""Create price chart with volume and Williams pivot points from cached data"""
|
||||
try:
|
||||
# For Williams Market Structure, we need 1s data for proper recursive analysis
|
||||
# Get 4 hours (240 minutes) of 1m data for better trade visibility
|
||||
df_1s = None
|
||||
df_1m = None
|
||||
# Try to get real-time WebSocket data first for best performance (1-second updates)
|
||||
ws_df = self.get_realtime_tick_data(symbol, limit=2000)
|
||||
|
||||
# Try to get 1s data first for Williams analysis (reduced to 10 minutes for performance)
|
||||
try:
|
||||
df_1s = self.data_provider.get_historical_data(symbol, '1s', limit=600, refresh=False)
|
||||
if df_1s is None or df_1s.empty:
|
||||
logger.warning("[CHART] No 1s cached data available, trying fresh 1s data")
|
||||
df_1s = self.data_provider.get_historical_data(symbol, '1s', limit=300, refresh=True)
|
||||
|
||||
if df_1s is not None and not df_1s.empty:
|
||||
# Aggregate 1s data to 1m for chart display (cleaner visualization)
|
||||
df = self._aggregate_1s_to_1m(df_1s)
|
||||
actual_timeframe = '1s→1m'
|
||||
else:
|
||||
df_1s = None
|
||||
except Exception as e:
|
||||
logger.warning(f"[CHART] Error getting 1s data: {e}")
|
||||
if ws_df is not None and not ws_df.empty and len(ws_df) >= 10:
|
||||
# Use WebSocket data (ultra-fast, real-time streaming)
|
||||
df = ws_df
|
||||
df_1s = ws_df # Use for Williams analysis too
|
||||
actual_timeframe = 'WS-1s'
|
||||
logger.debug(f"[CHART] Using WebSocket real-time data: {len(df)} ticks")
|
||||
else:
|
||||
# Fallback to traditional data provider approach
|
||||
# For Williams Market Structure, we need 1s data for proper recursive analysis
|
||||
# Get 4 hours (240 minutes) of 1m data for better trade visibility
|
||||
df_1s = None
|
||||
df_1m = None
|
||||
|
||||
if ws_df is not None:
|
||||
logger.debug(f"[CHART] WebSocket data insufficient ({len(ws_df) if not ws_df.empty else 0} rows), falling back to data provider")
|
||||
|
||||
# Try to get 1s data first for Williams analysis (reduced to 10 minutes for performance)
|
||||
try:
|
||||
df_1s = self.data_provider.get_historical_data(symbol, '1s', limit=600, refresh=False)
|
||||
if df_1s is None or df_1s.empty:
|
||||
logger.warning("[CHART] No 1s cached data available, trying fresh 1s data")
|
||||
df_1s = self.data_provider.get_historical_data(symbol, '1s', limit=300, refresh=True)
|
||||
|
||||
if df_1s is not None and not df_1s.empty:
|
||||
# Aggregate 1s data to 1m for chart display (cleaner visualization)
|
||||
df = self._aggregate_1s_to_1m(df_1s)
|
||||
actual_timeframe = '1s→1m'
|
||||
else:
|
||||
df_1s = None
|
||||
except Exception as e:
|
||||
logger.warning(f"[CHART] Error getting 1s data: {e}")
|
||||
df_1s = None
|
||||
|
||||
# Fallback to 1m data if 1s not available (4 hours for historical trades)
|
||||
if df_1s is None:
|
||||
@ -1591,6 +1610,26 @@ class TradingDashboard:
|
||||
if 'volume' not in df.columns:
|
||||
df['volume'] = 100 # Default volume for demo
|
||||
actual_timeframe = '1m'
|
||||
|
||||
# Hybrid approach: If we have some WebSocket data, append it to historical data
|
||||
if ws_df is not None and not ws_df.empty:
|
||||
try:
|
||||
# Combine historical 1m data with recent WebSocket ticks
|
||||
ws_df_resampled = ws_df.resample('1min').agg({
|
||||
'open': 'first',
|
||||
'high': 'max',
|
||||
'low': 'min',
|
||||
'close': 'last',
|
||||
'volume': 'sum'
|
||||
}).dropna()
|
||||
|
||||
if not ws_df_resampled.empty:
|
||||
# Merge the datasets - WebSocket data is more recent
|
||||
df = pd.concat([df, ws_df_resampled]).drop_duplicates().sort_index()
|
||||
actual_timeframe = '1m+WS'
|
||||
logger.debug(f"[CHART] Hybrid mode: {len(df)} total bars (historical + WebSocket)")
|
||||
except Exception as hybrid_error:
|
||||
logger.debug(f"[CHART] Hybrid combination failed: {hybrid_error}")
|
||||
else:
|
||||
return self._create_empty_chart(
|
||||
f"{symbol} Chart",
|
||||
@ -1606,6 +1645,26 @@ class TradingDashboard:
|
||||
# Ensure timezone consistency for cached data
|
||||
df = self._ensure_timezone_consistency(df)
|
||||
actual_timeframe = '1m'
|
||||
|
||||
# Hybrid approach: If we have some WebSocket data, append it to cached data too
|
||||
if ws_df is not None and not ws_df.empty:
|
||||
try:
|
||||
# Combine cached 1m data with recent WebSocket ticks
|
||||
ws_df_resampled = ws_df.resample('1min').agg({
|
||||
'open': 'first',
|
||||
'high': 'max',
|
||||
'low': 'min',
|
||||
'close': 'last',
|
||||
'volume': 'sum'
|
||||
}).dropna()
|
||||
|
||||
if not ws_df_resampled.empty:
|
||||
# Merge the datasets - WebSocket data is more recent
|
||||
df = pd.concat([df, ws_df_resampled]).drop_duplicates().sort_index()
|
||||
actual_timeframe = '1m+WS'
|
||||
logger.debug(f"[CHART] Hybrid mode: {len(df)} total bars (cached + WebSocket)")
|
||||
except Exception as hybrid_error:
|
||||
logger.debug(f"[CHART] Hybrid combination failed: {hybrid_error}")
|
||||
|
||||
# Final check: ensure we have valid data with proper index
|
||||
if df is None or df.empty:
|
||||
@ -1627,7 +1686,7 @@ class TradingDashboard:
|
||||
df.index = pd.date_range(start=pd.Timestamp.now() - pd.Timedelta(minutes=len(df)),
|
||||
periods=len(df), freq='1min')
|
||||
|
||||
# Create subplot with secondary y-axis for volume
|
||||
# Create subplot with secondary y-axis for volume and JavaScript data management
|
||||
fig = make_subplots(
|
||||
rows=2, cols=1,
|
||||
shared_xaxes=True,
|
||||
@ -1636,6 +1695,16 @@ class TradingDashboard:
|
||||
row_heights=[0.7, 0.3]
|
||||
)
|
||||
|
||||
# Add JavaScript for client-side data management and real-time updates
|
||||
fig.add_annotation(
|
||||
text="",
|
||||
xref="paper", yref="paper",
|
||||
x=0, y=0,
|
||||
showarrow=False,
|
||||
font=dict(size=1),
|
||||
opacity=0
|
||||
)
|
||||
|
||||
# Add price line chart (main chart)
|
||||
fig.add_trace(
|
||||
go.Scatter(
|
||||
@ -3481,12 +3550,16 @@ class TradingDashboard:
|
||||
}
|
||||
|
||||
def _start_lightweight_websocket(self):
|
||||
"""Start ultra-lightweight WebSocket for real-time price updates only"""
|
||||
"""Start enhanced WebSocket for real-time price and tick data streaming"""
|
||||
try:
|
||||
if self.is_streaming:
|
||||
logger.warning("[WS] WebSocket already running")
|
||||
return
|
||||
|
||||
# Initialize tick cache for chart updates
|
||||
self.tick_cache = []
|
||||
self.max_tick_cache = 2000 # Keep last 2000 1-second ticks for chart
|
||||
|
||||
# ETH/USDT primary symbol for scalping
|
||||
symbol = "ethusdt"
|
||||
|
||||
@ -3498,23 +3571,58 @@ class TradingDashboard:
|
||||
def on_message(ws, message):
|
||||
try:
|
||||
data = json.loads(message)
|
||||
# Extract only current price - ultra minimal processing
|
||||
current_time = time.time()
|
||||
|
||||
# Extract price data for ultra-fast updates
|
||||
if 'c' in data: # Current price from ticker
|
||||
price = float(data['c'])
|
||||
|
||||
# Update price cache (no history, just current)
|
||||
self.ws_price_cache['ETHUSDT'] = price
|
||||
self.current_prices['ETHUSDT'] = price
|
||||
|
||||
# Create tick data point for chart
|
||||
tick = {
|
||||
'timestamp': current_time,
|
||||
'datetime': datetime.now(self.timezone),
|
||||
'symbol': 'ETHUSDT',
|
||||
'price': price,
|
||||
'open': float(data.get('o', price)),
|
||||
'high': float(data.get('h', price)),
|
||||
'low': float(data.get('l', price)),
|
||||
'close': price,
|
||||
'volume': float(data.get('v', 0)),
|
||||
'count': int(data.get('c', 1))
|
||||
}
|
||||
|
||||
# Thread-safe tick cache management
|
||||
try:
|
||||
# Add to tick cache (thread-safe append)
|
||||
self.tick_cache.append(tick)
|
||||
|
||||
# Maintain cache size for performance - use slicing for thread safety
|
||||
if len(self.tick_cache) > self.max_tick_cache:
|
||||
# Keep the most recent data, remove oldest
|
||||
excess = len(self.tick_cache) - self.max_tick_cache
|
||||
self.tick_cache = self.tick_cache[excess:]
|
||||
|
||||
except Exception as cache_error:
|
||||
logger.warning(f"[WS] Cache management error: {cache_error}")
|
||||
# Reinitialize cache if corrupted
|
||||
self.tick_cache = [tick] if tick else []
|
||||
|
||||
# Performance tracking
|
||||
current_time = time.time()
|
||||
self.last_ws_update = current_time
|
||||
self.ws_update_count += 1
|
||||
|
||||
# Log every 100 updates for monitoring
|
||||
if self.ws_update_count % 100 == 0:
|
||||
logger.debug(f"[WS] {self.ws_update_count} price updates, latest: ${price:.2f}")
|
||||
cache_size = len(self.tick_cache) if hasattr(self, 'tick_cache') else 0
|
||||
logger.debug(f"[WS] {self.ws_update_count} updates, cache: {cache_size} ticks, latest: ${price:.2f}")
|
||||
|
||||
except Exception as e:
|
||||
logger.warning(f"[WS] Error processing message: {e}")
|
||||
# Continue processing, don't break the stream
|
||||
|
||||
def on_error(ws, error):
|
||||
logger.error(f"[WS] Error: {error}")
|
||||
@ -3529,7 +3637,7 @@ class TradingDashboard:
|
||||
self._start_lightweight_websocket()
|
||||
|
||||
def on_open(ws):
|
||||
logger.info(f"[WS] Connected for real-time ETHUSDT price updates")
|
||||
logger.info(f"[WS] Connected for real-time ETHUSDT streaming with tick cache")
|
||||
self.is_streaming = True
|
||||
|
||||
# Binance WebSocket for ticker (price only, not trades)
|
||||
@ -3554,7 +3662,7 @@ class TradingDashboard:
|
||||
self.ws_thread = threading.Thread(target=ws_worker, daemon=True)
|
||||
self.ws_thread.start()
|
||||
|
||||
logger.info("[WS] Lightweight WebSocket started for real-time price updates")
|
||||
logger.info("[WS] Enhanced WebSocket started for real-time tick streaming")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"[WS] Failed to start: {e}")
|
||||
@ -3584,6 +3692,61 @@ class TradingDashboard:
|
||||
logger.warning(f"[WS] Error getting realtime price: {e}")
|
||||
return None
|
||||
|
||||
def get_realtime_tick_data(self, symbol: str, limit: int = 2000) -> pd.DataFrame:
|
||||
"""Get real-time tick data from WebSocket cache for chart updates"""
|
||||
try:
|
||||
if not hasattr(self, 'tick_cache') or not self.tick_cache:
|
||||
logger.debug(f"[WS] No tick cache available for {symbol}")
|
||||
return None
|
||||
|
||||
# Filter by symbol and convert to DataFrame
|
||||
symbol_ticks = [tick for tick in self.tick_cache if tick.get('symbol') == symbol.replace('/', '')]
|
||||
|
||||
if not symbol_ticks:
|
||||
logger.debug(f"[WS] No ticks found for symbol {symbol} in cache of {len(self.tick_cache)} items")
|
||||
return None
|
||||
|
||||
# Ensure we have enough data points for a meaningful chart
|
||||
if len(symbol_ticks) < 10:
|
||||
logger.debug(f"[WS] Only {len(symbol_ticks)} ticks available for {symbol}, need more data")
|
||||
return None
|
||||
|
||||
# Take the most recent ticks
|
||||
recent_ticks = symbol_ticks[-limit:] if len(symbol_ticks) > limit else symbol_ticks
|
||||
|
||||
# Convert to DataFrame with proper format
|
||||
df = pd.DataFrame(recent_ticks)
|
||||
|
||||
# Ensure datetime column exists and is valid
|
||||
if 'datetime' not in df.columns:
|
||||
logger.warning(f"[WS] No datetime column in tick data for {symbol}")
|
||||
return None
|
||||
|
||||
df['datetime'] = pd.to_datetime(df['datetime'])
|
||||
df.set_index('datetime', inplace=True)
|
||||
|
||||
# Ensure required columns exist with proper fallback values
|
||||
required_columns = ['open', 'high', 'low', 'close', 'volume']
|
||||
for col in required_columns:
|
||||
if col not in df.columns:
|
||||
if col == 'volume':
|
||||
df[col] = 100 # Default volume
|
||||
else:
|
||||
# Use price for OHLC if not available
|
||||
df[col] = df.get('price', df.get('close', 0))
|
||||
|
||||
# Validate data integrity
|
||||
if df.empty or len(df) < 5:
|
||||
logger.debug(f"[WS] Insufficient data after processing for {symbol}: {len(df)} rows")
|
||||
return None
|
||||
|
||||
logger.debug(f"[WS] Successfully retrieved {len(df)} ticks for {symbol}")
|
||||
return df
|
||||
|
||||
except Exception as e:
|
||||
logger.warning(f"[WS] Error getting tick data for {symbol}: {e}")
|
||||
return None
|
||||
|
||||
def _create_cnn_monitoring_content(self) -> List:
|
||||
"""Create CNN monitoring and prediction analysis content"""
|
||||
try:
|
||||
|
Reference in New Issue
Block a user