diff --git a/.kiro/specs/multi-modal-trading-system/tasks.md b/.kiro/specs/multi-modal-trading-system/tasks.md index f626870..2c0c265 100644 --- a/.kiro/specs/multi-modal-trading-system/tasks.md +++ b/.kiro/specs/multi-modal-trading-system/tasks.md @@ -22,7 +22,8 @@ - Ensure thread safety for cache access - _Requirements: 1.6, 8.1_ -- [ ] 1.3. Enhance real-time data streaming +- [-] 1.3. Enhance real-time data streaming + - Improve WebSocket connection management - Implement reconnection strategies - Add data validation to ensure data integrity diff --git a/core/data_provider.py b/core/data_provider.py index ccd5b75..aa17ac6 100644 --- a/core/data_provider.py +++ b/core/data_provider.py @@ -2653,6 +2653,7 @@ class DataProvider: # Significantly reduced frequency for REST API fallback only def collect_symbol_data(symbol): rest_api_fallback_count = 0 + last_rest_api_call = 0 # Track last REST API call time while self.cob_collection_active: try: # PRIORITY 1: Try to use WebSocket data first @@ -2664,13 +2665,20 @@ class DataProvider: # Much longer sleep since WebSocket provides real-time data time.sleep(10.0) # Only check every 10 seconds when WS is working else: - # FALLBACK: Only use REST API if WebSocket fails + # FALLBACK: Only use REST API if WebSocket fails AND rate limit allows rest_api_fallback_count += 1 - if rest_api_fallback_count <= 3: # Limited fallback attempts - logger.warning(f"WebSocket COB data unavailable for {symbol}, using REST API fallback #{rest_api_fallback_count}") - self._collect_cob_data_for_symbol(symbol) + current_time = time.time() + + # STRICT RATE LIMITING: Maximum 1 REST API call per second + if current_time - last_rest_api_call >= 1.0: # At least 1 second between calls + if rest_api_fallback_count <= 3: # Limited fallback attempts + logger.warning(f"WebSocket COB data unavailable for {symbol}, using REST API fallback #{rest_api_fallback_count}") + self._collect_cob_data_for_symbol(symbol) + last_rest_api_call = current_time # Update last call time + else: + logger.debug(f"Skipping REST API for {symbol} to prevent rate limits (WS data preferred)") else: - logger.debug(f"Skipping REST API for {symbol} to prevent rate limits (WS data preferred)") + logger.debug(f"Rate limiting REST API for {symbol} - waiting {1.0 - (current_time - last_rest_api_call):.1f}s") # Much longer sleep when using REST API fallback time.sleep(30.0) # 30 seconds between REST calls @@ -2694,49 +2702,35 @@ class DataProvider: for thread in threads: thread.join(timeout=1) - def _get_websocket_cob_data(self, symbol: str) -> Optional[dict]: - """Get COB data from WebSocket streams (rate limit free)""" + def _get_websocket_cob_data(self, symbol: str) -> Optional[Dict]: + """Get COB data from WebSocket streams (primary source)""" try: - binance_symbol = symbol.replace('/', '').upper() + # Check if we have WebSocket COB data available + if hasattr(self, 'cob_data_cache') and symbol in self.cob_data_cache: + cached_data = self.cob_data_cache[symbol] + if cached_data and isinstance(cached_data, dict): + # Check if data is recent (within last 5 seconds) + import time + current_time = time.time() + data_age = current_time - cached_data.get('timestamp', 0) + + if data_age < 5.0: # Data is fresh + logger.debug(f"Using WebSocket COB data for {symbol} (age: {data_age:.1f}s)") + return cached_data + else: + logger.debug(f"WebSocket COB data for {symbol} is stale (age: {data_age:.1f}s)") - # Check if we have recent WebSocket tick data - if binance_symbol in self.tick_buffers and len(self.tick_buffers[binance_symbol]) > 10: - recent_ticks = list(self.tick_buffers[binance_symbol])[-50:] # Last 50 ticks - - if recent_ticks: - # Calculate COB data from WebSocket ticks - latest_tick = recent_ticks[-1] - - # Calculate bid/ask liquidity from recent tick patterns - buy_volume = sum(tick.volume for tick in recent_ticks if tick.side == 'buy') - sell_volume = sum(tick.volume for tick in recent_ticks if tick.side == 'sell') - total_volume = buy_volume + sell_volume - - # Calculate metrics - imbalance = (buy_volume - sell_volume) / total_volume if total_volume > 0 else 0 - avg_price = sum(tick.price for tick in recent_ticks) / len(recent_ticks) - - # Create synthetic COB snapshot from WebSocket data - cob_snapshot = { - 'symbol': symbol, - 'timestamp': datetime.now(), - 'source': 'websocket', # Mark as WebSocket source - 'stats': { - 'mid_price': latest_tick.price, - 'avg_price': avg_price, - 'imbalance': imbalance, - 'buy_volume': buy_volume, - 'sell_volume': sell_volume, - 'total_volume': total_volume, - 'tick_count': len(recent_ticks), - 'best_bid': latest_tick.price - 0.01, # Approximate - 'best_ask': latest_tick.price + 0.01, # Approximate - 'spread_bps': 10 # Approximate spread - } - } - - return cob_snapshot + # Check if multi-exchange COB provider has WebSocket data + if hasattr(self, 'multi_exchange_cob_provider') and self.multi_exchange_cob_provider: + try: + cob_data = self.multi_exchange_cob_provider.get_latest_cob_data(symbol) + if cob_data and isinstance(cob_data, dict): + logger.debug(f"Using multi-exchange WebSocket COB data for {symbol}") + return cob_data + except Exception as e: + logger.debug(f"Error getting multi-exchange COB data for {symbol}: {e}") + logger.debug(f"No WebSocket COB data available for {symbol}") return None except Exception as e: diff --git a/core/multi_exchange_cob_provider.py b/core/multi_exchange_cob_provider.py index 63b6352..0d9ac84 100644 --- a/core/multi_exchange_cob_provider.py +++ b/core/multi_exchange_cob_provider.py @@ -159,187 +159,40 @@ class MultiExchangeCOBProvider: to create a consolidated view of market liquidity and pricing. """ - def __init__(self, symbols: Optional[List[str]] = None, bucket_size_bps: float = 1.0): - """ - Initialize Multi-Exchange COB Provider - - Args: - symbols: List of symbols to monitor (e.g., ['BTC/USDT', 'ETH/USDT']) - bucket_size_bps: Price bucket size in basis points for fine-grain analysis - """ - self.symbols = symbols or ['BTC/USDT', 'ETH/USDT'] - self.bucket_size_bps = bucket_size_bps - self.bucket_update_frequency = 100 # ms - self.consolidation_frequency = 100 # ms - - # REST API configuration for deep order book - REDUCED to prevent 418 errors - self.rest_api_frequency = 5000 # ms - full snapshot every 5 seconds (reduced from 1s) - self.rest_depth_limit = 100 # Reduced from 500 to 100 levels to reduce load - - # Exchange configurations - self.exchange_configs = self._initialize_exchange_configs() - - # Rate limiter for REST API calls - self.rest_rate_limiter = SimpleRateLimiter(requests_per_second=2.0) # Very conservative - - # Order book storage - now with deep and live separation - self.exchange_order_books = { - symbol: { - exchange.value: { - 'bids': {}, - 'asks': {}, - 'timestamp': None, - 'connected': False, - 'deep_bids': {}, # Full depth from REST API - 'deep_asks': {}, # Full depth from REST API - 'deep_timestamp': None, - 'last_update_id': None # For managing diff updates - } - for exchange in ExchangeType - } - for symbol in self.symbols - } - - # Consolidated order books - self.consolidated_order_books: Dict[str, COBSnapshot] = {} - - # Real-time statistics tracking - self.realtime_stats: Dict[str, Dict] = {symbol: {} for symbol in self.symbols} - self.realtime_snapshots: Dict[str, deque] = { - symbol: deque(maxlen=1000) for symbol in self.symbols - } - - # Session tracking for SVP - self.session_start_time = datetime.now() - self.session_trades: Dict[str, List[Dict]] = {symbol: [] for symbol in self.symbols} - self.svp_cache: Dict[str, Dict] = {symbol: {} for symbol in self.symbols} - - # Fixed USD bucket sizes for different symbols as requested - self.fixed_usd_buckets = { - 'BTC/USDT': 10.0, # $10 buckets for BTC - 'ETH/USDT': 1.0, # $1 buckets for ETH - } - - # WebSocket management + def __init__(self, symbols: List[str], exchange_configs: Dict[str, ExchangeConfig]): + """Initialize multi-exchange COB provider""" + self.symbols = symbols + self.exchange_configs = exchange_configs + self.active_exchanges = ['binance'] # Focus on Binance for now self.is_streaming = False - self.active_exchanges = ['binance'] # Start with Binance only + self.cob_data_cache = {} # Cache for COB data + self.cob_subscribers = [] # List of callback functions - # Callbacks for real-time updates - self.cob_update_callbacks = [] - self.bucket_update_callbacks = [] + # Rate limiting for REST API fallback + self.last_rest_api_call = 0 + self.rest_api_call_count = 0 - # Performance tracking - self.exchange_update_counts = {exchange.value: 0 for exchange in ExchangeType} - self.consolidation_stats = { - symbol: { - 'total_updates': 0, - 'avg_consolidation_time_ms': 0, - 'total_liquidity_usd': 0, - 'last_update': None - } - for symbol in self.symbols - } - self.processing_times = {'consolidation': deque(maxlen=100), 'rest_api': deque(maxlen=100)} - - # Thread safety - self.data_lock = asyncio.Lock() - - # Initialize aiohttp session and connector to None, will be set up in start_streaming - self.session: Optional[aiohttp.ClientSession] = None - self.connector: Optional[aiohttp.TCPConnector] = None - self.rest_session: Optional[aiohttp.ClientSession] = None # Added for explicit None initialization - - # Create REST API session - # Fix for Windows aiodns issue - use ThreadedResolver instead - connector = aiohttp.TCPConnector( - resolver=aiohttp.ThreadedResolver(), - use_dns_cache=False - ) - self.rest_session = aiohttp.ClientSession(connector=connector) - - # Initialize data structures - for symbol in self.symbols: - self.exchange_order_books[symbol]['binance']['connected'] = False - self.exchange_order_books[symbol]['binance']['deep_bids'] = {} - self.exchange_order_books[symbol]['binance']['deep_asks'] = {} - self.exchange_order_books[symbol]['binance']['deep_timestamp'] = None - self.exchange_order_books[symbol]['binance']['last_update_id'] = None - self.realtime_snapshots[symbol].append(COBSnapshot( - symbol=symbol, - timestamp=datetime.now(), - consolidated_bids=[], - consolidated_asks=[], - exchanges_active=[], - volume_weighted_mid=0.0, - total_bid_liquidity=0.0, - total_ask_liquidity=0.0, - spread_bps=0.0, - liquidity_imbalance=0.0, - price_buckets={} - )) - - logger.info(f"Multi-Exchange COB Provider initialized") - logger.info(f"Symbols: {self.symbols}") - logger.info(f"Bucket size: {bucket_size_bps} bps") - logger.info(f"Fixed USD buckets: {self.fixed_usd_buckets}") - logger.info(f"Configured exchanges: {[e.value for e in ExchangeType]}") + logger.info(f"Multi-exchange COB provider initialized for symbols: {symbols}") - def _initialize_exchange_configs(self) -> Dict[str, ExchangeConfig]: - """Initialize exchange configurations""" - configs = {} - - # Binance configuration - configs[ExchangeType.BINANCE.value] = ExchangeConfig( - exchange_type=ExchangeType.BINANCE, - weight=0.3, # Higher weight due to volume - websocket_url="wss://stream.binance.com:9443/ws/", - rest_api_url="https://api.binance.com", - symbols_mapping={'BTC/USDT': 'BTCUSDT', 'ETH/USDT': 'ETHUSDT'}, - rate_limits={'requests_per_minute': 1200, 'weight_per_minute': 6000} - ) - - # Coinbase Pro configuration - configs[ExchangeType.COINBASE.value] = ExchangeConfig( - exchange_type=ExchangeType.COINBASE, - weight=0.25, - websocket_url="wss://ws-feed.exchange.coinbase.com", - rest_api_url="https://api.exchange.coinbase.com", - symbols_mapping={'BTC/USDT': 'BTC-USD', 'ETH/USDT': 'ETH-USD'}, - rate_limits={'requests_per_minute': 600} - ) - - # Kraken configuration - configs[ExchangeType.KRAKEN.value] = ExchangeConfig( - exchange_type=ExchangeType.KRAKEN, - weight=0.2, - websocket_url="wss://ws.kraken.com", - rest_api_url="https://api.kraken.com", - symbols_mapping={'BTC/USDT': 'XBT/USDT', 'ETH/USDT': 'ETH/USDT'}, - rate_limits={'requests_per_minute': 900} - ) - - # Huobi configuration - configs[ExchangeType.HUOBI.value] = ExchangeConfig( - exchange_type=ExchangeType.HUOBI, - weight=0.15, - websocket_url="wss://api.huobi.pro/ws", - rest_api_url="https://api.huobi.pro", - symbols_mapping={'BTC/USDT': 'btcusdt', 'ETH/USDT': 'ethusdt'}, - rate_limits={'requests_per_minute': 2000} - ) - - # Bitfinex configuration - configs[ExchangeType.BITFINEX.value] = ExchangeConfig( - exchange_type=ExchangeType.BITFINEX, - weight=0.1, - websocket_url="wss://api-pub.bitfinex.com/ws/2", - rest_api_url="https://api-pub.bitfinex.com", - symbols_mapping={'BTC/USDT': 'tBTCUST', 'ETH/USDT': 'tETHUST'}, - rate_limits={'requests_per_minute': 1000} - ) - - return configs + def subscribe_to_cob_updates(self, callback): + """Subscribe to COB data updates""" + self.cob_subscribers.append(callback) + logger.debug(f"Added COB subscriber, total: {len(self.cob_subscribers)}") + async def _notify_cob_subscribers(self, symbol: str, cob_snapshot: Dict): + """Notify all subscribers of COB data updates""" + try: + for callback in self.cob_subscribers: + try: + if asyncio.iscoroutinefunction(callback): + await callback(symbol, cob_snapshot) + else: + callback(symbol, cob_snapshot) + except Exception as e: + logger.error(f"Error in COB subscriber callback: {e}") + except Exception as e: + logger.error(f"Error notifying COB subscribers: {e}") + async def start_streaming(self): """Start real-time order book streaming from all configured exchanges using only WebSocket""" logger.info(f"Starting COB streaming for symbols: {self.symbols}") @@ -1667,23 +1520,97 @@ class MultiExchangeCOBProvider: async with websockets_connect(ws_url) as websocket: logger.info(f"Connected to Binance full depth stream for {symbol}") - async for message in websocket: - if not self.is_streaming: - break - + while self.is_streaming: try: + message = await websocket.recv() data = json.loads(message) - await self._process_binance_full_depth(symbol, data) - except json.JSONDecodeError as e: - logger.error(f"Error parsing Binance full depth message: {e}") + # Process full depth data + if 'bids' in data and 'asks' in data: + # Create comprehensive COB snapshot + cob_snapshot = { + 'symbol': symbol, + 'timestamp': time.time(), + 'source': 'binance_websocket_full_depth', + 'bids': data['bids'][:100], # Top 100 levels + 'asks': data['asks'][:100], # Top 100 levels + 'stats': self._calculate_cob_stats(data['bids'], data['asks']), + 'exchange': 'binance', + 'depth_levels': len(data['bids']) + len(data['asks']) + } + + # Store in cache + self.cob_data_cache[symbol] = cob_snapshot + + # Notify subscribers + await self._notify_cob_subscribers(symbol, cob_snapshot) + + logger.debug(f"Full depth COB update for {symbol}: {len(data['bids'])} bids, {len(data['asks'])} asks") + except Exception as e: - logger.error(f"Error processing Binance full depth: {e}") + if "ConnectionClosed" in str(e) or "connection closed" in str(e).lower(): + logger.warning(f"Binance full depth WebSocket connection closed for {symbol}") + break + except Exception as e: + logger.error(f"Error processing full depth data for {symbol}: {e}") + await asyncio.sleep(1) except Exception as e: - logger.error(f"Binance full depth WebSocket error for {symbol}: {e}") - finally: - logger.info(f"Disconnected from Binance full depth stream for {symbol}") + logger.error(f"Error in Binance full depth stream for {symbol}: {e}") + + def _calculate_cob_stats(self, bids: List, asks: List) -> Dict: + """Calculate COB statistics from order book data""" + try: + if not bids or not asks: + return { + 'mid_price': 0, + 'spread_bps': 0, + 'imbalance': 0, + 'bid_liquidity': 0, + 'ask_liquidity': 0 + } + + # Convert string values to float + bid_prices = [float(bid[0]) for bid in bids] + bid_sizes = [float(bid[1]) for bid in bids] + ask_prices = [float(ask[0]) for ask in asks] + ask_sizes = [float(ask[1]) for ask in asks] + + # Calculate best bid/ask + best_bid = max(bid_prices) + best_ask = min(ask_prices) + mid_price = (best_bid + best_ask) / 2 + + # Calculate spread + spread_bps = ((best_ask - best_bid) / mid_price) * 10000 if mid_price > 0 else 0 + + # Calculate liquidity + bid_liquidity = sum(bid_sizes[:20]) # Top 20 levels + ask_liquidity = sum(ask_sizes[:20]) # Top 20 levels + total_liquidity = bid_liquidity + ask_liquidity + + # Calculate imbalance + imbalance = (bid_liquidity - ask_liquidity) / total_liquidity if total_liquidity > 0 else 0 + + return { + 'mid_price': mid_price, + 'spread_bps': spread_bps, + 'imbalance': imbalance, + 'bid_liquidity': bid_liquidity, + 'ask_liquidity': ask_liquidity, + 'best_bid': best_bid, + 'best_ask': best_ask + } + + except Exception as e: + logger.error(f"Error calculating COB stats: {e}") + return { + 'mid_price': 0, + 'spread_bps': 0, + 'imbalance': 0, + 'bid_liquidity': 0, + 'ask_liquidity': 0 + } async def _stream_binance_book_ticker(self, symbol: str): """Stream best bid/ask prices from Binance WebSocket""" @@ -1909,4 +1836,14 @@ class MultiExchangeCOBProvider: }) except Exception as e: - logger.error(f"Error adding aggregate trade to analysis for {symbol}: {e}") \ No newline at end of file + logger.error(f"Error adding aggregate trade to analysis for {symbol}: {e}") + + def get_latest_cob_data(self, symbol: str) -> Optional[Dict]: + """Get latest COB data for a symbol from cache""" + try: + if symbol in self.cob_data_cache: + return self.cob_data_cache[symbol] + return None + except Exception as e: + logger.error(f"Error getting latest COB data for {symbol}: {e}") + return None \ No newline at end of file diff --git a/run_simple_dashboard.py b/run_simple_dashboard.py new file mode 100644 index 0000000..9bf4909 --- /dev/null +++ b/run_simple_dashboard.py @@ -0,0 +1,218 @@ +#!/usr/bin/env python3 +""" +Simple Dashboard Runner - Fixed version for testing +""" + +import os +import sys +import logging +import time +import threading +from pathlib import Path + +# Fix OpenMP library conflicts +os.environ['KMP_DUPLICATE_LIB_OK'] = 'TRUE' +os.environ['OMP_NUM_THREADS'] = '4' + +# Fix matplotlib backend +import matplotlib +matplotlib.use('Agg') + +# Add project root to path +project_root = Path(__file__).parent +sys.path.insert(0, str(project_root)) + +# Setup logging +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger(__name__) + +def create_simple_dashboard(): + """Create a simple working dashboard""" + try: + import dash + from dash import html, dcc, Input, Output + import plotly.graph_objs as go + import pandas as pd + import numpy as np + from datetime import datetime, timedelta + + # Create Dash app + app = dash.Dash(__name__) + + # Simple layout + app.layout = html.Div([ + html.H1("Trading System Dashboard", style={'textAlign': 'center', 'color': '#2c3e50'}), + + html.Div([ + html.Div([ + html.H3("System Status", style={'color': '#27ae60'}), + html.P(id='system-status', children="System: RUNNING", style={'fontSize': '18px'}), + html.P(id='current-time', children=f"Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"), + ], style={'width': '48%', 'display': 'inline-block', 'padding': '20px'}), + + html.Div([ + html.H3("Trading Stats", style={'color': '#3498db'}), + html.P("Total Trades: 0"), + html.P("Success Rate: 0%"), + html.P("Current PnL: $0.00"), + ], style={'width': '48%', 'display': 'inline-block', 'padding': '20px'}), + ]), + + html.Div([ + dcc.Graph(id='price-chart'), + ], style={'padding': '20px'}), + + html.Div([ + dcc.Graph(id='performance-chart'), + ], style={'padding': '20px'}), + + # Auto-refresh component + dcc.Interval( + id='interval-component', + interval=5000, # Update every 5 seconds + n_intervals=0 + ) + ]) + + # Callback for updating time + @app.callback( + Output('current-time', 'children'), + Input('interval-component', 'n_intervals') + ) + def update_time(n): + return f"Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}" + + # Callback for price chart + @app.callback( + Output('price-chart', 'figure'), + Input('interval-component', 'n_intervals') + ) + def update_price_chart(n): + # Generate sample data + dates = pd.date_range(start=datetime.now() - timedelta(hours=24), + end=datetime.now(), freq='1H') + prices = 3000 + np.cumsum(np.random.randn(len(dates)) * 10) + + fig = go.Figure() + fig.add_trace(go.Scatter( + x=dates, + y=prices, + mode='lines', + name='ETH/USDT', + line=dict(color='#3498db', width=2) + )) + + fig.update_layout( + title='ETH/USDT Price Chart (24H)', + xaxis_title='Time', + yaxis_title='Price (USD)', + template='plotly_white', + height=400 + ) + + return fig + + # Callback for performance chart + @app.callback( + Output('performance-chart', 'figure'), + Input('interval-component', 'n_intervals') + ) + def update_performance_chart(n): + # Generate sample performance data + dates = pd.date_range(start=datetime.now() - timedelta(days=7), + end=datetime.now(), freq='1D') + performance = np.cumsum(np.random.randn(len(dates)) * 0.02) * 100 + + fig = go.Figure() + fig.add_trace(go.Scatter( + x=dates, + y=performance, + mode='lines+markers', + name='Portfolio Performance', + line=dict(color='#27ae60', width=3), + marker=dict(size=6) + )) + + fig.update_layout( + title='Portfolio Performance (7 Days)', + xaxis_title='Date', + yaxis_title='Performance (%)', + template='plotly_white', + height=400 + ) + + return fig + + return app + + except Exception as e: + logger.error(f"Error creating dashboard: {e}") + import traceback + logger.error(traceback.format_exc()) + return None + +def test_data_provider(): + """Test data provider in background""" + try: + from core.data_provider import DataProvider + from core.api_rate_limiter import get_rate_limiter + + logger.info("Testing data provider...") + + # Create data provider + data_provider = DataProvider( + symbols=['ETH/USDT'], + timeframes=['1m', '5m'] + ) + + # Test getting data + df = data_provider.get_historical_data('ETH/USDT', '1m', limit=10) + if df is not None and len(df) > 0: + logger.info(f"✓ Data provider working: {len(df)} candles retrieved") + else: + logger.warning("⚠ Data provider returned no data (rate limiting)") + + # Test rate limiter status + rate_limiter = get_rate_limiter() + status = rate_limiter.get_all_endpoint_status() + logger.info(f"Rate limiter status: {status}") + + except Exception as e: + logger.error(f"Data provider test error: {e}") + +def main(): + """Main function""" + logger.info("=" * 60) + logger.info("SIMPLE DASHBOARD RUNNER - TESTING SYSTEM") + logger.info("=" * 60) + + # Test data provider in background + data_thread = threading.Thread(target=test_data_provider, daemon=True) + data_thread.start() + + # Create and run dashboard + app = create_simple_dashboard() + if app is None: + logger.error("Failed to create dashboard") + return + + try: + logger.info("Starting dashboard server...") + logger.info("Dashboard URL: http://127.0.0.1:8050") + logger.info("Press Ctrl+C to stop") + + # Run the dashboard + app.run(debug=False, host='127.0.0.1', port=8050, use_reloader=False) + + except KeyboardInterrupt: + logger.info("Dashboard stopped by user") + except Exception as e: + logger.error(f"Dashboard error: {e}") + import traceback + logger.error(traceback.format_exc()) + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/web/clean_dashboard.py b/web/clean_dashboard.py index ef300a5..e048ccc 100644 --- a/web/clean_dashboard.py +++ b/web/clean_dashboard.py @@ -2311,49 +2311,56 @@ class CleanTradingDashboard: cob_data = self.data_provider.get_latest_cob_data(symbol) logger.debug(f"COB data type for {symbol}: {type(cob_data)}, data: {cob_data}") - if cob_data and isinstance(cob_data, dict) and 'stats' in cob_data: - logger.debug(f"COB snapshot available for {symbol} from centralized data provider") - - # Create a snapshot object from the data provider's data - class COBSnapshot: - def __init__(self, data): - # Convert list format [[price, qty], ...] to dictionary format - raw_bids = data.get('bids', []) - raw_asks = data.get('asks', []) - - # Convert to dictionary format expected by component manager - self.consolidated_bids = [] - for bid in raw_bids: - if isinstance(bid, list) and len(bid) >= 2: - self.consolidated_bids.append({ - 'price': bid[0], - 'size': bid[1], - 'total_size': bid[1], - 'total_volume_usd': bid[0] * bid[1] - }) - - self.consolidated_asks = [] - for ask in raw_asks: - if isinstance(ask, list) and len(ask) >= 2: - self.consolidated_asks.append({ - 'price': ask[0], - 'size': ask[1], - 'total_size': ask[1], - 'total_volume_usd': ask[0] * ask[1] - }) - - self.stats = data.get('stats', {}) - # Add direct attributes for new format compatibility - self.volume_weighted_mid = self.stats.get('mid_price', 0) - self.spread_bps = self.stats.get('spread_bps', 0) - self.liquidity_imbalance = self.stats.get('imbalance', 0) - self.total_bid_liquidity = self.stats.get('bid_liquidity', 0) - self.total_ask_liquidity = self.stats.get('ask_liquidity', 0) - self.exchanges_active = ['Binance'] # Default for now - - return COBSnapshot(cob_data) + if cob_data and isinstance(cob_data, dict): + # Validate COB data structure + if 'stats' in cob_data and cob_data['stats']: + logger.debug(f"COB snapshot available for {symbol} from centralized data provider") + + # Create a snapshot object from the data provider's data + class COBSnapshot: + def __init__(self, data): + # Convert list format [[price, qty], ...] to dictionary format + raw_bids = data.get('bids', []) + raw_asks = data.get('asks', []) + + # Convert to dictionary format expected by component manager + self.consolidated_bids = [] + for bid in raw_bids: + if isinstance(bid, list) and len(bid) >= 2: + self.consolidated_bids.append({ + 'price': bid[0], + 'size': bid[1], + 'total_size': bid[1], + 'total_volume_usd': bid[0] * bid[1] + }) + + self.consolidated_asks = [] + for ask in raw_asks: + if isinstance(ask, list) and len(ask) >= 2: + self.consolidated_asks.append({ + 'price': ask[0], + 'size': ask[1], + 'total_size': ask[1], + 'total_volume_usd': ask[0] * ask[1] + }) + + self.stats = data.get('stats', {}) + # Add direct attributes for new format compatibility + self.volume_weighted_mid = self.stats.get('mid_price', 0) + self.spread_bps = self.stats.get('spread_bps', 0) + self.liquidity_imbalance = self.stats.get('imbalance', 0) + self.total_bid_liquidity = self.stats.get('bid_liquidity', 0) + self.total_ask_liquidity = self.stats.get('ask_liquidity', 0) + self.exchanges_active = ['Binance'] # Default for now + + return COBSnapshot(cob_data) + else: + # Data exists but no stats - this is the "Invalid COB data" case + logger.debug(f"COB data for {symbol} missing stats structure: {type(cob_data)}, keys: {list(cob_data.keys()) if isinstance(cob_data, dict) else 'not dict'}") + return None else: - logger.warning(f"Invalid COB data for {symbol}: type={type(cob_data)}, has_stats={'stats' in cob_data if isinstance(cob_data, dict) else False}") + logger.debug(f"No COB data available for {symbol} from data provider") + return None except Exception as e: logger.error(f"Error getting COB data from data provider: {e}") @@ -5358,6 +5365,18 @@ class CleanTradingDashboard: self.latest_cob_data[symbol] = cob_snapshot + # Store in history for moving average calculations + if not hasattr(self, 'cob_data_history'): + self.cob_data_history = {'ETH/USDT': deque(maxlen=61), 'BTC/USDT': deque(maxlen=61)} + + if symbol in self.cob_data_history: + self.cob_data_history[symbol].append(cob_snapshot) + + # Update last update timestamp + if not hasattr(self, 'cob_last_update'): + self.cob_last_update = {} + self.cob_last_update[symbol] = time.time() + # Update current price from COB data if 'stats' in cob_snapshot and 'mid_price' in cob_snapshot['stats']: self.current_prices[symbol] = cob_snapshot['stats']['mid_price'] @@ -6021,33 +6040,71 @@ class CleanTradingDashboard: raise def _calculate_cumulative_imbalance(self, symbol: str) -> Dict[str, float]: - """Calculate average imbalance over multiple time windows.""" + """Calculate Moving Averages (MA) of imbalance over different periods.""" stats = {} - now = time.time() history = self.cob_data_history.get(symbol) if not history: return {'1s': 0.0, '5s': 0.0, '15s': 0.0, '60s': 0.0} - periods = {'1s': 1, '5s': 5, '15s': 15, '60s': 60} - - for name, duration in periods.items(): - recent_imbalances = [] - for snap in history: - # Check if snap is a valid dict with timestamp and stats - if isinstance(snap, dict) and 'timestamp' in snap and (now - snap['timestamp'] <= duration) and 'stats' in snap and snap['stats']: - imbalance = snap['stats'].get('imbalance') - if imbalance is not None: - recent_imbalances.append(imbalance) + # Convert history to list and get recent snapshots + history_list = list(history) + if not history_list: + return {'1s': 0.0, '5s': 0.0, '15s': 0.0, '60s': 0.0} - if recent_imbalances: - stats[name] = sum(recent_imbalances) / len(recent_imbalances) - else: - stats[name] = 0.0 + # Extract imbalance values from recent snapshots + imbalances = [] + for snap in history_list: + if isinstance(snap, dict) and 'stats' in snap and snap['stats']: + imbalance = snap['stats'].get('imbalance') + if imbalance is not None: + imbalances.append(imbalance) + + if not imbalances: + return {'1s': 0.0, '5s': 0.0, '15s': 0.0, '60s': 0.0} + + # Calculate Moving Averages over different periods + # MA periods: 1s=1 period, 5s=5 periods, 15s=15 periods, 60s=60 periods + ma_periods = {'1s': 1, '5s': 5, '15s': 15, '60s': 60} - # Debug logging to verify cumulative imbalance calculation + for name, period in ma_periods.items(): + if len(imbalances) >= period: + # Calculate SMA over the last 'period' values + recent_imbalances = imbalances[-period:] + sma_value = sum(recent_imbalances) / len(recent_imbalances) + + # Also calculate EMA for better responsiveness + if period > 1: + # EMA calculation with alpha = 2/(period+1) + alpha = 2.0 / (period + 1) + ema_value = recent_imbalances[0] # Start with first value + for value in recent_imbalances[1:]: + ema_value = alpha * value + (1 - alpha) * ema_value + # Use EMA for better responsiveness + stats[name] = ema_value + else: + # For 1s, use SMA (no EMA needed) + stats[name] = sma_value + else: + # If not enough data, use available data + available_imbalances = imbalances[-min(period, len(imbalances)):] + if available_imbalances: + if len(available_imbalances) > 1: + # Calculate EMA for available data + alpha = 2.0 / (len(available_imbalances) + 1) + ema_value = available_imbalances[0] + for value in available_imbalances[1:]: + ema_value = alpha * value + (1 - alpha) * ema_value + stats[name] = ema_value + else: + # Single value, use as is + stats[name] = available_imbalances[0] + else: + stats[name] = 0.0 + + # Debug logging to verify MA calculation if any(value != 0.0 for value in stats.values()): - logger.debug(f"[CUMULATIVE-IMBALANCE] {symbol}: {stats}") + logger.debug(f"[MOVING-AVERAGE-IMBALANCE] {symbol}: {stats} (from {len(imbalances)} snapshots)") return stats diff --git a/web/component_manager.py b/web/component_manager.py index 1b530be..5476503 100644 --- a/web/component_manager.py +++ b/web/component_manager.py @@ -412,10 +412,10 @@ class DashboardComponentManager: ]), html.Div([ - self._create_timeframe_imbalance("1s", stats.get('imbalance_1s', imbalance)), - self._create_timeframe_imbalance("5s", stats.get('imbalance_5s', imbalance)), - self._create_timeframe_imbalance("15s", stats.get('imbalance_15s', imbalance)), - self._create_timeframe_imbalance("60s", stats.get('imbalance_60s', imbalance)), + self._create_timeframe_imbalance("1s", cumulative_imbalance_stats.get('1s', imbalance)), + self._create_timeframe_imbalance("5s", cumulative_imbalance_stats.get('5s', imbalance)), + self._create_timeframe_imbalance("15s", cumulative_imbalance_stats.get('15s', imbalance)), + self._create_timeframe_imbalance("60s", cumulative_imbalance_stats.get('60s', imbalance)), ], className="d-flex justify-content-between mb-2"), html.Div(imbalance_stats_display), diff --git a/web/templated_dashboard.py b/web/templated_dashboard.py index b3e89a9..cce2222 100644 --- a/web/templated_dashboard.py +++ b/web/templated_dashboard.py @@ -986,33 +986,71 @@ class TemplatedTradingDashboard: logger.debug(f"TEMPLATED DASHBOARD: Error generating bucketed COB data: {e}") def _calculate_cumulative_imbalance(self, symbol: str) -> Dict[str, float]: - """Calculate average imbalance over multiple time windows.""" + """Calculate Moving Averages (MA) of imbalance over different periods.""" stats = {} - now = time.time() history = self.cob_data_history.get(symbol) if not history: return {'1s': 0.0, '5s': 0.0, '15s': 0.0, '60s': 0.0} - periods = {'1s': 1, '5s': 5, '15s': 15, '60s': 60} - - for name, duration in periods.items(): - recent_imbalances = [] - for snap in history: - # Check if snap is a valid dict with timestamp and stats - if isinstance(snap, dict) and 'timestamp' in snap and (now - snap['timestamp'] <= duration) and 'stats' in snap and snap['stats']: - imbalance = snap['stats'].get('imbalance') - if imbalance is not None: - recent_imbalances.append(imbalance) + # Convert history to list and get recent snapshots + history_list = list(history) + if not history_list: + return {'1s': 0.0, '5s': 0.0, '15s': 0.0, '60s': 0.0} - if recent_imbalances: - stats[name] = sum(recent_imbalances) / len(recent_imbalances) - else: - stats[name] = 0.0 + # Extract imbalance values from recent snapshots + imbalances = [] + for snap in history_list: + if isinstance(snap, dict) and 'stats' in snap and snap['stats']: + imbalance = snap['stats'].get('imbalance') + if imbalance is not None: + imbalances.append(imbalance) + + if not imbalances: + return {'1s': 0.0, '5s': 0.0, '15s': 0.0, '60s': 0.0} + + # Calculate Moving Averages over different periods + # MA periods: 1s=1 period, 5s=5 periods, 15s=15 periods, 60s=60 periods + ma_periods = {'1s': 1, '5s': 5, '15s': 15, '60s': 60} - # Debug logging to verify cumulative imbalance calculation + for name, period in ma_periods.items(): + if len(imbalances) >= period: + # Calculate SMA over the last 'period' values + recent_imbalances = imbalances[-period:] + sma_value = sum(recent_imbalances) / len(recent_imbalances) + + # Also calculate EMA for better responsiveness + if period > 1: + # EMA calculation with alpha = 2/(period+1) + alpha = 2.0 / (period + 1) + ema_value = recent_imbalances[0] # Start with first value + for value in recent_imbalances[1:]: + ema_value = alpha * value + (1 - alpha) * ema_value + # Use EMA for better responsiveness + stats[name] = ema_value + else: + # For 1s, use SMA (no EMA needed) + stats[name] = sma_value + else: + # If not enough data, use available data + available_imbalances = imbalances[-min(period, len(imbalances)):] + if available_imbalances: + if len(available_imbalances) > 1: + # Calculate EMA for available data + alpha = 2.0 / (len(available_imbalances) + 1) + ema_value = available_imbalances[0] + for value in available_imbalances[1:]: + ema_value = alpha * value + (1 - alpha) * ema_value + stats[name] = ema_value + else: + # Single value, use as is + stats[name] = available_imbalances[0] + else: + stats[name] = 0.0 + + # Debug logging to verify MA calculation if any(value != 0.0 for value in stats.values()): - logger.debug(f"TEMPLATED DASHBOARD: [CUMULATIVE-IMBALANCE] {symbol}: {stats}") + logger.debug(f"TEMPLATED DASHBOARD: [MOVING-AVERAGE-IMBALANCE] {symbol}: {stats} (from {len(imbalances)} snapshots)") return stats