cob integration (wip)

This commit is contained in:
Dobromir Popov
2025-06-19 01:12:10 +03:00
parent 2bc78af888
commit 2ef7ed011d
6 changed files with 1011 additions and 98 deletions

View File

@ -8,6 +8,7 @@ This enhanced orchestrator implements:
4. Perfect move marking for CNN backpropagation training
5. Market environment adaptation through RL evaluation
6. Universal data format compliance (5 timeseries streams)
7. Consolidated Order Book (COB) integration for real-time market microstructure
"""
import asyncio
@ -32,6 +33,7 @@ from .trading_action import TradingAction
from .negative_case_trainer import NegativeCaseTrainer
from .trading_executor import TradingExecutor
from .cnn_monitor import log_cnn_prediction, start_cnn_training_session
from .cob_integration import COBIntegration
# Enhanced pivot RL trainer functionality integrated into orchestrator
logger = logging.getLogger(__name__)
@ -71,7 +73,7 @@ class TradingAction:
@dataclass
class MarketState:
"""Complete market state for RL evaluation with comprehensive data"""
"""Complete market state for RL evaluation with comprehensive data including COB"""
symbol: str
timestamp: datetime
prices: Dict[str, float] # {timeframe: current_price}
@ -90,6 +92,14 @@ class MarketState:
cnn_predictions: Optional[Dict[str, np.ndarray]] = None # CNN predictions by timeframe
pivot_points: Optional[Dict[str, Any]] = None # Williams market structure data
market_microstructure: Dict[str, Any] = field(default_factory=dict) # Tick-level patterns
# COB (Consolidated Order Book) data for market microstructure analysis
cob_features: Optional[np.ndarray] = None # COB CNN features (200 dimensions)
cob_state: Optional[np.ndarray] = None # COB DQN state features (50 dimensions)
order_book_imbalance: float = 0.0 # Bid/ask imbalance ratio
liquidity_depth: float = 0.0 # Total liquidity within 1% of mid price
exchange_diversity: float = 0.0 # Number of exchanges contributing to liquidity
market_impact_estimate: float = 0.0 # Estimated market impact for standard trade size
@dataclass
class PerfectMove:
@ -136,7 +146,7 @@ class EnhancedTradingOrchestrator:
symbols: List[str] = None,
enhanced_rl_training: bool = True,
model_registry: Dict = None):
"""Initialize the enhanced orchestrator with 2-action system"""
"""Initialize the enhanced orchestrator with 2-action system and COB integration"""
self.config = get_config()
self.data_provider = data_provider or DataProvider()
self.model_registry = model_registry or get_model_registry()
@ -155,6 +165,22 @@ class EnhancedTradingOrchestrator:
if self.enhanced_rl_training:
logger.info("Enhanced RL training enabled")
# Initialize COB Integration for real-time market microstructure
self.cob_integration = COBIntegration(
data_provider=self.data_provider,
symbols=self.symbols
)
# Register COB callbacks for CNN and RL models
self.cob_integration.add_cnn_callback(self._on_cob_cnn_features)
self.cob_integration.add_dqn_callback(self._on_cob_dqn_state)
# COB feature storage for model integration
self.latest_cob_features: Dict[str, np.ndarray] = {}
self.latest_cob_state: Dict[str, np.ndarray] = {}
self.cob_feature_history: Dict[str, deque] = {symbol: deque(maxlen=100) for symbol in self.symbols}
logger.info("COB Integration initialized for real-time market microstructure")
# Position tracking for 2-action system
self.current_positions = {} # symbol -> {'side': 'LONG'|'SHORT'|'FLAT', 'entry_price': float, 'timestamp': datetime}
self.last_signals = {} # symbol -> {'action': 'BUY'|'SELL', 'timestamp': datetime, 'confidence': float}
@ -414,6 +440,46 @@ class EnhancedTradingOrchestrator:
# Analyze market microstructure
market_microstructure = self._analyze_market_microstructure(raw_ticks)
# Get COB (Consolidated Order Book) data if available
cob_features = self.latest_cob_features.get(symbol)
cob_state = self.latest_cob_state.get(symbol)
# Get COB snapshot for additional metrics
cob_snapshot = None
order_book_imbalance = 0.0
liquidity_depth = 0.0
exchange_diversity = 0.0
market_impact_estimate = 0.0
try:
if self.cob_integration:
cob_snapshot = self.cob_integration.get_cob_snapshot(symbol)
if cob_snapshot:
# Calculate order book imbalance
bid_liquidity = sum(level.total_volume_usd for level in cob_snapshot.consolidated_bids[:10])
ask_liquidity = sum(level.total_volume_usd for level in cob_snapshot.consolidated_asks[:10])
if ask_liquidity > 0:
order_book_imbalance = (bid_liquidity - ask_liquidity) / (bid_liquidity + ask_liquidity)
# Calculate liquidity depth (within 1% of mid price)
mid_price = cob_snapshot.volume_weighted_mid
price_range = mid_price * 0.01 # 1%
depth_bids = [l for l in cob_snapshot.consolidated_bids if l.price >= mid_price - price_range]
depth_asks = [l for l in cob_snapshot.consolidated_asks if l.price <= mid_price + price_range]
liquidity_depth = sum(l.total_volume_usd for l in depth_bids + depth_asks)
# Calculate exchange diversity
all_exchanges = set()
for level in cob_snapshot.consolidated_bids[:20] + cob_snapshot.consolidated_asks[:20]:
all_exchanges.update(level.exchange_breakdown.keys())
exchange_diversity = len(all_exchanges)
# Estimate market impact for 10k USD trade
market_impact_estimate = self._estimate_market_impact(cob_snapshot, 10000)
except Exception as e:
logger.warning(f"Error calculating COB metrics for {symbol}: {e}")
# Create comprehensive market state
market_state = MarketState(
symbol=symbol,
@ -431,17 +497,51 @@ class EnhancedTradingOrchestrator:
cnn_hidden_features=cnn_hidden_features,
cnn_predictions=cnn_predictions,
pivot_points=pivot_points,
market_microstructure=market_microstructure
market_microstructure=market_microstructure,
# COB data integration
cob_features=cob_features,
cob_state=cob_state,
order_book_imbalance=order_book_imbalance,
liquidity_depth=liquidity_depth,
exchange_diversity=exchange_diversity,
market_impact_estimate=market_impact_estimate
)
market_states[symbol] = market_state
logger.debug(f"Created comprehensive market state for {symbol} with {len(raw_ticks)} ticks")
logger.debug(f"Created comprehensive market state for {symbol} with COB integration")
except Exception as e:
logger.error(f"Error creating market state for {symbol}: {e}")
return market_states
def _estimate_market_impact(self, cob_snapshot, trade_size_usd: float) -> float:
"""Estimate market impact for a given trade size"""
try:
# Simple market impact estimation based on order book depth
cumulative_volume = 0
weighted_price = 0
mid_price = cob_snapshot.volume_weighted_mid
# For buy orders, walk through asks
for level in cob_snapshot.consolidated_asks:
if cumulative_volume >= trade_size_usd:
break
volume_needed = min(level.total_volume_usd, trade_size_usd - cumulative_volume)
weighted_price += level.price * volume_needed
cumulative_volume += volume_needed
if cumulative_volume > 0:
avg_execution_price = weighted_price / cumulative_volume
impact = (avg_execution_price - mid_price) / mid_price
return abs(impact)
return 0.0
except Exception as e:
logger.warning(f"Error estimating market impact: {e}")
return 0.0
def _get_recent_tick_data_for_rl(self, symbol: str, seconds: int = 300) -> List[Dict[str, Any]]:
"""Get recent tick data for RL state building"""
try:
@ -600,18 +700,130 @@ class EnhancedTradingOrchestrator:
hidden_features[model_name] = model_hidden
if model_pred is not None:
predictions[model_name] = model_pred
except Exception as e:
logger.warning(f"Error getting features from CNN model {model_name}: {e}")
logger.warning(f"Error extracting CNN features from {model_name}: {e}")
return (hidden_features if hidden_features else None,
predictions if predictions else None)
return hidden_features if hidden_features else None, predictions if predictions else None
return None, None
except Exception as e:
logger.warning(f"Error getting CNN features for {symbol}: {e}")
return None, None
def _get_latest_price_from_universal(self, symbol: str, timeframe: str, universal_stream: UniversalDataStream) -> Optional[float]:
"""Get latest price for symbol and timeframe from universal data stream"""
try:
if symbol == 'ETH/USDT':
if timeframe == '1s' and len(universal_stream.eth_ticks) > 0:
# Get latest tick price (close price is at index 4)
return float(universal_stream.eth_ticks[-1, 4]) # close price
elif timeframe == '1m' and len(universal_stream.eth_1m) > 0:
return float(universal_stream.eth_1m[-1, 4]) # close price
elif timeframe == '1h' and len(universal_stream.eth_1h) > 0:
return float(universal_stream.eth_1h[-1, 4]) # close price
elif timeframe == '1d' and len(universal_stream.eth_1d) > 0:
return float(universal_stream.eth_1d[-1, 4]) # close price
elif symbol == 'BTC/USDT':
if timeframe == '1s' and len(universal_stream.btc_ticks) > 0:
return float(universal_stream.btc_ticks[-1, 4]) # close price
# Fallback to data provider
return self._get_latest_price_fallback(symbol, timeframe)
except Exception as e:
logger.warning(f"Error getting latest price for {symbol} {timeframe}: {e}")
return self._get_latest_price_fallback(symbol, timeframe)
def _get_latest_price_fallback(self, symbol: str, timeframe: str) -> Optional[float]:
"""Fallback method to get latest price from data provider"""
try:
df = self.data_provider.get_historical_data(symbol, timeframe, limit=1)
if df is not None and not df.empty:
return float(df['close'].iloc[-1])
return None
except Exception as e:
logger.warning(f"Error in price fallback for {symbol} {timeframe}: {e}")
return None
def _calculate_volatility_from_universal(self, symbol: str, universal_stream: UniversalDataStream) -> float:
"""Calculate volatility from universal data stream"""
try:
if symbol == 'ETH/USDT' and len(universal_stream.eth_1m) > 1:
# Calculate volatility from 1m candles
closes = universal_stream.eth_1m[:, 4] # close prices
if len(closes) > 1:
returns = np.diff(np.log(closes))
return float(np.std(returns) * np.sqrt(1440)) # Daily volatility
elif symbol == 'BTC/USDT' and len(universal_stream.btc_ticks) > 1:
# Calculate volatility from tick data
closes = universal_stream.btc_ticks[:, 4] # close prices
if len(closes) > 1:
returns = np.diff(np.log(closes))
return float(np.std(returns) * np.sqrt(86400)) # Daily volatility
return 0.0
except Exception as e:
logger.warning(f"Error calculating volatility for {symbol}: {e}")
return 0.0
def _calculate_volume_from_universal(self, symbol: str, universal_stream: UniversalDataStream) -> float:
"""Calculate volume from universal data stream"""
try:
if symbol == 'ETH/USDT' and len(universal_stream.eth_1m) > 0:
# Get latest volume from 1m candles
volumes = universal_stream.eth_1m[:, 5] # volume
return float(np.mean(volumes[-10:])) # Average of last 10 candles
elif symbol == 'BTC/USDT' and len(universal_stream.btc_ticks) > 0:
# Calculate volume from tick data
volumes = universal_stream.btc_ticks[:, 5] # volume
return float(np.sum(volumes[-100:])) # Sum of last 100 ticks
return 0.0
except Exception as e:
logger.warning(f"Error calculating volume for {symbol}: {e}")
return 0.0
def _calculate_trend_strength_from_universal(self, symbol: str, universal_stream: UniversalDataStream) -> float:
"""Calculate trend strength from universal data stream"""
try:
if symbol == 'ETH/USDT' and len(universal_stream.eth_1m) > 20:
# Calculate trend strength using 20-period moving average
closes = universal_stream.eth_1m[-20:, 4] # last 20 closes
if len(closes) >= 20:
sma = np.mean(closes)
current_price = closes[-1]
return float((current_price - sma) / sma) # Relative trend strength
elif symbol == 'BTC/USDT' and len(universal_stream.btc_ticks) > 100:
# Calculate trend from tick data
closes = universal_stream.btc_ticks[-100:, 4] # last 100 ticks
if len(closes) >= 100:
start_price = closes[0]
end_price = closes[-1]
return float((end_price - start_price) / start_price)
return 0.0
except Exception as e:
logger.warning(f"Error calculating trend strength for {symbol}: {e}")
return 0.0
def _determine_market_regime(self, symbol: str, universal_stream: UniversalDataStream) -> str:
"""Determine market regime from universal data stream"""
try:
# Calculate volatility and trend strength
volatility = self._calculate_volatility_from_universal(symbol, universal_stream)
trend_strength = abs(self._calculate_trend_strength_from_universal(symbol, universal_stream))
# Classify market regime
if volatility > 0.05: # High volatility threshold
return 'volatile'
elif trend_strength > 0.02: # Strong trend threshold
return 'trending'
else:
return 'ranging'
except Exception as e:
logger.warning(f"Error determining market regime for {symbol}: {e}")
return 'unknown'
def _extract_cnn_features(self, model, feature_matrix: np.ndarray) -> Tuple[Optional[np.ndarray], Optional[np.ndarray]]:
"""Extract hidden features and predictions from CNN model"""
try:
@ -2535,4 +2747,45 @@ class EnhancedTradingOrchestrator:
'positions': {sym: pos for sym, pos in self.current_positions.items()},
'total_positions': len(self.current_positions),
'last_signals': self.last_signals
}
}
def _on_cob_cnn_features(self, symbol: str, cob_data: Dict):
"""Handle COB features for CNN model integration"""
try:
if 'features' in cob_data:
features = cob_data['features']
self.latest_cob_features[symbol] = features
self.cob_feature_history[symbol].append({
'timestamp': cob_data.get('timestamp', datetime.now()),
'features': features
})
logger.debug(f"COB CNN features updated for {symbol}: {features.shape}")
except Exception as e:
logger.error(f"Error processing COB CNN features for {symbol}: {e}")
def _on_cob_dqn_state(self, symbol: str, cob_data: Dict):
"""Handle COB state features for DQN model integration"""
try:
if 'state' in cob_data:
state = cob_data['state']
self.latest_cob_state[symbol] = state
logger.debug(f"COB DQN state updated for {symbol}: {state.shape}")
except Exception as e:
logger.error(f"Error processing COB DQN state for {symbol}: {e}")
async def start_cob_integration(self):
"""Start COB integration for real-time data feed"""
try:
logger.info("Starting COB integration for real-time market microstructure...")
await self.cob_integration.start()
logger.info("COB integration started successfully")
except Exception as e:
logger.error(f"Error starting COB integration: {e}")
async def stop_cob_integration(self):
"""Stop COB integration"""
try:
await self.cob_integration.stop()
logger.info("COB integration stopped")
except Exception as e:
logger.error(f"Error stopping COB integration: {e}")