"""
|
|
Multi-Timeframe, Multi-Symbol Data Provider
|
|
|
|
This module consolidates all data functionality including:
|
|
- Historical data fetching from Binance API
|
|
- Real-time data streaming via WebSocket
|
|
- Multi-timeframe candle generation
|
|
- Caching and data management
|
|
- Technical indicators calculation
|
|
"""
|
|
|
|
import asyncio
import json
import logging
import os
import time
from collections import deque
from datetime import datetime, timedelta
from pathlib import Path
from threading import Lock, Thread
from typing import Any, Dict, List, Optional, Tuple

import numpy as np
import pandas as pd
import requests
import ta
import websockets

from .config import get_config

logger = logging.getLogger(__name__)

class DataProvider:
    """Unified data provider for historical and real-time market data"""

    def __init__(self, symbols: List[str] = None, timeframes: List[str] = None):
        """Initialize the data provider"""
        self.config = get_config()
        self.symbols = symbols or self.config.symbols
        self.timeframes = timeframes or self.config.timeframes

        # Data storage
        self.historical_data = {}  # {symbol: {timeframe: DataFrame}}
        self.real_time_data = {}   # {symbol: {timeframe: deque}}
        self.current_prices = {}   # {symbol: float}

        # Real-time processing
        self.websocket_tasks = {}
        self.is_streaming = False
        self.data_lock = Lock()

        # Cache settings
        self.cache_enabled = self.config.data.get('cache_enabled', True)
        self.cache_dir = Path(self.config.data.get('cache_dir', 'cache'))
        self.cache_dir.mkdir(parents=True, exist_ok=True)

        # Timeframe conversion
        self.timeframe_seconds = {
            '1m': 60, '5m': 300, '15m': 900, '30m': 1800,
            '1h': 3600, '4h': 14400, '1d': 86400
        }

        logger.info(f"DataProvider initialized for symbols: {self.symbols}")
        logger.info(f"Timeframes: {self.timeframes}")

    def get_historical_data(self, symbol: str, timeframe: str, limit: int = 1000,
                            refresh: bool = False) -> Optional[pd.DataFrame]:
        """Get historical OHLCV data for a symbol and timeframe"""
        try:
            # Check the cache first; accept a cached frame if it covers at
            # least 80% of the requested window
            if not refresh and self.cache_enabled:
                cached_data = self._load_from_cache(symbol, timeframe)
                if cached_data is not None and len(cached_data) >= limit * 0.8:
                    logger.info(f"Using cached data for {symbol} {timeframe}")
                    return cached_data.tail(limit)

            # Fetch from the API
            logger.info(f"Fetching historical data for {symbol} {timeframe}")
            df = self._fetch_from_binance(symbol, timeframe, limit)

            if df is not None and not df.empty:
                # Add technical indicators
                df = self._add_technical_indicators(df)

                # Cache the data
                if self.cache_enabled:
                    self._save_to_cache(df, symbol, timeframe)

                # Store in memory
                if symbol not in self.historical_data:
                    self.historical_data[symbol] = {}
                self.historical_data[symbol][timeframe] = df

                return df

            logger.warning(f"No data received for {symbol} {timeframe}")
            return None

        except Exception as e:
            logger.error(f"Error fetching historical data for {symbol} {timeframe}: {e}")
            return None

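    # Example (illustrative values): get_historical_data('BTC/USDT', '1h',
    # limit=500) returns up to 500 rows of OHLCV plus the indicator columns
    # added by _add_technical_indicators, or None on failure.
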
    def _fetch_from_binance(self, symbol: str, timeframe: str, limit: int) -> Optional[pd.DataFrame]:
        """Fetch data from the Binance API"""
        try:
            # Convert symbol format ('BTC/USDT' -> 'BTCUSDT')
            binance_symbol = symbol.replace('/', '').upper()

            # Map the timeframe, defaulting to 1h for unknown values
            timeframe_map = {
                '1m': '1m', '5m': '5m', '15m': '15m', '30m': '30m',
                '1h': '1h', '4h': '4h', '1d': '1d'
            }
            binance_timeframe = timeframe_map.get(timeframe, '1h')

            # API request (with a timeout so a stalled connection cannot hang the caller)
            url = "https://api.binance.com/api/v3/klines"
            params = {
                'symbol': binance_symbol,
                'interval': binance_timeframe,
                'limit': limit
            }

            response = requests.get(url, params=params, timeout=10)
            response.raise_for_status()

            data = response.json()

            # Convert to DataFrame
            df = pd.DataFrame(data, columns=[
                'timestamp', 'open', 'high', 'low', 'close', 'volume',
                'close_time', 'quote_volume', 'trades', 'taker_buy_base',
                'taker_buy_quote', 'ignore'
            ])

            # Process columns
            df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
            for col in ['open', 'high', 'low', 'close', 'volume']:
                df[col] = df[col].astype(float)

            # Keep only the OHLCV columns
            df = df[['timestamp', 'open', 'high', 'low', 'close', 'volume']]
            df = df.sort_values('timestamp').reset_index(drop=True)

            logger.info(f"Fetched {len(df)} candles for {symbol} {timeframe}")
            return df

        except Exception as e:
            logger.error(f"Error fetching from Binance API: {e}")
            return None

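    # For reference, each raw kline row returned by /api/v3/klines is a
    # 12-element array (values illustrative):
    #   [1700000000000, "37000.1", "37100.0", "36950.5", "37050.2", "123.4",
    #    1700003599999, "4572101.8", 2541, "60.2", "2231050.7", "0"]
    # i.e. open time, OHLC, volume, close time, quote volume, trade count,
    # taker-buy base/quote volume, and an unused field.
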
    def _add_technical_indicators(self, df: pd.DataFrame) -> pd.DataFrame:
        """Add technical indicators to the DataFrame"""
        try:
            df = df.copy()

            # Moving averages
            df['sma_20'] = ta.trend.sma_indicator(df['close'], window=20)
            df['sma_50'] = ta.trend.sma_indicator(df['close'], window=50)
            df['ema_12'] = ta.trend.ema_indicator(df['close'], window=12)
            df['ema_26'] = ta.trend.ema_indicator(df['close'], window=26)

            # MACD
            macd = ta.trend.MACD(df['close'])
            df['macd'] = macd.macd()
            df['macd_signal'] = macd.macd_signal()
            df['macd_histogram'] = macd.macd_diff()

            # RSI
            df['rsi'] = ta.momentum.rsi(df['close'], window=14)

            # Bollinger Bands
            bollinger = ta.volatility.BollingerBands(df['close'])
            df['bb_upper'] = bollinger.bollinger_hband()
            df['bb_lower'] = bollinger.bollinger_lband()
            df['bb_middle'] = bollinger.bollinger_mavg()

            # Volume moving average (simple rolling mean; ta.volume has no volume_sma)
            df['volume_sma'] = df['volume'].rolling(window=20).mean()

            # Fill NaN values. Caveat: bfill copies later values into the
            # indicator warm-up rows, so the earliest ~50 rows carry lookahead
            # information and should not be used for training or signals.
            df = df.bfill().fillna(0)

            return df

        except Exception as e:
            logger.error(f"Error adding technical indicators: {e}")
            return df

    def _load_from_cache(self, symbol: str, timeframe: str) -> Optional[pd.DataFrame]:
        """Load data from the cache"""
        try:
            cache_file = self.cache_dir / f"{symbol.replace('/', '')}_{timeframe}.parquet"
            if cache_file.exists():
                # Only use the cache if it is less than one hour old
                cache_age = time.time() - cache_file.stat().st_mtime
                if cache_age < 3600:
                    df = pd.read_parquet(cache_file)
                    logger.debug(f"Loaded {len(df)} rows from cache for {symbol} {timeframe}")
                    return df
                else:
                    logger.debug(f"Cache for {symbol} {timeframe} is too old ({cache_age/3600:.1f}h)")
            return None
        except Exception as e:
            logger.warning(f"Error loading cache for {symbol} {timeframe}: {e}")
            return None

    def _save_to_cache(self, df: pd.DataFrame, symbol: str, timeframe: str):
        """Save data to the cache"""
        try:
            cache_file = self.cache_dir / f"{symbol.replace('/', '')}_{timeframe}.parquet"
            df.to_parquet(cache_file, index=False)
            logger.debug(f"Saved {len(df)} rows to cache for {symbol} {timeframe}")
        except Exception as e:
            logger.warning(f"Error saving cache for {symbol} {timeframe}: {e}")

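    # Cache files are keyed per symbol and timeframe under cache_dir, e.g.
    # 'BTC/USDT' at '1h' maps to cache/BTCUSDT_1h.parquet (with the default
    # 'cache' directory from the config).
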
    async def start_real_time_streaming(self):
        """Start real-time data streaming for all symbols"""
        if self.is_streaming:
            logger.warning("Real-time streaming already active")
            return

        self.is_streaming = True
        logger.info("Starting real-time data streaming")

        # Start a WebSocket task for each symbol
        for symbol in self.symbols:
            task = asyncio.create_task(self._websocket_stream(symbol))
            self.websocket_tasks[symbol] = task

    async def stop_real_time_streaming(self):
        """Stop real-time data streaming"""
        if not self.is_streaming:
            return

        logger.info("Stopping real-time data streaming")
        self.is_streaming = False

        # Cancel all WebSocket tasks and wait for them to finish
        for symbol, task in self.websocket_tasks.items():
            if not task.done():
                task.cancel()
                try:
                    await task
                except asyncio.CancelledError:
                    pass

        self.websocket_tasks.clear()

    async def _websocket_stream(self, symbol: str):
        """WebSocket stream for a single symbol"""
        binance_symbol = symbol.replace('/', '').lower()
        url = f"wss://stream.binance.com:9443/ws/{binance_symbol}@ticker"

        while self.is_streaming:
            try:
                async with websockets.connect(url) as websocket:
                    logger.info(f"WebSocket connected for {symbol}")

                    async for message in websocket:
                        if not self.is_streaming:
                            break

                        try:
                            data = json.loads(message)
                            await self._process_tick(symbol, data)
                        except Exception as e:
                            logger.warning(f"Error processing tick for {symbol}: {e}")

            except Exception as e:
                logger.error(f"WebSocket error for {symbol}: {e}")
                if self.is_streaming:
                    logger.info(f"Reconnecting WebSocket for {symbol} in 5 seconds...")
                    await asyncio.sleep(5)

    async def _process_tick(self, symbol: str, tick_data: Dict):
        """Process a single tick and update candles"""
        try:
            price = float(tick_data.get('c', 0))   # Last price
            volume = float(tick_data.get('v', 0))  # Rolling 24h base-asset volume
            timestamp = pd.Timestamp.now()

            # Hold the lock for the whole update so readers on other threads
            # (e.g. get_latest_candles) see a consistent view
            with self.data_lock:
                # Update the current price
                self.current_prices[symbol] = price

                # Initialize the real-time data structure if needed
                if symbol not in self.real_time_data:
                    self.real_time_data[symbol] = {}
                    for tf in self.timeframes:
                        self.real_time_data[symbol][tf] = deque(maxlen=1000)

                # Create a tick record
                tick = {
                    'timestamp': timestamp,
                    'price': price,
                    'volume': volume
                }

                # Update all timeframes
                for timeframe in self.timeframes:
                    self._update_candle(symbol, timeframe, tick)

        except Exception as e:
            logger.error(f"Error processing tick for {symbol}: {e}")

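    # Note on the @ticker payload: 'c' is the last trade price and 'v' is the
    # rolling 24h base-asset volume. 'v' is not per-trade volume, so the
    # per-candle volume accumulated in _update_candle below is only a rough
    # proxy; an exact candle volume would require a kline or trade stream.
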
    def _update_candle(self, symbol: str, timeframe: str, tick: Dict):
        """Update the candle for a specific timeframe"""
        try:
            timeframe_secs = self.timeframe_seconds.get(timeframe, 3600)
            current_time = tick['timestamp']

            # Bucket the tick into its candle by flooring the timestamp
            candle_start = current_time.floor(f'{timeframe_secs}s')

            # Get the candle queue for this symbol/timeframe
            candle_queue = self.real_time_data[symbol][timeframe]

            # Check whether a new candle is needed
            if not candle_queue or candle_queue[-1]['timestamp'] != candle_start:
                # Start a new candle
                new_candle = {
                    'timestamp': candle_start,
                    'open': tick['price'],
                    'high': tick['price'],
                    'low': tick['price'],
                    'close': tick['price'],
                    'volume': tick['volume']
                }
                candle_queue.append(new_candle)
            else:
                # Update the existing candle in place
                current_candle = candle_queue[-1]
                current_candle['high'] = max(current_candle['high'], tick['price'])
                current_candle['low'] = min(current_candle['low'], tick['price'])
                current_candle['close'] = tick['price']
                current_candle['volume'] += tick['volume']

        except Exception as e:
            logger.error(f"Error updating candle for {symbol} {timeframe}: {e}")

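    # Worked example of the bucketing above: with a 5m timeframe (300s), a
    # tick at 12:34:56 floors to candle_start 12:30:00; every tick up to
    # 12:34:59 updates that candle, and the first tick at or after 12:35:00
    # opens a new one.
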
    def get_latest_candles(self, symbol: str, timeframe: str, limit: int = 100) -> pd.DataFrame:
        """Get the latest candles, combining historical and real-time data"""
        try:
            # Get historical data
            historical_df = self.get_historical_data(symbol, timeframe, limit=limit)

            # Get real-time data
            with self.data_lock:
                if symbol in self.real_time_data and timeframe in self.real_time_data[symbol]:
                    real_time_candles = list(self.real_time_data[symbol][timeframe])

                    if real_time_candles:
                        # Convert to DataFrame
                        rt_df = pd.DataFrame(real_time_candles)

                        if historical_df is not None:
                            # Drop historical candles that overlap the
                            # real-time window, then concatenate. Real-time
                            # rows have no indicator columns, so those cells
                            # are NaN in the combined frame.
                            if not rt_df.empty:
                                cutoff_time = rt_df['timestamp'].min()
                                historical_df = historical_df[historical_df['timestamp'] < cutoff_time]

                            combined_df = pd.concat([historical_df, rt_df], ignore_index=True)
                        else:
                            combined_df = rt_df

                        return combined_df.tail(limit)

            # Fall back to historical data when there is no real-time data
            return historical_df.tail(limit) if historical_df is not None else pd.DataFrame()

        except Exception as e:
            logger.error(f"Error getting latest candles for {symbol} {timeframe}: {e}")
            return pd.DataFrame()

    def get_current_price(self, symbol: str) -> Optional[float]:
        """Get the current price for a symbol"""
        with self.data_lock:
            return self.current_prices.get(symbol)

    def get_feature_matrix(self, symbol: str, timeframes: List[str] = None,
                           window_size: int = 20) -> Optional[np.ndarray]:
        """Get a feature matrix across multiple timeframes"""
        try:
            if timeframes is None:
                timeframes = self.timeframes

            features = []

            for tf in timeframes:
                df = self.get_latest_candles(symbol, tf, limit=window_size + 50)

                if df is not None and len(df) >= window_size:
                    # Select feature columns. Every timeframe must end up
                    # with the same column set, or np.stack below will fail.
                    feature_cols = ['open', 'high', 'low', 'close', 'volume']
                    if 'sma_20' in df.columns:
                        feature_cols.extend(['sma_20', 'rsi', 'macd'])

                    # Take the most recent window
                    tf_features = df[feature_cols].tail(window_size).values
                    features.append(tf_features)
                else:
                    logger.warning(f"Insufficient data for {symbol} {tf}")
                    return None

            if features:
                # Stack features from all timeframes
                return np.stack(features, axis=0)  # Shape: (n_timeframes, window_size, n_features)

            return None

        except Exception as e:
            logger.error(f"Error creating feature matrix for {symbol}: {e}")
            return None

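    # Shape example: 3 timeframes with window_size=20 and 8 feature columns
    # (OHLCV plus sma_20/rsi/macd) yield an array of shape (3, 20, 8).
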
    def health_check(self) -> Dict[str, Any]:
        """Get the health status of the data provider"""
        status = {
            'streaming': self.is_streaming,
            'symbols': len(self.symbols),
            'timeframes': len(self.timeframes),
            'current_prices': len(self.current_prices),
            'websocket_tasks': len(self.websocket_tasks),
            'historical_data_loaded': {}
        }

        # Check historical data availability
        for symbol in self.symbols:
            status['historical_data_loaded'][symbol] = {}
            for tf in self.timeframes:
                has_data = (symbol in self.historical_data and
                            tf in self.historical_data[symbol] and
                            not self.historical_data[symbol][tf].empty)
                status['historical_data_loaded'][symbol][tf] = has_data

        return status
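

# Minimal smoke-test sketch, not part of the provider itself. It assumes the
# package config resolves (DataProvider.__init__ calls get_config()) and that
# the module is run with `python -m <package>.<module>` so the relative import
# works; the symbol/timeframe values are illustrative.
if __name__ == "__main__":
    async def _demo():
        provider = DataProvider(symbols=['BTC/USDT'], timeframes=['1m', '5m'])

        # One-off historical fetch with indicators attached
        df = provider.get_historical_data('BTC/USDT', '5m', limit=200)
        if df is not None:
            print(df[['timestamp', 'close', 'rsi']].tail())

        # Stream briefly, then report status
        await provider.start_real_time_streaming()
        await asyncio.sleep(15)
        print("Current price:", provider.get_current_price('BTC/USDT'))
        print("Health:", provider.health_check())
        await provider.stop_real_time_streaming()

    asyncio.run(_demo())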