# gogo2/COBY/integration/orchestrator_adapter.py
"""
Orchestrator integration adapter for COBY system.
Provides compatibility layer for seamless integration with existing orchestrator.
"""
import asyncio
import logging
import numpy as np
import pandas as pd
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any, Callable, Union
from dataclasses import dataclass, field
import uuid
from collections import deque
import threading
from ..storage.storage_manager import StorageManager
from ..replay.replay_manager import HistoricalReplayManager
from ..caching.redis_manager import RedisManager
from ..aggregation.aggregation_engine import StandardAggregationEngine
from ..processing.data_processor import StandardDataProcessor
from ..connectors.binance_connector import BinanceConnector
from ..models.core import OrderBookSnapshot, TradeEvent, HeatmapData, ReplayStatus
from ..utils.logging import get_logger, set_correlation_id
from ..utils.exceptions import IntegrationError, ValidationError
from ..config import Config
logger = get_logger(__name__)

@dataclass
class MarketTick:
    """Market tick data structure compatible with orchestrator"""
    symbol: str
    price: float
    volume: float
    timestamp: datetime
    side: str = "unknown"
    exchange: str = "binance"
    subscriber_name: str = "unknown"

@dataclass
class PivotBounds:
    """Pivot bounds structure compatible with orchestrator"""
    symbol: str
    price_max: float
    price_min: float
    volume_max: float
    volume_min: float
    pivot_support_levels: List[float]
    pivot_resistance_levels: List[float]
    pivot_context: Dict[str, Any]
    created_timestamp: datetime
    data_period_start: datetime
    data_period_end: datetime
    total_candles_analyzed: int

    def get_price_range(self) -> float:
        return self.price_max - self.price_min

    def normalize_price(self, price: float) -> float:
        price_range = self.get_price_range()
        if price_range == 0:
            # Guard against division by zero when price_max == price_min
            return 0.0
        return (price - self.price_min) / price_range
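
# Illustrative example: for bounds with price_min=100.0 and price_max=200.0,
# normalize_price(150.0) returns 0.5; prices outside the observed min/max
# map outside the [0, 1] interval.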

class COBYOrchestratorAdapter:
    """
    Adapter that makes COBY system compatible with existing orchestrator interface.

    Provides:
    - Data provider interface compatibility
    - Live/replay mode switching
    - Data quality indicators
    - Subscription management
    - Caching and performance optimization
    """

    def __init__(self, config: Config):
        """
        Initialize orchestrator adapter.

        Args:
            config: COBY system configuration
        """
        self.config = config

        # Core components
        self.storage_manager = StorageManager(config)
        self.replay_manager = HistoricalReplayManager(self.storage_manager, config)
        self.redis_manager = RedisManager()
        self.aggregation_engine = StandardAggregationEngine()
        self.data_processor = StandardDataProcessor()

        # Exchange connectors
        self.connectors = {
            'binance': BinanceConnector()
        }

        # Mode management
        self.mode = 'live'  # 'live' or 'replay'
        self.current_replay_session = None

        # Subscription management
        self.subscribers = {
            'ticks': {},
            'cob_raw': {},
            'cob_aggregated': {},
            'training_data': {},
            'model_predictions': {}
        }
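        # Each category above maps subscriber_id -> subscription info; entries
        # hold at least a 'callback' plus bookkeeping fields such as
        # 'created_at' (see the subscribe_* methods below).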
        self.subscriber_lock = threading.Lock()

        # Data caching
        self.tick_cache = {}
        self.orderbook_cache = {}
        self.price_cache = {}

        # Statistics
        self.stats = {
            'ticks_processed': 0,
            'orderbooks_processed': 0,
            'subscribers_active': 0,
            'cache_hits': 0,
            'cache_misses': 0
        }

        # Initialize components. _initialize_components is a coroutine, so it
        # must be awaited or scheduled; calling it bare would only create an
        # un-awaited coroutine object and the initialization would never run.
        try:
            loop = asyncio.get_running_loop()
            # Keep a reference so the task is not garbage-collected early
            self._init_task = loop.create_task(self._initialize_components())
        except RuntimeError:
            # No running event loop; run initialization to completion here
            asyncio.run(self._initialize_components())

        logger.info("COBY orchestrator adapter initialized")
    async def _initialize_components(self):
        """Initialize all COBY components."""
        try:
            # Initialize storage
            await self.storage_manager.initialize()

            # Initialize Redis cache
            await self.redis_manager.initialize()

            # Initialize connectors
            for name, connector in self.connectors.items():
                await connector.connect()
                connector.add_data_callback(self._handle_connector_data)

            logger.info("COBY components initialized successfully")

        except Exception as e:
            logger.error(f"Failed to initialize COBY components: {e}")
            raise IntegrationError(f"Component initialization failed: {e}")
    # === ORCHESTRATOR COMPATIBILITY METHODS ===

    def get_historical_data(self, symbol: str, timeframe: str, limit: int = 1000,
                            refresh: bool = False) -> Optional[pd.DataFrame]:
        """Get historical OHLCV data compatible with orchestrator interface."""
        try:
            set_correlation_id()

            # Convert timeframe to minutes
            timeframe_minutes = self._parse_timeframe(timeframe)
            if not timeframe_minutes:
                logger.warning(f"Unsupported timeframe: {timeframe}")
                return None

            # Calculate time range
            end_time = datetime.utcnow()
            start_time = end_time - timedelta(minutes=timeframe_minutes * limit)

            # Get data from storage
            if self.mode == 'replay' and self.current_replay_session:
                # Use replay data
                data = asyncio.run(self.storage_manager.get_historical_data(
                    symbol, start_time, end_time, 'ohlcv'
                ))
            else:
                # Use live data from cache or storage
                cache_key = f"ohlcv:{symbol}:{timeframe}:{limit}"
                cached_data = asyncio.run(self.redis_manager.get(cache_key))

                if cached_data and not refresh:
                    self.stats['cache_hits'] += 1
                    return pd.DataFrame(cached_data)

                self.stats['cache_misses'] += 1
                data = asyncio.run(self.storage_manager.get_historical_data(
                    symbol, start_time, end_time, 'ohlcv'
                ))

                # Cache the result
                if data:
                    asyncio.run(self.redis_manager.set(cache_key, data, ttl=60))

            if not data:
                return None

            # Convert to DataFrame compatible with orchestrator
            df = pd.DataFrame(data)
            if not df.empty:
                df['timestamp'] = pd.to_datetime(df['timestamp'])
                df.set_index('timestamp', inplace=True)
                df = df.sort_index()

            return df

        except Exception as e:
            logger.error(f"Error getting historical data for {symbol}: {e}")
            return None
    def get_current_price(self, symbol: str) -> Optional[float]:
        """Get current price for a symbol."""
        try:
            # Check cache first. Use total_seconds(): the .seconds attribute
            # alone ignores the day component and wraps at day boundaries.
            if symbol in self.price_cache:
                cached_price, timestamp = self.price_cache[symbol]
                if (datetime.utcnow() - timestamp).total_seconds() < 5:  # 5 second cache
                    return cached_price

            # Get latest orderbook
            latest_orderbook = asyncio.run(
                self.storage_manager.get_latest_orderbook(symbol)
            )

            if latest_orderbook and latest_orderbook.get('mid_price'):
                price = float(latest_orderbook['mid_price'])
                self.price_cache[symbol] = (price, datetime.utcnow())
                return price

            return None

        except Exception as e:
            logger.error(f"Error getting current price for {symbol}: {e}")
            return None
    def get_live_price_from_api(self, symbol: str) -> Optional[float]:
        """Get live price from API (low-latency method)."""
        return self.get_current_price(symbol)

    def build_base_data_input(self, symbol: str) -> Optional[Any]:
        """Build base data input compatible with orchestrator models."""
        try:
            # This would need to be implemented based on the specific
            # BaseDataInput class used by the orchestrator.
            # For now, return a mock object that provides the interface.
            class MockBaseDataInput:
                def __init__(self, symbol: str, adapter):
                    self.symbol = symbol
                    self.adapter = adapter

                def get_feature_vector(self) -> np.ndarray:
                    # Return feature vector from COBY data
                    return self.adapter._get_feature_vector(self.symbol)

            return MockBaseDataInput(symbol, self)

        except Exception as e:
            logger.error(f"Error building base data input for {symbol}: {e}")
            return None
    def _get_feature_vector(self, symbol: str) -> np.ndarray:
        """Get feature vector for ML models."""
        try:
            # Get latest market data
            latest_orderbook = asyncio.run(
                self.storage_manager.get_latest_orderbook(symbol)
            )

            if not latest_orderbook:
                return np.zeros(100, dtype=np.float32)  # Default size

            # Extract features from orderbook
            features = []

            # Price features
            if latest_orderbook.get('mid_price'):
                features.append(float(latest_orderbook['mid_price']))
            if latest_orderbook.get('spread'):
                features.append(float(latest_orderbook['spread']))

            # Volume features
            if latest_orderbook.get('bid_volume'):
                features.append(float(latest_orderbook['bid_volume']))
            if latest_orderbook.get('ask_volume'):
                features.append(float(latest_orderbook['ask_volume']))

            # Pad or truncate to expected size
            target_size = 100
            if len(features) < target_size:
                features.extend([0.0] * (target_size - len(features)))
            elif len(features) > target_size:
                features = features[:target_size]

            return np.array(features, dtype=np.float32)

        except Exception as e:
            logger.error(f"Error getting feature vector for {symbol}: {e}")
            return np.zeros(100, dtype=np.float32)
    # === COB DATA METHODS ===

    def get_cob_raw_ticks(self, symbol: str, count: int = 1000) -> List[Dict]:
        """Get raw COB ticks for a symbol."""
        try:
            # Get recent orderbook snapshots
            end_time = datetime.utcnow()
            start_time = end_time - timedelta(minutes=15)  # 15 minutes of data

            data = asyncio.run(self.storage_manager.get_historical_data(
                symbol, start_time, end_time, 'orderbook'
            ))

            if not data:
                return []

            # Convert to COB tick format
            ticks = []
            for item in data[-count:]:
                tick = {
                    'symbol': item['symbol'],
                    'timestamp': item['timestamp'].isoformat(),
                    'mid_price': item.get('mid_price'),
                    'spread': item.get('spread'),
                    'bid_volume': item.get('bid_volume'),
                    'ask_volume': item.get('ask_volume'),
                    'exchange': item['exchange']
                }
                ticks.append(tick)

            return ticks

        except Exception as e:
            logger.error(f"Error getting COB raw ticks for {symbol}: {e}")
            return []
    def get_cob_1s_aggregated(self, symbol: str, count: int = 300) -> List[Dict]:
        """Get 1s aggregated COB data, bucketed by the configured price bucket size."""
        try:
            # Get heatmap data
            bucket_size = self.config.aggregation.bucket_size
            start_time = datetime.utcnow() - timedelta(seconds=count)

            heatmap_data = asyncio.run(
                self.storage_manager.get_heatmap_data(symbol, bucket_size, start_time)
            )

            if not heatmap_data:
                return []

            # Group by timestamp and aggregate
            aggregated = {}
            for item in heatmap_data:
                timestamp = item['timestamp']
                if timestamp not in aggregated:
                    aggregated[timestamp] = {
                        'timestamp': timestamp.isoformat(),
                        'symbol': symbol,
                        'bid_buckets': {},
                        'ask_buckets': {},
                        'total_bid_volume': 0,
                        'total_ask_volume': 0
                    }

                price_bucket = float(item['price_bucket'])
                volume = float(item['volume'])
                side = item['side']

                if side == 'bid':
                    aggregated[timestamp]['bid_buckets'][price_bucket] = volume
                    aggregated[timestamp]['total_bid_volume'] += volume
                else:
                    aggregated[timestamp]['ask_buckets'][price_bucket] = volume
                    aggregated[timestamp]['total_ask_volume'] += volume

            # Return sorted by timestamp
            result = list(aggregated.values())
            result.sort(key=lambda x: x['timestamp'])
            return result[-count:]

        except Exception as e:
            logger.error(f"Error getting COB 1s aggregated data for {symbol}: {e}")
            return []
    def get_latest_cob_data(self, symbol: str) -> Optional[Dict]:
        """Get latest COB raw tick for a symbol."""
        try:
            latest_orderbook = asyncio.run(
                self.storage_manager.get_latest_orderbook(symbol)
            )

            if not latest_orderbook:
                return None

            return {
                'symbol': symbol,
                'timestamp': latest_orderbook['timestamp'].isoformat(),
                'mid_price': latest_orderbook.get('mid_price'),
                'spread': latest_orderbook.get('spread'),
                'bid_volume': latest_orderbook.get('bid_volume'),
                'ask_volume': latest_orderbook.get('ask_volume'),
                'exchange': latest_orderbook['exchange']
            }

        except Exception as e:
            logger.error(f"Error getting latest COB data for {symbol}: {e}")
            return None

    def get_latest_cob_aggregated(self, symbol: str) -> Optional[Dict]:
        """Get latest 1s aggregated COB data for a symbol."""
        try:
            aggregated_data = self.get_cob_1s_aggregated(symbol, count=1)
            return aggregated_data[0] if aggregated_data else None
        except Exception as e:
            logger.error(f"Error getting latest COB aggregated data for {symbol}: {e}")
            return None
    # === SUBSCRIPTION METHODS ===

    def subscribe_to_ticks(self, callback: Callable[[MarketTick], None],
                           symbols: Optional[List[str]] = None,
                           subscriber_name: Optional[str] = None) -> str:
        """Subscribe to tick data updates."""
        try:
            subscriber_id = str(uuid.uuid4())

            with self.subscriber_lock:
                self.subscribers['ticks'][subscriber_id] = {
                    'callback': callback,
                    'symbols': symbols or [],
                    'subscriber_name': subscriber_name or 'unknown',
                    'created_at': datetime.utcnow()
                }
                self.stats['subscribers_active'] += 1

            logger.info(f"Added tick subscriber {subscriber_id} for {subscriber_name}")
            return subscriber_id

        except Exception as e:
            logger.error(f"Error adding tick subscriber: {e}")
            return ""
    def subscribe_to_cob_raw_ticks(self, callback: Callable[[str, Dict], None]) -> str:
        """Subscribe to raw COB tick updates."""
        try:
            subscriber_id = str(uuid.uuid4())

            with self.subscriber_lock:
                self.subscribers['cob_raw'][subscriber_id] = {
                    'callback': callback,
                    'created_at': datetime.utcnow()
                }
                self.stats['subscribers_active'] += 1

            logger.info(f"Added COB raw tick subscriber {subscriber_id}")
            return subscriber_id

        except Exception as e:
            logger.error(f"Error adding COB raw tick subscriber: {e}")
            return ""

    def subscribe_to_cob_aggregated(self, callback: Callable[[str, Dict], None]) -> str:
        """Subscribe to 1s aggregated COB updates."""
        try:
            subscriber_id = str(uuid.uuid4())

            with self.subscriber_lock:
                self.subscribers['cob_aggregated'][subscriber_id] = {
                    'callback': callback,
                    'created_at': datetime.utcnow()
                }
                self.stats['subscribers_active'] += 1

            logger.info(f"Added COB aggregated subscriber {subscriber_id}")
            return subscriber_id

        except Exception as e:
            logger.error(f"Error adding COB aggregated subscriber: {e}")
            return ""

    def subscribe_to_training_data(self, callback: Callable[[str, dict], None]) -> str:
        """Subscribe to training data updates."""
        try:
            subscriber_id = str(uuid.uuid4())

            with self.subscriber_lock:
                self.subscribers['training_data'][subscriber_id] = {
                    'callback': callback,
                    'created_at': datetime.utcnow()
                }
                self.stats['subscribers_active'] += 1

            logger.info(f"Added training data subscriber {subscriber_id}")
            return subscriber_id

        except Exception as e:
            logger.error(f"Error adding training data subscriber: {e}")
            return ""

    def subscribe_to_model_predictions(self, callback: Callable[[str, dict], None]) -> str:
        """Subscribe to model prediction updates."""
        try:
            subscriber_id = str(uuid.uuid4())

            with self.subscriber_lock:
                self.subscribers['model_predictions'][subscriber_id] = {
                    'callback': callback,
                    'created_at': datetime.utcnow()
                }
                self.stats['subscribers_active'] += 1

            logger.info(f"Added model prediction subscriber {subscriber_id}")
            return subscriber_id

        except Exception as e:
            logger.error(f"Error adding model prediction subscriber: {e}")
            return ""
    def unsubscribe(self, subscriber_id: str) -> bool:
        """Unsubscribe from all data feeds."""
        try:
            with self.subscriber_lock:
                removed = False
                for category in self.subscribers:
                    if subscriber_id in self.subscribers[category]:
                        del self.subscribers[category][subscriber_id]
                        self.stats['subscribers_active'] -= 1
                        removed = True
                        break

            if removed:
                logger.info(f"Removed subscriber {subscriber_id}")

            return removed

        except Exception as e:
            logger.error(f"Error removing subscriber {subscriber_id}: {e}")
            return False
    # === MODE SWITCHING ===

    async def switch_to_live_mode(self) -> bool:
        """Switch to live data mode."""
        try:
            if self.mode == 'live':
                logger.info("Already in live mode")
                return True

            # Stop replay session if active
            if self.current_replay_session:
                await self.replay_manager.stop_replay(self.current_replay_session)
                self.current_replay_session = None

            # Start live connectors
            for name, connector in self.connectors.items():
                if not connector.is_connected:
                    await connector.connect()

            self.mode = 'live'
            logger.info("Switched to live data mode")
            return True

        except Exception as e:
            logger.error(f"Error switching to live mode: {e}")
            return False

    async def switch_to_replay_mode(self, start_time: datetime, end_time: datetime,
                                    speed: float = 1.0,
                                    symbols: Optional[List[str]] = None) -> bool:
        """Switch to replay data mode."""
        try:
            if self.mode == 'replay' and self.current_replay_session:
                await self.replay_manager.stop_replay(self.current_replay_session)

            # Create replay session
            session_id = self.replay_manager.create_replay_session(
                start_time=start_time,
                end_time=end_time,
                speed=speed,
                symbols=symbols or self.config.exchanges.symbols
            )

            # Add data callback for replay
            self.replay_manager.add_data_callback(session_id, self._handle_replay_data)

            # Start replay
            await self.replay_manager.start_replay(session_id)

            self.current_replay_session = session_id
            self.mode = 'replay'

            logger.info(f"Switched to replay mode: {start_time} to {end_time}")
            return True

        except Exception as e:
            logger.error(f"Error switching to replay mode: {e}")
            return False
    def get_current_mode(self) -> str:
        """Get current data mode ('live' or 'replay')."""
        return self.mode

    def get_replay_status(self) -> Optional[Dict[str, Any]]:
        """Get current replay session status."""
        if not self.current_replay_session:
            return None

        session = self.replay_manager.get_replay_status(self.current_replay_session)
        if not session:
            return None

        return {
            'session_id': session.session_id,
            'status': session.status.value,
            'progress': session.progress,
            'current_time': session.current_time.isoformat(),
            'speed': session.speed,
            'events_replayed': session.events_replayed,
            'total_events': session.total_events
        }
    # === DATA QUALITY AND METADATA ===

    def get_data_quality_indicators(self, symbol: str) -> Dict[str, Any]:
        """Get data quality indicators for a symbol."""
        try:
            # Get recent data statistics
            end_time = datetime.utcnow()
            start_time = end_time - timedelta(minutes=5)

            orderbook_data = asyncio.run(self.storage_manager.get_historical_data(
                symbol, start_time, end_time, 'orderbook'
            ))
            trade_data = asyncio.run(self.storage_manager.get_historical_data(
                symbol, start_time, end_time, 'trades'
            ))

            # Calculate quality metrics
            quality = {
                'symbol': symbol,
                'timestamp': datetime.utcnow().isoformat(),
                'orderbook_updates': len(orderbook_data) if orderbook_data else 0,
                'trade_events': len(trade_data) if trade_data else 0,
                'data_freshness_seconds': 0,
                'exchange_coverage': [],
                'quality_score': 0.0
            }

            # Calculate data freshness
            if orderbook_data:
                latest_timestamp = max(item['timestamp'] for item in orderbook_data)
                quality['data_freshness_seconds'] = (
                    datetime.utcnow() - latest_timestamp
                ).total_seconds()

            # Get exchange coverage
            if orderbook_data:
                exchanges = set(item['exchange'] for item in orderbook_data)
                quality['exchange_coverage'] = list(exchanges)

            # Calculate quality score (0-1); the weights 0.4 + 0.3 + 0.3 sum to 1.0
            score = 0.0
            if quality['orderbook_updates'] > 0:
                score += 0.4
            if quality['trade_events'] > 0:
                score += 0.3
            # Freshness only counts when there is data to be fresh; otherwise
            # the default of 0 seconds would award points for missing data
            if orderbook_data and quality['data_freshness_seconds'] < 10:
                score += 0.3
            quality['quality_score'] = score

            return quality

        except Exception as e:
            logger.error(f"Error getting data quality for {symbol}: {e}")
            return {
                'symbol': symbol,
                'timestamp': datetime.utcnow().isoformat(),
                'quality_score': 0.0,
                'error': str(e)
            }
    def get_system_metadata(self) -> Dict[str, Any]:
        """Get system metadata and status."""
        try:
            return {
                'system': 'COBY',
                'version': '1.0.0',
                'mode': self.mode,
                'timestamp': datetime.utcnow().isoformat(),
                'components': {
                    'storage': self.storage_manager.is_healthy(),
                    'redis': True,  # Simplified check
                    'connectors': {
                        name: connector.is_connected
                        for name, connector in self.connectors.items()
                    }
                },
                'statistics': self.stats,
                'replay_session': self.get_replay_status(),
                'active_subscribers': sum(
                    len(subs) for subs in self.subscribers.values()
                )
            }
        except Exception as e:
            logger.error(f"Error getting system metadata: {e}")
            return {'error': str(e)}
    # === DATA HANDLERS ===

    async def _handle_connector_data(self, data: Union[OrderBookSnapshot, TradeEvent]) -> None:
        """Handle data from exchange connectors."""
        try:
            # Store data
            if isinstance(data, OrderBookSnapshot):
                await self.storage_manager.store_orderbook(data)
                self.stats['orderbooks_processed'] += 1

                # Create market tick for subscribers
                if data.bids and data.asks:
                    best_bid = max(data.bids, key=lambda x: x.price)
                    best_ask = min(data.asks, key=lambda x: x.price)
                    mid_price = (best_bid.price + best_ask.price) / 2

                    tick = MarketTick(
                        symbol=data.symbol,
                        price=mid_price,
                        volume=best_bid.size + best_ask.size,
                        timestamp=data.timestamp,
                        exchange=data.exchange
                    )
                    await self._notify_tick_subscribers(tick)

                # Create COB data for subscribers
                cob_data = {
                    'symbol': data.symbol,
                    'timestamp': data.timestamp.isoformat(),
                    'bids': [{'price': b.price, 'size': b.size} for b in data.bids[:10]],
                    'asks': [{'price': a.price, 'size': a.size} for a in data.asks[:10]],
                    'exchange': data.exchange
                }
                await self._notify_cob_raw_subscribers(data.symbol, cob_data)

            elif isinstance(data, TradeEvent):
                await self.storage_manager.store_trade(data)
                self.stats['ticks_processed'] += 1

                # Create market tick
                tick = MarketTick(
                    symbol=data.symbol,
                    price=data.price,
                    volume=data.size,
                    timestamp=data.timestamp,
                    side=data.side,
                    exchange=data.exchange
                )
                await self._notify_tick_subscribers(tick)

        except Exception as e:
            logger.error(f"Error handling connector data: {e}")

    async def _handle_replay_data(self, data: Union[OrderBookSnapshot, TradeEvent]) -> None:
        """Handle data from replay system."""
        try:
            # Process replay data the same as live data
            await self._handle_connector_data(data)
        except Exception as e:
            logger.error(f"Error handling replay data: {e}")
    async def _notify_tick_subscribers(self, tick: MarketTick) -> None:
        """Notify tick subscribers."""
        try:
            # Snapshot the registry under the lock, then invoke callbacks
            # outside it so a callback that (un)subscribes cannot deadlock
            # on the non-reentrant lock.
            with self.subscriber_lock:
                subscribers = self.subscribers['ticks'].copy()

            for subscriber_id, sub_info in subscribers.items():
                try:
                    callback = sub_info['callback']
                    symbols = sub_info['symbols']

                    # Check if subscriber wants this symbol
                    if not symbols or tick.symbol in symbols:
                        if asyncio.iscoroutinefunction(callback):
                            await callback(tick)
                        else:
                            callback(tick)

                except Exception as e:
                    logger.error(f"Error notifying tick subscriber {subscriber_id}: {e}")

        except Exception as e:
            logger.error(f"Error notifying tick subscribers: {e}")

    async def _notify_cob_raw_subscribers(self, symbol: str, data: Dict) -> None:
        """Notify COB raw tick subscribers."""
        try:
            with self.subscriber_lock:
                subscribers = self.subscribers['cob_raw'].copy()

            for subscriber_id, sub_info in subscribers.items():
                try:
                    callback = sub_info['callback']
                    if asyncio.iscoroutinefunction(callback):
                        await callback(symbol, data)
                    else:
                        callback(symbol, data)
                except Exception as e:
                    logger.error(f"Error notifying COB raw subscriber {subscriber_id}: {e}")

        except Exception as e:
            logger.error(f"Error notifying COB raw subscribers: {e}")
    # === UTILITY METHODS ===

    def _parse_timeframe(self, timeframe: str) -> Optional[int]:
        """Parse a timeframe string (e.g. '5m', '4h', '1d') to minutes."""
        try:
            if timeframe.endswith('m'):
                return int(timeframe[:-1])
            elif timeframe.endswith('h'):
                return int(timeframe[:-1]) * 60
            elif timeframe.endswith('d'):
                return int(timeframe[:-1]) * 24 * 60
            else:
                return None
        except (ValueError, AttributeError):
            # Avoid a bare except; only swallow malformed input
            return None
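
    # Illustrative mappings: '1m' -> 1, '5m' -> 5, '4h' -> 240, '1d' -> 1440;
    # anything without an 'm'/'h'/'d' suffix (e.g. '1w') returns None.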
    def start_centralized_data_collection(self) -> None:
        """Start centralized data collection (compatibility method)."""
        logger.info("Centralized data collection started (COBY mode)")

    def start_training_data_collection(self) -> None:
        """Start training data collection (compatibility method)."""
        logger.info("Training data collection started (COBY mode)")

    def invalidate_ohlcv_cache(self, symbol: str) -> None:
        """Invalidate OHLCV cache for a symbol."""
        try:
            # Clear Redis cache for this symbol
            cache_pattern = f"ohlcv:{symbol}:*"
            asyncio.run(self.redis_manager.delete_pattern(cache_pattern))

            # Clear local price cache
            if symbol in self.price_cache:
                del self.price_cache[symbol]

        except Exception as e:
            logger.error(f"Error invalidating cache for {symbol}: {e}")
    async def close(self) -> None:
        """Close all connections and clean up."""
        try:
            # Stop replay session
            if self.current_replay_session:
                await self.replay_manager.stop_replay(self.current_replay_session)

            # Close connectors
            for connector in self.connectors.values():
                await connector.disconnect()

            # Close storage
            await self.storage_manager.close()

            # Close Redis
            await self.redis_manager.close()

            logger.info("COBY orchestrator adapter closed")

        except Exception as e:
            logger.error(f"Error closing adapter: {e}")

    def get_stats(self) -> Dict[str, Any]:
        """Get adapter statistics."""
        return {
            **self.stats,
            'mode': self.mode,
            'active_subscribers': sum(len(subs) for subs in self.subscribers.values()),
            'cache_size': len(self.price_cache),
            'replay_session': self.current_replay_session
        }
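
    # Example lifecycle (illustrative sketch only; assumes a populated COBY
    # `Config` and a replay window for which stored data exists):
    #
    #   adapter = COBYOrchestratorAdapter(Config())
    #
    #   price = adapter.get_current_price('BTCUSDT')
    #   df = adapter.get_historical_data('BTCUSDT', '1m', limit=500)
    #
    #   # Replay a historical window at 10x speed, then return to live data:
    #   asyncio.run(adapter.switch_to_replay_mode(
    #       start_time=datetime(2025, 1, 1),
    #       end_time=datetime(2025, 1, 2),
    #       speed=10.0,
    #   ))
    #   asyncio.run(adapter.switch_to_live_mode())
    #   asyncio.run(adapter.close())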