"""
|
|
Order Book Analysis Integration (Free Data Sources)
|
|
|
|
This module provides Bookmap-style functionality using free order book data:
|
|
- Current Order Book (COB) analysis using Binance free depth streams
|
|
- Session Volume Profile (SVP) calculated from trade and depth data
|
|
- Order flow detection (sweeps, absorptions, momentum)
|
|
- Real-time order book heatmap generation
|
|
- Level 2 market depth streaming (20 levels via Binance free API)
|
|
|
|
Data is fed to CNN and DQN networks for enhanced trading decisions.
|
|
Uses only free data sources - no paid APIs required.
|
|
"""

import asyncio
import json
import logging
import time
import websockets
import numpy as np
import pandas as pd
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple, Any, Callable
from collections import deque, defaultdict
from dataclasses import dataclass
from threading import Thread, Lock
import requests

logger = logging.getLogger(__name__)
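
# Example wiring (a minimal sketch, not part of the module's API): the snippet
# assumes an asyncio entry point and uses throwaway print callbacks purely for
# illustration.
#
#   async def main():
#       bookmap = BookmapIntegration(symbols=['ETHUSDT'])
#       bookmap.add_cnn_callback(lambda sym, data: print('CNN', sym, data.get('type')))
#       bookmap.add_dqn_callback(lambda sym, data: print('DQN', sym, data.get('type')))
#       await bookmap.start_streaming()
#       await asyncio.sleep(60)   # stream for a minute
#       await bookmap.stop_streaming()
#
#   asyncio.run(main())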


@dataclass
class OrderBookLevel:
    """Single order book level"""
    price: float
    size: float
    orders: int
    side: str  # 'bid' or 'ask'
    timestamp: datetime


@dataclass
class OrderBookSnapshot:
    """Complete order book snapshot"""
    symbol: str
    timestamp: datetime
    bids: List[OrderBookLevel]
    asks: List[OrderBookLevel]
    spread: float
    mid_price: float


@dataclass
class VolumeProfileLevel:
    """Volume profile level data"""
    price: float
    volume: float
    buy_volume: float
    sell_volume: float
    trades_count: int
    vwap: float


@dataclass
class OrderFlowSignal:
    """Order flow signal detection"""
    timestamp: datetime
    signal_type: str  # 'sweep', 'absorption', 'iceberg', 'momentum'
    price: float
    volume: float
    confidence: float
    description: str

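# Construction sketch (illustrative values only): how one depth row maps onto
# the dataclasses above.
#
#   level = OrderBookLevel(price=3000.50, size=12.4, orders=1,
#                          side='bid', timestamp=datetime.now())
#   snap = OrderBookSnapshot(symbol='ETHUSDT', timestamp=datetime.now(),
#                            bids=[level], asks=[], spread=0.10,
#                            mid_price=3000.55)
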

class BookmapIntegration:
    """
    Order book analysis using free data sources

    Features:
    - Real-time order book monitoring (Binance free depth@20 levels)
    - Order flow pattern detection
    - Enhanced Session Volume Profile (SVP) analysis
    - Market microstructure metrics
    - CNN/DQN model integration
    - High-frequency order book snapshots for pattern detection
    """

    def __init__(self, symbols: Optional[List[str]] = None):
        self.symbols = symbols or ['ETHUSDT', 'BTCUSDT']
        self.is_streaming = False

        # Data storage
        self.order_books: Dict[str, OrderBookSnapshot] = {}
        self.order_book_history: Dict[str, deque] = {}
        self.volume_profiles: Dict[str, List[VolumeProfileLevel]] = {}
        self.flow_signals: Dict[str, deque] = {}

        # Enhanced Session Volume Profile tracking
        self.session_start_time = {}  # Track session start for each symbol
        self.session_volume_profiles: Dict[str, List[VolumeProfileLevel]] = {}
        self.price_level_cache: Dict[str, Dict[float, VolumeProfileLevel]] = {}

        # Heatmap data (10-minute rolling window)
        self.heatmap_window = timedelta(minutes=10)
        self.order_heatmaps: Dict[str, deque] = {}

        # Market metrics
        self.liquidity_metrics: Dict[str, Dict] = {}
        self.order_book_imbalances: Dict[str, deque] = {}

        # Enhanced order flow analysis
        self.aggressive_passive_ratios: Dict[str, deque] = {}
        self.trade_size_distributions: Dict[str, deque] = {}
        self.market_maker_taker_flows: Dict[str, deque] = {}
        self.order_flow_intensity: Dict[str, deque] = {}
        self.liquidity_consumption_rates: Dict[str, deque] = {}
        self.price_impact_measurements: Dict[str, deque] = {}

        # Advanced metrics for institutional vs retail detection
        self.large_order_threshold = 50000  # $50K+ considered institutional
        self.block_trade_threshold = 100000  # $100K+ considered block trades
        self.iceberg_detection_window = 30  # seconds for iceberg detection
        self.trade_clustering_window = 5  # seconds for trade clustering analysis

        # Free data source optimization
        self.depth_snapshots_per_second = 10  # 100ms updates = 10 per second
        self.trade_aggregation_window = 1.0  # 1 second aggregation
        self.last_trade_aggregation = {}

        # WebSocket connections
        self.websocket_tasks: Dict[str, asyncio.Task] = {}
        self.data_lock = Lock()

        # Model callbacks
        self.cnn_callbacks: List[Callable] = []
        self.dqn_callbacks: List[Callable] = []

        # Initialize data structures
        for symbol in self.symbols:
            self.order_book_history[symbol] = deque(maxlen=1000)
            self.order_heatmaps[symbol] = deque(maxlen=600)  # 10 min at 1s intervals
            self.flow_signals[symbol] = deque(maxlen=500)
            self.order_book_imbalances[symbol] = deque(maxlen=1000)
            self.session_volume_profiles[symbol] = []
            self.price_level_cache[symbol] = {}
            self.session_start_time[symbol] = datetime.now()
            self.last_trade_aggregation[symbol] = datetime.now()

            # Enhanced order flow analysis buffers
            self.aggressive_passive_ratios[symbol] = deque(maxlen=300)  # 5 minutes at 1s intervals
            self.trade_size_distributions[symbol] = deque(maxlen=1000)
            self.market_maker_taker_flows[symbol] = deque(maxlen=600)
            self.order_flow_intensity[symbol] = deque(maxlen=300)
            self.liquidity_consumption_rates[symbol] = deque(maxlen=300)
            self.price_impact_measurements[symbol] = deque(maxlen=300)

            # Default keys mirror what _update_liquidity_metrics writes,
            # so readers using .get('spread_bps') see a consistent schema
            self.liquidity_metrics[symbol] = {
                'total_bid_size': 0.0,
                'total_ask_size': 0.0,
                'weighted_mid': 0.0,
                'liquidity_ratio': 1.0,
                'spread_bps': 0.0,
                'volume_weighted_spread': 0.0
            }

        logger.info(f"Order Book Integration initialized for symbols: {self.symbols}")
        logger.info("Using FREE data sources: Binance WebSocket depth@20 + trades")

    def add_cnn_callback(self, callback: Callable[[str, Dict], None]):
        """Add CNN model callback"""
        self.cnn_callbacks.append(callback)
        logger.info(f"Added CNN callback: {len(self.cnn_callbacks)} total")

    def add_dqn_callback(self, callback: Callable[[str, Dict], None]):
        """Add DQN model callback"""
        self.dqn_callbacks.append(callback)
        logger.info(f"Added DQN callback: {len(self.dqn_callbacks)} total")

    async def start_streaming(self):
        """Start order book data streaming"""
        if self.is_streaming:
            logger.warning("Bookmap streaming already active")
            return

        self.is_streaming = True
        logger.info("Starting Bookmap order book streaming")

        # Start streams for each symbol
        for symbol in self.symbols:
            # Order book depth stream (20 levels, 100ms updates)
            depth_task = asyncio.create_task(self._stream_order_book_depth(symbol))
            self.websocket_tasks[f"{symbol}_depth"] = depth_task

            # Trade stream for order flow analysis
            trade_task = asyncio.create_task(self._stream_trades(symbol))
            self.websocket_tasks[f"{symbol}_trades"] = trade_task

            # Aggregated trade stream (for larger trades and better order flow analysis)
            agg_trade_task = asyncio.create_task(self._stream_aggregate_trades(symbol))
            self.websocket_tasks[f"{symbol}_aggTrade"] = agg_trade_task

            # 24hr ticker stream (for volume and statistical analysis)
            ticker_task = asyncio.create_task(self._stream_ticker(symbol))
            self.websocket_tasks[f"{symbol}_ticker"] = ticker_task

        # Start continuous analysis
        analysis_task = asyncio.create_task(self._continuous_analysis())
        self.websocket_tasks["analysis"] = analysis_task

        logger.info(f"Started streaming for {len(self.symbols)} symbols")

    async def stop_streaming(self):
        """Stop streaming"""
        if not self.is_streaming:
            return

        logger.info("Stopping Bookmap streaming")
        self.is_streaming = False

        # Cancel all tasks
        for name, task in self.websocket_tasks.items():
            if not task.done():
                task.cancel()
                try:
                    await task
                except asyncio.CancelledError:
                    pass

        self.websocket_tasks.clear()
        logger.info("Bookmap streaming stopped")

    async def _stream_order_book_depth(self, symbol: str):
        """Stream order book depth data"""
        binance_symbol = symbol.lower()
        url = f"wss://stream.binance.com:9443/ws/{binance_symbol}@depth20@100ms"

        while self.is_streaming:
            try:
                async with websockets.connect(url) as websocket:
                    logger.info(f"Order book depth connected for {symbol}")

                    async for message in websocket:
                        if not self.is_streaming:
                            break

                        try:
                            data = json.loads(message)
                            await self._process_depth_update(symbol, data)
                        except Exception as e:
                            logger.warning(f"Error processing depth for {symbol}: {e}")

            except Exception as e:
                logger.error(f"Depth WebSocket error for {symbol}: {e}")
                if self.is_streaming:
                    await asyncio.sleep(2)

    async def _stream_trades(self, symbol: str):
        """Stream individual trade data for order flow analysis"""
        binance_symbol = symbol.lower()
        url = f"wss://stream.binance.com:9443/ws/{binance_symbol}@trade"

        while self.is_streaming:
            try:
                async with websockets.connect(url) as websocket:
                    logger.info(f"Trade stream connected for {symbol}")

                    async for message in websocket:
                        if not self.is_streaming:
                            break

                        try:
                            data = json.loads(message)
                            await self._process_trade_update(symbol, data)
                        except Exception as e:
                            logger.warning(f"Error processing trade for {symbol}: {e}")

            except Exception as e:
                logger.error(f"Trade WebSocket error for {symbol}: {e}")
                if self.is_streaming:
                    await asyncio.sleep(2)

    async def _stream_aggregate_trades(self, symbol: str):
        """Stream aggregated trade data for institutional order flow detection"""
        binance_symbol = symbol.lower()
        url = f"wss://stream.binance.com:9443/ws/{binance_symbol}@aggTrade"

        while self.is_streaming:
            try:
                async with websockets.connect(url) as websocket:
                    logger.info(f"Aggregate Trade stream connected for {symbol}")

                    async for message in websocket:
                        if not self.is_streaming:
                            break

                        try:
                            data = json.loads(message)
                            await self._process_aggregate_trade_update(symbol, data)
                        except Exception as e:
                            logger.warning(f"Error processing aggTrade for {symbol}: {e}")

            except Exception as e:
                logger.error(f"Aggregate Trade WebSocket error for {symbol}: {e}")
                if self.is_streaming:
                    await asyncio.sleep(2)

    async def _stream_ticker(self, symbol: str):
        """Stream 24hr ticker data for volume analysis"""
        binance_symbol = symbol.lower()
        url = f"wss://stream.binance.com:9443/ws/{binance_symbol}@ticker"

        while self.is_streaming:
            try:
                async with websockets.connect(url) as websocket:
                    logger.info(f"Ticker stream connected for {symbol}")

                    async for message in websocket:
                        if not self.is_streaming:
                            break

                        try:
                            data = json.loads(message)
                            await self._process_ticker_update(symbol, data)
                        except Exception as e:
                            logger.warning(f"Error processing ticker for {symbol}: {e}")

            except Exception as e:
                logger.error(f"Ticker WebSocket error for {symbol}: {e}")
                if self.is_streaming:
                    await asyncio.sleep(2)

    async def _process_depth_update(self, symbol: str, data: Dict):
        """Process order book depth update"""
        try:
            timestamp = datetime.now()

            # Parse bids and asks
            bids = []
            asks = []

            for bid_data in data.get('bids', []):
                price = float(bid_data[0])
                size = float(bid_data[1])
                bids.append(OrderBookLevel(
                    price=price,
                    size=size,
                    orders=1,
                    side='bid',
                    timestamp=timestamp
                ))

            for ask_data in data.get('asks', []):
                price = float(ask_data[0])
                size = float(ask_data[1])
                asks.append(OrderBookLevel(
                    price=price,
                    size=size,
                    orders=1,
                    side='ask',
                    timestamp=timestamp
                ))

            # Sort levels
            bids.sort(key=lambda x: x.price, reverse=True)
            asks.sort(key=lambda x: x.price)

            # Calculate spread and mid price
            if bids and asks:
                best_bid = bids[0].price
                best_ask = asks[0].price
                spread = best_ask - best_bid
                mid_price = (best_bid + best_ask) / 2
            else:
                spread = 0.0
                mid_price = 0.0

            # Create snapshot
            snapshot = OrderBookSnapshot(
                symbol=symbol,
                timestamp=timestamp,
                bids=bids,
                asks=asks,
                spread=spread,
                mid_price=mid_price
            )

            with self.data_lock:
                self.order_books[symbol] = snapshot
                self.order_book_history[symbol].append(snapshot)

            # Update metrics
            self._update_liquidity_metrics(symbol, snapshot)
            self._calculate_order_book_imbalance(symbol, snapshot)
            self._update_order_heatmap(symbol, snapshot)

        except Exception as e:
            logger.error(f"Error processing depth update for {symbol}: {e}")

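    # Payload sketch (values illustrative): a Binance partial-depth message
    # (depth20@100ms) arrives roughly as the dict below; prices and quantities
    # come as strings, which is why _process_depth_update casts with float().
    #
    #   {
    #       "lastUpdateId": 160,
    #       "bids": [["3000.50", "12.40"], ...],   # [price, quantity]
    #       "asks": [["3000.60", "8.10"], ...]
    #   }
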
    async def _process_trade_update(self, symbol: str, data: Dict):
        """Process individual trade data with enhanced order flow analysis"""
        try:
            timestamp = datetime.fromtimestamp(int(data['T']) / 1000)
            price = float(data['p'])
            quantity = float(data['q'])
            is_buyer_maker = data['m']
            trade_id = data.get('t', '')

            # Calculate trade value
            trade_value = price * quantity

            # Enhanced order flow analysis
            await self._analyze_enhanced_order_flow(symbol, timestamp, price, quantity, trade_value, is_buyer_maker, 'individual')

            # Traditional order flow analysis
            await self._analyze_order_flow(symbol, timestamp, price, quantity, is_buyer_maker)

            # Update volume profile
            self._update_volume_profile(symbol, price, quantity, is_buyer_maker)

        except Exception as e:
            logger.error(f"Error processing trade for {symbol}: {e}")

    async def _process_aggregate_trade_update(self, symbol: str, data: Dict):
        """Process aggregated trade data for institutional flow detection"""
        try:
            timestamp = datetime.fromtimestamp(int(data['T']) / 1000)
            price = float(data['p'])
            quantity = float(data['q'])
            is_buyer_maker = data['m']
            first_trade_id = data.get('f', '')
            last_trade_id = data.get('l', '')

            # Calculate trade value and aggregation size
            trade_value = price * quantity
            trade_aggregation_size = int(last_trade_id) - int(first_trade_id) + 1 if first_trade_id and last_trade_id else 1

            # Enhanced analysis for aggregated trades (institutional detection)
            await self._analyze_enhanced_order_flow(symbol, timestamp, price, quantity, trade_value, is_buyer_maker, 'aggregated', trade_aggregation_size)

            # Detect large block trades and iceberg orders
            await self._detect_institutional_activity(symbol, timestamp, price, quantity, trade_value, trade_aggregation_size, is_buyer_maker)

        except Exception as e:
            logger.error(f"Error processing aggregate trade for {symbol}: {e}")

    async def _process_ticker_update(self, symbol: str, data: Dict):
        """Process ticker data for volume and statistical analysis"""
        try:
            # Extract relevant ticker data
            volume_24h = float(data.get('v', 0))  # 24hr volume
            quote_volume_24h = float(data.get('q', 0))  # 24hr quote volume
            price_change_24h = float(data.get('P', 0))  # 24hr price change %
            high_24h = float(data.get('h', 0))
            low_24h = float(data.get('l', 0))
            weighted_avg_price = float(data.get('w', 0))  # Weighted average price

            # Update volume statistics for relative analysis
            self._update_volume_statistics(symbol, volume_24h, quote_volume_24h, weighted_avg_price)

        except Exception as e:
            logger.error(f"Error processing ticker for {symbol}: {e}")

    def _update_liquidity_metrics(self, symbol: str, snapshot: OrderBookSnapshot):
        """Update liquidity metrics"""
        try:
            total_bid_size = sum(level.size for level in snapshot.bids)
            total_ask_size = sum(level.size for level in snapshot.asks)
            total_size = total_bid_size + total_ask_size

            # Weighted mid price (guard against an empty or zero-size book)
            if snapshot.bids and snapshot.asks and total_size > 0:
                bid_weight = total_bid_size / total_size
                ask_weight = total_ask_size / total_size
                weighted_mid = (snapshot.bids[0].price * ask_weight +
                                snapshot.asks[0].price * bid_weight)
            else:
                weighted_mid = snapshot.mid_price

            # Liquidity ratio
            liquidity_ratio = total_bid_size / total_ask_size if total_ask_size > 0 else 1.0

            self.liquidity_metrics[symbol] = {
                'total_bid_size': total_bid_size,
                'total_ask_size': total_ask_size,
                'weighted_mid': weighted_mid,
                'liquidity_ratio': liquidity_ratio,
                'spread_bps': (snapshot.spread / snapshot.mid_price) * 10000 if snapshot.mid_price > 0 else 0
            }

        except Exception as e:
            logger.error(f"Error updating liquidity metrics for {symbol}: {e}")

    def _calculate_order_book_imbalance(self, symbol: str, snapshot: OrderBookSnapshot):
        """Calculate order book imbalance"""
        try:
            if not snapshot.bids or not snapshot.asks:
                return

            # Top 5 levels imbalance
            n_levels = min(5, len(snapshot.bids), len(snapshot.asks))

            total_bid_size = sum(snapshot.bids[i].size for i in range(n_levels))
            total_ask_size = sum(snapshot.asks[i].size for i in range(n_levels))

            if total_bid_size + total_ask_size > 0:
                imbalance = (total_bid_size - total_ask_size) / (total_bid_size + total_ask_size)
            else:
                imbalance = 0.0

            self.order_book_imbalances[symbol].append({
                'timestamp': snapshot.timestamp,
                'imbalance': imbalance,
                'bid_size': total_bid_size,
                'ask_size': total_ask_size
            })

        except Exception as e:
            logger.error(f"Error calculating imbalance for {symbol}: {e}")

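    # Worked example for the imbalance above: with 30 units of bid size and
    # 10 units of ask size across the top 5 levels,
    # imbalance = (30 - 10) / (30 + 10) = +0.5 (bid-heavy); the value is
    # bounded in [-1, +1], with 0 meaning a balanced book.
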
    def _update_order_heatmap(self, symbol: str, snapshot: OrderBookSnapshot):
        """Update order heatmap matrix"""
        try:
            heatmap_entry = {
                'timestamp': snapshot.timestamp,
                'mid_price': snapshot.mid_price,
                'levels': {}
            }

            # Add bid levels
            for level in snapshot.bids:
                price_offset = level.price - snapshot.mid_price
                heatmap_entry['levels'][price_offset] = {
                    'side': 'bid',
                    'size': level.size,
                    'price': level.price
                }

            # Add ask levels
            for level in snapshot.asks:
                price_offset = level.price - snapshot.mid_price
                heatmap_entry['levels'][price_offset] = {
                    'side': 'ask',
                    'size': level.size,
                    'price': level.price
                }

            self.order_heatmaps[symbol].append(heatmap_entry)

            # Clean old entries
            cutoff_time = snapshot.timestamp - self.heatmap_window
            while (self.order_heatmaps[symbol] and
                   self.order_heatmaps[symbol][0]['timestamp'] < cutoff_time):
                self.order_heatmaps[symbol].popleft()

        except Exception as e:
            logger.error(f"Error updating heatmap for {symbol}: {e}")

    def _update_volume_profile(self, symbol: str, price: float, quantity: float, is_buyer_maker: bool):
        """Enhanced Session Volume Profile (SVP) update using free data"""
        try:
            # Calculate trade volume in USDT
            volume = price * quantity

            # Use price level caching for better performance
            price_key = round(price, 2)  # Round to 2 decimal places for price level grouping

            # Update session volume profile
            if price_key not in self.price_level_cache[symbol]:
                self.price_level_cache[symbol][price_key] = VolumeProfileLevel(
                    price=price_key,
                    volume=0.0,
                    buy_volume=0.0,
                    sell_volume=0.0,
                    trades_count=0,
                    vwap=price
                )

            level = self.price_level_cache[symbol][price_key]
            old_total_volume = level.volume

            # Update volume metrics
            level.volume += volume
            level.trades_count += 1

            # Update buy/sell volume breakdown
            if is_buyer_maker:
                level.sell_volume += volume  # Taker sold into the bid (the maker was the buyer)
            else:
                level.buy_volume += volume  # Taker bought the ask (the maker was the seller)

            # Calculate Volume Weighted Average Price (VWAP) for this level
            if level.volume > 0:
                level.vwap = ((level.vwap * old_total_volume) + (price * volume)) / level.volume

            # Also update the rolling volume profile (last 10 minutes)
            self._update_rolling_volume_profile(symbol, price_key, volume, is_buyer_maker)

            # Session reset detection (every 24 hours or major price gaps)
            current_time = datetime.now()
            if self._should_reset_session(symbol, current_time, price):
                self._reset_session_volume_profile(symbol, current_time)

        except Exception as e:
            logger.error(f"Error updating Session Volume Profile for {symbol}: {e}")

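    # Worked example of the incremental VWAP update above: a level with
    # vwap=3000 and volume=10,000 USDT absorbing a 5,000 USDT trade at 3006
    # moves to (3000*10000 + 3006*5000) / 15000 = 3002. Note the weighting is
    # by quote (USDT) volume, matching how `volume` is accumulated here.
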
    def _update_rolling_volume_profile(self, symbol: str, price_key: float, volume: float, is_buyer_maker: bool):
        """Update rolling 10-minute volume profile for real-time heatmap"""
        try:
            # Find or create level in regular volume profile
            price_level = None
            for level in self.volume_profiles.get(symbol, []):
                if abs(level.price - price_key) < 0.01:
                    price_level = level
                    break

            if not price_level:
                if symbol not in self.volume_profiles:
                    self.volume_profiles[symbol] = []

                price_level = VolumeProfileLevel(
                    price=price_key,
                    volume=0.0,
                    buy_volume=0.0,
                    sell_volume=0.0,
                    trades_count=0,
                    vwap=price_key
                )
                self.volume_profiles[symbol].append(price_level)

            # Update rolling metrics
            old_volume = price_level.volume
            price_level.volume += volume
            price_level.trades_count += 1

            if is_buyer_maker:
                price_level.sell_volume += volume
            else:
                price_level.buy_volume += volume

            # Update VWAP
            if price_level.volume > 0:
                price_level.vwap = ((price_level.vwap * old_volume) + (price_key * volume)) / price_level.volume

        except Exception as e:
            logger.error(f"Error updating rolling volume profile for {symbol}: {e}")

    def _should_reset_session(self, symbol: str, current_time: datetime, current_price: float) -> bool:
        """Determine if session volume profile should be reset"""
        try:
            session_start = self.session_start_time.get(symbol)
            if not session_start:
                return False

            # Reset every 24 hours (daily session)
            if (current_time - session_start).total_seconds() > 86400:  # 24 hours
                return True

            # Reset on major price gaps (> 5% from session VWAP)
            if self.price_level_cache.get(symbol):
                total_volume = sum(level.volume for level in self.price_level_cache[symbol].values())
                if total_volume > 0:
                    weighted_price = sum(level.vwap * level.volume for level in self.price_level_cache[symbol].values()) / total_volume
                    price_gap = abs(current_price - weighted_price) / weighted_price
                    if price_gap > 0.05:  # 5% gap
                        return True

            return False

        except Exception as e:
            logger.error(f"Error checking session reset for {symbol}: {e}")
            return False

    def _reset_session_volume_profile(self, symbol: str, reset_time: datetime):
        """Reset session volume profile"""
        try:
            logger.info(f"Resetting session volume profile for {symbol}")
            self.session_start_time[symbol] = reset_time
            self.price_level_cache[symbol] = {}
            self.session_volume_profiles[symbol] = []

        except Exception as e:
            logger.error(f"Error resetting session volume profile for {symbol}: {e}")

    async def _analyze_order_flow(self, symbol: str, timestamp: datetime, price: float,
                                  quantity: float, is_buyer_maker: bool):
        """Analyze order flow patterns"""
        try:
            if symbol not in self.order_book_history or not self.order_book_history[symbol]:
                return

            recent_snapshots = list(self.order_book_history[symbol])[-10:]

            # Check for sweeps
            sweep_signal = self._detect_order_sweep(symbol, recent_snapshots, price, quantity, is_buyer_maker)
            if sweep_signal:
                self.flow_signals[symbol].append(sweep_signal)
                await self._notify_flow_signal(symbol, sweep_signal)

            # Check for absorption
            absorption_signal = self._detect_absorption(symbol, recent_snapshots, price, quantity)
            if absorption_signal:
                self.flow_signals[symbol].append(absorption_signal)
                await self._notify_flow_signal(symbol, absorption_signal)

            # Check for momentum
            momentum_signal = self._detect_momentum_trade(symbol, price, quantity, is_buyer_maker)
            if momentum_signal:
                self.flow_signals[symbol].append(momentum_signal)
                await self._notify_flow_signal(symbol, momentum_signal)

        except Exception as e:
            logger.error(f"Error analyzing order flow for {symbol}: {e}")

    async def _analyze_enhanced_order_flow(self, symbol: str, timestamp: datetime, price: float,
                                           quantity: float, trade_value: float, is_buyer_maker: bool,
                                           trade_type: str, aggregation_size: int = 1):
        """Enhanced order flow analysis with aggressive vs passive ratios"""
        try:
            # Determine if trade is aggressive (taker) or passive (maker)
            is_aggressive = not is_buyer_maker  # In Binance data, m=false means buyer is taker (aggressive)

            # Calculate aggressive vs passive ratios
            self._update_aggressive_passive_ratio(symbol, timestamp, trade_value, is_aggressive)

            # Update trade size distribution
            self._update_trade_size_distribution(symbol, timestamp, trade_value, trade_type)

            # Update market maker vs taker flow
            self._update_market_maker_taker_flow(symbol, timestamp, trade_value, is_buyer_maker, is_aggressive)

            # Calculate order flow intensity
            self._update_order_flow_intensity(symbol, timestamp, trade_value, aggregation_size)

            # Measure liquidity consumption
            await self._measure_liquidity_consumption(symbol, timestamp, price, quantity, trade_value, is_aggressive)

            # Measure price impact
            await self._measure_price_impact(symbol, timestamp, price, trade_value, is_aggressive)

        except Exception as e:
            logger.error(f"Error in enhanced order flow analysis for {symbol}: {e}")

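    # Mapping sketch for Binance's `m` flag as used above: m=True means the
    # buyer was the maker (the print was an aggressive SELL into the bid);
    # m=False means the buyer was the taker (an aggressive BUY lifting the
    # ask). With `is_aggressive = not is_buyer_maker`, taker-buy prints are
    # counted as "aggressive" and taker-sell prints as "passive" in the
    # ratios below.
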
    def _update_aggressive_passive_ratio(self, symbol: str, timestamp: datetime, trade_value: float, is_aggressive: bool):
        """Update aggressive vs passive participant ratios"""
        try:
            current_window = []
            cutoff_time = timestamp - timedelta(seconds=60)  # 1-minute window

            # Filter recent trades within window
            for entry in self.aggressive_passive_ratios[symbol]:
                if entry['timestamp'] > cutoff_time:
                    current_window.append(entry)

            # Add current trade
            current_window.append({
                'timestamp': timestamp,
                'trade_value': trade_value,
                'is_aggressive': is_aggressive
            })

            # Calculate ratios
            aggressive_volume = sum(t['trade_value'] for t in current_window if t['is_aggressive'])
            passive_volume = sum(t['trade_value'] for t in current_window if not t['is_aggressive'])
            total_volume = aggressive_volume + passive_volume

            if total_volume > 0:
                aggressive_ratio = aggressive_volume / total_volume
                passive_ratio = passive_volume / total_volume

                ratio_data = {
                    'timestamp': timestamp,
                    'aggressive_ratio': aggressive_ratio,
                    'passive_ratio': passive_ratio,
                    'aggressive_volume': aggressive_volume,
                    'passive_volume': passive_volume,
                    'total_volume': total_volume,
                    'trade_count': len(current_window),
                    'avg_aggressive_size': aggressive_volume / max(1, sum(1 for t in current_window if t['is_aggressive'])),
                    'avg_passive_size': passive_volume / max(1, sum(1 for t in current_window if not t['is_aggressive']))
                }

                # Update buffer
                self.aggressive_passive_ratios[symbol].clear()
                self.aggressive_passive_ratios[symbol].extend(current_window)

                # Store calculated ratios for use by models
                if not hasattr(self, 'current_flow_ratios'):
                    self.current_flow_ratios = {}
                self.current_flow_ratios[symbol] = ratio_data

        except Exception as e:
            logger.error(f"Error updating aggressive/passive ratio for {symbol}: {e}")

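    # Worked example: in a 60s window holding $300K of "aggressive"
    # (taker-buy) prints and $100K of "passive" prints,
    # aggressive_ratio = 300/400 = 0.75 and passive_ratio = 0.25; the two
    # always sum to 1 whenever total_volume > 0.
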
    def _update_trade_size_distribution(self, symbol: str, timestamp: datetime, trade_value: float, trade_type: str):
        """Update trade size distribution for institutional vs retail detection"""
        try:
            # Classify trade size
            if trade_value < 1000:
                size_category = 'micro'  # < $1K (retail)
            elif trade_value < 10000:
                size_category = 'small'  # $1K-$10K (retail/small institutional)
            elif trade_value < 50000:
                size_category = 'medium'  # $10K-$50K (institutional)
            elif trade_value < 100000:
                size_category = 'large'  # $50K-$100K (large institutional)
            else:
                size_category = 'block'  # > $100K (block trades)

            trade_data = {
                'timestamp': timestamp,
                'trade_value': trade_value,
                'trade_type': trade_type,
                'size_category': size_category,
                'is_institutional': trade_value >= self.large_order_threshold,
                'is_block_trade': trade_value >= self.block_trade_threshold
            }

            self.trade_size_distributions[symbol].append(trade_data)

        except Exception as e:
            logger.error(f"Error updating trade size distribution for {symbol}: {e}")

    def _update_market_maker_taker_flow(self, symbol: str, timestamp: datetime, trade_value: float,
                                        is_buyer_maker: bool, is_aggressive: bool):
        """Update market maker vs taker flow analysis"""
        try:
            flow_data = {
                'timestamp': timestamp,
                'trade_value': trade_value,
                'is_buyer_maker': is_buyer_maker,
                'is_aggressive': is_aggressive,
                'flow_direction': 'buy_aggressive' if not is_buyer_maker else 'sell_aggressive',
                # When the buyer is the maker, the maker's side is the buy side
                'market_maker_side': 'buy' if is_buyer_maker else 'sell'
            }

            self.market_maker_taker_flows[symbol].append(flow_data)

        except Exception as e:
            logger.error(f"Error updating market maker/taker flow for {symbol}: {e}")

    def _update_order_flow_intensity(self, symbol: str, timestamp: datetime, trade_value: float, aggregation_size: int):
        """Calculate order flow intensity based on trade frequency and size"""
        try:
            # Calculate intensity based on trade value and aggregation
            base_intensity = trade_value / 10000  # Normalize by $10K
            aggregation_intensity = aggregation_size / 10  # Normalize aggregation factor

            # Time-based intensity (trades per second)
            recent_trades = [t for t in self.order_flow_intensity[symbol]
                             if (timestamp - t['timestamp']).total_seconds() < 10]
            time_intensity = len(recent_trades) / 10  # Trades per second over 10s window

            intensity_score = base_intensity * (1 + aggregation_intensity) * (1 + time_intensity)

            intensity_data = {
                'timestamp': timestamp,
                'intensity_score': intensity_score,
                'base_intensity': base_intensity,
                'aggregation_intensity': aggregation_intensity,
                'time_intensity': time_intensity,
                'trade_value': trade_value,
                'aggregation_size': aggregation_size
            }

            self.order_flow_intensity[symbol].append(intensity_data)

        except Exception as e:
            logger.error(f"Error updating order flow intensity for {symbol}: {e}")

    async def _measure_liquidity_consumption(self, symbol: str, timestamp: datetime, price: float,
                                             quantity: float, trade_value: float, is_aggressive: bool):
        """Measure liquidity consumption rates"""
        try:
            if not is_aggressive:
                return  # Only measure for aggressive trades

            current_snapshot = self.order_books.get(symbol)
            if not current_snapshot:
                return

            # Calculate how much liquidity was consumed, walking the book
            # level by level and decrementing the remaining quantity
            consumed_liquidity = 0
            if price >= current_snapshot.mid_price:  # Buy-side consumption
                for ask_level in current_snapshot.asks:
                    if ask_level.price <= price:
                        take = min(ask_level.size, quantity)
                        consumed_liquidity += take * ask_level.price
                        quantity -= take
                        if quantity <= 0:
                            break
            else:  # Sell-side consumption
                for bid_level in current_snapshot.bids:
                    if bid_level.price >= price:
                        take = min(bid_level.size, quantity)
                        consumed_liquidity += take * bid_level.price
                        quantity -= take
                        if quantity <= 0:
                            break

            consumption_rate = consumed_liquidity / trade_value if trade_value > 0 else 0

            consumption_data = {
                'timestamp': timestamp,
                'price': price,
                'trade_value': trade_value,
                'consumed_liquidity': consumed_liquidity,
                'consumption_rate': consumption_rate,
                'side': 'buy' if price >= current_snapshot.mid_price else 'sell'
            }

            self.liquidity_consumption_rates[symbol].append(consumption_data)

        except Exception as e:
            logger.error(f"Error measuring liquidity consumption for {symbol}: {e}")

    async def _measure_price_impact(self, symbol: str, timestamp: datetime, price: float,
                                    trade_value: float, is_aggressive: bool):
        """Measure price impact of trades"""
        try:
            if not is_aggressive:
                return

            # Get price before and after (approximated by looking at recent snapshots)
            recent_snapshots = list(self.order_book_history[symbol])[-5:]
            if len(recent_snapshots) < 2:
                return

            price_before = recent_snapshots[-2].mid_price
            price_after = recent_snapshots[-1].mid_price

            price_impact = abs(price_after - price_before) / price_before if price_before > 0 else 0
            impact_per_dollar = price_impact / (trade_value / 1000000) if trade_value > 0 else 0  # Impact per $1M

            impact_data = {
                'timestamp': timestamp,
                'trade_price': price,
                'trade_value': trade_value,
                'price_before': price_before,
                'price_after': price_after,
                'price_impact': price_impact,
                'impact_per_million': impact_per_dollar,
                'impact_category': self._categorize_impact(price_impact)
            }

            self.price_impact_measurements[symbol].append(impact_data)

        except Exception as e:
            logger.error(f"Error measuring price impact for {symbol}: {e}")

    def _categorize_impact(self, price_impact: float) -> str:
        """Categorize price impact level"""
        if price_impact < 0.0001:  # < 0.01%
            return 'minimal'
        elif price_impact < 0.001:  # < 0.1%
            return 'low'
        elif price_impact < 0.005:  # < 0.5%
            return 'medium'
        elif price_impact < 0.01:  # < 1%
            return 'high'
        else:
            return 'extreme'

    async def _detect_institutional_activity(self, symbol: str, timestamp: datetime, price: float,
                                             quantity: float, trade_value: float, aggregation_size: int,
                                             is_buyer_maker: bool):
        """Detect institutional trading activity patterns"""
        try:
            # Block trade detection
            if trade_value >= self.block_trade_threshold:
                signal = OrderFlowSignal(
                    timestamp=timestamp,
                    signal_type='block_trade',
                    price=price,
                    volume=trade_value,
                    confidence=min(0.95, trade_value / 500000),  # Higher confidence for larger trades
                    description=f"Block trade: ${trade_value:.0f} ({'Buy' if not is_buyer_maker else 'Sell'})"
                )
                self.flow_signals[symbol].append(signal)
                await self._notify_flow_signal(symbol, signal)

            # Iceberg order detection (multiple large aggregated trades in sequence)
            await self._detect_iceberg_orders(symbol, timestamp, price, trade_value, aggregation_size, is_buyer_maker)

            # High-frequency activity detection
            await self._detect_hft_activity(symbol, timestamp, trade_value, aggregation_size)

        except Exception as e:
            logger.error(f"Error detecting institutional activity for {symbol}: {e}")

    async def _detect_iceberg_orders(self, symbol: str, timestamp: datetime, price: float,
                                     trade_value: float, aggregation_size: int, is_buyer_maker: bool):
        """Detect iceberg order patterns"""
        try:
            if trade_value < self.large_order_threshold:
                return

            # Look for similar-sized trades in recent history
            cutoff_time = timestamp - timedelta(seconds=self.iceberg_detection_window)
            recent_large_trades = []

            for trade_data in self.trade_size_distributions[symbol]:
                if (trade_data['timestamp'] > cutoff_time and
                        trade_data['trade_value'] >= self.large_order_threshold):
                    recent_large_trades.append(trade_data)

            # Iceberg pattern: 3+ large trades with similar sizes
            if len(recent_large_trades) >= 3:
                avg_size = sum(t['trade_value'] for t in recent_large_trades) / len(recent_large_trades)
                size_consistency = all(abs(t['trade_value'] - avg_size) / avg_size < 0.2 for t in recent_large_trades)

                if size_consistency:
                    total_iceberg_volume = sum(t['trade_value'] for t in recent_large_trades)
                    confidence = min(0.9, len(recent_large_trades) / 10 + total_iceberg_volume / 1000000)

                    signal = OrderFlowSignal(
                        timestamp=timestamp,
                        signal_type='iceberg',
                        price=price,
                        volume=total_iceberg_volume,
                        confidence=confidence,
                        description=f"Iceberg: {len(recent_large_trades)} trades, ${total_iceberg_volume:.0f} total"
                    )
                    self.flow_signals[symbol].append(signal)
                    await self._notify_flow_signal(symbol, signal)

        except Exception as e:
            logger.error(f"Error detecting iceberg orders for {symbol}: {e}")

    async def _detect_hft_activity(self, symbol: str, timestamp: datetime, trade_value: float, aggregation_size: int):
        """Detect high-frequency trading activity"""
        try:
            # Look for high-frequency patterns (many small trades in rapid succession)
            cutoff_time = timestamp - timedelta(seconds=5)
            recent_trades = [t for t in self.order_flow_intensity[symbol] if t['timestamp'] > cutoff_time]

            if len(recent_trades) >= 20:  # 20+ trades in 5 seconds
                avg_trade_size = sum(t['trade_value'] for t in recent_trades) / len(recent_trades)

                if avg_trade_size < 5000:  # Small average trade size suggests HFT
                    total_hft_volume = sum(t['trade_value'] for t in recent_trades)
                    confidence = min(0.8, len(recent_trades) / 50)

                    signal = OrderFlowSignal(
                        timestamp=timestamp,
                        signal_type='hft_activity',
                        price=0,  # Multiple prices
                        volume=total_hft_volume,
                        confidence=confidence,
                        description=f"HFT: {len(recent_trades)} trades in 5s, avg ${avg_trade_size:.0f}"
                    )
                    self.flow_signals[symbol].append(signal)
                    await self._notify_flow_signal(symbol, signal)

        except Exception as e:
            logger.error(f"Error detecting HFT activity for {symbol}: {e}")

    def _update_volume_statistics(self, symbol: str, volume_24h: float, quote_volume_24h: float, weighted_avg_price: float):
        """Update volume statistics for relative analysis"""
        try:
            # Store 24h volume data for relative comparisons
            if not hasattr(self, 'volume_stats'):
                self.volume_stats = {}

            self.volume_stats[symbol] = {
                'volume_24h': volume_24h,
                'quote_volume_24h': quote_volume_24h,
                'weighted_avg_price': weighted_avg_price,
                'timestamp': datetime.now()
            }

        except Exception as e:
            logger.error(f"Error updating volume statistics for {symbol}: {e}")

    def _detect_order_sweep(self, symbol: str, snapshots: List[OrderBookSnapshot],
                            price: float, quantity: float, is_buyer_maker: bool) -> Optional[OrderFlowSignal]:
        """Detect order book sweeps"""
        try:
            if len(snapshots) < 2:
                return None

            before_snapshot = snapshots[-2]

            if is_buyer_maker:  # Aggressive sell: count bid levels at or above the print price
                levels_consumed = 0
                total_consumed_size = 0

                for level in before_snapshot.bids[:5]:
                    if level.price >= price:
                        levels_consumed += 1
                        total_consumed_size += level.size

                if levels_consumed >= 2 and total_consumed_size > quantity * 1.5:
                    confidence = min(0.9, levels_consumed / 5.0 + 0.3)

                    return OrderFlowSignal(
                        timestamp=datetime.now(),
                        signal_type='sweep',
                        price=price,
                        volume=quantity * price,
                        confidence=confidence,
                        description=f"Sell sweep: {levels_consumed} levels"
                    )
            else:  # Aggressive buy: count ask levels at or below the print price
                levels_consumed = 0
                total_consumed_size = 0

                for level in before_snapshot.asks[:5]:
                    if level.price <= price:
                        levels_consumed += 1
                        total_consumed_size += level.size

                if levels_consumed >= 2 and total_consumed_size > quantity * 1.5:
                    confidence = min(0.9, levels_consumed / 5.0 + 0.3)

                    return OrderFlowSignal(
                        timestamp=datetime.now(),
                        signal_type='sweep',
                        price=price,
                        volume=quantity * price,
                        confidence=confidence,
                        description=f"Buy sweep: {levels_consumed} levels"
                    )

            return None

        except Exception as e:
            logger.error(f"Error detecting sweep for {symbol}: {e}")
            return None

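    # Worked example: if the pre-trade book had bids at 3000.5, 3000.4 and
    # 3000.3 and a taker sell prints at 3000.3, all three levels at or above
    # the print price count as consumed; given enough resting size relative
    # to the print (the 1.5x test), a 'sweep' signal fires with
    # confidence = min(0.9, 3/5 + 0.3) = 0.9.
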
    def _detect_absorption(self, symbol: str, snapshots: List[OrderBookSnapshot],
                           price: float, quantity: float) -> Optional[OrderFlowSignal]:
        """Detect absorption patterns"""
        try:
            if len(snapshots) < 3:
                return None

            volume_threshold = 10000  # $10K minimum
            price_impact_threshold = 0.001  # 0.1% max impact

            trade_value = price * quantity
            if trade_value < volume_threshold:
                return None

            # Calculate price impact
            price_before = snapshots[-3].mid_price
            price_after = snapshots[-1].mid_price
            price_impact = abs(price_after - price_before) / price_before

            if price_impact < price_impact_threshold:
                confidence = min(0.8, (trade_value / 50000) * 0.5 + 0.3)

                return OrderFlowSignal(
                    timestamp=datetime.now(),
                    signal_type='absorption',
                    price=price,
                    volume=trade_value,
                    confidence=confidence,
                    description=f"Absorption: ${trade_value:.0f}"
                )

            return None

        except Exception as e:
            logger.error(f"Error detecting absorption for {symbol}: {e}")
            return None

    def _detect_momentum_trade(self, symbol: str, price: float, quantity: float,
                               is_buyer_maker: bool) -> Optional[OrderFlowSignal]:
        """Detect momentum trades"""
        try:
            trade_value = price * quantity
            momentum_threshold = 25000  # $25K minimum

            if trade_value < momentum_threshold:
                return None

            confidence = min(0.9, trade_value / 100000 * 0.6 + 0.3)
            direction = "sell" if is_buyer_maker else "buy"

            return OrderFlowSignal(
                timestamp=datetime.now(),
                signal_type='momentum',
                price=price,
                volume=trade_value,
                confidence=confidence,
                description=f"Large {direction}: ${trade_value:.0f}"
            )

        except Exception as e:
            logger.error(f"Error detecting momentum for {symbol}: {e}")
            return None

    async def _notify_flow_signal(self, symbol: str, signal: OrderFlowSignal):
        """Notify models of flow signals"""
        try:
            signal_data = {
                'signal_type': signal.signal_type,
                'price': signal.price,
                'volume': signal.volume,
                'confidence': signal.confidence,
                'timestamp': signal.timestamp,
                'description': signal.description
            }

            # Notify CNN callbacks
            for callback in self.cnn_callbacks:
                try:
                    callback(symbol, signal_data)
                except Exception as e:
                    logger.warning(f"Error in CNN callback: {e}")

            # Notify DQN callbacks
            for callback in self.dqn_callbacks:
                try:
                    callback(symbol, signal_data)
                except Exception as e:
                    logger.warning(f"Error in DQN callback: {e}")

        except Exception as e:
            logger.error(f"Error notifying flow signal: {e}")

    async def _continuous_analysis(self):
        """Continuous analysis and model feeding"""
        while self.is_streaming:
            try:
                await asyncio.sleep(1)  # Analyze every second

                for symbol in self.symbols:
                    # Generate features for models
                    cnn_features = self.get_cnn_features(symbol)
                    if cnn_features is not None:
                        for callback in self.cnn_callbacks:
                            try:
                                callback(symbol, {'features': cnn_features, 'type': 'orderbook'})
                            except Exception as e:
                                logger.warning(f"Error in CNN feature callback: {e}")

                    dqn_features = self.get_dqn_state_features(symbol)
                    if dqn_features is not None:
                        for callback in self.dqn_callbacks:
                            try:
                                callback(symbol, {'state': dqn_features, 'type': 'orderbook'})
                            except Exception as e:
                                logger.warning(f"Error in DQN state callback: {e}")

            except Exception as e:
                logger.error(f"Error in continuous analysis: {e}")
                await asyncio.sleep(5)

    def get_cnn_features(self, symbol: str) -> Optional[np.ndarray]:
        """Generate CNN features from order book data"""
        try:
            if symbol not in self.order_books:
                return None

            snapshot = self.order_books[symbol]
            features = []

            # Order book features (80 features: 20 levels x 2 sides x 2 values)
            for i in range(min(20, len(snapshot.bids))):
                bid = snapshot.bids[i]
                features.append(bid.size)
                features.append(bid.price - snapshot.mid_price)

            # Pad bids
            while len(features) < 40:
                features.extend([0.0, 0.0])

            for i in range(min(20, len(snapshot.asks))):
                ask = snapshot.asks[i]
                features.append(ask.size)
                features.append(ask.price - snapshot.mid_price)

            # Pad asks
            while len(features) < 80:
                features.extend([0.0, 0.0])

            # Liquidity metrics (10 features)
            metrics = self.liquidity_metrics.get(symbol, {})
            features.extend([
                metrics.get('total_bid_size', 0.0),
                metrics.get('total_ask_size', 0.0),
                metrics.get('liquidity_ratio', 1.0),
                metrics.get('spread_bps', 0.0),
                snapshot.spread,
                metrics.get('weighted_mid', snapshot.mid_price) - snapshot.mid_price,
                len(snapshot.bids),
                len(snapshot.asks),
                snapshot.mid_price,
                time.time() % 86400  # Time of day
            ])

            # Order book imbalance (5 features)
            if self.order_book_imbalances[symbol]:
                latest_imbalance = self.order_book_imbalances[symbol][-1]
                features.extend([
                    latest_imbalance['imbalance'],
                    latest_imbalance['bid_size'],
                    latest_imbalance['ask_size'],
                    latest_imbalance['bid_size'] + latest_imbalance['ask_size'],
                    abs(latest_imbalance['imbalance'])
                ])
            else:
                features.extend([0.0, 0.0, 0.0, 0.0, 0.0])

            # Enhanced flow signals (15 features); total_seconds() avoids the
            # day-wrapping behavior of timedelta.seconds
            recent_signals = [s for s in self.flow_signals[symbol]
                              if (datetime.now() - s.timestamp).total_seconds() < 60]

            sweep_count = sum(1 for s in recent_signals if s.signal_type == 'sweep')
            absorption_count = sum(1 for s in recent_signals if s.signal_type == 'absorption')
            momentum_count = sum(1 for s in recent_signals if s.signal_type == 'momentum')
            block_count = sum(1 for s in recent_signals if s.signal_type == 'block_trade')
            iceberg_count = sum(1 for s in recent_signals if s.signal_type == 'iceberg')
            hft_count = sum(1 for s in recent_signals if s.signal_type == 'hft_activity')
            max_confidence = max([s.confidence for s in recent_signals], default=0.0)
            total_flow_volume = sum(s.volume for s in recent_signals)

            # Enhanced order flow metrics
            flow_metrics = self.get_enhanced_order_flow_metrics(symbol)
            if flow_metrics:
                aggressive_ratio = flow_metrics['aggressive_passive']['aggressive_ratio']
                institutional_ratio = flow_metrics['institutional_retail']['institutional_ratio']
                flow_intensity = flow_metrics['flow_intensity']['current_intensity']
                avg_consumption_rate = flow_metrics['liquidity']['avg_consumption_rate']
                avg_price_impact = flow_metrics['price_impact']['avg_impact'] / 10000  # Normalize from basis points
                buy_pressure = flow_metrics['maker_taker_flow']['buy_pressure']
                sell_pressure = flow_metrics['maker_taker_flow']['sell_pressure']
            else:
                aggressive_ratio = 0.5
                institutional_ratio = 0.5
                flow_intensity = 0.0
                avg_consumption_rate = 0.0
                avg_price_impact = 0.0
                buy_pressure = 0.5
                sell_pressure = 0.5

            features.extend([
                sweep_count,
                absorption_count,
                momentum_count,
                block_count,
                iceberg_count,
                hft_count,
                max_confidence,
                total_flow_volume,
                aggressive_ratio,
                institutional_ratio,
                flow_intensity,
                avg_consumption_rate,
                avg_price_impact,
                buy_pressure,
                sell_pressure
            ])

            return np.array(features, dtype=np.float32)

        except Exception as e:
            logger.error(f"Error generating CNN features for {symbol}: {e}")
            return None

    def get_dqn_state_features(self, symbol: str) -> Optional[np.ndarray]:
        """Generate DQN state features"""
        try:
            if symbol not in self.order_books:
                return None

            snapshot = self.order_books[symbol]
            state_features = []

            # Normalized order book state (20 features)
            total_bid_size = sum(level.size for level in snapshot.bids[:10])
            total_ask_size = sum(level.size for level in snapshot.asks[:10])
            total_size = total_bid_size + total_ask_size

            if total_size > 0:
                for i in range(min(10, len(snapshot.bids))):
                    state_features.append(snapshot.bids[i].size / total_size)

                while len(state_features) < 10:
                    state_features.append(0.0)

                for i in range(min(10, len(snapshot.asks))):
                    state_features.append(snapshot.asks[i].size / total_size)

                while len(state_features) < 20:
                    state_features.append(0.0)
            else:
                state_features.extend([0.0] * 20)

            # Enhanced market state indicators (20 features)
            metrics = self.liquidity_metrics.get(symbol, {})

            spread_pct = (snapshot.spread / snapshot.mid_price) if snapshot.mid_price > 0 else 0
            liquidity_ratio = metrics.get('liquidity_ratio', 1.0)
            liquidity_imbalance = (liquidity_ratio - 1) / (liquidity_ratio + 1)

            # Flow strength; total_seconds() avoids the day-wrap of timedelta.seconds
            recent_signals = [s for s in self.flow_signals[symbol]
                              if (datetime.now() - s.timestamp).total_seconds() < 30]
            flow_strength = sum(s.confidence for s in recent_signals) / max(len(recent_signals), 1)

            # Price volatility
            if len(self.order_book_history[symbol]) >= 10:
                recent_prices = [s.mid_price for s in list(self.order_book_history[symbol])[-10:]]
                price_volatility = np.std(recent_prices) / np.mean(recent_prices) if recent_prices else 0
            else:
                price_volatility = 0

            # Enhanced order flow metrics for DQN
            flow_metrics = self.get_enhanced_order_flow_metrics(symbol)
            if flow_metrics:
                aggressive_ratio = flow_metrics['aggressive_passive']['aggressive_ratio']
                institutional_ratio = flow_metrics['institutional_retail']['institutional_ratio']
                flow_intensity = min(flow_metrics['flow_intensity']['current_intensity'] / 10, 1.0)  # Normalize
                consumption_rate = flow_metrics['liquidity']['avg_consumption_rate']
                price_impact = min(flow_metrics['price_impact']['avg_impact'] / 100, 1.0)  # Normalize basis points
                buy_pressure = flow_metrics['maker_taker_flow']['buy_pressure']
                sell_pressure = flow_metrics['maker_taker_flow']['sell_pressure']

                # Trade size distribution ratios
                size_dist = flow_metrics['size_distribution']
                total_trades = sum(size_dist.values()) or 1
                retail_ratio = (size_dist.get('micro', 0) + size_dist.get('small', 0)) / total_trades
                institutional_trade_ratio = (size_dist.get('large', 0) + size_dist.get('block', 0)) / total_trades

                # Recent activity indicators
                block_activity = min(size_dist.get('block', 0) / 10, 1.0)  # Normalize
            else:
                aggressive_ratio = 0.5
                institutional_ratio = 0.5
                flow_intensity = 0.0
                consumption_rate = 0.0
                price_impact = 0.0
                buy_pressure = 0.5
                sell_pressure = 0.5
                retail_ratio = 0.5
                institutional_trade_ratio = 0.5
                block_activity = 0.0

            state_features.extend([
                spread_pct * 10000,  # Spread in basis points
                liquidity_imbalance,
                flow_strength,
                price_volatility * 100,
                min(len(snapshot.bids), 20) / 20,
                min(len(snapshot.asks), 20) / 20,
                len([s for s in recent_signals if s.signal_type == 'sweep']) / 10,
                len([s for s in recent_signals if s.signal_type == 'absorption']) / 5,
                len([s for s in recent_signals if s.signal_type == 'momentum']) / 5,
                (datetime.now().hour * 60 + datetime.now().minute) / 1440,
                # Enhanced order flow state features
                aggressive_ratio,
                institutional_ratio,
                flow_intensity,
                consumption_rate,
                price_impact,
                buy_pressure,
                sell_pressure,
                retail_ratio,
                institutional_trade_ratio,
                block_activity
            ])

            return np.array(state_features, dtype=np.float32)

        except Exception as e:
            logger.error(f"Error generating DQN features for {symbol}: {e}")
            return None

    def get_order_heatmap_matrix(self, symbol: str, levels: int = 40) -> Optional[np.ndarray]:
        """Generate heatmap matrix for visualization"""
        try:
            if symbol not in self.order_heatmaps or not self.order_heatmaps[symbol]:
                return None

            current_snapshot = self.order_books.get(symbol)
            if not current_snapshot:
                return None

            mid_price = current_snapshot.mid_price
            price_step = mid_price * 0.0001  # 1 basis point steps

            # Matrix: time x price levels
            time_window = min(600, len(self.order_heatmaps[symbol]))
            heatmap_matrix = np.zeros((time_window, levels))

            # Fill matrix
            for t, entry in enumerate(list(self.order_heatmaps[symbol])[-time_window:]):
                for price_offset, level_data in entry['levels'].items():
                    level_idx = int((price_offset + (levels / 2) * price_step) / price_step)

                    if 0 <= level_idx < levels:
                        size_weight = 1.0 if level_data['side'] == 'bid' else -1.0
                        heatmap_matrix[t, level_idx] = level_data['size'] * size_weight

            return heatmap_matrix

        except Exception as e:
            logger.error(f"Error generating heatmap matrix for {symbol}: {e}")
            return None

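    # Index sketch for the matrix above (values illustrative): with levels=40
    # and mid_price=3000, price_step = 0.30 (1 bp), so a bid 0.90 below mid
    # maps to int((-0.90 + 20 * 0.30) / 0.30) = 17, just below the matrix
    # midpoint; bids carry positive sign and asks negative in the cell values.
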
    def get_dashboard_data(self, symbol: str) -> Optional[Dict]:
        """Get data for dashboard visualization"""
        try:
            if symbol not in self.order_books:
                return None

            snapshot = self.order_books[symbol]
            heatmap_matrix = self.get_order_heatmap_matrix(symbol)  # compute once, reuse below

            return {
                'timestamp': snapshot.timestamp.isoformat(),
                'symbol': symbol,
                'mid_price': snapshot.mid_price,
                'spread': snapshot.spread,
                'bids': [{'price': l.price, 'size': l.size} for l in snapshot.bids[:20]],
                'asks': [{'price': l.price, 'size': l.size} for l in snapshot.asks[:20]],
                'liquidity_metrics': self.liquidity_metrics.get(symbol, {}),
                'volume_profile': self.get_volume_profile_data(symbol),
                'heatmap_matrix': heatmap_matrix.tolist() if heatmap_matrix is not None else None,
                'enhanced_order_flow': self.get_enhanced_order_flow_metrics(symbol),
                'recent_signals': [
                    {
                        'type': s.signal_type,
                        'price': s.price,
                        'volume': s.volume,
                        'confidence': s.confidence,
                        'timestamp': s.timestamp.isoformat(),
                        'description': s.description
                    }
                    for s in list(self.flow_signals[symbol])[-10:]
                ]
            }

        except Exception as e:
            logger.error(f"Error getting dashboard data for {symbol}: {e}")
            return None

    def get_volume_profile_data(self, symbol: str) -> Optional[List[Dict]]:
        """Get rolling volume profile data (10-minute window)"""
        try:
            if symbol not in self.volume_profiles:
                return None

            profile_data = []
            for level in sorted(self.volume_profiles[symbol], key=lambda x: x.price):
                profile_data.append({
                    'price': level.price,
                    'volume': level.volume,
                    'buy_volume': level.buy_volume,
                    'sell_volume': level.sell_volume,
                    'trades_count': level.trades_count,
                    'vwap': level.vwap,
                    'net_volume': level.buy_volume - level.sell_volume
                })

            return profile_data

        except Exception as e:
            logger.error(f"Error getting volume profile for {symbol}: {e}")
            return None

    def get_session_volume_profile_data(self, symbol: str) -> Optional[List[Dict]]:
        """Get Session Volume Profile (SVP) data - full session data"""
        try:
            if symbol not in self.price_level_cache:
                return None

            session_data = []
            total_volume = sum(level.volume for level in self.price_level_cache[symbol].values())

            for price_key, level in sorted(self.price_level_cache[symbol].items()):
                volume_percentage = (level.volume / total_volume * 100) if total_volume > 0 else 0

                session_data.append({
                    'price': level.price,
                    'volume': level.volume,
                    'buy_volume': level.buy_volume,
                    'sell_volume': level.sell_volume,
                    'trades_count': level.trades_count,
                    'vwap': level.vwap,
                    'net_volume': level.buy_volume - level.sell_volume,
                    'volume_percentage': volume_percentage,
                    'is_high_volume_node': volume_percentage > 2.0,  # Mark significant price levels
                    'buy_sell_ratio': level.buy_volume / level.sell_volume if level.sell_volume > 0 else float('inf')
                })

            return session_data

        except Exception as e:
            logger.error(f"Error getting Session Volume Profile for {symbol}: {e}")
            return None

    def get_session_statistics(self, symbol: str) -> Optional[Dict]:
        """Get session trading statistics"""
        try:
            if symbol not in self.price_level_cache:
                return None

            levels = list(self.price_level_cache[symbol].values())
            if not levels:
                return None

            total_volume = sum(level.volume for level in levels)
            total_buy_volume = sum(level.buy_volume for level in levels)
            total_sell_volume = sum(level.sell_volume for level in levels)
            total_trades = sum(level.trades_count for level in levels)

            # Session VWAP: volume-weighted average of the per-level VWAPs
            session_vwap = sum(level.vwap * level.volume for level in levels) / total_volume if total_volume > 0 else 0

            # Find price extremes
            prices = [level.price for level in levels]
            session_high = max(prices) if prices else 0
            session_low = min(prices) if prices else 0

            # Find Point of Control (POC) - price level with highest volume
            poc_level = max(levels, key=lambda x: x.volume) if levels else None
            poc_price = poc_level.price if poc_level else 0
            poc_volume = poc_level.volume if poc_level else 0

            # Approximate the Value Area: greedily accumulate the highest-volume
            # levels until 70% of session volume is covered. Note these levels
            # are not required to be contiguous around the POC.
            sorted_levels = sorted(levels, key=lambda x: x.volume, reverse=True)
            value_area_volume = total_volume * 0.7
            value_area_levels = []
            current_volume = 0

            for level in sorted_levels:
                value_area_levels.append(level)
                current_volume += level.volume
                if current_volume >= value_area_volume:
                    break

            value_area_high = max(level.price for level in value_area_levels) if value_area_levels else 0
            value_area_low = min(level.price for level in value_area_levels) if value_area_levels else 0

            session_start = self.session_start_time.get(symbol, datetime.now())
            session_duration = (datetime.now() - session_start).total_seconds() / 3600  # Hours

            return {
                'symbol': symbol,
                'session_start': session_start.isoformat(),
                'session_duration_hours': session_duration,
                'total_volume': total_volume,
                'total_buy_volume': total_buy_volume,
                'total_sell_volume': total_sell_volume,
                'total_trades': total_trades,
                'session_vwap': session_vwap,
                'session_high': session_high,
                'session_low': session_low,
                'poc_price': poc_price,
                'poc_volume': poc_volume,
                'value_area_high': value_area_high,
                'value_area_low': value_area_low,
                'value_area_range': value_area_high - value_area_low,
                'buy_sell_ratio': total_buy_volume / total_sell_volume if total_sell_volume > 0 else float('inf'),
                'price_levels_traded': len(levels),
                'avg_trade_size': total_volume / total_trades if total_trades > 0 else 0
            }

        except Exception as e:
            logger.error(f"Error getting session statistics for {symbol}: {e}")
            return None
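
    # Worked example of the value-area approximation above (made-up numbers):
    # with per-level volumes {100: 50, 101: 30, 102: 20} and total volume 100,
    # the 70% target is 70, so the greedy pass takes the 50- and 30-volume
    # levels and stops, giving value_area_low=100, value_area_high=101.
    #
    #   stats = bookmap.get_session_statistics('ETHUSDT')
    #   if stats:
    #       print(f"POC {stats['poc_price']} VA [{stats['value_area_low']}, {stats['value_area_high']}]")
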
    def get_market_profile_analysis(self, symbol: str) -> Optional[Dict]:
        """Get detailed market profile analysis"""
        try:
            current_snapshot = self.order_books.get(symbol)
            session_stats = self.get_session_statistics(symbol)
            svp_data = self.get_session_volume_profile_data(symbol)

            if not all([current_snapshot, session_stats, svp_data]):
                return None

            current_price = current_snapshot.mid_price
            session_vwap = session_stats['session_vwap']
            poc_price = session_stats['poc_price']
            value_area_high = session_stats['value_area_high']
            value_area_low = session_stats['value_area_low']

            # Market structure analysis
            price_vs_vwap = "above" if current_price > session_vwap else "below"
            price_vs_poc = "above" if current_price > poc_price else "below"
            in_value_area = value_area_low <= current_price <= value_area_high

            # Find support and resistance levels from high-volume nodes
            high_volume_nodes = [item for item in svp_data if item['is_high_volume_node']]
            resistance_levels = [node['price'] for node in high_volume_nodes if node['price'] > current_price]
            support_levels = [node['price'] for node in high_volume_nodes if node['price'] < current_price]

            # Sort so the nearest levels come first
            resistance_levels.sort()
            support_levels.sort(reverse=True)

            return {
                'symbol': symbol,
                'current_price': current_price,
                'market_structure': {
                    'price_vs_vwap': price_vs_vwap,
                    'price_vs_poc': price_vs_poc,
                    'in_value_area': in_value_area,
                    'distance_from_vwap_bps': int(abs(current_price - session_vwap) / session_vwap * 10000),
                    'distance_from_poc_bps': int(abs(current_price - poc_price) / poc_price * 10000)
                },
                'key_levels': {
                    'session_vwap': session_vwap,
                    'poc_price': poc_price,
                    'value_area_high': value_area_high,
                    'value_area_low': value_area_low,
                    'nearest_resistance': resistance_levels[0] if resistance_levels else None,
                    'nearest_support': support_levels[0] if support_levels else None
                },
                'volume_analysis': {
                    'total_high_volume_nodes': len(high_volume_nodes),
                    'resistance_levels': resistance_levels[:3],  # Top 3 resistance
                    'support_levels': support_levels[:3],  # Top 3 support
                    'poc_strength': session_stats['poc_volume'] / session_stats['total_volume'] * 100
                },
                'session_statistics': session_stats
            }

        except Exception as e:
            logger.error(f"Error getting market profile analysis for {symbol}: {e}")
            return None
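
    # Usage sketch (hypothetical, assuming `bookmap` is a populated instance):
    # combine structure and key levels into a simple value-area bias check.
    #
    #   analysis = bookmap.get_market_profile_analysis('ETHUSDT')
    #   if analysis:
    #       ms = analysis['market_structure']
    #       bias = 'balanced' if ms['in_value_area'] else f"trending {ms['price_vs_poc']} POC"
    #       print(bias, analysis['key_levels']['nearest_support'])
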
    def get_enhanced_order_flow_metrics(self, symbol: str) -> Optional[Dict]:
        """Get enhanced order flow metrics including aggressive vs passive ratios"""
        try:
            if symbol not in self.current_flow_ratios:
                return None

            current_ratios = self.current_flow_ratios[symbol]

            # Get recent trade size distribution (guard missing symbols with .get)
            recent_trades = list(self.trade_size_distributions.get(symbol, []))[-100:]  # Last 100 trades
            if not recent_trades:
                return None

            # Calculate institutional vs retail breakdown
            institutional_trades = [t for t in recent_trades if t['is_institutional']]
            retail_trades = [t for t in recent_trades if not t['is_institutional']]
            block_trades = [t for t in recent_trades if t['is_block_trade']]

            institutional_volume = sum(t['trade_value'] for t in institutional_trades)
            retail_volume = sum(t['trade_value'] for t in retail_trades)
            total_volume = institutional_volume + retail_volume

            # Size category breakdown
            size_breakdown = {
                category: len([t for t in recent_trades if t['size_category'] == category])
                for category in ('micro', 'small', 'medium', 'large', 'block')
            }

            # Get recent order flow intensity
            recent_intensity = list(self.order_flow_intensity.get(symbol, []))[-10:]
            avg_intensity = sum(i['intensity_score'] for i in recent_intensity) / max(1, len(recent_intensity))

            # Get recent liquidity consumption
            recent_consumption = list(self.liquidity_consumption_rates.get(symbol, []))[-20:]
            avg_consumption_rate = sum(c['consumption_rate'] for c in recent_consumption) / max(1, len(recent_consumption))

            # Get recent price impact
            recent_impacts = list(self.price_impact_measurements.get(symbol, []))[-20:]
            avg_price_impact = sum(i['price_impact'] for i in recent_impacts) / max(1, len(recent_impacts))

            # Impact distribution
            impact_distribution = {}
            for impact in recent_impacts:
                category = impact['impact_category']
                impact_distribution[category] = impact_distribution.get(category, 0) + 1

            # Market maker vs taker flow analysis
            recent_flows = list(self.market_maker_taker_flows.get(symbol, []))[-50:]
            buy_aggressive_volume = sum(f['trade_value'] for f in recent_flows if f['flow_direction'] == 'buy_aggressive')
            sell_aggressive_volume = sum(f['trade_value'] for f in recent_flows if f['flow_direction'] == 'sell_aggressive')
            total_aggressive_volume = buy_aggressive_volume + sell_aggressive_volume

            return {
                'symbol': symbol,
                'timestamp': datetime.now().isoformat(),

                # Aggressive vs Passive Analysis
                'aggressive_passive': {
                    'aggressive_ratio': current_ratios.get('aggressive_ratio', 0),
                    'passive_ratio': current_ratios.get('passive_ratio', 0),
                    'aggressive_volume': current_ratios.get('aggressive_volume', 0),
                    'passive_volume': current_ratios.get('passive_volume', 0),
                    'avg_aggressive_size': current_ratios.get('avg_aggressive_size', 0),
                    'avg_passive_size': current_ratios.get('avg_passive_size', 0),
                    'trade_count': current_ratios.get('trade_count', 0)
                },

                # Institutional vs Retail Analysis
                'institutional_retail': {
                    'institutional_ratio': institutional_volume / total_volume if total_volume > 0 else 0,
                    'retail_ratio': retail_volume / total_volume if total_volume > 0 else 0,
                    'institutional_volume': institutional_volume,
                    'retail_volume': retail_volume,
                    'institutional_trade_count': len(institutional_trades),
                    'retail_trade_count': len(retail_trades),
                    'block_trade_count': len(block_trades),
                    'avg_institutional_size': institutional_volume / max(1, len(institutional_trades)),
                    'avg_retail_size': retail_volume / max(1, len(retail_trades))
                },

                # Trade Size Distribution
                'size_distribution': size_breakdown,

                # Order Flow Intensity
                'flow_intensity': {
                    'current_intensity': avg_intensity,
                    'intensity_category': 'high' if avg_intensity > 5 else 'medium' if avg_intensity > 2 else 'low'
                },

                # Liquidity Analysis
                'liquidity': {
                    'avg_consumption_rate': avg_consumption_rate,
                    'consumption_category': 'high' if avg_consumption_rate > 0.8 else 'medium' if avg_consumption_rate > 0.5 else 'low'
                },

                # Price Impact Analysis
                'price_impact': {
                    'avg_impact': avg_price_impact * 10000,  # in basis points
                    'impact_distribution': impact_distribution,
                    'impact_category': 'high' if avg_price_impact > 0.005 else 'medium' if avg_price_impact > 0.001 else 'low'
                },

                # Market Maker vs Taker Flow
                'maker_taker_flow': {
                    'buy_aggressive_volume': buy_aggressive_volume,
                    'sell_aggressive_volume': sell_aggressive_volume,
                    'buy_pressure': buy_aggressive_volume / total_aggressive_volume if total_aggressive_volume > 0 else 0.5,
                    'sell_pressure': sell_aggressive_volume / total_aggressive_volume if total_aggressive_volume > 0 else 0.5
                },

                # 24h Volume Statistics (if available)
                'volume_stats': self.volume_stats.get(symbol, {})
            }

        except Exception as e:
            logger.error(f"Error getting enhanced order flow metrics for {symbol}: {e}")
            return None
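
    # Interpretation sketch (hypothetical thresholds, assuming `bookmap` is a
    # populated instance): buy_pressure and sell_pressure sum to 1.0 whenever
    # any aggressive flow exists, so a reading above ~0.6 is a one-sided tape.
    #
    #   flow = bookmap.get_enhanced_order_flow_metrics('ETHUSDT')
    #   if flow:
    #       bp = flow['maker_taker_flow']['buy_pressure']
    #       print('buyers pressing' if bp > 0.6 else 'sellers pressing' if bp < 0.4 else 'two-sided')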