"""
|
|
Multi-Timeframe, Multi-Symbol Data Provider
|
|
|
|
This module consolidates all data functionality including:
|
|
- Historical data fetching from Binance API
|
|
- Real-time data streaming via WebSocket
|
|
- Multi-timeframe candle generation
|
|
- Caching and data management
|
|
- Technical indicators calculation
|
|
- Centralized data distribution to multiple subscribers (AI models, dashboard, etc.)
|
|
"""
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import os
|
|
import time
|
|
import uuid
|
|
import websockets
|
|
import requests
|
|
import pandas as pd
|
|
import numpy as np
|
|
from datetime import datetime, timedelta
|
|
from pathlib import Path
|
|
from typing import Dict, List, Optional, Tuple, Any, Callable
|
|
from dataclasses import dataclass, field
|
|
import ta
|
|
from threading import Thread, Lock
|
|
from collections import deque
|
|
|
|
from .config import get_config
|
|
from .tick_aggregator import RealTimeTickAggregator, RawTick, OHLCVBar
|
|
|
|
logger = logging.getLogger(__name__)
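
# Example usage (illustrative sketch only; nothing here runs on import). The
# entry point and the `on_tick` handler are hypothetical names; everything else
# is the public API defined below:
#
#   async def main():
#       provider = DataProvider(symbols=['ETH/USDT'], timeframes=['1s', '1m', '1h'])
#       provider.preload_all_symbols_data()            # warm caches before streaming
#
#       def on_tick(tick: MarketTick) -> None:
#           print(tick.symbol, tick.price, tick.side)
#
#       provider.subscribe_to_ticks(on_tick, symbols=['ETH/USDT'],
#                                   subscriber_name='example_consumer')
#       await provider.start_real_time_streaming()     # one WebSocket task per symbol
#       await asyncio.sleep(60)
#       await provider.stop_real_time_streaming()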

@dataclass
class MarketTick:
    """Standardized market tick data structure"""
    symbol: str
    timestamp: datetime
    price: float
    volume: float
    quantity: float
    side: str  # 'buy' or 'sell'
    trade_id: str
    is_buyer_maker: bool
    raw_data: Dict[str, Any] = field(default_factory=dict)


@dataclass
class DataSubscriber:
    """Data subscriber information"""
    subscriber_id: str
    callback: Callable[[MarketTick], None]
    symbols: List[str]
    active: bool = True
    last_update: datetime = field(default_factory=datetime.now)
    tick_count: int = 0
    subscriber_name: str = "unknown"


class DataProvider:
    """Unified data provider for historical and real-time market data with centralized distribution"""

    def __init__(self, symbols: List[str] = None, timeframes: List[str] = None):
        """Initialize the data provider"""
        self.config = get_config()
        self.symbols = symbols or self.config.symbols
        self.timeframes = timeframes or self.config.timeframes

        # Data storage
        self.historical_data = {}  # {symbol: {timeframe: DataFrame}}
        self.real_time_data = {}   # {symbol: {timeframe: deque}}
        self.current_prices = {}   # {symbol: float}

        # Real-time processing
        self.websocket_tasks = {}
        self.is_streaming = False
        self.data_lock = Lock()

        # Subscriber management for centralized data distribution
        self.subscribers: Dict[str, DataSubscriber] = {}
        self.subscriber_lock = Lock()
        self.tick_buffers: Dict[str, deque] = {}
        self.buffer_size = 1000  # Keep last 1000 ticks per symbol

        # Initialize tick buffers
        for symbol in self.symbols:
            binance_symbol = symbol.replace('/', '').upper()
            self.tick_buffers[binance_symbol] = deque(maxlen=self.buffer_size)

        # Initialize tick aggregator for raw tick processing
        binance_symbols = [symbol.replace('/', '').upper() for symbol in self.symbols]
        self.tick_aggregator = RealTimeTickAggregator(symbols=binance_symbols)

        # Raw tick and OHLCV bar callbacks
        self.raw_tick_callbacks = []
        self.ohlcv_bar_callbacks = []

        # Performance tracking for subscribers
        self.distribution_stats = {
            'total_ticks_received': 0,
            'total_ticks_distributed': 0,
            'distribution_errors': 0,
            'last_tick_time': {},
            'ticks_per_symbol': {symbol.replace('/', '').upper(): 0 for symbol in self.symbols},
            'raw_ticks_processed': 0,
            'ohlcv_bars_created': 0,
            'patterns_detected': 0
        }

        # Data validation
        self.last_prices = {symbol.replace('/', '').upper(): 0.0 for symbol in self.symbols}
        self.price_change_threshold = 0.1  # 10% price change threshold for validation

        # Cache settings
        self.cache_enabled = self.config.data.get('cache_enabled', True)
        self.cache_dir = Path(self.config.data.get('cache_dir', 'cache'))
        self.cache_dir.mkdir(parents=True, exist_ok=True)

        # Timeframe conversion
        self.timeframe_seconds = {
            '1s': 1, '1m': 60, '5m': 300, '15m': 900, '30m': 1800,
            '1h': 3600, '4h': 14400, '1d': 86400
        }

        logger.info(f"DataProvider initialized for symbols: {self.symbols}")
        logger.info(f"Timeframes: {self.timeframes}")
        logger.info("Centralized data distribution enabled")

def get_historical_data(self, symbol: str, timeframe: str, limit: int = 1000, refresh: bool = False) -> Optional[pd.DataFrame]:
|
|
"""Get historical OHLCV data for a symbol and timeframe"""
|
|
try:
|
|
# If refresh=True, always fetch fresh data (skip cache for real-time updates)
|
|
if not refresh:
|
|
if self.cache_enabled:
|
|
cached_data = self._load_from_cache(symbol, timeframe)
|
|
if cached_data is not None and len(cached_data) >= limit * 0.8:
|
|
logger.info(f"Using cached data for {symbol} {timeframe}")
|
|
return cached_data.tail(limit)
|
|
|
|
# Check if we need to preload 300s of data for first load
|
|
should_preload = self._should_preload_data(symbol, timeframe, limit)
|
|
|
|
if should_preload:
|
|
logger.info(f"Preloading 300s of data for {symbol} {timeframe}")
|
|
df = self._preload_300s_data(symbol, timeframe)
|
|
else:
|
|
# Fetch from API with requested limit (Binance primary, MEXC fallback)
|
|
logger.info(f"Fetching historical data for {symbol} {timeframe}")
|
|
df = self._fetch_from_binance(symbol, timeframe, limit)
|
|
|
|
# Fallback to MEXC if Binance fails
|
|
if df is None or df.empty:
|
|
logger.info(f"Binance failed, trying MEXC fallback for {symbol}")
|
|
df = self._fetch_from_mexc(symbol, timeframe, limit)
|
|
|
|
if df is not None and not df.empty:
|
|
# Add technical indicators
|
|
df = self._add_technical_indicators(df)
|
|
|
|
# Cache the data
|
|
if self.cache_enabled:
|
|
self._save_to_cache(df, symbol, timeframe)
|
|
|
|
# Store in memory
|
|
if symbol not in self.historical_data:
|
|
self.historical_data[symbol] = {}
|
|
self.historical_data[symbol][timeframe] = df
|
|
|
|
# Return requested amount
|
|
return df.tail(limit)
|
|
|
|
logger.warning(f"No data received for {symbol} {timeframe}")
|
|
return None
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error fetching historical data for {symbol} {timeframe}: {e}")
|
|
return None
|
|
|
|
def _should_preload_data(self, symbol: str, timeframe: str, limit: int) -> bool:
|
|
"""Determine if we should preload 300s of data"""
|
|
try:
|
|
# Check if we have any cached data
|
|
if self.cache_enabled:
|
|
cached_data = self._load_from_cache(symbol, timeframe)
|
|
if cached_data is not None and len(cached_data) > 0:
|
|
return False # Already have some data
|
|
|
|
# Check if we have data in memory
|
|
if (symbol in self.historical_data and
|
|
timeframe in self.historical_data[symbol] and
|
|
len(self.historical_data[symbol][timeframe]) > 0):
|
|
return False # Already have data in memory
|
|
|
|
# Calculate if 300s worth of data would be more than requested limit
|
|
timeframe_seconds = self.timeframe_seconds.get(timeframe, 60)
|
|
candles_in_300s = 300 // timeframe_seconds
|
|
|
|
# Preload if we need more than the requested limit or if it's a short timeframe
|
|
if candles_in_300s > limit or timeframe in ['1s', '1m']:
|
|
return True
|
|
|
|
return False
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error determining if should preload data: {e}")
|
|
return False
|
|
|
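
    # Worked example of the preload decision above (values are illustrative):
    #   '1s'  -> 300 // 1  = 300 candles in 300s -> preload (also in the short-TF list)
    #   '1m'  -> 300 // 60 = 5 candles in 300s   -> preload only because '1m' is short-TF
    #   '1h' with limit=100 -> 300 // 3600 = 0   -> no preload; the normal fetch is used
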
|
def _preload_300s_data(self, symbol: str, timeframe: str) -> Optional[pd.DataFrame]:
|
|
"""Preload 300 seconds worth of data for better initial performance"""
|
|
try:
|
|
# Calculate how many candles we need for 300 seconds
|
|
timeframe_seconds = self.timeframe_seconds.get(timeframe, 60)
|
|
candles_needed = max(300 // timeframe_seconds, 100) # At least 100 candles
|
|
|
|
# For very short timeframes, limit to reasonable amount
|
|
if timeframe == '1s':
|
|
candles_needed = min(candles_needed, 300) # Max 300 1s candles
|
|
elif timeframe == '1m':
|
|
candles_needed = min(candles_needed, 60) # Max 60 1m candles (1 hour)
|
|
else:
|
|
candles_needed = min(candles_needed, 500) # Max 500 candles for other timeframes
|
|
|
|
logger.info(f"Preloading {candles_needed} candles for {symbol} {timeframe} (300s worth)")
|
|
|
|
# Fetch the data (Binance primary, MEXC fallback)
|
|
df = self._fetch_from_binance(symbol, timeframe, candles_needed)
|
|
|
|
# Fallback to MEXC if Binance fails
|
|
if df is None or df.empty:
|
|
logger.info(f"Binance failed, trying MEXC fallback for preload {symbol}")
|
|
df = self._fetch_from_mexc(symbol, timeframe, candles_needed)
|
|
|
|
if df is not None and not df.empty:
|
|
logger.info(f"Successfully preloaded {len(df)} candles for {symbol} {timeframe}")
|
|
return df
|
|
else:
|
|
logger.warning(f"Failed to preload data for {symbol} {timeframe}")
|
|
return None
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error preloading 300s data for {symbol} {timeframe}: {e}")
|
|
return None
|
|
|
|
def preload_all_symbols_data(self, timeframes: List[str] = None) -> Dict[str, Dict[str, bool]]:
|
|
"""Preload 300s of data for all symbols and timeframes"""
|
|
try:
|
|
if timeframes is None:
|
|
timeframes = self.timeframes
|
|
|
|
preload_results = {}
|
|
|
|
for symbol in self.symbols:
|
|
preload_results[symbol] = {}
|
|
|
|
for timeframe in timeframes:
|
|
try:
|
|
logger.info(f"Preloading data for {symbol} {timeframe}")
|
|
|
|
# Check if we should preload
|
|
if self._should_preload_data(symbol, timeframe, 100):
|
|
df = self._preload_300s_data(symbol, timeframe)
|
|
|
|
if df is not None and not df.empty:
|
|
# Add technical indicators
|
|
df = self._add_technical_indicators(df)
|
|
|
|
# Cache the data
|
|
if self.cache_enabled:
|
|
self._save_to_cache(df, symbol, timeframe)
|
|
|
|
# Store in memory
|
|
if symbol not in self.historical_data:
|
|
self.historical_data[symbol] = {}
|
|
self.historical_data[symbol][timeframe] = df
|
|
|
|
preload_results[symbol][timeframe] = True
|
|
logger.info(f"OK: Preloaded {len(df)} candles for {symbol} {timeframe}")
|
|
else:
|
|
preload_results[symbol][timeframe] = False
|
|
logger.warning(f"FAIL: Failed to preload {symbol} {timeframe}")
|
|
else:
|
|
preload_results[symbol][timeframe] = True # Already have data
|
|
logger.info(f"SKIP: Skipped preloading {symbol} {timeframe} (already have data)")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error preloading {symbol} {timeframe}: {e}")
|
|
preload_results[symbol][timeframe] = False
|
|
|
|
# Log summary
|
|
total_pairs = len(self.symbols) * len(timeframes)
|
|
successful_pairs = sum(1 for symbol_results in preload_results.values()
|
|
for success in symbol_results.values() if success)
|
|
|
|
logger.info(f"Preloading completed: {successful_pairs}/{total_pairs} symbol-timeframe pairs loaded")
|
|
|
|
return preload_results
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error in preload_all_symbols_data: {e}")
|
|
return {}
|
|
|
|
def _fetch_from_mexc(self, symbol: str, timeframe: str, limit: int) -> Optional[pd.DataFrame]:
|
|
"""Fetch data from MEXC API (fallback data source when Binance is unavailable)"""
|
|
try:
|
|
# MEXC doesn't support 1s intervals
|
|
if timeframe == '1s':
|
|
logger.warning(f"MEXC doesn't support 1s intervals, skipping {symbol}")
|
|
return None
|
|
|
|
# Convert symbol format
|
|
mexc_symbol = symbol.replace('/', '').upper()
|
|
|
|
# Convert timeframe for MEXC (excluding 1s)
|
|
timeframe_map = {
|
|
'1m': '1m', '5m': '5m', '15m': '15m', '30m': '30m',
|
|
'1h': '1h', '4h': '4h', '1d': '1d'
|
|
}
|
|
mexc_timeframe = timeframe_map.get(timeframe)
|
|
|
|
if mexc_timeframe is None:
|
|
logger.warning(f"MEXC doesn't support timeframe {timeframe}, skipping {symbol}")
|
|
return None
|
|
|
|
# MEXC API request
|
|
url = "https://api.mexc.com/api/v3/klines"
|
|
params = {
|
|
'symbol': mexc_symbol,
|
|
'interval': mexc_timeframe,
|
|
'limit': limit
|
|
}
|
|
|
|
            response = requests.get(url, params=params, timeout=10)
|
|
response.raise_for_status()
|
|
|
|
data = response.json()
|
|
|
|
# Convert to DataFrame (MEXC uses 8 columns vs Binance's 12)
|
|
df = pd.DataFrame(data, columns=[
|
|
'timestamp', 'open', 'high', 'low', 'close', 'volume',
|
|
'close_time', 'quote_volume'
|
|
])
|
|
|
|
# Process columns
|
|
df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
|
|
for col in ['open', 'high', 'low', 'close', 'volume']:
|
|
df[col] = df[col].astype(float)
|
|
|
|
# Keep only OHLCV columns
|
|
df = df[['timestamp', 'open', 'high', 'low', 'close', 'volume']]
|
|
df = df.sort_values('timestamp').reset_index(drop=True)
|
|
|
|
logger.info(f"MEXC: Fetched {len(df)} candles for {symbol} {timeframe}")
|
|
return df
|
|
|
|
except Exception as e:
|
|
logger.error(f"MEXC: Error fetching data: {e}")
|
|
return None
|
|
|
|
def _fetch_from_binance(self, symbol: str, timeframe: str, limit: int) -> Optional[pd.DataFrame]:
|
|
"""Fetch data from Binance API (primary data source) with HTTP 451 error handling"""
|
|
try:
|
|
# Convert symbol format
|
|
binance_symbol = symbol.replace('/', '').upper()
|
|
|
|
# Convert timeframe (now includes 1s support)
|
|
timeframe_map = {
|
|
'1s': '1s', '1m': '1m', '5m': '5m', '15m': '15m', '30m': '30m',
|
|
'1h': '1h', '4h': '4h', '1d': '1d'
|
|
}
|
|
binance_timeframe = timeframe_map.get(timeframe, '1h')
|
|
|
|
# API request with timeout and better headers
|
|
url = "https://api.binance.com/api/v3/klines"
|
|
params = {
|
|
'symbol': binance_symbol,
|
|
'interval': binance_timeframe,
|
|
'limit': limit
|
|
}
|
|
|
|
headers = {
|
|
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
|
|
'Accept': 'application/json',
|
|
'Connection': 'keep-alive'
|
|
}
|
|
|
|
response = requests.get(url, params=params, headers=headers, timeout=10)
|
|
|
|
# Handle HTTP 451 (Unavailable For Legal Reasons) specifically
|
|
if response.status_code == 451:
|
|
logger.warning(f"Binance API returned 451 (blocked) for {symbol} {timeframe} - using fallback")
|
|
return self._get_fallback_data(symbol, timeframe, limit)
|
|
|
|
response.raise_for_status()
|
|
|
|
data = response.json()
|
|
|
|
# Convert to DataFrame
|
|
df = pd.DataFrame(data, columns=[
|
|
'timestamp', 'open', 'high', 'low', 'close', 'volume',
|
|
'close_time', 'quote_volume', 'trades', 'taker_buy_base',
|
|
'taker_buy_quote', 'ignore'
|
|
])
|
|
|
|
# Process columns
|
|
df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
|
|
for col in ['open', 'high', 'low', 'close', 'volume']:
|
|
df[col] = df[col].astype(float)
|
|
|
|
# Keep only OHLCV columns
|
|
df = df[['timestamp', 'open', 'high', 'low', 'close', 'volume']]
|
|
df = df.sort_values('timestamp').reset_index(drop=True)
|
|
|
|
logger.info(f"Binance: Fetched {len(df)} candles for {symbol} {timeframe}")
|
|
return df
|
|
|
|
except Exception as e:
|
|
if "451" in str(e) or "Client Error" in str(e):
|
|
logger.warning(f"Binance API access blocked (451) for {symbol} {timeframe} - using fallback")
|
|
return self._get_fallback_data(symbol, timeframe, limit)
|
|
else:
|
|
logger.error(f"Error fetching from Binance API: {e}")
|
|
return self._get_fallback_data(symbol, timeframe, limit)
|
|
|
|
def _get_fallback_data(self, symbol: str, timeframe: str, limit: int) -> Optional[pd.DataFrame]:
|
|
"""Get fallback data when Binance API is unavailable - REAL DATA ONLY"""
|
|
try:
|
|
logger.info(f"FALLBACK: Attempting to get real cached data for {symbol} {timeframe}")
|
|
|
|
# ONLY try cached data
|
|
cached_data = self._load_from_cache(symbol, timeframe)
|
|
if cached_data is not None and not cached_data.empty:
|
|
# Limit to requested amount
|
|
limited_data = cached_data.tail(limit) if len(cached_data) > limit else cached_data
|
|
logger.info(f"FALLBACK: Using cached real data for {symbol} {timeframe}: {len(limited_data)} bars")
|
|
return limited_data
|
|
|
|
# Try MEXC as secondary real data source
|
|
mexc_data = self._fetch_from_mexc(symbol, timeframe, limit)
|
|
if mexc_data is not None and not mexc_data.empty:
|
|
logger.info(f"FALLBACK: Using MEXC real data for {symbol} {timeframe}: {len(mexc_data)} bars")
|
|
return mexc_data
|
|
|
|
# NO SYNTHETIC DATA - Return None if no real data available
|
|
logger.warning(f"FALLBACK: No real data available for {symbol} {timeframe} - waiting for real data")
|
|
return None
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting fallback data: {e}")
|
|
return None
|
|
|
|
def _add_technical_indicators(self, df: pd.DataFrame) -> pd.DataFrame:
|
|
"""Add comprehensive technical indicators for multi-timeframe analysis"""
|
|
try:
|
|
df = df.copy()
|
|
|
|
# Ensure we have enough data for indicators
|
|
if len(df) < 50:
|
|
logger.warning(f"Insufficient data for comprehensive indicators: {len(df)} rows")
|
|
return self._add_basic_indicators(df)
|
|
|
|
# === TREND INDICATORS ===
|
|
# Moving averages (multiple timeframes)
|
|
df['sma_10'] = ta.trend.sma_indicator(df['close'], window=10)
|
|
df['sma_20'] = ta.trend.sma_indicator(df['close'], window=20)
|
|
df['sma_50'] = ta.trend.sma_indicator(df['close'], window=50)
|
|
df['ema_12'] = ta.trend.ema_indicator(df['close'], window=12)
|
|
df['ema_26'] = ta.trend.ema_indicator(df['close'], window=26)
|
|
df['ema_50'] = ta.trend.ema_indicator(df['close'], window=50)
|
|
|
|
# MACD family
|
|
macd = ta.trend.MACD(df['close'])
|
|
df['macd'] = macd.macd()
|
|
df['macd_signal'] = macd.macd_signal()
|
|
df['macd_histogram'] = macd.macd_diff()
|
|
|
|
# ADX (Average Directional Index)
|
|
adx = ta.trend.ADXIndicator(df['high'], df['low'], df['close'])
|
|
df['adx'] = adx.adx()
|
|
df['adx_pos'] = adx.adx_pos()
|
|
df['adx_neg'] = adx.adx_neg()
|
|
|
|
# Parabolic SAR
|
|
psar = ta.trend.PSARIndicator(df['high'], df['low'], df['close'])
|
|
df['psar'] = psar.psar()
|
|
|
|
# === MOMENTUM INDICATORS ===
|
|
# RSI (multiple periods)
|
|
df['rsi_14'] = ta.momentum.rsi(df['close'], window=14)
|
|
df['rsi_7'] = ta.momentum.rsi(df['close'], window=7)
|
|
df['rsi_21'] = ta.momentum.rsi(df['close'], window=21)
|
|
|
|
# Stochastic Oscillator
|
|
stoch = ta.momentum.StochasticOscillator(df['high'], df['low'], df['close'])
|
|
df['stoch_k'] = stoch.stoch()
|
|
df['stoch_d'] = stoch.stoch_signal()
|
|
|
|
# Williams %R
|
|
df['williams_r'] = ta.momentum.williams_r(df['high'], df['low'], df['close'])
|
|
|
|
# Ultimate Oscillator (instead of CCI which isn't available)
|
|
df['ultimate_osc'] = ta.momentum.ultimate_oscillator(df['high'], df['low'], df['close'])
|
|
|
|
# === VOLATILITY INDICATORS ===
|
|
# Bollinger Bands
|
|
bollinger = ta.volatility.BollingerBands(df['close'])
|
|
df['bb_upper'] = bollinger.bollinger_hband()
|
|
df['bb_lower'] = bollinger.bollinger_lband()
|
|
df['bb_middle'] = bollinger.bollinger_mavg()
|
|
df['bb_width'] = (df['bb_upper'] - df['bb_lower']) / df['bb_middle']
|
|
df['bb_percent'] = (df['close'] - df['bb_lower']) / (df['bb_upper'] - df['bb_lower'])
|
|
|
|
# Average True Range
|
|
df['atr'] = ta.volatility.average_true_range(df['high'], df['low'], df['close'])
|
|
|
|
# Keltner Channels
|
|
keltner = ta.volatility.KeltnerChannel(df['high'], df['low'], df['close'])
|
|
df['keltner_upper'] = keltner.keltner_channel_hband()
|
|
df['keltner_lower'] = keltner.keltner_channel_lband()
|
|
df['keltner_middle'] = keltner.keltner_channel_mband()
|
|
|
|
# === VOLUME INDICATORS ===
|
|
# Volume moving averages
|
|
df['volume_sma_10'] = df['volume'].rolling(window=10).mean()
|
|
df['volume_sma_20'] = df['volume'].rolling(window=20).mean()
|
|
df['volume_sma_50'] = df['volume'].rolling(window=50).mean()
|
|
|
|
# On Balance Volume
|
|
df['obv'] = ta.volume.on_balance_volume(df['close'], df['volume'])
|
|
|
|
# Volume Price Trend
|
|
df['vpt'] = ta.volume.volume_price_trend(df['close'], df['volume'])
|
|
|
|
# Money Flow Index
|
|
df['mfi'] = ta.volume.money_flow_index(df['high'], df['low'], df['close'], df['volume'])
|
|
|
|
# Accumulation/Distribution Line
|
|
df['ad_line'] = ta.volume.acc_dist_index(df['high'], df['low'], df['close'], df['volume'])
|
|
|
|
# Volume Weighted Average Price (VWAP)
|
|
df['vwap'] = (df['close'] * df['volume']).cumsum() / df['volume'].cumsum()
|
|
|
|
# === PRICE ACTION INDICATORS ===
|
|
# Price position relative to range
|
|
df['price_position'] = (df['close'] - df['low']) / (df['high'] - df['low'])
|
|
|
|
# True Range (use ATR calculation for true range)
|
|
df['true_range'] = df['atr'] # ATR is based on true range, so use it directly
|
|
|
|
# Rate of Change
|
|
df['roc'] = ta.momentum.roc(df['close'], window=10)
|
|
|
|
# === CUSTOM INDICATORS ===
|
|
# Trend strength (combination of multiple trend indicators)
|
|
df['trend_strength'] = (
|
|
(df['close'] > df['sma_20']).astype(int) +
|
|
(df['sma_10'] > df['sma_20']).astype(int) +
|
|
(df['macd'] > df['macd_signal']).astype(int) +
|
|
(df['adx'] > 25).astype(int)
|
|
) / 4.0
|
|
|
|
            # Momentum composite (each component normalized to the 0-1 range)
            df['momentum_composite'] = (
                (df['rsi_14'] / 100) +
                (df['stoch_k'] / 100) +            # Stochastic %K ranges 0-100
                ((df['williams_r'] + 100) / 100)   # Williams %R ranges -100..0
            ) / 3.0
|
|
|
|
# Volatility regime
|
|
df['volatility_regime'] = (df['atr'] / df['close']).rolling(window=20).rank(pct=True)
|
|
|
|
# === FILL NaN VALUES ===
|
|
# Forward fill first, then backward fill, then zero fill
|
|
df = df.ffill().bfill().fillna(0)
|
|
|
|
logger.debug(f"Added {len([col for col in df.columns if col not in ['timestamp', 'open', 'high', 'low', 'close', 'volume']])} technical indicators")
|
|
return df
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error adding comprehensive technical indicators: {e}")
|
|
# Fallback to basic indicators
|
|
return self._add_basic_indicators(df)
|
|
|
|
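
    # Minimal sketch of how the indicator pipeline is typically exercised
    # (assumes `provider` is a DataProvider instance; variable names are
    # illustrative). Fewer than 50 rows falls back to _add_basic_indicators:
    #
    #   df = provider.get_historical_data('ETH/USDT', '1m', limit=500)
    #   indicator_cols = [c for c in df.columns
    #                     if c not in ('timestamp', 'open', 'high', 'low', 'close', 'volume')]
    #   # e.g. ['sma_10', 'sma_20', ..., 'trend_strength', 'momentum_composite', ...]
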
def _add_basic_indicators(self, df: pd.DataFrame) -> pd.DataFrame:
|
|
"""Add basic indicators for small datasets"""
|
|
try:
|
|
df = df.copy()
|
|
|
|
# Basic moving averages
|
|
if len(df) >= 20:
|
|
df['sma_20'] = ta.trend.sma_indicator(df['close'], window=20)
|
|
df['ema_12'] = ta.trend.ema_indicator(df['close'], window=12)
|
|
|
|
# Basic RSI
|
|
if len(df) >= 14:
|
|
df['rsi_14'] = ta.momentum.rsi(df['close'], window=14)
|
|
|
|
# Basic volume indicators
|
|
if len(df) >= 10:
|
|
df['volume_sma_10'] = df['volume'].rolling(window=10).mean()
|
|
|
|
# Basic price action
|
|
df['price_position'] = (df['close'] - df['low']) / (df['high'] - df['low'])
|
|
df['price_position'] = df['price_position'].fillna(0.5) # Default to middle
|
|
|
|
# Fill NaN values
|
|
df = df.ffill().bfill().fillna(0)
|
|
|
|
return df
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error adding basic indicators: {e}")
|
|
return df
|
|
|
|
def _load_from_cache(self, symbol: str, timeframe: str) -> Optional[pd.DataFrame]:
|
|
"""Load data from cache"""
|
|
try:
|
|
cache_file = self.cache_dir / f"{symbol.replace('/', '')}_{timeframe}.parquet"
|
|
if cache_file.exists():
|
|
# Check if cache is recent (less than 1 hour old)
|
|
cache_age = time.time() - cache_file.stat().st_mtime
|
|
if cache_age < 3600: # 1 hour
|
|
df = pd.read_parquet(cache_file)
|
|
logger.debug(f"Loaded {len(df)} rows from cache for {symbol} {timeframe}")
|
|
return df
|
|
else:
|
|
logger.debug(f"Cache for {symbol} {timeframe} is too old ({cache_age/3600:.1f}h)")
|
|
return None
|
|
except Exception as e:
|
|
logger.warning(f"Error loading cache for {symbol} {timeframe}: {e}")
|
|
return None
|
|
|
|
def _save_to_cache(self, df: pd.DataFrame, symbol: str, timeframe: str):
|
|
"""Save data to cache"""
|
|
try:
|
|
cache_file = self.cache_dir / f"{symbol.replace('/', '')}_{timeframe}.parquet"
|
|
df.to_parquet(cache_file, index=False)
|
|
logger.debug(f"Saved {len(df)} rows to cache for {symbol} {timeframe}")
|
|
except Exception as e:
|
|
logger.warning(f"Error saving cache for {symbol} {timeframe}: {e}")
|
|
|
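
    # Cache layout sketch: one parquet file per (symbol, timeframe) pair, e.g.
    # cache/ETHUSDT_1m.parquet (symbol shown for illustration), reused on the
    # next call only if the file is less than one hour old.
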
|
async def start_real_time_streaming(self):
|
|
"""Start real-time data streaming for all symbols"""
|
|
if self.is_streaming:
|
|
logger.warning("Real-time streaming already active")
|
|
return
|
|
|
|
self.is_streaming = True
|
|
logger.info("Starting real-time data streaming")
|
|
|
|
# Start WebSocket for each symbol
|
|
for symbol in self.symbols:
|
|
task = asyncio.create_task(self._websocket_stream(symbol))
|
|
self.websocket_tasks[symbol] = task
|
|
|
|
async def stop_real_time_streaming(self):
|
|
"""Stop real-time data streaming"""
|
|
if not self.is_streaming:
|
|
return
|
|
|
|
logger.info("Stopping real-time data streaming")
|
|
self.is_streaming = False
|
|
|
|
# Cancel all WebSocket tasks
|
|
for symbol, task in self.websocket_tasks.items():
|
|
if not task.done():
|
|
task.cancel()
|
|
try:
|
|
await task
|
|
except asyncio.CancelledError:
|
|
pass
|
|
|
|
self.websocket_tasks.clear()
|
|
|
|
async def _websocket_stream(self, symbol: str):
|
|
"""WebSocket stream for a single symbol using trade stream for better granularity"""
|
|
binance_symbol = symbol.replace('/', '').upper()
|
|
url = f"wss://stream.binance.com:9443/ws/{binance_symbol.lower()}@trade"
|
|
|
|
while self.is_streaming:
|
|
try:
|
|
logger.info(f"Connecting to WebSocket for {symbol}: {url}")
|
|
|
|
async with websockets.connect(url) as websocket:
|
|
logger.info(f"WebSocket connected for {symbol}")
|
|
|
|
async for message in websocket:
|
|
if not self.is_streaming:
|
|
break
|
|
|
|
try:
|
|
await self._process_trade_message(binance_symbol, message)
|
|
except Exception as e:
|
|
logger.warning(f"Error processing trade message for {symbol}: {e}")
|
|
|
|
except Exception as e:
|
|
logger.error(f"WebSocket error for {symbol}: {e}")
|
|
self.distribution_stats['distribution_errors'] += 1
|
|
|
|
if self.is_streaming:
|
|
logger.info(f"Reconnecting WebSocket for {symbol} in 5 seconds...")
|
|
await asyncio.sleep(5)
|
|
|
|
async def _process_trade_message(self, symbol: str, message: str):
|
|
"""Process incoming trade message and distribute to subscribers"""
|
|
try:
|
|
trade_data = json.loads(message)
|
|
|
|
# Extract trade information
|
|
price = float(trade_data.get('p', 0))
|
|
quantity = float(trade_data.get('q', 0))
|
|
timestamp = datetime.fromtimestamp(int(trade_data.get('T', 0)) / 1000)
|
|
is_buyer_maker = trade_data.get('m', False)
|
|
trade_id = trade_data.get('t', '')
|
|
|
|
# Calculate volume in USDT
|
|
volume_usdt = price * quantity
|
|
|
|
# Data validation
|
|
if not self._validate_tick_data(symbol, price, volume_usdt):
|
|
logger.warning(f"Invalid tick data for {symbol}: price={price}, volume={volume_usdt}")
|
|
return
|
|
|
|
# Process raw tick through aggregator
|
|
side = 'sell' if is_buyer_maker else 'buy'
|
|
raw_tick, completed_bar = self.tick_aggregator.process_tick(
|
|
symbol=symbol,
|
|
timestamp=timestamp,
|
|
price=price,
|
|
volume=volume_usdt,
|
|
quantity=quantity,
|
|
side=side,
|
|
trade_id=str(trade_id)
|
|
)
|
|
|
|
# Update statistics
|
|
self.distribution_stats['total_ticks_received'] += 1
|
|
self.distribution_stats['ticks_per_symbol'][symbol] += 1
|
|
self.distribution_stats['last_tick_time'][symbol] = timestamp
|
|
self.last_prices[symbol] = price
|
|
|
|
if raw_tick:
|
|
self.distribution_stats['raw_ticks_processed'] += 1
|
|
|
|
# Notify raw tick callbacks
|
|
for callback in self.raw_tick_callbacks:
|
|
try:
|
|
callback(raw_tick)
|
|
except Exception as e:
|
|
logger.error(f"Error in raw tick callback: {e}")
|
|
|
|
if completed_bar:
|
|
self.distribution_stats['ohlcv_bars_created'] += 1
|
|
|
|
# Notify OHLCV bar callbacks
|
|
for callback in self.ohlcv_bar_callbacks:
|
|
try:
|
|
callback(completed_bar)
|
|
except Exception as e:
|
|
logger.error(f"Error in OHLCV bar callback: {e}")
|
|
|
|
# Create standardized tick for legacy compatibility
|
|
tick = MarketTick(
|
|
symbol=symbol,
|
|
timestamp=timestamp,
|
|
price=price,
|
|
volume=volume_usdt,
|
|
quantity=quantity,
|
|
side=side,
|
|
trade_id=str(trade_id),
|
|
is_buyer_maker=is_buyer_maker,
|
|
raw_data=trade_data
|
|
)
|
|
|
|
# Add to buffer
|
|
self.tick_buffers[symbol].append(tick)
|
|
|
|
# Update current prices and candles
|
|
await self._process_tick(symbol, tick)
|
|
|
|
# Distribute to all subscribers
|
|
self._distribute_tick(tick)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error processing trade message for {symbol}: {e}")
|
|
|
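
    # Shape of the Binance trade-stream payload fields consumed above (values
    # are illustrative):
    #   {"p": "3050.12", "q": "0.42", "T": 1719400000123, "m": false, "t": 123456789, ...}
    #   p = price, q = quantity, T = trade time in ms, m = buyer-is-maker, t = trade id.
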
|
async def _process_tick(self, symbol: str, tick: MarketTick):
|
|
"""Process a single tick and update candles"""
|
|
try:
|
|
# Update current price
|
|
with self.data_lock:
|
|
self.current_prices[symbol] = tick.price
|
|
|
|
# Initialize real-time data structure if needed
|
|
if symbol not in self.real_time_data:
|
|
self.real_time_data[symbol] = {}
|
|
for tf in self.timeframes:
|
|
self.real_time_data[symbol][tf] = deque(maxlen=1000)
|
|
|
|
# Create tick record for candle updates
|
|
tick_record = {
|
|
'timestamp': tick.timestamp,
|
|
'price': tick.price,
|
|
'volume': tick.volume
|
|
}
|
|
|
|
# Update all timeframes
|
|
for timeframe in self.timeframes:
|
|
self._update_candle(symbol, timeframe, tick_record)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error processing tick for {symbol}: {e}")
|
|
|
|
def _update_candle(self, symbol: str, timeframe: str, tick: Dict):
|
|
"""Update candle for specific timeframe"""
|
|
try:
|
|
            timeframe_secs = self.timeframe_seconds.get(timeframe, 3600)
            current_time = tick['timestamp']

            # Calculate candle start time (plain datetime has no floor(), so wrap
            # in pd.Timestamp before flooring to the timeframe boundary)
            candle_start = pd.Timestamp(current_time).floor(f'{timeframe_secs}s')
|
|
|
|
# Get current candle queue
|
|
candle_queue = self.real_time_data[symbol][timeframe]
|
|
|
|
# Check if we need a new candle
|
|
if not candle_queue or candle_queue[-1]['timestamp'] != candle_start:
|
|
# Create new candle
|
|
new_candle = {
|
|
'timestamp': candle_start,
|
|
'open': tick['price'],
|
|
'high': tick['price'],
|
|
'low': tick['price'],
|
|
'close': tick['price'],
|
|
'volume': tick['volume']
|
|
}
|
|
candle_queue.append(new_candle)
|
|
else:
|
|
# Update existing candle
|
|
current_candle = candle_queue[-1]
|
|
current_candle['high'] = max(current_candle['high'], tick['price'])
|
|
current_candle['low'] = min(current_candle['low'], tick['price'])
|
|
current_candle['close'] = tick['price']
|
|
current_candle['volume'] += tick['volume']
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error updating candle for {symbol} {timeframe}: {e}")
|
|
|
|
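
    # Bucketing example for the floor() call above (illustrative timestamps):
    #   tick at 12:00:37 with timeframe '1m' (60s)  -> candle_start 12:00:00
    #   tick at 12:03:37 with timeframe '5m' (300s) -> candle_start 12:00:00
    #   tick at 12:06:02 with timeframe '5m' (300s) -> candle_start 12:05:00
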
def get_latest_candles(self, symbol: str, timeframe: str, limit: int = 100) -> pd.DataFrame:
|
|
"""Get the latest candles combining historical and real-time data"""
|
|
try:
|
|
# Get historical data
|
|
historical_df = self.get_historical_data(symbol, timeframe, limit=limit)
|
|
|
|
# Get real-time data
|
|
with self.data_lock:
|
|
if symbol in self.real_time_data and timeframe in self.real_time_data[symbol]:
|
|
real_time_candles = list(self.real_time_data[symbol][timeframe])
|
|
|
|
if real_time_candles:
|
|
# Convert to DataFrame
|
|
rt_df = pd.DataFrame(real_time_candles)
|
|
|
|
if historical_df is not None:
|
|
# Combine historical and real-time
|
|
# Remove overlapping candles from historical data
|
|
if not rt_df.empty:
|
|
cutoff_time = rt_df['timestamp'].min()
|
|
historical_df = historical_df[historical_df['timestamp'] < cutoff_time]
|
|
|
|
# Concatenate
|
|
combined_df = pd.concat([historical_df, rt_df], ignore_index=True)
|
|
else:
|
|
combined_df = rt_df
|
|
|
|
return combined_df.tail(limit)
|
|
|
|
# Return just historical data if no real-time data
|
|
return historical_df.tail(limit) if historical_df is not None else pd.DataFrame()
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting latest candles for {symbol} {timeframe}: {e}")
|
|
return pd.DataFrame()
|
|
|
|
    def get_current_price(self, symbol: str) -> Optional[float]:
        """Get current price for a symbol from the latest candle"""
        try:
            # Try to get from 1s candle first (most recent)
            for tf in ['1s', '1m', '5m', '1h']:
                df = self.get_latest_candles(symbol, tf, limit=1)
                if df is not None and not df.empty:
                    return float(df.iloc[-1]['close'])

            # Fallback to any available historical data
            # (historical_data is keyed as {symbol: {timeframe: DataFrame}})
            tf = self.timeframes[0]
            if (symbol in self.historical_data and
                    tf in self.historical_data[symbol] and
                    not self.historical_data[symbol][tf].empty):
                return float(self.historical_data[symbol][tf].iloc[-1]['close'])

            logger.warning(f"No price data available for {symbol}")
            return None

        except Exception as e:
            logger.error(f"Error getting current price for {symbol}: {e}")
            return None

    def get_price_at_index(self, symbol: str, index: int, timeframe: str = '1m') -> Optional[float]:
        """Get price at a specific index for backtesting"""
        try:
            # historical_data is a nested dict: {symbol: {timeframe: DataFrame}}
            if symbol in self.historical_data and timeframe in self.historical_data[symbol]:
                df = self.historical_data[symbol][timeframe]
                if 0 <= index < len(df):
                    return float(df.iloc[index]['close'])
            return None
        except Exception as e:
            logger.error(f"Error getting price at index {index}: {e}")
            return None
|
|
|
|
def get_feature_matrix(self, symbol: str, timeframes: List[str] = None,
|
|
window_size: int = 20) -> Optional[np.ndarray]:
|
|
"""
|
|
Get comprehensive feature matrix for multiple timeframes with technical indicators
|
|
|
|
Returns:
|
|
np.ndarray: Shape (n_timeframes, window_size, n_features)
|
|
Each timeframe becomes a separate channel for CNN
|
|
"""
|
|
try:
|
|
if timeframes is None:
|
|
timeframes = self.timeframes
|
|
|
|
feature_channels = []
|
|
common_feature_names = None
|
|
|
|
# First pass: determine common features across all timeframes
|
|
timeframe_features = {}
|
|
for tf in timeframes:
|
|
logger.debug(f"Processing timeframe {tf} for {symbol}")
|
|
df = self.get_latest_candles(symbol, tf, limit=window_size + 100)
|
|
|
|
if df is None or len(df) < window_size:
|
|
logger.warning(f"Insufficient data for {symbol} {tf}: {len(df) if df is not None else 0} rows")
|
|
continue
|
|
|
|
# Get feature columns
|
|
basic_cols = ['open', 'high', 'low', 'close', 'volume']
|
|
indicator_cols = [col for col in df.columns
|
|
if col not in basic_cols + ['timestamp'] and not col.startswith('unnamed')]
|
|
|
|
selected_features = self._select_cnn_features(df, basic_cols, indicator_cols)
|
|
timeframe_features[tf] = (df, selected_features)
|
|
|
|
if common_feature_names is None:
|
|
common_feature_names = set(selected_features)
|
|
else:
|
|
common_feature_names = common_feature_names.intersection(set(selected_features))
|
|
|
|
if not common_feature_names:
|
|
logger.error(f"No common features found across timeframes for {symbol}")
|
|
return None
|
|
|
|
# Convert to sorted list for consistent ordering
|
|
common_feature_names = sorted(list(common_feature_names))
|
|
logger.info(f"Using {len(common_feature_names)} common features: {common_feature_names}")
|
|
|
|
# Second pass: create feature channels with common features
|
|
for tf in timeframes:
|
|
if tf not in timeframe_features:
|
|
continue
|
|
|
|
df, _ = timeframe_features[tf]
|
|
|
|
# Use only common features
|
|
try:
|
|
tf_features = self._normalize_features(df[common_feature_names].tail(window_size))
|
|
|
|
if tf_features is not None and len(tf_features) == window_size:
|
|
feature_channels.append(tf_features.values)
|
|
logger.debug(f"Added {len(common_feature_names)} features for {tf}")
|
|
else:
|
|
logger.warning(f"Feature normalization failed for {tf}")
|
|
except Exception as e:
|
|
logger.error(f"Error processing features for {tf}: {e}")
|
|
continue
|
|
|
|
if not feature_channels:
|
|
logger.error(f"No valid feature channels created for {symbol}")
|
|
return None
|
|
|
|
# Verify all channels have the same shape
|
|
shapes = [channel.shape for channel in feature_channels]
|
|
if len(set(shapes)) > 1:
|
|
logger.error(f"Shape mismatch in feature channels: {shapes}")
|
|
return None
|
|
|
|
# Stack all timeframe channels
|
|
feature_matrix = np.stack(feature_channels, axis=0)
|
|
|
|
logger.info(f"Created feature matrix for {symbol}: {feature_matrix.shape} "
|
|
f"({len(feature_channels)} timeframes, {window_size} steps, {len(common_feature_names)} features)")
|
|
|
|
return feature_matrix
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error creating feature matrix for {symbol}: {e}")
|
|
import traceback
|
|
logger.error(traceback.format_exc())
|
|
return None
|
|
|
|
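
    # Shape sketch for get_feature_matrix (assumes 4 configured timeframes; the
    # exact feature count depends on which indicator columns survive the
    # common-feature intersection):
    #
    #   matrix = provider.get_feature_matrix('ETH/USDT', window_size=20)
    #   # matrix.shape == (4, 20, n_features): one channel per timeframe,
    #   # 20 time steps, OHLCV plus the shared indicators, ready as CNN input.
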
def _select_cnn_features(self, df: pd.DataFrame, basic_cols: List[str], indicator_cols: List[str]) -> List[str]:
|
|
"""Select the most important features for CNN training"""
|
|
try:
|
|
selected = []
|
|
|
|
# Always include basic OHLCV (normalized)
|
|
selected.extend(basic_cols)
|
|
|
|
# Priority indicators (most informative for CNNs)
|
|
priority_indicators = [
|
|
# Trend indicators
|
|
'sma_10', 'sma_20', 'sma_50', 'ema_12', 'ema_26', 'ema_50',
|
|
'macd', 'macd_signal', 'macd_histogram',
|
|
'adx', 'adx_pos', 'adx_neg', 'psar',
|
|
|
|
# Momentum indicators
|
|
'rsi_14', 'rsi_7', 'rsi_21',
|
|
'stoch_k', 'stoch_d', 'williams_r', 'ultimate_osc',
|
|
|
|
# Volatility indicators
|
|
'bb_upper', 'bb_lower', 'bb_middle', 'bb_width', 'bb_percent',
|
|
'atr', 'keltner_upper', 'keltner_lower', 'keltner_middle',
|
|
|
|
# Volume indicators
|
|
'volume_sma_10', 'volume_sma_20', 'obv', 'vpt', 'mfi', 'ad_line', 'vwap',
|
|
|
|
# Price action
|
|
'price_position', 'true_range', 'roc',
|
|
|
|
# Custom composites
|
|
'trend_strength', 'momentum_composite', 'volatility_regime'
|
|
]
|
|
|
|
# Add available priority indicators
|
|
for indicator in priority_indicators:
|
|
if indicator in indicator_cols:
|
|
selected.append(indicator)
|
|
|
|
# Add any other technical indicators not in priority list (limit to avoid curse of dimensionality)
|
|
remaining_indicators = [col for col in indicator_cols if col not in selected]
|
|
if remaining_indicators:
|
|
# Limit to 10 additional indicators
|
|
selected.extend(remaining_indicators[:10])
|
|
|
|
# Verify all selected features exist in dataframe
|
|
final_selected = [col for col in selected if col in df.columns]
|
|
|
|
logger.debug(f"Selected {len(final_selected)} features from {len(df.columns)} available columns")
|
|
return final_selected
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error selecting CNN features: {e}")
|
|
return basic_cols # Fallback to basic OHLCV
|
|
|
|
def _normalize_features(self, df: pd.DataFrame) -> Optional[pd.DataFrame]:
|
|
"""Normalize features for CNN training"""
|
|
try:
|
|
df_norm = df.copy()
|
|
|
|
# Handle different normalization strategies for different feature types
|
|
for col in df_norm.columns:
|
|
if col in ['open', 'high', 'low', 'close', 'sma_10', 'sma_20', 'sma_50',
|
|
'ema_12', 'ema_26', 'ema_50', 'bb_upper', 'bb_lower', 'bb_middle',
|
|
'keltner_upper', 'keltner_lower', 'keltner_middle', 'psar', 'vwap']:
|
|
# Price-based indicators: normalize by close price
|
|
if 'close' in df_norm.columns:
|
|
base_price = df_norm['close'].iloc[-1] # Use latest close as reference
|
|
if base_price > 0:
|
|
df_norm[col] = df_norm[col] / base_price
|
|
|
|
elif col == 'volume':
|
|
# Volume: normalize by its own rolling mean
|
|
volume_mean = df_norm[col].rolling(window=min(20, len(df_norm))).mean().iloc[-1]
|
|
if volume_mean > 0:
|
|
df_norm[col] = df_norm[col] / volume_mean
|
|
|
|
elif col in ['rsi_14', 'rsi_7', 'rsi_21']:
|
|
# RSI: already 0-100, normalize to 0-1
|
|
df_norm[col] = df_norm[col] / 100.0
|
|
|
|
elif col in ['stoch_k', 'stoch_d']:
|
|
# Stochastic: already 0-100, normalize to 0-1
|
|
df_norm[col] = df_norm[col] / 100.0
|
|
|
|
elif col == 'williams_r':
|
|
# Williams %R: -100 to 0, normalize to 0-1
|
|
df_norm[col] = (df_norm[col] + 100) / 100.0
|
|
|
|
elif col in ['macd', 'macd_signal', 'macd_histogram']:
|
|
# MACD: normalize by ATR or close price
|
|
if 'atr' in df_norm.columns and df_norm['atr'].iloc[-1] > 0:
|
|
df_norm[col] = df_norm[col] / df_norm['atr'].iloc[-1]
|
|
elif 'close' in df_norm.columns:
|
|
df_norm[col] = df_norm[col] / df_norm['close'].iloc[-1]
|
|
|
|
elif col in ['bb_width', 'bb_percent', 'price_position', 'trend_strength',
|
|
'momentum_composite', 'volatility_regime']:
|
|
# Already normalized indicators: ensure 0-1 range
|
|
df_norm[col] = np.clip(df_norm[col], 0, 1)
|
|
|
|
elif col in ['atr', 'true_range']:
|
|
# Volatility indicators: normalize by close price
|
|
if 'close' in df_norm.columns:
|
|
df_norm[col] = df_norm[col] / df_norm['close'].iloc[-1]
|
|
|
|
else:
|
|
# Other indicators: z-score normalization
|
|
col_mean = df_norm[col].rolling(window=min(20, len(df_norm))).mean().iloc[-1]
|
|
col_std = df_norm[col].rolling(window=min(20, len(df_norm))).std().iloc[-1]
|
|
if col_std > 0:
|
|
df_norm[col] = (df_norm[col] - col_mean) / col_std
|
|
else:
|
|
df_norm[col] = 0
|
|
|
|
# Replace inf/-inf with 0
|
|
df_norm = df_norm.replace([np.inf, -np.inf], 0)
|
|
|
|
# Fill any remaining NaN values
|
|
df_norm = df_norm.fillna(0)
|
|
|
|
return df_norm
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error normalizing features: {e}")
|
|
return df
|
|
|
|
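
    # Normalization examples for the rules above (numbers are illustrative):
    #   rsi_14 = 55                          -> 0.55    (0-100 range divided by 100)
    #   williams_r = -20                     -> 0.80    ((-20 + 100) / 100)
    #   sma_20 = 2980 with last close = 3000 -> 0.9933  (price-based: divide by close)
    #   anything unlisted                    -> rolling z-score over up to 20 rows
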
def get_multi_symbol_feature_matrix(self, symbols: List[str] = None,
|
|
timeframes: List[str] = None,
|
|
window_size: int = 20) -> Optional[np.ndarray]:
|
|
"""
|
|
Get feature matrix for multiple symbols and timeframes
|
|
|
|
Returns:
|
|
np.ndarray: Shape (n_symbols, n_timeframes, window_size, n_features)
|
|
"""
|
|
try:
|
|
if symbols is None:
|
|
symbols = self.symbols
|
|
if timeframes is None:
|
|
timeframes = self.timeframes
|
|
|
|
symbol_matrices = []
|
|
|
|
for symbol in symbols:
|
|
symbol_matrix = self.get_feature_matrix(symbol, timeframes, window_size)
|
|
if symbol_matrix is not None:
|
|
symbol_matrices.append(symbol_matrix)
|
|
else:
|
|
logger.warning(f"Could not create feature matrix for {symbol}")
|
|
|
|
if symbol_matrices:
|
|
# Stack all symbol matrices
|
|
multi_symbol_matrix = np.stack(symbol_matrices, axis=0)
|
|
logger.info(f"Created multi-symbol feature matrix: {multi_symbol_matrix.shape}")
|
|
return multi_symbol_matrix
|
|
|
|
return None
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error creating multi-symbol feature matrix: {e}")
|
|
return None
|
|
|
|
def health_check(self) -> Dict[str, Any]:
|
|
"""Get health status of the data provider"""
|
|
status = {
|
|
'streaming': self.is_streaming,
|
|
'symbols': len(self.symbols),
|
|
'timeframes': len(self.timeframes),
|
|
'current_prices': len(self.current_prices),
|
|
'websocket_tasks': len(self.websocket_tasks),
|
|
'historical_data_loaded': {}
|
|
}
|
|
|
|
# Check historical data availability
|
|
for symbol in self.symbols:
|
|
status['historical_data_loaded'][symbol] = {}
|
|
for tf in self.timeframes:
|
|
has_data = (symbol in self.historical_data and
|
|
tf in self.historical_data[symbol] and
|
|
not self.historical_data[symbol][tf].empty)
|
|
status['historical_data_loaded'][symbol][tf] = has_data
|
|
|
|
return status
|
|
|
|
def subscribe_to_ticks(self, callback: Callable[[MarketTick], None],
|
|
symbols: List[str] = None,
|
|
subscriber_name: str = None) -> str:
|
|
"""Subscribe to real-time tick data updates"""
|
|
subscriber_id = str(uuid.uuid4())[:8]
|
|
subscriber_name = subscriber_name or f"subscriber_{subscriber_id}"
|
|
|
|
# Convert symbols to Binance format
|
|
if symbols:
|
|
binance_symbols = [s.replace('/', '').upper() for s in symbols]
|
|
else:
|
|
binance_symbols = [s.replace('/', '').upper() for s in self.symbols]
|
|
|
|
subscriber = DataSubscriber(
|
|
subscriber_id=subscriber_id,
|
|
callback=callback,
|
|
symbols=binance_symbols,
|
|
subscriber_name=subscriber_name
|
|
)
|
|
|
|
with self.subscriber_lock:
|
|
self.subscribers[subscriber_id] = subscriber
|
|
|
|
logger.info(f"New tick subscriber registered: {subscriber_name} ({subscriber_id}) for symbols: {binance_symbols}")
|
|
|
|
# Send recent tick data to new subscriber
|
|
self._send_recent_ticks_to_subscriber(subscriber)
|
|
|
|
return subscriber_id
|
|
|
|
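
    # Callback sketch for subscribe_to_ticks (the handler name and dict are
    # hypothetical). Up to 50 buffered ticks are replayed to a new subscriber
    # immediately, and later callbacks run on short-lived worker threads, so the
    # handler should be fast and thread-safe:
    #
    #   def on_tick(tick: MarketTick) -> None:
    #       latest_price[tick.symbol] = tick.price   # caller-owned dict
    #
    #   sub_id = provider.subscribe_to_ticks(on_tick, symbols=['ETH/USDT'])
    #   ...
    #   provider.unsubscribe_from_ticks(sub_id)
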
def unsubscribe_from_ticks(self, subscriber_id: str):
|
|
"""Unsubscribe from tick data updates"""
|
|
with self.subscriber_lock:
|
|
if subscriber_id in self.subscribers:
|
|
subscriber_name = self.subscribers[subscriber_id].subscriber_name
|
|
self.subscribers[subscriber_id].active = False
|
|
del self.subscribers[subscriber_id]
|
|
logger.info(f"Subscriber {subscriber_name} ({subscriber_id}) unsubscribed")
|
|
|
|
def _send_recent_ticks_to_subscriber(self, subscriber: DataSubscriber):
|
|
"""Send recent tick data to a new subscriber"""
|
|
try:
|
|
for symbol in subscriber.symbols:
|
|
if symbol in self.tick_buffers:
|
|
# Send last 50 ticks to get subscriber up to speed
|
|
recent_ticks = list(self.tick_buffers[symbol])[-50:]
|
|
for tick in recent_ticks:
|
|
try:
|
|
subscriber.callback(tick)
|
|
except Exception as e:
|
|
logger.warning(f"Error sending recent tick to subscriber {subscriber.subscriber_id}: {e}")
|
|
except Exception as e:
|
|
logger.error(f"Error sending recent ticks: {e}")
|
|
|
|
def _distribute_tick(self, tick: MarketTick):
|
|
"""Distribute tick to all relevant subscribers"""
|
|
distributed_count = 0
|
|
|
|
with self.subscriber_lock:
|
|
subscribers_to_remove = []
|
|
|
|
for subscriber_id, subscriber in self.subscribers.items():
|
|
if not subscriber.active:
|
|
subscribers_to_remove.append(subscriber_id)
|
|
continue
|
|
|
|
if tick.symbol in subscriber.symbols:
|
|
try:
|
|
# Call subscriber callback in a thread to avoid blocking
|
|
def call_callback():
|
|
try:
|
|
subscriber.callback(tick)
|
|
subscriber.tick_count += 1
|
|
subscriber.last_update = datetime.now()
|
|
except Exception as e:
|
|
logger.warning(f"Error in subscriber {subscriber_id} callback: {e}")
|
|
subscriber.active = False
|
|
|
|
# Use thread to avoid blocking the main data processing
|
|
Thread(target=call_callback, daemon=True).start()
|
|
distributed_count += 1
|
|
|
|
except Exception as e:
|
|
logger.warning(f"Error distributing tick to subscriber {subscriber_id}: {e}")
|
|
subscriber.active = False
|
|
|
|
# Remove inactive subscribers
|
|
for subscriber_id in subscribers_to_remove:
|
|
if subscriber_id in self.subscribers:
|
|
del self.subscribers[subscriber_id]
|
|
|
|
self.distribution_stats['total_ticks_distributed'] += distributed_count
|
|
|
|
def _validate_tick_data(self, symbol: str, price: float, volume: float) -> bool:
|
|
"""Validate incoming tick data for quality"""
|
|
try:
|
|
# Basic validation
|
|
if price <= 0 or volume < 0:
|
|
return False
|
|
|
|
# Price change validation
|
|
last_price = self.last_prices.get(symbol, 0)
|
|
if last_price > 0:
|
|
price_change_pct = abs(price - last_price) / last_price
|
|
if price_change_pct > self.price_change_threshold:
|
|
logger.warning(f"Large price change for {symbol}: {price_change_pct:.2%}")
|
|
# Don't reject, just warn - could be legitimate
|
|
|
|
return True
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error validating tick data: {e}")
|
|
return False
|
|
|
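
    # Threshold example for the validation above: with price_change_threshold = 0.1,
    # a move from 3000 to 3400 is abs(3400 - 3000) / 3000 = 13.3%, so a warning is
    # logged but the tick is still accepted; only non-positive prices or negative
    # volumes are rejected outright.
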
|
def get_recent_ticks(self, symbol: str, count: int = 100) -> List[MarketTick]:
|
|
"""Get recent ticks for a symbol"""
|
|
binance_symbol = symbol.replace('/', '').upper()
|
|
if binance_symbol in self.tick_buffers:
|
|
return list(self.tick_buffers[binance_symbol])[-count:]
|
|
return []
|
|
|
|
def subscribe_to_raw_ticks(self, callback: Callable[[RawTick], None]) -> str:
|
|
"""Subscribe to raw tick data with timing information"""
|
|
subscriber_id = str(uuid.uuid4())
|
|
self.raw_tick_callbacks.append(callback)
|
|
logger.info(f"Raw tick subscriber added: {subscriber_id}")
|
|
return subscriber_id
|
|
|
|
def subscribe_to_ohlcv_bars(self, callback: Callable[[OHLCVBar], None]) -> str:
|
|
"""Subscribe to 1s OHLCV bars calculated from ticks"""
|
|
subscriber_id = str(uuid.uuid4())
|
|
self.ohlcv_bar_callbacks.append(callback)
|
|
logger.info(f"OHLCV bar subscriber added: {subscriber_id}")
|
|
return subscriber_id
|
|
|
|
def get_raw_tick_features(self, symbol: str, window_size: int = 50) -> Optional[np.ndarray]:
|
|
"""Get raw tick features for model consumption"""
|
|
return self.tick_aggregator.get_tick_features_for_model(symbol, window_size)
|
|
|
|
def get_ohlcv_features(self, symbol: str, window_size: int = 60) -> Optional[np.ndarray]:
|
|
"""Get 1s OHLCV features for model consumption"""
|
|
return self.tick_aggregator.get_ohlcv_features_for_model(symbol, window_size)
|
|
|
|
def get_detected_patterns(self, symbol: str, count: int = 50) -> List:
|
|
"""Get recently detected tick patterns"""
|
|
return self.tick_aggregator.get_detected_patterns(symbol, count)
|
|
|
|
def get_tick_aggregator_stats(self) -> Dict[str, Any]:
|
|
"""Get tick aggregator statistics"""
|
|
return self.tick_aggregator.get_statistics()
|
|
|
|
def get_subscriber_stats(self) -> Dict[str, Any]:
|
|
"""Get subscriber and distribution statistics"""
|
|
with self.subscriber_lock:
|
|
active_subscribers = len([s for s in self.subscribers.values() if s.active])
|
|
subscriber_stats = {
|
|
sid: {
|
|
'name': s.subscriber_name,
|
|
'active': s.active,
|
|
'symbols': s.symbols,
|
|
'tick_count': s.tick_count,
|
|
'last_update': s.last_update.isoformat() if s.last_update else None
|
|
}
|
|
for sid, s in self.subscribers.items()
|
|
}
|
|
|
|
# Get tick aggregator stats
|
|
aggregator_stats = self.get_tick_aggregator_stats()
|
|
|
|
return {
|
|
'active_subscribers': active_subscribers,
|
|
'total_subscribers': len(self.subscribers),
|
|
'raw_tick_callbacks': len(self.raw_tick_callbacks),
|
|
'ohlcv_bar_callbacks': len(self.ohlcv_bar_callbacks),
|
|
'subscriber_details': subscriber_stats,
|
|
'distribution_stats': self.distribution_stats.copy(),
|
|
'buffer_sizes': {symbol: len(buffer) for symbol, buffer in self.tick_buffers.items()},
|
|
'tick_aggregator': aggregator_stats
|
|
} |