bucket aggregation
378 COBY/processing/data_processor.py Normal file
@@ -0,0 +1,378 @@
"""
Main data processor implementation.
"""

from typing import Dict, Union, List, Optional, Any

from ..interfaces.data_processor import DataProcessor
from ..models.core import OrderBookSnapshot, TradeEvent, OrderBookMetrics
from ..utils.logging import get_logger, set_correlation_id
from ..utils.exceptions import ValidationError, ProcessingError
from ..utils.timing import get_current_timestamp
from .quality_checker import DataQualityChecker
from .anomaly_detector import AnomalyDetector
from .metrics_calculator import MetricsCalculator

logger = get_logger(__name__)

class StandardDataProcessor(DataProcessor):
    """
    Standard implementation of the data processor interface.

    Provides:
    - Data normalization and validation
    - Quality checking
    - Anomaly detection
    - Metrics calculation
    - Data enrichment
    """

    def __init__(self):
        """Initialize the data processor and its components."""
        self.quality_checker = DataQualityChecker()
        self.anomaly_detector = AnomalyDetector()
        self.metrics_calculator = MetricsCalculator()

        # Processing statistics
        self.processed_orderbooks = 0
        self.processed_trades = 0
        self.quality_failures = 0
        self.anomalies_detected = 0

        logger.info("Standard data processor initialized")

    def normalize_orderbook(self, raw_data: Dict, exchange: str) -> OrderBookSnapshot:
        """
        Normalize raw order book data to the standard format.

        Args:
            raw_data: Raw order book data from the exchange
            exchange: Exchange name

        Returns:
            OrderBookSnapshot: Normalized order book data
        """
        try:
            set_correlation_id()

            # Generic implementation: if the data is already normalized,
            # pass it through unchanged.
            if isinstance(raw_data, OrderBookSnapshot):
                return raw_data

            # Constructing an OrderBookSnapshot from a raw dict is
            # exchange-specific, so the base class does not attempt it.
            raise NotImplementedError(
                "normalize_orderbook should be implemented by exchange-specific processors"
            )

        except NotImplementedError:
            # Re-raise directly so the subclass contract is not masked
            # as a generic processing error.
            raise
        except Exception as e:
            logger.error(f"Error normalizing order book data: {e}")
            raise ProcessingError(f"Normalization failed: {e}", "NORMALIZE_ERROR")

    def normalize_trade(self, raw_data: Dict, exchange: str) -> TradeEvent:
        """
        Normalize raw trade data to the standard format.

        Args:
            raw_data: Raw trade data from the exchange
            exchange: Exchange name

        Returns:
            TradeEvent: Normalized trade data
        """
        try:
            set_correlation_id()

            # Generic implementation: if the data is already normalized,
            # pass it through unchanged.
            if isinstance(raw_data, TradeEvent):
                return raw_data

            # Constructing a TradeEvent from a raw dict is exchange-specific,
            # so the base class does not attempt it.
            raise NotImplementedError(
                "normalize_trade should be implemented by exchange-specific processors"
            )

        except NotImplementedError:
            # Re-raise directly so the subclass contract is not masked
            # as a generic processing error.
            raise
        except Exception as e:
            logger.error(f"Error normalizing trade data: {e}")
            raise ProcessingError(f"Normalization failed: {e}", "NORMALIZE_ERROR")
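
    # A hypothetical sketch of how an exchange-specific subclass might
    # implement the normalize_* hooks. The raw payload keys ('s', 'b', 'a')
    # and the OrderBookSnapshot constructor arguments are assumptions for
    # illustration, not confirmed APIs:
    #
    #     class BinanceDataProcessor(StandardDataProcessor):
    #         def normalize_orderbook(self, raw_data: Dict, exchange: str) -> OrderBookSnapshot:
    #             if isinstance(raw_data, OrderBookSnapshot):
    #                 return raw_data
    #             return OrderBookSnapshot(
    #                 symbol=raw_data['s'],
    #                 exchange=exchange,
    #                 bids=raw_data['b'],
    #                 asks=raw_data['a'],
    #             )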

    def validate_data(self, data: Union[OrderBookSnapshot, TradeEvent]) -> bool:
        """
        Validate normalized data for quality and consistency.

        Args:
            data: Normalized data to validate

        Returns:
            bool: True if data is valid, False otherwise
        """
        try:
            set_correlation_id()

            if isinstance(data, OrderBookSnapshot):
                quality_score, issues = self.quality_checker.check_orderbook_quality(data)
                self.processed_orderbooks += 1

                if quality_score < 0.5:  # Threshold for acceptable quality
                    self.quality_failures += 1
                    logger.warning(f"Low quality order book data: score={quality_score:.2f}, issues={issues}")
                    return False

                return True

            elif isinstance(data, TradeEvent):
                quality_score, issues = self.quality_checker.check_trade_quality(data)
                self.processed_trades += 1

                if quality_score < 0.5:  # Same threshold as order books
                    self.quality_failures += 1
                    logger.warning(f"Low quality trade data: score={quality_score:.2f}, issues={issues}")
                    return False

                return True

            else:
                logger.error(f"Unknown data type for validation: {type(data)}")
                return False

        except Exception as e:
            logger.error(f"Error validating data: {e}")
            return False

    def calculate_metrics(self, orderbook: OrderBookSnapshot) -> OrderBookMetrics:
        """
        Calculate metrics from order book data.

        Args:
            orderbook: Order book snapshot

        Returns:
            OrderBookMetrics: Calculated metrics
        """
        try:
            set_correlation_id()
            return self.metrics_calculator.calculate_orderbook_metrics(orderbook)

        except Exception as e:
            logger.error(f"Error calculating metrics: {e}")
            raise ProcessingError(f"Metrics calculation failed: {e}", "METRICS_ERROR")

    def detect_anomalies(self, data: Union[OrderBookSnapshot, TradeEvent]) -> List[str]:
        """
        Detect anomalies in the data.

        Args:
            data: Data to analyze for anomalies

        Returns:
            List[str]: Descriptions of any detected anomalies
        """
        try:
            set_correlation_id()

            if isinstance(data, OrderBookSnapshot):
                anomalies = self.anomaly_detector.detect_orderbook_anomalies(data)
            elif isinstance(data, TradeEvent):
                anomalies = self.anomaly_detector.detect_trade_anomalies(data)
            else:
                logger.error(f"Unknown data type for anomaly detection: {type(data)}")
                return ["Unknown data type"]

            if anomalies:
                self.anomalies_detected += len(anomalies)

            return anomalies

        except Exception as e:
            logger.error(f"Error detecting anomalies: {e}")
            return [f"Anomaly detection error: {e}"]

    def filter_data(self, data: Union[OrderBookSnapshot, TradeEvent], criteria: Dict) -> bool:
        """
        Filter data based on criteria.

        Args:
            data: Data to filter
            criteria: Filtering criteria (see the example sketched below)

        Returns:
            bool: True if data passes the filter, False otherwise
        """
        try:
            set_correlation_id()

            # Symbol filter
            if 'symbols' in criteria:
                allowed_symbols = criteria['symbols']
                if data.symbol not in allowed_symbols:
                    return False

            # Exchange filter
            if 'exchanges' in criteria:
                allowed_exchanges = criteria['exchanges']
                if data.exchange not in allowed_exchanges:
                    return False

            # Quality filter
            if 'min_quality' in criteria:
                min_quality = criteria['min_quality']
                if isinstance(data, OrderBookSnapshot):
                    quality_score, _ = self.quality_checker.check_orderbook_quality(data)
                elif isinstance(data, TradeEvent):
                    quality_score, _ = self.quality_checker.check_trade_quality(data)
                else:
                    quality_score = 0.0

                if quality_score < min_quality:
                    return False

            # Price range filter
            if 'price_range' in criteria:
                min_price, max_price = criteria['price_range']

                if isinstance(data, OrderBookSnapshot):
                    price = data.mid_price
                elif isinstance(data, TradeEvent):
                    price = data.price
                else:
                    return False

                if price and (price < min_price or price > max_price):
                    return False

            # Volume filter for trades
            if 'min_volume' in criteria and isinstance(data, TradeEvent):
                min_volume = criteria['min_volume']
                if data.size < min_volume:
                    return False

            return True

        except Exception as e:
            logger.error(f"Error filtering data: {e}")
            return False
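
    # A hypothetical criteria dict combining the supported filters
    # (values are illustrative only):
    #
    #     criteria = {
    #         'symbols': ['BTCUSDT', 'ETHUSDT'],
    #         'exchanges': ['binance', 'coinbase'],
    #         'min_quality': 0.8,
    #         'price_range': (10_000.0, 100_000.0),
    #         'min_volume': 0.01,  # applies to trades only
    #     }
    #
    #     processor.filter_data(snapshot, criteria)  # -> True or False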

    def enrich_data(self, data: Union[OrderBookSnapshot, TradeEvent]) -> Dict:
        """
        Enrich data with additional metadata.

        Args:
            data: Data to enrich

        Returns:
            Dict: Enriched data with metadata
        """
        try:
            set_correlation_id()

            enriched = {
                'original_data': data,
                'processing_timestamp': get_current_timestamp(),
                'processor_version': '1.0.0'
            }

            # Add quality metrics
            if isinstance(data, OrderBookSnapshot):
                quality_score, quality_issues = self.quality_checker.check_orderbook_quality(data)
                enriched['quality_score'] = quality_score
                enriched['quality_issues'] = quality_issues

                # Add calculated metrics
                try:
                    metrics = self.calculate_metrics(data)
                    enriched['metrics'] = {
                        'mid_price': metrics.mid_price,
                        'spread': metrics.spread,
                        'spread_percentage': metrics.spread_percentage,
                        'volume_imbalance': metrics.volume_imbalance,
                        'depth_10': metrics.depth_10,
                        'depth_50': metrics.depth_50
                    }
                except Exception as e:
                    enriched['metrics_error'] = str(e)

                # Add liquidity score
                try:
                    liquidity_score = self.metrics_calculator.calculate_liquidity_score(data)
                    enriched['liquidity_score'] = liquidity_score
                except Exception as e:
                    enriched['liquidity_error'] = str(e)

            elif isinstance(data, TradeEvent):
                quality_score, quality_issues = self.quality_checker.check_trade_quality(data)
                enriched['quality_score'] = quality_score
                enriched['quality_issues'] = quality_issues

                # Add trade-specific enrichments
                enriched['trade_value'] = data.price * data.size
                enriched['side_numeric'] = 1 if data.side == 'buy' else -1

            # Add anomaly detection results
            anomalies = self.detect_anomalies(data)
            enriched['anomalies'] = anomalies
            enriched['anomaly_count'] = len(anomalies)

            return enriched

        except Exception as e:
            logger.error(f"Error enriching data: {e}")
            return {
                'original_data': data,
                'enrichment_error': str(e)
            }
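
    # For an order book snapshot, the enriched dict has roughly this shape
    # (values illustrative; error keys replace their counterparts on failure):
    #
    #     {
    #         'original_data': <OrderBookSnapshot>,
    #         'processing_timestamp': <timestamp>,
    #         'processor_version': '1.0.0',
    #         'quality_score': 0.93,
    #         'quality_issues': [],
    #         'metrics': {'mid_price': ..., 'spread': ..., ...},  # or 'metrics_error'
    #         'liquidity_score': ...,                             # or 'liquidity_error'
    #         'anomalies': [],
    #         'anomaly_count': 0,
    #     }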

    def get_data_quality_score(self, data: Union[OrderBookSnapshot, TradeEvent]) -> float:
        """
        Calculate a data quality score.

        Args:
            data: Data to score

        Returns:
            float: Quality score between 0.0 and 1.0
        """
        try:
            set_correlation_id()

            if isinstance(data, OrderBookSnapshot):
                quality_score, _ = self.quality_checker.check_orderbook_quality(data)
            elif isinstance(data, TradeEvent):
                quality_score, _ = self.quality_checker.check_trade_quality(data)
            else:
                logger.error(f"Unknown data type for quality scoring: {type(data)}")
                return 0.0

            return quality_score

        except Exception as e:
            logger.error(f"Error calculating quality score: {e}")
            return 0.0

    def get_processing_stats(self) -> Dict[str, Any]:
        """Get processing statistics"""
        total_processed = max(1, self.processed_orderbooks + self.processed_trades)
        return {
            'processed_orderbooks': self.processed_orderbooks,
            'processed_trades': self.processed_trades,
            'quality_failures': self.quality_failures,
            'anomalies_detected': self.anomalies_detected,
            'quality_failure_rate': self.quality_failures / total_processed,
            'anomaly_rate': self.anomalies_detected / total_processed,
            'quality_checker_summary': self.quality_checker.get_quality_summary(),
            'anomaly_detector_stats': self.anomaly_detector.get_statistics()
        }
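
    # Example of the returned stats dict (illustrative values):
    #
    #     {
    #         'processed_orderbooks': 1200,
    #         'processed_trades': 3400,
    #         'quality_failures': 12,
    #         'anomalies_detected': 7,
    #         'quality_failure_rate': 0.0026,
    #         'anomaly_rate': 0.0015,
    #         'quality_checker_summary': {...},
    #         'anomaly_detector_stats': {...},
    #     }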

    def reset_stats(self) -> None:
        """Reset processing statistics"""
        self.processed_orderbooks = 0
        self.processed_trades = 0
        self.quality_failures = 0
        self.anomalies_detected = 0

        logger.info("Processing statistics reset")
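

if __name__ == "__main__":
    # Minimal usage sketch (assumes the package imports resolve when this
    # module is run with `python -m`). The base class's normalize_* hooks
    # intentionally raise NotImplementedError, so this only exercises the
    # statistics API; real data would flow through an exchange-specific
    # subclass.
    processor = StandardDataProcessor()
    print(processor.get_processing_stats())
    processor.reset_stats()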