From c30267bf0bec7b238881aa247a39db0b07747ff3 Mon Sep 17 00:00:00 2001 From: Dobromir Popov Date: Wed, 23 Jul 2025 22:39:10 +0300 Subject: [PATCH] COB tests and data analysis --- .../specs/multi-modal-trading-system/tasks.md | 2 +- test_cob_comparison.py | 276 +++++++++++++++ test_cob_data_stability.py | 324 +++++++++++++++--- 3 files changed, 555 insertions(+), 47 deletions(-) create mode 100644 test_cob_comparison.py diff --git a/.kiro/specs/multi-modal-trading-system/tasks.md b/.kiro/specs/multi-modal-trading-system/tasks.md index 3a31bee..2bedbfc 100644 --- a/.kiro/specs/multi-modal-trading-system/tasks.md +++ b/.kiro/specs/multi-modal-trading-system/tasks.md @@ -60,7 +60,7 @@ - Include COB ±20 buckets and MA (1s,5s,15s,60s) of COB imbalance ±5 buckets - Output BUY/SELL trading action with confidence scores - _Requirements: 2.1, 2.2, 2.8, 1.10_ -- [ ] 2.1. Implement CNN inference with standardized input format +- [x] 2.1. Implement CNN inference with standardized input format - Accept BaseDataInput with standardized COB+OHLCV format - Process 300 frames of multi-timeframe data with COB buckets - Output BUY/SELL recommendations with confidence scores diff --git a/test_cob_comparison.py b/test_cob_comparison.py new file mode 100644 index 0000000..1215dd1 --- /dev/null +++ b/test_cob_comparison.py @@ -0,0 +1,276 @@ +#!/usr/bin/env python3 +""" +Compare COB data quality between DataProvider and COBIntegration + +This test compares: +1. DataProvider COB collection (used in our test) +2. COBIntegration direct access (used in cob_realtime_dashboard.py) + +To understand why cob_realtime_dashboard.py gets more stable data. +""" + +import asyncio +import logging +import time +from collections import deque +from datetime import datetime, timedelta + +import matplotlib.pyplot as plt +import numpy as np +import pandas as pd + +from core.data_provider import DataProvider, MarketTick +from core.config import get_config + +# Try to import COBIntegration like cob_realtime_dashboard does +try: + from core.cob_integration import COBIntegration + COB_INTEGRATION_AVAILABLE = True +except ImportError: + COB_INTEGRATION_AVAILABLE = False + +# Configure logging +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') +logger = logging.getLogger(__name__) + + +class COBComparisonTester: + def __init__(self, symbol='ETH/USDT', duration_seconds=15): + self.symbol = symbol + self.duration = timedelta(seconds=duration_seconds) + + # Data storage for both methods + self.dp_ticks = deque() # DataProvider ticks + self.cob_data = deque() # COBIntegration data + + # Initialize DataProvider (method 1) + logger.info("Initializing DataProvider...") + self.data_provider = DataProvider() + self.dp_cob_received = 0 + + # Initialize COBIntegration (method 2) + self.cob_integration = None + self.cob_received = 0 + if COB_INTEGRATION_AVAILABLE: + logger.info("Initializing COBIntegration...") + self.cob_integration = COBIntegration(symbols=[self.symbol]) + else: + logger.warning("COBIntegration not available - will only test DataProvider") + + self.start_time = None + self.subscriber_id = None + + def _dp_cob_callback(self, symbol: str, cob_data: dict): + """Callback for DataProvider COB data""" + self.dp_cob_received += 1 + + if 'stats' in cob_data and 'mid_price' in cob_data['stats']: + mid_price = cob_data['stats']['mid_price'] + if mid_price > 0: + synthetic_tick = MarketTick( + symbol=symbol, + timestamp=cob_data.get('timestamp', datetime.now()), + price=mid_price, + 
volume=cob_data.get('stats', {}).get('total_volume', 0), + quantity=0, + side='dp_cob', + trade_id=f"dp_{self.dp_cob_received}", + is_buyer_maker=False, + raw_data=cob_data + ) + self.dp_ticks.append(synthetic_tick) + + if self.dp_cob_received % 20 == 0: + logger.info(f"[DataProvider] Update #{self.dp_cob_received}: {symbol} @ ${mid_price:.2f}") + + def _cob_integration_callback(self, symbol: str, data: dict): + """Callback for COBIntegration data""" + self.cob_received += 1 + + # Store COBIntegration data directly + cob_record = { + 'symbol': symbol, + 'timestamp': datetime.now(), + 'data': data, + 'source': 'cob_integration' + } + self.cob_data.append(cob_record) + + if self.cob_received % 20 == 0: + stats = data.get('stats', {}) + mid_price = stats.get('mid_price', 0) + logger.info(f"[COBIntegration] Update #{self.cob_received}: {symbol} @ ${mid_price:.2f}") + + async def run_comparison_test(self): + """Run the comparison test""" + logger.info(f"Starting COB comparison test for {self.symbol} for {self.duration.total_seconds()} seconds...") + + # Start DataProvider COB collection + try: + logger.info("Starting DataProvider COB collection...") + self.data_provider.start_cob_collection() + self.data_provider.subscribe_to_cob(self._dp_cob_callback) + await self.data_provider.start_real_time_streaming() + logger.info("DataProvider streaming started") + except Exception as e: + logger.error(f"Failed to start DataProvider: {e}") + + # Start COBIntegration if available + if self.cob_integration: + try: + logger.info("Starting COBIntegration...") + self.cob_integration.add_dashboard_callback(self._cob_integration_callback) + await self.cob_integration.start() + logger.info("COBIntegration started") + except Exception as e: + logger.error(f"Failed to start COBIntegration: {e}") + + # Collect data for specified duration + self.start_time = datetime.now() + while datetime.now() - self.start_time < self.duration: + await asyncio.sleep(1) + logger.info(f"DataProvider: {len(self.dp_ticks)} ticks | COBIntegration: {len(self.cob_data)} updates") + + # Stop data collection + try: + await self.data_provider.stop_real_time_streaming() + if self.cob_integration: + await self.cob_integration.stop() + except Exception as e: + logger.error(f"Error stopping data collection: {e}") + + logger.info(f"Comparison complete:") + logger.info(f" DataProvider: {len(self.dp_ticks)} ticks received") + logger.info(f" COBIntegration: {len(self.cob_data)} updates received") + + # Analyze and plot the differences + self.analyze_differences() + self.create_comparison_plots() + + def analyze_differences(self): + """Analyze the differences between the two data sources""" + logger.info("Analyzing data quality differences...") + + # Analyze DataProvider data + dp_order_book_count = 0 + dp_mid_prices = [] + + for tick in self.dp_ticks: + if hasattr(tick, 'raw_data') and tick.raw_data: + if 'bids' in tick.raw_data and 'asks' in tick.raw_data: + dp_order_book_count += 1 + if 'stats' in tick.raw_data and 'mid_price' in tick.raw_data['stats']: + dp_mid_prices.append(tick.raw_data['stats']['mid_price']) + + # Analyze COBIntegration data + cob_order_book_count = 0 + cob_mid_prices = [] + + for record in self.cob_data: + data = record['data'] + if 'bids' in data and 'asks' in data: + cob_order_book_count += 1 + if 'stats' in data and 'mid_price' in data['stats']: + cob_mid_prices.append(data['stats']['mid_price']) + + logger.info("Data Quality Analysis:") + logger.info(f" DataProvider:") + logger.info(f" Total updates: 
{len(self.dp_ticks)}") + logger.info(f" With order book data: {dp_order_book_count}") + logger.info(f" Mid prices collected: {len(dp_mid_prices)}") + if dp_mid_prices: + logger.info(f" Price range: ${min(dp_mid_prices):.2f} - ${max(dp_mid_prices):.2f}") + + logger.info(f" COBIntegration:") + logger.info(f" Total updates: {len(self.cob_data)}") + logger.info(f" With order book data: {cob_order_book_count}") + logger.info(f" Mid prices collected: {len(cob_mid_prices)}") + if cob_mid_prices: + logger.info(f" Price range: ${min(cob_mid_prices):.2f} - ${max(cob_mid_prices):.2f}") + + def create_comparison_plots(self): + """Create comparison plots showing the difference""" + logger.info("Creating comparison plots...") + + fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(15, 12)) + + # Plot 1: Price comparison + dp_times = [] + dp_prices = [] + for tick in self.dp_ticks: + if tick.price > 0: + dp_times.append(tick.timestamp) + dp_prices.append(tick.price) + + cob_times = [] + cob_prices = [] + for record in self.cob_data: + data = record['data'] + if 'stats' in data and 'mid_price' in data['stats']: + cob_times.append(record['timestamp']) + cob_prices.append(data['stats']['mid_price']) + + if dp_times: + ax1.plot(pd.to_datetime(dp_times), dp_prices, 'b-', alpha=0.7, label='DataProvider COB', linewidth=1) + if cob_times: + ax1.plot(pd.to_datetime(cob_times), cob_prices, 'r-', alpha=0.7, label='COBIntegration', linewidth=1) + + ax1.set_title('Price Comparison: DataProvider vs COBIntegration') + ax1.set_ylabel('Price (USDT)') + ax1.legend() + ax1.grid(True, alpha=0.3) + + # Plot 2: Data quality comparison (order book depth) + dp_bid_counts = [] + dp_ask_counts = [] + dp_ob_times = [] + + for tick in self.dp_ticks: + if hasattr(tick, 'raw_data') and tick.raw_data: + if 'bids' in tick.raw_data and 'asks' in tick.raw_data: + dp_bid_counts.append(len(tick.raw_data['bids'])) + dp_ask_counts.append(len(tick.raw_data['asks'])) + dp_ob_times.append(tick.timestamp) + + cob_bid_counts = [] + cob_ask_counts = [] + cob_ob_times = [] + + for record in self.cob_data: + data = record['data'] + if 'bids' in data and 'asks' in data: + cob_bid_counts.append(len(data['bids'])) + cob_ask_counts.append(len(data['asks'])) + cob_ob_times.append(record['timestamp']) + + if dp_ob_times: + ax2.plot(pd.to_datetime(dp_ob_times), dp_bid_counts, 'b--', alpha=0.7, label='DP Bid Levels') + ax2.plot(pd.to_datetime(dp_ob_times), dp_ask_counts, 'b:', alpha=0.7, label='DP Ask Levels') + if cob_ob_times: + ax2.plot(pd.to_datetime(cob_ob_times), cob_bid_counts, 'r--', alpha=0.7, label='COB Bid Levels') + ax2.plot(pd.to_datetime(cob_ob_times), cob_ask_counts, 'r:', alpha=0.7, label='COB Ask Levels') + + ax2.set_title('Order Book Depth Comparison') + ax2.set_ylabel('Number of Levels') + ax2.set_xlabel('Time') + ax2.legend() + ax2.grid(True, alpha=0.3) + + plt.tight_layout() + + plot_filename = f"cob_comparison_{self.symbol.replace('/', '_')}_{datetime.now():%Y%m%d_%H%M%S}.png" + plt.savefig(plot_filename, dpi=150) + logger.info(f"Comparison plot saved to {plot_filename}") + plt.show() + + +async def main(): + tester = COBComparisonTester() + await tester.run_comparison_test() + + +if __name__ == "__main__": + try: + asyncio.run(main()) + except KeyboardInterrupt: + logger.info("Test interrupted by user.") diff --git a/test_cob_data_stability.py b/test_cob_data_stability.py index aec4dfa..3f7675e 100644 --- a/test_cob_data_stability.py +++ b/test_cob_data_stability.py @@ -10,6 +10,7 @@ import pandas as pd from matplotlib.colors import LogNorm 
from core.data_provider import DataProvider, MarketTick +from core.config import get_config # Configure logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') @@ -17,11 +18,33 @@ logger = logging.getLogger(__name__) class COBStabilityTester: - def __init__(self, symbol='ETH/USDT', duration_seconds=15): + def __init__(self, symbol='ETHUSDT', duration_seconds=15): self.symbol = symbol self.duration = timedelta(seconds=duration_seconds) self.ticks = deque() - self.data_provider = DataProvider(symbols=[self.symbol], timeframes=['1s']) + + # Set granularity (buckets) based on symbol + if 'ETH' in symbol.upper(): + self.price_granularity = 1.0 # 1 USD for ETH + elif 'BTC' in symbol.upper(): + self.price_granularity = 10.0 # 10 USD for BTC + else: + self.price_granularity = 1.0 # Default 1 USD + + logger.info(f"Using price granularity: ${self.price_granularity} for {symbol}") + + # Initialize DataProvider the same way as clean_dashboard + logger.info("Initializing DataProvider like in clean_dashboard...") + self.data_provider = DataProvider() # Use default constructor like clean_dashboard + + # Initialize COB data collection like clean_dashboard does + self.cob_data_received = 0 + self.latest_cob_data = {} + + # Store all COB snapshots for heatmap generation + self.cob_snapshots = deque() + self.price_data = [] # For price line chart + self.start_time = None self.subscriber_id = None @@ -33,16 +56,79 @@ class COBStabilityTester: # Store all ticks self.ticks.append(tick) + + def _cob_data_callback(self, symbol: str, cob_data: dict): + """Callback function to receive COB data from the DataProvider.""" + self.cob_data_received += 1 + self.latest_cob_data[symbol] = cob_data + + # Store the complete COB snapshot for heatmap generation + if 'bids' in cob_data and 'asks' in cob_data: + snapshot = { + 'timestamp': cob_data.get('timestamp', datetime.now()), + 'bids': cob_data['bids'], + 'asks': cob_data['asks'], + 'stats': cob_data.get('stats', {}) + } + self.cob_snapshots.append(snapshot) + + # Convert COB data to tick-like format for analysis + if 'stats' in cob_data and 'mid_price' in cob_data['stats']: + mid_price = cob_data['stats']['mid_price'] + if mid_price > 0: + # Store price data for line chart + self.price_data.append({ + 'timestamp': cob_data.get('timestamp', datetime.now()), + 'price': mid_price + }) + + # Create a synthetic tick from COB data + synthetic_tick = MarketTick( + symbol=symbol, + timestamp=cob_data.get('timestamp', datetime.now()), + price=mid_price, + volume=cob_data.get('stats', {}).get('total_volume', 0), + quantity=0, # Not available in COB data + side='unknown', # COB data doesn't have side info + trade_id=f"cob_{self.cob_data_received}", + is_buyer_maker=False, + raw_data=cob_data + ) + self.ticks.append(synthetic_tick) + + if self.cob_data_received % 10 == 0: # Log every 10th update + logger.info(f"COB update #{self.cob_data_received}: {symbol} @ ${mid_price:.2f}") async def run_test(self): """Run the data collection and plotting test.""" logger.info(f"Starting COB stability test for {self.symbol} for {self.duration.total_seconds()} seconds...") - # Subscribe to ticks - self.subscriber_id = self.data_provider.subscribe_to_ticks(self._tick_callback, symbols=[self.symbol]) + # Initialize COB collection like clean_dashboard does + try: + logger.info("Starting COB collection in data provider...") + self.data_provider.start_cob_collection() + logger.info("Started COB collection in data provider") + + # Subscribe to COB updates + 
logger.info("Subscribing to COB data updates...") + self.data_provider.subscribe_to_cob(self._cob_data_callback) + logger.info("Subscribed to COB data updates from data provider") + except Exception as e: + logger.error(f"Failed to start COB collection or subscribe: {e}") + + # Subscribe to ticks as fallback + try: + self.subscriber_id = self.data_provider.subscribe_to_ticks(self._tick_callback, symbols=[self.symbol]) + logger.info("Subscribed to tick data as fallback") + except Exception as e: + logger.warning(f"Failed to subscribe to ticks: {e}") # Start the data provider's real-time streaming - await self.data_provider.start_real_time_streaming() + try: + await self.data_provider.start_real_time_streaming() + logger.info("Started real-time streaming") + except Exception as e: + logger.error(f"Failed to start real-time streaming: {e}") # Collect data for the specified duration self.start_time = datetime.now() @@ -57,57 +143,203 @@ class COBStabilityTester: logger.info(f"Finished collecting data. Total ticks: {len(self.ticks)}") # Plot the results - if self.ticks: - self.plot_spectrogram() + if self.price_data and self.cob_snapshots: + self.create_price_heatmap_chart() + elif self.ticks: + self._create_simple_price_chart() else: - logger.warning("No ticks were collected. Cannot generate plot.") + logger.warning("No data was collected. Cannot generate plot.") def plot_spectrogram(self): - """Create a spectrogram-like plot of trade intensity.""" + """Create a bookmap-style visualization showing order book depth over time.""" if not self.ticks: logger.warning("No ticks to plot.") return - df = pd.DataFrame([{ - 'timestamp': tick.timestamp, - 'price': tick.price, - 'volume': tick.volume, - 'side': 1 if tick.side == 'buy' else -1 - } for tick in self.ticks]) - - df['timestamp'] = pd.to_datetime(df['timestamp']) - df = df.set_index('timestamp') - - # Create the plot - fig, ax = plt.subplots(figsize=(15, 8)) - - # Define bins for the 2D histogram - time_bins = pd.date_range(df.index.min(), df.index.max(), periods=100) - price_bins = np.linspace(df['price'].min(), df['price'].max(), 100) - - # Create the 2D histogram - # x-axis: time, y-axis: price, weights: volume - h, xedges, yedges = np.histogram2d( - df.index.astype(np.int64) // 10**9, - df['price'], - bins=[time_bins.astype(np.int64) // 10**9, price_bins], - weights=df['volume'] - ) - - # Use a logarithmic color scale for better visibility of smaller trades - pcm = ax.pcolormesh(time_bins, price_bins, h.T, norm=LogNorm(vmin=1e-3, vmax=h.max()), cmap='inferno') - - fig.colorbar(pcm, ax=ax, label='Trade Volume (USDT)') - ax.set_title(f'Trade Intensity Spectrogram for {self.symbol}') - ax.set_xlabel('Time') - ax.set_ylabel('Price (USDT)') - + logger.info(f"Creating bookmap-style visualization with {len(self.ticks)} data points...") + + # Extract order book data from ticks + time_points = [] + bid_data = [] + ask_data = [] + price_levels = set() + + for tick in self.ticks: + if hasattr(tick, 'raw_data') and tick.raw_data: + cob_data = tick.raw_data + if 'bids' in cob_data and 'asks' in cob_data: + timestamp = tick.timestamp + + # Extract bid levels (green - buy orders) + bids = cob_data['bids'][:20] # Top 20 levels + for bid in bids: + if isinstance(bid, dict) and 'price' in bid and 'size' in bid: + bid_data.append({ + 'time': timestamp, + 'price': bid['price'], + 'size': bid['size'], + 'side': 'bid' + }) + price_levels.add(bid['price']) + + # Extract ask levels (red - sell orders) + asks = cob_data['asks'][:20] # Top 20 levels + for ask in 
asks: + if isinstance(ask, dict) and 'price' in ask and 'size' in ask: + ask_data.append({ + 'time': timestamp, + 'price': ask['price'], + 'size': ask['size'], + 'side': 'ask' + }) + price_levels.add(ask['price']) + + if not bid_data and not ask_data: + logger.warning("No order book data found in ticks. Cannot create bookmap visualization.") + # Fallback to simple price chart + self._create_simple_price_chart() + return + + logger.info(f"Extracted {len(bid_data)} bid levels and {len(ask_data)} ask levels") + + # Create the bookmap visualization + fig, ax = plt.subplots(figsize=(16, 10)) + + # Combine all data + all_data = bid_data + ask_data + if not all_data: + logger.warning("No order book data to plot") + return + + # Create DataFrames + df = pd.DataFrame(all_data) + df['time'] = pd.to_datetime(df['time']) + + # Create price bins (like in bookmap) + price_min = df['price'].min() + price_max = df['price'].max() + price_range = price_max - price_min + if price_range == 0: + logger.warning("No price variation in data") + return + + # Create time bins + time_min = df['time'].min() + time_max = df['time'].max() + + # Create 2D heatmaps for bids and asks separately + time_bins = pd.date_range(time_min, time_max, periods=100) + price_bins = np.linspace(price_min, price_max, 200) # Higher resolution for price + + # Separate bid and ask data + bid_df = df[df['side'] == 'bid'] + ask_df = df[df['side'] == 'ask'] + + # Create bid heatmap (green) + if not bid_df.empty: + bid_hist, _, _ = np.histogram2d( + bid_df['time'].astype(np.int64) // 10**9, + bid_df['price'], + bins=[time_bins.astype(np.int64) // 10**9, price_bins], + weights=bid_df['size'] + ) + # Plot bids in green (buying pressure) + bid_mask = bid_hist > 0 + pcm_bid = ax.pcolormesh( + time_bins, price_bins, bid_hist.T, + cmap='Greens', alpha=0.7, vmin=0, vmax=bid_hist.max() + ) + + # Create ask heatmap (red) + if not ask_df.empty: + ask_hist, _, _ = np.histogram2d( + ask_df['time'].astype(np.int64) // 10**9, + ask_df['price'], + bins=[time_bins.astype(np.int64) // 10**9, price_bins], + weights=ask_df['size'] + ) + # Plot asks in red (selling pressure) + ask_mask = ask_hist > 0 + pcm_ask = ax.pcolormesh( + time_bins, price_bins, ask_hist.T, + cmap='Reds', alpha=0.7, vmin=0, vmax=ask_hist.max() + ) + + # Add mid price line + mid_prices = [] + mid_times = [] + for tick in self.ticks: + if hasattr(tick, 'raw_data') and tick.raw_data and 'stats' in tick.raw_data: + stats = tick.raw_data['stats'] + if 'mid_price' in stats and stats['mid_price'] > 0: + mid_prices.append(stats['mid_price']) + mid_times.append(tick.timestamp) + + if mid_prices: + ax.plot(pd.to_datetime(mid_times), mid_prices, 'yellow', linewidth=2, alpha=0.8, label='Mid Price') + + # Styling like bookmap + ax.set_facecolor('black') + fig.patch.set_facecolor('black') + + ax.set_title(f'Order Book Depth Map - {self.symbol}\n(Green=Bids/Buy Orders, Red=Asks/Sell Orders)', + color='white', fontsize=14) + ax.set_xlabel('Time', color='white') + ax.set_ylabel('Price (USDT)', color='white') + + # White ticks and labels + ax.tick_params(colors='white') + ax.spines['bottom'].set_color('white') + ax.spines['top'].set_color('white') + ax.spines['right'].set_color('white') + ax.spines['left'].set_color('white') + + # Add colorbar for bid data + if not bid_df.empty: + cbar_bid = fig.colorbar(pcm_bid, ax=ax, location='right', pad=0.02, shrink=0.5) + cbar_bid.set_label('Bid Size (Order Volume)', color='white', labelpad=15) + cbar_bid.ax.yaxis.set_tick_params(color='white') + 
cbar_bid.ax.yaxis.set_tick_params(labelcolor='white') + # Format the x-axis to show time properly fig.autofmt_xdate() - - plot_filename = f"cob_stability_spectrogram_{self.symbol.replace('/', '_')}_{datetime.now():%Y%m%d_%H%M%S}.png" + + if mid_prices: + ax.legend(loc='upper left') + + plt.tight_layout() + + plot_filename = f"cob_bookmap_{self.symbol.replace('/', '_')}_{datetime.now():%Y%m%d_%H%M%S}.png" + plt.savefig(plot_filename, facecolor='black', dpi=150) + logger.info(f"Bookmap-style plot saved to {plot_filename}") + plt.show() + + def _create_simple_price_chart(self): + """Create a simple price chart as fallback""" + logger.info("Creating simple price chart as fallback...") + + prices = [] + times = [] + + for tick in self.ticks: + if tick.price > 0: + prices.append(tick.price) + times.append(tick.timestamp) + + if not prices: + logger.warning("No price data to plot") + return + + fig, ax = plt.subplots(figsize=(15, 8)) + ax.plot(pd.to_datetime(times), prices, 'cyan', linewidth=1) + ax.set_title(f'Price Chart - {self.symbol}') + ax.set_xlabel('Time') + ax.set_ylabel('Price (USDT)') + fig.autofmt_xdate() + + plot_filename = f"cob_price_chart_{self.symbol.replace('/', '_')}_{datetime.now():%Y%m%d_%H%M%S}.png" plt.savefig(plot_filename) - logger.info(f"Plot saved to {plot_filename}") + logger.info(f"Price chart saved to {plot_filename}") plt.show()
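
For reference, a minimal standalone sketch (not part of the patch) of the COB snapshot shape that both test scripts rely on: 'bids' and 'asks' as lists of {'price', 'size'} dicts plus a stats['mid_price'] field. The field names are taken from the callbacks above; the helper and file name are illustrative only and can be used to sanity-check a callback payload before plotting.

# cob_snapshot_summary.py - illustrative helper, assumes the payload shape used by
# _cob_data_callback / _dp_cob_callback in the tests above.

def summarize_cob_snapshot(cob_data: dict) -> dict:
    """Return basic depth/price stats for one COB update, or {} if the payload is malformed."""
    bids = [b for b in cob_data.get('bids', []) if isinstance(b, dict) and 'price' in b and 'size' in b]
    asks = [a for a in cob_data.get('asks', []) if isinstance(a, dict) and 'price' in a and 'size' in a]
    if not bids or not asks:
        return {}
    best_bid = max(b['price'] for b in bids)
    best_ask = min(a['price'] for a in asks)
    stats = cob_data.get('stats', {})
    return {
        'mid_price': stats.get('mid_price', (best_bid + best_ask) / 2),
        'spread': best_ask - best_bid,
        'bid_levels': len(bids),
        'ask_levels': len(asks),
        'bid_volume': sum(b['size'] for b in bids),
        'ask_volume': sum(a['size'] for a in asks),
    }

if __name__ == '__main__':
    sample = {
        'bids': [{'price': 3100.0, 'size': 2.5}, {'price': 3099.5, 'size': 1.0}],
        'asks': [{'price': 3100.5, 'size': 1.8}, {'price': 3101.0, 'size': 0.7}],
        'stats': {'mid_price': 3100.25},
    }
    # -> {'mid_price': 3100.25, 'spread': 0.5, 'bid_levels': 2, 'ask_levels': 2, 'bid_volume': 3.5, 'ask_volume': 2.5}
    print(summarize_cob_snapshot(sample))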