""" Bookmap Order Book Data Provider This module integrates with Bookmap to gather: - Current Order Book (COB) data - Session Volume Profile (SVP) data - Order book sweeps and momentum trades detection - Real-time order size heatmap matrix (last 10 minutes) - Level 2 market depth analysis The data is processed and fed to CNN and DQN networks for enhanced trading decisions. """ import asyncio import json import logging import time import websockets import numpy as np import pandas as pd from datetime import datetime, timedelta from typing import Dict, List, Optional, Tuple, Any, Callable from collections import deque, defaultdict from dataclasses import dataclass from threading import Thread, Lock import requests logger = logging.getLogger(__name__) @dataclass class OrderBookLevel: """Represents a single order book level""" price: float size: float orders: int side: str # 'bid' or 'ask' timestamp: datetime @dataclass class OrderBookSnapshot: """Complete order book snapshot""" symbol: str timestamp: datetime bids: List[OrderBookLevel] asks: List[OrderBookLevel] spread: float mid_price: float @dataclass class VolumeProfileLevel: """Volume profile level data""" price: float volume: float buy_volume: float sell_volume: float trades_count: int vwap: float @dataclass class OrderFlowSignal: """Order flow signal detection""" timestamp: datetime signal_type: str # 'sweep', 'absorption', 'iceberg', 'momentum' price: float volume: float confidence: float description: str class BookmapDataProvider: """ Real-time order book data provider using Bookmap-style analysis Features: - Level 2 order book monitoring - Order flow detection (sweeps, absorptions) - Volume profile analysis - Order size heatmap generation - Market microstructure analysis """ def __init__(self, symbols: List[str] = None, depth_levels: int = 20): """ Initialize Bookmap data provider Args: symbols: List of symbols to monitor depth_levels: Number of order book levels to track """ self.symbols = symbols or ['ETHUSDT', 'BTCUSDT'] self.depth_levels = depth_levels self.is_streaming = False # Order book data storage self.order_books: Dict[str, OrderBookSnapshot] = {} self.order_book_history: Dict[str, deque] = {} self.volume_profiles: Dict[str, List[VolumeProfileLevel]] = {} # Heatmap data (10-minute rolling window) self.heatmap_window = timedelta(minutes=10) self.order_heatmaps: Dict[str, deque] = {} self.price_levels: Dict[str, List[float]] = {} # Order flow detection self.flow_signals: Dict[str, deque] = {} self.sweep_threshold = 0.8 # Minimum confidence for sweep detection self.absorption_threshold = 0.7 # Minimum confidence for absorption # Market microstructure metrics self.bid_ask_spreads: Dict[str, deque] = {} self.order_book_imbalances: Dict[str, deque] = {} self.liquidity_metrics: Dict[str, Dict] = {} # WebSocket connections self.websocket_tasks: Dict[str, asyncio.Task] = {} self.data_lock = Lock() # Callbacks for CNN/DQN integration self.cnn_callbacks: List[Callable] = [] self.dqn_callbacks: List[Callable] = [] # Performance tracking self.update_counts = defaultdict(int) self.last_update_times = {} # Initialize data structures for symbol in self.symbols: self.order_book_history[symbol] = deque(maxlen=1000) self.order_heatmaps[symbol] = deque(maxlen=600) # 10 min at 1s intervals self.flow_signals[symbol] = deque(maxlen=500) self.bid_ask_spreads[symbol] = deque(maxlen=1000) self.order_book_imbalances[symbol] = deque(maxlen=1000) self.liquidity_metrics[symbol] = { 'total_bid_size': 0.0, 'total_ask_size': 0.0, 'weighted_mid': 0.0, 'liquidity_ratio': 1.0 } logger.info(f"BookmapDataProvider initialized for {len(self.symbols)} symbols") logger.info(f"Tracking {depth_levels} order book levels per side") def add_cnn_callback(self, callback: Callable[[str, Dict], None]): """Add callback for CNN model updates""" self.cnn_callbacks.append(callback) logger.info(f"Added CNN callback: {len(self.cnn_callbacks)} total") def add_dqn_callback(self, callback: Callable[[str, Dict], None]): """Add callback for DQN model updates""" self.dqn_callbacks.append(callback) logger.info(f"Added DQN callback: {len(self.dqn_callbacks)} total") async def start_streaming(self): """Start real-time order book streaming""" if self.is_streaming: logger.warning("Bookmap streaming already active") return self.is_streaming = True logger.info("Starting Bookmap order book streaming") # Start order book streams for each symbol for symbol in self.symbols: # Order book depth stream depth_task = asyncio.create_task(self._stream_order_book_depth(symbol)) self.websocket_tasks[f"{symbol}_depth"] = depth_task # Trade stream for order flow analysis trade_task = asyncio.create_task(self._stream_trades(symbol)) self.websocket_tasks[f"{symbol}_trades"] = trade_task # Start analysis threads analysis_task = asyncio.create_task(self._continuous_analysis()) self.websocket_tasks["analysis"] = analysis_task logger.info(f"Started streaming for {len(self.symbols)} symbols") async def stop_streaming(self): """Stop order book streaming""" if not self.is_streaming: return logger.info("Stopping Bookmap streaming") self.is_streaming = False # Cancel all tasks for name, task in self.websocket_tasks.items(): if not task.done(): task.cancel() try: await task except asyncio.CancelledError: pass self.websocket_tasks.clear() logger.info("Bookmap streaming stopped") async def _stream_order_book_depth(self, symbol: str): """Stream order book depth data""" binance_symbol = symbol.lower() url = f"wss://stream.binance.com:9443/ws/{binance_symbol}@depth20@100ms" while self.is_streaming: try: async with websockets.connect(url) as websocket: logger.info(f"Order book depth WebSocket connected for {symbol}") async for message in websocket: if not self.is_streaming: break try: data = json.loads(message) await self._process_depth_update(symbol, data) except Exception as e: logger.warning(f"Error processing depth for {symbol}: {e}") except Exception as e: logger.error(f"Depth WebSocket error for {symbol}: {e}") if self.is_streaming: await asyncio.sleep(2) async def _stream_trades(self, symbol: str): """Stream trade data for order flow analysis""" binance_symbol = symbol.lower() url = f"wss://stream.binance.com:9443/ws/{binance_symbol}@trade" while self.is_streaming: try: async with websockets.connect(url) as websocket: logger.info(f"Trade WebSocket connected for {symbol}") async for message in websocket: if not self.is_streaming: break try: data = json.loads(message) await self._process_trade_update(symbol, data) except Exception as e: logger.warning(f"Error processing trade for {symbol}: {e}") except Exception as e: logger.error(f"Trade WebSocket error for {symbol}: {e}") if self.is_streaming: await asyncio.sleep(2) async def _process_depth_update(self, symbol: str, data: Dict): """Process order book depth update""" try: timestamp = datetime.now() # Parse bids and asks bids = [] asks = [] for bid_data in data.get('bids', []): price = float(bid_data[0]) size = float(bid_data[1]) bids.append(OrderBookLevel( price=price, size=size, orders=1, # Binance doesn't provide order count side='bid', timestamp=timestamp )) for ask_data in data.get('asks', []): price = float(ask_data[0]) size = float(ask_data[1]) asks.append(OrderBookLevel( price=price, size=size, orders=1, side='ask', timestamp=timestamp )) # Sort order book levels bids.sort(key=lambda x: x.price, reverse=True) asks.sort(key=lambda x: x.price) # Calculate spread and mid price if bids and asks: best_bid = bids[0].price best_ask = asks[0].price spread = best_ask - best_bid mid_price = (best_bid + best_ask) / 2 else: spread = 0.0 mid_price = 0.0 # Create order book snapshot snapshot = OrderBookSnapshot( symbol=symbol, timestamp=timestamp, bids=bids, asks=asks, spread=spread, mid_price=mid_price ) with self.data_lock: self.order_books[symbol] = snapshot self.order_book_history[symbol].append(snapshot) # Update liquidity metrics self._update_liquidity_metrics(symbol, snapshot) # Update order book imbalance self._calculate_order_book_imbalance(symbol, snapshot) # Update heatmap data self._update_order_heatmap(symbol, snapshot) # Update counters self.update_counts[f"{symbol}_depth"] += 1 self.last_update_times[f"{symbol}_depth"] = timestamp except Exception as e: logger.error(f"Error processing depth update for {symbol}: {e}") async def _process_trade_update(self, symbol: str, data: Dict): """Process trade data for order flow analysis""" try: timestamp = datetime.fromtimestamp(int(data['T']) / 1000) price = float(data['p']) quantity = float(data['q']) is_buyer_maker = data['m'] # Analyze for order flow signals await self._analyze_order_flow(symbol, timestamp, price, quantity, is_buyer_maker) # Update volume profile self._update_volume_profile(symbol, price, quantity, is_buyer_maker) self.update_counts[f"{symbol}_trades"] += 1 except Exception as e: logger.error(f"Error processing trade for {symbol}: {e}") def _update_liquidity_metrics(self, symbol: str, snapshot: OrderBookSnapshot): """Update liquidity metrics from order book snapshot""" try: total_bid_size = sum(level.size for level in snapshot.bids) total_ask_size = sum(level.size for level in snapshot.asks) # Calculate weighted mid price if snapshot.bids and snapshot.asks: bid_weight = total_bid_size / (total_bid_size + total_ask_size) ask_weight = total_ask_size / (total_bid_size + total_ask_size) weighted_mid = (snapshot.bids[0].price * ask_weight + snapshot.asks[0].price * bid_weight) else: weighted_mid = snapshot.mid_price # Liquidity ratio (bid/ask balance) if total_ask_size > 0: liquidity_ratio = total_bid_size / total_ask_size else: liquidity_ratio = 1.0 self.liquidity_metrics[symbol] = { 'total_bid_size': total_bid_size, 'total_ask_size': total_ask_size, 'weighted_mid': weighted_mid, 'liquidity_ratio': liquidity_ratio, 'spread_bps': (snapshot.spread / snapshot.mid_price) * 10000 if snapshot.mid_price > 0 else 0 } except Exception as e: logger.error(f"Error updating liquidity metrics for {symbol}: {e}") def _calculate_order_book_imbalance(self, symbol: str, snapshot: OrderBookSnapshot): """Calculate order book imbalance ratio""" try: if not snapshot.bids or not snapshot.asks: return # Calculate imbalance for top N levels n_levels = min(5, len(snapshot.bids), len(snapshot.asks)) total_bid_size = sum(snapshot.bids[i].size for i in range(n_levels)) total_ask_size = sum(snapshot.asks[i].size for i in range(n_levels)) if total_bid_size + total_ask_size > 0: imbalance = (total_bid_size - total_ask_size) / (total_bid_size + total_ask_size) else: imbalance = 0.0 self.order_book_imbalances[symbol].append({ 'timestamp': snapshot.timestamp, 'imbalance': imbalance, 'bid_size': total_bid_size, 'ask_size': total_ask_size }) except Exception as e: logger.error(f"Error calculating imbalance for {symbol}: {e}") def _update_order_heatmap(self, symbol: str, snapshot: OrderBookSnapshot): """Update order size heatmap matrix""" try: # Create heatmap entry heatmap_entry = { 'timestamp': snapshot.timestamp, 'mid_price': snapshot.mid_price, 'levels': {} } # Add bid levels for level in snapshot.bids: price_offset = level.price - snapshot.mid_price heatmap_entry['levels'][price_offset] = { 'side': 'bid', 'size': level.size, 'price': level.price } # Add ask levels for level in snapshot.asks: price_offset = level.price - snapshot.mid_price heatmap_entry['levels'][price_offset] = { 'side': 'ask', 'size': level.size, 'price': level.price } self.order_heatmaps[symbol].append(heatmap_entry) # Clean old entries (keep 10 minutes) cutoff_time = snapshot.timestamp - self.heatmap_window while (self.order_heatmaps[symbol] and self.order_heatmaps[symbol][0]['timestamp'] < cutoff_time): self.order_heatmaps[symbol].popleft() except Exception as e: logger.error(f"Error updating heatmap for {symbol}: {e}") def _update_volume_profile(self, symbol: str, price: float, quantity: float, is_buyer_maker: bool): """Update volume profile with new trade""" try: # Initialize if not exists if symbol not in self.volume_profiles: self.volume_profiles[symbol] = [] # Find or create price level price_level = None for level in self.volume_profiles[symbol]: if abs(level.price - price) < 0.01: # Price tolerance price_level = level break if not price_level: price_level = VolumeProfileLevel( price=price, volume=0.0, buy_volume=0.0, sell_volume=0.0, trades_count=0, vwap=price ) self.volume_profiles[symbol].append(price_level) # Update volume profile volume = price * quantity old_total = price_level.volume price_level.volume += volume price_level.trades_count += 1 if is_buyer_maker: price_level.sell_volume += volume else: price_level.buy_volume += volume # Update VWAP if price_level.volume > 0: price_level.vwap = ((price_level.vwap * old_total) + (price * volume)) / price_level.volume except Exception as e: logger.error(f"Error updating volume profile for {symbol}: {e}") async def _analyze_order_flow(self, symbol: str, timestamp: datetime, price: float, quantity: float, is_buyer_maker: bool): """Analyze order flow for sweep and absorption patterns""" try: # Get recent order book data if symbol not in self.order_book_history or not self.order_book_history[symbol]: return recent_snapshots = list(self.order_book_history[symbol])[-10:] # Last 10 snapshots # Check for order book sweeps sweep_signal = self._detect_order_sweep(symbol, recent_snapshots, price, quantity, is_buyer_maker) if sweep_signal: self.flow_signals[symbol].append(sweep_signal) await self._notify_flow_signal(symbol, sweep_signal) # Check for absorption patterns absorption_signal = self._detect_absorption(symbol, recent_snapshots, price, quantity) if absorption_signal: self.flow_signals[symbol].append(absorption_signal) await self._notify_flow_signal(symbol, absorption_signal) # Check for momentum trades momentum_signal = self._detect_momentum_trade(symbol, price, quantity, is_buyer_maker) if momentum_signal: self.flow_signals[symbol].append(momentum_signal) await self._notify_flow_signal(symbol, momentum_signal) except Exception as e: logger.error(f"Error analyzing order flow for {symbol}: {e}") def _detect_order_sweep(self, symbol: str, snapshots: List[OrderBookSnapshot], price: float, quantity: float, is_buyer_maker: bool) -> Optional[OrderFlowSignal]: """Detect order book sweep patterns""" try: if len(snapshots) < 2: return None before_snapshot = snapshots[-2] after_snapshot = snapshots[-1] # Check if multiple levels were consumed if is_buyer_maker: # Sell order, check ask side levels_consumed = 0 total_consumed_size = 0 for level in before_snapshot.asks[:5]: # Check top 5 levels if level.price <= price: levels_consumed += 1 total_consumed_size += level.size if levels_consumed >= 2 and total_consumed_size > quantity * 1.5: confidence = min(0.9, levels_consumed / 5.0 + 0.3) return OrderFlowSignal( timestamp=datetime.now(), signal_type='sweep', price=price, volume=quantity * price, confidence=confidence, description=f"Sell sweep: {levels_consumed} levels, {total_consumed_size:.2f} size" ) else: # Buy order, check bid side levels_consumed = 0 total_consumed_size = 0 for level in before_snapshot.bids[:5]: if level.price >= price: levels_consumed += 1 total_consumed_size += level.size if levels_consumed >= 2 and total_consumed_size > quantity * 1.5: confidence = min(0.9, levels_consumed / 5.0 + 0.3) return OrderFlowSignal( timestamp=datetime.now(), signal_type='sweep', price=price, volume=quantity * price, confidence=confidence, description=f"Buy sweep: {levels_consumed} levels, {total_consumed_size:.2f} size" ) return None except Exception as e: logger.error(f"Error detecting sweep for {symbol}: {e}") return None def _detect_absorption(self, symbol: str, snapshots: List[OrderBookSnapshot], price: float, quantity: float) -> Optional[OrderFlowSignal]: """Detect absorption patterns where large orders are absorbed without price movement""" try: if len(snapshots) < 3: return None # Check if large order was absorbed with minimal price impact volume_threshold = 10000 # $10K minimum for absorption price_impact_threshold = 0.001 # 0.1% max price impact trade_value = price * quantity if trade_value < volume_threshold: return None # Calculate price impact price_before = snapshots[-3].mid_price price_after = snapshots[-1].mid_price price_impact = abs(price_after - price_before) / price_before if price_impact < price_impact_threshold: confidence = min(0.8, (trade_value / 50000) * 0.5 + 0.3) # Scale with size return OrderFlowSignal( timestamp=datetime.now(), signal_type='absorption', price=price, volume=trade_value, confidence=confidence, description=f"Absorption: ${trade_value:.0f} with {price_impact*100:.3f}% impact" ) return None except Exception as e: logger.error(f"Error detecting absorption for {symbol}: {e}") return None def _detect_momentum_trade(self, symbol: str, price: float, quantity: float, is_buyer_maker: bool) -> Optional[OrderFlowSignal]: """Detect momentum trades based on size and direction""" try: trade_value = price * quantity momentum_threshold = 25000 # $25K minimum for momentum classification if trade_value < momentum_threshold: return None # Calculate confidence based on trade size confidence = min(0.9, trade_value / 100000 * 0.6 + 0.3) direction = "sell" if is_buyer_maker else "buy" return OrderFlowSignal( timestamp=datetime.now(), signal_type='momentum', price=price, volume=trade_value, confidence=confidence, description=f"Large {direction}: ${trade_value:.0f}" ) except Exception as e: logger.error(f"Error detecting momentum for {symbol}: {e}") return None async def _notify_flow_signal(self, symbol: str, signal: OrderFlowSignal): """Notify CNN and DQN models of order flow signals""" try: signal_data = { 'signal_type': signal.signal_type, 'price': signal.price, 'volume': signal.volume, 'confidence': signal.confidence, 'timestamp': signal.timestamp, 'description': signal.description } # Notify CNN callbacks for callback in self.cnn_callbacks: try: callback(symbol, signal_data) except Exception as e: logger.warning(f"Error in CNN callback: {e}") # Notify DQN callbacks for callback in self.dqn_callbacks: try: callback(symbol, signal_data) except Exception as e: logger.warning(f"Error in DQN callback: {e}") except Exception as e: logger.error(f"Error notifying flow signal: {e}") async def _continuous_analysis(self): """Continuous analysis of market microstructure""" while self.is_streaming: try: await asyncio.sleep(1) # Analyze every second for symbol in self.symbols: # Generate CNN features cnn_features = self.get_cnn_features(symbol) if cnn_features is not None: for callback in self.cnn_callbacks: try: callback(symbol, {'features': cnn_features, 'type': 'orderbook'}) except Exception as e: logger.warning(f"Error in CNN feature callback: {e}") # Generate DQN state features dqn_features = self.get_dqn_state_features(symbol) if dqn_features is not None: for callback in self.dqn_callbacks: try: callback(symbol, {'state': dqn_features, 'type': 'orderbook'}) except Exception as e: logger.warning(f"Error in DQN state callback: {e}") except Exception as e: logger.error(f"Error in continuous analysis: {e}") await asyncio.sleep(5) def get_cnn_features(self, symbol: str) -> Optional[np.ndarray]: """Generate CNN input features from order book data""" try: if symbol not in self.order_books: return None snapshot = self.order_books[symbol] features = [] # Order book features (40 features: 20 levels x 2 sides) for i in range(min(20, len(snapshot.bids))): bid = snapshot.bids[i] features.append(bid.size) features.append(bid.price - snapshot.mid_price) # Price offset # Pad if not enough bid levels while len(features) < 40: features.extend([0.0, 0.0]) for i in range(min(20, len(snapshot.asks))): ask = snapshot.asks[i] features.append(ask.size) features.append(ask.price - snapshot.mid_price) # Price offset # Pad if not enough ask levels while len(features) < 80: features.extend([0.0, 0.0]) # Liquidity metrics (10 features) metrics = self.liquidity_metrics.get(symbol, {}) features.extend([ metrics.get('total_bid_size', 0.0), metrics.get('total_ask_size', 0.0), metrics.get('liquidity_ratio', 1.0), metrics.get('spread_bps', 0.0), snapshot.spread, metrics.get('weighted_mid', snapshot.mid_price) - snapshot.mid_price, len(snapshot.bids), len(snapshot.asks), snapshot.mid_price, time.time() % 86400 # Time of day ]) # Order book imbalance features (5 features) if self.order_book_imbalances[symbol]: latest_imbalance = self.order_book_imbalances[symbol][-1] features.extend([ latest_imbalance['imbalance'], latest_imbalance['bid_size'], latest_imbalance['ask_size'], latest_imbalance['bid_size'] + latest_imbalance['ask_size'], abs(latest_imbalance['imbalance']) ]) else: features.extend([0.0, 0.0, 0.0, 0.0, 0.0]) # Flow signal features (5 features) recent_signals = [s for s in self.flow_signals[symbol] if (datetime.now() - s.timestamp).seconds < 60] sweep_count = sum(1 for s in recent_signals if s.signal_type == 'sweep') absorption_count = sum(1 for s in recent_signals if s.signal_type == 'absorption') momentum_count = sum(1 for s in recent_signals if s.signal_type == 'momentum') max_confidence = max([s.confidence for s in recent_signals], default=0.0) total_flow_volume = sum(s.volume for s in recent_signals) features.extend([ sweep_count, absorption_count, momentum_count, max_confidence, total_flow_volume ]) return np.array(features, dtype=np.float32) except Exception as e: logger.error(f"Error generating CNN features for {symbol}: {e}") return None def get_dqn_state_features(self, symbol: str) -> Optional[np.ndarray]: """Generate DQN state features from order book data""" try: if symbol not in self.order_books: return None snapshot = self.order_books[symbol] state_features = [] # Normalized order book state (20 features) total_bid_size = sum(level.size for level in snapshot.bids[:10]) total_ask_size = sum(level.size for level in snapshot.asks[:10]) total_size = total_bid_size + total_ask_size if total_size > 0: for i in range(min(10, len(snapshot.bids))): state_features.append(snapshot.bids[i].size / total_size) # Pad bids while len(state_features) < 10: state_features.append(0.0) for i in range(min(10, len(snapshot.asks))): state_features.append(snapshot.asks[i].size / total_size) # Pad asks while len(state_features) < 20: state_features.append(0.0) else: state_features.extend([0.0] * 20) # Market state indicators (10 features) metrics = self.liquidity_metrics.get(symbol, {}) # Normalize spread as percentage spread_pct = (snapshot.spread / snapshot.mid_price) if snapshot.mid_price > 0 else 0 # Liquidity imbalance liquidity_ratio = metrics.get('liquidity_ratio', 1.0) liquidity_imbalance = (liquidity_ratio - 1) / (liquidity_ratio + 1) # Recent flow signals strength recent_signals = [s for s in self.flow_signals[symbol] if (datetime.now() - s.timestamp).seconds < 30] flow_strength = sum(s.confidence for s in recent_signals) / max(len(recent_signals), 1) # Price volatility (from recent snapshots) if len(self.order_book_history[symbol]) >= 10: recent_prices = [s.mid_price for s in list(self.order_book_history[symbol])[-10:]] price_volatility = np.std(recent_prices) / np.mean(recent_prices) if recent_prices else 0 else: price_volatility = 0 state_features.extend([ spread_pct * 10000, # Spread in basis points liquidity_imbalance, flow_strength, price_volatility * 100, # Volatility as percentage min(len(snapshot.bids), 20) / 20, # Book depth ratio min(len(snapshot.asks), 20) / 20, sweep_count / 10 if 'sweep_count' in locals() else 0, # From CNN features absorption_count / 5 if 'absorption_count' in locals() else 0, momentum_count / 5 if 'momentum_count' in locals() else 0, (datetime.now().hour * 60 + datetime.now().minute) / 1440 # Time of day normalized ]) return np.array(state_features, dtype=np.float32) except Exception as e: logger.error(f"Error generating DQN features for {symbol}: {e}") return None def get_order_heatmap_matrix(self, symbol: str, levels: int = 40) -> Optional[np.ndarray]: """Generate order size heatmap matrix for dashboard visualization""" try: if symbol not in self.order_heatmaps or not self.order_heatmaps[symbol]: return None # Create price levels around current mid price current_snapshot = self.order_books.get(symbol) if not current_snapshot: return None mid_price = current_snapshot.mid_price price_step = mid_price * 0.0001 # 1 basis point steps # Create matrix: time x price levels time_window = min(600, len(self.order_heatmaps[symbol])) # 10 minutes max heatmap_matrix = np.zeros((time_window, levels)) # Fill matrix with order sizes for t, entry in enumerate(list(self.order_heatmaps[symbol])[-time_window:]): for price_offset, level_data in entry['levels'].items(): # Convert price offset to matrix index level_idx = int((price_offset + (levels/2) * price_step) / price_step) if 0 <= level_idx < levels: size_weight = 1.0 if level_data['side'] == 'bid' else -1.0 heatmap_matrix[t, level_idx] = level_data['size'] * size_weight return heatmap_matrix except Exception as e: logger.error(f"Error generating heatmap matrix for {symbol}: {e}") return None def get_volume_profile_data(self, symbol: str) -> Optional[List[Dict]]: """Get session volume profile data""" try: if symbol not in self.volume_profiles: return None profile_data = [] for level in sorted(self.volume_profiles[symbol], key=lambda x: x.price): profile_data.append({ 'price': level.price, 'volume': level.volume, 'buy_volume': level.buy_volume, 'sell_volume': level.sell_volume, 'trades_count': level.trades_count, 'vwap': level.vwap, 'net_volume': level.buy_volume - level.sell_volume }) return profile_data except Exception as e: logger.error(f"Error getting volume profile for {symbol}: {e}") return None def get_current_order_book(self, symbol: str) -> Optional[Dict]: """Get current order book snapshot""" try: if symbol not in self.order_books: return None snapshot = self.order_books[symbol] return { 'timestamp': snapshot.timestamp.isoformat(), 'symbol': symbol, 'mid_price': snapshot.mid_price, 'spread': snapshot.spread, 'bids': [{'price': l.price, 'size': l.size} for l in snapshot.bids[:20]], 'asks': [{'price': l.price, 'size': l.size} for l in snapshot.asks[:20]], 'liquidity_metrics': self.liquidity_metrics.get(symbol, {}), 'recent_signals': [ { 'type': s.signal_type, 'price': s.price, 'volume': s.volume, 'confidence': s.confidence, 'timestamp': s.timestamp.isoformat() } for s in list(self.flow_signals[symbol])[-5:] # Last 5 signals ] } except Exception as e: logger.error(f"Error getting order book for {symbol}: {e}") return None def get_statistics(self) -> Dict[str, Any]: """Get provider statistics""" return { 'symbols': self.symbols, 'is_streaming': self.is_streaming, 'update_counts': dict(self.update_counts), 'last_update_times': {k: v.isoformat() if isinstance(v, datetime) else v for k, v in self.last_update_times.items()}, 'order_books_active': len(self.order_books), 'flow_signals_total': sum(len(signals) for signals in self.flow_signals.values()), 'cnn_callbacks': len(self.cnn_callbacks), 'dqn_callbacks': len(self.dqn_callbacks), 'websocket_tasks': len(self.websocket_tasks) }