3177 lines
142 KiB
Python
3177 lines
142 KiB
Python
"""
|
|
Multi-Timeframe, Multi-Symbol Data Provider
|
|
|
|
This module consolidates all data functionality including:
|
|
- Historical data fetching from Binance API
|
|
- Real-time data streaming via WebSocket
|
|
- Multi-timeframe candle generation
|
|
- Caching and data management
|
|
- Technical indicators calculation
|
|
- Williams Market Structure pivot points with monthly data analysis
|
|
- Pivot-based feature normalization for improved model training
|
|
- Centralized data distribution to multiple subscribers (AI models, dashboard, etc.)
|
|
"""
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import os
|
|
import time
|
|
import uuid
|
|
import websockets
|
|
import requests
|
|
import pandas as pd
|
|
import numpy as np
|
|
import pickle
|
|
from datetime import datetime, timedelta
|
|
from pathlib import Path
|
|
from typing import Dict, List, Optional, Tuple, Any, Callable
|
|
from dataclasses import dataclass, field
|
|
import ta
|
|
from threading import Thread, Lock
|
|
from collections import deque
|
|
|
|
from .config import get_config
|
|
from .tick_aggregator import RealTimeTickAggregator, RawTick, OHLCVBar
|
|
from .cnn_monitor import log_cnn_prediction
|
|
from .williams_market_structure import WilliamsMarketStructure, PivotPoint, TrendLevel
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
@dataclass
|
|
class PivotBounds:
|
|
"""Pivot-based normalization bounds derived from Williams Market Structure"""
|
|
symbol: str
|
|
price_max: float
|
|
price_min: float
|
|
volume_max: float
|
|
volume_min: float
|
|
pivot_support_levels: List[float]
|
|
pivot_resistance_levels: List[float]
|
|
pivot_context: Dict[str, Any]
|
|
created_timestamp: datetime
|
|
data_period_start: datetime
|
|
data_period_end: datetime
|
|
total_candles_analyzed: int
|
|
|
|
def get_price_range(self) -> float:
|
|
"""Get price range for normalization"""
|
|
return self.price_max - self.price_min
|
|
|
|
def normalize_price(self, price: float) -> float:
|
|
"""Normalize price using pivot bounds"""
|
|
return (price - self.price_min) / self.get_price_range()
|
|
|
|
def get_nearest_support_distance(self, current_price: float) -> float:
|
|
"""Get distance to nearest support level (normalized)"""
|
|
if not self.pivot_support_levels:
|
|
return 0.5
|
|
distances = [abs(current_price - s) for s in self.pivot_support_levels]
|
|
return min(distances) / self.get_price_range()
|
|
|
|
def get_nearest_resistance_distance(self, current_price: float) -> float:
|
|
"""Get distance to nearest resistance level (normalized)"""
|
|
if not self.pivot_resistance_levels:
|
|
return 0.5
|
|
distances = [abs(current_price - r) for r in self.pivot_resistance_levels]
|
|
return min(distances) / self.get_price_range()
|
|
|
|
@dataclass
|
|
class MarketTick:
|
|
"""Standardized market tick data structure"""
|
|
symbol: str
|
|
timestamp: datetime
|
|
price: float
|
|
volume: float
|
|
quantity: float
|
|
side: str # 'buy' or 'sell'
|
|
trade_id: str
|
|
is_buyer_maker: bool
|
|
raw_data: Dict[str, Any] = field(default_factory=dict)
|
|
|
|
@dataclass
|
|
class DataSubscriber:
|
|
"""Data subscriber information"""
|
|
subscriber_id: str
|
|
callback: Callable[[MarketTick], None]
|
|
symbols: List[str]
|
|
active: bool = True
|
|
last_update: datetime = field(default_factory=datetime.now)
|
|
tick_count: int = 0
|
|
subscriber_name: str = "unknown"
|
|
|
|
class DataProvider:
|
|
"""Unified data provider for historical and real-time market data with centralized distribution"""
|
|
|
|
def __init__(self, symbols: List[str] = None, timeframes: List[str] = None):
|
|
"""Initialize the data provider"""
|
|
self.config = get_config()
|
|
self.symbols = symbols or self.config.symbols
|
|
self.timeframes = timeframes or self.config.timeframes
|
|
|
|
# Cache settings (initialize first)
|
|
self.cache_enabled = self.config.data.get('cache_enabled', True)
|
|
self.cache_dir = Path(self.config.data.get('cache_dir', 'cache'))
|
|
self.cache_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
# Data storage
|
|
self.historical_data = {} # {symbol: {timeframe: DataFrame}}
|
|
self.real_time_data = {} # {symbol: {timeframe: deque}}
|
|
self.current_prices = {} # {symbol: float}
|
|
|
|
# Pivot-based normalization system
|
|
self.pivot_bounds: Dict[str, PivotBounds] = {} # {symbol: PivotBounds}
|
|
self.pivot_cache_dir = self.cache_dir / 'pivot_bounds'
|
|
self.pivot_cache_dir.mkdir(parents=True, exist_ok=True)
|
|
self.pivot_refresh_interval = timedelta(days=1) # Refresh pivot bounds daily
|
|
self.monthly_data_cache_dir = self.cache_dir / 'monthly_1s_data'
|
|
self.monthly_data_cache_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
# Real-time processing
|
|
self.websocket_tasks = {}
|
|
self.is_streaming = False
|
|
self.data_lock = Lock()
|
|
|
|
# Subscriber management for centralized data distribution
|
|
self.subscribers: Dict[str, DataSubscriber] = {}
|
|
self.subscriber_lock = Lock()
|
|
self.tick_buffers: Dict[str, deque] = {}
|
|
self.buffer_size = 1000 # Keep last 1000 ticks per symbol
|
|
|
|
# Initialize tick buffers
|
|
for symbol in self.symbols:
|
|
binance_symbol = symbol.replace('/', '').upper()
|
|
self.tick_buffers[binance_symbol] = deque(maxlen=self.buffer_size)
|
|
|
|
# BOM (Book of Market) data caching - 1s resolution for last 5 minutes
|
|
self.bom_cache_duration = 300 # 5 minutes in seconds
|
|
self.bom_feature_count = 120 # Number of BOM features per timestamp
|
|
self.bom_data_cache: Dict[str, deque] = {} # {symbol: deque of (timestamp, bom_features)}
|
|
|
|
# Initialize BOM cache for each symbol
|
|
for symbol in self.symbols:
|
|
# Store 300 seconds worth of 1s BOM data
|
|
self.bom_data_cache[symbol] = deque(maxlen=self.bom_cache_duration)
|
|
|
|
# Initialize tick aggregator for raw tick processing
|
|
binance_symbols = [symbol.replace('/', '').upper() for symbol in self.symbols]
|
|
self.tick_aggregator = RealTimeTickAggregator(symbols=binance_symbols)
|
|
|
|
# Raw tick and OHLCV bar callbacks
|
|
self.raw_tick_callbacks = []
|
|
self.ohlcv_bar_callbacks = []
|
|
|
|
# Performance tracking for subscribers
|
|
self.distribution_stats = {
|
|
'total_ticks_received': 0,
|
|
'total_ticks_distributed': 0,
|
|
'distribution_errors': 0,
|
|
'last_tick_time': {},
|
|
'ticks_per_symbol': {symbol.replace('/', '').upper(): 0 for symbol in self.symbols},
|
|
'raw_ticks_processed': 0,
|
|
'ohlcv_bars_created': 0,
|
|
'patterns_detected': 0
|
|
}
|
|
|
|
# Data validation
|
|
self.last_prices = {symbol.replace('/', '').upper(): 0.0 for symbol in self.symbols}
|
|
self.price_change_threshold = 0.1 # 10% price change threshold for validation
|
|
|
|
# Timeframe conversion
|
|
self.timeframe_seconds = {
|
|
'1s': 1, '1m': 60, '5m': 300, '15m': 900, '30m': 1800,
|
|
'1h': 3600, '4h': 14400, '1d': 86400
|
|
}
|
|
|
|
# Williams Market Structure integration
|
|
self.williams_structure: Dict[str, WilliamsMarketStructure] = {}
|
|
for symbol in self.symbols:
|
|
self.williams_structure[symbol] = WilliamsMarketStructure(min_pivot_distance=3)
|
|
|
|
# Pivot point caching
|
|
self.pivot_points_cache: Dict[str, Dict[int, TrendLevel]] = {} # {symbol: {level: TrendLevel}}
|
|
self.last_pivot_calculation: Dict[str, datetime] = {}
|
|
self.pivot_calculation_interval = timedelta(minutes=5) # Recalculate every 5 minutes
|
|
|
|
# Load existing pivot bounds from cache
|
|
self._load_all_pivot_bounds()
|
|
|
|
# Centralized data collection for models and dashboard
|
|
self.cob_data_cache: Dict[str, deque] = {} # COB data for models
|
|
self.training_data_cache: Dict[str, deque] = {} # Training data for models
|
|
self.model_data_subscribers: Dict[str, List[Callable]] = {} # Model-specific data callbacks
|
|
|
|
# Callbacks for data distribution
|
|
self.cob_data_callbacks: List[Callable] = [] # COB data callbacks
|
|
self.training_data_callbacks: List[Callable] = [] # Training data callbacks
|
|
self.model_prediction_callbacks: List[Callable] = [] # Model prediction callbacks
|
|
|
|
# Initialize data caches
|
|
for symbol in self.symbols:
|
|
binance_symbol = symbol.replace('/', '').upper()
|
|
self.cob_data_cache[binance_symbol] = deque(maxlen=300) # 5 minutes of COB data
|
|
self.training_data_cache[binance_symbol] = deque(maxlen=1000) # Training data buffer
|
|
|
|
# Data collection threads
|
|
self.data_collection_active = False
|
|
|
|
# COB data collection
|
|
self.cob_collection_active = False
|
|
self.cob_collection_thread = None
|
|
|
|
# Training data collection
|
|
self.training_data_collection_active = False
|
|
self.training_data_thread = None
|
|
|
|
logger.info(f"DataProvider initialized for symbols: {self.symbols}")
|
|
logger.info(f"Timeframes: {self.timeframes}")
|
|
logger.info("Centralized data distribution enabled")
|
|
logger.info("Pivot-based normalization system enabled")
|
|
logger.info("Williams Market Structure integration enabled")
|
|
logger.info("COB and training data collection enabled")
|
|
|
|
# Rate limiting
|
|
self.last_request_time = {}
|
|
self.request_interval = 0.2 # 200ms between requests
|
|
self.retry_delay = 60 # 1 minute retry delay for 451 errors
|
|
self.max_retries = 3
|
|
|
|
def _ensure_datetime_index(self, df: pd.DataFrame) -> pd.DataFrame:
|
|
"""Ensure dataframe has proper datetime index"""
|
|
if df is None or df.empty:
|
|
return df
|
|
|
|
try:
|
|
# If we already have a proper DatetimeIndex, return as is
|
|
if isinstance(df.index, pd.DatetimeIndex):
|
|
return df
|
|
|
|
# If timestamp column exists, use it as index
|
|
if 'timestamp' in df.columns:
|
|
df['timestamp'] = pd.to_datetime(df['timestamp'])
|
|
df.set_index('timestamp', inplace=True)
|
|
return df
|
|
|
|
# If we have a RangeIndex or other non-datetime index, create datetime index
|
|
if isinstance(df.index, pd.RangeIndex) or not isinstance(df.index, pd.DatetimeIndex):
|
|
# Use current time and work backwards for realistic timestamps
|
|
from datetime import datetime, timedelta
|
|
end_time = datetime.now()
|
|
start_time = end_time - timedelta(minutes=len(df))
|
|
df.index = pd.date_range(start=start_time, end=end_time, periods=len(df))
|
|
logger.debug(f"Converted RangeIndex to DatetimeIndex for {len(df)} records")
|
|
|
|
return df
|
|
|
|
except Exception as e:
|
|
logger.warning(f"Error ensuring datetime index: {e}")
|
|
return df
|
|
|
|
def get_historical_data(self, symbol: str, timeframe: str, limit: int = 1000, refresh: bool = False) -> Optional[pd.DataFrame]:
|
|
"""Get historical OHLCV data for a symbol and timeframe"""
|
|
try:
|
|
# If refresh=True, always fetch fresh data (skip cache for real-time updates)
|
|
if not refresh:
|
|
if self.cache_enabled:
|
|
cached_data = self._load_from_cache(symbol, timeframe)
|
|
if cached_data is not None and len(cached_data) >= limit * 0.8:
|
|
# Ensure proper datetime index for cached data
|
|
cached_data = self._ensure_datetime_index(cached_data)
|
|
# logger.info(f"Using cached data for {symbol} {timeframe}")
|
|
return cached_data.tail(limit)
|
|
|
|
# Check if we need to preload 300s of data for first load
|
|
should_preload = self._should_preload_data(symbol, timeframe, limit)
|
|
|
|
if should_preload:
|
|
logger.info(f"Preloading 300s of data for {symbol} {timeframe}")
|
|
df = self._preload_300s_data(symbol, timeframe)
|
|
else:
|
|
# Fetch from API with requested limit (Binance primary, MEXC fallback)
|
|
logger.info(f"Fetching historical data for {symbol} {timeframe}")
|
|
df = self._fetch_from_binance(symbol, timeframe, limit)
|
|
|
|
# Fallback to MEXC if Binance fails
|
|
if df is None or df.empty:
|
|
logger.info(f"Binance failed, trying MEXC fallback for {symbol}")
|
|
df = self._fetch_from_mexc(symbol, timeframe, limit)
|
|
|
|
if df is not None and not df.empty:
|
|
# Ensure proper datetime index
|
|
df = self._ensure_datetime_index(df)
|
|
|
|
# Add technical indicators. temporarily disabled to save time as it is not working as expected.
|
|
# df = self._add_technical_indicators(df)
|
|
|
|
# Cache the data
|
|
if self.cache_enabled:
|
|
self._save_to_cache(df, symbol, timeframe)
|
|
|
|
# Store in memory
|
|
if symbol not in self.historical_data:
|
|
self.historical_data[symbol] = {}
|
|
self.historical_data[symbol][timeframe] = df
|
|
|
|
# Return requested amount
|
|
return df.tail(limit)
|
|
|
|
logger.warning(f"No data received for {symbol} {timeframe}")
|
|
return None
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error fetching historical data for {symbol} {timeframe}: {e}")
|
|
return None
|
|
|
|
def _should_preload_data(self, symbol: str, timeframe: str, limit: int) -> bool:
|
|
"""Determine if we should preload 300s of data"""
|
|
try:
|
|
# Check if we have any cached data
|
|
if self.cache_enabled:
|
|
cached_data = self._load_from_cache(symbol, timeframe)
|
|
if cached_data is not None and len(cached_data) > 0:
|
|
return False # Already have some data
|
|
|
|
# Check if we have data in memory
|
|
if (symbol in self.historical_data and
|
|
timeframe in self.historical_data[symbol] and
|
|
len(self.historical_data[symbol][timeframe]) > 0):
|
|
return False # Already have data in memory
|
|
|
|
# Calculate if 300s worth of data would be more than requested limit
|
|
timeframe_seconds = self.timeframe_seconds.get(timeframe, 60)
|
|
candles_in_300s = 300 // timeframe_seconds
|
|
|
|
# Preload if we need more than the requested limit or if it's a short timeframe
|
|
if candles_in_300s > limit or timeframe in ['1s', '1m']:
|
|
return True
|
|
|
|
return False
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error determining if should preload data: {e}")
|
|
return False
|
|
|
|
def _preload_300s_data(self, symbol: str, timeframe: str) -> Optional[pd.DataFrame]:
|
|
"""Preload 300 seconds worth of data for better initial performance"""
|
|
try:
|
|
# Calculate how many candles we need for 300 seconds
|
|
timeframe_seconds = self.timeframe_seconds.get(timeframe, 60)
|
|
candles_needed = max(300 // timeframe_seconds, 100) # At least 100 candles
|
|
|
|
# For very short timeframes, limit to reasonable amount
|
|
if timeframe == '1s':
|
|
candles_needed = min(candles_needed, 300) # Max 300 1s candles
|
|
elif timeframe == '1m':
|
|
candles_needed = min(candles_needed, 60) # Max 60 1m candles (1 hour)
|
|
else:
|
|
candles_needed = min(candles_needed, 500) # Max 500 candles for other timeframes
|
|
|
|
logger.info(f"Preloading {candles_needed} candles for {symbol} {timeframe} (300s worth)")
|
|
|
|
# Fetch the data (Binance primary, MEXC fallback)
|
|
df = self._fetch_from_binance(symbol, timeframe, candles_needed)
|
|
|
|
# Fallback to MEXC if Binance fails
|
|
if df is None or df.empty:
|
|
logger.info(f"Binance failed, trying MEXC fallback for preload {symbol}")
|
|
df = self._fetch_from_mexc(symbol, timeframe, candles_needed)
|
|
|
|
if df is not None and not df.empty:
|
|
logger.info(f"Successfully preloaded {len(df)} candles for {symbol} {timeframe}")
|
|
return df
|
|
else:
|
|
logger.warning(f"Failed to preload data for {symbol} {timeframe}")
|
|
return None
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error preloading 300s data for {symbol} {timeframe}: {e}")
|
|
return None
|
|
|
|
def preload_all_symbols_data(self, timeframes: List[str] = None) -> Dict[str, Dict[str, bool]]:
|
|
"""Preload 300s of data for all symbols and timeframes"""
|
|
try:
|
|
if timeframes is None:
|
|
timeframes = self.timeframes
|
|
|
|
preload_results = {}
|
|
|
|
for symbol in self.symbols:
|
|
preload_results[symbol] = {}
|
|
|
|
for timeframe in timeframes:
|
|
try:
|
|
logger.info(f"Preloading data for {symbol} {timeframe}")
|
|
|
|
# Check if we should preload
|
|
if self._should_preload_data(symbol, timeframe, 100):
|
|
df = self._preload_300s_data(symbol, timeframe)
|
|
|
|
if df is not None and not df.empty:
|
|
# Add technical indicators
|
|
df = self._add_technical_indicators(df)
|
|
|
|
# Cache the data
|
|
if self.cache_enabled:
|
|
self._save_to_cache(df, symbol, timeframe)
|
|
|
|
# Store in memory
|
|
if symbol not in self.historical_data:
|
|
self.historical_data[symbol] = {}
|
|
self.historical_data[symbol][timeframe] = df
|
|
|
|
preload_results[symbol][timeframe] = True
|
|
logger.info(f"OK: Preloaded {len(df)} candles for {symbol} {timeframe}")
|
|
else:
|
|
preload_results[symbol][timeframe] = False
|
|
logger.warning(f"FAIL: Failed to preload {symbol} {timeframe}")
|
|
else:
|
|
preload_results[symbol][timeframe] = True # Already have data
|
|
logger.info(f"SKIP: Skipped preloading {symbol} {timeframe} (already have data)")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error preloading {symbol} {timeframe}: {e}")
|
|
preload_results[symbol][timeframe] = False
|
|
|
|
# Log summary
|
|
total_pairs = len(self.symbols) * len(timeframes)
|
|
successful_pairs = sum(1 for symbol_results in preload_results.values()
|
|
for success in symbol_results.values() if success)
|
|
|
|
logger.info(f"Preloading completed: {successful_pairs}/{total_pairs} symbol-timeframe pairs loaded")
|
|
|
|
return preload_results
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error in preload_all_symbols_data: {e}")
|
|
return {}
|
|
|
|
def _fetch_from_mexc(self, symbol: str, timeframe: str, limit: int) -> Optional[pd.DataFrame]:
|
|
"""Fetch data from MEXC API (fallback data source when Binance is unavailable)"""
|
|
try:
|
|
# MEXC doesn't support 1s intervals
|
|
if timeframe == '1s':
|
|
logger.warning(f"MEXC doesn't support 1s intervals, skipping {symbol}")
|
|
return None
|
|
|
|
# Convert symbol format
|
|
mexc_symbol = symbol.replace('/', '').upper()
|
|
|
|
# Convert timeframe for MEXC (excluding 1s)
|
|
timeframe_map = {
|
|
'1m': '1m', '5m': '5m', '15m': '15m', '30m': '30m',
|
|
'1h': '1h', '4h': '4h', '1d': '1d'
|
|
}
|
|
mexc_timeframe = timeframe_map.get(timeframe)
|
|
|
|
if mexc_timeframe is None:
|
|
logger.warning(f"MEXC doesn't support timeframe {timeframe}, skipping {symbol}")
|
|
return None
|
|
|
|
# MEXC API request
|
|
url = "https://api.mexc.com/api/v3/klines"
|
|
params = {
|
|
'symbol': mexc_symbol,
|
|
'interval': mexc_timeframe,
|
|
'limit': limit
|
|
}
|
|
|
|
response = requests.get(url, params=params)
|
|
response.raise_for_status()
|
|
|
|
data = response.json()
|
|
|
|
# Convert to DataFrame (MEXC uses 8 columns vs Binance's 12)
|
|
df = pd.DataFrame(data, columns=[
|
|
'timestamp', 'open', 'high', 'low', 'close', 'volume',
|
|
'close_time', 'quote_volume'
|
|
])
|
|
|
|
# Process columns
|
|
df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
|
|
for col in ['open', 'high', 'low', 'close', 'volume']:
|
|
df[col] = df[col].astype(float)
|
|
|
|
# Keep only OHLCV columns
|
|
df = df[['timestamp', 'open', 'high', 'low', 'close', 'volume']]
|
|
df = df.sort_values('timestamp').reset_index(drop=True)
|
|
|
|
logger.info(f"MEXC: Fetched {len(df)} candles for {symbol} {timeframe}")
|
|
return df
|
|
|
|
except Exception as e:
|
|
logger.error(f"MEXC: Error fetching data: {e}")
|
|
return None
|
|
|
|
def _fetch_from_binance(self, symbol: str, timeframe: str, limit: int) -> Optional[pd.DataFrame]:
|
|
"""Fetch data from Binance API (primary data source) with HTTP 451 error handling"""
|
|
try:
|
|
# Convert symbol format
|
|
binance_symbol = symbol.replace('/', '').upper()
|
|
|
|
# Convert timeframe (now includes 1s support)
|
|
timeframe_map = {
|
|
'1s': '1s', '1m': '1m', '5m': '5m', '15m': '15m', '30m': '30m',
|
|
'1h': '1h', '4h': '4h', '1d': '1d'
|
|
}
|
|
binance_timeframe = timeframe_map.get(timeframe, '1h')
|
|
|
|
# API request with timeout and better headers
|
|
url = "https://api.binance.com/api/v3/klines"
|
|
params = {
|
|
'symbol': binance_symbol,
|
|
'interval': binance_timeframe,
|
|
'limit': limit
|
|
}
|
|
|
|
headers = {
|
|
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
|
|
'Accept': 'application/json',
|
|
'Connection': 'keep-alive'
|
|
}
|
|
|
|
response = requests.get(url, params=params, headers=headers, timeout=10)
|
|
|
|
# Handle HTTP 451 (Unavailable For Legal Reasons) specifically
|
|
if response.status_code == 451:
|
|
logger.warning(f"Binance API returned 451 (blocked) for {symbol} {timeframe} - using fallback")
|
|
return self._get_fallback_data(symbol, timeframe, limit)
|
|
|
|
response.raise_for_status()
|
|
|
|
data = response.json()
|
|
|
|
# Convert to DataFrame
|
|
df = pd.DataFrame(data, columns=[
|
|
'timestamp', 'open', 'high', 'low', 'close', 'volume',
|
|
'close_time', 'quote_volume', 'trades', 'taker_buy_base',
|
|
'taker_buy_quote', 'ignore'
|
|
])
|
|
|
|
# Process columns
|
|
df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
|
|
for col in ['open', 'high', 'low', 'close', 'volume']:
|
|
df[col] = df[col].astype(float)
|
|
|
|
# Keep only OHLCV columns
|
|
df = df[['timestamp', 'open', 'high', 'low', 'close', 'volume']]
|
|
df = df.sort_values('timestamp').reset_index(drop=True)
|
|
|
|
logger.info(f"Binance: Fetched {len(df)} candles for {symbol} {timeframe}")
|
|
return df
|
|
|
|
except Exception as e:
|
|
if "451" in str(e) or "Client Error" in str(e):
|
|
logger.warning(f"Binance API access blocked (451) for {symbol} {timeframe} - using fallback")
|
|
return self._get_fallback_data(symbol, timeframe, limit)
|
|
else:
|
|
logger.error(f"Error fetching from Binance API: {e}")
|
|
return self._get_fallback_data(symbol, timeframe, limit)
|
|
|
|
def _get_fallback_data(self, symbol: str, timeframe: str, limit: int) -> Optional[pd.DataFrame]:
|
|
"""Get fallback data when Binance API is unavailable - REAL DATA ONLY"""
|
|
try:
|
|
logger.info(f"FALLBACK: Attempting to get real cached data for {symbol} {timeframe}")
|
|
|
|
# ONLY try cached data
|
|
cached_data = self._load_from_cache(symbol, timeframe)
|
|
if cached_data is not None and not cached_data.empty:
|
|
# Limit to requested amount
|
|
limited_data = cached_data.tail(limit) if len(cached_data) > limit else cached_data
|
|
logger.info(f"FALLBACK: Using cached real data for {symbol} {timeframe}: {len(limited_data)} bars")
|
|
return limited_data
|
|
|
|
# Try MEXC as secondary real data source
|
|
mexc_data = self._fetch_from_mexc(symbol, timeframe, limit)
|
|
if mexc_data is not None and not mexc_data.empty:
|
|
logger.info(f"FALLBACK: Using MEXC real data for {symbol} {timeframe}: {len(mexc_data)} bars")
|
|
return mexc_data
|
|
|
|
# NO SYNTHETIC DATA - Return None if no real data available
|
|
logger.warning(f"FALLBACK: No real data available for {symbol} {timeframe} - waiting for real data")
|
|
return None
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting fallback data: {e}")
|
|
return None
|
|
|
|
def _add_technical_indicators(self, df: pd.DataFrame) -> pd.DataFrame:
|
|
"""Add comprehensive technical indicators AND pivot-based normalization context"""
|
|
try:
|
|
df = df.copy()
|
|
|
|
# Ensure we have enough data for indicators
|
|
if len(df) < 50:
|
|
logger.warning(f"Insufficient data for comprehensive indicators: {len(df)} rows")
|
|
return self._add_basic_indicators(df)
|
|
|
|
# === EXISTING TECHNICAL INDICATORS ===
|
|
# Moving averages (multiple timeframes)
|
|
df['sma_10'] = ta.trend.sma_indicator(df['close'], window=10)
|
|
df['sma_20'] = ta.trend.sma_indicator(df['close'], window=20)
|
|
df['sma_50'] = ta.trend.sma_indicator(df['close'], window=50)
|
|
df['ema_12'] = ta.trend.ema_indicator(df['close'], window=12)
|
|
df['ema_26'] = ta.trend.ema_indicator(df['close'], window=26)
|
|
df['ema_50'] = ta.trend.ema_indicator(df['close'], window=50)
|
|
|
|
# MACD family
|
|
macd = ta.trend.MACD(df['close'])
|
|
df['macd'] = macd.macd()
|
|
df['macd_signal'] = macd.macd_signal()
|
|
df['macd_histogram'] = macd.macd_diff()
|
|
|
|
# ADX (Average Directional Index)
|
|
adx = ta.trend.ADXIndicator(df['high'], df['low'], df['close'])
|
|
df['adx'] = adx.adx()
|
|
df['adx_pos'] = adx.adx_pos()
|
|
df['adx_neg'] = adx.adx_neg()
|
|
|
|
# Parabolic SAR
|
|
psar = ta.trend.PSARIndicator(df['high'], df['low'], df['close'])
|
|
df['psar'] = psar.psar()
|
|
|
|
# === MOMENTUM INDICATORS ===
|
|
# RSI (multiple periods)
|
|
df['rsi_14'] = ta.momentum.rsi(df['close'], window=14)
|
|
df['rsi_7'] = ta.momentum.rsi(df['close'], window=7)
|
|
df['rsi_21'] = ta.momentum.rsi(df['close'], window=21)
|
|
|
|
# Stochastic Oscillator
|
|
stoch = ta.momentum.StochasticOscillator(df['high'], df['low'], df['close'])
|
|
df['stoch_k'] = stoch.stoch()
|
|
df['stoch_d'] = stoch.stoch_signal()
|
|
|
|
# Williams %R
|
|
df['williams_r'] = ta.momentum.williams_r(df['high'], df['low'], df['close'])
|
|
|
|
# Ultimate Oscillator (instead of CCI which isn't available)
|
|
df['ultimate_osc'] = ta.momentum.ultimate_oscillator(df['high'], df['low'], df['close'])
|
|
|
|
# === VOLATILITY INDICATORS ===
|
|
# Bollinger Bands
|
|
bollinger = ta.volatility.BollingerBands(df['close'])
|
|
df['bb_upper'] = bollinger.bollinger_hband()
|
|
df['bb_lower'] = bollinger.bollinger_lband()
|
|
df['bb_middle'] = bollinger.bollinger_mavg()
|
|
df['bb_width'] = (df['bb_upper'] - df['bb_lower']) / df['bb_middle']
|
|
df['bb_percent'] = (df['close'] - df['bb_lower']) / (df['bb_upper'] - df['bb_lower'])
|
|
|
|
# Average True Range
|
|
df['atr'] = ta.volatility.average_true_range(df['high'], df['low'], df['close'])
|
|
|
|
# Keltner Channels
|
|
keltner = ta.volatility.KeltnerChannel(df['high'], df['low'], df['close'])
|
|
df['keltner_upper'] = keltner.keltner_channel_hband()
|
|
df['keltner_lower'] = keltner.keltner_channel_lband()
|
|
df['keltner_middle'] = keltner.keltner_channel_mband()
|
|
|
|
# === VOLUME INDICATORS ===
|
|
# Volume moving averages
|
|
df['volume_sma_10'] = df['volume'].rolling(window=10).mean()
|
|
df['volume_sma_20'] = df['volume'].rolling(window=20).mean()
|
|
df['volume_sma_50'] = df['volume'].rolling(window=50).mean()
|
|
|
|
# On Balance Volume
|
|
df['obv'] = ta.volume.on_balance_volume(df['close'], df['volume'])
|
|
|
|
# Volume Price Trend
|
|
df['vpt'] = ta.volume.volume_price_trend(df['close'], df['volume'])
|
|
|
|
# Money Flow Index
|
|
df['mfi'] = ta.volume.money_flow_index(df['high'], df['low'], df['close'], df['volume'])
|
|
|
|
# Accumulation/Distribution Line
|
|
df['ad_line'] = ta.volume.acc_dist_index(df['high'], df['low'], df['close'], df['volume'])
|
|
|
|
# Volume Weighted Average Price (VWAP)
|
|
df['vwap'] = (df['close'] * df['volume']).cumsum() / df['volume'].cumsum()
|
|
|
|
# === PRICE ACTION INDICATORS ===
|
|
# Price position relative to range
|
|
df['price_position'] = (df['close'] - df['low']) / (df['high'] - df['low'])
|
|
|
|
# True Range (use ATR calculation for true range)
|
|
df['true_range'] = df['atr'] # ATR is based on true range, so use it directly
|
|
|
|
# Rate of Change
|
|
df['roc'] = ta.momentum.roc(df['close'], window=10)
|
|
|
|
# === CUSTOM INDICATORS ===
|
|
# Trend strength (combination of multiple trend indicators)
|
|
df['trend_strength'] = (
|
|
(df['close'] > df['sma_20']).astype(int) +
|
|
(df['sma_10'] > df['sma_20']).astype(int) +
|
|
(df['macd'] > df['macd_signal']).astype(int) +
|
|
(df['adx'] > 25).astype(int)
|
|
) / 4.0
|
|
|
|
# Momentum composite
|
|
df['momentum_composite'] = (
|
|
(df['rsi_14'] / 100) +
|
|
((df['stoch_k'] + 50) / 100) + # Normalize stoch_k
|
|
((df['williams_r'] + 50) / 100) # Normalize williams_r
|
|
) / 3.0
|
|
|
|
# Volatility regime
|
|
df['volatility_regime'] = (df['atr'] / df['close']).rolling(window=20).rank(pct=True)
|
|
|
|
# === WILLIAMS MARKET STRUCTURE PIVOT CONTEXT ===
|
|
# Check if we need to refresh pivot bounds for this symbol
|
|
symbol = self._extract_symbol_from_dataframe(df)
|
|
if symbol and self._should_refresh_pivot_bounds(symbol):
|
|
logger.info(f"Refreshing pivot bounds for {symbol}")
|
|
self._refresh_pivot_bounds_for_symbol(symbol)
|
|
|
|
# Add pivot-based context features
|
|
if symbol and symbol in self.pivot_bounds:
|
|
df = self._add_pivot_context_features(df, symbol)
|
|
|
|
# === FILL NaN VALUES ===
|
|
# Forward fill first, then backward fill, then zero fill
|
|
df = df.ffill().bfill().fillna(0)
|
|
|
|
logger.debug(f"Added technical indicators + pivot context for {len(df)} rows")
|
|
return df
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error adding comprehensive technical indicators: {e}")
|
|
# Fallback to basic indicators
|
|
return self._add_basic_indicators(df)
|
|
|
|
# === WILLIAMS MARKET STRUCTURE PIVOT SYSTEM ===
|
|
|
|
def _collect_monthly_1m_data(self, symbol: str) -> Optional[pd.DataFrame]:
|
|
"""Collect 30 days of 1m candles with smart gap-filling cache system"""
|
|
try:
|
|
# Check for cached data and determine what we need to fetch
|
|
cached_data = self._load_monthly_data_from_cache(symbol)
|
|
|
|
end_time = datetime.now()
|
|
start_time = end_time - timedelta(days=30)
|
|
|
|
if cached_data is not None and not cached_data.empty:
|
|
logger.info(f"Found cached monthly 1m data for {symbol}: {len(cached_data)} candles")
|
|
|
|
# Check cache data range
|
|
cache_start = cached_data['timestamp'].min()
|
|
cache_end = cached_data['timestamp'].max()
|
|
|
|
logger.info(f"Cache range: {cache_start} to {cache_end}")
|
|
|
|
# Remove data older than 30 days
|
|
cached_data = cached_data[cached_data['timestamp'] >= start_time]
|
|
|
|
# Check if we need to fill gaps
|
|
gap_start = cache_end + timedelta(minutes=1)
|
|
|
|
if gap_start < end_time:
|
|
# Need to fill gap from cache_end to now
|
|
logger.info(f"Filling gap from {gap_start} to {end_time}")
|
|
gap_data = self._fetch_1m_data_range(symbol, gap_start, end_time)
|
|
|
|
if gap_data is not None and not gap_data.empty:
|
|
# Combine cached data with gap data
|
|
monthly_df = pd.concat([cached_data, gap_data], ignore_index=True)
|
|
monthly_df = monthly_df.sort_values('timestamp').drop_duplicates(subset=['timestamp']).reset_index(drop=True)
|
|
logger.info(f"Combined cache + gap: {len(monthly_df)} total candles")
|
|
else:
|
|
monthly_df = cached_data
|
|
logger.info(f"Using cached data only: {len(monthly_df)} candles")
|
|
else:
|
|
monthly_df = cached_data
|
|
logger.info(f"Cache is up to date: {len(monthly_df)} candles")
|
|
else:
|
|
# No cache - fetch full 30 days
|
|
logger.info(f"No cache found, collecting full 30 days of 1m data for {symbol}")
|
|
monthly_df = self._fetch_1m_data_range(symbol, start_time, end_time)
|
|
|
|
if monthly_df is not None and not monthly_df.empty:
|
|
# Final cleanup: ensure exactly 30 days
|
|
monthly_df = monthly_df[monthly_df['timestamp'] >= start_time]
|
|
monthly_df = monthly_df.sort_values('timestamp').reset_index(drop=True)
|
|
|
|
logger.info(f"Final dataset: {len(monthly_df)} 1m candles for {symbol}")
|
|
|
|
# Update cache
|
|
self._save_monthly_data_to_cache(symbol, monthly_df)
|
|
|
|
return monthly_df
|
|
else:
|
|
logger.error(f"No monthly 1m data collected for {symbol}")
|
|
return None
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error collecting monthly 1m data for {symbol}: {e}")
|
|
return None
|
|
|
|
def _fetch_1s_batch_with_endtime(self, symbol: str, end_time: datetime, limit: int = 1000) -> Optional[pd.DataFrame]:
|
|
"""Fetch a batch of 1s candles ending at specific time"""
|
|
try:
|
|
binance_symbol = symbol.replace('/', '').upper()
|
|
|
|
# Convert end_time to milliseconds
|
|
end_ms = int(end_time.timestamp() * 1000)
|
|
|
|
# API request
|
|
url = "https://api.binance.com/api/v3/klines"
|
|
params = {
|
|
'symbol': binance_symbol,
|
|
'interval': '1s',
|
|
'endTime': end_ms,
|
|
'limit': limit
|
|
}
|
|
|
|
headers = {
|
|
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
|
|
'Accept': 'application/json'
|
|
}
|
|
|
|
response = requests.get(url, params=params, headers=headers, timeout=10)
|
|
response.raise_for_status()
|
|
|
|
data = response.json()
|
|
|
|
if not data:
|
|
return None
|
|
|
|
# Convert to DataFrame
|
|
df = pd.DataFrame(data, columns=[
|
|
'timestamp', 'open', 'high', 'low', 'close', 'volume',
|
|
'close_time', 'quote_volume', 'trades', 'taker_buy_base',
|
|
'taker_buy_quote', 'ignore'
|
|
])
|
|
|
|
# Process columns
|
|
df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
|
|
for col in ['open', 'high', 'low', 'close', 'volume']:
|
|
df[col] = df[col].astype(float)
|
|
|
|
# Keep only OHLCV columns
|
|
df = df[['timestamp', 'open', 'high', 'low', 'close', 'volume']]
|
|
|
|
return df
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error fetching 1s batch for {symbol}: {e}")
|
|
return None
|
|
|
|
def _fetch_1m_data_range(self, symbol: str, start_time: datetime, end_time: datetime) -> Optional[pd.DataFrame]:
|
|
"""Fetch 1m candles for a specific time range with efficient batching"""
|
|
try:
|
|
# Convert symbol format for Binance API
|
|
if '/' in symbol:
|
|
api_symbol = symbol.replace('/', '')
|
|
else:
|
|
api_symbol = symbol
|
|
|
|
logger.info(f"Fetching 1m data for {symbol} from {start_time} to {end_time}")
|
|
|
|
all_candles = []
|
|
current_start = start_time
|
|
batch_size = 1000 # Binance limit
|
|
api_calls_made = 0
|
|
|
|
while current_start < end_time and api_calls_made < 50: # Safety limit for 30 days
|
|
try:
|
|
# Calculate end time for this batch
|
|
batch_end = min(current_start + timedelta(minutes=batch_size), end_time)
|
|
|
|
# Convert to milliseconds
|
|
start_timestamp = int(current_start.timestamp() * 1000)
|
|
end_timestamp = int(batch_end.timestamp() * 1000)
|
|
|
|
# Binance API call
|
|
url = "https://api.binance.com/api/v3/klines"
|
|
params = {
|
|
'symbol': api_symbol,
|
|
'interval': '1m',
|
|
'startTime': start_timestamp,
|
|
'endTime': end_timestamp,
|
|
'limit': batch_size
|
|
}
|
|
|
|
headers = {
|
|
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
|
|
'Accept': 'application/json'
|
|
}
|
|
|
|
response = requests.get(url, params=params, headers=headers, timeout=10)
|
|
response.raise_for_status()
|
|
|
|
data = response.json()
|
|
api_calls_made += 1
|
|
|
|
if not data:
|
|
logger.warning(f"No data returned for batch {current_start} to {batch_end}")
|
|
break
|
|
|
|
# Convert to DataFrame
|
|
batch_df = pd.DataFrame(data, columns=[
|
|
'timestamp', 'open', 'high', 'low', 'close', 'volume',
|
|
'close_time', 'quote_volume', 'trades', 'taker_buy_base',
|
|
'taker_buy_quote', 'ignore'
|
|
])
|
|
|
|
# Process columns
|
|
batch_df['timestamp'] = pd.to_datetime(batch_df['timestamp'], unit='ms')
|
|
for col in ['open', 'high', 'low', 'close', 'volume']:
|
|
batch_df[col] = batch_df[col].astype(float)
|
|
|
|
# Keep only OHLCV columns
|
|
batch_df = batch_df[['timestamp', 'open', 'high', 'low', 'close', 'volume']]
|
|
|
|
all_candles.append(batch_df)
|
|
|
|
# Move to next batch (add 1 minute to avoid overlap)
|
|
current_start = batch_end + timedelta(minutes=1)
|
|
|
|
# Rate limiting (Binance allows 1200/min)
|
|
time.sleep(0.05) # 50ms delay
|
|
|
|
# Progress logging
|
|
if api_calls_made % 10 == 0:
|
|
total_candles = sum(len(df) for df in all_candles)
|
|
logger.info(f"Progress: {api_calls_made} API calls, {total_candles} candles collected")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error in batch {current_start} to {batch_end}: {e}")
|
|
current_start += timedelta(minutes=batch_size)
|
|
time.sleep(1) # Wait longer on error
|
|
continue
|
|
|
|
if not all_candles:
|
|
logger.error(f"No data collected for {symbol}")
|
|
return None
|
|
|
|
# Combine all batches
|
|
df = pd.concat(all_candles, ignore_index=True)
|
|
df = df.sort_values('timestamp').drop_duplicates(subset=['timestamp']).reset_index(drop=True)
|
|
|
|
logger.info(f"Successfully fetched {len(df)} 1m candles for {symbol} ({api_calls_made} API calls)")
|
|
return df
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error fetching 1m data range for {symbol}: {e}")
|
|
return None
|
|
|
|
def _extract_pivot_bounds_from_monthly_data(self, symbol: str, monthly_data: pd.DataFrame) -> Optional[PivotBounds]:
|
|
"""Extract pivot bounds using Williams Market Structure analysis"""
|
|
try:
|
|
logger.info(f"Analyzing {len(monthly_data)} candles for pivot extraction...")
|
|
|
|
# Convert DataFrame to numpy array format expected by Williams Market Structure
|
|
ohlcv_array = monthly_data[['timestamp', 'open', 'high', 'low', 'close', 'volume']].copy()
|
|
|
|
# Convert timestamp to numeric for Williams analysis
|
|
ohlcv_array['timestamp'] = ohlcv_array['timestamp'].astype(np.int64) // 10**9 # Convert to seconds
|
|
ohlcv_array = ohlcv_array.to_numpy()
|
|
|
|
# Initialize Williams Market Structure analyzer
|
|
try:
|
|
from training.williams_market_structure import WilliamsMarketStructure
|
|
|
|
williams = WilliamsMarketStructure(
|
|
swing_strengths=[2, 3, 5, 8], # Multi-strength pivot detection
|
|
enable_cnn_feature=False # We just want pivot data, not CNN training
|
|
)
|
|
|
|
# Calculate 5 levels of recursive pivot points
|
|
logger.info("Running Williams Market Structure analysis...")
|
|
pivot_levels = williams.calculate_recursive_pivot_points(ohlcv_array)
|
|
|
|
except ImportError:
|
|
logger.warning("Williams Market Structure not available, using simplified pivot detection")
|
|
pivot_levels = self._simple_pivot_detection(monthly_data)
|
|
|
|
# Extract bounds from pivot analysis
|
|
bounds = self._extract_bounds_from_pivot_levels(symbol, monthly_data, pivot_levels)
|
|
|
|
return bounds
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error extracting pivot bounds for {symbol}: {e}")
|
|
return None
|
|
|
|
def _extract_bounds_from_pivot_levels(self, symbol: str, monthly_data: pd.DataFrame,
|
|
pivot_levels: Dict[str, Any]) -> PivotBounds:
|
|
"""Extract normalization bounds from Williams pivot levels"""
|
|
try:
|
|
# Initialize bounds
|
|
price_max = monthly_data['high'].max()
|
|
price_min = monthly_data['low'].min()
|
|
volume_max = monthly_data['volume'].max()
|
|
volume_min = monthly_data['volume'].min()
|
|
|
|
support_levels = []
|
|
resistance_levels = []
|
|
|
|
# Extract pivot points from all Williams levels
|
|
for level_key, level_data in pivot_levels.items():
|
|
if level_data and hasattr(level_data, 'swing_points') and level_data.swing_points:
|
|
# Get prices from swing points
|
|
level_prices = [sp.price for sp in level_data.swing_points]
|
|
|
|
# Update overall price bounds
|
|
price_max = max(price_max, max(level_prices))
|
|
price_min = min(price_min, min(level_prices))
|
|
|
|
# Extract support and resistance levels
|
|
if hasattr(level_data, 'support_levels') and level_data.support_levels:
|
|
support_levels.extend(level_data.support_levels)
|
|
|
|
if hasattr(level_data, 'resistance_levels') and level_data.resistance_levels:
|
|
resistance_levels.extend(level_data.resistance_levels)
|
|
|
|
# Remove duplicates and sort
|
|
support_levels = sorted(list(set(support_levels)))
|
|
resistance_levels = sorted(list(set(resistance_levels)))
|
|
|
|
# Create PivotBounds object
|
|
bounds = PivotBounds(
|
|
symbol=symbol,
|
|
price_max=float(price_max),
|
|
price_min=float(price_min),
|
|
volume_max=float(volume_max),
|
|
volume_min=float(volume_min),
|
|
pivot_support_levels=support_levels,
|
|
pivot_resistance_levels=resistance_levels,
|
|
pivot_context=pivot_levels,
|
|
created_timestamp=datetime.now(),
|
|
data_period_start=monthly_data['timestamp'].min(),
|
|
data_period_end=monthly_data['timestamp'].max(),
|
|
total_candles_analyzed=len(monthly_data)
|
|
)
|
|
|
|
logger.info(f"Extracted pivot bounds for {symbol}:")
|
|
logger.info(f" Price range: ${bounds.price_min:.2f} - ${bounds.price_max:.2f}")
|
|
logger.info(f" Volume range: {bounds.volume_min:.2f} - {bounds.volume_max:.2f}")
|
|
logger.info(f" Support levels: {len(bounds.pivot_support_levels)}")
|
|
logger.info(f" Resistance levels: {len(bounds.pivot_resistance_levels)}")
|
|
|
|
return bounds
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error extracting bounds from pivot levels: {e}")
|
|
# Fallback to simple min/max bounds
|
|
return PivotBounds(
|
|
symbol=symbol,
|
|
price_max=float(monthly_data['high'].max()),
|
|
price_min=float(monthly_data['low'].min()),
|
|
volume_max=float(monthly_data['volume'].max()),
|
|
volume_min=float(monthly_data['volume'].min()),
|
|
pivot_support_levels=[],
|
|
pivot_resistance_levels=[],
|
|
pivot_context={},
|
|
created_timestamp=datetime.now(),
|
|
data_period_start=monthly_data['timestamp'].min(),
|
|
data_period_end=monthly_data['timestamp'].max(),
|
|
total_candles_analyzed=len(monthly_data)
|
|
)
|
|
|
|
def _simple_pivot_detection(self, monthly_data: pd.DataFrame) -> Dict[str, Any]:
|
|
"""Simple pivot detection fallback when Williams Market Structure is not available"""
|
|
try:
|
|
# Simple high/low pivot detection using rolling windows
|
|
highs = monthly_data['high']
|
|
lows = monthly_data['low']
|
|
|
|
# Find local maxima and minima using different windows
|
|
pivot_highs = []
|
|
pivot_lows = []
|
|
|
|
for window in [5, 10, 20, 50]:
|
|
if len(monthly_data) > window * 2:
|
|
# Rolling max/min detection
|
|
rolling_max = highs.rolling(window=window, center=True).max()
|
|
rolling_min = lows.rolling(window=window, center=True).min()
|
|
|
|
# Find pivot highs (local maxima)
|
|
high_pivots = monthly_data[highs == rolling_max]['high'].tolist()
|
|
pivot_highs.extend(high_pivots)
|
|
|
|
# Find pivot lows (local minima)
|
|
low_pivots = monthly_data[lows == rolling_min]['low'].tolist()
|
|
pivot_lows.extend(low_pivots)
|
|
|
|
# Create mock level structure
|
|
mock_level = type('MockLevel', (), {
|
|
'swing_points': [],
|
|
'support_levels': list(set(pivot_lows)),
|
|
'resistance_levels': list(set(pivot_highs))
|
|
})()
|
|
|
|
return {'level_0': mock_level}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error in simple pivot detection: {e}")
|
|
return {}
|
|
|
|
def _should_refresh_pivot_bounds(self, symbol: str) -> bool:
|
|
"""Check if pivot bounds need refreshing"""
|
|
try:
|
|
if symbol not in self.pivot_bounds:
|
|
return True
|
|
|
|
bounds = self.pivot_bounds[symbol]
|
|
age = datetime.now() - bounds.created_timestamp
|
|
|
|
return age > self.pivot_refresh_interval
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error checking pivot bounds refresh: {e}")
|
|
return True
|
|
|
|
def _refresh_pivot_bounds_for_symbol(self, symbol: str):
|
|
"""Refresh pivot bounds for a specific symbol"""
|
|
try:
|
|
# Collect monthly 1m data
|
|
monthly_data = self._collect_monthly_1m_data(symbol)
|
|
if monthly_data is None or monthly_data.empty:
|
|
logger.warning(f"Could not collect monthly data for {symbol}")
|
|
return
|
|
|
|
# Extract pivot bounds
|
|
bounds = self._extract_pivot_bounds_from_monthly_data(symbol, monthly_data)
|
|
if bounds is None:
|
|
logger.warning(f"Could not extract pivot bounds for {symbol}")
|
|
return
|
|
|
|
# Store bounds
|
|
self.pivot_bounds[symbol] = bounds
|
|
|
|
# Save to cache
|
|
self._save_pivot_bounds_to_cache(symbol, bounds)
|
|
|
|
logger.info(f"Successfully refreshed pivot bounds for {symbol}")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error refreshing pivot bounds for {symbol}: {e}")
|
|
|
|
def _add_pivot_context_features(self, df: pd.DataFrame, symbol: str) -> pd.DataFrame:
|
|
"""Add pivot-derived context features for normalization"""
|
|
try:
|
|
if symbol not in self.pivot_bounds:
|
|
return df
|
|
|
|
bounds = self.pivot_bounds[symbol]
|
|
current_prices = df['close']
|
|
|
|
# Distance to nearest support/resistance levels (normalized)
|
|
df['pivot_support_distance'] = current_prices.apply(bounds.get_nearest_support_distance)
|
|
df['pivot_resistance_distance'] = current_prices.apply(bounds.get_nearest_resistance_distance)
|
|
|
|
# Price position within pivot range (0 = price_min, 1 = price_max)
|
|
df['pivot_price_position'] = current_prices.apply(bounds.normalize_price).clip(0, 1)
|
|
|
|
# Add binary features for proximity to key levels
|
|
price_range = bounds.get_price_range()
|
|
proximity_threshold = price_range * 0.02 # 2% of price range
|
|
|
|
df['near_pivot_support'] = 0
|
|
df['near_pivot_resistance'] = 0
|
|
|
|
for price in current_prices:
|
|
# Check if near any support level
|
|
if any(abs(price - s) <= proximity_threshold for s in bounds.pivot_support_levels):
|
|
df.loc[df['close'] == price, 'near_pivot_support'] = 1
|
|
|
|
# Check if near any resistance level
|
|
if any(abs(price - r) <= proximity_threshold for r in bounds.pivot_resistance_levels):
|
|
df.loc[df['close'] == price, 'near_pivot_resistance'] = 1
|
|
|
|
logger.debug(f"Added pivot context features for {symbol}")
|
|
return df
|
|
|
|
except Exception as e:
|
|
logger.warning(f"Error adding pivot context features for {symbol}: {e}")
|
|
return df
|
|
|
|
def _extract_symbol_from_dataframe(self, df: pd.DataFrame) -> Optional[str]:
|
|
"""Extract symbol from dataframe context (basic implementation)"""
|
|
# This is a simple implementation - in a real system, you might pass symbol explicitly
|
|
# or store it as metadata in the dataframe
|
|
for symbol in self.symbols:
|
|
# Check if this dataframe might belong to this symbol based on current processing
|
|
return symbol # Return first symbol for now - can be improved
|
|
return None
|
|
|
|
# === PIVOT BOUNDS CACHING ===
|
|
|
|
def _load_all_pivot_bounds(self):
|
|
"""Load all cached pivot bounds on startup"""
|
|
try:
|
|
for symbol in self.symbols:
|
|
bounds = self._load_pivot_bounds_from_cache(symbol)
|
|
if bounds:
|
|
self.pivot_bounds[symbol] = bounds
|
|
logger.info(f"Loaded cached pivot bounds for {symbol}")
|
|
except Exception as e:
|
|
logger.error(f"Error loading pivot bounds from cache: {e}")
|
|
|
|
def _load_pivot_bounds_from_cache(self, symbol: str) -> Optional[PivotBounds]:
|
|
"""Load pivot bounds from cache"""
|
|
try:
|
|
cache_file = self.pivot_cache_dir / f"{symbol.replace('/', '')}_pivot_bounds.pkl"
|
|
if cache_file.exists():
|
|
with open(cache_file, 'rb') as f:
|
|
bounds = pickle.load(f)
|
|
|
|
# Check if bounds are still valid (not too old)
|
|
age = datetime.now() - bounds.created_timestamp
|
|
if age <= self.pivot_refresh_interval:
|
|
return bounds
|
|
else:
|
|
logger.info(f"Cached pivot bounds for {symbol} are too old ({age.days} days)")
|
|
|
|
return None
|
|
|
|
except Exception as e:
|
|
logger.warning(f"Error loading pivot bounds from cache for {symbol}: {e}")
|
|
return None
|
|
|
|
def _save_pivot_bounds_to_cache(self, symbol: str, bounds: PivotBounds):
|
|
"""Save pivot bounds to cache"""
|
|
try:
|
|
cache_file = self.pivot_cache_dir / f"{symbol.replace('/', '')}_pivot_bounds.pkl"
|
|
with open(cache_file, 'wb') as f:
|
|
pickle.dump(bounds, f)
|
|
logger.debug(f"Saved pivot bounds to cache for {symbol}")
|
|
except Exception as e:
|
|
logger.warning(f"Error saving pivot bounds to cache for {symbol}: {e}")
|
|
|
|
def _load_monthly_data_from_cache(self, symbol: str) -> Optional[pd.DataFrame]:
|
|
"""Load monthly 1m data from cache"""
|
|
try:
|
|
cache_file = self.monthly_data_cache_dir / f"{symbol.replace('/', '')}_monthly_1m.parquet"
|
|
if cache_file.exists():
|
|
try:
|
|
df = pd.read_parquet(cache_file)
|
|
logger.info(f"Loaded {len(df)} 1m candles from cache for {symbol}")
|
|
return df
|
|
except Exception as parquet_e:
|
|
# Handle corrupted Parquet file
|
|
if "Parquet magic bytes not found" in str(parquet_e) or "corrupted" in str(parquet_e).lower():
|
|
logger.warning(f"Corrupted Parquet cache file for {symbol}, removing and returning None: {parquet_e}")
|
|
try:
|
|
cache_file.unlink() # Delete corrupted file
|
|
except Exception:
|
|
pass
|
|
return None
|
|
else:
|
|
raise parquet_e
|
|
|
|
return None
|
|
|
|
except Exception as e:
|
|
logger.warning(f"Error loading monthly data from cache for {symbol}: {e}")
|
|
return None
|
|
|
|
def _save_monthly_data_to_cache(self, symbol: str, df: pd.DataFrame):
|
|
"""Save monthly 1m data to cache"""
|
|
try:
|
|
cache_file = self.monthly_data_cache_dir / f"{symbol.replace('/', '')}_monthly_1m.parquet"
|
|
df.to_parquet(cache_file, index=False)
|
|
logger.info(f"Saved {len(df)} monthly 1m candles to cache for {symbol}")
|
|
except Exception as e:
|
|
logger.warning(f"Error saving monthly data to cache for {symbol}: {e}")
|
|
|
|
def get_pivot_bounds(self, symbol: str) -> Optional[PivotBounds]:
|
|
"""Get pivot bounds for a symbol"""
|
|
return self.pivot_bounds.get(symbol)
|
|
|
|
def get_pivot_normalized_features(self, symbol: str, df: pd.DataFrame) -> Optional[pd.DataFrame]:
|
|
"""Get dataframe with pivot-normalized features"""
|
|
try:
|
|
if symbol not in self.pivot_bounds:
|
|
logger.warning(f"No pivot bounds available for {symbol}")
|
|
return df
|
|
|
|
bounds = self.pivot_bounds[symbol]
|
|
normalized_df = df.copy()
|
|
|
|
# Normalize price columns using pivot bounds
|
|
price_range = bounds.get_price_range()
|
|
for col in ['open', 'high', 'low', 'close']:
|
|
if col in normalized_df.columns:
|
|
normalized_df[col] = (normalized_df[col] - bounds.price_min) / price_range
|
|
|
|
# Normalize volume using pivot bounds
|
|
volume_range = bounds.volume_max - bounds.volume_min
|
|
if volume_range > 0 and 'volume' in normalized_df.columns:
|
|
normalized_df['volume'] = (normalized_df['volume'] - bounds.volume_min) / volume_range
|
|
|
|
return normalized_df
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error applying pivot normalization for {symbol}: {e}")
|
|
return df
|
|
|
|
def _add_basic_indicators(self, df: pd.DataFrame) -> pd.DataFrame:
|
|
"""Add basic indicators for small datasets"""
|
|
try:
|
|
df = df.copy()
|
|
|
|
# Basic moving averages
|
|
if len(df) >= 20:
|
|
df['sma_20'] = ta.trend.sma_indicator(df['close'], window=20)
|
|
df['ema_12'] = ta.trend.ema_indicator(df['close'], window=12)
|
|
|
|
# Basic RSI
|
|
if len(df) >= 14:
|
|
df['rsi_14'] = ta.momentum.rsi(df['close'], window=14)
|
|
|
|
# Basic volume indicators
|
|
if len(df) >= 10:
|
|
df['volume_sma_10'] = df['volume'].rolling(window=10).mean()
|
|
|
|
# Basic price action
|
|
df['price_position'] = (df['close'] - df['low']) / (df['high'] - df['low'])
|
|
df['price_position'] = df['price_position'].fillna(0.5) # Default to middle
|
|
|
|
# Fill NaN values
|
|
df = df.ffill().bfill().fillna(0)
|
|
|
|
return df
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error adding basic indicators: {e}")
|
|
return df
|
|
|
|
def _load_from_cache(self, symbol: str, timeframe: str) -> Optional[pd.DataFrame]:
|
|
"""Load data from cache"""
|
|
try:
|
|
cache_file = self.cache_dir / f"{symbol.replace('/', '')}_{timeframe}.parquet"
|
|
if cache_file.exists():
|
|
# Check if cache is recent - stricter rules for startup
|
|
cache_age = time.time() - cache_file.stat().st_mtime
|
|
|
|
# For 1m data, use cache only if less than 5 minutes old to avoid gaps
|
|
if timeframe == '1m':
|
|
max_age = 300 # 5 minutes
|
|
else:
|
|
max_age = 3600 # 1 hour for other timeframes
|
|
|
|
if cache_age < max_age:
|
|
try:
|
|
df = pd.read_parquet(cache_file)
|
|
logger.debug(f"Loaded {len(df)} rows from cache for {symbol} {timeframe} (age: {cache_age/60:.1f}min)")
|
|
return df
|
|
except Exception as parquet_e:
|
|
# Handle corrupted Parquet file
|
|
if "Parquet magic bytes not found" in str(parquet_e) or "corrupted" in str(parquet_e).lower():
|
|
logger.warning(f"Corrupted Parquet cache file for {symbol} {timeframe}, removing and returning None: {parquet_e}")
|
|
try:
|
|
cache_file.unlink() # Delete corrupted file
|
|
except Exception:
|
|
pass
|
|
return None
|
|
else:
|
|
raise parquet_e
|
|
else:
|
|
logger.debug(f"Cache for {symbol} {timeframe} is too old ({cache_age/60:.1f}min > {max_age/60:.1f}min)")
|
|
return None
|
|
except Exception as e:
|
|
logger.warning(f"Error loading cache for {symbol} {timeframe}: {e}")
|
|
return None
|
|
|
|
def _save_to_cache(self, df: pd.DataFrame, symbol: str, timeframe: str):
|
|
"""Save data to cache"""
|
|
try:
|
|
cache_file = self.cache_dir / f"{symbol.replace('/', '')}_{timeframe}.parquet"
|
|
df.to_parquet(cache_file, index=False)
|
|
logger.debug(f"Saved {len(df)} rows to cache for {symbol} {timeframe}")
|
|
except Exception as e:
|
|
logger.warning(f"Error saving cache for {symbol} {timeframe}: {e}")
|
|
|
|
async def start_real_time_streaming(self):
|
|
"""Start real-time data streaming for all symbols"""
|
|
if self.is_streaming:
|
|
logger.warning("Real-time streaming already active")
|
|
return
|
|
|
|
self.is_streaming = True
|
|
logger.info("Starting real-time data streaming")
|
|
|
|
# Start WebSocket for each symbol
|
|
for symbol in self.symbols:
|
|
task = asyncio.create_task(self._websocket_stream(symbol))
|
|
self.websocket_tasks[symbol] = task
|
|
|
|
async def stop_real_time_streaming(self):
|
|
"""Stop real-time data streaming"""
|
|
if not self.is_streaming:
|
|
return
|
|
|
|
logger.info("Stopping real-time data streaming")
|
|
self.is_streaming = False
|
|
|
|
# Cancel all WebSocket tasks
|
|
for symbol, task in self.websocket_tasks.items():
|
|
if not task.done():
|
|
task.cancel()
|
|
try:
|
|
await task
|
|
except asyncio.CancelledError:
|
|
pass
|
|
|
|
self.websocket_tasks.clear()
|
|
|
|
async def _websocket_stream(self, symbol: str):
|
|
"""WebSocket stream for a single symbol using trade stream for better granularity"""
|
|
binance_symbol = symbol.replace('/', '').upper()
|
|
url = f"wss://stream.binance.com:9443/ws/{binance_symbol.lower()}@trade"
|
|
|
|
while self.is_streaming:
|
|
try:
|
|
logger.info(f"Connecting to WebSocket for {symbol}: {url}")
|
|
|
|
async with websockets.connect(url) as websocket:
|
|
logger.info(f"WebSocket connected for {symbol}")
|
|
|
|
async for message in websocket:
|
|
if not self.is_streaming:
|
|
break
|
|
|
|
try:
|
|
await self._process_trade_message(binance_symbol, message)
|
|
except Exception as e:
|
|
logger.warning(f"Error processing trade message for {symbol}: {e}")
|
|
|
|
except Exception as e:
|
|
logger.error(f"WebSocket error for {symbol}: {e}")
|
|
self.distribution_stats['distribution_errors'] += 1
|
|
|
|
if self.is_streaming:
|
|
logger.info(f"Reconnecting WebSocket for {symbol} in 5 seconds...")
|
|
await asyncio.sleep(5)
|
|
|
|
async def _process_trade_message(self, symbol: str, message: str):
|
|
"""Process incoming trade message and distribute to subscribers"""
|
|
try:
|
|
trade_data = json.loads(message)
|
|
|
|
# Extract trade information
|
|
price = float(trade_data.get('p', 0))
|
|
quantity = float(trade_data.get('q', 0))
|
|
timestamp = datetime.fromtimestamp(int(trade_data.get('T', 0)) / 1000)
|
|
is_buyer_maker = trade_data.get('m', False)
|
|
trade_id = trade_data.get('t', '')
|
|
|
|
# Calculate volume in USDT
|
|
volume_usdt = price * quantity
|
|
|
|
# Data validation
|
|
if not self._validate_tick_data(symbol, price, volume_usdt):
|
|
logger.warning(f"Invalid tick data for {symbol}: price={price}, volume={volume_usdt}")
|
|
return
|
|
|
|
# Process raw tick through aggregator
|
|
side = 'sell' if is_buyer_maker else 'buy'
|
|
raw_tick, completed_bar = self.tick_aggregator.process_tick(
|
|
symbol=symbol,
|
|
timestamp=timestamp,
|
|
price=price,
|
|
volume=volume_usdt,
|
|
quantity=quantity,
|
|
side=side,
|
|
trade_id=str(trade_id)
|
|
)
|
|
|
|
# Update statistics
|
|
self.distribution_stats['total_ticks_received'] += 1
|
|
self.distribution_stats['ticks_per_symbol'][symbol] += 1
|
|
self.distribution_stats['last_tick_time'][symbol] = timestamp
|
|
self.last_prices[symbol] = price
|
|
|
|
if raw_tick:
|
|
self.distribution_stats['raw_ticks_processed'] += 1
|
|
|
|
# Notify raw tick callbacks
|
|
for callback in self.raw_tick_callbacks:
|
|
try:
|
|
callback(raw_tick)
|
|
except Exception as e:
|
|
logger.error(f"Error in raw tick callback: {e}")
|
|
|
|
if completed_bar:
|
|
self.distribution_stats['ohlcv_bars_created'] += 1
|
|
|
|
# Notify OHLCV bar callbacks
|
|
for callback in self.ohlcv_bar_callbacks:
|
|
try:
|
|
callback(completed_bar)
|
|
except Exception as e:
|
|
logger.error(f"Error in OHLCV bar callback: {e}")
|
|
|
|
# Create standardized tick for legacy compatibility
|
|
tick = MarketTick(
|
|
symbol=symbol,
|
|
timestamp=timestamp,
|
|
price=price,
|
|
volume=volume_usdt,
|
|
quantity=quantity,
|
|
side=side,
|
|
trade_id=str(trade_id),
|
|
is_buyer_maker=is_buyer_maker,
|
|
raw_data=trade_data
|
|
)
|
|
|
|
# Add to buffer
|
|
self.tick_buffers[symbol].append(tick)
|
|
|
|
# Update current prices and candles
|
|
await self._process_tick(symbol, tick)
|
|
|
|
# Distribute to all subscribers
|
|
self._distribute_tick(tick)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error processing trade message for {symbol}: {e}")
|
|
|
|
async def _process_tick(self, symbol: str, tick: MarketTick):
|
|
"""Process a single tick and update candles"""
|
|
try:
|
|
# Update current price
|
|
with self.data_lock:
|
|
self.current_prices[symbol] = tick.price
|
|
|
|
# Initialize real-time data structure if needed
|
|
if symbol not in self.real_time_data:
|
|
self.real_time_data[symbol] = {}
|
|
for tf in self.timeframes:
|
|
self.real_time_data[symbol][tf] = deque(maxlen=1000)
|
|
|
|
# Create tick record for candle updates
|
|
tick_record = {
|
|
'timestamp': tick.timestamp,
|
|
'price': tick.price,
|
|
'volume': tick.volume
|
|
}
|
|
|
|
# Update all timeframes
|
|
for timeframe in self.timeframes:
|
|
self._update_candle(symbol, timeframe, tick_record)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error processing tick for {symbol}: {e}")
|
|
|
|
def _update_candle(self, symbol: str, timeframe: str, tick: Dict):
|
|
"""Update candle for specific timeframe"""
|
|
try:
|
|
timeframe_secs = self.timeframe_seconds.get(timeframe, 3600)
|
|
current_time = tick['timestamp']
|
|
|
|
# Calculate candle start time using proper datetime truncation
|
|
            # Accept either datetime objects or raw epoch seconds
            if hasattr(current_time, 'timestamp'):
                timestamp_seconds = current_time.timestamp()
            else:
                timestamp_seconds = current_time
|
|
|
|
# Truncate to timeframe boundary
|
|
candle_start_seconds = int(timestamp_seconds // timeframe_secs) * timeframe_secs
|
|
candle_start = datetime.fromtimestamp(candle_start_seconds)
|
|
|
|
# Get current candle queue
|
|
candle_queue = self.real_time_data[symbol][timeframe]
|
|
|
|
# Check if we need a new candle
|
|
if not candle_queue or candle_queue[-1]['timestamp'] != candle_start:
|
|
# Create new candle
|
|
new_candle = {
|
|
'timestamp': candle_start,
|
|
'open': tick['price'],
|
|
'high': tick['price'],
|
|
'low': tick['price'],
|
|
'close': tick['price'],
|
|
'volume': tick['volume']
|
|
}
|
|
candle_queue.append(new_candle)
|
|
else:
|
|
# Update existing candle
|
|
current_candle = candle_queue[-1]
|
|
current_candle['high'] = max(current_candle['high'], tick['price'])
|
|
current_candle['low'] = min(current_candle['low'], tick['price'])
|
|
current_candle['close'] = tick['price']
|
|
current_candle['volume'] += tick['volume']
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error updating candle for {symbol} {timeframe}: {e}")
|
|
|
|
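    # Worked example of the candle-boundary truncation used in _update_candle,
    # assuming a 1m timeframe (timeframe_secs = 60):
    #
    #   timestamp_seconds    = 1700000123.456
    #   candle_start_seconds = int(1700000123.456 // 60) * 60   # 1700000100
    #   candle_start         = datetime.fromtimestamp(1700000100)
    #
    # Every tick whose timestamp falls inside [1700000100, 1700000160) updates
    # the same candle; the first tick past the boundary opens a new one.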
def get_latest_candles(self, symbol: str, timeframe: str, limit: int = 100) -> pd.DataFrame:
|
|
"""Get the latest candles combining historical and real-time data"""
|
|
try:
|
|
# Get historical data
|
|
historical_df = self.get_historical_data(symbol, timeframe, limit=limit)
|
|
|
|
# Get real-time data
|
|
with self.data_lock:
|
|
if symbol in self.real_time_data and timeframe in self.real_time_data[symbol]:
|
|
real_time_candles = list(self.real_time_data[symbol][timeframe])
|
|
|
|
if real_time_candles:
|
|
# Convert to DataFrame
|
|
rt_df = pd.DataFrame(real_time_candles)
|
|
|
|
if historical_df is not None:
|
|
# Combine historical and real-time
|
|
# Remove overlapping candles from historical data
|
|
if not rt_df.empty:
|
|
cutoff_time = rt_df['timestamp'].min()
|
|
historical_df = historical_df[historical_df['timestamp'] < cutoff_time]
|
|
|
|
# Concatenate
|
|
combined_df = pd.concat([historical_df, rt_df], ignore_index=True)
|
|
else:
|
|
combined_df = rt_df
|
|
|
|
return combined_df.tail(limit)
|
|
|
|
# Return just historical data if no real-time data
|
|
return historical_df.tail(limit) if historical_df is not None else pd.DataFrame()
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting latest candles for {symbol} {timeframe}: {e}")
|
|
return pd.DataFrame()
|
|
|
|
def get_current_price(self, symbol: str) -> Optional[float]:
|
|
"""Get current price for a symbol from latest candle"""
|
|
try:
|
|
# Try to get from 1s candle first (most recent)
|
|
for tf in ['1s', '1m', '5m', '1h']:
|
|
df = self.get_latest_candles(symbol, tf, limit=1)
|
|
if df is not None and not df.empty:
|
|
return float(df.iloc[-1]['close'])
|
|
|
|
# Fallback to any available data
|
|
key = f"{symbol}_{self.timeframes[0]}"
|
|
if key in self.historical_data and not self.historical_data[key].empty:
|
|
return float(self.historical_data[key].iloc[-1]['close'])
|
|
|
|
logger.warning(f"No price data available for {symbol}")
|
|
return None
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting current price for {symbol}: {e}")
|
|
return None
|
|
|
|
def calculate_williams_pivot_points(self, symbol: str, force_recalculate: bool = False) -> Dict[int, TrendLevel]:
|
|
"""
|
|
Calculate Williams Market Structure pivot points for a symbol
|
|
|
|
Args:
|
|
symbol: Trading symbol (e.g., 'ETH/USDT')
|
|
force_recalculate: Force recalculation even if cache is fresh
|
|
|
|
Returns:
|
|
Dictionary of trend levels with pivot points
|
|
"""
|
|
try:
|
|
# Check if we need to recalculate
|
|
now = datetime.now()
|
|
if (not force_recalculate and
|
|
symbol in self.last_pivot_calculation and
|
|
now - self.last_pivot_calculation[symbol] < self.pivot_calculation_interval):
|
|
# Return cached results
|
|
return self.pivot_points_cache.get(symbol, {})
|
|
|
|
# Get 1s OHLCV data for Williams Market Structure calculation
|
|
df_1s = self.get_historical_data(symbol, '1s', limit=1000)
|
|
if df_1s is None or len(df_1s) < 50:
|
|
logger.warning(f"Insufficient 1s data for Williams pivot calculation: {symbol}")
|
|
return {}
|
|
|
|
# Convert DataFrame to numpy array for Williams calculation
|
|
# Format: [timestamp_ms, open, high, low, close, volume]
|
|
ohlcv_array = np.column_stack([
|
|
df_1s.index.astype(np.int64) // 10**6, # Convert to milliseconds
|
|
df_1s['open'].values,
|
|
df_1s['high'].values,
|
|
df_1s['low'].values,
|
|
df_1s['close'].values,
|
|
df_1s['volume'].values
|
|
])
|
|
|
|
# Calculate recursive pivot points using Williams Market Structure
|
|
williams = self.williams_structure[symbol]
|
|
pivot_levels = williams.calculate_recursive_pivot_points(ohlcv_array)
|
|
|
|
# Cache the results
|
|
self.pivot_points_cache[symbol] = pivot_levels
|
|
self.last_pivot_calculation[symbol] = now
|
|
|
|
logger.debug(f"Calculated Williams pivot points for {symbol}: {len(pivot_levels)} levels")
|
|
return pivot_levels
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error calculating Williams pivot points for {symbol}: {e}")
|
|
return {}
|
|
|
|
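    # Minimal usage sketch (assumes a configured DataProvider instance named
    # `data_provider`; the TrendLevel attributes follow how they are used
    # elsewhere in this module):
    #
    #   levels = data_provider.calculate_williams_pivot_points('ETH/USDT')
    #   for level_num, trend_level in levels.items():
    #       recent = trend_level.pivot_points[-5:]
    #       logger.info(f"Level {level_num}: {len(recent)} recent pivots")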
def get_pivot_features_for_ml(self, symbol: str) -> np.ndarray:
|
|
"""
|
|
Get pivot point features for machine learning models
|
|
|
|
Returns a 250-element feature vector containing:
|
|
- Recent pivot points (price, strength, type) for each level
|
|
- Trend direction and strength for each level
|
|
- Time since last pivot for each level
|
|
"""
|
|
try:
|
|
# Ensure we have fresh pivot points
|
|
pivot_levels = self.calculate_williams_pivot_points(symbol)
|
|
|
|
if not pivot_levels:
|
|
logger.warning(f"No pivot points available for {symbol}")
|
|
return np.zeros(250, dtype=np.float32)
|
|
|
|
# Use Williams Market Structure to extract ML features
|
|
williams = self.williams_structure[symbol]
|
|
features = williams.get_pivot_features_for_ml(symbol)
|
|
|
|
return features
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting pivot features for ML: {e}")
|
|
return np.zeros(250, dtype=np.float32)
|
|
|
|
def get_market_structure_summary(self, symbol: str) -> Dict[str, Any]:
|
|
"""
|
|
Get current market structure summary for dashboard display
|
|
|
|
Returns:
|
|
Dictionary containing market structure information
|
|
"""
|
|
try:
|
|
# Ensure we have fresh pivot points
|
|
pivot_levels = self.calculate_williams_pivot_points(symbol)
|
|
|
|
if not pivot_levels:
|
|
return {
|
|
'symbol': symbol,
|
|
'levels': {},
|
|
'overall_trend': 'sideways',
|
|
'overall_strength': 0.0,
|
|
'last_update': datetime.now().isoformat(),
|
|
'error': 'No pivot points available'
|
|
}
|
|
|
|
# Use Williams Market Structure to get summary
|
|
williams = self.williams_structure[symbol]
|
|
structure = williams.get_current_market_structure()
|
|
structure['symbol'] = symbol
|
|
|
|
return structure
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting market structure summary for {symbol}: {e}")
|
|
return {
|
|
'symbol': symbol,
|
|
'levels': {},
|
|
'overall_trend': 'sideways',
|
|
'overall_strength': 0.0,
|
|
'last_update': datetime.now().isoformat(),
|
|
'error': str(e)
|
|
}
|
|
|
|
def get_recent_pivot_points(self, symbol: str, level: int = 1, count: int = 10) -> List[PivotPoint]:
|
|
"""
|
|
Get recent pivot points for a specific level
|
|
|
|
Args:
|
|
symbol: Trading symbol
|
|
level: Pivot level (1-5)
|
|
count: Number of recent pivots to return
|
|
|
|
Returns:
|
|
List of recent pivot points
|
|
"""
|
|
try:
|
|
pivot_levels = self.calculate_williams_pivot_points(symbol)
|
|
|
|
if level not in pivot_levels:
|
|
return []
|
|
|
|
trend_level = pivot_levels[level]
|
|
            recent_pivots = trend_level.pivot_points[-count:]  # slicing already handles count > len(pivot_points)
|
|
|
|
return recent_pivots
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting recent pivot points for {symbol} level {level}: {e}")
|
|
return []
|
|
|
|
def get_price_at_index(self, symbol: str, index: int, timeframe: str = '1m') -> Optional[float]:
|
|
"""Get price at specific index for backtesting"""
|
|
try:
|
|
key = f"{symbol}_{timeframe}"
|
|
if key in self.historical_data:
|
|
df = self.historical_data[key]
|
|
if 0 <= index < len(df):
|
|
return float(df.iloc[index]['close'])
|
|
return None
|
|
except Exception as e:
|
|
logger.error(f"Error getting price at index {index}: {e}")
|
|
return None
|
|
|
|
def get_feature_matrix(self, symbol: str, timeframes: List[str] = None,
|
|
window_size: int = 20) -> Optional[np.ndarray]:
|
|
"""
|
|
Get comprehensive feature matrix for multiple timeframes with technical indicators
|
|
|
|
Returns:
|
|
np.ndarray: Shape (n_timeframes, window_size, n_features)
|
|
Each timeframe becomes a separate channel for CNN
|
|
"""
|
|
try:
|
|
if timeframes is None:
|
|
timeframes = self.timeframes
|
|
|
|
feature_channels = []
|
|
common_feature_names = None
|
|
|
|
# First pass: determine common features across all timeframes
|
|
timeframe_features = {}
|
|
for tf in timeframes:
|
|
logger.debug(f"Processing timeframe {tf} for {symbol}")
|
|
df = self.get_latest_candles(symbol, tf, limit=window_size + 100)
|
|
|
|
if df is None or len(df) < window_size:
|
|
logger.warning(f"Insufficient data for {symbol} {tf}: {len(df) if df is not None else 0} rows")
|
|
continue
|
|
|
|
# Get feature columns
|
|
basic_cols = ['open', 'high', 'low', 'close', 'volume']
|
|
indicator_cols = [col for col in df.columns
|
|
if col not in basic_cols + ['timestamp'] and not col.startswith('unnamed')]
|
|
|
|
selected_features = self._select_cnn_features(df, basic_cols, indicator_cols)
|
|
timeframe_features[tf] = (df, selected_features)
|
|
|
|
if common_feature_names is None:
|
|
common_feature_names = set(selected_features)
|
|
else:
|
|
common_feature_names = common_feature_names.intersection(set(selected_features))
|
|
|
|
if not common_feature_names:
|
|
logger.error(f"No common features found across timeframes for {symbol}")
|
|
return None
|
|
|
|
# Convert to sorted list for consistent ordering
|
|
common_feature_names = sorted(list(common_feature_names))
|
|
# logger.info(f"Using {len(common_feature_names)} common features: {common_feature_names}")
|
|
|
|
# Second pass: create feature channels with common features
|
|
for tf in timeframes:
|
|
if tf not in timeframe_features:
|
|
continue
|
|
|
|
df, _ = timeframe_features[tf]
|
|
|
|
# Use only common features
|
|
try:
|
|
tf_features = self._normalize_features(df[common_feature_names].tail(window_size), symbol=symbol)
|
|
|
|
if tf_features is not None and len(tf_features) == window_size:
|
|
feature_channels.append(tf_features.values)
|
|
logger.debug(f"Added {len(common_feature_names)} features for {tf}")
|
|
else:
|
|
logger.warning(f"Feature normalization failed for {tf}")
|
|
except Exception as e:
|
|
logger.error(f"Error processing features for {tf}: {e}")
|
|
continue
|
|
|
|
if not feature_channels:
|
|
logger.error(f"No valid feature channels created for {symbol}")
|
|
return None
|
|
|
|
# Verify all channels have the same shape
|
|
shapes = [channel.shape for channel in feature_channels]
|
|
if len(set(shapes)) > 1:
|
|
logger.error(f"Shape mismatch in feature channels: {shapes}")
|
|
return None
|
|
|
|
# Stack all timeframe channels
|
|
feature_matrix = np.stack(feature_channels, axis=0)
|
|
|
|
logger.debug(f"Created feature matrix for {symbol}: {feature_matrix.shape} "
|
|
f"({len(feature_channels)} timeframes, {window_size} steps, {len(common_feature_names)} features)")
|
|
|
|
return feature_matrix
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error creating feature matrix for {symbol}: {e}")
|
|
import traceback
|
|
logger.error(traceback.format_exc())
|
|
return None
|
|
|
|
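    # Minimal usage sketch for the CNN feature matrix (assumes a configured
    # DataProvider instance named `data_provider`):
    #
    #   matrix = data_provider.get_feature_matrix('ETH/USDT',
    #                                             timeframes=['1m', '5m', '1h'],
    #                                             window_size=20)
    #   if matrix is not None:
    #       n_timeframes, window, n_features = matrix.shape
    #       # e.g. feed to a CNN as channels: (batch, n_timeframes, window, n_features)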
def _select_cnn_features(self, df: pd.DataFrame, basic_cols: List[str], indicator_cols: List[str]) -> List[str]:
|
|
"""Select the most important features for CNN training"""
|
|
try:
|
|
selected = []
|
|
|
|
# Always include basic OHLCV (normalized)
|
|
selected.extend(basic_cols)
|
|
|
|
# Priority indicators (most informative for CNNs)
|
|
priority_indicators = [
|
|
# Trend indicators
|
|
'sma_10', 'sma_20', 'sma_50', 'ema_12', 'ema_26', 'ema_50',
|
|
'macd', 'macd_signal', 'macd_histogram',
|
|
'adx', 'adx_pos', 'adx_neg', 'psar',
|
|
|
|
# Momentum indicators
|
|
'rsi_14', 'rsi_7', 'rsi_21',
|
|
'stoch_k', 'stoch_d', 'williams_r', 'ultimate_osc',
|
|
|
|
# Volatility indicators
|
|
'bb_upper', 'bb_lower', 'bb_middle', 'bb_width', 'bb_percent',
|
|
'atr', 'keltner_upper', 'keltner_lower', 'keltner_middle',
|
|
|
|
# Volume indicators
|
|
'volume_sma_10', 'volume_sma_20', 'obv', 'vpt', 'mfi', 'ad_line', 'vwap',
|
|
|
|
# Price action
|
|
'price_position', 'true_range', 'roc',
|
|
|
|
# Custom composites
|
|
'trend_strength', 'momentum_composite', 'volatility_regime'
|
|
]
|
|
|
|
# Add available priority indicators
|
|
for indicator in priority_indicators:
|
|
if indicator in indicator_cols:
|
|
selected.append(indicator)
|
|
|
|
# Add any other technical indicators not in priority list (limit to avoid curse of dimensionality)
|
|
remaining_indicators = [col for col in indicator_cols if col not in selected]
|
|
if remaining_indicators:
|
|
# Limit to 10 additional indicators
|
|
selected.extend(remaining_indicators[:10])
|
|
|
|
# Verify all selected features exist in dataframe
|
|
final_selected = [col for col in selected if col in df.columns]
|
|
|
|
logger.debug(f"Selected {len(final_selected)} features from {len(df.columns)} available columns")
|
|
return final_selected
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error selecting CNN features: {e}")
|
|
return basic_cols # Fallback to basic OHLCV
|
|
|
|
def _normalize_features(self, df: pd.DataFrame, symbol: str = None) -> Optional[pd.DataFrame]:
|
|
"""Normalize features for CNN training using pivot-based bounds when available"""
|
|
try:
|
|
df_norm = df.copy()
|
|
|
|
# Try to use pivot-based normalization if available
|
|
if symbol and symbol in self.pivot_bounds:
|
|
bounds = self.pivot_bounds[symbol]
|
|
price_range = bounds.get_price_range()
|
|
|
|
# Normalize price-based features using pivot bounds
|
|
price_cols = ['open', 'high', 'low', 'close', 'sma_10', 'sma_20', 'sma_50',
|
|
'ema_12', 'ema_26', 'ema_50', 'bb_upper', 'bb_lower', 'bb_middle',
|
|
'keltner_upper', 'keltner_lower', 'keltner_middle', 'psar', 'vwap']
|
|
|
|
for col in price_cols:
|
|
if col in df_norm.columns:
|
|
# Use pivot bounds for normalization
|
|
df_norm[col] = (df_norm[col] - bounds.price_min) / price_range
|
|
|
|
# Normalize volume using pivot bounds
|
|
if 'volume' in df_norm.columns:
|
|
volume_range = bounds.volume_max - bounds.volume_min
|
|
if volume_range > 0:
|
|
df_norm['volume'] = (df_norm['volume'] - bounds.volume_min) / volume_range
|
|
else:
|
|
df_norm['volume'] = 0.5 # Default to middle if no volume range
|
|
|
|
logger.debug(f"Applied pivot-based normalization for {symbol}")
|
|
|
|
else:
|
|
# Fallback to traditional normalization when pivot bounds not available
|
|
logger.debug("Using traditional normalization (no pivot bounds available)")
|
|
|
|
for col in df_norm.columns:
|
|
if col in ['open', 'high', 'low', 'close', 'sma_10', 'sma_20', 'sma_50',
|
|
'ema_12', 'ema_26', 'ema_50', 'bb_upper', 'bb_lower', 'bb_middle',
|
|
'keltner_upper', 'keltner_lower', 'keltner_middle', 'psar', 'vwap']:
|
|
# Price-based indicators: normalize by close price
|
|
if 'close' in df_norm.columns:
|
|
base_price = df_norm['close'].iloc[-1] # Use latest close as reference
|
|
if base_price > 0:
|
|
df_norm[col] = df_norm[col] / base_price
|
|
|
|
elif col == 'volume':
|
|
# Volume: normalize by its own rolling mean
|
|
volume_mean = df_norm[col].rolling(window=min(20, len(df_norm))).mean().iloc[-1]
|
|
if volume_mean > 0:
|
|
df_norm[col] = df_norm[col] / volume_mean
|
|
|
|
# Normalize indicators that have standard ranges (regardless of pivot bounds)
|
|
for col in df_norm.columns:
|
|
if col in ['rsi_14', 'rsi_7', 'rsi_21']:
|
|
# RSI: already 0-100, normalize to 0-1
|
|
df_norm[col] = df_norm[col] / 100.0
|
|
|
|
elif col in ['stoch_k', 'stoch_d']:
|
|
# Stochastic: already 0-100, normalize to 0-1
|
|
df_norm[col] = df_norm[col] / 100.0
|
|
|
|
elif col == 'williams_r':
|
|
# Williams %R: -100 to 0, normalize to 0-1
|
|
df_norm[col] = (df_norm[col] + 100) / 100.0
|
|
|
|
elif col in ['macd', 'macd_signal', 'macd_histogram']:
|
|
# MACD: normalize by ATR or close price
|
|
if 'atr' in df_norm.columns and df_norm['atr'].iloc[-1] > 0:
|
|
df_norm[col] = df_norm[col] / df_norm['atr'].iloc[-1]
|
|
elif 'close' in df_norm.columns and df_norm['close'].iloc[-1] > 0:
|
|
df_norm[col] = df_norm[col] / df_norm['close'].iloc[-1]
|
|
|
|
elif col in ['bb_width', 'bb_percent', 'price_position', 'trend_strength',
|
|
'momentum_composite', 'volatility_regime', 'pivot_price_position',
|
|
'pivot_support_distance', 'pivot_resistance_distance']:
|
|
# Already normalized indicators: ensure 0-1 range
|
|
df_norm[col] = np.clip(df_norm[col], 0, 1)
|
|
|
|
elif col in ['atr', 'true_range']:
|
|
# Volatility indicators: normalize by close price or pivot range
|
|
if symbol and symbol in self.pivot_bounds:
|
|
bounds = self.pivot_bounds[symbol]
|
|
df_norm[col] = df_norm[col] / bounds.get_price_range()
|
|
elif 'close' in df_norm.columns and df_norm['close'].iloc[-1] > 0:
|
|
df_norm[col] = df_norm[col] / df_norm['close'].iloc[-1]
|
|
|
|
elif col not in ['timestamp', 'near_pivot_support', 'near_pivot_resistance']:
|
|
# Other indicators: z-score normalization
|
|
col_mean = df_norm[col].rolling(window=min(20, len(df_norm))).mean().iloc[-1]
|
|
col_std = df_norm[col].rolling(window=min(20, len(df_norm))).std().iloc[-1]
|
|
if col_std > 0:
|
|
df_norm[col] = (df_norm[col] - col_mean) / col_std
|
|
else:
|
|
df_norm[col] = 0
|
|
|
|
# Replace inf/-inf with 0
|
|
df_norm = df_norm.replace([np.inf, -np.inf], 0)
|
|
|
|
# Fill any remaining NaN values
|
|
df_norm = df_norm.fillna(0)
|
|
|
|
return df_norm
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error normalizing features: {e}")
|
|
return df
|
|
|
|
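    # Worked example of the pivot-based price normalization above, assuming
    # pivot bounds of price_min=3000 and price_max=3500 for the symbol:
    #
    #   price_range = 3500 - 3000            # 500
    #   close       = 3275.0
    #   normalized  = (3275.0 - 3000) / 500  # 0.55
    #
    # Oscillators with a fixed range are instead scaled by that range
    # (e.g. rsi_14 / 100.0), and leftover indicators fall back to the rolling
    # z-score implemented in _normalize_features.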
def get_multi_symbol_feature_matrix(self, symbols: List[str] = None,
|
|
timeframes: List[str] = None,
|
|
window_size: int = 20) -> Optional[np.ndarray]:
|
|
"""
|
|
Get feature matrix for multiple symbols and timeframes
|
|
|
|
Returns:
|
|
np.ndarray: Shape (n_symbols, n_timeframes, window_size, n_features)
|
|
"""
|
|
try:
|
|
if symbols is None:
|
|
symbols = self.symbols
|
|
if timeframes is None:
|
|
timeframes = self.timeframes
|
|
|
|
symbol_matrices = []
|
|
|
|
for symbol in symbols:
|
|
symbol_matrix = self.get_feature_matrix(symbol, timeframes, window_size)
|
|
if symbol_matrix is not None:
|
|
symbol_matrices.append(symbol_matrix)
|
|
else:
|
|
logger.warning(f"Could not create feature matrix for {symbol}")
|
|
|
|
if symbol_matrices:
|
|
# Stack all symbol matrices
|
|
multi_symbol_matrix = np.stack(symbol_matrices, axis=0)
|
|
logger.info(f"Created multi-symbol feature matrix: {multi_symbol_matrix.shape}")
|
|
return multi_symbol_matrix
|
|
|
|
return None
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error creating multi-symbol feature matrix: {e}")
|
|
return None
|
|
|
|
def health_check(self) -> Dict[str, Any]:
|
|
"""Get health status of the data provider"""
|
|
status = {
|
|
'streaming': self.is_streaming,
|
|
'symbols': len(self.symbols),
|
|
'timeframes': len(self.timeframes),
|
|
'current_prices': len(self.current_prices),
|
|
'websocket_tasks': len(self.websocket_tasks),
|
|
'historical_data_loaded': {}
|
|
}
|
|
|
|
# Check historical data availability
|
|
for symbol in self.symbols:
|
|
status['historical_data_loaded'][symbol] = {}
|
|
for tf in self.timeframes:
|
|
                key = f"{symbol}_{tf}"
                has_data = (key in self.historical_data and
                            not self.historical_data[key].empty)
                status['historical_data_loaded'][symbol][tf] = has_data
|
|
|
|
return status
|
|
|
|
def subscribe_to_ticks(self, callback: Callable[[MarketTick], None],
|
|
symbols: List[str] = None,
|
|
subscriber_name: str = None) -> str:
|
|
"""Subscribe to real-time tick data updates"""
|
|
subscriber_id = str(uuid.uuid4())[:8]
|
|
subscriber_name = subscriber_name or f"subscriber_{subscriber_id}"
|
|
|
|
# Convert symbols to Binance format
|
|
if symbols:
|
|
binance_symbols = [s.replace('/', '').upper() for s in symbols]
|
|
else:
|
|
binance_symbols = [s.replace('/', '').upper() for s in self.symbols]
|
|
|
|
subscriber = DataSubscriber(
|
|
subscriber_id=subscriber_id,
|
|
callback=callback,
|
|
symbols=binance_symbols,
|
|
subscriber_name=subscriber_name
|
|
)
|
|
|
|
with self.subscriber_lock:
|
|
self.subscribers[subscriber_id] = subscriber
|
|
|
|
logger.info(f"New tick subscriber registered: {subscriber_name} ({subscriber_id}) for symbols: {binance_symbols}")
|
|
|
|
# Send recent tick data to new subscriber
|
|
self._send_recent_ticks_to_subscriber(subscriber)
|
|
|
|
return subscriber_id
|
|
|
|
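    # Minimal subscription sketch (assumes a configured DataProvider instance
    # named `data_provider`; the callback just logs each MarketTick):
    #
    #   def on_tick(tick: MarketTick):
    #       logger.info(f"{tick.symbol} {tick.side} {tick.price} x {tick.quantity}")
    #
    #   sub_id = data_provider.subscribe_to_ticks(on_tick,
    #                                             symbols=['ETH/USDT'],
    #                                             subscriber_name='example_logger')
    #   ...
    #   data_provider.unsubscribe_from_ticks(sub_id)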
def unsubscribe_from_ticks(self, subscriber_id: str):
|
|
"""Unsubscribe from tick data updates"""
|
|
with self.subscriber_lock:
|
|
if subscriber_id in self.subscribers:
|
|
subscriber_name = self.subscribers[subscriber_id].subscriber_name
|
|
self.subscribers[subscriber_id].active = False
|
|
del self.subscribers[subscriber_id]
|
|
logger.info(f"Subscriber {subscriber_name} ({subscriber_id}) unsubscribed")
|
|
|
|
def _send_recent_ticks_to_subscriber(self, subscriber: DataSubscriber):
|
|
"""Send recent tick data to a new subscriber"""
|
|
try:
|
|
for symbol in subscriber.symbols:
|
|
if symbol in self.tick_buffers:
|
|
# Send last 50 ticks to get subscriber up to speed
|
|
recent_ticks = list(self.tick_buffers[symbol])[-50:]
|
|
for tick in recent_ticks:
|
|
try:
|
|
subscriber.callback(tick)
|
|
except Exception as e:
|
|
logger.warning(f"Error sending recent tick to subscriber {subscriber.subscriber_id}: {e}")
|
|
except Exception as e:
|
|
logger.error(f"Error sending recent ticks: {e}")
|
|
|
|
def _distribute_tick(self, tick: MarketTick):
|
|
"""Distribute tick to all relevant subscribers"""
|
|
distributed_count = 0
|
|
|
|
with self.subscriber_lock:
|
|
subscribers_to_remove = []
|
|
|
|
for subscriber_id, subscriber in self.subscribers.items():
|
|
if not subscriber.active:
|
|
subscribers_to_remove.append(subscriber_id)
|
|
continue
|
|
|
|
if tick.symbol in subscriber.symbols:
|
|
try:
|
|
# Call subscriber callback in a thread to avoid blocking
|
|
                        # Bind the current subscriber via default arguments so the
                        # thread does not pick up later values of the loop variables
                        def call_callback(sub=subscriber, sub_id=subscriber_id):
                            try:
                                sub.callback(tick)
                                sub.tick_count += 1
                                sub.last_update = datetime.now()
                            except Exception as e:
                                logger.warning(f"Error in subscriber {sub_id} callback: {e}")
                                sub.active = False
|
|
|
|
# Use thread to avoid blocking the main data processing
|
|
Thread(target=call_callback, daemon=True).start()
|
|
distributed_count += 1
|
|
|
|
except Exception as e:
|
|
logger.warning(f"Error distributing tick to subscriber {subscriber_id}: {e}")
|
|
subscriber.active = False
|
|
|
|
# Remove inactive subscribers
|
|
for subscriber_id in subscribers_to_remove:
|
|
if subscriber_id in self.subscribers:
|
|
del self.subscribers[subscriber_id]
|
|
|
|
self.distribution_stats['total_ticks_distributed'] += distributed_count
|
|
|
|
def _validate_tick_data(self, symbol: str, price: float, volume: float) -> bool:
|
|
"""Validate incoming tick data for quality"""
|
|
try:
|
|
# Basic validation
|
|
if price <= 0 or volume < 0:
|
|
return False
|
|
|
|
# Price change validation
|
|
last_price = self.last_prices.get(symbol, 0)
|
|
if last_price > 0:
|
|
price_change_pct = abs(price - last_price) / last_price
|
|
if price_change_pct > self.price_change_threshold:
|
|
logger.warning(f"Large price change for {symbol}: {price_change_pct:.2%}")
|
|
# Don't reject, just warn - could be legitimate
|
|
|
|
return True
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error validating tick data: {e}")
|
|
return False
|
|
|
|
def get_recent_ticks(self, symbol: str, count: int = 100) -> List[MarketTick]:
|
|
"""Get recent ticks for a symbol"""
|
|
binance_symbol = symbol.replace('/', '').upper()
|
|
if binance_symbol in self.tick_buffers:
|
|
return list(self.tick_buffers[binance_symbol])[-count:]
|
|
return []
|
|
|
|
def subscribe_to_raw_ticks(self, callback: Callable[[RawTick], None]) -> str:
|
|
"""Subscribe to raw tick data with timing information"""
|
|
subscriber_id = str(uuid.uuid4())
|
|
self.raw_tick_callbacks.append(callback)
|
|
logger.info(f"Raw tick subscriber added: {subscriber_id}")
|
|
return subscriber_id
|
|
|
|
def subscribe_to_ohlcv_bars(self, callback: Callable[[OHLCVBar], None]) -> str:
|
|
"""Subscribe to 1s OHLCV bars calculated from ticks"""
|
|
subscriber_id = str(uuid.uuid4())
|
|
self.ohlcv_bar_callbacks.append(callback)
|
|
logger.info(f"OHLCV bar subscriber added: {subscriber_id}")
|
|
return subscriber_id
|
|
|
|
def get_raw_tick_features(self, symbol: str, window_size: int = 50) -> Optional[np.ndarray]:
|
|
"""Get raw tick features for model consumption"""
|
|
return self.tick_aggregator.get_tick_features_for_model(symbol, window_size)
|
|
|
|
def get_ohlcv_features(self, symbol: str, window_size: int = 60) -> Optional[np.ndarray]:
|
|
"""Get 1s OHLCV features for model consumption"""
|
|
return self.tick_aggregator.get_ohlcv_features_for_model(symbol, window_size)
|
|
|
|
def get_detected_patterns(self, symbol: str, count: int = 50) -> List:
|
|
"""Get recently detected tick patterns"""
|
|
return self.tick_aggregator.get_detected_patterns(symbol, count)
|
|
|
|
def get_tick_aggregator_stats(self) -> Dict[str, Any]:
|
|
"""Get tick aggregator statistics"""
|
|
return self.tick_aggregator.get_statistics()
|
|
|
|
def get_subscriber_stats(self) -> Dict[str, Any]:
|
|
"""Get subscriber and distribution statistics"""
|
|
with self.subscriber_lock:
|
|
active_subscribers = len([s for s in self.subscribers.values() if s.active])
|
|
subscriber_stats = {
|
|
sid: {
|
|
'name': s.subscriber_name,
|
|
'active': s.active,
|
|
'symbols': s.symbols,
|
|
'tick_count': s.tick_count,
|
|
'last_update': s.last_update.isoformat() if s.last_update else None
|
|
}
|
|
for sid, s in self.subscribers.items()
|
|
}
|
|
|
|
# Get tick aggregator stats
|
|
aggregator_stats = self.get_tick_aggregator_stats()
|
|
|
|
return {
|
|
'active_subscribers': active_subscribers,
|
|
'total_subscribers': len(self.subscribers),
|
|
'raw_tick_callbacks': len(self.raw_tick_callbacks),
|
|
'ohlcv_bar_callbacks': len(self.ohlcv_bar_callbacks),
|
|
'subscriber_details': subscriber_stats,
|
|
'distribution_stats': self.distribution_stats.copy(),
|
|
'buffer_sizes': {symbol: len(buffer) for symbol, buffer in self.tick_buffers.items()},
|
|
'tick_aggregator': aggregator_stats
|
|
}
|
|
|
|
def update_bom_cache(self, symbol: str, bom_features: List[float], cob_integration=None):
|
|
"""
|
|
Update BOM cache with latest features for a symbol
|
|
|
|
Args:
|
|
symbol: Trading symbol (e.g., 'ETH/USDT')
|
|
bom_features: List of BOM features (should be 120 features)
|
|
cob_integration: Optional COB integration instance for real BOM data
|
|
"""
|
|
try:
|
|
current_time = datetime.now()
|
|
|
|
# Ensure we have exactly 120 features
|
|
if len(bom_features) != self.bom_feature_count:
|
|
if len(bom_features) > self.bom_feature_count:
|
|
bom_features = bom_features[:self.bom_feature_count]
|
|
else:
|
|
bom_features.extend([0.0] * (self.bom_feature_count - len(bom_features)))
|
|
|
|
# Convert to numpy array for efficient storage
|
|
bom_array = np.array(bom_features, dtype=np.float32)
|
|
|
|
# Add timestamp and features to cache
|
|
with self.data_lock:
|
|
self.bom_data_cache[symbol].append((current_time, bom_array))
|
|
|
|
logger.debug(f"Updated BOM cache for {symbol}: {len(self.bom_data_cache[symbol])} timestamps cached")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error updating BOM cache for {symbol}: {e}")
|
|
|
|
def get_bom_matrix_for_cnn(self, symbol: str, sequence_length: int = 50) -> Optional[np.ndarray]:
|
|
"""
|
|
Get BOM matrix for CNN input from cached 1s data
|
|
|
|
Args:
|
|
symbol: Trading symbol (e.g., 'ETH/USDT')
|
|
sequence_length: Required sequence length (default 50)
|
|
|
|
Returns:
|
|
np.ndarray: BOM matrix of shape (sequence_length, 120) or None if insufficient data
|
|
"""
|
|
try:
|
|
with self.data_lock:
|
|
if symbol not in self.bom_data_cache or len(self.bom_data_cache[symbol]) == 0:
|
|
logger.warning(f"No BOM data cached for {symbol}")
|
|
return None
|
|
|
|
# Get recent data
|
|
cached_data = list(self.bom_data_cache[symbol])
|
|
|
|
if len(cached_data) < sequence_length:
|
|
logger.warning(f"Insufficient BOM data for {symbol}: {len(cached_data)} < {sequence_length}")
|
|
# Pad with zeros if we don't have enough data
|
|
bom_matrix = np.zeros((sequence_length, self.bom_feature_count), dtype=np.float32)
|
|
|
|
# Fill available data at the end
|
|
for i, (timestamp, features) in enumerate(cached_data):
|
|
if i < sequence_length:
|
|
bom_matrix[sequence_length - len(cached_data) + i] = features
|
|
|
|
return bom_matrix
|
|
|
|
# Take the most recent sequence_length samples
|
|
recent_data = cached_data[-sequence_length:]
|
|
|
|
# Create matrix
|
|
bom_matrix = np.zeros((sequence_length, self.bom_feature_count), dtype=np.float32)
|
|
for i, (timestamp, features) in enumerate(recent_data):
|
|
bom_matrix[i] = features
|
|
|
|
logger.debug(f"Retrieved BOM matrix for {symbol}: shape={bom_matrix.shape}")
|
|
return bom_matrix
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting BOM matrix for {symbol}: {e}")
|
|
return None
|
|
|
|
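    # Minimal usage sketch for the BOM matrix (assumes a configured DataProvider
    # instance named `data_provider` with BOM cache updates running):
    #
    #   bom = data_provider.get_bom_matrix_for_cnn('ETH/USDT', sequence_length=50)
    #   if bom is not None:
    #       assert bom.shape == (50, data_provider.bom_feature_count)  # (50, 120)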
def get_real_bom_features(self, symbol: str) -> Optional[List[float]]:
|
|
"""
|
|
Get REAL BOM features from actual market data ONLY
|
|
|
|
NO SYNTHETIC DATA - Returns None if real data is not available
|
|
"""
|
|
try:
|
|
# Try to get real COB data from integration
|
|
if hasattr(self, 'cob_integration') and self.cob_integration:
|
|
return self._extract_real_bom_features(symbol, self.cob_integration)
|
|
|
|
# No real data available - return None instead of synthetic
|
|
logger.warning(f"No real BOM data available for {symbol} - waiting for real market data")
|
|
return None
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting real BOM features for {symbol}: {e}")
|
|
return None
|
|
|
|
def start_bom_cache_updates(self, cob_integration=None):
|
|
"""
|
|
Start background updates of BOM cache every second
|
|
|
|
Args:
|
|
cob_integration: Optional COB integration instance for real data
|
|
"""
|
|
try:
|
|
def update_loop():
|
|
while self.is_streaming:
|
|
try:
|
|
for symbol in self.symbols:
|
|
if cob_integration:
|
|
# Try to get real BOM features from COB integration
|
|
try:
|
|
bom_features = self._extract_real_bom_features(symbol, cob_integration)
|
|
if bom_features:
|
|
self.update_bom_cache(symbol, bom_features, cob_integration)
|
|
else:
|
|
# NO SYNTHETIC FALLBACK - Wait for real data
|
|
logger.warning(f"No real BOM features available for {symbol} - waiting for real data")
|
|
except Exception as e:
|
|
logger.warning(f"Error getting real BOM features for {symbol}: {e}")
|
|
logger.warning(f"Waiting for real data instead of using synthetic")
|
|
else:
|
|
# NO SYNTHETIC FEATURES - Wait for real COB integration
|
|
logger.warning(f"No COB integration available for {symbol} - waiting for real data")
|
|
|
|
time.sleep(1.0) # Update every second
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error in BOM cache update loop: {e}")
|
|
time.sleep(5.0) # Wait longer on error
|
|
|
|
# Start background thread
|
|
bom_thread = Thread(target=update_loop, daemon=True)
|
|
bom_thread.start()
|
|
|
|
logger.info("Started BOM cache updates (1s resolution)")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error starting BOM cache updates: {e}")
|
|
|
|
def _extract_real_bom_features(self, symbol: str, cob_integration) -> Optional[List[float]]:
|
|
"""Extract real BOM features from COB integration"""
|
|
try:
|
|
features = []
|
|
|
|
# Get consolidated order book
|
|
if hasattr(cob_integration, 'get_consolidated_orderbook'):
|
|
cob_snapshot = cob_integration.get_consolidated_orderbook(symbol)
|
|
if cob_snapshot:
|
|
# Extract order book features (40 features)
|
|
features.extend(self._extract_orderbook_features(cob_snapshot))
|
|
else:
|
|
features.extend([0.0] * 40)
|
|
else:
|
|
features.extend([0.0] * 40)
|
|
|
|
# Get volume profile features (30 features)
|
|
if hasattr(cob_integration, 'get_session_volume_profile'):
|
|
volume_profile = cob_integration.get_session_volume_profile(symbol)
|
|
if volume_profile:
|
|
features.extend(self._extract_volume_profile_features(volume_profile))
|
|
else:
|
|
features.extend([0.0] * 30)
|
|
else:
|
|
features.extend([0.0] * 30)
|
|
|
|
# Add flow and microstructure features (50 features)
|
|
features.extend(self._extract_flow_microstructure_features(symbol, cob_integration))
|
|
|
|
# Ensure exactly 120 features
|
|
if len(features) > 120:
|
|
features = features[:120]
|
|
elif len(features) < 120:
|
|
features.extend([0.0] * (120 - len(features)))
|
|
|
|
return features
|
|
|
|
except Exception as e:
|
|
logger.warning(f"Error extracting real BOM features for {symbol}: {e}")
|
|
return None
|
|
|
|
def _extract_orderbook_features(self, cob_snapshot) -> List[float]:
|
|
"""Extract order book features from COB snapshot"""
|
|
features = []
|
|
|
|
try:
|
|
# Top 10 bid levels
|
|
for i in range(10):
|
|
if i < len(cob_snapshot.consolidated_bids):
|
|
level = cob_snapshot.consolidated_bids[i]
|
|
price_offset = (level.price - cob_snapshot.volume_weighted_mid) / cob_snapshot.volume_weighted_mid
|
|
volume_normalized = level.total_volume_usd / 1000000
|
|
features.extend([price_offset, volume_normalized])
|
|
else:
|
|
features.extend([0.0, 0.0])
|
|
|
|
# Top 10 ask levels
|
|
for i in range(10):
|
|
if i < len(cob_snapshot.consolidated_asks):
|
|
level = cob_snapshot.consolidated_asks[i]
|
|
price_offset = (level.price - cob_snapshot.volume_weighted_mid) / cob_snapshot.volume_weighted_mid
|
|
volume_normalized = level.total_volume_usd / 1000000
|
|
features.extend([price_offset, volume_normalized])
|
|
else:
|
|
features.extend([0.0, 0.0])
|
|
|
|
except Exception as e:
|
|
logger.warning(f"Error extracting order book features: {e}")
|
|
features = [0.0] * 40
|
|
|
|
return features[:40]
|
|
|
|
def _extract_volume_profile_features(self, volume_profile) -> List[float]:
|
|
"""Extract volume profile features"""
|
|
features = []
|
|
|
|
try:
|
|
if 'data' in volume_profile:
|
|
svp_data = volume_profile['data']
|
|
top_levels = sorted(svp_data, key=lambda x: x.get('total_volume', 0), reverse=True)[:10]
|
|
|
|
for level in top_levels:
|
|
buy_percent = level.get('buy_percent', 50.0) / 100.0
|
|
sell_percent = level.get('sell_percent', 50.0) / 100.0
|
|
total_volume = level.get('total_volume', 0.0) / 1000000
|
|
features.extend([buy_percent, sell_percent, total_volume])
|
|
|
|
# Pad to 30 features
|
|
while len(features) < 30:
|
|
features.extend([0.5, 0.5, 0.0])
|
|
|
|
except Exception as e:
|
|
logger.warning(f"Error extracting volume profile features: {e}")
|
|
features = [0.0] * 30
|
|
|
|
return features[:30]
|
|
|
|
    def _extract_flow_microstructure_features(self, symbol: str, cob_integration) -> List[float]:
        """Extract flow and microstructure features (50 features)

        Real flow/microstructure extraction is not implemented yet. To respect the
        NO SYNTHETIC DATA policy while keeping the 120-feature BOM vector a
        consistent length for callers that extend it, return neutral zero padding.
        """
        try:
            logger.warning(f"No real microstructure data available for {symbol} - returning zero padding")
            return [0.0] * 50
        except Exception:
            return [0.0] * 50
|
|
|
|
def _handle_rate_limit(self, url: str):
|
|
"""Handle rate limiting with exponential backoff"""
|
|
current_time = time.time()
|
|
|
|
# Check if we need to wait
|
|
if url in self.last_request_time:
|
|
time_since_last = current_time - self.last_request_time[url]
|
|
if time_since_last < self.request_interval:
|
|
sleep_time = self.request_interval - time_since_last
|
|
logger.info(f"Rate limiting: sleeping {sleep_time:.2f}s")
|
|
time.sleep(sleep_time)
|
|
|
|
self.last_request_time[url] = time.time()
|
|
|
|
def _make_request_with_retry(self, url: str, params: dict = None):
|
|
"""Make HTTP request with retry logic for 451 errors"""
|
|
for attempt in range(self.max_retries):
|
|
try:
|
|
self._handle_rate_limit(url)
|
|
response = requests.get(url, params=params, timeout=30)
|
|
|
|
if response.status_code == 451:
|
|
logger.warning(f"Rate limit hit (451), attempt {attempt + 1}/{self.max_retries}")
|
|
if attempt < self.max_retries - 1:
|
|
sleep_time = self.retry_delay * (2 ** attempt) # Exponential backoff
|
|
logger.info(f"Waiting {sleep_time}s before retry...")
|
|
time.sleep(sleep_time)
|
|
continue
|
|
else:
|
|
logger.error("Max retries reached, using cached data")
|
|
return None
|
|
|
|
response.raise_for_status()
|
|
return response
|
|
|
|
except Exception as e:
|
|
logger.error(f"Request failed (attempt {attempt + 1}): {e}")
|
|
if attempt < self.max_retries - 1:
|
|
time.sleep(5 * (attempt + 1))
|
|
|
|
return None
|
|
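    # Backoff sketch for _make_request_with_retry, assuming hypothetical settings
    # of max_retries=3 and retry_delay=60 (the real values come from the
    # provider's configuration):
    #
    #   attempt 0 -> wait retry_delay * 2**0 = 60s  after HTTP 451
    #   attempt 1 -> wait retry_delay * 2**1 = 120s after HTTP 451
    #   attempt 2 -> give up and return None (callers fall back to cached data)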
# ===== CENTRALIZED DATA COLLECTION METHODS =====
|
|
|
|
def start_centralized_data_collection(self):
|
|
"""Start all centralized data collection processes"""
|
|
logger.info("Starting centralized data collection for all models and dashboard")
|
|
|
|
# Start COB data collection
|
|
self.start_cob_data_collection()
|
|
|
|
# Start training data collection
|
|
self.start_training_data_collection()
|
|
|
|
logger.info("All centralized data collection processes started")
|
|
|
|
def stop_centralized_data_collection(self):
|
|
"""Stop all centralized data collection processes"""
|
|
logger.info("Stopping centralized data collection")
|
|
|
|
# Stop COB collection
|
|
self.cob_collection_active = False
|
|
if self.cob_collection_thread and self.cob_collection_thread.is_alive():
|
|
self.cob_collection_thread.join(timeout=5)
|
|
|
|
# Stop training data collection
|
|
self.training_data_collection_active = False
|
|
if self.training_data_thread and self.training_data_thread.is_alive():
|
|
self.training_data_thread.join(timeout=5)
|
|
|
|
logger.info("Centralized data collection stopped")
|
|
|
|
def start_cob_data_collection(self):
|
|
"""Start COB (Consolidated Order Book) data collection"""
|
|
if self.cob_collection_active:
|
|
logger.warning("COB data collection already active")
|
|
return
|
|
|
|
self.cob_collection_active = True
|
|
self.cob_collection_thread = Thread(target=self._cob_collection_worker, daemon=True)
|
|
self.cob_collection_thread.start()
|
|
logger.info("COB data collection started")
|
|
|
|
def _cob_collection_worker(self):
|
|
"""Worker thread for COB data collection"""
|
|
import requests
|
|
import time
|
|
import threading
|
|
|
|
logger.info("COB data collection worker started")
|
|
|
|
# Use separate threads for each symbol to achieve higher update frequency
|
|
def collect_symbol_data(symbol):
|
|
while self.cob_collection_active:
|
|
try:
|
|
self._collect_cob_data_for_symbol(symbol)
|
|
                    # Short polling interval; the effective update rate is bounded by
                    # REST latency and exchange rate limits, not by this sleep alone
                    time.sleep(0.016)
|
|
except Exception as e:
|
|
logger.error(f"Error collecting COB data for {symbol}: {e}")
|
|
time.sleep(1) # Short recovery time
|
|
|
|
# Start a thread for each symbol
|
|
threads = []
|
|
for symbol in self.symbols:
|
|
thread = threading.Thread(target=collect_symbol_data, args=(symbol,), daemon=True)
|
|
thread.start()
|
|
threads.append(thread)
|
|
|
|
# Keep the main thread alive
|
|
while self.cob_collection_active:
|
|
time.sleep(1)
|
|
|
|
# Join threads when collection is stopped
|
|
for thread in threads:
|
|
thread.join(timeout=1)
|
|
|
|
def _collect_cob_data_for_symbol(self, symbol: str):
|
|
"""Collect COB data for a specific symbol using Binance REST API"""
|
|
try:
|
|
import requests
|
|
|
|
# Convert symbol format
|
|
binance_symbol = symbol.replace('/', '').upper()
|
|
|
|
# Get order book data
|
|
url = f"https://api.binance.com/api/v3/depth"
|
|
params = {
|
|
'symbol': binance_symbol,
|
|
'limit': 100 # Get top 100 levels
|
|
}
|
|
|
|
response = requests.get(url, params=params, timeout=5)
|
|
if response.status_code == 200:
|
|
order_book = response.json()
|
|
|
|
# Process and cache the data
|
|
cob_snapshot = self._process_order_book_data(symbol, order_book)
|
|
|
|
# Store in cache (ensure cache exists)
|
|
if binance_symbol not in self.cob_data_cache:
|
|
self.cob_data_cache[binance_symbol] = deque(maxlen=300)
|
|
|
|
self.cob_data_cache[binance_symbol].append(cob_snapshot)
|
|
|
|
# Distribute to COB data subscribers
|
|
self._distribute_cob_data(symbol, cob_snapshot)
|
|
|
|
else:
|
|
logger.debug(f"Failed to fetch COB data for {symbol}: {response.status_code}")
|
|
|
|
except Exception as e:
|
|
logger.debug(f"Error collecting COB data for {symbol}: {e}")
|
|
|
|
def _process_order_book_data(self, symbol: str, order_book: dict) -> dict:
|
|
"""Process raw order book data into structured COB snapshot with multi-timeframe imbalance metrics"""
|
|
try:
|
|
bids = [[float(price), float(qty)] for price, qty in order_book.get('bids', [])]
|
|
asks = [[float(price), float(qty)] for price, qty in order_book.get('asks', [])]
|
|
|
|
# Calculate statistics
|
|
total_bid_volume = sum(qty for _, qty in bids)
|
|
total_ask_volume = sum(qty for _, qty in asks)
|
|
|
|
best_bid = bids[0][0] if bids else 0
|
|
best_ask = asks[0][0] if asks else 0
|
|
mid_price = (best_bid + best_ask) / 2 if best_bid and best_ask else 0
|
|
spread = best_ask - best_bid if best_bid and best_ask else 0
|
|
spread_bps = (spread / mid_price * 10000) if mid_price > 0 else 0
|
|
|
|
# Calculate current imbalance
|
|
imbalance = (total_bid_volume - total_ask_volume) / (total_bid_volume + total_ask_volume) if (total_bid_volume + total_ask_volume) > 0 else 0
|
|
|
|
# Calculate multi-timeframe imbalances
|
|
binance_symbol = symbol.replace('/', '').upper()
|
|
|
|
# Initialize imbalance metrics
|
|
imbalance_1s = imbalance # Current imbalance is 1s
|
|
imbalance_5s = imbalance # Default to current if not enough history
|
|
imbalance_15s = imbalance
|
|
imbalance_60s = imbalance
|
|
|
|
# Calculate historical imbalances if we have enough data
|
|
if binance_symbol in self.cob_data_cache:
|
|
cache = self.cob_data_cache[binance_symbol]
|
|
now = datetime.now()
|
|
|
|
# Get snapshots for different timeframes
|
|
snapshots_5s = [s for s in cache if (now - s['timestamp']).total_seconds() <= 5]
|
|
snapshots_15s = [s for s in cache if (now - s['timestamp']).total_seconds() <= 15]
|
|
snapshots_60s = [s for s in cache if (now - s['timestamp']).total_seconds() <= 60]
|
|
|
|
# Calculate imbalances for each timeframe
|
|
if snapshots_5s:
|
|
bid_vol_5s = sum(s['stats']['bid_liquidity'] for s in snapshots_5s)
|
|
ask_vol_5s = sum(s['stats']['ask_liquidity'] for s in snapshots_5s)
|
|
total_vol_5s = bid_vol_5s + ask_vol_5s
|
|
imbalance_5s = (bid_vol_5s - ask_vol_5s) / total_vol_5s if total_vol_5s > 0 else 0
|
|
|
|
if snapshots_15s:
|
|
bid_vol_15s = sum(s['stats']['bid_liquidity'] for s in snapshots_15s)
|
|
ask_vol_15s = sum(s['stats']['ask_liquidity'] for s in snapshots_15s)
|
|
total_vol_15s = bid_vol_15s + ask_vol_15s
|
|
imbalance_15s = (bid_vol_15s - ask_vol_15s) / total_vol_15s if total_vol_15s > 0 else 0
|
|
|
|
if snapshots_60s:
|
|
bid_vol_60s = sum(s['stats']['bid_liquidity'] for s in snapshots_60s)
|
|
ask_vol_60s = sum(s['stats']['ask_liquidity'] for s in snapshots_60s)
|
|
total_vol_60s = bid_vol_60s + ask_vol_60s
|
|
imbalance_60s = (bid_vol_60s - ask_vol_60s) / total_vol_60s if total_vol_60s > 0 else 0
|
|
|
|
cob_snapshot = {
|
|
'symbol': symbol,
|
|
'timestamp': datetime.now(),
|
|
'bids': bids[:20], # Top 20 levels
|
|
'asks': asks[:20], # Top 20 levels
|
|
'stats': {
|
|
'best_bid': best_bid,
|
|
'best_ask': best_ask,
|
|
'mid_price': mid_price,
|
|
'spread': spread,
|
|
'spread_bps': spread_bps,
|
|
'bid_liquidity': total_bid_volume,
|
|
'ask_liquidity': total_ask_volume,
|
|
'total_liquidity': total_bid_volume + total_ask_volume,
|
|
'imbalance': imbalance,
|
|
'imbalance_1s': imbalance_1s,
|
|
'imbalance_5s': imbalance_5s,
|
|
'imbalance_15s': imbalance_15s,
|
|
'imbalance_60s': imbalance_60s,
|
|
'levels': len(bids) + len(asks)
|
|
}
|
|
}
|
|
|
|
return cob_snapshot
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error processing order book data for {symbol}: {e}")
|
|
return {}
|
|
|
|
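    # Worked example of the order-book imbalance computed above: with 60 units
    # of bid liquidity and 40 units of ask liquidity,
    #
    #   imbalance = (60 - 40) / (60 + 40) = 0.2   # bid-heavy book
    #
    # Values range from -1 (all ask liquidity) to +1 (all bid liquidity); the
    # 5s/15s/60s variants apply the same formula to liquidity summed over the
    # cached snapshots inside each time window.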
def start_training_data_collection(self):
|
|
"""Start training data collection for models"""
|
|
if self.training_data_collection_active:
|
|
logger.warning("Training data collection already active")
|
|
return
|
|
|
|
self.training_data_collection_active = True
|
|
self.training_data_thread = Thread(target=self._training_data_collection_worker, daemon=True)
|
|
self.training_data_thread.start()
|
|
logger.info("Training data collection started")
|
|
|
|
def _training_data_collection_worker(self):
|
|
"""Worker thread for training data collection"""
|
|
import time
|
|
|
|
logger.info("Training data collection worker started")
|
|
|
|
while self.training_data_collection_active:
|
|
try:
|
|
# Collect training data for all symbols
|
|
for symbol in self.symbols:
|
|
training_sample = self._collect_training_sample(symbol)
|
|
if training_sample:
|
|
binance_symbol = symbol.replace('/', '').upper()
|
|
self.training_data_cache[binance_symbol].append(training_sample)
|
|
|
|
# Distribute to training data subscribers
|
|
self._distribute_training_data(symbol, training_sample)
|
|
|
|
# Sleep for 5 seconds between collections
|
|
time.sleep(5)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error in training data collection worker: {e}")
|
|
time.sleep(10) # Wait longer on error
|
|
|
|
def _collect_training_sample(self, symbol: str) -> Optional[dict]:
|
|
"""Collect a training sample for a specific symbol"""
|
|
try:
|
|
# Get recent market data
|
|
recent_data = self.get_historical_data(symbol, '1m', limit=100)
|
|
if recent_data is None or len(recent_data) < 50:
|
|
return None
|
|
|
|
# Get recent ticks
|
|
recent_ticks = self.get_recent_ticks(symbol, count=100)
|
|
if len(recent_ticks) < 10:
|
|
return None
|
|
|
|
# Get COB data
|
|
binance_symbol = symbol.replace('/', '').upper()
|
|
recent_cob = list(self.cob_data_cache.get(binance_symbol, []))[-10:] if binance_symbol in self.cob_data_cache else []
|
|
|
|
# Create training sample
|
|
training_sample = {
|
|
'symbol': symbol,
|
|
'timestamp': datetime.now(),
|
|
'ohlcv_data': recent_data.tail(50).to_dict('records'),
|
|
'tick_data': [
|
|
{
|
|
'price': tick.price,
|
|
'volume': tick.volume,
|
|
'timestamp': tick.timestamp
|
|
} for tick in recent_ticks[-50:]
|
|
],
|
|
'cob_data': recent_cob,
|
|
'features': self._extract_training_features(symbol, recent_data, recent_ticks, recent_cob)
|
|
}
|
|
|
|
return training_sample
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error collecting training sample for {symbol}: {e}")
|
|
return None
|
|
|
|
def _extract_training_features(self, symbol: str, ohlcv_data: pd.DataFrame,
|
|
recent_ticks: List[MarketTick], recent_cob: List[dict]) -> dict:
|
|
"""Extract features for training from various data sources"""
|
|
try:
|
|
features = {}
|
|
|
|
# OHLCV features
|
|
if len(ohlcv_data) > 0:
|
|
latest = ohlcv_data.iloc[-1]
|
|
features.update({
|
|
'price': latest['close'],
|
|
'volume': latest['volume'],
|
|
'price_change_1m': (latest['close'] - ohlcv_data.iloc[-2]['close']) / ohlcv_data.iloc[-2]['close'] if len(ohlcv_data) > 1 else 0,
|
|
'volume_ratio': latest['volume'] / ohlcv_data['volume'].mean() if len(ohlcv_data) > 1 else 1,
|
|
'volatility': ohlcv_data['close'].pct_change().std() if len(ohlcv_data) > 1 else 0
|
|
})
|
|
|
|
# Tick features
|
|
if recent_ticks:
|
|
tick_prices = [tick.price for tick in recent_ticks]
|
|
tick_volumes = [tick.volume for tick in recent_ticks]
|
|
features.update({
|
|
'tick_price_std': np.std(tick_prices) if len(tick_prices) > 1 else 0,
|
|
'tick_volume_mean': np.mean(tick_volumes),
|
|
'tick_count': len(recent_ticks)
|
|
})
|
|
|
|
# COB features
|
|
if recent_cob:
|
|
latest_cob = recent_cob[-1]
|
|
if 'stats' in latest_cob:
|
|
stats = latest_cob['stats']
|
|
features.update({
|
|
'spread_bps': stats.get('spread_bps', 0),
|
|
'imbalance': stats.get('imbalance', 0),
|
|
'liquidity': stats.get('total_liquidity', 0),
|
|
'cob_levels': stats.get('levels', 0)
|
|
})
|
|
|
|
return features
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error extracting training features for {symbol}: {e}")
|
|
return {}
|
|
|
|
# ===== SUBSCRIPTION METHODS FOR MODELS AND DASHBOARD =====
|
|
|
|
def subscribe_to_cob_data(self, callback: Callable[[str, dict], None]) -> str:
|
|
"""Subscribe to COB data updates"""
|
|
subscriber_id = str(uuid.uuid4())
|
|
self.cob_data_callbacks.append(callback)
|
|
logger.info(f"COB data subscriber added: {subscriber_id}")
|
|
return subscriber_id
|
|
|
|
def subscribe_to_training_data(self, callback: Callable[[str, dict], None]) -> str:
|
|
"""Subscribe to training data updates"""
|
|
subscriber_id = str(uuid.uuid4())
|
|
self.training_data_callbacks.append(callback)
|
|
logger.info(f"Training data subscriber added: {subscriber_id}")
|
|
return subscriber_id
|
|
|
|
def subscribe_to_model_predictions(self, callback: Callable[[str, dict], None]) -> str:
|
|
"""Subscribe to model prediction updates"""
|
|
subscriber_id = str(uuid.uuid4())
|
|
self.model_prediction_callbacks.append(callback)
|
|
logger.info(f"Model prediction subscriber added: {subscriber_id}")
|
|
return subscriber_id
|
|
|
|
def _distribute_cob_data(self, symbol: str, cob_snapshot: dict):
|
|
"""Distribute COB data to all subscribers"""
|
|
for callback in self.cob_data_callbacks:
|
|
try:
|
|
                # Pass the callback and args explicitly to avoid late-binding of the loop variable
                Thread(target=callback, args=(symbol, cob_snapshot), daemon=True).start()
|
|
except Exception as e:
|
|
logger.error(f"Error distributing COB data: {e}")
|
|
|
|
def _distribute_training_data(self, symbol: str, training_sample: dict):
|
|
"""Distribute training data to all subscribers"""
|
|
for callback in self.training_data_callbacks:
|
|
try:
|
|
                # Pass the callback and args explicitly to avoid late-binding of the loop variable
                Thread(target=callback, args=(symbol, training_sample), daemon=True).start()
|
|
except Exception as e:
|
|
logger.error(f"Error distributing training data: {e}")
|
|
|
|
def _distribute_model_predictions(self, symbol: str, prediction: dict):
|
|
"""Distribute model predictions to all subscribers"""
|
|
for callback in self.model_prediction_callbacks:
|
|
try:
|
|
                # Pass the callback and args explicitly to avoid late-binding of the loop variable
                Thread(target=callback, args=(symbol, prediction), daemon=True).start()
|
|
except Exception as e:
|
|
logger.error(f"Error distributing model prediction: {e}")
|
|
|
|
# ===== DATA ACCESS METHODS FOR MODELS AND DASHBOARD =====
|
|
|
|
def get_cob_data(self, symbol: str, count: int = 50) -> List[dict]:
|
|
"""Get recent COB data for a symbol"""
|
|
binance_symbol = symbol.replace('/', '').upper()
|
|
if binance_symbol in self.cob_data_cache:
|
|
return list(self.cob_data_cache[binance_symbol])[-count:]
|
|
return []
|
|
|
|
def get_training_data(self, symbol: str, count: int = 100) -> List[dict]:
|
|
"""Get recent training data for a symbol"""
|
|
binance_symbol = symbol.replace('/', '').upper()
|
|
if binance_symbol in self.training_data_cache:
|
|
return list(self.training_data_cache[binance_symbol])[-count:]
|
|
return []
|
|
|
|
def collect_cob_data(self, symbol: str) -> dict:
|
|
"""
|
|
Collect Consolidated Order Book (COB) data for a symbol using REST API
|
|
|
|
This centralized method collects COB data for all consumers (models, dashboard, etc.)
|
|
"""
|
|
try:
|
|
import requests
|
|
import time
|
|
|
|
# Use Binance REST API for order book data
|
|
binance_symbol = symbol.replace('/', '')
|
|
url = f"https://api.binance.com/api/v3/depth?symbol={binance_symbol}&limit=500"
|
|
|
|
response = requests.get(url, timeout=5)
|
|
if response.status_code == 200:
|
|
data = response.json()
|
|
|
|
# Process order book data
|
|
bids = [[float(price), float(qty)] for price, qty in data.get('bids', [])]
|
|
asks = [[float(price), float(qty)] for price, qty in data.get('asks', [])]
|
|
|
|
# Calculate mid price
|
|
best_bid = bids[0][0] if bids else 0
|
|
best_ask = asks[0][0] if asks else 0
|
|
mid_price = (best_bid + best_ask) / 2 if best_bid and best_ask else 0
|
|
|
|
# Calculate order book stats
|
|
bid_liquidity = sum(qty for _, qty in bids[:20])
|
|
ask_liquidity = sum(qty for _, qty in asks[:20])
|
|
total_liquidity = bid_liquidity + ask_liquidity
|
|
|
|
# Calculate imbalance
|
|
imbalance = (bid_liquidity - ask_liquidity) / total_liquidity if total_liquidity > 0 else 0
|
|
|
|
# Calculate spread in basis points
|
|
spread = (best_ask - best_bid) / mid_price * 10000 if mid_price > 0 else 0
|
|
|
|
# Create COB snapshot
|
|
cob_snapshot = {
|
|
'symbol': symbol,
|
|
'timestamp': int(time.time() * 1000),
|
|
'bids': bids[:50], # Limit to top 50 levels
|
|
'asks': asks[:50], # Limit to top 50 levels
|
|
'stats': {
|
|
'mid_price': mid_price,
|
|
'best_bid': best_bid,
|
|
'best_ask': best_ask,
|
|
'bid_liquidity': bid_liquidity,
|
|
'ask_liquidity': ask_liquidity,
|
|
'total_liquidity': total_liquidity,
|
|
'imbalance': imbalance,
|
|
'spread_bps': spread
|
|
}
|
|
}
|
|
|
|
# Store in cache
|
|
with self.subscriber_lock:
|
|
if not hasattr(self, 'cob_data_cache'):
|
|
self.cob_data_cache = {}
|
|
|
|
                    # Key the cache by Binance symbol format so readers
                    # (get_latest_cob_data / get_cob_data) look it up consistently
                    cache_key = binance_symbol.upper()
                    if cache_key not in self.cob_data_cache:
                        self.cob_data_cache[cache_key] = []

                    # Add to cache with max size limit
                    self.cob_data_cache[cache_key].append(cob_snapshot)
                    if len(self.cob_data_cache[cache_key]) > 300:  # Keep 5 minutes of 1s data
                        self.cob_data_cache[cache_key].pop(0)
|
|
|
|
# Notify subscribers
|
|
self._notify_cob_subscribers(symbol, cob_snapshot)
|
|
|
|
return cob_snapshot
|
|
else:
|
|
logger.warning(f"Failed to fetch COB data for {symbol}: {response.status_code}")
|
|
return {}
|
|
|
|
except Exception as e:
|
|
logger.debug(f"Error collecting COB data for {symbol}: {e}")
|
|
return {}
|
|
|
|
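    # Minimal usage sketch (assumes a configured DataProvider instance named
    # `data_provider`):
    #
    #   snapshot = data_provider.collect_cob_data('ETH/USDT')
    #   if snapshot:
    #       stats = snapshot['stats']
    #       logger.info(f"mid={stats['mid_price']:.2f} "
    #                   f"spread={stats['spread_bps']:.1f}bps "
    #                   f"imbalance={stats['imbalance']:+.3f}")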
def start_cob_collection(self):
|
|
"""
|
|
Start COB data collection in background thread
|
|
"""
|
|
try:
|
|
import threading
|
|
import time
|
|
|
|
def cob_collector():
|
|
"""Collect COB data using REST API calls"""
|
|
logger.info("Starting centralized COB data collection")
|
|
while True:
|
|
try:
|
|
# Collect data for both symbols
|
|
for symbol in ['ETH/USDT', 'BTC/USDT']:
|
|
self.collect_cob_data(symbol)
|
|
|
|
# Sleep for 1 second between collections
|
|
time.sleep(1)
|
|
except Exception as e:
|
|
logger.debug(f"Error in COB collection: {e}")
|
|
time.sleep(5) # Wait longer on error
|
|
|
|
# Start collector in background thread
|
|
if not hasattr(self, '_cob_thread_started') or not self._cob_thread_started:
|
|
cob_thread = threading.Thread(target=cob_collector, daemon=True)
|
|
cob_thread.start()
|
|
self._cob_thread_started = True
|
|
logger.info("Centralized COB data collection started")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error starting COB collection: {e}")
|
|
|
|
def _notify_cob_subscribers(self, symbol: str, cob_snapshot: dict):
|
|
"""Notify subscribers of new COB data"""
|
|
with self.subscriber_lock:
|
|
if not hasattr(self, 'cob_subscribers'):
|
|
self.cob_subscribers = {}
|
|
|
|
# Notify all subscribers for this symbol
|
|
for subscriber_id, callback in self.cob_subscribers.items():
|
|
try:
|
|
callback(symbol, cob_snapshot)
|
|
except Exception as e:
|
|
logger.debug(f"Error notifying COB subscriber {subscriber_id}: {e}")
|
|
|
|
def subscribe_to_cob(self, callback) -> str:
|
|
"""Subscribe to COB data updates"""
|
|
with self.subscriber_lock:
|
|
if not hasattr(self, 'cob_subscribers'):
|
|
self.cob_subscribers = {}
|
|
|
|
subscriber_id = str(uuid.uuid4())
|
|
self.cob_subscribers[subscriber_id] = callback
|
|
|
|
# Start collection if not already started
|
|
self.start_cob_collection()
|
|
|
|
return subscriber_id
|
|
|
|
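    # Minimal COB subscription sketch (assumes a configured DataProvider
    # instance named `data_provider`):
    #
    #   def on_cob(symbol: str, cob_snapshot: dict):
    #       logger.info(f"{symbol} imbalance={cob_snapshot['stats']['imbalance']:+.3f}")
    #
    #   cob_sub_id = data_provider.subscribe_to_cob(on_cob)  # also starts collection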
def get_latest_cob_data(self, symbol: str) -> dict:
|
|
"""Get latest COB data for a symbol"""
|
|
with self.subscriber_lock:
|
|
# Convert symbol to Binance format for cache lookup
|
|
binance_symbol = symbol.replace('/', '').upper()
|
|
|
|
logger.debug(f"Getting COB data for {symbol} (binance: {binance_symbol})")
|
|
|
|
if not hasattr(self, 'cob_data_cache'):
|
|
logger.debug("COB data cache not initialized")
|
|
return {}
|
|
|
|
if binance_symbol not in self.cob_data_cache:
|
|
logger.debug(f"Symbol {binance_symbol} not in COB cache. Available: {list(self.cob_data_cache.keys())}")
|
|
return {}
|
|
|
|
if not self.cob_data_cache[binance_symbol]:
|
|
logger.debug(f"COB cache for {binance_symbol} is empty")
|
|
return {}
|
|
|
|
latest_data = self.cob_data_cache[binance_symbol][-1]
|
|
logger.debug(f"Latest COB data type for {binance_symbol}: {type(latest_data)}")
|
|
return latest_data
|
|
|
|
def get_cob_data(self, symbol: str, count: int = 50) -> List[dict]:
|
|
"""Get recent COB data for a symbol"""
|
|
with self.subscriber_lock:
|
|
# Convert symbol to Binance format for cache lookup
|
|
binance_symbol = symbol.replace('/', '').upper()
|
|
|
|
if not hasattr(self, 'cob_data_cache') or binance_symbol not in self.cob_data_cache:
|
|
return []
|
|
|
|
# Return the most recent 'count' snapshots
|
|
return list(self.cob_data_cache[binance_symbol])[-count:]
|
|
|
|
def get_data_summary(self) -> dict:
|
|
"""Get summary of all collected data"""
|
|
summary = {
|
|
'symbols': self.symbols,
|
|
'subscribers': {
|
|
'tick_subscribers': len(self.subscribers),
|
|
'cob_subscribers': len(self.cob_data_callbacks),
|
|
'training_subscribers': len(self.training_data_callbacks),
|
|
'prediction_subscribers': len(self.model_prediction_callbacks)
|
|
},
|
|
'data_counts': {},
|
|
'collection_status': {
|
|
'cob_collection': self.cob_collection_active,
|
|
'training_collection': self.training_data_collection_active,
|
|
'streaming': self.is_streaming
|
|
}
|
|
}
|
|
|
|
# Add data counts for each symbol
|
|
for symbol in self.symbols:
|
|
binance_symbol = symbol.replace('/', '').upper()
|
|
summary['data_counts'][symbol] = {
|
|
'ticks': len(self.tick_buffers.get(binance_symbol, [])),
|
|
'cob_snapshots': len(self.cob_data_cache.get(binance_symbol, [])),
|
|
'training_samples': len(self.training_data_cache.get(binance_symbol, []))
|
|
}
|
|
|
|
        return summary
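    # Minimal usage sketch for the data summary (assumes a configured
    # DataProvider instance named `data_provider`):
    #
    #   summary = data_provider.get_data_summary()
    #   logger.info(f"streaming={summary['collection_status']['streaming']} "
    #               f"cob={summary['collection_status']['cob_collection']}")
    #   for sym, counts in summary['data_counts'].items():
    #       logger.info(f"{sym}: ticks={counts['ticks']} cob={counts['cob_snapshots']}")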