import asyncio import logging import time from collections import deque from datetime import datetime, timedelta import matplotlib.pyplot as plt import numpy as np import pandas as pd from matplotlib.colors import LogNorm from core.data_provider import DataProvider, MarketTick from core.config import get_config # Configure logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) class COBStabilityTester: def __init__(self, symbol='ETHUSDT', duration_seconds=15): self.symbol = symbol self.duration = timedelta(seconds=duration_seconds) self.ticks = deque() # Set granularity (buckets) based on symbol if 'ETH' in symbol.upper(): self.price_granularity = 1.0 # 1 USD for ETH elif 'BTC' in symbol.upper(): self.price_granularity = 10.0 # 10 USD for BTC else: self.price_granularity = 1.0 # Default 1 USD logger.info(f"Using price granularity: ${self.price_granularity} for {symbol}") # Initialize DataProvider the same way as clean_dashboard logger.info("Initializing DataProvider like in clean_dashboard...") self.data_provider = DataProvider() # Use default constructor like clean_dashboard # Initialize COB data collection like clean_dashboard does self.cob_data_received = 0 self.latest_cob_data = {} # Store all COB snapshots for heatmap generation self.cob_snapshots = deque() self.price_data = [] # For price line chart self.start_time = None self.subscriber_id = None def _tick_callback(self, tick: MarketTick): """Callback function to receive ticks from the DataProvider.""" if self.start_time is None: self.start_time = datetime.now() logger.info(f"Started collecting ticks at {self.start_time}") # Store all ticks self.ticks.append(tick) def _cob_data_callback(self, symbol: str, cob_data: dict): """Callback function to receive COB data from the DataProvider.""" self.cob_data_received += 1 self.latest_cob_data[symbol] = cob_data # Store the complete COB snapshot for heatmap generation if 'bids' in cob_data and 'asks' in cob_data: snapshot = { 'timestamp': cob_data.get('timestamp', datetime.now()), 'bids': cob_data['bids'], 'asks': cob_data['asks'], 'stats': cob_data.get('stats', {}) } self.cob_snapshots.append(snapshot) # Convert COB data to tick-like format for analysis if 'stats' in cob_data and 'mid_price' in cob_data['stats']: mid_price = cob_data['stats']['mid_price'] if mid_price > 0: # Store price data for line chart self.price_data.append({ 'timestamp': cob_data.get('timestamp', datetime.now()), 'price': mid_price }) # Create a synthetic tick from COB data synthetic_tick = MarketTick( symbol=symbol, timestamp=cob_data.get('timestamp', datetime.now()), price=mid_price, volume=cob_data.get('stats', {}).get('total_volume', 0), quantity=0, # Not available in COB data side='unknown', # COB data doesn't have side info trade_id=f"cob_{self.cob_data_received}", is_buyer_maker=False, raw_data=cob_data ) self.ticks.append(synthetic_tick) if self.cob_data_received % 10 == 0: # Log every 10th update logger.info(f"COB update #{self.cob_data_received}: {symbol} @ ${mid_price:.2f}") async def run_test(self): """Run the data collection and plotting test.""" logger.info(f"Starting COB stability test for {self.symbol} for {self.duration.total_seconds()} seconds...") # Initialize COB collection like clean_dashboard does try: logger.info("Starting COB collection in data provider...") self.data_provider.start_cob_collection() logger.info("Started COB collection in data provider") # Subscribe to COB updates logger.info("Subscribing to COB data updates...") self.data_provider.subscribe_to_cob(self._cob_data_callback) logger.info("Subscribed to COB data updates from data provider") except Exception as e: logger.error(f"Failed to start COB collection or subscribe: {e}") # Subscribe to ticks as fallback try: self.subscriber_id = self.data_provider.subscribe_to_ticks(self._tick_callback, symbols=[self.symbol]) logger.info("Subscribed to tick data as fallback") except Exception as e: logger.warning(f"Failed to subscribe to ticks: {e}") # Start the data provider's real-time streaming try: await self.data_provider.start_real_time_streaming() logger.info("Started real-time streaming") except Exception as e: logger.error(f"Failed to start real-time streaming: {e}") # Collect data for the specified duration self.start_time = datetime.now() while datetime.now() - self.start_time < self.duration: await asyncio.sleep(1) logger.info(f"Collected {len(self.ticks)} ticks so far...") # Stop streaming and unsubscribe await self.data_provider.stop_real_time_streaming() self.data_provider.unsubscribe_from_ticks(self.subscriber_id) logger.info(f"Finished collecting data. Total ticks: {len(self.ticks)}") # Plot the results if self.price_data and self.cob_snapshots: self.create_price_heatmap_chart() elif self.ticks: self._create_simple_price_chart() else: logger.warning("No data was collected. Cannot generate plot.") def plot_spectrogram(self): """Create a bookmap-style visualization showing order book depth over time.""" if not self.ticks: logger.warning("No ticks to plot.") return logger.info(f"Creating bookmap-style visualization with {len(self.ticks)} data points...") # Extract order book data from ticks time_points = [] bid_data = [] ask_data = [] price_levels = set() for tick in self.ticks: if hasattr(tick, 'raw_data') and tick.raw_data: cob_data = tick.raw_data if 'bids' in cob_data and 'asks' in cob_data: timestamp = tick.timestamp # Extract bid levels (green - buy orders) bids = cob_data['bids'][:20] # Top 20 levels for bid in bids: if isinstance(bid, dict) and 'price' in bid and 'size' in bid: bid_data.append({ 'time': timestamp, 'price': bid['price'], 'size': bid['size'], 'side': 'bid' }) price_levels.add(bid['price']) # Extract ask levels (red - sell orders) asks = cob_data['asks'][:20] # Top 20 levels for ask in asks: if isinstance(ask, dict) and 'price' in ask and 'size' in ask: ask_data.append({ 'time': timestamp, 'price': ask['price'], 'size': ask['size'], 'side': 'ask' }) price_levels.add(ask['price']) if not bid_data and not ask_data: logger.warning("No order book data found in ticks. Cannot create bookmap visualization.") # Fallback to simple price chart self._create_simple_price_chart() return logger.info(f"Extracted {len(bid_data)} bid levels and {len(ask_data)} ask levels") # Create the bookmap visualization fig, ax = plt.subplots(figsize=(16, 10)) # Combine all data all_data = bid_data + ask_data if not all_data: logger.warning("No order book data to plot") return # Create DataFrames df = pd.DataFrame(all_data) df['time'] = pd.to_datetime(df['time']) # Create price bins (like in bookmap) price_min = df['price'].min() price_max = df['price'].max() price_range = price_max - price_min if price_range == 0: logger.warning("No price variation in data") return # Create time bins time_min = df['time'].min() time_max = df['time'].max() # Create 2D heatmaps for bids and asks separately time_bins = pd.date_range(time_min, time_max, periods=100) price_bins = np.linspace(price_min, price_max, 200) # Higher resolution for price # Separate bid and ask data bid_df = df[df['side'] == 'bid'] ask_df = df[df['side'] == 'ask'] # Create bid heatmap (green) if not bid_df.empty: bid_hist, _, _ = np.histogram2d( bid_df['time'].astype(np.int64) // 10**9, bid_df['price'], bins=[time_bins.astype(np.int64) // 10**9, price_bins], weights=bid_df['size'] ) # Plot bids in green (buying pressure) bid_mask = bid_hist > 0 pcm_bid = ax.pcolormesh( time_bins, price_bins, bid_hist.T, cmap='Greens', alpha=0.7, vmin=0, vmax=bid_hist.max() ) # Create ask heatmap (red) if not ask_df.empty: ask_hist, _, _ = np.histogram2d( ask_df['time'].astype(np.int64) // 10**9, ask_df['price'], bins=[time_bins.astype(np.int64) // 10**9, price_bins], weights=ask_df['size'] ) # Plot asks in red (selling pressure) ask_mask = ask_hist > 0 pcm_ask = ax.pcolormesh( time_bins, price_bins, ask_hist.T, cmap='Reds', alpha=0.7, vmin=0, vmax=ask_hist.max() ) # Add mid price line mid_prices = [] mid_times = [] for tick in self.ticks: if hasattr(tick, 'raw_data') and tick.raw_data and 'stats' in tick.raw_data: stats = tick.raw_data['stats'] if 'mid_price' in stats and stats['mid_price'] > 0: mid_prices.append(stats['mid_price']) mid_times.append(tick.timestamp) if mid_prices: ax.plot(pd.to_datetime(mid_times), mid_prices, 'yellow', linewidth=2, alpha=0.8, label='Mid Price') # Styling like bookmap ax.set_facecolor('black') fig.patch.set_facecolor('black') ax.set_title(f'Order Book Depth Map - {self.symbol}\n(Green=Bids/Buy Orders, Red=Asks/Sell Orders)', color='white', fontsize=14) ax.set_xlabel('Time', color='white') ax.set_ylabel('Price (USDT)', color='white') # White ticks and labels ax.tick_params(colors='white') ax.spines['bottom'].set_color('white') ax.spines['top'].set_color('white') ax.spines['right'].set_color('white') ax.spines['left'].set_color('white') # Add colorbar for bid data if not bid_df.empty: cbar_bid = fig.colorbar(pcm_bid, ax=ax, location='right', pad=0.02, shrink=0.5) cbar_bid.set_label('Bid Size (Order Volume)', color='white', labelpad=15) cbar_bid.ax.yaxis.set_tick_params(color='white') cbar_bid.ax.yaxis.set_tick_params(labelcolor='white') # Format the x-axis to show time properly fig.autofmt_xdate() if mid_prices: ax.legend(loc='upper left') plt.tight_layout() plot_filename = f"cob_bookmap_{self.symbol.replace('/', '_')}_{datetime.now():%Y%m%d_%H%M%S}.png" plt.savefig(plot_filename, facecolor='black', dpi=150) logger.info(f"Bookmap-style plot saved to {plot_filename}") plt.show() def _create_simple_price_chart(self): """Create a simple price chart as fallback""" logger.info("Creating simple price chart as fallback...") prices = [] times = [] for tick in self.ticks: if tick.price > 0: prices.append(tick.price) times.append(tick.timestamp) if not prices: logger.warning("No price data to plot") return fig, ax = plt.subplots(figsize=(15, 8)) ax.plot(pd.to_datetime(times), prices, 'cyan', linewidth=1) ax.set_title(f'Price Chart - {self.symbol}') ax.set_xlabel('Time') ax.set_ylabel('Price (USDT)') fig.autofmt_xdate() plot_filename = f"cob_price_chart_{self.symbol.replace('/', '_')}_{datetime.now():%Y%m%d_%H%M%S}.png" plt.savefig(plot_filename) logger.info(f"Price chart saved to {plot_filename}") plt.show() async def main(): tester = COBStabilityTester() await tester.run_test() if __name__ == "__main__": try: asyncio.run(main()) except KeyboardInterrupt: logger.info("Test interrupted by user.")