From ba8813f04ffc1d994406075267e688f6e830bda5 Mon Sep 17 00:00:00 2001 From: Dobromir Popov Date: Mon, 20 Oct 2025 11:16:27 +0300 Subject: [PATCH] unified cache. LLM report --- .kiro/specs/unified-data-storage/tasks.md | 7 +- core/data_provider.py | 72 ++ core/report_data_crawler.py | 799 ++++++++++++++++++++++ core/unified_data_models.py | 432 ++++++++++++ core/unified_data_validator.py | 527 ++++++++++++++ examples/report_crawler_example.py | 107 +++ 6 files changed, 1943 insertions(+), 1 deletion(-) create mode 100644 core/report_data_crawler.py create mode 100644 core/unified_data_validator.py create mode 100644 examples/report_crawler_example.py diff --git a/.kiro/specs/unified-data-storage/tasks.md b/.kiro/specs/unified-data-storage/tasks.md index 9490dd2..8d27472 100644 --- a/.kiro/specs/unified-data-storage/tasks.md +++ b/.kiro/specs/unified-data-storage/tasks.md @@ -10,10 +10,15 @@ - Create all necessary indexes for query optimization + + + - _Requirements: 2.1, 2.2, 2.3, 2.4, 2.5, 2.6, 3.1, 3.2, 3.3, 3.4, 3.5, 4.1, 4.2, 4.3, 4.4, 4.5, 4.6_ - [ ] 2. Implement data models and validation - - [ ] 2.1 Create InferenceDataFrame and OrderBookDataFrame data classes + - [x] 2.1 Create InferenceDataFrame and OrderBookDataFrame data classes + + - Write dataclasses for standardized data structures - Include all required fields (OHLCV, order book, imbalances, indicators) - Add serialization/deserialization methods diff --git a/core/data_provider.py b/core/data_provider.py index e75907a..fac836c 100644 --- a/core/data_provider.py +++ b/core/data_provider.py @@ -55,6 +55,7 @@ from .williams_market_structure import WilliamsMarketStructure, PivotPoint, Tren from .enhanced_cob_websocket import EnhancedCOBWebSocket, get_enhanced_cob_websocket from .huobi_cob_websocket import get_huobi_cob_websocket from .cob_integration import COBIntegration +from .report_data_crawler import ReportDataCrawler, ReportData logger = logging.getLogger(__name__) @@ -169,6 +170,9 @@ class DataProvider: self.enhanced_cob_websocket: Optional[EnhancedCOBWebSocket] = None self.websocket_tasks = {} + # Report data crawler for comprehensive trading reports + self.report_crawler: Optional[ReportDataCrawler] = None + # COB collection state guard to prevent duplicate starts self._cob_started: bool = False @@ -3516,3 +3520,71 @@ class DataProvider: except Exception as e: logger.error(f"Error creating transformer sequences for inference: {e}") return [] + + # === REPORT DATA CRAWLER METHODS === + + def _get_report_crawler(self) -> ReportDataCrawler: + """Get or initialize the report data crawler""" + if self.report_crawler is None: + self.report_crawler = ReportDataCrawler(data_provider=self) + return self.report_crawler + + def crawl_comprehensive_report(self, symbol: str) -> Optional[ReportData]: + """Crawl comprehensive report data for a trading pair + + Args: + symbol: Trading pair symbol (e.g., 'BTC/USDT') + + Returns: + ReportData object with all required information, or None if failed + """ + try: + crawler = self._get_report_crawler() + return crawler.crawl_report_data(symbol) + except Exception as e: + logger.error(f"Error crawling comprehensive report for {symbol}: {e}") + return None + + def generate_trading_report(self, symbol: str) -> Optional[str]: + """Generate formatted trading report for a symbol + + Args: + symbol: Trading pair symbol (e.g., 'BTC/USDT') + + Returns: + Formatted report string, or None if failed + """ + try: + crawler = self._get_report_crawler() + return crawler.crawl_and_generate_report(symbol) + 
except Exception as e: + logger.error(f"Error generating trading report for {symbol}: {e}") + return None + + def get_report_data_for_multiple_pairs(self, symbols: List[str]) -> Dict[str, Optional[ReportData]]: + """Get report data for multiple trading pairs + + Args: + symbols: List of trading pair symbols + + Returns: + Dictionary mapping symbols to their ReportData objects + """ + try: + results = {} + crawler = self._get_report_crawler() + + for symbol in symbols: + try: + report_data = crawler.crawl_report_data(symbol) + results[symbol] = report_data + logger.info(f"Crawled report data for {symbol}: {'Success' if report_data else 'Failed'}") + except Exception as e: + logger.error(f"Error crawling report data for {symbol}: {e}") + results[symbol] = None + + return results + + except Exception as e: + logger.error(f"Error getting report data for multiple pairs: {e}") + return {} diff --git a/core/report_data_crawler.py b/core/report_data_crawler.py new file mode 100644 index 0000000..9bf67b5 --- /dev/null +++ b/core/report_data_crawler.py @@ -0,0 +1,799 @@ +""" +Report Data Crawler Sub-Module + +This module provides functionality to crawl and compile comprehensive trading reports +in the specified format for different trading pairs. It extends the DataProvider +with specialized methods for report generation. + +Enhanced with: +- Polymarket bets/odds data for sentiment analysis +- Multiple open interest data sources for improved reliability +- Real-time market sentiment from prediction markets + +CRITICAL POLICY: NO SYNTHETIC DATA ALLOWED +This module MUST ONLY use real market data from exchanges. +NEVER use np.random.*, mock/fake/synthetic data, or placeholder values. +If data is unavailable: return None/0/empty, log errors, raise exceptions. +""" + +import logging +import pandas as pd +import numpy as np +import requests +import time +import json +from datetime import datetime, timedelta +from typing import Dict, List, Optional, Tuple, Any +from dataclasses import dataclass +import ta +import warnings + +# Suppress ta library deprecation warnings +warnings.filterwarnings("ignore", category=FutureWarning, module="ta") + +logger = logging.getLogger(__name__) + +@dataclass +class PolymarketData: + """Polymarket prediction market data""" + market_id: str + question: str + outcome: str + probability: float + volume_24h: float + liquidity: float + last_trade_time: datetime + market_type: str # 'crypto', 'election', 'sports', etc. 
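+
+# Illustrative sketch (hypothetical values, no real market data): a market
+# asking "Will BTC trade above $70k?" with probability 0.65 maps to a bullish
+# reading of (0.65 - 0.5) * 2 = +0.30 in _calculate_market_sentiment_score()
+# below, before volume/liquidity weighting is applied.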
+ +@dataclass +class OpenInterestData: + """Open interest data from multiple sources""" + source: str + symbol: str + open_interest: float + timestamp: datetime + change_24h: float + change_percent: float + +@dataclass +class ReportData: + """Data structure for comprehensive trading report""" + symbol: str + timestamp: datetime + + # Current values + current_price: float + current_ema20: float + current_macd: float + current_rsi_7: float + + # Enhanced Open Interest and Funding Rate (multiple sources) + open_interest_data: List[OpenInterestData] + funding_rate: float + + # Polymarket sentiment data + polymarket_data: List[PolymarketData] + market_sentiment_score: float + + # Intraday series (3-minute intervals, oldest → newest) + mid_prices: List[float] + ema_20_series: List[float] + macd_series: List[float] + rsi_7_series: List[float] + rsi_14_series: List[float] + + # Longer-term context (4-hour timeframe) + ema_20_4h: float + ema_50_4h: float + atr_3_4h: float + atr_14_4h: float + current_volume: float + average_volume: float + macd_4h_series: List[float] + rsi_14_4h_series: List[float] + +class ReportDataCrawler: + """Specialized crawler for compiling comprehensive trading reports""" + + def __init__(self, data_provider=None): + """Initialize the report data crawler + + Args: + data_provider: Instance of DataProvider to use for data fetching + """ + self.data_provider = data_provider + self.session = requests.Session() + self.session.headers.update({ + 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36' + }) + + # Cache for API responses to avoid rate limiting + self._cache = {} + self._cache_timeout = 30 # 30 seconds cache + + # Polymarket API configuration + self.polymarket_base_url = "https://gamma-api.polymarket.com" + self.polymarket_graphql_url = f"{self.polymarket_base_url}/graphql" + + # Open interest data sources + self.open_interest_sources = { + 'binance': 'https://fapi.binance.com/fapi/v1/openInterest', + 'bybit': 'https://api.bybit.com/v5/market/open-interest', + 'okx': 'https://www.okx.com/api/v5/public/open-interest', + 'coinglass': 'https://open-api.coinglass.com/public/v2/open_interest' + } + + def _get_cached_or_fetch(self, cache_key: str, fetch_func, *args, **kwargs): + """Get data from cache or fetch if expired""" + now = time.time() + + if cache_key in self._cache: + data, timestamp = self._cache[cache_key] + if now - timestamp < self._cache_timeout: + return data + + # Fetch new data + data = fetch_func(*args, **kwargs) + if data is not None: + self._cache[cache_key] = (data, now) + + return data + + def _fetch_polymarket_data(self, symbol: str) -> List[PolymarketData]: + """Fetch Polymarket prediction market data for cryptocurrency-related markets + + Args: + symbol: Trading pair symbol (e.g., 'BTC/USDT') + + Returns: + List of PolymarketData objects + """ + try: + cache_key = f"polymarket_{symbol}" + cached_data = self._get_cached_or_fetch(cache_key, self._fetch_polymarket_raw, symbol) + + if not cached_data: + return [] + + polymarket_data = [] + base_symbol = symbol.split('/')[0].lower() # Extract BTC from BTC/USDT + + for market in cached_data: + try: + # Filter for crypto-related markets + if any(keyword in market.get('question', '').lower() for keyword in [base_symbol, 'bitcoin', 'ethereum', 'crypto', 'cryptocurrency']): + polymarket_data.append(PolymarketData( + market_id=market.get('id', ''), + question=market.get('question', ''), + outcome=market.get('outcome', ''), + probability=float(market.get('probability', 0)), + 
volume_24h=float(market.get('volume24h', 0)), + liquidity=float(market.get('liquidity', 0)), + last_trade_time=datetime.fromisoformat(market.get('lastTradeTime', datetime.now().isoformat())), + market_type='crypto' + )) + except Exception as e: + logger.warning(f"Error parsing Polymarket market data: {e}") + continue + + logger.info(f"Fetched {len(polymarket_data)} Polymarket markets for {symbol}") + return polymarket_data + + except Exception as e: + logger.error(f"Error fetching Polymarket data for {symbol}: {e}") + return [] + + def _fetch_polymarket_raw(self, symbol: str) -> List[Dict]: + """Fetch raw Polymarket data using GraphQL API""" + try: + # GraphQL query for active markets + query = """ + query GetMarkets($limit: Int!, $offset: Int!) { + markets( + limit: $limit + offset: $offset + orderBy: "volume" + orderDirection: "desc" + where: { + active: true + closed: false + } + ) { + id + question + outcome + probability + volume24h + liquidity + lastTradeTime + endDate + } + } + """ + + variables = { + "limit": 50, + "offset": 0 + } + + payload = { + "query": query, + "variables": variables + } + + response = self.session.post( + self.polymarket_graphql_url, + json=payload, + timeout=10, + headers={'Content-Type': 'application/json'} + ) + + if response.status_code == 200: + data = response.json() + if 'data' in data and 'markets' in data['data']: + return data['data']['markets'] + + logger.warning(f"Polymarket API returned status {response.status_code}") + return [] + + except Exception as e: + logger.error(f"Error fetching raw Polymarket data: {e}") + return [] + + def _calculate_market_sentiment_score(self, polymarket_data: List[PolymarketData], symbol: str) -> float: + """Calculate market sentiment score from Polymarket data + + Args: + polymarket_data: List of PolymarketData objects + symbol: Trading pair symbol + + Returns: + Sentiment score between -1 (bearish) and 1 (bullish) + """ + try: + if not polymarket_data: + return 0.0 + + base_symbol = symbol.split('/')[0].lower() + sentiment_scores = [] + + for data in polymarket_data: + # Weight by volume and liquidity + weight = (data.volume_24h + data.liquidity) / 1000000 # Normalize + + # Extract sentiment from probability + if 'above' in data.question.lower() or 'higher' in data.question.lower(): + # Bullish sentiment + sentiment = (data.probability - 0.5) * 2 # Convert 0-1 to -1 to 1 + elif 'below' in data.question.lower() or 'lower' in data.question.lower(): + # Bearish sentiment + sentiment = (0.5 - data.probability) * 2 # Convert 0-1 to -1 to 1 + else: + # Neutral or unclear sentiment + sentiment = 0.0 + + sentiment_scores.append(sentiment * weight) + + if sentiment_scores: + # Weighted average sentiment + total_weight = sum((data.volume_24h + data.liquidity) / 1000000 for data in polymarket_data) + if total_weight > 0: + return sum(sentiment_scores) / total_weight + + return 0.0 + + except Exception as e: + logger.error(f"Error calculating market sentiment score: {e}") + return 0.0 + + def _fetch_open_interest_from_multiple_sources(self, symbol: str) -> List[OpenInterestData]: + """Fetch open interest data from multiple sources for improved reliability + + Args: + symbol: Trading pair symbol (e.g., 'BTC/USDT') + + Returns: + List of OpenInterestData objects from different sources + """ + try: + open_interest_data = [] + binance_symbol = symbol.replace('/', '') + + # Source 1: Binance + try: + cache_key = f"binance_oi_{symbol}" + binance_data = self._get_cached_or_fetch(cache_key, self._fetch_binance_open_interest, 
binance_symbol)
+                if binance_data:
+                    open_interest_data.append(OpenInterestData(
+                        source='binance',
+                        symbol=symbol,
+                        # Coerce to float: Binance serializes numeric fields as strings
+                        open_interest=float(binance_data['openInterest']),
+                        timestamp=datetime.now(),
+                        change_24h=binance_data.get('change24h', 0),
+                        change_percent=binance_data.get('changePercent', 0)
+                    ))
+            except Exception as e:
+                logger.warning(f"Failed to fetch Binance open interest: {e}")
+
+            # Source 2: Bybit
+            try:
+                cache_key = f"bybit_oi_{symbol}"
+                bybit_data = self._get_cached_or_fetch(cache_key, self._fetch_bybit_open_interest, symbol)
+                if bybit_data:
+                    open_interest_data.append(OpenInterestData(
+                        source='bybit',
+                        symbol=symbol,
+                        open_interest=bybit_data['openInterest'],
+                        timestamp=datetime.now(),
+                        change_24h=bybit_data.get('change24h', 0),
+                        change_percent=bybit_data.get('changePercent', 0)
+                    ))
+            except Exception as e:
+                logger.warning(f"Failed to fetch Bybit open interest: {e}")
+
+            # Source 3: OKX
+            try:
+                cache_key = f"okx_oi_{symbol}"
+                okx_data = self._get_cached_or_fetch(cache_key, self._fetch_okx_open_interest, symbol)
+                if okx_data:
+                    open_interest_data.append(OpenInterestData(
+                        source='okx',
+                        symbol=symbol,
+                        open_interest=okx_data['openInterest'],
+                        timestamp=datetime.now(),
+                        change_24h=okx_data.get('change24h', 0),
+                        change_percent=okx_data.get('changePercent', 0)
+                    ))
+            except Exception as e:
+                logger.warning(f"Failed to fetch OKX open interest: {e}")
+
+            logger.info(f"Fetched open interest data from {len(open_interest_data)} sources for {symbol}")
+            return open_interest_data
+
+        except Exception as e:
+            logger.error(f"Error fetching open interest from multiple sources for {symbol}: {e}")
+            return []
+
+    def _fetch_binance_open_interest(self, symbol: str) -> Dict:
+        """Fetch open interest data from Binance"""
+        try:
+            url = "https://fapi.binance.com/fapi/v1/openInterest"
+            params = {'symbol': symbol}
+
+            response = self.session.get(url, params=params, timeout=10)
+            if response.status_code == 200:
+                return response.json()
+            return {}
+        except Exception as e:
+            logger.error(f"Error fetching Binance open interest: {e}")
+            return {}
+
+    def _fetch_bybit_open_interest(self, symbol: str) -> Dict:
+        """Fetch open interest data from Bybit"""
+        try:
+            # Convert BTC/USDT to BTCUSDT for Bybit
+            bybit_symbol = symbol.replace('/', '')
+            url = "https://api.bybit.com/v5/market/open-interest"
+            params = {
+                'category': 'linear',
+                'symbol': bybit_symbol,
+                'intervalTime': '5min'
+            }
+
+            response = self.session.get(url, params=params, timeout=10)
+            if response.status_code == 200:
+                data = response.json()
+                if 'result' in data and 'list' in data['result'] and data['result']['list']:
+                    latest = data['result']['list'][0]
+                    return {
+                        'openInterest': float(latest['openInterest']),
+                        'change24h': float(latest.get('change24h', 0)),
+                        'changePercent': float(latest.get('changePercent', 0))
+                    }
+            return {}
+        except Exception as e:
+            logger.error(f"Error fetching Bybit open interest: {e}")
+            return {}
+
+    def _fetch_okx_open_interest(self, symbol: str) -> Dict:
+        """Fetch open interest data from OKX"""
+        try:
+            # Convert BTC/USDT to BTC-USDT-SWAP for OKX
+            okx_symbol = symbol.replace('/', '-') + '-SWAP'
+            url = "https://www.okx.com/api/v5/public/open-interest"
+            params = {'instId': okx_symbol}
+
+            response = self.session.get(url, params=params, timeout=10)
+            if response.status_code == 200:
+                data = response.json()
+                if 'data' in data and data['data']:
+                    latest = data['data'][0]
+                    return {
+                        'openInterest': float(latest['oi']),
+                        'change24h': 0,  # 'oiCcy' is open interest in coin terms, not a 24h change
+                        'changePercent': 
0 # OKX doesn't provide percentage change + } + return {} + except Exception as e: + logger.error(f"Error fetching OKX open interest: {e}") + return {} + + def _fetch_funding_rate(self, symbol: str) -> float: + """Fetch funding rate from Binance""" + try: + binance_symbol = symbol.replace('/', '') + url = "https://fapi.binance.com/fapi/v1/premiumIndex" + params = {'symbol': binance_symbol} + + response = self.session.get(url, params=params, timeout=10) + if response.status_code == 200: + data = response.json() + return float(data.get('lastFundingRate', 0)) + return 0.0 + except Exception as e: + logger.error(f"Error fetching funding rate: {e}") + return 0.0 + + def _calculate_technical_indicators(self, df: pd.DataFrame) -> Dict[str, Any]: + """Calculate technical indicators for the given dataframe""" + try: + if df.empty or len(df) < 5: + logger.warning("Insufficient data for technical indicators") + return {} + + indicators = {} + + # EMA 20 (need at least 20 periods) + if len(df) >= 20: + indicators['ema_20'] = ta.trend.ema_indicator(df['close'], window=20) + else: + indicators['ema_20'] = pd.Series(index=df.index, dtype=float) + + # MACD (need at least 26 periods) + if len(df) >= 26: + macd = ta.trend.MACD(df['close']) + indicators['macd'] = macd.macd() + indicators['macd_signal'] = macd.macd_signal() + indicators['macd_histogram'] = macd.macd_diff() + else: + indicators['macd'] = pd.Series(index=df.index, dtype=float) + indicators['macd_signal'] = pd.Series(index=df.index, dtype=float) + indicators['macd_histogram'] = pd.Series(index=df.index, dtype=float) + + # RSI (7 and 14 period) - need at least 14 periods + if len(df) >= 14: + indicators['rsi_7'] = self._calculate_rsi(df['close'], period=7) + indicators['rsi_14'] = self._calculate_rsi(df['close'], period=14) + else: + indicators['rsi_7'] = pd.Series(index=df.index, dtype=float) + indicators['rsi_14'] = pd.Series(index=df.index, dtype=float) + + # ATR (need at least 14 periods) + if len(df) >= 14: + indicators['atr_3'] = ta.volatility.average_true_range(df['high'], df['low'], df['close'], window=3) + indicators['atr_14'] = ta.volatility.average_true_range(df['high'], df['low'], df['close'], window=14) + else: + indicators['atr_3'] = pd.Series(index=df.index, dtype=float) + indicators['atr_14'] = pd.Series(index=df.index, dtype=float) + + # EMA 50 (need at least 50 periods) + if len(df) >= 50: + indicators['ema_50'] = ta.trend.ema_indicator(df['close'], window=50) + else: + indicators['ema_50'] = pd.Series(index=df.index, dtype=float) + + return indicators + + except Exception as e: + logger.error(f"Error calculating technical indicators: {e}") + return {} + + def _calculate_rsi(self, prices: pd.Series, period: int = 14) -> pd.Series: + """Calculate RSI using our own implementation""" + try: + delta = prices.diff() + gain = (delta.where(delta > 0, 0)).rolling(window=period).mean() + loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean() + rs = gain / loss + rsi = 100 - (100 / (1 + rs)) + return rsi + except Exception as e: + logger.error(f"Error calculating RSI: {e}") + return pd.Series(index=prices.index, dtype=float) + + def _get_intraday_series(self, symbol: str, limit: int = 10) -> Dict[str, List[float]]: + """Get intraday series data (3-minute intervals)""" + try: + if not self.data_provider: + logger.error("No data provider available") + return {} + + # Get 1-minute data and resample to 3-minute intervals + df_1m = self.data_provider.get_historical_data(symbol, '1m', limit=limit*3, refresh=True) + if df_1m is None 
or df_1m.empty: + logger.warning(f"No 1-minute data available for {symbol}") + return {} + + # Resample to 3-minute intervals + df_3m = df_1m.resample('3min').agg({ + 'open': 'first', + 'high': 'max', + 'low': 'min', + 'close': 'last', + 'volume': 'sum' + }).dropna() + + if len(df_3m) < limit: + logger.warning(f"Insufficient 3-minute data: {len(df_3m)} < {limit}") + return {} + + # Take the last 'limit' periods + df_3m = df_3m.tail(limit) + + # Calculate technical indicators + indicators = self._calculate_technical_indicators(df_3m) + + # Extract series data + series_data = { + 'mid_prices': df_3m['close'].tolist(), + 'ema_20_series': indicators.get('ema_20', pd.Series()).tolist(), + 'macd_series': indicators.get('macd', pd.Series()).tolist(), + 'rsi_7_series': indicators.get('rsi_7', pd.Series()).tolist(), + 'rsi_14_series': indicators.get('rsi_14', pd.Series()).tolist() + } + + return series_data + + except Exception as e: + logger.error(f"Error getting intraday series for {symbol}: {e}") + return {} + + def _get_longer_term_context(self, symbol: str, limit: int = 10) -> Dict[str, Any]: + """Get longer-term context data (4-hour timeframe)""" + try: + if not self.data_provider: + logger.error("No data provider available") + return {} + + # Get 4-hour data + df_4h = self.data_provider.get_historical_data(symbol, '4h', limit=limit, refresh=True) + if df_4h is None or df_4h.empty: + logger.warning(f"No 4-hour data available for {symbol}") + return {} + + # Calculate technical indicators + indicators = self._calculate_technical_indicators(df_4h) + + # Calculate volume metrics + current_volume = df_4h['volume'].iloc[-1] if not df_4h.empty else 0 + average_volume = df_4h['volume'].mean() if not df_4h.empty else 0 + + context_data = { + 'ema_20_4h': indicators.get('ema_20', pd.Series()).iloc[-1] if not indicators.get('ema_20', pd.Series()).empty else 0, + 'ema_50_4h': indicators.get('ema_50', pd.Series()).iloc[-1] if not indicators.get('ema_50', pd.Series()).empty else 0, + 'atr_3_4h': indicators.get('atr_3', pd.Series()).iloc[-1] if not indicators.get('atr_3', pd.Series()).empty else 0, + 'atr_14_4h': indicators.get('atr_14', pd.Series()).iloc[-1] if not indicators.get('atr_14', pd.Series()).empty else 0, + 'current_volume': current_volume, + 'average_volume': average_volume, + 'macd_4h_series': indicators.get('macd', pd.Series()).tolist(), + 'rsi_14_4h_series': indicators.get('rsi_14', pd.Series()).tolist() + } + + return context_data + + except Exception as e: + logger.error(f"Error getting longer-term context for {symbol}: {e}") + return {} + + def crawl_report_data(self, symbol: str) -> Optional[ReportData]: + """Crawl comprehensive report data for a trading pair + + Args: + symbol: Trading pair symbol (e.g., 'BTC/USDT') + + Returns: + ReportData object with all required information, or None if failed + """ + try: + logger.info(f"Crawling report data for {symbol}") + + if not self.data_provider: + logger.error("No data provider available for crawling") + return None + + # Force refresh data to ensure we have current information + logger.info(f"Refreshing data for {symbol}...") + for timeframe in ['1m', '1h', '4h']: + try: + self.data_provider.get_historical_data(symbol, timeframe, limit=100, refresh=True) + except Exception as e: + logger.warning(f"Could not refresh {timeframe} data for {symbol}: {e}") + + # Get current price and basic data + current_price = self.data_provider.get_current_price(symbol) + if current_price is None: + logger.error(f"Could not get current price for {symbol}") + 
return None + + # Get 1-minute data for current indicators + df_1m = self.data_provider.get_historical_data(symbol, '1m', limit=50, refresh=True) + if df_1m is None or df_1m.empty: + logger.error(f"Could not get 1-minute data for {symbol}") + return None + + # Calculate current technical indicators + indicators = self._calculate_technical_indicators(df_1m) + + # Get current indicator values + current_ema20 = indicators.get('ema_20', pd.Series()).iloc[-1] if not indicators.get('ema_20', pd.Series()).empty else 0 + current_macd = indicators.get('macd', pd.Series()).iloc[-1] if not indicators.get('macd', pd.Series()).empty else 0 + current_rsi_7 = indicators.get('rsi_7', pd.Series()).iloc[-1] if not indicators.get('rsi_7', pd.Series()).empty else 0 + + # Fetch enhanced open interest data from multiple sources + open_interest_data = self._fetch_open_interest_from_multiple_sources(symbol) + + # Fetch funding rate (still using Binance as primary source) + funding_rate = self._fetch_funding_rate(symbol) + + # Fetch Polymarket sentiment data + polymarket_data = self._fetch_polymarket_data(symbol) + market_sentiment_score = self._calculate_market_sentiment_score(polymarket_data, symbol) + + # Get intraday series data + intraday_data = self._get_intraday_series(symbol, limit=10) + + # Get longer-term context + longer_term_data = self._get_longer_term_context(symbol, limit=10) + + # Create ReportData object + report_data = ReportData( + symbol=symbol, + timestamp=datetime.now(), + current_price=current_price, + current_ema20=current_ema20, + current_macd=current_macd, + current_rsi_7=current_rsi_7, + open_interest_data=open_interest_data, + funding_rate=funding_rate, + polymarket_data=polymarket_data, + market_sentiment_score=market_sentiment_score, + mid_prices=intraday_data.get('mid_prices', []), + ema_20_series=intraday_data.get('ema_20_series', []), + macd_series=intraday_data.get('macd_series', []), + rsi_7_series=intraday_data.get('rsi_7_series', []), + rsi_14_series=intraday_data.get('rsi_14_series', []), + ema_20_4h=longer_term_data.get('ema_20_4h', 0), + ema_50_4h=longer_term_data.get('ema_50_4h', 0), + atr_3_4h=longer_term_data.get('atr_3_4h', 0), + atr_14_4h=longer_term_data.get('atr_14_4h', 0), + current_volume=longer_term_data.get('current_volume', 0), + average_volume=longer_term_data.get('average_volume', 0), + macd_4h_series=longer_term_data.get('macd_4h_series', []), + rsi_14_4h_series=longer_term_data.get('rsi_14_4h_series', []) + ) + + logger.info(f"Successfully crawled report data for {symbol}") + return report_data + + except Exception as e: + logger.error(f"Error crawling report data for {symbol}: {e}") + return None + + def generate_report_text(self, report_data: ReportData) -> str: + """Generate formatted report text in the specified format + + Args: + report_data: ReportData object with all required information + + Returns: + Formatted report string + """ + try: + if not report_data: + return "Error: No report data available" + + # Format current values + current_values = ( + f"current_price = {report_data.current_price:.1f}, " + f"current_ema20 = {report_data.current_ema20:.3f}, " + f"current_macd = {report_data.current_macd:.3f}, " + f"current_rsi (7 period) = {report_data.current_rsi_7:.3f}" + ) + + # Format enhanced open interest data from multiple sources + oi_section = "Open Interest (Multiple Sources):\n" + if report_data.open_interest_data: + for oi_data in report_data.open_interest_data: + try: + open_interest_val = float(oi_data.open_interest) if 
oi_data.open_interest else 0
+                        change_percent_val = float(oi_data.change_percent) if oi_data.change_percent else 0
+                        oi_section += f"  {oi_data.source.upper()}: {open_interest_val:,.2f} "
+                        if change_percent_val != 0:
+                            oi_section += f"(24h: {change_percent_val:+.2f}%)\n"
+                        else:
+                            oi_section += "\n"
+                    except (ValueError, TypeError) as e:
+                        logger.warning(f"Error formatting open interest data: {e}")
+                        oi_section += f"  {oi_data.source.upper()}: Error formatting data\n"
+            else:
+                oi_section += "  No open interest data available\n"
+
+            # Format funding rate
+            funding_section = f"Funding Rate: {report_data.funding_rate:.2e}\n"
+
+            # Format Polymarket sentiment data
+            sentiment_section = ""
+            if report_data.polymarket_data:
+                sentiment_section = "\nPolymarket Sentiment Analysis:\n"
+                sentiment_section += f"Market Sentiment Score: {report_data.market_sentiment_score:.3f} "
+                if report_data.market_sentiment_score > 0.1:
+                    sentiment_section += "(Bullish)\n"
+                elif report_data.market_sentiment_score < -0.1:
+                    sentiment_section += "(Bearish)\n"
+                else:
+                    sentiment_section += "(Neutral)\n"
+
+                sentiment_section += f"Active Markets: {len(report_data.polymarket_data)}\n"
+                for market in report_data.polymarket_data[:3]:  # Show top 3 markets
+                    sentiment_section += f"  - {market.question[:50]}... (Prob: {market.probability:.2f})\n"
+            else:
+                sentiment_section = "\nPolymarket Sentiment: No data available\n"
+
+            # Format intraday series (3-minute intervals, matching the header note below)
+            intraday_section = f"""Intraday series (3‑minute intervals, oldest → latest):
+
+Mid prices: {report_data.mid_prices}
+EMA indicators (20‑period): {report_data.ema_20_series}
+MACD indicators: {report_data.macd_series}
+RSI indicators (7‑Period): {report_data.rsi_7_series}
+RSI indicators (14‑Period): {report_data.rsi_14_series}"""
+
+            # Format longer-term context
+            longer_term_section = f"""Longer‑term context (4‑hour timeframe):
+
+20‑Period EMA: {report_data.ema_20_4h:.3f} vs. 50‑Period EMA: {report_data.ema_50_4h:.3f}
+3‑Period ATR: {report_data.atr_3_4h:.3f} vs. 14‑Period ATR: {report_data.atr_14_4h:.3f}
+Current Volume: {report_data.current_volume:.3f} vs. Average Volume: {report_data.average_volume:.3f}
+MACD indicators: {report_data.macd_4h_series}
+RSI indicators (14‑Period): {report_data.rsi_14_4h_series}"""
+
+            # Combine all sections
+            report_text = f"""ALL OF THE PRICE OR SIGNAL DATA BELOW IS ORDERED: OLDEST → NEWEST
+
+Timeframes note: Unless stated otherwise in a section title, intraday series are provided at 3‑minute intervals. If a coin uses a different interval, it is explicitly stated in that coin's section.
+ +{current_values} + +In addition, here is the latest {report_data.symbol} open interest and funding rate for perps (the instrument you are trading): + +{oi_section} +{funding_section}{sentiment_section} + +{intraday_section} + +{longer_term_section}""" + + return report_text + + except Exception as e: + logger.error(f"Error generating report text: {e}") + return f"Error generating report: {e}" + + def crawl_and_generate_report(self, symbol: str) -> Optional[str]: + """Crawl data and generate formatted report in one call + + Args: + symbol: Trading pair symbol (e.g., 'BTC/USDT') + + Returns: + Formatted report string, or None if failed + """ + try: + report_data = self.crawl_report_data(symbol) + if report_data: + return self.generate_report_text(report_data) + else: + return None + except Exception as e: + logger.error(f"Error in crawl_and_generate_report for {symbol}: {e}") + return None diff --git a/core/unified_data_models.py b/core/unified_data_models.py index e69de29..ae70dd6 100644 --- a/core/unified_data_models.py +++ b/core/unified_data_models.py @@ -0,0 +1,432 @@ +""" +Unified Data Models for the storage system. +Standardized data structures for all components. +""" + +import pandas as pd +import numpy as np +from dataclasses import dataclass, field, asdict +from datetime import datetime +from typing import Dict, List, Optional, Tuple, Any +import json +import logging + +logger = logging.getLogger(__name__) + + +@dataclass +class InferenceDataFrame: + """ + Complete inference data for a single timestamp. + Contains all data needed for model inference including multi-timeframe OHLCV, + order book data, imbalances, and technical indicators. + """ + symbol: str + timestamp: datetime + + # Multi-timeframe OHLCV data + ohlcv_1s: pd.DataFrame = field(default_factory=pd.DataFrame) + ohlcv_1m: pd.DataFrame = field(default_factory=pd.DataFrame) + ohlcv_5m: pd.DataFrame = field(default_factory=pd.DataFrame) + ohlcv_15m: pd.DataFrame = field(default_factory=pd.DataFrame) + ohlcv_1h: pd.DataFrame = field(default_factory=pd.DataFrame) + ohlcv_1d: pd.DataFrame = field(default_factory=pd.DataFrame) + + # Order book data + orderbook_snapshot: Optional[Dict] = None + orderbook_1s_agg: pd.DataFrame = field(default_factory=pd.DataFrame) + + # Imbalance metrics (multi-timeframe) + imbalances: pd.DataFrame = field(default_factory=pd.DataFrame) + + # Technical indicators (pre-calculated from latest candle) + indicators: Dict[str, float] = field(default_factory=dict) + + # Context window data (±N minutes around timestamp) + context_data: Optional[pd.DataFrame] = None + + # Metadata + data_source: str = 'unknown' # 'cache' or 'database' + query_latency_ms: float = 0.0 + + def to_dict(self) -> Dict[str, Any]: + """Convert to dictionary for serialization.""" + return { + 'symbol': self.symbol, + 'timestamp': self.timestamp.isoformat() if self.timestamp else None, + 'ohlcv_1s': self.ohlcv_1s.to_dict('records') if not self.ohlcv_1s.empty else [], + 'ohlcv_1m': self.ohlcv_1m.to_dict('records') if not self.ohlcv_1m.empty else [], + 'ohlcv_5m': self.ohlcv_5m.to_dict('records') if not self.ohlcv_5m.empty else [], + 'ohlcv_15m': self.ohlcv_15m.to_dict('records') if not self.ohlcv_15m.empty else [], + 'ohlcv_1h': self.ohlcv_1h.to_dict('records') if not self.ohlcv_1h.empty else [], + 'ohlcv_1d': self.ohlcv_1d.to_dict('records') if not self.ohlcv_1d.empty else [], + 'orderbook_snapshot': self.orderbook_snapshot, + 'orderbook_1s_agg': self.orderbook_1s_agg.to_dict('records') if not self.orderbook_1s_agg.empty else 
[], + 'imbalances': self.imbalances.to_dict('records') if not self.imbalances.empty else [], + 'indicators': self.indicators, + 'context_data': self.context_data.to_dict('records') if self.context_data is not None and not self.context_data.empty else None, + 'data_source': self.data_source, + 'query_latency_ms': self.query_latency_ms + } + + def to_json(self) -> str: + """Convert to JSON string.""" + return json.dumps(self.to_dict(), default=str) + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> 'InferenceDataFrame': + """Create from dictionary.""" + return cls( + symbol=data['symbol'], + timestamp=datetime.fromisoformat(data['timestamp']) if data.get('timestamp') else datetime.now(), + ohlcv_1s=pd.DataFrame(data.get('ohlcv_1s', [])), + ohlcv_1m=pd.DataFrame(data.get('ohlcv_1m', [])), + ohlcv_5m=pd.DataFrame(data.get('ohlcv_5m', [])), + ohlcv_15m=pd.DataFrame(data.get('ohlcv_15m', [])), + ohlcv_1h=pd.DataFrame(data.get('ohlcv_1h', [])), + ohlcv_1d=pd.DataFrame(data.get('ohlcv_1d', [])), + orderbook_snapshot=data.get('orderbook_snapshot'), + orderbook_1s_agg=pd.DataFrame(data.get('orderbook_1s_agg', [])), + imbalances=pd.DataFrame(data.get('imbalances', [])), + indicators=data.get('indicators', {}), + context_data=pd.DataFrame(data['context_data']) if data.get('context_data') else None, + data_source=data.get('data_source', 'unknown'), + query_latency_ms=data.get('query_latency_ms', 0.0) + ) + + def get_latest_price(self) -> Optional[float]: + """Get the latest close price from 1s data.""" + if not self.ohlcv_1s.empty: + return float(self.ohlcv_1s.iloc[-1]['close_price']) + return None + + def get_timeframe_data(self, timeframe: str) -> pd.DataFrame: + """Get OHLCV data for a specific timeframe.""" + timeframe_map = { + '1s': self.ohlcv_1s, + '1m': self.ohlcv_1m, + '5m': self.ohlcv_5m, + '15m': self.ohlcv_15m, + '1h': self.ohlcv_1h, + '1d': self.ohlcv_1d + } + return timeframe_map.get(timeframe, pd.DataFrame()) + + def has_complete_data(self) -> bool: + """Check if all required data is present.""" + return ( + not self.ohlcv_1s.empty and + not self.ohlcv_1m.empty and + not self.imbalances.empty and + self.orderbook_snapshot is not None + ) + + def get_data_summary(self) -> Dict[str, Any]: + """Get summary of available data.""" + return { + 'symbol': self.symbol, + 'timestamp': self.timestamp.isoformat() if self.timestamp else None, + 'ohlcv_1s_rows': len(self.ohlcv_1s), + 'ohlcv_1m_rows': len(self.ohlcv_1m), + 'ohlcv_5m_rows': len(self.ohlcv_5m), + 'ohlcv_15m_rows': len(self.ohlcv_15m), + 'ohlcv_1h_rows': len(self.ohlcv_1h), + 'ohlcv_1d_rows': len(self.ohlcv_1d), + 'has_orderbook': self.orderbook_snapshot is not None, + 'orderbook_1s_agg_rows': len(self.orderbook_1s_agg), + 'imbalances_rows': len(self.imbalances), + 'indicators_count': len(self.indicators), + 'has_context_data': self.context_data is not None, + 'data_source': self.data_source, + 'query_latency_ms': self.query_latency_ms, + 'is_complete': self.has_complete_data() + } + + +@dataclass +class OrderBookDataFrame: + """ + Order book data with imbalances and aggregations. + Contains raw order book, price buckets, and multi-timeframe imbalance metrics. 
+ """ + symbol: str + timestamp: datetime + + # Raw order book (top levels) + bids: List[Tuple[float, float]] = field(default_factory=list) # (price, size) + asks: List[Tuple[float, float]] = field(default_factory=list) # (price, size) + + # Aggregated data (price buckets) + price_buckets: pd.DataFrame = field(default_factory=pd.DataFrame) + + # Multi-timeframe imbalance metrics + imbalance_1s: float = 0.0 + imbalance_5s: float = 0.0 + imbalance_15s: float = 0.0 + imbalance_60s: float = 0.0 + + # Volume-weighted imbalances + volume_imbalance_1s: float = 0.0 + volume_imbalance_5s: float = 0.0 + volume_imbalance_15s: float = 0.0 + volume_imbalance_60s: float = 0.0 + + # Order book statistics + mid_price: float = 0.0 + spread: float = 0.0 + bid_volume: float = 0.0 + ask_volume: float = 0.0 + + # Metadata + exchange: str = 'binance' + sequence_id: Optional[int] = None + + def to_dict(self) -> Dict[str, Any]: + """Convert to dictionary for serialization.""" + return { + 'symbol': self.symbol, + 'timestamp': self.timestamp.isoformat() if self.timestamp else None, + 'bids': self.bids, + 'asks': self.asks, + 'price_buckets': self.price_buckets.to_dict('records') if not self.price_buckets.empty else [], + 'imbalance_1s': self.imbalance_1s, + 'imbalance_5s': self.imbalance_5s, + 'imbalance_15s': self.imbalance_15s, + 'imbalance_60s': self.imbalance_60s, + 'volume_imbalance_1s': self.volume_imbalance_1s, + 'volume_imbalance_5s': self.volume_imbalance_5s, + 'volume_imbalance_15s': self.volume_imbalance_15s, + 'volume_imbalance_60s': self.volume_imbalance_60s, + 'mid_price': self.mid_price, + 'spread': self.spread, + 'bid_volume': self.bid_volume, + 'ask_volume': self.ask_volume, + 'exchange': self.exchange, + 'sequence_id': self.sequence_id + } + + def to_json(self) -> str: + """Convert to JSON string.""" + return json.dumps(self.to_dict(), default=str) + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> 'OrderBookDataFrame': + """Create from dictionary.""" + return cls( + symbol=data['symbol'], + timestamp=datetime.fromisoformat(data['timestamp']) if data.get('timestamp') else datetime.now(), + bids=data.get('bids', []), + asks=data.get('asks', []), + price_buckets=pd.DataFrame(data.get('price_buckets', [])), + imbalance_1s=data.get('imbalance_1s', 0.0), + imbalance_5s=data.get('imbalance_5s', 0.0), + imbalance_15s=data.get('imbalance_15s', 0.0), + imbalance_60s=data.get('imbalance_60s', 0.0), + volume_imbalance_1s=data.get('volume_imbalance_1s', 0.0), + volume_imbalance_5s=data.get('volume_imbalance_5s', 0.0), + volume_imbalance_15s=data.get('volume_imbalance_15s', 0.0), + volume_imbalance_60s=data.get('volume_imbalance_60s', 0.0), + mid_price=data.get('mid_price', 0.0), + spread=data.get('spread', 0.0), + bid_volume=data.get('bid_volume', 0.0), + ask_volume=data.get('ask_volume', 0.0), + exchange=data.get('exchange', 'binance'), + sequence_id=data.get('sequence_id') + ) + + def calculate_statistics(self): + """Calculate order book statistics from bids and asks.""" + if self.bids and self.asks: + # Best bid and ask + best_bid = max(self.bids, key=lambda x: x[0])[0] if self.bids else 0 + best_ask = min(self.asks, key=lambda x: x[0])[0] if self.asks else 0 + + # Mid price and spread + if best_bid > 0 and best_ask > 0: + self.mid_price = (best_bid + best_ask) / 2 + self.spread = best_ask - best_bid + + # Total volumes + self.bid_volume = sum(size for _, size in self.bids) + self.ask_volume = sum(size for _, size in self.asks) + + def get_best_bid(self) -> Optional[Tuple[float, float]]: + 
"""Get best bid (highest price).""" + if self.bids: + return max(self.bids, key=lambda x: x[0]) + return None + + def get_best_ask(self) -> Optional[Tuple[float, float]]: + """Get best ask (lowest price).""" + if self.asks: + return min(self.asks, key=lambda x: x[0]) + return None + + def get_spread_bps(self) -> float: + """Get spread in basis points.""" + if self.mid_price > 0 and self.spread > 0: + return (self.spread / self.mid_price) * 10000 + return 0.0 + + def get_imbalance_summary(self) -> Dict[str, float]: + """Get summary of all imbalance metrics.""" + return { + 'imbalance_1s': self.imbalance_1s, + 'imbalance_5s': self.imbalance_5s, + 'imbalance_15s': self.imbalance_15s, + 'imbalance_60s': self.imbalance_60s, + 'volume_imbalance_1s': self.volume_imbalance_1s, + 'volume_imbalance_5s': self.volume_imbalance_5s, + 'volume_imbalance_15s': self.volume_imbalance_15s, + 'volume_imbalance_60s': self.volume_imbalance_60s + } + + def is_valid(self) -> bool: + """Check if order book data is valid.""" + if not self.bids or not self.asks: + return False + + best_bid = self.get_best_bid() + best_ask = self.get_best_ask() + + if not best_bid or not best_ask: + return False + + # Bid must be less than ask + return best_bid[0] < best_ask[0] + + +@dataclass +class TradeEvent: + """Individual trade event.""" + symbol: str + timestamp: datetime + price: float + size: float + side: str # 'buy' or 'sell' + trade_id: str + exchange: str = 'binance' + is_buyer_maker: bool = False + + def to_dict(self) -> Dict[str, Any]: + """Convert to dictionary.""" + return asdict(self) + + def to_json(self) -> str: + """Convert to JSON string.""" + return json.dumps(self.to_dict(), default=str) + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> 'TradeEvent': + """Create from dictionary.""" + return cls( + symbol=data['symbol'], + timestamp=datetime.fromisoformat(data['timestamp']) if isinstance(data['timestamp'], str) else data['timestamp'], + price=float(data['price']), + size=float(data['size']), + side=data['side'], + trade_id=str(data['trade_id']), + exchange=data.get('exchange', 'binance'), + is_buyer_maker=data.get('is_buyer_maker', False) + ) + + +@dataclass +class OHLCVCandle: + """Single OHLCV candlestick.""" + symbol: str + timestamp: datetime + timeframe: str + open_price: float + high_price: float + low_price: float + close_price: float + volume: float + trade_count: int = 0 + + # Technical indicators (optional) + rsi_14: Optional[float] = None + macd: Optional[float] = None + macd_signal: Optional[float] = None + macd_histogram: Optional[float] = None + bb_upper: Optional[float] = None + bb_middle: Optional[float] = None + bb_lower: Optional[float] = None + ema_12: Optional[float] = None + ema_26: Optional[float] = None + sma_20: Optional[float] = None + + def to_dict(self) -> Dict[str, Any]: + """Convert to dictionary.""" + return asdict(self) + + def to_json(self) -> str: + """Convert to JSON string.""" + return json.dumps(self.to_dict(), default=str) + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> 'OHLCVCandle': + """Create from dictionary.""" + return cls( + symbol=data['symbol'], + timestamp=datetime.fromisoformat(data['timestamp']) if isinstance(data['timestamp'], str) else data['timestamp'], + timeframe=data['timeframe'], + open_price=float(data['open_price']), + high_price=float(data['high_price']), + low_price=float(data['low_price']), + close_price=float(data['close_price']), + volume=float(data['volume']), + trade_count=int(data.get('trade_count', 0)), + 
rsi_14=float(data['rsi_14']) if data.get('rsi_14') is not None else None, + macd=float(data['macd']) if data.get('macd') is not None else None, + macd_signal=float(data['macd_signal']) if data.get('macd_signal') is not None else None, + macd_histogram=float(data['macd_histogram']) if data.get('macd_histogram') is not None else None, + bb_upper=float(data['bb_upper']) if data.get('bb_upper') is not None else None, + bb_middle=float(data['bb_middle']) if data.get('bb_middle') is not None else None, + bb_lower=float(data['bb_lower']) if data.get('bb_lower') is not None else None, + ema_12=float(data['ema_12']) if data.get('ema_12') is not None else None, + ema_26=float(data['ema_26']) if data.get('ema_26') is not None else None, + sma_20=float(data['sma_20']) if data.get('sma_20') is not None else None + ) + + def is_valid(self) -> bool: + """Check if candle data is valid.""" + # High must be >= low + if self.high_price < self.low_price: + return False + + # High must be >= open and close + if self.high_price < self.open_price or self.high_price < self.close_price: + return False + + # Low must be <= open and close + if self.low_price > self.open_price or self.low_price > self.close_price: + return False + + # Volume must be non-negative + if self.volume < 0: + return False + + return True + + def get_price_change(self) -> float: + """Get price change (close - open).""" + return self.close_price - self.open_price + + def get_price_change_percent(self) -> float: + """Get price change percentage.""" + if self.open_price > 0: + return ((self.close_price - self.open_price) / self.open_price) * 100 + return 0.0 + + def get_range(self) -> float: + """Get price range (high - low).""" + return self.high_price - self.low_price + + def is_bullish(self) -> bool: + """Check if candle is bullish (close > open).""" + return self.close_price > self.open_price + + def is_bearish(self) -> bool: + """Check if candle is bearish (close < open).""" + return self.close_price < self.open_price diff --git a/core/unified_data_validator.py b/core/unified_data_validator.py new file mode 100644 index 0000000..f33eca3 --- /dev/null +++ b/core/unified_data_validator.py @@ -0,0 +1,527 @@ +""" +Data Validator for unified storage system. +Validates all incoming data before storage to ensure data integrity. +""" + +import logging +from datetime import datetime, timezone +from typing import Dict, List, Tuple, Optional, Any +import pandas as pd + +from .unified_data_models import OHLCVCandle, OrderBookDataFrame, TradeEvent + +logger = logging.getLogger(__name__) + + +class DataValidator: + """ + Validates all incoming data before storage. + Ensures data integrity and consistency across the system. + """ + + # Validation thresholds + MAX_PRICE_CHANGE_PERCENT = 50.0 # 50% max price change per candle + MIN_PRICE = 0.0001 # Minimum valid price + MAX_PRICE = 1000000.0 # Maximum valid price + MIN_VOLUME = 0.0 # Minimum valid volume + MAX_SPREAD_PERCENT = 10.0 # 10% max spread + + @staticmethod + def validate_ohlcv(candle: Dict[str, Any]) -> Tuple[bool, Optional[str]]: + """ + Validate OHLCV candle data. 
+ + Args: + candle: Dictionary with OHLCV data + + Returns: + Tuple of (is_valid, error_message) + """ + try: + # Check required fields + required_fields = ['timestamp', 'symbol', 'timeframe', 'open_price', + 'high_price', 'low_price', 'close_price', 'volume'] + + for field in required_fields: + if field not in candle: + return False, f"Missing required field: {field}" + + # Extract values + open_price = float(candle['open_price']) + high_price = float(candle['high_price']) + low_price = float(candle['low_price']) + close_price = float(candle['close_price']) + volume = float(candle['volume']) + + # Validate price ranges + prices = [open_price, high_price, low_price, close_price] + for price in prices: + if price < DataValidator.MIN_PRICE: + return False, f"Price below minimum: {price}" + if price > DataValidator.MAX_PRICE: + return False, f"Price above maximum: {price}" + + # Validate OHLC relationships + if high_price < low_price: + return False, f"High ({high_price}) < Low ({low_price})" + + if high_price < open_price: + return False, f"High ({high_price}) < Open ({open_price})" + + if high_price < close_price: + return False, f"High ({high_price}) < Close ({close_price})" + + if low_price > open_price: + return False, f"Low ({low_price}) > Open ({open_price})" + + if low_price > close_price: + return False, f"Low ({low_price}) > Close ({close_price})" + + # Validate volume + if volume < DataValidator.MIN_VOLUME: + return False, f"Volume below minimum: {volume}" + + # Validate price change (prevent extreme outliers) + if open_price > 0: + price_change_percent = abs((close_price - open_price) / open_price) * 100 + if price_change_percent > DataValidator.MAX_PRICE_CHANGE_PERCENT: + return False, f"Price change too large: {price_change_percent:.2f}%" + + # Validate timestamp + if not DataValidator._validate_timestamp(candle['timestamp']): + return False, "Invalid timestamp" + + # Validate symbol + if not DataValidator._validate_symbol(candle['symbol']): + return False, f"Invalid symbol: {candle['symbol']}" + + # Validate timeframe + if not DataValidator._validate_timeframe(candle['timeframe']): + return False, f"Invalid timeframe: {candle['timeframe']}" + + return True, None + + except (ValueError, TypeError, KeyError) as e: + return False, f"Validation error: {str(e)}" + except Exception as e: + logger.error(f"Unexpected error validating OHLCV: {e}") + return False, f"Unexpected error: {str(e)}" + + @staticmethod + def validate_ohlcv_candle(candle: OHLCVCandle) -> Tuple[bool, Optional[str]]: + """ + Validate OHLCVCandle object. + + Args: + candle: OHLCVCandle object + + Returns: + Tuple of (is_valid, error_message) + """ + candle_dict = { + 'timestamp': candle.timestamp, + 'symbol': candle.symbol, + 'timeframe': candle.timeframe, + 'open_price': candle.open_price, + 'high_price': candle.high_price, + 'low_price': candle.low_price, + 'close_price': candle.close_price, + 'volume': candle.volume + } + return DataValidator.validate_ohlcv(candle_dict) + + @staticmethod + def validate_orderbook(orderbook: Dict[str, Any]) -> Tuple[bool, Optional[str]]: + """ + Validate order book data. 
+ + Args: + orderbook: Dictionary with order book data + + Returns: + Tuple of (is_valid, error_message) + """ + try: + # Check required fields + if 'bids' not in orderbook or 'asks' not in orderbook: + return False, "Missing bids or asks" + + bids = orderbook['bids'] + asks = orderbook['asks'] + + # Check if bids and asks are lists + if not isinstance(bids, list) or not isinstance(asks, list): + return False, "Bids and asks must be lists" + + # Check if bids and asks are not empty + if not bids or not asks: + return False, "Bids or asks are empty" + + # Validate bid levels + for bid in bids: + if not isinstance(bid, (list, tuple)) or len(bid) < 2: + return False, "Invalid bid format" + + price, size = float(bid[0]), float(bid[1]) + + if price < DataValidator.MIN_PRICE or price > DataValidator.MAX_PRICE: + return False, f"Invalid bid price: {price}" + + if size <= 0: + return False, f"Invalid bid size: {size}" + + # Validate ask levels + for ask in asks: + if not isinstance(ask, (list, tuple)) or len(ask) < 2: + return False, "Invalid ask format" + + price, size = float(ask[0]), float(ask[1]) + + if price < DataValidator.MIN_PRICE or price > DataValidator.MAX_PRICE: + return False, f"Invalid ask price: {price}" + + if size <= 0: + return False, f"Invalid ask size: {size}" + + # Validate bid/ask relationship + best_bid = max(float(bid[0]) for bid in bids) + best_ask = min(float(ask[0]) for ask in asks) + + if best_bid >= best_ask: + return False, f"Best bid ({best_bid}) >= Best ask ({best_ask})" + + # Validate spread + spread = best_ask - best_bid + mid_price = (best_bid + best_ask) / 2 + spread_percent = (spread / mid_price) * 100 + + if spread_percent > DataValidator.MAX_SPREAD_PERCENT: + return False, f"Spread too large: {spread_percent:.2f}%" + + # Validate timestamp if present + if 'timestamp' in orderbook: + if not DataValidator._validate_timestamp(orderbook['timestamp']): + return False, "Invalid timestamp" + + # Validate symbol if present + if 'symbol' in orderbook: + if not DataValidator._validate_symbol(orderbook['symbol']): + return False, f"Invalid symbol: {orderbook['symbol']}" + + return True, None + + except (ValueError, TypeError, KeyError) as e: + return False, f"Validation error: {str(e)}" + except Exception as e: + logger.error(f"Unexpected error validating orderbook: {e}") + return False, f"Unexpected error: {str(e)}" + + @staticmethod + def validate_orderbook_dataframe(orderbook: OrderBookDataFrame) -> Tuple[bool, Optional[str]]: + """ + Validate OrderBookDataFrame object. + + Args: + orderbook: OrderBookDataFrame object + + Returns: + Tuple of (is_valid, error_message) + """ + orderbook_dict = { + 'bids': orderbook.bids, + 'asks': orderbook.asks, + 'timestamp': orderbook.timestamp, + 'symbol': orderbook.symbol + } + return DataValidator.validate_orderbook(orderbook_dict) + + @staticmethod + def validate_trade(trade: Dict[str, Any]) -> Tuple[bool, Optional[str]]: + """ + Validate trade event data. 
+ + Args: + trade: Dictionary with trade data + + Returns: + Tuple of (is_valid, error_message) + """ + try: + # Check required fields + required_fields = ['timestamp', 'symbol', 'price', 'size', 'side', 'trade_id'] + + for field in required_fields: + if field not in trade: + return False, f"Missing required field: {field}" + + # Validate price + price = float(trade['price']) + if price < DataValidator.MIN_PRICE or price > DataValidator.MAX_PRICE: + return False, f"Invalid price: {price}" + + # Validate size + size = float(trade['size']) + if size <= 0: + return False, f"Invalid size: {size}" + + # Validate side + side = trade['side'].lower() + if side not in ['buy', 'sell', 'bid', 'ask']: + return False, f"Invalid side: {trade['side']}" + + # Validate timestamp + if not DataValidator._validate_timestamp(trade['timestamp']): + return False, "Invalid timestamp" + + # Validate symbol + if not DataValidator._validate_symbol(trade['symbol']): + return False, f"Invalid symbol: {trade['symbol']}" + + # Validate trade_id + if not trade['trade_id']: + return False, "Empty trade_id" + + return True, None + + except (ValueError, TypeError, KeyError) as e: + return False, f"Validation error: {str(e)}" + except Exception as e: + logger.error(f"Unexpected error validating trade: {e}") + return False, f"Unexpected error: {str(e)}" + + @staticmethod + def validate_trade_event(trade: TradeEvent) -> Tuple[bool, Optional[str]]: + """ + Validate TradeEvent object. + + Args: + trade: TradeEvent object + + Returns: + Tuple of (is_valid, error_message) + """ + trade_dict = { + 'timestamp': trade.timestamp, + 'symbol': trade.symbol, + 'price': trade.price, + 'size': trade.size, + 'side': trade.side, + 'trade_id': trade.trade_id + } + return DataValidator.validate_trade(trade_dict) + + @staticmethod + def validate_imbalances(imbalances: Dict[str, float]) -> Tuple[bool, Optional[str]]: + """ + Validate order book imbalance metrics. 
+
+        Args:
+            imbalances: Dictionary with imbalance metrics
+
+        Returns:
+            Tuple of (is_valid, error_message)
+        """
+        try:
+            # Expected imbalance fields
+            expected_fields = [
+                'imbalance_1s', 'imbalance_5s', 'imbalance_15s', 'imbalance_60s',
+                'volume_imbalance_1s', 'volume_imbalance_5s',
+                'volume_imbalance_15s', 'volume_imbalance_60s'
+            ]
+
+            # Check if at least some imbalance fields are present
+            present_fields = [f for f in expected_fields if f in imbalances]
+            if not present_fields:
+                return False, "No imbalance fields present"
+
+            # Validate imbalance values (should be between -1 and 1)
+            for field in present_fields:
+                value = float(imbalances[field])
+                if value < -1.0 or value > 1.0:
+                    logger.warning(f"Imbalance {field} out of range [-1, 1]: {value}")
+                    # Don't fail validation, just warn
+
+            return True, None
+
+        except (ValueError, TypeError) as e:
+            return False, f"Validation error: {str(e)}"
+        except Exception as e:
+            logger.error(f"Unexpected error validating imbalances: {e}")
+            return False, f"Unexpected error: {str(e)}"
+
+    @staticmethod
+    def _validate_timestamp(timestamp: Any) -> bool:
+        """Validate timestamp format and value."""
+        try:
+            if isinstance(timestamp, datetime):
+                # Treat naive datetimes as UTC; subtracting a naive datetime
+                # from an aware one raises TypeError, which would silently
+                # reject otherwise valid timestamps
+                if timestamp.tzinfo is None:
+                    timestamp = timestamp.replace(tzinfo=timezone.utc)
+
+                # Check if timestamp is not too far in the future or past
+                now = datetime.now(timezone.utc)
+                diff_days = abs((timestamp - now).days)
+
+                # Allow timestamps within 10 years
+                if diff_days > 3650:
+                    logger.warning(f"Timestamp too far from now: {timestamp}")
+                    return False
+
+                return True
+            elif isinstance(timestamp, (int, float)):
+                # Unix timestamp
+                dt = datetime.fromtimestamp(timestamp, tz=timezone.utc)
+                return DataValidator._validate_timestamp(dt)
+            elif isinstance(timestamp, str):
+                # ISO format string
+                dt = datetime.fromisoformat(timestamp.replace('Z', '+00:00'))
+                return DataValidator._validate_timestamp(dt)
+            else:
+                return False
+
+        except Exception as e:
+            logger.debug(f"Timestamp validation error: {e}")
+            return False
+
+    @staticmethod
+    def _validate_symbol(symbol: str) -> bool:
+        """Validate trading symbol format."""
+        if not symbol or not isinstance(symbol, str):
+            return False
+
+        # Symbol should be non-empty and reasonable length
+        if len(symbol) < 3 or len(symbol) > 20:
+            return False
+
+        # Common symbol formats: BTCUSDT, BTC/USDT, BTC-USDT
+        valid_chars = set('ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789/-')
+        if not all(c in valid_chars for c in symbol.upper()):
+            return False
+
+        return True
+
+    @staticmethod
+    def _validate_timeframe(timeframe: str) -> bool:
+        """Validate timeframe format."""
+        valid_timeframes = ['1s', '1m', '5m', '15m', '30m', '1h', '4h', '1d', '1w']
+        return timeframe in valid_timeframes
+
+    @staticmethod
+    def validate_dataframe(df: pd.DataFrame, required_columns: List[str]) -> Tuple[bool, Optional[str]]:
+        """
+        Validate pandas DataFrame has required columns and valid data.
+
+        Args:
+            df: DataFrame to validate
+            required_columns: List of required column names
+
+        Returns:
+            Tuple of (is_valid, error_message)
+        """
+        try:
+            # Check if DataFrame is empty
+            if df.empty:
+                return False, "DataFrame is empty"
+
+            # Check required columns
+            missing_columns = [col for col in required_columns if col not in df.columns]
+            if missing_columns:
+                return False, f"Missing columns: {missing_columns}"
+
+            # Check for null values in required columns
+            null_counts = df[required_columns].isnull().sum()
+            if null_counts.any():
+                null_cols = null_counts[null_counts > 0].to_dict()
+                return False, f"Null values found: {null_cols}"
+
+            return True, None
+
+        except Exception as e:
+            logger.error(f"DataFrame validation error: {e}")
+            return False, f"Validation error: {str(e)}"
+
+    @staticmethod
+    def sanitize_ohlcv(candle: Dict[str, Any]) -> Dict[str, Any]:
+        """
+        Sanitize OHLCV data by fixing common issues.
+
+        Args:
+            candle: OHLCV candle dictionary
+
+        Returns:
+            Sanitized candle dictionary
+        """
+        sanitized = candle.copy()
+
+        try:
+            # Ensure numeric types
+            for field in ['open_price', 'high_price', 'low_price', 'close_price', 'volume']:
+                if field in sanitized:
+                    sanitized[field] = float(sanitized[field])
+
+            # Fix high/low if needed, using only the prices actually present
+            # (defaulting a missing open/close to 0 would drag low_price to 0)
+            prices = [sanitized[f] for f in ('open_price', 'close_price') if f in sanitized]
+            if prices and 'high_price' in sanitized and 'low_price' in sanitized:
+                # Ensure high >= all prices
+                sanitized['high_price'] = max(sanitized['high_price'], *prices)
+
+                # Ensure low <= all prices
+                sanitized['low_price'] = min(sanitized['low_price'], *prices)
+
+            # Ensure non-negative volume
+            if 'volume' in sanitized:
+                sanitized['volume'] = max(0, sanitized['volume'])
+
+            # Ensure trade_count is integer
+            if 'trade_count' in sanitized:
+                sanitized['trade_count'] = int(sanitized['trade_count'])
+
+        except Exception as e:
+            logger.error(f"Error sanitizing OHLCV: {e}")
+
+        return sanitized
+
+    @staticmethod
+    def sanitize_orderbook(orderbook: Dict[str, Any]) -> Dict[str, Any]:
+        """
+        Sanitize order book data by fixing common issues.
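+
+        Coerces bids/asks to lists, drops malformed or non-positive levels,
+        and sorts bids descending / asks ascending by price.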
+
+        Args:
+            orderbook: Order book dictionary
+
+        Returns:
+            Sanitized order book dictionary
+        """
+        sanitized = orderbook.copy()
+
+        try:
+            # Ensure bids and asks are lists
+            if 'bids' in sanitized and not isinstance(sanitized['bids'], list):
+                sanitized['bids'] = []
+
+            if 'asks' in sanitized and not isinstance(sanitized['asks'], list):
+                sanitized['asks'] = []
+
+            # Remove invalid levels
+            if 'bids' in sanitized:
+                sanitized['bids'] = [
+                    bid for bid in sanitized['bids']
+                    if isinstance(bid, (list, tuple)) and len(bid) >= 2 and float(bid[0]) > 0 and float(bid[1]) > 0
+                ]
+
+            if 'asks' in sanitized:
+                sanitized['asks'] = [
+                    ask for ask in sanitized['asks']
+                    if isinstance(ask, (list, tuple)) and len(ask) >= 2 and float(ask[0]) > 0 and float(ask[1]) > 0
+                ]
+
+            # Sort bids descending, asks ascending
+            if 'bids' in sanitized:
+                sanitized['bids'] = sorted(sanitized['bids'], key=lambda x: float(x[0]), reverse=True)
+
+            if 'asks' in sanitized:
+                sanitized['asks'] = sorted(sanitized['asks'], key=lambda x: float(x[0]))
+
+        except Exception as e:
+            logger.error(f"Error sanitizing orderbook: {e}")
+
+        return sanitized
diff --git a/examples/report_crawler_example.py b/examples/report_crawler_example.py
new file mode 100644
index 0000000..5523b3e
--- /dev/null
+++ b/examples/report_crawler_example.py
@@ -0,0 +1,107 @@
+#!/usr/bin/env python3
+"""
+Report Data Crawler Usage Example
+
+This script demonstrates how to use the new report data crawler
+to generate comprehensive trading reports for different pairs.
+"""
+
+import sys
+import os
+import logging
+import time
+from datetime import datetime
+
+# Add the project root to the Python path
+sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+
+from core.data_provider import DataProvider
+
+# Configure logging
+logging.basicConfig(
+    level=logging.INFO,
+    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
+)
+logger = logging.getLogger(__name__)
+
+def generate_reports_example():
+    """Example of generating reports for multiple trading pairs"""
+    try:
+        logger.info("Initializing DataProvider...")
+        data_provider = DataProvider()
+
+        # Wait for initial data to load
+        logger.info("Waiting for initial data to load...")
+        time.sleep(3)
+
+        # Generate reports for different pairs
+        symbols = ['BTC/USDT', 'ETH/USDT']
+
+        for symbol in symbols:
+            logger.info(f"Generating report for {symbol}...")
+
+            # Method 1: Generate formatted report text
+            report_text = data_provider.generate_trading_report(symbol)
+
+            if report_text:
+                print(f"\n{'='*80}")
+                print(f"TRADING REPORT FOR {symbol}")
+                print(f"{'='*80}")
+                print(report_text)
+                print(f"{'='*80}")
+            else:
+                logger.error(f"Failed to generate report for {symbol}")
+
+        # Method 2: Get raw report data for analysis
+        logger.info("Getting raw report data for analysis...")
+        report_data = data_provider.crawl_comprehensive_report('BTC/USDT')
+
+        if report_data:
+            print(f"\nRaw Data Analysis for BTC/USDT:")
+            print(f"- Current Price: ${report_data.current_price:,.2f}")
+            print(f"- EMA20: ${report_data.current_ema20:,.2f}")
+            print(f"- MACD: {report_data.current_macd:.3f}")
+            print(f"- RSI (7): {report_data.current_rsi_7:.1f}")
+            print(f"- Open Interest: {report_data.open_interest_latest:,.0f}")
+            print(f"- Funding Rate: {report_data.funding_rate:.2e}")
+            print(f"- Price Trend: {len(report_data.mid_prices)} data points")
+
+        # Method 3: Batch processing multiple pairs
+        logger.info("Batch processing multiple pairs...")
+        results = data_provider.get_report_data_for_multiple_pairs(symbols)
+
+        print(f"\nBatch Results:")
+        for symbol, data in results.items():
+            if data:
+                print(f"✅ {symbol}: ${data.current_price:,.2f} (RSI: {data.current_rsi_7:.1f})")
+            else:
+                print(f"❌ {symbol}: Failed to get data")
+
+        return True
+
+    except Exception as e:
+        logger.error(f"Example failed with error: {e}")
+        import traceback
+        traceback.print_exc()
+        return False
+
+def main():
+    """Main example function"""
+    logger.info("Report Data Crawler Usage Example")
+    logger.info(f"Started at: {datetime.now()}")
+
+    success = generate_reports_example()
+
+    if success:
+        logger.info("Example completed successfully!")
+        print("\n🎉 Report Data Crawler is ready for use!")
+        print("\nUsage:")
+        print("1. data_provider.generate_trading_report('BTC/USDT') - Get formatted report")
+        print("2. data_provider.crawl_comprehensive_report('BTC/USDT') - Get raw data")
+        print("3. data_provider.get_report_data_for_multiple_pairs(['BTC/USDT', 'ETH/USDT']) - Batch processing")
+    else:
+        logger.error("Example failed!")
+        sys.exit(1)
+
+if __name__ == "__main__":
+    main()
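
Note: the example calls data_provider.get_report_data_for_multiple_pairs(), which is
defined in a portion of the core/data_provider.py diff not shown above. For reference,
a minimal sketch of such a batch helper, assuming it simply loops
crawl_comprehensive_report() per symbol (not necessarily the committed implementation):

    def get_report_data_for_multiple_pairs(self, symbols: List[str]) -> Dict[str, Optional[ReportData]]:
        """Crawl report data for several pairs; maps each symbol to its ReportData, or None on failure."""
        # Assumes List/Dict/Optional from typing and ReportData are already imported in data_provider.py
        results: Dict[str, Optional[ReportData]] = {}
        for symbol in symbols:
            # crawl_comprehensive_report already logs errors and returns None on failure
            results[symbol] = self.crawl_comprehensive_report(symbol)
        return results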