""" Order Book Analysis Integration (Free Data Sources) This module provides Bookmap-style functionality using free order book data: - Current Order Book (COB) analysis using Binance free depth streams - Session Volume Profile (SVP) calculated from trade and depth data - Order flow detection (sweeps, absorptions, momentum) - Real-time order book heatmap generation - Level 2 market depth streaming (20 levels via Binance free API) Data is fed to CNN and DQN networks for enhanced trading decisions. Uses only free data sources - no paid APIs required. """ import asyncio import json import logging import time import websockets import numpy as np import pandas as pd from datetime import datetime, timedelta from typing import Dict, List, Optional, Tuple, Any, Callable from collections import deque, defaultdict from dataclasses import dataclass from threading import Thread, Lock import requests logger = logging.getLogger(__name__) @dataclass class OrderBookLevel: """Single order book level""" price: float size: float orders: int side: str # 'bid' or 'ask' timestamp: datetime @dataclass class OrderBookSnapshot: """Complete order book snapshot""" symbol: str timestamp: datetime bids: List[OrderBookLevel] asks: List[OrderBookLevel] spread: float mid_price: float @dataclass class VolumeProfileLevel: """Volume profile level data""" price: float volume: float buy_volume: float sell_volume: float trades_count: int vwap: float @dataclass class OrderFlowSignal: """Order flow signal detection""" timestamp: datetime signal_type: str # 'sweep', 'absorption', 'iceberg', 'momentum' price: float volume: float confidence: float description: str class BookmapIntegration: """ Order book analysis using free data sources Features: - Real-time order book monitoring (Binance free depth@20 levels) - Order flow pattern detection - Enhanced Session Volume Profile (SVP) analysis - Market microstructure metrics - CNN/DQN model integration - High-frequency order book snapshots for pattern detection """ def __init__(self, symbols: List[str] = None): self.symbols = symbols or ['ETHUSDT', 'BTCUSDT'] self.is_streaming = False # Data storage self.order_books: Dict[str, OrderBookSnapshot] = {} self.order_book_history: Dict[str, deque] = {} self.volume_profiles: Dict[str, List[VolumeProfileLevel]] = {} self.flow_signals: Dict[str, deque] = {} # Enhanced Session Volume Profile tracking self.session_start_time = {} # Track session start for each symbol self.session_volume_profiles: Dict[str, List[VolumeProfileLevel]] = {} self.price_level_cache: Dict[str, Dict[float, VolumeProfileLevel]] = {} # Heatmap data (10-minute rolling window) self.heatmap_window = timedelta(minutes=10) self.order_heatmaps: Dict[str, deque] = {} # Market metrics self.liquidity_metrics: Dict[str, Dict] = {} self.order_book_imbalances: Dict[str, deque] = {} # Enhanced Order Flow Analysis self.aggressive_passive_ratios: Dict[str, deque] = {} self.trade_size_distributions: Dict[str, deque] = {} self.market_maker_taker_flows: Dict[str, deque] = {} self.order_flow_intensity: Dict[str, deque] = {} self.liquidity_consumption_rates: Dict[str, deque] = {} self.price_impact_measurements: Dict[str, deque] = {} # Advanced metrics for institutional vs retail detection self.large_order_threshold = 50000 # $50K+ considered institutional self.block_trade_threshold = 100000 # $100K+ considered block trades self.iceberg_detection_window = 30 # seconds for iceberg detection self.trade_clustering_window = 5 # seconds for trade clustering analysis # Free data source optimization self.depth_snapshots_per_second = 10 # 100ms updates = 10 per second self.trade_aggregation_window = 1.0 # 1 second aggregation self.last_trade_aggregation = {} # WebSocket connections self.websocket_tasks: Dict[str, asyncio.Task] = {} self.data_lock = Lock() # Model callbacks self.cnn_callbacks: List[Callable] = [] self.dqn_callbacks: List[Callable] = [] # Initialize data structures for symbol in self.symbols: self.order_book_history[symbol] = deque(maxlen=1000) self.order_heatmaps[symbol] = deque(maxlen=600) # 10 min at 1s intervals self.flow_signals[symbol] = deque(maxlen=500) self.order_book_imbalances[symbol] = deque(maxlen=1000) self.session_volume_profiles[symbol] = [] self.price_level_cache[symbol] = {} self.session_start_time[symbol] = datetime.now() self.last_trade_aggregation[symbol] = datetime.now() # Enhanced order flow analysis buffers self.aggressive_passive_ratios[symbol] = deque(maxlen=300) # 5 minutes at 1s intervals self.trade_size_distributions[symbol] = deque(maxlen=1000) self.market_maker_taker_flows[symbol] = deque(maxlen=600) self.order_flow_intensity[symbol] = deque(maxlen=300) self.liquidity_consumption_rates[symbol] = deque(maxlen=300) self.price_impact_measurements[symbol] = deque(maxlen=300) self.liquidity_metrics[symbol] = { 'total_bid_size': 0.0, 'total_ask_size': 0.0, 'weighted_mid': 0.0, 'liquidity_ratio': 1.0, 'avg_spread_bps': 0.0, 'volume_weighted_spread': 0.0 } logger.info(f"Order Book Integration initialized for symbols: {self.symbols}") logger.info("Using FREE data sources: Binance WebSocket depth@20 + trades") def add_cnn_callback(self, callback: Callable[[str, Dict], None]): """Add CNN model callback""" self.cnn_callbacks.append(callback) logger.info(f"Added CNN callback: {len(self.cnn_callbacks)} total") def add_dqn_callback(self, callback: Callable[[str, Dict], None]): """Add DQN model callback""" self.dqn_callbacks.append(callback) logger.info(f"Added DQN callback: {len(self.dqn_callbacks)} total") async def start_streaming(self): """Start order book data streaming""" if self.is_streaming: logger.warning("Bookmap streaming already active") return self.is_streaming = True logger.info("Starting Bookmap order book streaming") # Start streams for each symbol for symbol in self.symbols: # Order book depth stream (20 levels, 100ms updates) depth_task = asyncio.create_task(self._stream_order_book_depth(symbol)) self.websocket_tasks[f"{symbol}_depth"] = depth_task # Trade stream for order flow analysis trade_task = asyncio.create_task(self._stream_trades(symbol)) self.websocket_tasks[f"{symbol}_trades"] = trade_task # Aggregated trade stream (for larger trades and better order flow analysis) agg_trade_task = asyncio.create_task(self._stream_aggregate_trades(symbol)) self.websocket_tasks[f"{symbol}_aggTrade"] = agg_trade_task # 24hr ticker stream (for volume and statistical analysis) ticker_task = asyncio.create_task(self._stream_ticker(symbol)) self.websocket_tasks[f"{symbol}_ticker"] = ticker_task # Start continuous analysis analysis_task = asyncio.create_task(self._continuous_analysis()) self.websocket_tasks["analysis"] = analysis_task logger.info(f"Started streaming for {len(self.symbols)} symbols") async def stop_streaming(self): """Stop streaming""" if not self.is_streaming: return logger.info("Stopping Bookmap streaming") self.is_streaming = False # Cancel all tasks for name, task in self.websocket_tasks.items(): if not task.done(): task.cancel() try: await task except asyncio.CancelledError: pass self.websocket_tasks.clear() logger.info("Bookmap streaming stopped") async def _stream_order_book_depth(self, symbol: str): """Stream order book depth data""" binance_symbol = symbol.lower() url = f"wss://stream.binance.com:9443/ws/{binance_symbol}@depth20@100ms" while self.is_streaming: try: async with websockets.connect(url) as websocket: logger.info(f"Order book depth connected for {symbol}") async for message in websocket: if not self.is_streaming: break try: data = json.loads(message) await self._process_depth_update(symbol, data) except Exception as e: logger.warning(f"Error processing depth for {symbol}: {e}") except Exception as e: logger.error(f"Depth WebSocket error for {symbol}: {e}") if self.is_streaming: await asyncio.sleep(2) async def _stream_trades(self, symbol: str): """Stream individual trade data for order flow analysis""" binance_symbol = symbol.lower() url = f"wss://stream.binance.com:9443/ws/{binance_symbol}@trade" while self.is_streaming: try: async with websockets.connect(url) as websocket: logger.info(f"Trade stream connected for {symbol}") async for message in websocket: if not self.is_streaming: break try: data = json.loads(message) await self._process_trade_update(symbol, data) except Exception as e: logger.warning(f"Error processing trade for {symbol}: {e}") except Exception as e: logger.error(f"Trade WebSocket error for {symbol}: {e}") if self.is_streaming: await asyncio.sleep(2) async def _stream_aggregate_trades(self, symbol: str): """Stream aggregated trade data for institutional order flow detection""" binance_symbol = symbol.lower() url = f"wss://stream.binance.com:9443/ws/{binance_symbol}@aggTrade" while self.is_streaming: try: async with websockets.connect(url) as websocket: logger.info(f"Aggregate Trade stream connected for {symbol}") async for message in websocket: if not self.is_streaming: break try: data = json.loads(message) await self._process_aggregate_trade_update(symbol, data) except Exception as e: logger.warning(f"Error processing aggTrade for {symbol}: {e}") except Exception as e: logger.error(f"Aggregate Trade WebSocket error for {symbol}: {e}") if self.is_streaming: await asyncio.sleep(2) async def _stream_ticker(self, symbol: str): """Stream 24hr ticker data for volume analysis""" binance_symbol = symbol.lower() url = f"wss://stream.binance.com:9443/ws/{binance_symbol}@ticker" while self.is_streaming: try: async with websockets.connect(url) as websocket: logger.info(f"Ticker stream connected for {symbol}") async for message in websocket: if not self.is_streaming: break try: data = json.loads(message) await self._process_ticker_update(symbol, data) except Exception as e: logger.warning(f"Error processing ticker for {symbol}: {e}") except Exception as e: logger.error(f"Ticker WebSocket error for {symbol}: {e}") if self.is_streaming: await asyncio.sleep(2) async def _process_depth_update(self, symbol: str, data: Dict): """Process order book depth update""" try: timestamp = datetime.now() # Parse bids and asks bids = [] asks = [] for bid_data in data.get('bids', []): price = float(bid_data[0]) size = float(bid_data[1]) bids.append(OrderBookLevel( price=price, size=size, orders=1, side='bid', timestamp=timestamp )) for ask_data in data.get('asks', []): price = float(ask_data[0]) size = float(ask_data[1]) asks.append(OrderBookLevel( price=price, size=size, orders=1, side='ask', timestamp=timestamp )) # Sort levels bids.sort(key=lambda x: x.price, reverse=True) asks.sort(key=lambda x: x.price) # Calculate spread and mid price if bids and asks: best_bid = bids[0].price best_ask = asks[0].price spread = best_ask - best_bid mid_price = (best_bid + best_ask) / 2 else: spread = 0.0 mid_price = 0.0 # Create snapshot snapshot = OrderBookSnapshot( symbol=symbol, timestamp=timestamp, bids=bids, asks=asks, spread=spread, mid_price=mid_price ) with self.data_lock: self.order_books[symbol] = snapshot self.order_book_history[symbol].append(snapshot) # Update metrics self._update_liquidity_metrics(symbol, snapshot) self._calculate_order_book_imbalance(symbol, snapshot) self._update_order_heatmap(symbol, snapshot) except Exception as e: logger.error(f"Error processing depth update for {symbol}: {e}") async def _process_trade_update(self, symbol: str, data: Dict): """Process individual trade data with enhanced order flow analysis""" try: timestamp = datetime.fromtimestamp(int(data['T']) / 1000) price = float(data['p']) quantity = float(data['q']) is_buyer_maker = data['m'] trade_id = data.get('t', '') # Calculate trade value trade_value = price * quantity # Enhanced order flow analysis await self._analyze_enhanced_order_flow(symbol, timestamp, price, quantity, trade_value, is_buyer_maker, 'individual') # Traditional order flow analysis await self._analyze_order_flow(symbol, timestamp, price, quantity, is_buyer_maker) # Update volume profile self._update_volume_profile(symbol, price, quantity, is_buyer_maker) except Exception as e: logger.error(f"Error processing trade for {symbol}: {e}") async def _process_aggregate_trade_update(self, symbol: str, data: Dict): """Process aggregated trade data for institutional flow detection""" try: timestamp = datetime.fromtimestamp(int(data['T']) / 1000) price = float(data['p']) quantity = float(data['q']) is_buyer_maker = data['m'] first_trade_id = data.get('f', '') last_trade_id = data.get('l', '') # Calculate trade value and aggregation size trade_value = price * quantity trade_aggregation_size = int(last_trade_id) - int(first_trade_id) + 1 if first_trade_id and last_trade_id else 1 # Enhanced analysis for aggregated trades (institutional detection) await self._analyze_enhanced_order_flow(symbol, timestamp, price, quantity, trade_value, is_buyer_maker, 'aggregated', trade_aggregation_size) # Detect large block trades and iceberg orders await self._detect_institutional_activity(symbol, timestamp, price, quantity, trade_value, trade_aggregation_size, is_buyer_maker) except Exception as e: logger.error(f"Error processing aggregate trade for {symbol}: {e}") async def _process_ticker_update(self, symbol: str, data: Dict): """Process ticker data for volume and statistical analysis""" try: # Extract relevant ticker data volume_24h = float(data.get('v', 0)) # 24hr volume quote_volume_24h = float(data.get('q', 0)) # 24hr quote volume price_change_24h = float(data.get('P', 0)) # 24hr price change % high_24h = float(data.get('h', 0)) low_24h = float(data.get('l', 0)) weighted_avg_price = float(data.get('w', 0)) # Weighted average price # Update volume statistics for relative analysis self._update_volume_statistics(symbol, volume_24h, quote_volume_24h, weighted_avg_price) except Exception as e: logger.error(f"Error processing ticker for {symbol}: {e}") def _update_liquidity_metrics(self, symbol: str, snapshot: OrderBookSnapshot): """Update liquidity metrics""" try: total_bid_size = sum(level.size for level in snapshot.bids) total_ask_size = sum(level.size for level in snapshot.asks) # Weighted mid price if snapshot.bids and snapshot.asks: bid_weight = total_bid_size / (total_bid_size + total_ask_size) ask_weight = total_ask_size / (total_bid_size + total_ask_size) weighted_mid = (snapshot.bids[0].price * ask_weight + snapshot.asks[0].price * bid_weight) else: weighted_mid = snapshot.mid_price # Liquidity ratio liquidity_ratio = total_bid_size / total_ask_size if total_ask_size > 0 else 1.0 self.liquidity_metrics[symbol] = { 'total_bid_size': total_bid_size, 'total_ask_size': total_ask_size, 'weighted_mid': weighted_mid, 'liquidity_ratio': liquidity_ratio, 'spread_bps': (snapshot.spread / snapshot.mid_price) * 10000 if snapshot.mid_price > 0 else 0 } except Exception as e: logger.error(f"Error updating liquidity metrics for {symbol}: {e}") def _calculate_order_book_imbalance(self, symbol: str, snapshot: OrderBookSnapshot): """Calculate order book imbalance""" try: if not snapshot.bids or not snapshot.asks: return # Top 5 levels imbalance n_levels = min(5, len(snapshot.bids), len(snapshot.asks)) total_bid_size = sum(snapshot.bids[i].size for i in range(n_levels)) total_ask_size = sum(snapshot.asks[i].size for i in range(n_levels)) if total_bid_size + total_ask_size > 0: imbalance = (total_bid_size - total_ask_size) / (total_bid_size + total_ask_size) else: imbalance = 0.0 self.order_book_imbalances[symbol].append({ 'timestamp': snapshot.timestamp, 'imbalance': imbalance, 'bid_size': total_bid_size, 'ask_size': total_ask_size }) except Exception as e: logger.error(f"Error calculating imbalance for {symbol}: {e}") def _update_order_heatmap(self, symbol: str, snapshot: OrderBookSnapshot): """Update order heatmap matrix""" try: heatmap_entry = { 'timestamp': snapshot.timestamp, 'mid_price': snapshot.mid_price, 'levels': {} } # Add bid levels for level in snapshot.bids: price_offset = level.price - snapshot.mid_price heatmap_entry['levels'][price_offset] = { 'side': 'bid', 'size': level.size, 'price': level.price } # Add ask levels for level in snapshot.asks: price_offset = level.price - snapshot.mid_price heatmap_entry['levels'][price_offset] = { 'side': 'ask', 'size': level.size, 'price': level.price } self.order_heatmaps[symbol].append(heatmap_entry) # Clean old entries cutoff_time = snapshot.timestamp - self.heatmap_window while (self.order_heatmaps[symbol] and self.order_heatmaps[symbol][0]['timestamp'] < cutoff_time): self.order_heatmaps[symbol].popleft() except Exception as e: logger.error(f"Error updating heatmap for {symbol}: {e}") def _update_volume_profile(self, symbol: str, price: float, quantity: float, is_buyer_maker: bool): """Enhanced Session Volume Profile (SVP) update using free data""" try: # Calculate trade volume in USDT volume = price * quantity # Use price level caching for better performance price_key = round(price, 2) # Round to 2 decimal places for price level grouping # Update session volume profile if price_key not in self.price_level_cache[symbol]: self.price_level_cache[symbol][price_key] = VolumeProfileLevel( price=price_key, volume=0.0, buy_volume=0.0, sell_volume=0.0, trades_count=0, vwap=price ) level = self.price_level_cache[symbol][price_key] old_total_volume = level.volume old_total_quantity = level.trades_count # Update volume metrics level.volume += volume level.trades_count += 1 # Update buy/sell volume breakdown if is_buyer_maker: level.sell_volume += volume # Market maker is selling else: level.buy_volume += volume # Market maker is buying # Calculate Volume Weighted Average Price (VWAP) for this level if level.volume > 0: level.vwap = ((level.vwap * old_total_volume) + (price * volume)) / level.volume # Also update the rolling volume profile (last 10 minutes) self._update_rolling_volume_profile(symbol, price_key, volume, is_buyer_maker) # Session reset detection (every 24 hours or major price gaps) current_time = datetime.now() if self._should_reset_session(symbol, current_time, price): self._reset_session_volume_profile(symbol, current_time) except Exception as e: logger.error(f"Error updating Session Volume Profile for {symbol}: {e}") def _update_rolling_volume_profile(self, symbol: str, price_key: float, volume: float, is_buyer_maker: bool): """Update rolling 10-minute volume profile for real-time heatmap""" try: # Find or create level in regular volume profile price_level = None for level in self.volume_profiles.get(symbol, []): if abs(level.price - price_key) < 0.01: price_level = level break if not price_level: if symbol not in self.volume_profiles: self.volume_profiles[symbol] = [] price_level = VolumeProfileLevel( price=price_key, volume=0.0, buy_volume=0.0, sell_volume=0.0, trades_count=0, vwap=price_key ) self.volume_profiles[symbol].append(price_level) # Update rolling metrics old_volume = price_level.volume price_level.volume += volume price_level.trades_count += 1 if is_buyer_maker: price_level.sell_volume += volume else: price_level.buy_volume += volume # Update VWAP if price_level.volume > 0: price_level.vwap = ((price_level.vwap * old_volume) + (price_key * volume)) / price_level.volume except Exception as e: logger.error(f"Error updating rolling volume profile for {symbol}: {e}") def _should_reset_session(self, symbol: str, current_time: datetime, current_price: float) -> bool: """Determine if session volume profile should be reset""" try: session_start = self.session_start_time.get(symbol) if not session_start: return False # Reset every 24 hours (daily session) if (current_time - session_start).total_seconds() > 86400: # 24 hours return True # Reset on major price gaps (> 5% from session VWAP) if self.price_level_cache.get(symbol): total_volume = sum(level.volume for level in self.price_level_cache[symbol].values()) if total_volume > 0: weighted_price = sum(level.vwap * level.volume for level in self.price_level_cache[symbol].values()) / total_volume price_gap = abs(current_price - weighted_price) / weighted_price if price_gap > 0.05: # 5% gap return True return False except Exception as e: logger.error(f"Error checking session reset for {symbol}: {e}") return False def _reset_session_volume_profile(self, symbol: str, reset_time: datetime): """Reset session volume profile""" try: logger.info(f"Resetting session volume profile for {symbol}") self.session_start_time[symbol] = reset_time self.price_level_cache[symbol] = {} self.session_volume_profiles[symbol] = [] except Exception as e: logger.error(f"Error resetting session volume profile for {symbol}: {e}") async def _analyze_order_flow(self, symbol: str, timestamp: datetime, price: float, quantity: float, is_buyer_maker: bool): """Analyze order flow patterns""" try: if symbol not in self.order_book_history or not self.order_book_history[symbol]: return recent_snapshots = list(self.order_book_history[symbol])[-10:] # Check for sweeps sweep_signal = self._detect_order_sweep(symbol, recent_snapshots, price, quantity, is_buyer_maker) if sweep_signal: self.flow_signals[symbol].append(sweep_signal) await self._notify_flow_signal(symbol, sweep_signal) # Check for absorption absorption_signal = self._detect_absorption(symbol, recent_snapshots, price, quantity) if absorption_signal: self.flow_signals[symbol].append(absorption_signal) await self._notify_flow_signal(symbol, absorption_signal) # Check for momentum momentum_signal = self._detect_momentum_trade(symbol, price, quantity, is_buyer_maker) if momentum_signal: self.flow_signals[symbol].append(momentum_signal) await self._notify_flow_signal(symbol, momentum_signal) except Exception as e: logger.error(f"Error analyzing order flow for {symbol}: {e}") async def _analyze_enhanced_order_flow(self, symbol: str, timestamp: datetime, price: float, quantity: float, trade_value: float, is_buyer_maker: bool, trade_type: str, aggregation_size: int = 1): """Enhanced order flow analysis with aggressive vs passive ratios""" try: # Determine if trade is aggressive (taker) or passive (maker) is_aggressive = not is_buyer_maker # In Binance data, m=false means buyer is taker (aggressive) # Calculate aggressive vs passive ratios self._update_aggressive_passive_ratio(symbol, timestamp, trade_value, is_aggressive) # Update trade size distribution self._update_trade_size_distribution(symbol, timestamp, trade_value, trade_type) # Update market maker vs taker flow self._update_market_maker_taker_flow(symbol, timestamp, trade_value, is_buyer_maker, is_aggressive) # Calculate order flow intensity self._update_order_flow_intensity(symbol, timestamp, trade_value, aggregation_size) # Measure liquidity consumption await self._measure_liquidity_consumption(symbol, timestamp, price, quantity, trade_value, is_aggressive) # Measure price impact await self._measure_price_impact(symbol, timestamp, price, trade_value, is_aggressive) except Exception as e: logger.error(f"Error in enhanced order flow analysis for {symbol}: {e}") def _update_aggressive_passive_ratio(self, symbol: str, timestamp: datetime, trade_value: float, is_aggressive: bool): """Update aggressive vs passive participant ratios""" try: current_window = [] cutoff_time = timestamp - timedelta(seconds=60) # 1-minute window # Filter recent trades within window for entry in self.aggressive_passive_ratios[symbol]: if entry['timestamp'] > cutoff_time: current_window.append(entry) # Add current trade current_window.append({ 'timestamp': timestamp, 'trade_value': trade_value, 'is_aggressive': is_aggressive }) # Calculate ratios aggressive_volume = sum(t['trade_value'] for t in current_window if t['is_aggressive']) passive_volume = sum(t['trade_value'] for t in current_window if not t['is_aggressive']) total_volume = aggressive_volume + passive_volume if total_volume > 0: aggressive_ratio = aggressive_volume / total_volume passive_ratio = passive_volume / total_volume ratio_data = { 'timestamp': timestamp, 'aggressive_ratio': aggressive_ratio, 'passive_ratio': passive_ratio, 'aggressive_volume': aggressive_volume, 'passive_volume': passive_volume, 'total_volume': total_volume, 'trade_count': len(current_window), 'avg_aggressive_size': aggressive_volume / max(1, sum(1 for t in current_window if t['is_aggressive'])), 'avg_passive_size': passive_volume / max(1, sum(1 for t in current_window if not t['is_aggressive'])) } # Update buffer self.aggressive_passive_ratios[symbol].clear() self.aggressive_passive_ratios[symbol].extend(current_window) # Store calculated ratios for use by models if not hasattr(self, 'current_flow_ratios'): self.current_flow_ratios = {} self.current_flow_ratios[symbol] = ratio_data except Exception as e: logger.error(f"Error updating aggressive/passive ratio for {symbol}: {e}") def _update_trade_size_distribution(self, symbol: str, timestamp: datetime, trade_value: float, trade_type: str): """Update trade size distribution for institutional vs retail detection""" try: # Classify trade size if trade_value < 1000: size_category = 'micro' # < $1K (retail) elif trade_value < 10000: size_category = 'small' # $1K-$10K (retail/small institutional) elif trade_value < 50000: size_category = 'medium' # $10K-$50K (institutional) elif trade_value < 100000: size_category = 'large' # $50K-$100K (large institutional) else: size_category = 'block' # > $100K (block trades) trade_data = { 'timestamp': timestamp, 'trade_value': trade_value, 'trade_type': trade_type, 'size_category': size_category, 'is_institutional': trade_value >= self.large_order_threshold, 'is_block_trade': trade_value >= self.block_trade_threshold } self.trade_size_distributions[symbol].append(trade_data) except Exception as e: logger.error(f"Error updating trade size distribution for {symbol}: {e}") def _update_market_maker_taker_flow(self, symbol: str, timestamp: datetime, trade_value: float, is_buyer_maker: bool, is_aggressive: bool): """Update market maker vs taker flow analysis""" try: flow_data = { 'timestamp': timestamp, 'trade_value': trade_value, 'is_buyer_maker': is_buyer_maker, 'is_aggressive': is_aggressive, 'flow_direction': 'buy_aggressive' if not is_buyer_maker else 'sell_aggressive', 'market_maker_side': 'sell' if is_buyer_maker else 'buy' } self.market_maker_taker_flows[symbol].append(flow_data) except Exception as e: logger.error(f"Error updating market maker/taker flow for {symbol}: {e}") def _update_order_flow_intensity(self, symbol: str, timestamp: datetime, trade_value: float, aggregation_size: int): """Calculate order flow intensity based on trade frequency and size""" try: # Calculate intensity based on trade value and aggregation base_intensity = trade_value / 10000 # Normalize by $10K aggregation_intensity = aggregation_size / 10 # Normalize aggregation factor # Time-based intensity (trades per second) recent_trades = [t for t in self.order_flow_intensity[symbol] if (timestamp - t['timestamp']).total_seconds() < 10] time_intensity = len(recent_trades) / 10 # Trades per second over 10s window intensity_score = base_intensity * (1 + aggregation_intensity) * (1 + time_intensity) intensity_data = { 'timestamp': timestamp, 'intensity_score': intensity_score, 'base_intensity': base_intensity, 'aggregation_intensity': aggregation_intensity, 'time_intensity': time_intensity, 'trade_value': trade_value, 'aggregation_size': aggregation_size } self.order_flow_intensity[symbol].append(intensity_data) except Exception as e: logger.error(f"Error updating order flow intensity for {symbol}: {e}") async def _measure_liquidity_consumption(self, symbol: str, timestamp: datetime, price: float, quantity: float, trade_value: float, is_aggressive: bool): """Measure liquidity consumption rates""" try: if not is_aggressive: return # Only measure for aggressive trades current_snapshot = self.order_books.get(symbol) if not current_snapshot: return # Calculate how much liquidity was consumed if price >= current_snapshot.mid_price: # Buy-side consumption consumed_liquidity = 0 for ask_level in current_snapshot.asks: if ask_level.price <= price: consumed_liquidity += min(ask_level.size, quantity) * ask_level.price quantity -= ask_level.size if quantity <= 0: break else: # Sell-side consumption consumed_liquidity = 0 for bid_level in current_snapshot.bids: if bid_level.price >= price: consumed_liquidity += min(bid_level.size, quantity) * bid_level.price quantity -= bid_level.size if quantity <= 0: break consumption_rate = consumed_liquidity / trade_value if trade_value > 0 else 0 consumption_data = { 'timestamp': timestamp, 'price': price, 'trade_value': trade_value, 'consumed_liquidity': consumed_liquidity, 'consumption_rate': consumption_rate, 'side': 'buy' if price >= current_snapshot.mid_price else 'sell' } self.liquidity_consumption_rates[symbol].append(consumption_data) except Exception as e: logger.error(f"Error measuring liquidity consumption for {symbol}: {e}") async def _measure_price_impact(self, symbol: str, timestamp: datetime, price: float, trade_value: float, is_aggressive: bool): """Measure price impact of trades""" try: if not is_aggressive: return # Get price before and after (approximated by looking at recent snapshots) recent_snapshots = list(self.order_book_history[symbol])[-5:] if len(recent_snapshots) < 2: return price_before = recent_snapshots[-2].mid_price price_after = recent_snapshots[-1].mid_price price_impact = abs(price_after - price_before) / price_before if price_before > 0 else 0 impact_per_dollar = price_impact / (trade_value / 1000000) if trade_value > 0 else 0 # Impact per $1M impact_data = { 'timestamp': timestamp, 'trade_price': price, 'trade_value': trade_value, 'price_before': price_before, 'price_after': price_after, 'price_impact': price_impact, 'impact_per_million': impact_per_dollar, 'impact_category': self._categorize_impact(price_impact) } self.price_impact_measurements[symbol].append(impact_data) except Exception as e: logger.error(f"Error measuring price impact for {symbol}: {e}") def _categorize_impact(self, price_impact: float) -> str: """Categorize price impact level""" if price_impact < 0.0001: # < 0.01% return 'minimal' elif price_impact < 0.001: # < 0.1% return 'low' elif price_impact < 0.005: # < 0.5% return 'medium' elif price_impact < 0.01: # < 1% return 'high' else: return 'extreme' async def _detect_institutional_activity(self, symbol: str, timestamp: datetime, price: float, quantity: float, trade_value: float, aggregation_size: int, is_buyer_maker: bool): """Detect institutional trading activity patterns""" try: # Block trade detection if trade_value >= self.block_trade_threshold: signal = OrderFlowSignal( timestamp=timestamp, signal_type='block_trade', price=price, volume=trade_value, confidence=min(0.95, trade_value / 500000), # Higher confidence for larger trades description=f"Block trade: ${trade_value:.0f} ({'Buy' if not is_buyer_maker else 'Sell'})" ) self.flow_signals[symbol].append(signal) await self._notify_flow_signal(symbol, signal) # Iceberg order detection (multiple large aggregated trades in sequence) await self._detect_iceberg_orders(symbol, timestamp, price, trade_value, aggregation_size, is_buyer_maker) # High-frequency activity detection await self._detect_hft_activity(symbol, timestamp, trade_value, aggregation_size) except Exception as e: logger.error(f"Error detecting institutional activity for {symbol}: {e}") async def _detect_iceberg_orders(self, symbol: str, timestamp: datetime, price: float, trade_value: float, aggregation_size: int, is_buyer_maker: bool): """Detect iceberg order patterns""" try: if trade_value < self.large_order_threshold: return # Look for similar-sized trades in recent history cutoff_time = timestamp - timedelta(seconds=self.iceberg_detection_window) recent_large_trades = [] for trade_data in self.trade_size_distributions[symbol]: if (trade_data['timestamp'] > cutoff_time and trade_data['trade_value'] >= self.large_order_threshold): recent_large_trades.append(trade_data) # Iceberg pattern: 3+ large trades with similar sizes if len(recent_large_trades) >= 3: avg_size = sum(t['trade_value'] for t in recent_large_trades) / len(recent_large_trades) size_consistency = all(abs(t['trade_value'] - avg_size) / avg_size < 0.2 for t in recent_large_trades) if size_consistency: total_iceberg_volume = sum(t['trade_value'] for t in recent_large_trades) confidence = min(0.9, len(recent_large_trades) / 10 + total_iceberg_volume / 1000000) signal = OrderFlowSignal( timestamp=timestamp, signal_type='iceberg', price=price, volume=total_iceberg_volume, confidence=confidence, description=f"Iceberg: {len(recent_large_trades)} trades, ${total_iceberg_volume:.0f} total" ) self.flow_signals[symbol].append(signal) await self._notify_flow_signal(symbol, signal) except Exception as e: logger.error(f"Error detecting iceberg orders for {symbol}: {e}") async def _detect_hft_activity(self, symbol: str, timestamp: datetime, trade_value: float, aggregation_size: int): """Detect high-frequency trading activity""" try: # Look for high-frequency patterns (many small trades in rapid succession) cutoff_time = timestamp - timedelta(seconds=5) recent_trades = [t for t in self.order_flow_intensity[symbol] if t['timestamp'] > cutoff_time] if len(recent_trades) >= 20: # 20+ trades in 5 seconds avg_trade_size = sum(t['trade_value'] for t in recent_trades) / len(recent_trades) if avg_trade_size < 5000: # Small average trade size suggests HFT total_hft_volume = sum(t['trade_value'] for t in recent_trades) confidence = min(0.8, len(recent_trades) / 50) signal = OrderFlowSignal( timestamp=timestamp, signal_type='hft_activity', price=0, # Multiple prices volume=total_hft_volume, confidence=confidence, description=f"HFT: {len(recent_trades)} trades in 5s, avg ${avg_trade_size:.0f}" ) self.flow_signals[symbol].append(signal) await self._notify_flow_signal(symbol, signal) except Exception as e: logger.error(f"Error detecting HFT activity for {symbol}: {e}") def _update_volume_statistics(self, symbol: str, volume_24h: float, quote_volume_24h: float, weighted_avg_price: float): """Update volume statistics for relative analysis""" try: # Store 24h volume data for relative comparisons if not hasattr(self, 'volume_stats'): self.volume_stats = {} self.volume_stats[symbol] = { 'volume_24h': volume_24h, 'quote_volume_24h': quote_volume_24h, 'weighted_avg_price': weighted_avg_price, 'timestamp': datetime.now() } except Exception as e: logger.error(f"Error updating volume statistics for {symbol}: {e}") def _detect_order_sweep(self, symbol: str, snapshots: List[OrderBookSnapshot], price: float, quantity: float, is_buyer_maker: bool) -> Optional[OrderFlowSignal]: """Detect order book sweeps""" try: if len(snapshots) < 2: return None before_snapshot = snapshots[-2] if is_buyer_maker: # Sell order, check ask side levels_consumed = 0 total_consumed_size = 0 for level in before_snapshot.asks[:5]: if level.price <= price: levels_consumed += 1 total_consumed_size += level.size if levels_consumed >= 2 and total_consumed_size > quantity * 1.5: confidence = min(0.9, levels_consumed / 5.0 + 0.3) return OrderFlowSignal( timestamp=datetime.now(), signal_type='sweep', price=price, volume=quantity * price, confidence=confidence, description=f"Sell sweep: {levels_consumed} levels" ) else: # Buy order, check bid side levels_consumed = 0 total_consumed_size = 0 for level in before_snapshot.bids[:5]: if level.price >= price: levels_consumed += 1 total_consumed_size += level.size if levels_consumed >= 2 and total_consumed_size > quantity * 1.5: confidence = min(0.9, levels_consumed / 5.0 + 0.3) return OrderFlowSignal( timestamp=datetime.now(), signal_type='sweep', price=price, volume=quantity * price, confidence=confidence, description=f"Buy sweep: {levels_consumed} levels" ) return None except Exception as e: logger.error(f"Error detecting sweep for {symbol}: {e}") return None def _detect_absorption(self, symbol: str, snapshots: List[OrderBookSnapshot], price: float, quantity: float) -> Optional[OrderFlowSignal]: """Detect absorption patterns""" try: if len(snapshots) < 3: return None volume_threshold = 10000 # $10K minimum price_impact_threshold = 0.001 # 0.1% max impact trade_value = price * quantity if trade_value < volume_threshold: return None # Calculate price impact price_before = snapshots[-3].mid_price price_after = snapshots[-1].mid_price price_impact = abs(price_after - price_before) / price_before if price_impact < price_impact_threshold: confidence = min(0.8, (trade_value / 50000) * 0.5 + 0.3) return OrderFlowSignal( timestamp=datetime.now(), signal_type='absorption', price=price, volume=trade_value, confidence=confidence, description=f"Absorption: ${trade_value:.0f}" ) return None except Exception as e: logger.error(f"Error detecting absorption for {symbol}: {e}") return None def _detect_momentum_trade(self, symbol: str, price: float, quantity: float, is_buyer_maker: bool) -> Optional[OrderFlowSignal]: """Detect momentum trades""" try: trade_value = price * quantity momentum_threshold = 25000 # $25K minimum if trade_value < momentum_threshold: return None confidence = min(0.9, trade_value / 100000 * 0.6 + 0.3) direction = "sell" if is_buyer_maker else "buy" return OrderFlowSignal( timestamp=datetime.now(), signal_type='momentum', price=price, volume=trade_value, confidence=confidence, description=f"Large {direction}: ${trade_value:.0f}" ) except Exception as e: logger.error(f"Error detecting momentum for {symbol}: {e}") return None async def _notify_flow_signal(self, symbol: str, signal: OrderFlowSignal): """Notify models of flow signals""" try: signal_data = { 'signal_type': signal.signal_type, 'price': signal.price, 'volume': signal.volume, 'confidence': signal.confidence, 'timestamp': signal.timestamp, 'description': signal.description } # Notify CNN callbacks for callback in self.cnn_callbacks: try: callback(symbol, signal_data) except Exception as e: logger.warning(f"Error in CNN callback: {e}") # Notify DQN callbacks for callback in self.dqn_callbacks: try: callback(symbol, signal_data) except Exception as e: logger.warning(f"Error in DQN callback: {e}") except Exception as e: logger.error(f"Error notifying flow signal: {e}") async def _continuous_analysis(self): """Continuous analysis and model feeding""" while self.is_streaming: try: await asyncio.sleep(1) # Analyze every second for symbol in self.symbols: # Generate features for models cnn_features = self.get_cnn_features(symbol) if cnn_features is not None: for callback in self.cnn_callbacks: try: callback(symbol, {'features': cnn_features, 'type': 'orderbook'}) except Exception as e: logger.warning(f"Error in CNN feature callback: {e}") dqn_features = self.get_dqn_state_features(symbol) if dqn_features is not None: for callback in self.dqn_callbacks: try: callback(symbol, {'state': dqn_features, 'type': 'orderbook'}) except Exception as e: logger.warning(f"Error in DQN state callback: {e}") except Exception as e: logger.error(f"Error in continuous analysis: {e}") await asyncio.sleep(5) def get_cnn_features(self, symbol: str) -> Optional[np.ndarray]: """Generate CNN features from order book data""" try: if symbol not in self.order_books: return None snapshot = self.order_books[symbol] features = [] # Order book features (80 features: 20 levels x 2 sides x 2 values) for i in range(min(20, len(snapshot.bids))): bid = snapshot.bids[i] features.append(bid.size) features.append(bid.price - snapshot.mid_price) # Pad bids while len(features) < 40: features.extend([0.0, 0.0]) for i in range(min(20, len(snapshot.asks))): ask = snapshot.asks[i] features.append(ask.size) features.append(ask.price - snapshot.mid_price) # Pad asks while len(features) < 80: features.extend([0.0, 0.0]) # Liquidity metrics (10 features) metrics = self.liquidity_metrics.get(symbol, {}) features.extend([ metrics.get('total_bid_size', 0.0), metrics.get('total_ask_size', 0.0), metrics.get('liquidity_ratio', 1.0), metrics.get('spread_bps', 0.0), snapshot.spread, metrics.get('weighted_mid', snapshot.mid_price) - snapshot.mid_price, len(snapshot.bids), len(snapshot.asks), snapshot.mid_price, time.time() % 86400 # Time of day ]) # Order book imbalance (5 features) if self.order_book_imbalances[symbol]: latest_imbalance = self.order_book_imbalances[symbol][-1] features.extend([ latest_imbalance['imbalance'], latest_imbalance['bid_size'], latest_imbalance['ask_size'], latest_imbalance['bid_size'] + latest_imbalance['ask_size'], abs(latest_imbalance['imbalance']) ]) else: features.extend([0.0, 0.0, 0.0, 0.0, 0.0]) # Enhanced flow signals (15 features) recent_signals = [s for s in self.flow_signals[symbol] if (datetime.now() - s.timestamp).seconds < 60] sweep_count = sum(1 for s in recent_signals if s.signal_type == 'sweep') absorption_count = sum(1 for s in recent_signals if s.signal_type == 'absorption') momentum_count = sum(1 for s in recent_signals if s.signal_type == 'momentum') block_count = sum(1 for s in recent_signals if s.signal_type == 'block_trade') iceberg_count = sum(1 for s in recent_signals if s.signal_type == 'iceberg') hft_count = sum(1 for s in recent_signals if s.signal_type == 'hft_activity') max_confidence = max([s.confidence for s in recent_signals], default=0.0) total_flow_volume = sum(s.volume for s in recent_signals) # Enhanced order flow metrics flow_metrics = self.get_enhanced_order_flow_metrics(symbol) if flow_metrics: aggressive_ratio = flow_metrics['aggressive_passive']['aggressive_ratio'] institutional_ratio = flow_metrics['institutional_retail']['institutional_ratio'] flow_intensity = flow_metrics['flow_intensity']['current_intensity'] avg_consumption_rate = flow_metrics['liquidity']['avg_consumption_rate'] avg_price_impact = flow_metrics['price_impact']['avg_impact'] / 10000 # Normalize from basis points buy_pressure = flow_metrics['maker_taker_flow']['buy_pressure'] sell_pressure = flow_metrics['maker_taker_flow']['sell_pressure'] else: aggressive_ratio = 0.5 institutional_ratio = 0.5 flow_intensity = 0.0 avg_consumption_rate = 0.0 avg_price_impact = 0.0 buy_pressure = 0.5 sell_pressure = 0.5 features.extend([ sweep_count, absorption_count, momentum_count, block_count, iceberg_count, hft_count, max_confidence, total_flow_volume, aggressive_ratio, institutional_ratio, flow_intensity, avg_consumption_rate, avg_price_impact, buy_pressure, sell_pressure ]) return np.array(features, dtype=np.float32) except Exception as e: logger.error(f"Error generating CNN features for {symbol}: {e}") return None def get_dqn_state_features(self, symbol: str) -> Optional[np.ndarray]: """Generate DQN state features""" try: if symbol not in self.order_books: return None snapshot = self.order_books[symbol] state_features = [] # Normalized order book state (20 features) total_bid_size = sum(level.size for level in snapshot.bids[:10]) total_ask_size = sum(level.size for level in snapshot.asks[:10]) total_size = total_bid_size + total_ask_size if total_size > 0: for i in range(min(10, len(snapshot.bids))): state_features.append(snapshot.bids[i].size / total_size) while len(state_features) < 10: state_features.append(0.0) for i in range(min(10, len(snapshot.asks))): state_features.append(snapshot.asks[i].size / total_size) while len(state_features) < 20: state_features.append(0.0) else: state_features.extend([0.0] * 20) # Enhanced market state indicators (20 features) metrics = self.liquidity_metrics.get(symbol, {}) spread_pct = (snapshot.spread / snapshot.mid_price) if snapshot.mid_price > 0 else 0 liquidity_ratio = metrics.get('liquidity_ratio', 1.0) liquidity_imbalance = (liquidity_ratio - 1) / (liquidity_ratio + 1) # Flow strength recent_signals = [s for s in self.flow_signals[symbol] if (datetime.now() - s.timestamp).seconds < 30] flow_strength = sum(s.confidence for s in recent_signals) / max(len(recent_signals), 1) # Price volatility if len(self.order_book_history[symbol]) >= 10: recent_prices = [s.mid_price for s in list(self.order_book_history[symbol])[-10:]] price_volatility = np.std(recent_prices) / np.mean(recent_prices) if recent_prices else 0 else: price_volatility = 0 # Enhanced order flow metrics for DQN flow_metrics = self.get_enhanced_order_flow_metrics(symbol) if flow_metrics: aggressive_ratio = flow_metrics['aggressive_passive']['aggressive_ratio'] institutional_ratio = flow_metrics['institutional_retail']['institutional_ratio'] flow_intensity = min(flow_metrics['flow_intensity']['current_intensity'] / 10, 1.0) # Normalize consumption_rate = flow_metrics['liquidity']['avg_consumption_rate'] price_impact = min(flow_metrics['price_impact']['avg_impact'] / 100, 1.0) # Normalize basis points buy_pressure = flow_metrics['maker_taker_flow']['buy_pressure'] sell_pressure = flow_metrics['maker_taker_flow']['sell_pressure'] # Trade size distribution ratios size_dist = flow_metrics['size_distribution'] total_trades = sum(size_dist.values()) or 1 retail_ratio = (size_dist.get('micro', 0) + size_dist.get('small', 0)) / total_trades institutional_trade_ratio = (size_dist.get('large', 0) + size_dist.get('block', 0)) / total_trades # Recent activity indicators block_activity = min(size_dist.get('block', 0) / 10, 1.0) # Normalize else: aggressive_ratio = 0.5 institutional_ratio = 0.5 flow_intensity = 0.0 consumption_rate = 0.0 price_impact = 0.0 buy_pressure = 0.5 sell_pressure = 0.5 retail_ratio = 0.5 institutional_trade_ratio = 0.5 block_activity = 0.0 state_features.extend([ spread_pct * 10000, # Spread in basis points liquidity_imbalance, flow_strength, price_volatility * 100, min(len(snapshot.bids), 20) / 20, min(len(snapshot.asks), 20) / 20, len([s for s in recent_signals if s.signal_type == 'sweep']) / 10, len([s for s in recent_signals if s.signal_type == 'absorption']) / 5, len([s for s in recent_signals if s.signal_type == 'momentum']) / 5, (datetime.now().hour * 60 + datetime.now().minute) / 1440, # Enhanced order flow state features aggressive_ratio, institutional_ratio, flow_intensity, consumption_rate, price_impact, buy_pressure, sell_pressure, retail_ratio, institutional_trade_ratio, block_activity ]) return np.array(state_features, dtype=np.float32) except Exception as e: logger.error(f"Error generating DQN features for {symbol}: {e}") return None def get_order_heatmap_matrix(self, symbol: str, levels: int = 40) -> Optional[np.ndarray]: """Generate heatmap matrix for visualization""" try: if symbol not in self.order_heatmaps or not self.order_heatmaps[symbol]: return None current_snapshot = self.order_books.get(symbol) if not current_snapshot: return None mid_price = current_snapshot.mid_price price_step = mid_price * 0.0001 # 1 basis point steps # Matrix: time x price levels time_window = min(600, len(self.order_heatmaps[symbol])) heatmap_matrix = np.zeros((time_window, levels)) # Fill matrix for t, entry in enumerate(list(self.order_heatmaps[symbol])[-time_window:]): for price_offset, level_data in entry['levels'].items(): level_idx = int((price_offset + (levels/2) * price_step) / price_step) if 0 <= level_idx < levels: size_weight = 1.0 if level_data['side'] == 'bid' else -1.0 heatmap_matrix[t, level_idx] = level_data['size'] * size_weight return heatmap_matrix except Exception as e: logger.error(f"Error generating heatmap matrix for {symbol}: {e}") return None def get_dashboard_data(self, symbol: str) -> Optional[Dict]: """Get data for dashboard visualization""" try: if symbol not in self.order_books: return None snapshot = self.order_books[symbol] return { 'timestamp': snapshot.timestamp.isoformat(), 'symbol': symbol, 'mid_price': snapshot.mid_price, 'spread': snapshot.spread, 'bids': [{'price': l.price, 'size': l.size} for l in snapshot.bids[:20]], 'asks': [{'price': l.price, 'size': l.size} for l in snapshot.asks[:20]], 'liquidity_metrics': self.liquidity_metrics.get(symbol, {}), 'volume_profile': self.get_volume_profile_data(symbol), 'heatmap_matrix': self.get_order_heatmap_matrix(symbol).tolist() if self.get_order_heatmap_matrix(symbol) is not None else None, 'enhanced_order_flow': self.get_enhanced_order_flow_metrics(symbol), 'recent_signals': [ { 'type': s.signal_type, 'price': s.price, 'volume': s.volume, 'confidence': s.confidence, 'timestamp': s.timestamp.isoformat(), 'description': s.description } for s in list(self.flow_signals[symbol])[-10:] ] } except Exception as e: logger.error(f"Error getting dashboard data for {symbol}: {e}") return None def get_volume_profile_data(self, symbol: str) -> Optional[List[Dict]]: """Get rolling volume profile data (10-minute window)""" try: if symbol not in self.volume_profiles: return None profile_data = [] for level in sorted(self.volume_profiles[symbol], key=lambda x: x.price): profile_data.append({ 'price': level.price, 'volume': level.volume, 'buy_volume': level.buy_volume, 'sell_volume': level.sell_volume, 'trades_count': level.trades_count, 'vwap': level.vwap, 'net_volume': level.buy_volume - level.sell_volume }) return profile_data except Exception as e: logger.error(f"Error getting volume profile for {symbol}: {e}") return None def get_session_volume_profile_data(self, symbol: str) -> Optional[List[Dict]]: """Get Session Volume Profile (SVP) data - full session data""" try: if symbol not in self.price_level_cache: return None session_data = [] total_volume = sum(level.volume for level in self.price_level_cache[symbol].values()) for price_key, level in sorted(self.price_level_cache[symbol].items()): volume_percentage = (level.volume / total_volume * 100) if total_volume > 0 else 0 session_data.append({ 'price': level.price, 'volume': level.volume, 'buy_volume': level.buy_volume, 'sell_volume': level.sell_volume, 'trades_count': level.trades_count, 'vwap': level.vwap, 'net_volume': level.buy_volume - level.sell_volume, 'volume_percentage': volume_percentage, 'is_high_volume_node': volume_percentage > 2.0, # Mark significant price levels 'buy_sell_ratio': level.buy_volume / level.sell_volume if level.sell_volume > 0 else float('inf') }) return session_data except Exception as e: logger.error(f"Error getting Session Volume Profile for {symbol}: {e}") return None def get_session_statistics(self, symbol: str) -> Optional[Dict]: """Get session trading statistics""" try: if symbol not in self.price_level_cache: return None levels = list(self.price_level_cache[symbol].values()) if not levels: return None total_volume = sum(level.volume for level in levels) total_buy_volume = sum(level.buy_volume for level in levels) total_sell_volume = sum(level.sell_volume for level in levels) total_trades = sum(level.trades_count for level in levels) # Calculate session VWAP session_vwap = sum(level.vwap * level.volume for level in levels) / total_volume if total_volume > 0 else 0 # Find price extremes prices = [level.price for level in levels] session_high = max(prices) if prices else 0 session_low = min(prices) if prices else 0 # Find Point of Control (POC) - price level with highest volume poc_level = max(levels, key=lambda x: x.volume) if levels else None poc_price = poc_level.price if poc_level else 0 poc_volume = poc_level.volume if poc_level else 0 # Calculate Value Area (70% of volume around POC) sorted_levels = sorted(levels, key=lambda x: x.volume, reverse=True) value_area_volume = total_volume * 0.7 value_area_levels = [] current_volume = 0 for level in sorted_levels: value_area_levels.append(level) current_volume += level.volume if current_volume >= value_area_volume: break value_area_high = max(level.price for level in value_area_levels) if value_area_levels else 0 value_area_low = min(level.price for level in value_area_levels) if value_area_levels else 0 session_start = self.session_start_time.get(symbol, datetime.now()) session_duration = (datetime.now() - session_start).total_seconds() / 3600 # Hours return { 'symbol': symbol, 'session_start': session_start.isoformat(), 'session_duration_hours': session_duration, 'total_volume': total_volume, 'total_buy_volume': total_buy_volume, 'total_sell_volume': total_sell_volume, 'total_trades': total_trades, 'session_vwap': session_vwap, 'session_high': session_high, 'session_low': session_low, 'poc_price': poc_price, 'poc_volume': poc_volume, 'value_area_high': value_area_high, 'value_area_low': value_area_low, 'value_area_range': value_area_high - value_area_low, 'buy_sell_ratio': total_buy_volume / total_sell_volume if total_sell_volume > 0 else float('inf'), 'price_levels_traded': len(levels), 'avg_trade_size': total_volume / total_trades if total_trades > 0 else 0 } except Exception as e: logger.error(f"Error getting session statistics for {symbol}: {e}") return None def get_market_profile_analysis(self, symbol: str) -> Optional[Dict]: """Get detailed market profile analysis""" try: current_snapshot = self.order_books.get(symbol) session_stats = self.get_session_statistics(symbol) svp_data = self.get_session_volume_profile_data(symbol) if not all([current_snapshot, session_stats, svp_data]): return None current_price = current_snapshot.mid_price session_vwap = session_stats['session_vwap'] poc_price = session_stats['poc_price'] value_area_high = session_stats['value_area_high'] value_area_low = session_stats['value_area_low'] # Market structure analysis price_vs_vwap = "above" if current_price > session_vwap else "below" price_vs_poc = "above" if current_price > poc_price else "below" in_value_area = value_area_low <= current_price <= value_area_high # Find support and resistance levels from high volume nodes high_volume_nodes = [item for item in svp_data if item['is_high_volume_node']] resistance_levels = [node['price'] for node in high_volume_nodes if node['price'] > current_price] support_levels = [node['price'] for node in high_volume_nodes if node['price'] < current_price] # Sort to get nearest levels resistance_levels.sort() support_levels.sort(reverse=True) return { 'symbol': symbol, 'current_price': current_price, 'market_structure': { 'price_vs_vwap': price_vs_vwap, 'price_vs_poc': price_vs_poc, 'in_value_area': in_value_area, 'distance_from_vwap_bps': int(abs(current_price - session_vwap) / session_vwap * 10000), 'distance_from_poc_bps': int(abs(current_price - poc_price) / poc_price * 10000) }, 'key_levels': { 'session_vwap': session_vwap, 'poc_price': poc_price, 'value_area_high': value_area_high, 'value_area_low': value_area_low, 'nearest_resistance': resistance_levels[0] if resistance_levels else None, 'nearest_support': support_levels[0] if support_levels else None }, 'volume_analysis': { 'total_high_volume_nodes': len(high_volume_nodes), 'resistance_levels': resistance_levels[:3], # Top 3 resistance 'support_levels': support_levels[:3], # Top 3 support 'poc_strength': session_stats['poc_volume'] / session_stats['total_volume'] * 100 }, 'session_statistics': session_stats } except Exception as e: logger.error(f"Error getting market profile analysis for {symbol}: {e}") return None def get_enhanced_order_flow_metrics(self, symbol: str) -> Optional[Dict]: """Get enhanced order flow metrics including aggressive vs passive ratios""" try: if symbol not in self.current_flow_ratios: return None current_ratios = self.current_flow_ratios.get(symbol, {}) # Get recent trade size distribution recent_trades = list(self.trade_size_distributions[symbol])[-100:] # Last 100 trades if not recent_trades: return None # Calculate institutional vs retail breakdown institutional_trades = [t for t in recent_trades if t['is_institutional']] retail_trades = [t for t in recent_trades if not t['is_institutional']] block_trades = [t for t in recent_trades if t['is_block_trade']] institutional_volume = sum(t['trade_value'] for t in institutional_trades) retail_volume = sum(t['trade_value'] for t in retail_trades) total_volume = institutional_volume + retail_volume # Size category breakdown size_breakdown = { 'micro': len([t for t in recent_trades if t['size_category'] == 'micro']), 'small': len([t for t in recent_trades if t['size_category'] == 'small']), 'medium': len([t for t in recent_trades if t['size_category'] == 'medium']), 'large': len([t for t in recent_trades if t['size_category'] == 'large']), 'block': len([t for t in recent_trades if t['size_category'] == 'block']) } # Get recent order flow intensity recent_intensity = list(self.order_flow_intensity[symbol])[-10:] avg_intensity = sum(i['intensity_score'] for i in recent_intensity) / max(1, len(recent_intensity)) # Get recent liquidity consumption recent_consumption = list(self.liquidity_consumption_rates[symbol])[-20:] avg_consumption_rate = sum(c['consumption_rate'] for c in recent_consumption) / max(1, len(recent_consumption)) # Get recent price impact recent_impacts = list(self.price_impact_measurements[symbol])[-20:] avg_price_impact = sum(i['price_impact'] for i in recent_impacts) / max(1, len(recent_impacts)) # Impact distribution impact_distribution = {} for impact in recent_impacts: category = impact['impact_category'] impact_distribution[category] = impact_distribution.get(category, 0) + 1 # Market maker vs taker flow analysis recent_flows = list(self.market_maker_taker_flows[symbol])[-50:] buy_aggressive_volume = sum(f['trade_value'] for f in recent_flows if f['flow_direction'] == 'buy_aggressive') sell_aggressive_volume = sum(f['trade_value'] for f in recent_flows if f['flow_direction'] == 'sell_aggressive') return { 'symbol': symbol, 'timestamp': datetime.now().isoformat(), # Aggressive vs Passive Analysis 'aggressive_passive': { 'aggressive_ratio': current_ratios.get('aggressive_ratio', 0), 'passive_ratio': current_ratios.get('passive_ratio', 0), 'aggressive_volume': current_ratios.get('aggressive_volume', 0), 'passive_volume': current_ratios.get('passive_volume', 0), 'avg_aggressive_size': current_ratios.get('avg_aggressive_size', 0), 'avg_passive_size': current_ratios.get('avg_passive_size', 0), 'trade_count': current_ratios.get('trade_count', 0) }, # Institutional vs Retail Analysis 'institutional_retail': { 'institutional_ratio': institutional_volume / total_volume if total_volume > 0 else 0, 'retail_ratio': retail_volume / total_volume if total_volume > 0 else 0, 'institutional_volume': institutional_volume, 'retail_volume': retail_volume, 'institutional_trade_count': len(institutional_trades), 'retail_trade_count': len(retail_trades), 'block_trade_count': len(block_trades), 'avg_institutional_size': institutional_volume / max(1, len(institutional_trades)), 'avg_retail_size': retail_volume / max(1, len(retail_trades)) }, # Trade Size Distribution 'size_distribution': size_breakdown, # Order Flow Intensity 'flow_intensity': { 'current_intensity': avg_intensity, 'intensity_category': 'high' if avg_intensity > 5 else 'medium' if avg_intensity > 2 else 'low' }, # Liquidity Analysis 'liquidity': { 'avg_consumption_rate': avg_consumption_rate, 'consumption_category': 'high' if avg_consumption_rate > 0.8 else 'medium' if avg_consumption_rate > 0.5 else 'low' }, # Price Impact Analysis 'price_impact': { 'avg_impact': avg_price_impact * 10000, # in basis points 'impact_distribution': impact_distribution, 'impact_category': 'high' if avg_price_impact > 0.005 else 'medium' if avg_price_impact > 0.001 else 'low' }, # Market Maker vs Taker Flow 'maker_taker_flow': { 'buy_aggressive_volume': buy_aggressive_volume, 'sell_aggressive_volume': sell_aggressive_volume, 'buy_pressure': buy_aggressive_volume / (buy_aggressive_volume + sell_aggressive_volume) if (buy_aggressive_volume + sell_aggressive_volume) > 0 else 0.5, 'sell_pressure': sell_aggressive_volume / (buy_aggressive_volume + sell_aggressive_volume) if (buy_aggressive_volume + sell_aggressive_volume) > 0 else 0.5 }, # 24h Volume Statistics (if available) 'volume_stats': self.volume_stats.get(symbol, {}) } except Exception as e: logger.error(f"Error getting enhanced order flow metrics for {symbol}: {e}") return None