unified cache. LLM report

This commit is contained in:
Dobromir Popov
2025-10-20 11:16:27 +03:00
parent f464a412dc
commit ba8813f04f
6 changed files with 1943 additions and 1 deletions

View File

@@ -55,6 +55,7 @@ from .williams_market_structure import WilliamsMarketStructure, PivotPoint, Tren
from .enhanced_cob_websocket import EnhancedCOBWebSocket, get_enhanced_cob_websocket
from .huobi_cob_websocket import get_huobi_cob_websocket
from .cob_integration import COBIntegration
from .report_data_crawler import ReportDataCrawler, ReportData
logger = logging.getLogger(__name__)
@@ -169,6 +170,9 @@ class DataProvider:
self.enhanced_cob_websocket: Optional[EnhancedCOBWebSocket] = None
self.websocket_tasks = {}
# Report data crawler for comprehensive trading reports
self.report_crawler: Optional[ReportDataCrawler] = None
# COB collection state guard to prevent duplicate starts
self._cob_started: bool = False
@@ -3516,3 +3520,71 @@ class DataProvider:
except Exception as e:
logger.error(f"Error creating transformer sequences for inference: {e}")
return []
# === REPORT DATA CRAWLER METHODS ===
def _get_report_crawler(self) -> ReportDataCrawler:
"""Get or initialize the report data crawler"""
if self.report_crawler is None:
self.report_crawler = ReportDataCrawler(data_provider=self)
return self.report_crawler
def crawl_comprehensive_report(self, symbol: str) -> Optional[ReportData]:
"""Crawl comprehensive report data for a trading pair
Args:
symbol: Trading pair symbol (e.g., 'BTC/USDT')
Returns:
ReportData object with all required information, or None if failed
"""
try:
crawler = self._get_report_crawler()
return crawler.crawl_report_data(symbol)
except Exception as e:
logger.error(f"Error crawling comprehensive report for {symbol}: {e}")
return None
def generate_trading_report(self, symbol: str) -> Optional[str]:
"""Generate formatted trading report for a symbol
Args:
symbol: Trading pair symbol (e.g., 'BTC/USDT')
Returns:
Formatted report string, or None if failed
"""
try:
crawler = self._get_report_crawler()
return crawler.crawl_and_generate_report(symbol)
except Exception as e:
logger.error(f"Error generating trading report for {symbol}: {e}")
return None
def get_report_data_for_multiple_pairs(self, symbols: List[str]) -> Dict[str, Optional[ReportData]]:
"""Get report data for multiple trading pairs
Args:
symbols: List of trading pair symbols
Returns:
Dictionary mapping symbols to their ReportData objects
"""
try:
results = {}
crawler = self._get_report_crawler()
for symbol in symbols:
try:
report_data = crawler.crawl_report_data(symbol)
results[symbol] = report_data
logger.info(f"Crawled report data for {symbol}: {'Success' if report_data else 'Failed'}")
except Exception as e:
logger.error(f"Error crawling report data for {symbol}: {e}")
results[symbol] = None
return results
except Exception as e:
logger.error(f"Error getting report data for multiple pairs: {e}")
return {}

799
core/report_data_crawler.py Normal file
View File

@@ -0,0 +1,799 @@
"""
Report Data Crawler Sub-Module
This module provides functionality to crawl and compile comprehensive trading reports
in the specified format for different trading pairs. It extends the DataProvider
with specialized methods for report generation.
Enhanced with:
- Polymarket bets/odds data for sentiment analysis
- Multiple open interest data sources for improved reliability
- Real-time market sentiment from prediction markets
CRITICAL POLICY: NO SYNTHETIC DATA ALLOWED
This module MUST ONLY use real market data from exchanges.
NEVER use np.random.*, mock/fake/synthetic data, or placeholder values.
If data is unavailable: return None/0/empty, log errors, raise exceptions.
"""
import logging
import pandas as pd
import numpy as np
import requests
import time
import json
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple, Any
from dataclasses import dataclass
import ta
import warnings
# Suppress ta library deprecation warnings
warnings.filterwarnings("ignore", category=FutureWarning, module="ta")
logger = logging.getLogger(__name__)
@dataclass
class PolymarketData:
"""Polymarket prediction market data"""
market_id: str
question: str
outcome: str
probability: float
volume_24h: float
liquidity: float
last_trade_time: datetime
market_type: str # 'crypto', 'election', 'sports', etc.
@dataclass
class OpenInterestData:
"""Open interest data from multiple sources"""
source: str
symbol: str
open_interest: float
timestamp: datetime
change_24h: float
change_percent: float
@dataclass
class ReportData:
"""Data structure for comprehensive trading report"""
symbol: str
timestamp: datetime
# Current values
current_price: float
current_ema20: float
current_macd: float
current_rsi_7: float
# Enhanced Open Interest and Funding Rate (multiple sources)
open_interest_data: List[OpenInterestData]
funding_rate: float
# Polymarket sentiment data
polymarket_data: List[PolymarketData]
market_sentiment_score: float
# Intraday series (3-minute intervals, oldest → newest)
mid_prices: List[float]
ema_20_series: List[float]
macd_series: List[float]
rsi_7_series: List[float]
rsi_14_series: List[float]
# Longer-term context (4-hour timeframe)
ema_20_4h: float
ema_50_4h: float
atr_3_4h: float
atr_14_4h: float
current_volume: float
average_volume: float
macd_4h_series: List[float]
rsi_14_4h_series: List[float]
class ReportDataCrawler:
"""Specialized crawler for compiling comprehensive trading reports"""
def __init__(self, data_provider=None):
"""Initialize the report data crawler
Args:
data_provider: Instance of DataProvider to use for data fetching
"""
self.data_provider = data_provider
self.session = requests.Session()
self.session.headers.update({
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
})
# Cache for API responses to avoid rate limiting
self._cache = {}
self._cache_timeout = 30 # 30 seconds cache
# Polymarket API configuration
self.polymarket_base_url = "https://gamma-api.polymarket.com"
self.polymarket_graphql_url = f"{self.polymarket_base_url}/graphql"
# Open interest data sources
self.open_interest_sources = {
'binance': 'https://fapi.binance.com/fapi/v1/openInterest',
'bybit': 'https://api.bybit.com/v5/market/open-interest',
'okx': 'https://www.okx.com/api/v5/public/open-interest',
'coinglass': 'https://open-api.coinglass.com/public/v2/open_interest'
}
def _get_cached_or_fetch(self, cache_key: str, fetch_func, *args, **kwargs):
"""Get data from cache or fetch if expired"""
now = time.time()
if cache_key in self._cache:
data, timestamp = self._cache[cache_key]
if now - timestamp < self._cache_timeout:
return data
# Fetch new data
data = fetch_func(*args, **kwargs)
if data is not None:
self._cache[cache_key] = (data, now)
return data
def _fetch_polymarket_data(self, symbol: str) -> List[PolymarketData]:
"""Fetch Polymarket prediction market data for cryptocurrency-related markets
Args:
symbol: Trading pair symbol (e.g., 'BTC/USDT')
Returns:
List of PolymarketData objects
"""
try:
cache_key = f"polymarket_{symbol}"
cached_data = self._get_cached_or_fetch(cache_key, self._fetch_polymarket_raw, symbol)
if not cached_data:
return []
polymarket_data = []
base_symbol = symbol.split('/')[0].lower() # Extract BTC from BTC/USDT
for market in cached_data:
try:
# Filter for crypto-related markets
if any(keyword in market.get('question', '').lower() for keyword in [base_symbol, 'bitcoin', 'ethereum', 'crypto', 'cryptocurrency']):
polymarket_data.append(PolymarketData(
market_id=market.get('id', ''),
question=market.get('question', ''),
outcome=market.get('outcome', ''),
probability=float(market.get('probability', 0)),
volume_24h=float(market.get('volume24h', 0)),
liquidity=float(market.get('liquidity', 0)),
last_trade_time=datetime.fromisoformat(market.get('lastTradeTime', datetime.now().isoformat())),
market_type='crypto'
))
except Exception as e:
logger.warning(f"Error parsing Polymarket market data: {e}")
continue
logger.info(f"Fetched {len(polymarket_data)} Polymarket markets for {symbol}")
return polymarket_data
except Exception as e:
logger.error(f"Error fetching Polymarket data for {symbol}: {e}")
return []
def _fetch_polymarket_raw(self, symbol: str) -> List[Dict]:
"""Fetch raw Polymarket data using GraphQL API"""
try:
# GraphQL query for active markets
query = """
query GetMarkets($limit: Int!, $offset: Int!) {
markets(
limit: $limit
offset: $offset
orderBy: "volume"
orderDirection: "desc"
where: {
active: true
closed: false
}
) {
id
question
outcome
probability
volume24h
liquidity
lastTradeTime
endDate
}
}
"""
variables = {
"limit": 50,
"offset": 0
}
payload = {
"query": query,
"variables": variables
}
response = self.session.post(
self.polymarket_graphql_url,
json=payload,
timeout=10,
headers={'Content-Type': 'application/json'}
)
if response.status_code == 200:
data = response.json()
if 'data' in data and 'markets' in data['data']:
return data['data']['markets']
logger.warning(f"Polymarket API returned status {response.status_code}")
return []
except Exception as e:
logger.error(f"Error fetching raw Polymarket data: {e}")
return []
def _calculate_market_sentiment_score(self, polymarket_data: List[PolymarketData], symbol: str) -> float:
"""Calculate market sentiment score from Polymarket data
Args:
polymarket_data: List of PolymarketData objects
symbol: Trading pair symbol
Returns:
Sentiment score between -1 (bearish) and 1 (bullish)
"""
try:
if not polymarket_data:
return 0.0
base_symbol = symbol.split('/')[0].lower()
sentiment_scores = []
for data in polymarket_data:
# Weight by volume and liquidity
weight = (data.volume_24h + data.liquidity) / 1000000 # Normalize
# Extract sentiment from probability
if 'above' in data.question.lower() or 'higher' in data.question.lower():
# Bullish sentiment
sentiment = (data.probability - 0.5) * 2 # Convert 0-1 to -1 to 1
elif 'below' in data.question.lower() or 'lower' in data.question.lower():
# Bearish sentiment
sentiment = (0.5 - data.probability) * 2 # Convert 0-1 to -1 to 1
else:
# Neutral or unclear sentiment
sentiment = 0.0
sentiment_scores.append(sentiment * weight)
if sentiment_scores:
# Weighted average sentiment
total_weight = sum((data.volume_24h + data.liquidity) / 1000000 for data in polymarket_data)
if total_weight > 0:
return sum(sentiment_scores) / total_weight
return 0.0
except Exception as e:
logger.error(f"Error calculating market sentiment score: {e}")
return 0.0
def _fetch_open_interest_from_multiple_sources(self, symbol: str) -> List[OpenInterestData]:
"""Fetch open interest data from multiple sources for improved reliability
Args:
symbol: Trading pair symbol (e.g., 'BTC/USDT')
Returns:
List of OpenInterestData objects from different sources
"""
try:
open_interest_data = []
binance_symbol = symbol.replace('/', '')
# Source 1: Binance
try:
cache_key = f"binance_oi_{symbol}"
binance_data = self._get_cached_or_fetch(cache_key, self._fetch_binance_open_interest, binance_symbol)
if binance_data:
open_interest_data.append(OpenInterestData(
source='binance',
symbol=symbol,
open_interest=binance_data['openInterest'],
timestamp=datetime.now(),
change_24h=binance_data.get('change24h', 0),
change_percent=binance_data.get('changePercent', 0)
))
except Exception as e:
logger.warning(f"Failed to fetch Binance open interest: {e}")
# Source 2: Bybit
try:
cache_key = f"bybit_oi_{symbol}"
bybit_data = self._get_cached_or_fetch(cache_key, self._fetch_bybit_open_interest, symbol)
if bybit_data:
open_interest_data.append(OpenInterestData(
source='bybit',
symbol=symbol,
open_interest=bybit_data['openInterest'],
timestamp=datetime.now(),
change_24h=bybit_data.get('change24h', 0),
change_percent=bybit_data.get('changePercent', 0)
))
except Exception as e:
logger.warning(f"Failed to fetch Bybit open interest: {e}")
# Source 3: OKX
try:
cache_key = f"okx_oi_{symbol}"
okx_data = self._get_cached_or_fetch(cache_key, self._fetch_okx_open_interest, symbol)
if okx_data:
open_interest_data.append(OpenInterestData(
source='okx',
symbol=symbol,
open_interest=okx_data['openInterest'],
timestamp=datetime.now(),
change_24h=okx_data.get('change24h', 0),
change_percent=okx_data.get('changePercent', 0)
))
except Exception as e:
logger.warning(f"Failed to fetch OKX open interest: {e}")
logger.info(f"Fetched open interest data from {len(open_interest_data)} sources for {symbol}")
return open_interest_data
except Exception as e:
logger.error(f"Error fetching open interest from multiple sources for {symbol}: {e}")
return []
def _fetch_binance_open_interest(self, symbol: str) -> Dict:
"""Fetch open interest data from Binance"""
try:
url = "https://fapi.binance.com/fapi/v1/openInterest"
params = {'symbol': symbol}
response = self.session.get(url, params=params, timeout=10)
if response.status_code == 200:
return response.json()
return {}
except Exception as e:
logger.error(f"Error fetching Binance open interest: {e}")
return {}
def _fetch_bybit_open_interest(self, symbol: str) -> Dict:
"""Fetch open interest data from Bybit"""
try:
# Convert BTC/USDT to BTCUSDT for Bybit
bybit_symbol = symbol.replace('/', '')
url = "https://api.bybit.com/v5/market/open-interest"
params = {
'category': 'linear',
'symbol': bybit_symbol,
'intervalTime': '5min'
}
response = self.session.get(url, params=params, timeout=10)
if response.status_code == 200:
data = response.json()
if 'result' in data and 'list' in data['result'] and data['result']['list']:
latest = data['result']['list'][0]
return {
'openInterest': float(latest['openInterest']),
'change24h': float(latest.get('change24h', 0)),
'changePercent': float(latest.get('changePercent', 0))
}
return {}
except Exception as e:
logger.error(f"Error fetching Bybit open interest: {e}")
return {}
def _fetch_okx_open_interest(self, symbol: str) -> Dict:
"""Fetch open interest data from OKX"""
try:
# Convert BTC/USDT to BTC-USDT-SWAP for OKX
okx_symbol = symbol.replace('/', '-') + '-SWAP'
url = "https://www.okx.com/api/v5/public/open-interest"
params = {'instId': okx_symbol}
response = self.session.get(url, params=params, timeout=10)
if response.status_code == 200:
data = response.json()
if 'data' in data and data['data']:
latest = data['data'][0]
return {
'openInterest': float(latest['oi']),
'change24h': float(latest.get('oiCcy', 0)),
'changePercent': 0 # OKX doesn't provide percentage change
}
return {}
except Exception as e:
logger.error(f"Error fetching OKX open interest: {e}")
return {}
def _fetch_funding_rate(self, symbol: str) -> float:
"""Fetch funding rate from Binance"""
try:
binance_symbol = symbol.replace('/', '')
url = "https://fapi.binance.com/fapi/v1/premiumIndex"
params = {'symbol': binance_symbol}
response = self.session.get(url, params=params, timeout=10)
if response.status_code == 200:
data = response.json()
return float(data.get('lastFundingRate', 0))
return 0.0
except Exception as e:
logger.error(f"Error fetching funding rate: {e}")
return 0.0
def _calculate_technical_indicators(self, df: pd.DataFrame) -> Dict[str, Any]:
"""Calculate technical indicators for the given dataframe"""
try:
if df.empty or len(df) < 5:
logger.warning("Insufficient data for technical indicators")
return {}
indicators = {}
# EMA 20 (need at least 20 periods)
if len(df) >= 20:
indicators['ema_20'] = ta.trend.ema_indicator(df['close'], window=20)
else:
indicators['ema_20'] = pd.Series(index=df.index, dtype=float)
# MACD (need at least 26 periods)
if len(df) >= 26:
macd = ta.trend.MACD(df['close'])
indicators['macd'] = macd.macd()
indicators['macd_signal'] = macd.macd_signal()
indicators['macd_histogram'] = macd.macd_diff()
else:
indicators['macd'] = pd.Series(index=df.index, dtype=float)
indicators['macd_signal'] = pd.Series(index=df.index, dtype=float)
indicators['macd_histogram'] = pd.Series(index=df.index, dtype=float)
# RSI (7 and 14 period) - need at least 14 periods
if len(df) >= 14:
indicators['rsi_7'] = self._calculate_rsi(df['close'], period=7)
indicators['rsi_14'] = self._calculate_rsi(df['close'], period=14)
else:
indicators['rsi_7'] = pd.Series(index=df.index, dtype=float)
indicators['rsi_14'] = pd.Series(index=df.index, dtype=float)
# ATR (need at least 14 periods)
if len(df) >= 14:
indicators['atr_3'] = ta.volatility.average_true_range(df['high'], df['low'], df['close'], window=3)
indicators['atr_14'] = ta.volatility.average_true_range(df['high'], df['low'], df['close'], window=14)
else:
indicators['atr_3'] = pd.Series(index=df.index, dtype=float)
indicators['atr_14'] = pd.Series(index=df.index, dtype=float)
# EMA 50 (need at least 50 periods)
if len(df) >= 50:
indicators['ema_50'] = ta.trend.ema_indicator(df['close'], window=50)
else:
indicators['ema_50'] = pd.Series(index=df.index, dtype=float)
return indicators
except Exception as e:
logger.error(f"Error calculating technical indicators: {e}")
return {}
def _calculate_rsi(self, prices: pd.Series, period: int = 14) -> pd.Series:
"""Calculate RSI using our own implementation"""
try:
delta = prices.diff()
gain = (delta.where(delta > 0, 0)).rolling(window=period).mean()
loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()
rs = gain / loss
rsi = 100 - (100 / (1 + rs))
return rsi
except Exception as e:
logger.error(f"Error calculating RSI: {e}")
return pd.Series(index=prices.index, dtype=float)
def _get_intraday_series(self, symbol: str, limit: int = 10) -> Dict[str, List[float]]:
"""Get intraday series data (3-minute intervals)"""
try:
if not self.data_provider:
logger.error("No data provider available")
return {}
# Get 1-minute data and resample to 3-minute intervals
df_1m = self.data_provider.get_historical_data(symbol, '1m', limit=limit*3, refresh=True)
if df_1m is None or df_1m.empty:
logger.warning(f"No 1-minute data available for {symbol}")
return {}
# Resample to 3-minute intervals
df_3m = df_1m.resample('3min').agg({
'open': 'first',
'high': 'max',
'low': 'min',
'close': 'last',
'volume': 'sum'
}).dropna()
if len(df_3m) < limit:
logger.warning(f"Insufficient 3-minute data: {len(df_3m)} < {limit}")
return {}
# Take the last 'limit' periods
df_3m = df_3m.tail(limit)
# Calculate technical indicators
indicators = self._calculate_technical_indicators(df_3m)
# Extract series data
series_data = {
'mid_prices': df_3m['close'].tolist(),
'ema_20_series': indicators.get('ema_20', pd.Series()).tolist(),
'macd_series': indicators.get('macd', pd.Series()).tolist(),
'rsi_7_series': indicators.get('rsi_7', pd.Series()).tolist(),
'rsi_14_series': indicators.get('rsi_14', pd.Series()).tolist()
}
return series_data
except Exception as e:
logger.error(f"Error getting intraday series for {symbol}: {e}")
return {}
def _get_longer_term_context(self, symbol: str, limit: int = 10) -> Dict[str, Any]:
"""Get longer-term context data (4-hour timeframe)"""
try:
if not self.data_provider:
logger.error("No data provider available")
return {}
# Get 4-hour data
df_4h = self.data_provider.get_historical_data(symbol, '4h', limit=limit, refresh=True)
if df_4h is None or df_4h.empty:
logger.warning(f"No 4-hour data available for {symbol}")
return {}
# Calculate technical indicators
indicators = self._calculate_technical_indicators(df_4h)
# Calculate volume metrics
current_volume = df_4h['volume'].iloc[-1] if not df_4h.empty else 0
average_volume = df_4h['volume'].mean() if not df_4h.empty else 0
context_data = {
'ema_20_4h': indicators.get('ema_20', pd.Series()).iloc[-1] if not indicators.get('ema_20', pd.Series()).empty else 0,
'ema_50_4h': indicators.get('ema_50', pd.Series()).iloc[-1] if not indicators.get('ema_50', pd.Series()).empty else 0,
'atr_3_4h': indicators.get('atr_3', pd.Series()).iloc[-1] if not indicators.get('atr_3', pd.Series()).empty else 0,
'atr_14_4h': indicators.get('atr_14', pd.Series()).iloc[-1] if not indicators.get('atr_14', pd.Series()).empty else 0,
'current_volume': current_volume,
'average_volume': average_volume,
'macd_4h_series': indicators.get('macd', pd.Series()).tolist(),
'rsi_14_4h_series': indicators.get('rsi_14', pd.Series()).tolist()
}
return context_data
except Exception as e:
logger.error(f"Error getting longer-term context for {symbol}: {e}")
return {}
def crawl_report_data(self, symbol: str) -> Optional[ReportData]:
"""Crawl comprehensive report data for a trading pair
Args:
symbol: Trading pair symbol (e.g., 'BTC/USDT')
Returns:
ReportData object with all required information, or None if failed
"""
try:
logger.info(f"Crawling report data for {symbol}")
if not self.data_provider:
logger.error("No data provider available for crawling")
return None
# Force refresh data to ensure we have current information
logger.info(f"Refreshing data for {symbol}...")
for timeframe in ['1m', '1h', '4h']:
try:
self.data_provider.get_historical_data(symbol, timeframe, limit=100, refresh=True)
except Exception as e:
logger.warning(f"Could not refresh {timeframe} data for {symbol}: {e}")
# Get current price and basic data
current_price = self.data_provider.get_current_price(symbol)
if current_price is None:
logger.error(f"Could not get current price for {symbol}")
return None
# Get 1-minute data for current indicators
df_1m = self.data_provider.get_historical_data(symbol, '1m', limit=50, refresh=True)
if df_1m is None or df_1m.empty:
logger.error(f"Could not get 1-minute data for {symbol}")
return None
# Calculate current technical indicators
indicators = self._calculate_technical_indicators(df_1m)
# Get current indicator values
current_ema20 = indicators.get('ema_20', pd.Series()).iloc[-1] if not indicators.get('ema_20', pd.Series()).empty else 0
current_macd = indicators.get('macd', pd.Series()).iloc[-1] if not indicators.get('macd', pd.Series()).empty else 0
current_rsi_7 = indicators.get('rsi_7', pd.Series()).iloc[-1] if not indicators.get('rsi_7', pd.Series()).empty else 0
# Fetch enhanced open interest data from multiple sources
open_interest_data = self._fetch_open_interest_from_multiple_sources(symbol)
# Fetch funding rate (still using Binance as primary source)
funding_rate = self._fetch_funding_rate(symbol)
# Fetch Polymarket sentiment data
polymarket_data = self._fetch_polymarket_data(symbol)
market_sentiment_score = self._calculate_market_sentiment_score(polymarket_data, symbol)
# Get intraday series data
intraday_data = self._get_intraday_series(symbol, limit=10)
# Get longer-term context
longer_term_data = self._get_longer_term_context(symbol, limit=10)
# Create ReportData object
report_data = ReportData(
symbol=symbol,
timestamp=datetime.now(),
current_price=current_price,
current_ema20=current_ema20,
current_macd=current_macd,
current_rsi_7=current_rsi_7,
open_interest_data=open_interest_data,
funding_rate=funding_rate,
polymarket_data=polymarket_data,
market_sentiment_score=market_sentiment_score,
mid_prices=intraday_data.get('mid_prices', []),
ema_20_series=intraday_data.get('ema_20_series', []),
macd_series=intraday_data.get('macd_series', []),
rsi_7_series=intraday_data.get('rsi_7_series', []),
rsi_14_series=intraday_data.get('rsi_14_series', []),
ema_20_4h=longer_term_data.get('ema_20_4h', 0),
ema_50_4h=longer_term_data.get('ema_50_4h', 0),
atr_3_4h=longer_term_data.get('atr_3_4h', 0),
atr_14_4h=longer_term_data.get('atr_14_4h', 0),
current_volume=longer_term_data.get('current_volume', 0),
average_volume=longer_term_data.get('average_volume', 0),
macd_4h_series=longer_term_data.get('macd_4h_series', []),
rsi_14_4h_series=longer_term_data.get('rsi_14_4h_series', [])
)
logger.info(f"Successfully crawled report data for {symbol}")
return report_data
except Exception as e:
logger.error(f"Error crawling report data for {symbol}: {e}")
return None
def generate_report_text(self, report_data: ReportData) -> str:
"""Generate formatted report text in the specified format
Args:
report_data: ReportData object with all required information
Returns:
Formatted report string
"""
try:
if not report_data:
return "Error: No report data available"
# Format current values
current_values = (
f"current_price = {report_data.current_price:.1f}, "
f"current_ema20 = {report_data.current_ema20:.3f}, "
f"current_macd = {report_data.current_macd:.3f}, "
f"current_rsi (7 period) = {report_data.current_rsi_7:.3f}"
)
# Format enhanced open interest data from multiple sources
oi_section = "Open Interest (Multiple Sources):\n"
if report_data.open_interest_data:
for oi_data in report_data.open_interest_data:
try:
open_interest_val = float(oi_data.open_interest) if oi_data.open_interest else 0
change_percent_val = float(oi_data.change_percent) if oi_data.change_percent else 0
oi_section += f" {oi_data.source.upper()}: {open_interest_val:,.2f} "
if change_percent_val != 0:
oi_section += f"(24h: {change_percent_val:+.2f}%)\n"
else:
oi_section += "\n"
except (ValueError, TypeError) as e:
logger.warning(f"Error formatting open interest data: {e}")
oi_section += f" {oi_data.source.upper()}: Error formatting data\n"
else:
oi_section += " No open interest data available\n"
# Format funding rate
funding_section = f"Funding Rate: {report_data.funding_rate:.2e}\n"
# Format Polymarket sentiment data
sentiment_section = ""
if report_data.polymarket_data:
sentiment_section = f"\nPolymarket Sentiment Analysis:\n"
sentiment_section += f"Market Sentiment Score: {report_data.market_sentiment_score:.3f} "
if report_data.market_sentiment_score > 0.1:
sentiment_section += "(Bullish)\n"
elif report_data.market_sentiment_score < -0.1:
sentiment_section += "(Bearish)\n"
else:
sentiment_section += "(Neutral)\n"
sentiment_section += f"Active Markets: {len(report_data.polymarket_data)}\n"
for market in report_data.polymarket_data[:3]: # Show top 3 markets
sentiment_section += f" - {market.question[:50]}... (Prob: {market.probability:.2f})\n"
else:
sentiment_section = "\nPolymarket Sentiment: No data available\n"
# Format intraday series
intraday_section = f"""Intraday series (by minute, oldest → latest):
Mid prices: {report_data.mid_prices}
EMA indicators (20period): {report_data.ema_20_series}
MACD indicators: {report_data.macd_series}
RSI indicators (7Period): {report_data.rsi_7_series}
RSI indicators (14Period): {report_data.rsi_14_series}"""
# Format longer-term context
longer_term_section = f"""Longerterm context (4hour timeframe):
20Period EMA: {report_data.ema_20_4h:.3f} vs. 50Period EMA: {report_data.ema_50_4h:.3f}
3Period ATR: {report_data.atr_3_4h:.3f} vs. 14Period ATR: {report_data.atr_14_4h:.3f}
Current Volume: {report_data.current_volume:.3f} vs. Average Volume: {report_data.average_volume:.3f}
MACD indicators: {report_data.macd_4h_series}
RSI indicators (14Period): {report_data.rsi_14_4h_series}"""
# Combine all sections
report_text = f"""ALL OF THE PRICE OR SIGNAL DATA BELOW IS ORDERED: OLDEST → NEWEST
Timeframes note: Unless stated otherwise in a section title, intraday series are provided at 3minute intervals. If a coin uses a different interval, it is explicitly stated in that coin's section.
{current_values}
In addition, here is the latest {report_data.symbol} open interest and funding rate for perps (the instrument you are trading):
{oi_section}
{funding_section}{sentiment_section}
{intraday_section}
{longer_term_section}"""
return report_text
except Exception as e:
logger.error(f"Error generating report text: {e}")
return f"Error generating report: {e}"
def crawl_and_generate_report(self, symbol: str) -> Optional[str]:
"""Crawl data and generate formatted report in one call
Args:
symbol: Trading pair symbol (e.g., 'BTC/USDT')
Returns:
Formatted report string, or None if failed
"""
try:
report_data = self.crawl_report_data(symbol)
if report_data:
return self.generate_report_text(report_data)
else:
return None
except Exception as e:
logger.error(f"Error in crawl_and_generate_report for {symbol}: {e}")
return None

View File

@@ -0,0 +1,432 @@
"""
Unified Data Models for the storage system.
Standardized data structures for all components.
"""
import pandas as pd
import numpy as np
from dataclasses import dataclass, field, asdict
from datetime import datetime
from typing import Dict, List, Optional, Tuple, Any
import json
import logging
logger = logging.getLogger(__name__)
@dataclass
class InferenceDataFrame:
"""
Complete inference data for a single timestamp.
Contains all data needed for model inference including multi-timeframe OHLCV,
order book data, imbalances, and technical indicators.
"""
symbol: str
timestamp: datetime
# Multi-timeframe OHLCV data
ohlcv_1s: pd.DataFrame = field(default_factory=pd.DataFrame)
ohlcv_1m: pd.DataFrame = field(default_factory=pd.DataFrame)
ohlcv_5m: pd.DataFrame = field(default_factory=pd.DataFrame)
ohlcv_15m: pd.DataFrame = field(default_factory=pd.DataFrame)
ohlcv_1h: pd.DataFrame = field(default_factory=pd.DataFrame)
ohlcv_1d: pd.DataFrame = field(default_factory=pd.DataFrame)
# Order book data
orderbook_snapshot: Optional[Dict] = None
orderbook_1s_agg: pd.DataFrame = field(default_factory=pd.DataFrame)
# Imbalance metrics (multi-timeframe)
imbalances: pd.DataFrame = field(default_factory=pd.DataFrame)
# Technical indicators (pre-calculated from latest candle)
indicators: Dict[str, float] = field(default_factory=dict)
# Context window data (±N minutes around timestamp)
context_data: Optional[pd.DataFrame] = None
# Metadata
data_source: str = 'unknown' # 'cache' or 'database'
query_latency_ms: float = 0.0
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary for serialization."""
return {
'symbol': self.symbol,
'timestamp': self.timestamp.isoformat() if self.timestamp else None,
'ohlcv_1s': self.ohlcv_1s.to_dict('records') if not self.ohlcv_1s.empty else [],
'ohlcv_1m': self.ohlcv_1m.to_dict('records') if not self.ohlcv_1m.empty else [],
'ohlcv_5m': self.ohlcv_5m.to_dict('records') if not self.ohlcv_5m.empty else [],
'ohlcv_15m': self.ohlcv_15m.to_dict('records') if not self.ohlcv_15m.empty else [],
'ohlcv_1h': self.ohlcv_1h.to_dict('records') if not self.ohlcv_1h.empty else [],
'ohlcv_1d': self.ohlcv_1d.to_dict('records') if not self.ohlcv_1d.empty else [],
'orderbook_snapshot': self.orderbook_snapshot,
'orderbook_1s_agg': self.orderbook_1s_agg.to_dict('records') if not self.orderbook_1s_agg.empty else [],
'imbalances': self.imbalances.to_dict('records') if not self.imbalances.empty else [],
'indicators': self.indicators,
'context_data': self.context_data.to_dict('records') if self.context_data is not None and not self.context_data.empty else None,
'data_source': self.data_source,
'query_latency_ms': self.query_latency_ms
}
def to_json(self) -> str:
"""Convert to JSON string."""
return json.dumps(self.to_dict(), default=str)
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'InferenceDataFrame':
"""Create from dictionary."""
return cls(
symbol=data['symbol'],
timestamp=datetime.fromisoformat(data['timestamp']) if data.get('timestamp') else datetime.now(),
ohlcv_1s=pd.DataFrame(data.get('ohlcv_1s', [])),
ohlcv_1m=pd.DataFrame(data.get('ohlcv_1m', [])),
ohlcv_5m=pd.DataFrame(data.get('ohlcv_5m', [])),
ohlcv_15m=pd.DataFrame(data.get('ohlcv_15m', [])),
ohlcv_1h=pd.DataFrame(data.get('ohlcv_1h', [])),
ohlcv_1d=pd.DataFrame(data.get('ohlcv_1d', [])),
orderbook_snapshot=data.get('orderbook_snapshot'),
orderbook_1s_agg=pd.DataFrame(data.get('orderbook_1s_agg', [])),
imbalances=pd.DataFrame(data.get('imbalances', [])),
indicators=data.get('indicators', {}),
context_data=pd.DataFrame(data['context_data']) if data.get('context_data') else None,
data_source=data.get('data_source', 'unknown'),
query_latency_ms=data.get('query_latency_ms', 0.0)
)
def get_latest_price(self) -> Optional[float]:
"""Get the latest close price from 1s data."""
if not self.ohlcv_1s.empty:
return float(self.ohlcv_1s.iloc[-1]['close_price'])
return None
def get_timeframe_data(self, timeframe: str) -> pd.DataFrame:
"""Get OHLCV data for a specific timeframe."""
timeframe_map = {
'1s': self.ohlcv_1s,
'1m': self.ohlcv_1m,
'5m': self.ohlcv_5m,
'15m': self.ohlcv_15m,
'1h': self.ohlcv_1h,
'1d': self.ohlcv_1d
}
return timeframe_map.get(timeframe, pd.DataFrame())
def has_complete_data(self) -> bool:
"""Check if all required data is present."""
return (
not self.ohlcv_1s.empty and
not self.ohlcv_1m.empty and
not self.imbalances.empty and
self.orderbook_snapshot is not None
)
def get_data_summary(self) -> Dict[str, Any]:
"""Get summary of available data."""
return {
'symbol': self.symbol,
'timestamp': self.timestamp.isoformat() if self.timestamp else None,
'ohlcv_1s_rows': len(self.ohlcv_1s),
'ohlcv_1m_rows': len(self.ohlcv_1m),
'ohlcv_5m_rows': len(self.ohlcv_5m),
'ohlcv_15m_rows': len(self.ohlcv_15m),
'ohlcv_1h_rows': len(self.ohlcv_1h),
'ohlcv_1d_rows': len(self.ohlcv_1d),
'has_orderbook': self.orderbook_snapshot is not None,
'orderbook_1s_agg_rows': len(self.orderbook_1s_agg),
'imbalances_rows': len(self.imbalances),
'indicators_count': len(self.indicators),
'has_context_data': self.context_data is not None,
'data_source': self.data_source,
'query_latency_ms': self.query_latency_ms,
'is_complete': self.has_complete_data()
}
@dataclass
class OrderBookDataFrame:
"""
Order book data with imbalances and aggregations.
Contains raw order book, price buckets, and multi-timeframe imbalance metrics.
"""
symbol: str
timestamp: datetime
# Raw order book (top levels)
bids: List[Tuple[float, float]] = field(default_factory=list) # (price, size)
asks: List[Tuple[float, float]] = field(default_factory=list) # (price, size)
# Aggregated data (price buckets)
price_buckets: pd.DataFrame = field(default_factory=pd.DataFrame)
# Multi-timeframe imbalance metrics
imbalance_1s: float = 0.0
imbalance_5s: float = 0.0
imbalance_15s: float = 0.0
imbalance_60s: float = 0.0
# Volume-weighted imbalances
volume_imbalance_1s: float = 0.0
volume_imbalance_5s: float = 0.0
volume_imbalance_15s: float = 0.0
volume_imbalance_60s: float = 0.0
# Order book statistics
mid_price: float = 0.0
spread: float = 0.0
bid_volume: float = 0.0
ask_volume: float = 0.0
# Metadata
exchange: str = 'binance'
sequence_id: Optional[int] = None
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary for serialization."""
return {
'symbol': self.symbol,
'timestamp': self.timestamp.isoformat() if self.timestamp else None,
'bids': self.bids,
'asks': self.asks,
'price_buckets': self.price_buckets.to_dict('records') if not self.price_buckets.empty else [],
'imbalance_1s': self.imbalance_1s,
'imbalance_5s': self.imbalance_5s,
'imbalance_15s': self.imbalance_15s,
'imbalance_60s': self.imbalance_60s,
'volume_imbalance_1s': self.volume_imbalance_1s,
'volume_imbalance_5s': self.volume_imbalance_5s,
'volume_imbalance_15s': self.volume_imbalance_15s,
'volume_imbalance_60s': self.volume_imbalance_60s,
'mid_price': self.mid_price,
'spread': self.spread,
'bid_volume': self.bid_volume,
'ask_volume': self.ask_volume,
'exchange': self.exchange,
'sequence_id': self.sequence_id
}
def to_json(self) -> str:
"""Convert to JSON string."""
return json.dumps(self.to_dict(), default=str)
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'OrderBookDataFrame':
"""Create from dictionary."""
return cls(
symbol=data['symbol'],
timestamp=datetime.fromisoformat(data['timestamp']) if data.get('timestamp') else datetime.now(),
bids=data.get('bids', []),
asks=data.get('asks', []),
price_buckets=pd.DataFrame(data.get('price_buckets', [])),
imbalance_1s=data.get('imbalance_1s', 0.0),
imbalance_5s=data.get('imbalance_5s', 0.0),
imbalance_15s=data.get('imbalance_15s', 0.0),
imbalance_60s=data.get('imbalance_60s', 0.0),
volume_imbalance_1s=data.get('volume_imbalance_1s', 0.0),
volume_imbalance_5s=data.get('volume_imbalance_5s', 0.0),
volume_imbalance_15s=data.get('volume_imbalance_15s', 0.0),
volume_imbalance_60s=data.get('volume_imbalance_60s', 0.0),
mid_price=data.get('mid_price', 0.0),
spread=data.get('spread', 0.0),
bid_volume=data.get('bid_volume', 0.0),
ask_volume=data.get('ask_volume', 0.0),
exchange=data.get('exchange', 'binance'),
sequence_id=data.get('sequence_id')
)
def calculate_statistics(self):
"""Calculate order book statistics from bids and asks."""
if self.bids and self.asks:
# Best bid and ask
best_bid = max(self.bids, key=lambda x: x[0])[0] if self.bids else 0
best_ask = min(self.asks, key=lambda x: x[0])[0] if self.asks else 0
# Mid price and spread
if best_bid > 0 and best_ask > 0:
self.mid_price = (best_bid + best_ask) / 2
self.spread = best_ask - best_bid
# Total volumes
self.bid_volume = sum(size for _, size in self.bids)
self.ask_volume = sum(size for _, size in self.asks)
def get_best_bid(self) -> Optional[Tuple[float, float]]:
"""Get best bid (highest price)."""
if self.bids:
return max(self.bids, key=lambda x: x[0])
return None
def get_best_ask(self) -> Optional[Tuple[float, float]]:
"""Get best ask (lowest price)."""
if self.asks:
return min(self.asks, key=lambda x: x[0])
return None
def get_spread_bps(self) -> float:
"""Get spread in basis points."""
if self.mid_price > 0 and self.spread > 0:
return (self.spread / self.mid_price) * 10000
return 0.0
def get_imbalance_summary(self) -> Dict[str, float]:
"""Get summary of all imbalance metrics."""
return {
'imbalance_1s': self.imbalance_1s,
'imbalance_5s': self.imbalance_5s,
'imbalance_15s': self.imbalance_15s,
'imbalance_60s': self.imbalance_60s,
'volume_imbalance_1s': self.volume_imbalance_1s,
'volume_imbalance_5s': self.volume_imbalance_5s,
'volume_imbalance_15s': self.volume_imbalance_15s,
'volume_imbalance_60s': self.volume_imbalance_60s
}
def is_valid(self) -> bool:
"""Check if order book data is valid."""
if not self.bids or not self.asks:
return False
best_bid = self.get_best_bid()
best_ask = self.get_best_ask()
if not best_bid or not best_ask:
return False
# Bid must be less than ask
return best_bid[0] < best_ask[0]
@dataclass
class TradeEvent:
"""Individual trade event."""
symbol: str
timestamp: datetime
price: float
size: float
side: str # 'buy' or 'sell'
trade_id: str
exchange: str = 'binance'
is_buyer_maker: bool = False
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary."""
return asdict(self)
def to_json(self) -> str:
"""Convert to JSON string."""
return json.dumps(self.to_dict(), default=str)
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'TradeEvent':
"""Create from dictionary."""
return cls(
symbol=data['symbol'],
timestamp=datetime.fromisoformat(data['timestamp']) if isinstance(data['timestamp'], str) else data['timestamp'],
price=float(data['price']),
size=float(data['size']),
side=data['side'],
trade_id=str(data['trade_id']),
exchange=data.get('exchange', 'binance'),
is_buyer_maker=data.get('is_buyer_maker', False)
)
@dataclass
class OHLCVCandle:
"""Single OHLCV candlestick."""
symbol: str
timestamp: datetime
timeframe: str
open_price: float
high_price: float
low_price: float
close_price: float
volume: float
trade_count: int = 0
# Technical indicators (optional)
rsi_14: Optional[float] = None
macd: Optional[float] = None
macd_signal: Optional[float] = None
macd_histogram: Optional[float] = None
bb_upper: Optional[float] = None
bb_middle: Optional[float] = None
bb_lower: Optional[float] = None
ema_12: Optional[float] = None
ema_26: Optional[float] = None
sma_20: Optional[float] = None
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary."""
return asdict(self)
def to_json(self) -> str:
"""Convert to JSON string."""
return json.dumps(self.to_dict(), default=str)
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'OHLCVCandle':
"""Create from dictionary."""
return cls(
symbol=data['symbol'],
timestamp=datetime.fromisoformat(data['timestamp']) if isinstance(data['timestamp'], str) else data['timestamp'],
timeframe=data['timeframe'],
open_price=float(data['open_price']),
high_price=float(data['high_price']),
low_price=float(data['low_price']),
close_price=float(data['close_price']),
volume=float(data['volume']),
trade_count=int(data.get('trade_count', 0)),
rsi_14=float(data['rsi_14']) if data.get('rsi_14') is not None else None,
macd=float(data['macd']) if data.get('macd') is not None else None,
macd_signal=float(data['macd_signal']) if data.get('macd_signal') is not None else None,
macd_histogram=float(data['macd_histogram']) if data.get('macd_histogram') is not None else None,
bb_upper=float(data['bb_upper']) if data.get('bb_upper') is not None else None,
bb_middle=float(data['bb_middle']) if data.get('bb_middle') is not None else None,
bb_lower=float(data['bb_lower']) if data.get('bb_lower') is not None else None,
ema_12=float(data['ema_12']) if data.get('ema_12') is not None else None,
ema_26=float(data['ema_26']) if data.get('ema_26') is not None else None,
sma_20=float(data['sma_20']) if data.get('sma_20') is not None else None
)
def is_valid(self) -> bool:
"""Check if candle data is valid."""
# High must be >= low
if self.high_price < self.low_price:
return False
# High must be >= open and close
if self.high_price < self.open_price or self.high_price < self.close_price:
return False
# Low must be <= open and close
if self.low_price > self.open_price or self.low_price > self.close_price:
return False
# Volume must be non-negative
if self.volume < 0:
return False
return True
def get_price_change(self) -> float:
"""Get price change (close - open)."""
return self.close_price - self.open_price
def get_price_change_percent(self) -> float:
"""Get price change percentage."""
if self.open_price > 0:
return ((self.close_price - self.open_price) / self.open_price) * 100
return 0.0
def get_range(self) -> float:
"""Get price range (high - low)."""
return self.high_price - self.low_price
def is_bullish(self) -> bool:
"""Check if candle is bullish (close > open)."""
return self.close_price > self.open_price
def is_bearish(self) -> bool:
"""Check if candle is bearish (close < open)."""
return self.close_price < self.open_price

View File

@@ -0,0 +1,527 @@
"""
Data Validator for unified storage system.
Validates all incoming data before storage to ensure data integrity.
"""
import logging
from datetime import datetime, timezone
from typing import Dict, List, Tuple, Optional, Any
import pandas as pd
from .unified_data_models import OHLCVCandle, OrderBookDataFrame, TradeEvent
logger = logging.getLogger(__name__)
class DataValidator:
"""
Validates all incoming data before storage.
Ensures data integrity and consistency across the system.
"""
# Validation thresholds
MAX_PRICE_CHANGE_PERCENT = 50.0 # 50% max price change per candle
MIN_PRICE = 0.0001 # Minimum valid price
MAX_PRICE = 1000000.0 # Maximum valid price
MIN_VOLUME = 0.0 # Minimum valid volume
MAX_SPREAD_PERCENT = 10.0 # 10% max spread
@staticmethod
def validate_ohlcv(candle: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
"""
Validate OHLCV candle data.
Args:
candle: Dictionary with OHLCV data
Returns:
Tuple of (is_valid, error_message)
"""
try:
# Check required fields
required_fields = ['timestamp', 'symbol', 'timeframe', 'open_price',
'high_price', 'low_price', 'close_price', 'volume']
for field in required_fields:
if field not in candle:
return False, f"Missing required field: {field}"
# Extract values
open_price = float(candle['open_price'])
high_price = float(candle['high_price'])
low_price = float(candle['low_price'])
close_price = float(candle['close_price'])
volume = float(candle['volume'])
# Validate price ranges
prices = [open_price, high_price, low_price, close_price]
for price in prices:
if price < DataValidator.MIN_PRICE:
return False, f"Price below minimum: {price}"
if price > DataValidator.MAX_PRICE:
return False, f"Price above maximum: {price}"
# Validate OHLC relationships
if high_price < low_price:
return False, f"High ({high_price}) < Low ({low_price})"
if high_price < open_price:
return False, f"High ({high_price}) < Open ({open_price})"
if high_price < close_price:
return False, f"High ({high_price}) < Close ({close_price})"
if low_price > open_price:
return False, f"Low ({low_price}) > Open ({open_price})"
if low_price > close_price:
return False, f"Low ({low_price}) > Close ({close_price})"
# Validate volume
if volume < DataValidator.MIN_VOLUME:
return False, f"Volume below minimum: {volume}"
# Validate price change (prevent extreme outliers)
if open_price > 0:
price_change_percent = abs((close_price - open_price) / open_price) * 100
if price_change_percent > DataValidator.MAX_PRICE_CHANGE_PERCENT:
return False, f"Price change too large: {price_change_percent:.2f}%"
# Validate timestamp
if not DataValidator._validate_timestamp(candle['timestamp']):
return False, "Invalid timestamp"
# Validate symbol
if not DataValidator._validate_symbol(candle['symbol']):
return False, f"Invalid symbol: {candle['symbol']}"
# Validate timeframe
if not DataValidator._validate_timeframe(candle['timeframe']):
return False, f"Invalid timeframe: {candle['timeframe']}"
return True, None
except (ValueError, TypeError, KeyError) as e:
return False, f"Validation error: {str(e)}"
except Exception as e:
logger.error(f"Unexpected error validating OHLCV: {e}")
return False, f"Unexpected error: {str(e)}"
@staticmethod
def validate_ohlcv_candle(candle: OHLCVCandle) -> Tuple[bool, Optional[str]]:
"""
Validate OHLCVCandle object.
Args:
candle: OHLCVCandle object
Returns:
Tuple of (is_valid, error_message)
"""
candle_dict = {
'timestamp': candle.timestamp,
'symbol': candle.symbol,
'timeframe': candle.timeframe,
'open_price': candle.open_price,
'high_price': candle.high_price,
'low_price': candle.low_price,
'close_price': candle.close_price,
'volume': candle.volume
}
return DataValidator.validate_ohlcv(candle_dict)
@staticmethod
def validate_orderbook(orderbook: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
"""
Validate order book data.
Args:
orderbook: Dictionary with order book data
Returns:
Tuple of (is_valid, error_message)
"""
try:
# Check required fields
if 'bids' not in orderbook or 'asks' not in orderbook:
return False, "Missing bids or asks"
bids = orderbook['bids']
asks = orderbook['asks']
# Check if bids and asks are lists
if not isinstance(bids, list) or not isinstance(asks, list):
return False, "Bids and asks must be lists"
# Check if bids and asks are not empty
if not bids or not asks:
return False, "Bids or asks are empty"
# Validate bid levels
for bid in bids:
if not isinstance(bid, (list, tuple)) or len(bid) < 2:
return False, "Invalid bid format"
price, size = float(bid[0]), float(bid[1])
if price < DataValidator.MIN_PRICE or price > DataValidator.MAX_PRICE:
return False, f"Invalid bid price: {price}"
if size <= 0:
return False, f"Invalid bid size: {size}"
# Validate ask levels
for ask in asks:
if not isinstance(ask, (list, tuple)) or len(ask) < 2:
return False, "Invalid ask format"
price, size = float(ask[0]), float(ask[1])
if price < DataValidator.MIN_PRICE or price > DataValidator.MAX_PRICE:
return False, f"Invalid ask price: {price}"
if size <= 0:
return False, f"Invalid ask size: {size}"
# Validate bid/ask relationship
best_bid = max(float(bid[0]) for bid in bids)
best_ask = min(float(ask[0]) for ask in asks)
if best_bid >= best_ask:
return False, f"Best bid ({best_bid}) >= Best ask ({best_ask})"
# Validate spread
spread = best_ask - best_bid
mid_price = (best_bid + best_ask) / 2
spread_percent = (spread / mid_price) * 100
if spread_percent > DataValidator.MAX_SPREAD_PERCENT:
return False, f"Spread too large: {spread_percent:.2f}%"
# Validate timestamp if present
if 'timestamp' in orderbook:
if not DataValidator._validate_timestamp(orderbook['timestamp']):
return False, "Invalid timestamp"
# Validate symbol if present
if 'symbol' in orderbook:
if not DataValidator._validate_symbol(orderbook['symbol']):
return False, f"Invalid symbol: {orderbook['symbol']}"
return True, None
except (ValueError, TypeError, KeyError) as e:
return False, f"Validation error: {str(e)}"
except Exception as e:
logger.error(f"Unexpected error validating orderbook: {e}")
return False, f"Unexpected error: {str(e)}"
@staticmethod
def validate_orderbook_dataframe(orderbook: OrderBookDataFrame) -> Tuple[bool, Optional[str]]:
"""
Validate OrderBookDataFrame object.
Args:
orderbook: OrderBookDataFrame object
Returns:
Tuple of (is_valid, error_message)
"""
orderbook_dict = {
'bids': orderbook.bids,
'asks': orderbook.asks,
'timestamp': orderbook.timestamp,
'symbol': orderbook.symbol
}
return DataValidator.validate_orderbook(orderbook_dict)
@staticmethod
def validate_trade(trade: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
"""
Validate trade event data.
Args:
trade: Dictionary with trade data
Returns:
Tuple of (is_valid, error_message)
"""
try:
# Check required fields
required_fields = ['timestamp', 'symbol', 'price', 'size', 'side', 'trade_id']
for field in required_fields:
if field not in trade:
return False, f"Missing required field: {field}"
# Validate price
price = float(trade['price'])
if price < DataValidator.MIN_PRICE or price > DataValidator.MAX_PRICE:
return False, f"Invalid price: {price}"
# Validate size
size = float(trade['size'])
if size <= 0:
return False, f"Invalid size: {size}"
# Validate side
side = trade['side'].lower()
if side not in ['buy', 'sell', 'bid', 'ask']:
return False, f"Invalid side: {trade['side']}"
# Validate timestamp
if not DataValidator._validate_timestamp(trade['timestamp']):
return False, "Invalid timestamp"
# Validate symbol
if not DataValidator._validate_symbol(trade['symbol']):
return False, f"Invalid symbol: {trade['symbol']}"
# Validate trade_id
if not trade['trade_id']:
return False, "Empty trade_id"
return True, None
except (ValueError, TypeError, KeyError) as e:
return False, f"Validation error: {str(e)}"
except Exception as e:
logger.error(f"Unexpected error validating trade: {e}")
return False, f"Unexpected error: {str(e)}"
@staticmethod
def validate_trade_event(trade: TradeEvent) -> Tuple[bool, Optional[str]]:
"""
Validate TradeEvent object.
Args:
trade: TradeEvent object
Returns:
Tuple of (is_valid, error_message)
"""
trade_dict = {
'timestamp': trade.timestamp,
'symbol': trade.symbol,
'price': trade.price,
'size': trade.size,
'side': trade.side,
'trade_id': trade.trade_id
}
return DataValidator.validate_trade(trade_dict)
@staticmethod
def validate_imbalances(imbalances: Dict[str, float]) -> Tuple[bool, Optional[str]]:
"""
Validate order book imbalance metrics.
Args:
imbalances: Dictionary with imbalance metrics
Returns:
Tuple of (is_valid, error_message)
"""
try:
# Expected imbalance fields
expected_fields = [
'imbalance_1s', 'imbalance_5s', 'imbalance_15s', 'imbalance_60s',
'volume_imbalance_1s', 'volume_imbalance_5s',
'volume_imbalance_15s', 'volume_imbalance_60s'
]
# Check if at least some imbalance fields are present
present_fields = [f for f in expected_fields if f in imbalances]
if not present_fields:
return False, "No imbalance fields present"
# Validate imbalance values (should be between -1 and 1)
for field in present_fields:
value = float(imbalances[field])
if value < -1.0 or value > 1.0:
logger.warning(f"Imbalance {field} out of range [-1, 1]: {value}")
# Don't fail validation, just warn
return True, None
except (ValueError, TypeError) as e:
return False, f"Validation error: {str(e)}"
except Exception as e:
logger.error(f"Unexpected error validating imbalances: {e}")
return False, f"Unexpected error: {str(e)}"
@staticmethod
def _validate_timestamp(timestamp: Any) -> bool:
"""Validate timestamp format and value."""
try:
if isinstance(timestamp, datetime):
# Check if timestamp is not too far in the future or past
now = datetime.now(timezone.utc)
diff_days = abs((timestamp - now).days)
# Allow timestamps within 10 years
if diff_days > 3650:
logger.warning(f"Timestamp too far from now: {timestamp}")
return False
return True
elif isinstance(timestamp, (int, float)):
# Unix timestamp
dt = datetime.fromtimestamp(timestamp, tz=timezone.utc)
return DataValidator._validate_timestamp(dt)
elif isinstance(timestamp, str):
# ISO format string
dt = datetime.fromisoformat(timestamp.replace('Z', '+00:00'))
return DataValidator._validate_timestamp(dt)
else:
return False
except Exception as e:
logger.debug(f"Timestamp validation error: {e}")
return False
@staticmethod
def _validate_symbol(symbol: str) -> bool:
"""Validate trading symbol format."""
if not symbol or not isinstance(symbol, str):
return False
# Symbol should be non-empty and reasonable length
if len(symbol) < 3 or len(symbol) > 20:
return False
# Common symbol formats: BTCUSDT, BTC/USDT, BTC-USDT
valid_chars = set('ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789/-')
if not all(c in valid_chars for c in symbol.upper()):
return False
return True
@staticmethod
def _validate_timeframe(timeframe: str) -> bool:
"""Validate timeframe format."""
valid_timeframes = ['1s', '1m', '5m', '15m', '30m', '1h', '4h', '1d', '1w']
return timeframe in valid_timeframes
@staticmethod
def validate_dataframe(df: pd.DataFrame, required_columns: List[str]) -> Tuple[bool, Optional[str]]:
"""
Validate pandas DataFrame has required columns and valid data.
Args:
df: DataFrame to validate
required_columns: List of required column names
Returns:
Tuple of (is_valid, error_message)
"""
try:
# Check if DataFrame is empty
if df.empty:
return False, "DataFrame is empty"
# Check required columns
missing_columns = [col for col in required_columns if col not in df.columns]
if missing_columns:
return False, f"Missing columns: {missing_columns}"
# Check for null values in required columns
null_counts = df[required_columns].isnull().sum()
if null_counts.any():
null_cols = null_counts[null_counts > 0].to_dict()
return False, f"Null values found: {null_cols}"
return True, None
except Exception as e:
logger.error(f"DataFrame validation error: {e}")
return False, f"Validation error: {str(e)}"
@staticmethod
def sanitize_ohlcv(candle: Dict[str, Any]) -> Dict[str, Any]:
"""
Sanitize OHLCV data by fixing common issues.
Args:
candle: OHLCV candle dictionary
Returns:
Sanitized candle dictionary
"""
sanitized = candle.copy()
try:
# Ensure numeric types
for field in ['open_price', 'high_price', 'low_price', 'close_price', 'volume']:
if field in sanitized:
sanitized[field] = float(sanitized[field])
# Fix high/low if needed
prices = [sanitized.get('open_price', 0), sanitized.get('close_price', 0)]
if 'high_price' in sanitized and 'low_price' in sanitized:
high = sanitized['high_price']
low = sanitized['low_price']
# Ensure high >= all prices
sanitized['high_price'] = max(high, *prices)
# Ensure low <= all prices
sanitized['low_price'] = min(low, *prices)
# Ensure non-negative volume
if 'volume' in sanitized:
sanitized['volume'] = max(0, sanitized['volume'])
# Ensure trade_count is integer
if 'trade_count' in sanitized:
sanitized['trade_count'] = int(sanitized['trade_count'])
except Exception as e:
logger.error(f"Error sanitizing OHLCV: {e}")
return sanitized
@staticmethod
def sanitize_orderbook(orderbook: Dict[str, Any]) -> Dict[str, Any]:
"""
Sanitize order book data by fixing common issues.
Args:
orderbook: Order book dictionary
Returns:
Sanitized order book dictionary
"""
sanitized = orderbook.copy()
try:
# Ensure bids and asks are lists
if 'bids' in sanitized and not isinstance(sanitized['bids'], list):
sanitized['bids'] = []
if 'asks' in sanitized and not isinstance(sanitized['asks'], list):
sanitized['asks'] = []
# Remove invalid levels
if 'bids' in sanitized:
sanitized['bids'] = [
bid for bid in sanitized['bids']
if isinstance(bid, (list, tuple)) and len(bid) >= 2 and float(bid[0]) > 0 and float(bid[1]) > 0
]
if 'asks' in sanitized:
sanitized['asks'] = [
ask for ask in sanitized['asks']
if isinstance(ask, (list, tuple)) and len(ask) >= 2 and float(ask[0]) > 0 and float(ask[1]) > 0
]
# Sort bids descending, asks ascending
if 'bids' in sanitized:
sanitized['bids'] = sorted(sanitized['bids'], key=lambda x: float(x[0]), reverse=True)
if 'asks' in sanitized:
sanitized['asks'] = sorted(sanitized['asks'], key=lambda x: float(x[0]))
except Exception as e:
logger.error(f"Error sanitizing orderbook: {e}")
return sanitized