Williams data structure in data provider

Dobromir Popov
2025-05-31 00:26:05 +03:00
parent 0331bbfa7c
commit 7a0e468c3e
4 changed files with 960 additions and 76 deletions


@@ -7,6 +7,8 @@ This module consolidates all data functionality including:
- Multi-timeframe candle generation
- Caching and data management
- Technical indicators calculation
- Williams Market Structure pivot points with monthly data analysis
- Pivot-based feature normalization for improved model training
- Centralized data distribution to multiple subscribers (AI models, dashboard, etc.)
"""
@@ -20,6 +22,7 @@ import websockets
import requests
import pandas as pd
import numpy as np
import pickle
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Any, Callable
@@ -33,6 +36,44 @@ from .tick_aggregator import RealTimeTickAggregator, RawTick, OHLCVBar
logger = logging.getLogger(__name__)
@dataclass
class PivotBounds:
"""Pivot-based normalization bounds derived from Williams Market Structure"""
symbol: str
price_max: float
price_min: float
volume_max: float
volume_min: float
pivot_support_levels: List[float]
pivot_resistance_levels: List[float]
pivot_context: Dict[str, Any]
created_timestamp: datetime
data_period_start: datetime
data_period_end: datetime
total_candles_analyzed: int
def get_price_range(self) -> float:
"""Get price range for normalization"""
return self.price_max - self.price_min
def normalize_price(self, price: float) -> float:
"""Normalize price using pivot bounds"""
return (price - self.price_min) / self.get_price_range()
def get_nearest_support_distance(self, current_price: float) -> float:
"""Get distance to nearest support level (normalized)"""
if not self.pivot_support_levels:
return 0.5
distances = [abs(current_price - s) for s in self.pivot_support_levels]
return min(distances) / self.get_price_range()
def get_nearest_resistance_distance(self, current_price: float) -> float:
"""Get distance to nearest resistance level (normalized)"""
if not self.pivot_resistance_levels:
return 0.5
distances = [abs(current_price - r) for r in self.pivot_resistance_levels]
return min(distances) / self.get_price_range()
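For illustration, a minimal sketch of how these helpers behave; all numbers below are made up:

    # Hypothetical bounds, for illustration only
    bounds = PivotBounds(
        symbol='ETH/USDT', price_max=4000.0, price_min=2000.0,
        volume_max=500.0, volume_min=1.0,
        pivot_support_levels=[2200.0, 2800.0],
        pivot_resistance_levels=[3400.0, 3900.0],
        pivot_context={}, created_timestamp=datetime.now(),
        data_period_start=datetime.now() - timedelta(days=30),
        data_period_end=datetime.now(), total_candles_analyzed=2_592_000)

    bounds.normalize_price(3000.0)               # (3000 - 2000) / 2000 = 0.5
    bounds.get_nearest_support_distance(2900.0)  # min(700, 100) / 2000 = 0.05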
@dataclass
class MarketTick:
"""Standardized market tick data structure"""
@@ -66,11 +107,24 @@ class DataProvider:
self.symbols = symbols or self.config.symbols
self.timeframes = timeframes or self.config.timeframes
# Cache settings (initialize first)
self.cache_enabled = self.config.data.get('cache_enabled', True)
self.cache_dir = Path(self.config.data.get('cache_dir', 'cache'))
self.cache_dir.mkdir(parents=True, exist_ok=True)
# Data storage
self.historical_data = {} # {symbol: {timeframe: DataFrame}}
self.real_time_data = {} # {symbol: {timeframe: deque}}
self.current_prices = {} # {symbol: float}
# Pivot-based normalization system
self.pivot_bounds: Dict[str, PivotBounds] = {} # {symbol: PivotBounds}
self.pivot_cache_dir = self.cache_dir / 'pivot_bounds'
self.pivot_cache_dir.mkdir(parents=True, exist_ok=True)
self.pivot_refresh_interval = timedelta(days=1) # Refresh pivot bounds daily
self.monthly_data_cache_dir = self.cache_dir / 'monthly_1s_data'
self.monthly_data_cache_dir.mkdir(parents=True, exist_ok=True)
# Real-time processing
self.websocket_tasks = {}
self.is_streaming = False
@@ -111,20 +165,19 @@ class DataProvider:
self.last_prices = {symbol.replace('/', '').upper(): 0.0 for symbol in self.symbols}
self.price_change_threshold = 0.1 # 10% price change threshold for validation
# Timeframe conversion
self.timeframe_seconds = {
'1s': 1, '1m': 60, '5m': 300, '15m': 900, '30m': 1800,
'1h': 3600, '4h': 14400, '1d': 86400
}
# Load existing pivot bounds from cache
self._load_all_pivot_bounds()
logger.info(f"DataProvider initialized for symbols: {self.symbols}")
logger.info(f"Timeframes: {self.timeframes}")
logger.info("Centralized data distribution enabled")
logger.info("Pivot-based normalization system enabled")
def get_historical_data(self, symbol: str, timeframe: str, limit: int = 1000, refresh: bool = False) -> Optional[pd.DataFrame]:
"""Get historical OHLCV data for a symbol and timeframe"""
@@ -449,7 +502,7 @@ class DataProvider:
return None
def _add_technical_indicators(self, df: pd.DataFrame) -> pd.DataFrame:
"""Add comprehensive technical indicators for multi-timeframe analysis"""
"""Add comprehensive technical indicators AND pivot-based normalization context"""
try:
df = df.copy()
@@ -458,7 +511,7 @@ class DataProvider:
logger.warning(f"Insufficient data for comprehensive indicators: {len(df)} rows")
return self._add_basic_indicators(df)
# === EXISTING TECHNICAL INDICATORS ===
# Moving averages (multiple timeframes)
df['sma_10'] = ta.trend.sma_indicator(df['close'], window=10)
df['sma_20'] = ta.trend.sma_indicator(df['close'], window=20)
@@ -568,17 +621,516 @@ class DataProvider:
# Volatility regime
df['volatility_regime'] = (df['atr'] / df['close']).rolling(window=20).rank(pct=True)
# === WILLIAMS MARKET STRUCTURE PIVOT CONTEXT ===
# Check if we need to refresh pivot bounds for this symbol
symbol = self._extract_symbol_from_dataframe(df)
if symbol and self._should_refresh_pivot_bounds(symbol):
logger.info(f"Refreshing pivot bounds for {symbol}")
self._refresh_pivot_bounds_for_symbol(symbol)
# Add pivot-based context features
if symbol and symbol in self.pivot_bounds:
df = self._add_pivot_context_features(df, symbol)
# === FILL NaN VALUES ===
# Forward fill first, then backward fill, then zero fill
df = df.ffill().bfill().fillna(0)
logger.debug(f"Added {len([col for col in df.columns if col not in ['timestamp', 'open', 'high', 'low', 'close', 'volume']])} technical indicators")
logger.debug(f"Added technical indicators + pivot context for {len(df)} rows")
return df
except Exception as e:
logger.error(f"Error adding comprehensive technical indicators: {e}")
# Fallback to basic indicators
return self._add_basic_indicators(df)
# === WILLIAMS MARKET STRUCTURE PIVOT SYSTEM ===
def _collect_monthly_1s_data(self, symbol: str) -> Optional[pd.DataFrame]:
"""Collect 1 month of 1s candles using paginated API calls"""
try:
# Check if we have cached monthly data first
cached_monthly_data = self._load_monthly_data_from_cache(symbol)
if cached_monthly_data is not None:
logger.info(f"Using cached monthly 1s data for {symbol}: {len(cached_monthly_data)} candles")
return cached_monthly_data
logger.info(f"Collecting 1 month of 1s data for {symbol}...")
# Calculate time range (30 days)
end_time = datetime.now()
start_time = end_time - timedelta(days=30)
all_candles = []
current_time = end_time
api_calls_made = 0
total_candles_collected = 0
# Binance rate limit: 1200 requests/minute = 20/second
rate_limit_delay = 0.05 # 50ms between requests
while current_time > start_time and api_calls_made < 3000: # Safety limit
try:
# Get 1000 candles working backwards
batch_df = self._fetch_1s_batch_with_endtime(symbol, current_time, limit=1000)
if batch_df is None or batch_df.empty:
logger.warning(f"No data returned for batch ending at {current_time}")
break
api_calls_made += 1
batch_size = len(batch_df)
total_candles_collected += batch_size
# Add batch to collection
all_candles.append(batch_df)
# Update current time to the earliest timestamp in this batch
earliest_time = batch_df['timestamp'].min()
if earliest_time >= current_time:
logger.warning(f"No progress in time collection, breaking")
break
current_time = earliest_time - timedelta(seconds=1)
# Rate limiting
time.sleep(rate_limit_delay)
# Progress logging every 100 requests
if api_calls_made % 100 == 0:
logger.info(f"Progress: {api_calls_made} API calls, {total_candles_collected} candles collected")
# Break if we have enough data (about 2.6M candles for 30 days)
if total_candles_collected >= 2500000: # 30 days * 24 hours * 3600 seconds ≈ 2.6M
logger.info(f"Collected sufficient data: {total_candles_collected} candles")
break
except Exception as e:
logger.error(f"Error in batch collection: {e}")
time.sleep(1) # Wait longer on error
continue
if not all_candles:
logger.error(f"No monthly data collected for {symbol}")
return None
# Combine all batches
logger.info(f"Combining {len(all_candles)} batches...")
monthly_df = pd.concat(all_candles, ignore_index=True)
# Sort by timestamp and remove duplicates
monthly_df = monthly_df.sort_values('timestamp').drop_duplicates(subset=['timestamp']).reset_index(drop=True)
# Filter to exactly 30 days
cutoff_time = end_time - timedelta(days=30)
monthly_df = monthly_df[monthly_df['timestamp'] >= cutoff_time]
logger.info(f"Successfully collected {len(monthly_df)} 1s candles for {symbol} "
f"({api_calls_made} API calls made)")
# Cache the monthly data
self._save_monthly_data_to_cache(symbol, monthly_df)
return monthly_df
except Exception as e:
logger.error(f"Error collecting monthly 1s data for {symbol}: {e}")
return None
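A quick sanity check on the loop's safety limit, derived from the constants above:

    candles_needed = 30 * 24 * 3600        # 2,592,000 one-second candles in 30 days
    calls_needed = candles_needed // 1000  # ~2,592 requests at 1000 candles each
    delay_seconds = calls_needed * 0.05    # ~130s of rate-limit sleep (~2.2 minutes)
    # 2,592 < 3,000, so the api_calls_made safety limit leaves headroom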
def _fetch_1s_batch_with_endtime(self, symbol: str, end_time: datetime, limit: int = 1000) -> Optional[pd.DataFrame]:
"""Fetch a batch of 1s candles ending at specific time"""
try:
binance_symbol = symbol.replace('/', '').upper()
# Convert end_time to milliseconds
end_ms = int(end_time.timestamp() * 1000)
# API request
url = "https://api.binance.com/api/v3/klines"
params = {
'symbol': binance_symbol,
'interval': '1s',
'endTime': end_ms,
'limit': limit
}
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
'Accept': 'application/json'
}
response = requests.get(url, params=params, headers=headers, timeout=10)
response.raise_for_status()
data = response.json()
if not data:
return None
# Convert to DataFrame
df = pd.DataFrame(data, columns=[
'timestamp', 'open', 'high', 'low', 'close', 'volume',
'close_time', 'quote_volume', 'trades', 'taker_buy_base',
'taker_buy_quote', 'ignore'
])
# Process columns
df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
for col in ['open', 'high', 'low', 'close', 'volume']:
df[col] = df[col].astype(float)
# Keep only OHLCV columns
df = df[['timestamp', 'open', 'high', 'low', 'close', 'volume']]
return df
except Exception as e:
logger.error(f"Error fetching 1s batch for {symbol}: {e}")
return None
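The collection loop walks backwards through history by anchoring each request on the previous batch's earliest timestamp. Roughly, assuming Binance returns the `limit` candles at or before `endTime` (times illustrative):

    # call 1: endTime = now          -> candles [now-999s .. now]
    # call 2: endTime = now - 1000s  -> candles [now-1999s .. now-1000s]
    # ...
    # after each batch: current_time = batch['timestamp'].min() - timedelta(seconds=1)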
def _extract_pivot_bounds_from_monthly_data(self, symbol: str, monthly_data: pd.DataFrame) -> Optional[PivotBounds]:
"""Extract pivot bounds using Williams Market Structure analysis"""
try:
logger.info(f"Analyzing {len(monthly_data)} candles for pivot extraction...")
# Convert DataFrame to numpy array format expected by Williams Market Structure
ohlcv_array = monthly_data[['timestamp', 'open', 'high', 'low', 'close', 'volume']].copy()
# Convert timestamp to numeric for Williams analysis
ohlcv_array['timestamp'] = ohlcv_array['timestamp'].astype(np.int64) // 10**9 # Convert to seconds
ohlcv_array = ohlcv_array.to_numpy()
# Initialize Williams Market Structure analyzer
try:
from training.williams_market_structure import WilliamsMarketStructure
williams = WilliamsMarketStructure(
swing_strengths=[2, 3, 5, 8], # Multi-strength pivot detection
enable_cnn_feature=False # We just want pivot data, not CNN training
)
# Calculate 5 levels of recursive pivot points
logger.info("Running Williams Market Structure analysis...")
pivot_levels = williams.calculate_recursive_pivot_points(ohlcv_array)
except ImportError:
logger.warning("Williams Market Structure not available, using simplified pivot detection")
pivot_levels = self._simple_pivot_detection(monthly_data)
# Extract bounds from pivot analysis
bounds = self._extract_bounds_from_pivot_levels(symbol, monthly_data, pivot_levels)
return bounds
except Exception as e:
logger.error(f"Error extracting pivot bounds for {symbol}: {e}")
return None
def _extract_bounds_from_pivot_levels(self, symbol: str, monthly_data: pd.DataFrame,
pivot_levels: Dict[str, Any]) -> PivotBounds:
"""Extract normalization bounds from Williams pivot levels"""
try:
# Initialize bounds
price_max = monthly_data['high'].max()
price_min = monthly_data['low'].min()
volume_max = monthly_data['volume'].max()
volume_min = monthly_data['volume'].min()
support_levels = []
resistance_levels = []
# Extract pivot points from all Williams levels
for level_key, level_data in pivot_levels.items():
if level_data and hasattr(level_data, 'swing_points') and level_data.swing_points:
# Get prices from swing points
level_prices = [sp.price for sp in level_data.swing_points]
# Update overall price bounds
price_max = max(price_max, max(level_prices))
price_min = min(price_min, min(level_prices))
# Extract support and resistance levels
if hasattr(level_data, 'support_levels') and level_data.support_levels:
support_levels.extend(level_data.support_levels)
if hasattr(level_data, 'resistance_levels') and level_data.resistance_levels:
resistance_levels.extend(level_data.resistance_levels)
# Remove duplicates and sort
support_levels = sorted(list(set(support_levels)))
resistance_levels = sorted(list(set(resistance_levels)))
# Create PivotBounds object
bounds = PivotBounds(
symbol=symbol,
price_max=float(price_max),
price_min=float(price_min),
volume_max=float(volume_max),
volume_min=float(volume_min),
pivot_support_levels=support_levels,
pivot_resistance_levels=resistance_levels,
pivot_context=pivot_levels,
created_timestamp=datetime.now(),
data_period_start=monthly_data['timestamp'].min(),
data_period_end=monthly_data['timestamp'].max(),
total_candles_analyzed=len(monthly_data)
)
logger.info(f"Extracted pivot bounds for {symbol}:")
logger.info(f" Price range: ${bounds.price_min:.2f} - ${bounds.price_max:.2f}")
logger.info(f" Volume range: {bounds.volume_min:.2f} - {bounds.volume_max:.2f}")
logger.info(f" Support levels: {len(bounds.pivot_support_levels)}")
logger.info(f" Resistance levels: {len(bounds.pivot_resistance_levels)}")
return bounds
except Exception as e:
logger.error(f"Error extracting bounds from pivot levels: {e}")
# Fallback to simple min/max bounds
return PivotBounds(
symbol=symbol,
price_max=float(monthly_data['high'].max()),
price_min=float(monthly_data['low'].min()),
volume_max=float(monthly_data['volume'].max()),
volume_min=float(monthly_data['volume'].min()),
pivot_support_levels=[],
pivot_resistance_levels=[],
pivot_context={},
created_timestamp=datetime.now(),
data_period_start=monthly_data['timestamp'].min(),
data_period_end=monthly_data['timestamp'].max(),
total_candles_analyzed=len(monthly_data)
)
def _simple_pivot_detection(self, monthly_data: pd.DataFrame) -> Dict[str, Any]:
"""Simple pivot detection fallback when Williams Market Structure is not available"""
try:
# Simple high/low pivot detection using rolling windows
highs = monthly_data['high']
lows = monthly_data['low']
# Find local maxima and minima using different windows
pivot_highs = []
pivot_lows = []
for window in [5, 10, 20, 50]:
if len(monthly_data) > window * 2:
# Rolling max/min detection
rolling_max = highs.rolling(window=window, center=True).max()
rolling_min = lows.rolling(window=window, center=True).min()
# Find pivot highs (local maxima)
high_pivots = monthly_data[highs == rolling_max]['high'].tolist()
pivot_highs.extend(high_pivots)
# Find pivot lows (local minima)
low_pivots = monthly_data[lows == rolling_min]['low'].tolist()
pivot_lows.extend(low_pivots)
# Create mock level structure
mock_level = type('MockLevel', (), {
'swing_points': [],
'support_levels': list(set(pivot_lows)),
'resistance_levels': list(set(pivot_highs))
})()
return {'level_0': mock_level}
except Exception as e:
logger.error(f"Error in simple pivot detection: {e}")
return {}
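As a minimal illustration of the fallback's detection rule (a bar whose high equals the centered rolling max is a local maximum):

    import pandas as pd

    h = pd.Series([1.0, 3.0, 2.0, 5.0, 4.0])
    rm = h.rolling(window=3, center=True).max()  # [NaN, 3, 5, 5, NaN]
    print(h[h == rm].tolist())                   # [3.0, 5.0] -> local maxima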
def _should_refresh_pivot_bounds(self, symbol: str) -> bool:
"""Check if pivot bounds need refreshing"""
try:
if symbol not in self.pivot_bounds:
return True
bounds = self.pivot_bounds[symbol]
age = datetime.now() - bounds.created_timestamp
return age > self.pivot_refresh_interval
except Exception as e:
logger.error(f"Error checking pivot bounds refresh: {e}")
return True
def _refresh_pivot_bounds_for_symbol(self, symbol: str):
"""Refresh pivot bounds for a specific symbol"""
try:
# Collect monthly 1s data
monthly_data = self._collect_monthly_1s_data(symbol)
if monthly_data is None or monthly_data.empty:
logger.warning(f"Could not collect monthly data for {symbol}")
return
# Extract pivot bounds
bounds = self._extract_pivot_bounds_from_monthly_data(symbol, monthly_data)
if bounds is None:
logger.warning(f"Could not extract pivot bounds for {symbol}")
return
# Store bounds
self.pivot_bounds[symbol] = bounds
# Save to cache
self._save_pivot_bounds_to_cache(symbol, bounds)
logger.info(f"Successfully refreshed pivot bounds for {symbol}")
except Exception as e:
logger.error(f"Error refreshing pivot bounds for {symbol}: {e}")
def _add_pivot_context_features(self, df: pd.DataFrame, symbol: str) -> pd.DataFrame:
"""Add pivot-derived context features for normalization"""
try:
if symbol not in self.pivot_bounds:
return df
bounds = self.pivot_bounds[symbol]
current_prices = df['close']
# Distance to nearest support/resistance levels (normalized)
df['pivot_support_distance'] = current_prices.apply(bounds.get_nearest_support_distance)
df['pivot_resistance_distance'] = current_prices.apply(bounds.get_nearest_resistance_distance)
# Price position within pivot range (0 = price_min, 1 = price_max)
df['pivot_price_position'] = current_prices.apply(bounds.normalize_price).clip(0, 1)
# Add binary features for proximity to key levels
price_range = bounds.get_price_range()
proximity_threshold = price_range * 0.02 # 2% of price range
# Flag rows whose close is within the threshold of any support/resistance level
df['near_pivot_support'] = current_prices.apply(
lambda p: int(any(abs(p - s) <= proximity_threshold for s in bounds.pivot_support_levels)))
df['near_pivot_resistance'] = current_prices.apply(
lambda p: int(any(abs(p - r) <= proximity_threshold for r in bounds.pivot_resistance_levels)))
logger.debug(f"Added pivot context features for {symbol}")
return df
except Exception as e:
logger.warning(f"Error adding pivot context features for {symbol}: {e}")
return df
def _extract_symbol_from_dataframe(self, df: pd.DataFrame) -> Optional[str]:
"""Extract symbol from dataframe context (placeholder implementation)"""
# Placeholder: the dataframe carries no symbol metadata, so this simply
# returns the first configured symbol. A real implementation should pass
# the symbol explicitly or store it as metadata on the dataframe.
return self.symbols[0] if self.symbols else None
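One possible refinement, sketched here as an assumption rather than part of this commit: pandas frames can carry metadata via `df.attrs`, set once at fetch time and read back here instead of guessing.

    # Hypothetical: tag the frame where data is fetched, e.g. in get_historical_data():
    #     df.attrs['symbol'] = symbol
    # ...then recover it here:
    def _extract_symbol_from_dataframe(self, df: pd.DataFrame) -> Optional[str]:
        return df.attrs.get('symbol')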
# === PIVOT BOUNDS CACHING ===
def _load_all_pivot_bounds(self):
"""Load all cached pivot bounds on startup"""
try:
for symbol in self.symbols:
bounds = self._load_pivot_bounds_from_cache(symbol)
if bounds:
self.pivot_bounds[symbol] = bounds
logger.info(f"Loaded cached pivot bounds for {symbol}")
except Exception as e:
logger.error(f"Error loading pivot bounds from cache: {e}")
def _load_pivot_bounds_from_cache(self, symbol: str) -> Optional[PivotBounds]:
"""Load pivot bounds from cache"""
try:
cache_file = self.pivot_cache_dir / f"{symbol.replace('/', '')}_pivot_bounds.pkl"
if cache_file.exists():
with open(cache_file, 'rb') as f:
bounds = pickle.load(f)
# Check if bounds are still valid (not too old)
age = datetime.now() - bounds.created_timestamp
if age <= self.pivot_refresh_interval:
return bounds
else:
logger.info(f"Cached pivot bounds for {symbol} are too old ({age.days} days)")
return None
except Exception as e:
logger.warning(f"Error loading pivot bounds from cache for {symbol}: {e}")
return None
def _save_pivot_bounds_to_cache(self, symbol: str, bounds: PivotBounds):
"""Save pivot bounds to cache"""
try:
cache_file = self.pivot_cache_dir / f"{symbol.replace('/', '')}_pivot_bounds.pkl"
with open(cache_file, 'wb') as f:
pickle.dump(bounds, f)
logger.debug(f"Saved pivot bounds to cache for {symbol}")
except Exception as e:
logger.warning(f"Error saving pivot bounds to cache for {symbol}: {e}")
def _load_monthly_data_from_cache(self, symbol: str) -> Optional[pd.DataFrame]:
"""Load monthly 1s data from cache"""
try:
cache_file = self.monthly_data_cache_dir / f"{symbol.replace('/', '')}_monthly_1s.parquet"
if cache_file.exists():
# Check if cache is recent (less than 1 day old)
cache_age = time.time() - cache_file.stat().st_mtime
if cache_age < 86400: # 24 hours
df = pd.read_parquet(cache_file)
return df
else:
logger.info(f"Monthly data cache for {symbol} is too old ({cache_age/3600:.1f}h)")
return None
except Exception as e:
logger.warning(f"Error loading monthly data from cache for {symbol}: {e}")
return None
def _save_monthly_data_to_cache(self, symbol: str, df: pd.DataFrame):
"""Save monthly 1s data to cache"""
try:
cache_file = self.monthly_data_cache_dir / f"{symbol.replace('/', '')}_monthly_1s.parquet"
df.to_parquet(cache_file, index=False)
logger.info(f"Saved {len(df)} monthly 1s candles to cache for {symbol}")
except Exception as e:
logger.warning(f"Error saving monthly data to cache for {symbol}: {e}")
def get_pivot_bounds(self, symbol: str) -> Optional[PivotBounds]:
"""Get pivot bounds for a symbol"""
return self.pivot_bounds.get(symbol)
def get_pivot_normalized_features(self, symbol: str, df: pd.DataFrame) -> Optional[pd.DataFrame]:
"""Get dataframe with pivot-normalized features"""
try:
if symbol not in self.pivot_bounds:
logger.warning(f"No pivot bounds available for {symbol}")
return df
bounds = self.pivot_bounds[symbol]
normalized_df = df.copy()
# Normalize price columns using pivot bounds
price_range = bounds.get_price_range()
for col in ['open', 'high', 'low', 'close']:
if col in normalized_df.columns:
normalized_df[col] = (normalized_df[col] - bounds.price_min) / price_range
# Normalize volume using pivot bounds
volume_range = bounds.volume_max - bounds.volume_min
if volume_range > 0 and 'volume' in normalized_df.columns:
normalized_df['volume'] = (normalized_df['volume'] - bounds.volume_min) / volume_range
return normalized_df
except Exception as e:
logger.error(f"Error applying pivot normalization for {symbol}: {e}")
return df
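A minimal usage sketch of the public pivot API; the symbol and constructor arguments are illustrative:

    provider = DataProvider(symbols=['ETH/USDT'], timeframes=['1m'])
    df = provider.get_historical_data('ETH/USDT', '1m', limit=500)

    bounds = provider.get_pivot_bounds('ETH/USDT')  # None until bounds are built/cached
    if bounds is not None and df is not None:
        normalized = provider.get_pivot_normalized_features('ETH/USDT', df)
        print(bounds.price_min, bounds.price_max, len(bounds.pivot_support_levels))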
def _add_basic_indicators(self, df: pd.DataFrame) -> pd.DataFrame:
"""Add basic indicators for small datasets"""
@@ -971,7 +1523,7 @@ class DataProvider:
# Use only common features
try:
tf_features = self._normalize_features(df[common_feature_names].tail(window_size), symbol=symbol)
if tf_features is not None and len(tf_features) == window_size:
feature_channels.append(tf_features.values)
@@ -1060,29 +1612,59 @@ class DataProvider:
logger.error(f"Error selecting CNN features: {e}")
return basic_cols # Fallback to basic OHLCV
def _normalize_features(self, df: pd.DataFrame, symbol: str = None) -> Optional[pd.DataFrame]:
"""Normalize features for CNN training using pivot-based bounds when available"""
try:
df_norm = df.copy()
# Try to use pivot-based normalization if available
if symbol and symbol in self.pivot_bounds:
bounds = self.pivot_bounds[symbol]
price_range = bounds.get_price_range()
# Normalize price-based features using pivot bounds
price_cols = ['open', 'high', 'low', 'close', 'sma_10', 'sma_20', 'sma_50',
'ema_12', 'ema_26', 'ema_50', 'bb_upper', 'bb_lower', 'bb_middle',
'keltner_upper', 'keltner_lower', 'keltner_middle', 'psar', 'vwap']
for col in price_cols:
if col in df_norm.columns:
# Use pivot bounds for normalization
df_norm[col] = (df_norm[col] - bounds.price_min) / price_range
# Normalize volume using pivot bounds
if 'volume' in df_norm.columns:
volume_range = bounds.volume_max - bounds.volume_min
if volume_range > 0:
df_norm['volume'] = (df_norm['volume'] - bounds.volume_min) / volume_range
else:
df_norm['volume'] = 0.5 # Default to middle if no volume range
logger.debug(f"Applied pivot-based normalization for {symbol}")
else:
# Fallback to traditional normalization when pivot bounds not available
logger.debug("Using traditional normalization (no pivot bounds available)")
for col in df_norm.columns:
if col in ['open', 'high', 'low', 'close', 'sma_10', 'sma_20', 'sma_50',
'ema_12', 'ema_26', 'ema_50', 'bb_upper', 'bb_lower', 'bb_middle',
'keltner_upper', 'keltner_lower', 'keltner_middle', 'psar', 'vwap']:
# Price-based indicators: normalize by close price
if 'close' in df_norm.columns:
base_price = df_norm['close'].iloc[-1] # Use latest close as reference
if base_price > 0:
df_norm[col] = df_norm[col] / base_price
elif col == 'volume':
# Volume: normalize by its own rolling mean
volume_mean = df_norm[col].rolling(window=min(20, len(df_norm))).mean().iloc[-1]
if volume_mean > 0:
df_norm[col] = df_norm[col] / volume_mean
# Normalize indicators that have standard ranges (regardless of pivot bounds)
for col in df_norm.columns:
if col in ['rsi_14', 'rsi_7', 'rsi_21']:
# RSI: already 0-100, normalize to 0-1
df_norm[col] = df_norm[col] / 100.0
@@ -1098,20 +1680,24 @@ class DataProvider:
# MACD: normalize by ATR or close price
if 'atr' in df_norm.columns and df_norm['atr'].iloc[-1] > 0:
df_norm[col] = df_norm[col] / df_norm['atr'].iloc[-1]
elif 'close' in df_norm.columns and df_norm['close'].iloc[-1] > 0:
df_norm[col] = df_norm[col] / df_norm['close'].iloc[-1]
elif col in ['bb_width', 'bb_percent', 'price_position', 'trend_strength',
'momentum_composite', 'volatility_regime', 'pivot_price_position',
'pivot_support_distance', 'pivot_resistance_distance']:
# Already normalized indicators: ensure 0-1 range
df_norm[col] = np.clip(df_norm[col], 0, 1)
elif col in ['atr', 'true_range']:
# Volatility indicators: normalize by close price or pivot range
if symbol and symbol in self.pivot_bounds:
bounds = self.pivot_bounds[symbol]
df_norm[col] = df_norm[col] / bounds.get_price_range()
elif 'close' in df_norm.columns and df_norm['close'].iloc[-1] > 0:
df_norm[col] = df_norm[col] / df_norm['close'].iloc[-1]
elif col not in ['timestamp', 'near_pivot_support', 'near_pivot_resistance']:
# Other indicators: z-score normalization
col_mean = df_norm[col].rolling(window=min(20, len(df_norm))).mean().iloc[-1]
col_std = df_norm[col].rolling(window=min(20, len(df_norm))).std().iloc[-1]