""" Multi-Timeframe, Multi-Symbol Data Provider This module consolidates all data functionality including: - Historical data fetching from Binance API - Real-time data streaming via WebSocket - Multi-timeframe candle generation - Caching and data management - Technical indicators calculation """ import asyncio import json import logging import os import time import websockets import requests import pandas as pd import numpy as np from datetime import datetime, timedelta from pathlib import Path from typing import Dict, List, Optional, Tuple, Any import ta from threading import Thread, Lock from collections import deque from .config import get_config logger = logging.getLogger(__name__) class DataProvider: """Unified data provider for historical and real-time market data""" def __init__(self, symbols: List[str] = None, timeframes: List[str] = None): """Initialize the data provider""" self.config = get_config() self.symbols = symbols or self.config.symbols self.timeframes = timeframes or self.config.timeframes # Data storage self.historical_data = {} # {symbol: {timeframe: DataFrame}} self.real_time_data = {} # {symbol: {timeframe: deque}} self.current_prices = {} # {symbol: float} # Real-time processing self.websocket_tasks = {} self.is_streaming = False self.data_lock = Lock() # Cache settings self.cache_enabled = self.config.data.get('cache_enabled', True) self.cache_dir = Path(self.config.data.get('cache_dir', 'cache')) self.cache_dir.mkdir(parents=True, exist_ok=True) # Timeframe conversion self.timeframe_seconds = { '1m': 60, '5m': 300, '15m': 900, '30m': 1800, '1h': 3600, '4h': 14400, '1d': 86400 } logger.info(f"DataProvider initialized for symbols: {self.symbols}") logger.info(f"Timeframes: {self.timeframes}") def get_historical_data(self, symbol: str, timeframe: str, limit: int = 1000, refresh: bool = False) -> Optional[pd.DataFrame]: """Get historical OHLCV data for a symbol and timeframe""" try: # Check cache first if not refresh and self.cache_enabled: cached_data = self._load_from_cache(symbol, timeframe) if cached_data is not None and len(cached_data) >= limit * 0.8: logger.info(f"Using cached data for {symbol} {timeframe}") return cached_data.tail(limit) # Fetch from API logger.info(f"Fetching historical data for {symbol} {timeframe}") df = self._fetch_from_binance(symbol, timeframe, limit) if df is not None and not df.empty: # Add technical indicators df = self._add_technical_indicators(df) # Cache the data if self.cache_enabled: self._save_to_cache(df, symbol, timeframe) # Store in memory if symbol not in self.historical_data: self.historical_data[symbol] = {} self.historical_data[symbol][timeframe] = df return df logger.warning(f"No data received for {symbol} {timeframe}") return None except Exception as e: logger.error(f"Error fetching historical data for {symbol} {timeframe}: {e}") return None def _fetch_from_binance(self, symbol: str, timeframe: str, limit: int) -> Optional[pd.DataFrame]: """Fetch data from Binance API""" try: # Convert symbol format binance_symbol = symbol.replace('/', '').upper() # Convert timeframe timeframe_map = { '1m': '1m', '5m': '5m', '15m': '15m', '30m': '30m', '1h': '1h', '4h': '4h', '1d': '1d' } binance_timeframe = timeframe_map.get(timeframe, '1h') # API request url = "https://api.binance.com/api/v3/klines" params = { 'symbol': binance_symbol, 'interval': binance_timeframe, 'limit': limit } response = requests.get(url, params=params) response.raise_for_status() data = response.json() # Convert to DataFrame df = pd.DataFrame(data, columns=[ 
                'timestamp', 'open', 'high', 'low', 'close', 'volume',
                'close_time', 'quote_volume', 'trades',
                'taker_buy_base', 'taker_buy_quote', 'ignore'
            ])

            # Process columns
            df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
            for col in ['open', 'high', 'low', 'close', 'volume']:
                df[col] = df[col].astype(float)

            # Keep only OHLCV columns
            df = df[['timestamp', 'open', 'high', 'low', 'close', 'volume']]
            df = df.sort_values('timestamp').reset_index(drop=True)

            logger.info(f"Fetched {len(df)} candles for {symbol} {timeframe}")
            return df

        except Exception as e:
            logger.error(f"Error fetching from Binance API: {e}")
            return None

    def _add_technical_indicators(self, df: pd.DataFrame) -> pd.DataFrame:
        """Add technical indicators to the DataFrame"""
        try:
            df = df.copy()

            # Moving averages
            df['sma_20'] = ta.trend.sma_indicator(df['close'], window=20)
            df['sma_50'] = ta.trend.sma_indicator(df['close'], window=50)
            df['ema_12'] = ta.trend.ema_indicator(df['close'], window=12)
            df['ema_26'] = ta.trend.ema_indicator(df['close'], window=26)

            # MACD
            macd = ta.trend.MACD(df['close'])
            df['macd'] = macd.macd()
            df['macd_signal'] = macd.macd_signal()
            df['macd_histogram'] = macd.macd_diff()

            # RSI
            df['rsi'] = ta.momentum.rsi(df['close'], window=14)

            # Bollinger Bands
            bollinger = ta.volatility.BollingerBands(df['close'])
            df['bb_upper'] = bollinger.bollinger_hband()
            df['bb_lower'] = bollinger.bollinger_lband()
            df['bb_middle'] = bollinger.bollinger_mavg()

            # Volume moving average (simple rolling mean since ta.volume.volume_sma doesn't exist)
            df['volume_sma'] = df['volume'].rolling(window=20).mean()

            # Fill NaN values
            df = df.bfill().fillna(0)

            return df

        except Exception as e:
            logger.error(f"Error adding technical indicators: {e}")
            return df

    def _load_from_cache(self, symbol: str, timeframe: str) -> Optional[pd.DataFrame]:
        """Load data from cache"""
        try:
            cache_file = self.cache_dir / f"{symbol.replace('/', '')}_{timeframe}.parquet"
            if cache_file.exists():
                # Check if cache is recent (less than 1 hour old)
                cache_age = time.time() - cache_file.stat().st_mtime
                if cache_age < 3600:  # 1 hour
                    df = pd.read_parquet(cache_file)
                    logger.debug(f"Loaded {len(df)} rows from cache for {symbol} {timeframe}")
                    return df
                else:
                    logger.debug(f"Cache for {symbol} {timeframe} is too old ({cache_age/3600:.1f}h)")
            return None

        except Exception as e:
            logger.warning(f"Error loading cache for {symbol} {timeframe}: {e}")
            return None

    def _save_to_cache(self, df: pd.DataFrame, symbol: str, timeframe: str):
        """Save data to cache"""
        try:
            cache_file = self.cache_dir / f"{symbol.replace('/', '')}_{timeframe}.parquet"
            df.to_parquet(cache_file, index=False)
            logger.debug(f"Saved {len(df)} rows to cache for {symbol} {timeframe}")
        except Exception as e:
            logger.warning(f"Error saving cache for {symbol} {timeframe}: {e}")

    async def start_real_time_streaming(self):
        """Start real-time data streaming for all symbols"""
        if self.is_streaming:
            logger.warning("Real-time streaming already active")
            return

        self.is_streaming = True
        logger.info("Starting real-time data streaming")

        # Start a WebSocket task for each symbol
        for symbol in self.symbols:
            task = asyncio.create_task(self._websocket_stream(symbol))
            self.websocket_tasks[symbol] = task

    async def stop_real_time_streaming(self):
        """Stop real-time data streaming"""
        if not self.is_streaming:
            return

        logger.info("Stopping real-time data streaming")
        self.is_streaming = False

        # Cancel all WebSocket tasks
        for symbol, task in self.websocket_tasks.items():
            if not task.done():
                task.cancel()
                try:
                    await task
                except asyncio.CancelledError:
                    pass
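    # Note on the stream below: Binance's individual-symbol @ticker stream
    # pushes a consolidated 24h-statistics update roughly once per second, so
    # the candles generated here are built from ~1s price snapshots rather
    # than from every executed trade.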
        self.websocket_tasks.clear()

    async def _websocket_stream(self, symbol: str):
        """WebSocket stream for a single symbol"""
        binance_symbol = symbol.replace('/', '').lower()
        url = f"wss://stream.binance.com:9443/ws/{binance_symbol}@ticker"

        while self.is_streaming:
            try:
                async with websockets.connect(url) as websocket:
                    logger.info(f"WebSocket connected for {symbol}")

                    async for message in websocket:
                        if not self.is_streaming:
                            break

                        try:
                            data = json.loads(message)
                            await self._process_tick(symbol, data)
                        except Exception as e:
                            logger.warning(f"Error processing tick for {symbol}: {e}")

            except Exception as e:
                logger.error(f"WebSocket error for {symbol}: {e}")
                if self.is_streaming:
                    logger.info(f"Reconnecting WebSocket for {symbol} in 5 seconds...")
                    await asyncio.sleep(5)

    async def _process_tick(self, symbol: str, tick_data: Dict):
        """Process a single tick and update candles"""
        try:
            price = float(tick_data.get('c', 0))   # Last price ('c' in the ticker payload)
            volume = float(tick_data.get('v', 0))  # 24h rolling base-asset volume, NOT per-tick
            timestamp = pd.Timestamp.now()

            # Update current price and candle state under the lock, since
            # get_latest_candles() reads these structures from other threads
            with self.data_lock:
                self.current_prices[symbol] = price

                # Initialize real-time data structure if needed
                if symbol not in self.real_time_data:
                    self.real_time_data[symbol] = {}
                    for tf in self.timeframes:
                        self.real_time_data[symbol][tf] = deque(maxlen=1000)

                # Create tick record. Caveat: because 'v' is a 24h rolling
                # total, accumulating it per tick overstates candle volume; a
                # trade stream would be needed for accurate per-candle volume.
                tick = {
                    'timestamp': timestamp,
                    'price': price,
                    'volume': volume
                }

                # Update all timeframes
                for timeframe in self.timeframes:
                    self._update_candle(symbol, timeframe, tick)

        except Exception as e:
            logger.error(f"Error processing tick for {symbol}: {e}")

    def _update_candle(self, symbol: str, timeframe: str, tick: Dict):
        """Update candle for specific timeframe"""
        try:
            timeframe_secs = self.timeframe_seconds.get(timeframe, 3600)
            current_time = tick['timestamp']

            # Calculate candle start time
            candle_start = current_time.floor(f'{timeframe_secs}s')

            # Get current candle queue
            candle_queue = self.real_time_data[symbol][timeframe]

            # Check if we need a new candle
            if not candle_queue or candle_queue[-1]['timestamp'] != candle_start:
                # Create new candle
                new_candle = {
                    'timestamp': candle_start,
                    'open': tick['price'],
                    'high': tick['price'],
                    'low': tick['price'],
                    'close': tick['price'],
                    'volume': tick['volume']
                }
                candle_queue.append(new_candle)
            else:
                # Update existing candle
                current_candle = candle_queue[-1]
                current_candle['high'] = max(current_candle['high'], tick['price'])
                current_candle['low'] = min(current_candle['low'], tick['price'])
                current_candle['close'] = tick['price']
                current_candle['volume'] += tick['volume']

        except Exception as e:
            logger.error(f"Error updating candle for {symbol} {timeframe}: {e}")
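    # Worked example of the bucketing in _update_candle above: with the '5m'
    # timeframe (300 s), a tick at 12:03:27 floors to a candle_start of
    # 12:00:00 and updates that candle in place, while a tick at 12:05:01
    # floors to 12:05:00 and opens a new candle.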
    def get_latest_candles(self, symbol: str, timeframe: str, limit: int = 100) -> pd.DataFrame:
        """Get the latest candles, combining historical and real-time data"""
        try:
            # Get historical data
            historical_df = self.get_historical_data(symbol, timeframe, limit=limit)

            # Get real-time data
            with self.data_lock:
                if symbol in self.real_time_data and timeframe in self.real_time_data[symbol]:
                    real_time_candles = list(self.real_time_data[symbol][timeframe])

                    if real_time_candles:
                        # Convert to DataFrame
                        rt_df = pd.DataFrame(real_time_candles)

                        if historical_df is not None:
                            # Combine historical and real-time: drop historical
                            # candles that overlap the real-time window
                            if not rt_df.empty:
                                cutoff_time = rt_df['timestamp'].min()
                                historical_df = historical_df[historical_df['timestamp'] < cutoff_time]

                            # Concatenate
                            combined_df = pd.concat([historical_df, rt_df], ignore_index=True)
                        else:
                            combined_df = rt_df

                        return combined_df.tail(limit)

            # Return just historical data if no real-time data
            return historical_df.tail(limit) if historical_df is not None else pd.DataFrame()

        except Exception as e:
            logger.error(f"Error getting latest candles for {symbol} {timeframe}: {e}")
            return pd.DataFrame()

    def get_current_price(self, symbol: str) -> Optional[float]:
        """Get current price for a symbol"""
        with self.data_lock:
            return self.current_prices.get(symbol)

    def get_feature_matrix(self, symbol: str, timeframes: Optional[List[str]] = None,
                           window_size: int = 20) -> Optional[np.ndarray]:
        """Get feature matrix for multiple timeframes"""
        try:
            if timeframes is None:
                timeframes = self.timeframes

            features = []

            for tf in timeframes:
                df = self.get_latest_candles(symbol, tf, limit=window_size + 50)

                if df is not None and len(df) >= window_size:
                    # Select feature columns
                    feature_cols = ['open', 'high', 'low', 'close', 'volume']
                    if 'sma_20' in df.columns:
                        feature_cols.extend(['sma_20', 'rsi', 'macd'])

                    # Get the latest window; real-time rows carry no indicator
                    # columns, so forward-fill and zero-fill any gaps before
                    # converting to an array
                    tf_features = (df[feature_cols].ffill().fillna(0.0)
                                   .tail(window_size).values)
                    features.append(tf_features)
                else:
                    logger.warning(f"Insufficient data for {symbol} {tf}")
                    return None

            if features:
                # Stack features from all timeframes; every timeframe must
                # yield the same column set for the stack to succeed
                return np.stack(features, axis=0)  # Shape: (n_timeframes, window_size, n_features)
            return None

        except Exception as e:
            logger.error(f"Error creating feature matrix for {symbol}: {e}")
            return None

    def health_check(self) -> Dict[str, Any]:
        """Get health status of the data provider"""
        status = {
            'streaming': self.is_streaming,
            'symbols': len(self.symbols),
            'timeframes': len(self.timeframes),
            'current_prices': len(self.current_prices),
            'websocket_tasks': len(self.websocket_tasks),
            'historical_data_loaded': {}
        }

        # Check historical data availability
        for symbol in self.symbols:
            status['historical_data_loaded'][symbol] = {}
            for tf in self.timeframes:
                has_data = (symbol in self.historical_data and
                            tf in self.historical_data[symbol] and
                            not self.historical_data[symbol][tf].empty)
                status['historical_data_loaded'][symbol][tf] = has_data

        return status
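
# Minimal usage sketch (illustrative, not part of the provider itself): shows
# fetching history, streaming briefly, and reading the live state. 'BTC/USDT',
# '1m'/'5m', and the 15-second window are example values; because of the
# relative `.config` import, run this as a module (python -m <package>.<module>)
# from a package whose config exposes get_config().
async def _demo():
    provider = DataProvider(symbols=['BTC/USDT'], timeframes=['1m', '5m'])

    # Historical candles arrive with indicators already attached
    df = provider.get_historical_data('BTC/USDT', '1m', limit=200)
    if df is not None:
        print(df[['timestamp', 'close', 'rsi']].tail())

    # Stream ticks for a short while, then inspect the live state
    await provider.start_real_time_streaming()
    await asyncio.sleep(15)
    print("Current price:", provider.get_current_price('BTC/USDT'))
    print("Health:", provider.health_check())
    await provider.stop_real_time_streaming()


if __name__ == "__main__":
    asyncio.run(_demo())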