From 8677c4c01cc6155b2a3be102cc18a7188430ca6f Mon Sep 17 00:00:00 2001 From: Dobromir Popov Date: Wed, 23 Jul 2025 23:10:54 +0300 Subject: [PATCH] cob wip --- tests/cob/test_cob_data_stability.py | 218 ++++++++++++++++++++++----- 1 file changed, 177 insertions(+), 41 deletions(-) diff --git a/tests/cob/test_cob_data_stability.py b/tests/cob/test_cob_data_stability.py index a325e3e..6584c65 100644 --- a/tests/cob/test_cob_data_stability.py +++ b/tests/cob/test_cob_data_stability.py @@ -47,6 +47,7 @@ class COBStabilityTester: self.start_time = None self.subscriber_id = None + self.last_log_time = None def _tick_callback(self, tick: MarketTick): """Callback function to receive ticks from the DataProvider.""" @@ -76,6 +77,14 @@ class COBStabilityTester: # Store the complete COB snapshot for heatmap generation if 'bids' in cob_data and 'asks' in cob_data: + # Debug: Log structure of first few COB snapshots + if len(self.cob_snapshots) < 3: + logger.info(f"DEBUG: COB data structure - bids: {len(cob_data['bids'])} items, asks: {len(cob_data['asks'])} items") + if cob_data['bids']: + logger.info(f"DEBUG: First bid: {cob_data['bids'][0]}") + if cob_data['asks']: + logger.info(f"DEBUG: First ask: {cob_data['asks'][0]}") + snapshot = { 'timestamp': cob_data.get('timestamp', datetime.now()), 'bids': cob_data['bids'], @@ -84,6 +93,12 @@ class COBStabilityTester: } self.cob_snapshots.append(snapshot) + # Log bucketed COB data every second + now = datetime.now() + if self.last_log_time is None or (now - self.last_log_time).total_seconds() >= 1.0: + self.last_log_time = now + self._log_bucketed_cob_data(cob_data) + # Convert COB data to tick-like format for analysis if 'stats' in cob_data and 'mid_price' in cob_data['stats']: mid_price = cob_data['stats']['mid_price'] @@ -110,6 +125,68 @@ class COBStabilityTester: if self.cob_data_received % 10 == 0: # Log every 10th update logger.info(f"COB update #{self.cob_data_received}: {symbol} @ ${mid_price:.2f}") + + def _log_bucketed_cob_data(self, cob_data: dict): + """Log bucketed COB data every second""" + try: + if 'bids' not in cob_data or 'asks' not in cob_data: + logger.info("COB-1s: No order book data available") + return + + if 'stats' not in cob_data or 'mid_price' not in cob_data['stats']: + logger.info("COB-1s: No mid price available") + return + + mid_price = cob_data['stats']['mid_price'] + if mid_price <= 0: + return + + # Bucket the order book data + bid_buckets = {} + ask_buckets = {} + + # Process bids (top 10) + for bid in cob_data['bids'][:10]: + try: + if isinstance(bid, dict): + price = float(bid['price']) + size = float(bid['size']) + elif isinstance(bid, (list, tuple)) and len(bid) >= 2: + price = float(bid[0]) + size = float(bid[1]) + else: + continue + + bucketed_price = round(price / self.price_granularity) * self.price_granularity + bid_buckets[bucketed_price] = bid_buckets.get(bucketed_price, 0) + size + except (ValueError, TypeError, IndexError): + continue + + # Process asks (top 10) + for ask in cob_data['asks'][:10]: + try: + if isinstance(ask, dict): + price = float(ask['price']) + size = float(ask['size']) + elif isinstance(ask, (list, tuple)) and len(ask) >= 2: + price = float(ask[0]) + size = float(ask[1]) + else: + continue + + bucketed_price = round(price / self.price_granularity) * self.price_granularity + ask_buckets[bucketed_price] = ask_buckets.get(bucketed_price, 0) + size + except (ValueError, TypeError, IndexError): + continue + + # Format for log output + bid_str = ", ".join([f"${p:.0f}:{s:.3f}" for p, s in sorted(bid_buckets.items(), reverse=True)]) + ask_str = ", ".join([f"${p:.0f}:{s:.3f}" for p, s in sorted(ask_buckets.items())]) + + logger.info(f"COB-1s @ ${mid_price:.2f} | BIDS: {bid_str} | ASKS: {ask_str}") + + except Exception as e: + logger.warning(f"Error logging bucketed COB data: {e}") async def run_test(self): """Run the data collection and plotting test.""" @@ -169,66 +246,125 @@ class COBStabilityTester: return logger.info(f"Creating price and order book heatmap chart...") + logger.info(f"Data summary: {len(self.price_data)} price points, {len(self.cob_snapshots)} COB snapshots") - # Prepare data + # Prepare price data with consistent timestamp handling price_df = pd.DataFrame(self.price_data) price_df['timestamp'] = pd.to_datetime(price_df['timestamp']) + + logger.info(f"Price data time range: {price_df['timestamp'].min()} to {price_df['timestamp'].max()}") + logger.info(f"Price range: ${price_df['price'].min():.2f} to ${price_df['price'].max():.2f}") - # Extract order book data for heatmap + # Extract order book data for heatmap with consistent timestamp handling heatmap_data = [] for snapshot in self.cob_snapshots: - timestamp = snapshot['timestamp'] + timestamp = pd.to_datetime(snapshot['timestamp']) # Ensure datetime + for side in ['bids', 'asks']: - for order in snapshot[side]: - # Handle both dict and list formats - if isinstance(order, dict): - price = order['price'] - size = order['size'] - elif isinstance(order, (list, tuple)) and len(order) >= 2: - price = float(order[0]) - size = float(order[1]) - else: - logger.warning(f"Unknown order format: {order}") - continue + if side not in snapshot or not snapshot[side]: + continue - bucketed_price = round(price / self.price_granularity) * self.price_granularity - heatmap_data.append({ - 'time': timestamp, - 'price': bucketed_price, - 'size': size, - 'side': side - }) + # Take top 50 levels for better visualization + orders = snapshot[side][:50] + for order in orders: + try: + # Handle both dict and list formats + if isinstance(order, dict): + price = float(order['price']) + size = float(order['size']) + elif isinstance(order, (list, tuple)) and len(order) >= 2: + price = float(order[0]) + size = float(order[1]) + else: + continue + + # Apply granularity bucketing + bucketed_price = round(price / self.price_granularity) * self.price_granularity + + heatmap_data.append({ + 'time': timestamp, + 'price': bucketed_price, + 'size': size, + 'side': side + }) + except (ValueError, TypeError, IndexError) as e: + continue + if not heatmap_data: + logger.warning("No valid heatmap data found, creating price chart only") + self._create_simple_price_chart() + return + heatmap_df = pd.DataFrame(heatmap_data) + logger.info(f"Heatmap data: {len(heatmap_df)} order book entries") + logger.info(f"Heatmap time range: {heatmap_df['time'].min()} to {heatmap_df['time'].max()}") - # Create plot - fig, ax1 = plt.subplots(figsize=(16, 8)) + # Create plot with better time handling + fig, ax = plt.subplots(figsize=(16, 10)) - # Plot price line - ax1.plot(price_df['timestamp'], price_df['price'], 'cyan', linewidth=1, label='Price') + # Determine overall time range + all_times = pd.concat([price_df['timestamp'], heatmap_df['time']]) + time_min = all_times.min() + time_max = all_times.max() + + # Create price range for heatmap + price_min = min(price_df['price'].min(), heatmap_df['price'].min()) - self.price_granularity * 2 + price_max = max(price_df['price'].max(), heatmap_df['price'].max()) + self.price_granularity * 2 + + logger.info(f"Chart time range: {time_min} to {time_max}") + logger.info(f"Chart price range: ${price_min:.2f} to ${price_max:.2f}") - # Prepare heatmap - for side, cmap in zip(['bids', 'asks'], ['Greens', 'Reds']): + # Create heatmap first (background) + for side, cmap, alpha in zip(['bids', 'asks'], ['Greens', 'Reds'], [0.6, 0.6]): side_df = heatmap_df[heatmap_df['side'] == side] if not side_df.empty: - hist, xedges, yedges = np.histogram2d( - side_df['time'].astype(np.int64) // 10**9, - side_df['price'], - bins=[np.unique(side_df['time'].astype(np.int64) // 10**9), np.arange(price_df['price'].min(), price_df['price'].max(), self.price_granularity)], - weights=side_df['size'] - ) - ax1.pcolormesh(pd.to_datetime(xedges, unit='s'), yedges, hist.T, cmap=cmap, alpha=0.5) + # Create more granular bins + time_bins = pd.date_range(time_min, time_max, periods=min(100, len(side_df) // 10 + 10)) + price_bins = np.arange(price_min, price_max + self.price_granularity, self.price_granularity) + + try: + # Convert to seconds for histogram + time_seconds = (side_df['time'] - time_min).dt.total_seconds() + time_range_seconds = (time_max - time_min).total_seconds() + + if time_range_seconds > 0: + hist, xedges, yedges = np.histogram2d( + time_seconds, + side_df['price'], + bins=[np.linspace(0, time_range_seconds, len(time_bins)), price_bins], + weights=side_df['size'] + ) + + # Convert back to datetime for plotting + time_edges = pd.to_datetime(xedges, unit='s', origin=time_min) + + if hist.max() > 0: # Only plot if we have data + pcm = ax.pcolormesh(time_edges, yedges, hist.T, + cmap=cmap, alpha=alpha, shading='auto') + logger.info(f"Plotted {side} heatmap: max value = {hist.max():.2f}") + except Exception as e: + logger.warning(f"Error creating {side} heatmap: {e}") - # Enhance plot - ax1.set_title(f'Price Chart with Order Book Heatmap - {self.symbol}') - ax1.set_xlabel('Time') - ax1.set_ylabel('Price (USDT)') - ax1.legend(loc='upper left') - ax1.grid(True, alpha=0.3) + # Plot price line on top + ax.plot(price_df['timestamp'], price_df['price'], 'yellow', linewidth=2, + label='Mid Price', alpha=0.9, zorder=10) + + # Enhance plot appearance + ax.set_title(f'Price Chart with Order Book Heatmap - {self.symbol}\n' + f'Granularity: ${self.price_granularity} | Duration: {self.duration.total_seconds()}s\n' + f'Green=Bids, Red=Asks (darker = more volume)', fontsize=14) + ax.set_xlabel('Time') + ax.set_ylabel('Price (USDT)') + ax.legend(loc='upper left') + ax.grid(True, alpha=0.3) + + # Format time axis + ax.set_xlim(time_min, time_max) + fig.autofmt_xdate() plt.tight_layout() plot_filename = f"price_heatmap_chart_{self.symbol.replace('/', '_')}_{datetime.now():%Y%m%d_%H%M%S}.png" - plt.savefig(plot_filename, dpi=150) + plt.savefig(plot_filename, dpi=150, bbox_inches='tight') logger.info(f"Price and heatmap chart saved to {plot_filename}") plt.show()