pivots
This commit is contained in:
@ -34,6 +34,7 @@ from collections import deque
|
||||
from .config import get_config
|
||||
from .tick_aggregator import RealTimeTickAggregator, RawTick, OHLCVBar
|
||||
from .cnn_monitor import log_cnn_prediction
|
||||
from .williams_market_structure import WilliamsMarketStructure, PivotPoint, TrendLevel
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@ -182,6 +183,16 @@ class DataProvider:
|
||||
'1h': 3600, '4h': 14400, '1d': 86400
|
||||
}
|
||||
|
||||
# Williams Market Structure integration
|
||||
self.williams_structure: Dict[str, WilliamsMarketStructure] = {}
|
||||
for symbol in self.symbols:
|
||||
self.williams_structure[symbol] = WilliamsMarketStructure(min_pivot_distance=3)
|
||||
|
||||
# Pivot point caching
|
||||
self.pivot_points_cache: Dict[str, Dict[int, TrendLevel]] = {} # {symbol: {level: TrendLevel}}
|
||||
self.last_pivot_calculation: Dict[str, datetime] = {}
|
||||
self.pivot_calculation_interval = timedelta(minutes=5) # Recalculate every 5 minutes
|
||||
|
||||
# Load existing pivot bounds from cache
|
||||
self._load_all_pivot_bounds()
|
||||
|
||||
@ -189,6 +200,7 @@ class DataProvider:
|
||||
logger.info(f"Timeframes: {self.timeframes}")
|
||||
logger.info("Centralized data distribution enabled")
|
||||
logger.info("Pivot-based normalization system enabled")
|
||||
logger.info("Williams Market Structure integration enabled")
|
||||
|
||||
# Rate limiting
|
||||
self.last_request_time = {}
|
||||
@ -1613,6 +1625,151 @@ class DataProvider:
|
||||
logger.error(f"Error getting current price for {symbol}: {e}")
|
||||
return None
|
||||
|
||||
def calculate_williams_pivot_points(self, symbol: str, force_recalculate: bool = False) -> Dict[int, TrendLevel]:
|
||||
"""
|
||||
Calculate Williams Market Structure pivot points for a symbol
|
||||
|
||||
Args:
|
||||
symbol: Trading symbol (e.g., 'ETH/USDT')
|
||||
force_recalculate: Force recalculation even if cache is fresh
|
||||
|
||||
Returns:
|
||||
Dictionary of trend levels with pivot points
|
||||
"""
|
||||
try:
|
||||
# Check if we need to recalculate
|
||||
now = datetime.now()
|
||||
if (not force_recalculate and
|
||||
symbol in self.last_pivot_calculation and
|
||||
now - self.last_pivot_calculation[symbol] < self.pivot_calculation_interval):
|
||||
# Return cached results
|
||||
return self.pivot_points_cache.get(symbol, {})
|
||||
|
||||
# Get 1s OHLCV data for Williams Market Structure calculation
|
||||
df_1s = self.get_historical_data(symbol, '1s', limit=1000)
|
||||
if df_1s is None or len(df_1s) < 50:
|
||||
logger.warning(f"Insufficient 1s data for Williams pivot calculation: {symbol}")
|
||||
return {}
|
||||
|
||||
# Convert DataFrame to numpy array for Williams calculation
|
||||
# Format: [timestamp_ms, open, high, low, close, volume]
|
||||
ohlcv_array = np.column_stack([
|
||||
df_1s.index.astype(np.int64) // 10**6, # Convert to milliseconds
|
||||
df_1s['open'].values,
|
||||
df_1s['high'].values,
|
||||
df_1s['low'].values,
|
||||
df_1s['close'].values,
|
||||
df_1s['volume'].values
|
||||
])
|
||||
|
||||
# Calculate recursive pivot points using Williams Market Structure
|
||||
williams = self.williams_structure[symbol]
|
||||
pivot_levels = williams.calculate_recursive_pivot_points(ohlcv_array)
|
||||
|
||||
# Cache the results
|
||||
self.pivot_points_cache[symbol] = pivot_levels
|
||||
self.last_pivot_calculation[symbol] = now
|
||||
|
||||
logger.debug(f"Calculated Williams pivot points for {symbol}: {len(pivot_levels)} levels")
|
||||
return pivot_levels
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error calculating Williams pivot points for {symbol}: {e}")
|
||||
return {}
|
||||
|
||||
def get_pivot_features_for_ml(self, symbol: str) -> np.ndarray:
|
||||
"""
|
||||
Get pivot point features for machine learning models
|
||||
|
||||
Returns a 250-element feature vector containing:
|
||||
- Recent pivot points (price, strength, type) for each level
|
||||
- Trend direction and strength for each level
|
||||
- Time since last pivot for each level
|
||||
"""
|
||||
try:
|
||||
# Ensure we have fresh pivot points
|
||||
pivot_levels = self.calculate_williams_pivot_points(symbol)
|
||||
|
||||
if not pivot_levels:
|
||||
logger.warning(f"No pivot points available for {symbol}")
|
||||
return np.zeros(250, dtype=np.float32)
|
||||
|
||||
# Use Williams Market Structure to extract ML features
|
||||
williams = self.williams_structure[symbol]
|
||||
features = williams.get_pivot_features_for_ml(symbol)
|
||||
|
||||
return features
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting pivot features for ML: {e}")
|
||||
return np.zeros(250, dtype=np.float32)
|
||||
|
||||
def get_market_structure_summary(self, symbol: str) -> Dict[str, Any]:
|
||||
"""
|
||||
Get current market structure summary for dashboard display
|
||||
|
||||
Returns:
|
||||
Dictionary containing market structure information
|
||||
"""
|
||||
try:
|
||||
# Ensure we have fresh pivot points
|
||||
pivot_levels = self.calculate_williams_pivot_points(symbol)
|
||||
|
||||
if not pivot_levels:
|
||||
return {
|
||||
'symbol': symbol,
|
||||
'levels': {},
|
||||
'overall_trend': 'sideways',
|
||||
'overall_strength': 0.0,
|
||||
'last_update': datetime.now().isoformat(),
|
||||
'error': 'No pivot points available'
|
||||
}
|
||||
|
||||
# Use Williams Market Structure to get summary
|
||||
williams = self.williams_structure[symbol]
|
||||
structure = williams.get_current_market_structure()
|
||||
structure['symbol'] = symbol
|
||||
|
||||
return structure
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting market structure summary for {symbol}: {e}")
|
||||
return {
|
||||
'symbol': symbol,
|
||||
'levels': {},
|
||||
'overall_trend': 'sideways',
|
||||
'overall_strength': 0.0,
|
||||
'last_update': datetime.now().isoformat(),
|
||||
'error': str(e)
|
||||
}
|
||||
|
||||
def get_recent_pivot_points(self, symbol: str, level: int = 1, count: int = 10) -> List[PivotPoint]:
|
||||
"""
|
||||
Get recent pivot points for a specific level
|
||||
|
||||
Args:
|
||||
symbol: Trading symbol
|
||||
level: Pivot level (1-5)
|
||||
count: Number of recent pivots to return
|
||||
|
||||
Returns:
|
||||
List of recent pivot points
|
||||
"""
|
||||
try:
|
||||
pivot_levels = self.calculate_williams_pivot_points(symbol)
|
||||
|
||||
if level not in pivot_levels:
|
||||
return []
|
||||
|
||||
trend_level = pivot_levels[level]
|
||||
recent_pivots = trend_level.pivot_points[-count:] if len(trend_level.pivot_points) >= count else trend_level.pivot_points
|
||||
|
||||
return recent_pivots
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting recent pivot points for {symbol} level {level}: {e}")
|
||||
return []
|
||||
|
||||
def get_price_at_index(self, symbol: str, index: int, timeframe: str = '1m') -> Optional[float]:
|
||||
"""Get price at specific index for backtesting"""
|
||||
try:
|
||||
|
@ -136,6 +136,11 @@ class TradingOrchestrator:
|
||||
self.recent_decisions: Dict[str, List[TradingDecision]] = {} # {symbol: List[TradingDecision]}
|
||||
self.model_performance: Dict[str, Dict[str, Any]] = {} # {model_name: {'correct': int, 'total': int, 'accuracy': float}}
|
||||
|
||||
# Signal rate limiting to prevent spam
|
||||
self.last_signal_time: Dict[str, Dict[str, datetime]] = {} # {symbol: {action: datetime}}
|
||||
self.min_signal_interval = timedelta(seconds=30) # Minimum 30 seconds between same signals
|
||||
self.last_confirmed_signal: Dict[str, Dict[str, Any]] = {} # {symbol: {action, timestamp, confidence}}
|
||||
|
||||
# Signal accumulation for trend confirmation
|
||||
self.signal_accumulator: Dict[str, List[Dict]] = {} # {symbol: List[signal_data]}
|
||||
self.required_confirmations = 3 # Number of consistent signals needed
|
||||
@ -871,6 +876,22 @@ class TradingOrchestrator:
|
||||
'CNN': self.config.orchestrator.get('cnn_weight', 0.7),
|
||||
'RL': self.config.orchestrator.get('rl_weight', 0.3)
|
||||
}
|
||||
|
||||
# Add weights for specific models if they exist
|
||||
if hasattr(self, 'cnn_model') and self.cnn_model:
|
||||
self.model_weights["enhanced_cnn"] = 0.4
|
||||
|
||||
# Only add DQN agent weight if it exists
|
||||
if hasattr(self, 'rl_agent') and self.rl_agent:
|
||||
self.model_weights["dqn_agent"] = 0.3
|
||||
|
||||
# Add COB RL model weight if it exists
|
||||
if hasattr(self, 'cob_rl_agent') and self.cob_rl_agent:
|
||||
self.model_weights["cob_rl_model"] = 0.2
|
||||
|
||||
# Add extrema trainer weight if it exists
|
||||
if hasattr(self, 'extrema_trainer') and self.extrema_trainer:
|
||||
self.model_weights["extrema_trainer"] = 0.15
|
||||
|
||||
def register_model(self, model: ModelInterface, weight: Optional[float] = None) -> bool:
|
||||
"""Register a new model with the orchestrator"""
|
||||
@ -1960,10 +1981,27 @@ class TradingOrchestrator:
|
||||
logger.info("Trading executor set for position tracking and P&L feedback")
|
||||
|
||||
def _check_signal_confirmation(self, symbol: str, signal_data: Dict) -> Optional[str]:
|
||||
"""Check if we have enough signal confirmations for trend confirmation"""
|
||||
"""Check if we have enough signal confirmations for trend confirmation with rate limiting"""
|
||||
try:
|
||||
# Clean up expired signals
|
||||
current_time = signal_data['timestamp']
|
||||
action = signal_data['action']
|
||||
|
||||
# Initialize signal tracking for this symbol if needed
|
||||
if symbol not in self.last_signal_time:
|
||||
self.last_signal_time[symbol] = {}
|
||||
if symbol not in self.last_confirmed_signal:
|
||||
self.last_confirmed_signal[symbol] = {}
|
||||
|
||||
# RATE LIMITING: Check if we recently confirmed the same signal
|
||||
if action in self.last_confirmed_signal[symbol]:
|
||||
last_confirmed = self.last_confirmed_signal[symbol][action]
|
||||
time_since_last = current_time - last_confirmed['timestamp']
|
||||
if time_since_last < self.min_signal_interval:
|
||||
logger.debug(f"Rate limiting: {action} signal for {symbol} too recent "
|
||||
f"({time_since_last.total_seconds():.1f}s < {self.min_signal_interval.total_seconds()}s)")
|
||||
return None
|
||||
|
||||
# Clean up expired signals
|
||||
self.signal_accumulator[symbol] = [
|
||||
s for s in self.signal_accumulator[symbol]
|
||||
if (current_time - s['timestamp']).total_seconds() < self.signal_timeout_seconds
|
||||
@ -1982,8 +2020,8 @@ class TradingOrchestrator:
|
||||
|
||||
# Count action consensus
|
||||
action_counts = {}
|
||||
for action in actions:
|
||||
action_counts[action] = action_counts.get(action, 0) + 1
|
||||
for action_item in actions:
|
||||
action_counts[action_item] = action_counts.get(action_item, 0) + 1
|
||||
|
||||
# Find dominant action
|
||||
dominant_action = max(action_counts, key=action_counts.get)
|
||||
@ -1991,8 +2029,24 @@ class TradingOrchestrator:
|
||||
|
||||
# Require at least 2/3 consensus
|
||||
if consensus_count >= max(2, self.required_confirmations * 0.67):
|
||||
# ADDITIONAL RATE LIMITING: Don't confirm if we just confirmed the same action
|
||||
if dominant_action in self.last_confirmed_signal[symbol]:
|
||||
last_confirmed = self.last_confirmed_signal[symbol][dominant_action]
|
||||
time_since_last = current_time - last_confirmed['timestamp']
|
||||
if time_since_last < self.min_signal_interval:
|
||||
logger.debug(f"Rate limiting: Preventing duplicate {dominant_action} confirmation for {symbol}")
|
||||
return None
|
||||
|
||||
# Record this confirmation
|
||||
self.last_confirmed_signal[symbol][dominant_action] = {
|
||||
'timestamp': current_time,
|
||||
'confidence': signal_data['confidence']
|
||||
}
|
||||
|
||||
# Clear accumulator after confirmation
|
||||
self.signal_accumulator[symbol] = []
|
||||
|
||||
logger.info(f"Signal confirmed after rate limiting: {dominant_action} for {symbol}")
|
||||
return dominant_action
|
||||
|
||||
return None
|
||||
|
555
core/williams_market_structure.py
Normal file
555
core/williams_market_structure.py
Normal file
@ -0,0 +1,555 @@
|
||||
"""
|
||||
Williams Market Structure Implementation
|
||||
|
||||
This module implements Larry Williams' market structure analysis with recursive pivot points.
|
||||
The system identifies swing highs and swing lows, then uses these pivot points to determine
|
||||
higher-level trends recursively.
|
||||
|
||||
Key Features:
|
||||
- Recursive pivot point calculation (5 levels)
|
||||
- Swing high/low identification
|
||||
- Trend direction and strength analysis
|
||||
- Integration with CNN model for pivot prediction
|
||||
"""
|
||||
|
||||
import logging
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Dict, List, Optional, Tuple, Any
|
||||
from dataclasses import dataclass, field
|
||||
from collections import deque
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@dataclass
|
||||
class PivotPoint:
|
||||
"""Represents a pivot point in the market structure"""
|
||||
timestamp: datetime
|
||||
price: float
|
||||
pivot_type: str # 'high' or 'low'
|
||||
level: int # Pivot level (1-5)
|
||||
index: int # Index in the original data
|
||||
strength: float = 0.0 # Strength of the pivot (0.0 to 1.0)
|
||||
confirmed: bool = False # Whether the pivot is confirmed
|
||||
|
||||
@dataclass
|
||||
class TrendLevel:
|
||||
"""Represents a trend level in the Williams Market Structure"""
|
||||
level: int
|
||||
pivot_points: List[PivotPoint]
|
||||
trend_direction: str # 'up', 'down', 'sideways'
|
||||
trend_strength: float # 0.0 to 1.0
|
||||
last_pivot_high: Optional[PivotPoint] = None
|
||||
last_pivot_low: Optional[PivotPoint] = None
|
||||
|
||||
class WilliamsMarketStructure:
|
||||
"""
|
||||
Implementation of Larry Williams Market Structure Analysis
|
||||
|
||||
This class implements the recursive pivot point calculation system where:
|
||||
1. Level 1: Direct swing highs/lows from 1s OHLCV data
|
||||
2. Level 2-5: Recursive analysis using previous level's pivot points as "candles"
|
||||
"""
|
||||
|
||||
def __init__(self, min_pivot_distance: int = 3):
|
||||
"""
|
||||
Initialize Williams Market Structure analyzer
|
||||
|
||||
Args:
|
||||
min_pivot_distance: Minimum distance between pivot points
|
||||
"""
|
||||
self.min_pivot_distance = min_pivot_distance
|
||||
self.pivot_levels: Dict[int, TrendLevel] = {}
|
||||
self.max_levels = 5
|
||||
|
||||
logger.info(f"Williams Market Structure initialized with {self.max_levels} levels")
|
||||
|
||||
def calculate_recursive_pivot_points(self, ohlcv_data: np.ndarray) -> Dict[int, TrendLevel]:
|
||||
"""
|
||||
Calculate recursive pivot points following Williams Market Structure methodology
|
||||
|
||||
Args:
|
||||
ohlcv_data: OHLCV data array with shape (N, 6) [timestamp, O, H, L, C, V]
|
||||
|
||||
Returns:
|
||||
Dictionary of trend levels with pivot points
|
||||
"""
|
||||
try:
|
||||
if len(ohlcv_data) < self.min_pivot_distance * 2 + 1:
|
||||
logger.warning(f"Insufficient data for pivot calculation: {len(ohlcv_data)} bars")
|
||||
return {}
|
||||
|
||||
# Convert to DataFrame for easier processing
|
||||
df = pd.DataFrame(ohlcv_data, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
|
||||
df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
|
||||
|
||||
# Initialize pivot levels
|
||||
self.pivot_levels = {}
|
||||
|
||||
# Level 1: Calculate pivot points from raw OHLCV data
|
||||
level_1_pivots = self._calculate_level_1_pivots(df)
|
||||
if level_1_pivots:
|
||||
self.pivot_levels[1] = TrendLevel(
|
||||
level=1,
|
||||
pivot_points=level_1_pivots,
|
||||
trend_direction=self._determine_trend_direction(level_1_pivots),
|
||||
trend_strength=self._calculate_trend_strength(level_1_pivots)
|
||||
)
|
||||
|
||||
# Levels 2-5: Recursive calculation using previous level's pivots
|
||||
for level in range(2, self.max_levels + 1):
|
||||
higher_level_pivots = self._calculate_higher_level_pivots(level)
|
||||
if higher_level_pivots:
|
||||
self.pivot_levels[level] = TrendLevel(
|
||||
level=level,
|
||||
pivot_points=higher_level_pivots,
|
||||
trend_direction=self._determine_trend_direction(higher_level_pivots),
|
||||
trend_strength=self._calculate_trend_strength(higher_level_pivots)
|
||||
)
|
||||
else:
|
||||
break # No more higher level pivots possible
|
||||
|
||||
logger.debug(f"Calculated {len(self.pivot_levels)} pivot levels")
|
||||
return self.pivot_levels
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error calculating recursive pivot points: {e}")
|
||||
return {}
|
||||
|
||||
def _calculate_level_1_pivots(self, df: pd.DataFrame) -> List[PivotPoint]:
|
||||
"""
|
||||
Calculate Level 1 pivot points from raw OHLCV data
|
||||
|
||||
A swing high is a candle with lower highs on both sides
|
||||
A swing low is a candle with higher lows on both sides
|
||||
"""
|
||||
pivots = []
|
||||
|
||||
try:
|
||||
for i in range(self.min_pivot_distance, len(df) - self.min_pivot_distance):
|
||||
current_high = df.iloc[i]['high']
|
||||
current_low = df.iloc[i]['low']
|
||||
current_timestamp = df.iloc[i]['timestamp']
|
||||
|
||||
# Check for swing high
|
||||
is_swing_high = True
|
||||
for j in range(i - self.min_pivot_distance, i + self.min_pivot_distance + 1):
|
||||
if j != i and df.iloc[j]['high'] >= current_high:
|
||||
is_swing_high = False
|
||||
break
|
||||
|
||||
if is_swing_high:
|
||||
pivot = PivotPoint(
|
||||
timestamp=current_timestamp,
|
||||
price=current_high,
|
||||
pivot_type='high',
|
||||
level=1,
|
||||
index=i,
|
||||
strength=self._calculate_pivot_strength(df, i, 'high'),
|
||||
confirmed=True
|
||||
)
|
||||
pivots.append(pivot)
|
||||
continue
|
||||
|
||||
# Check for swing low
|
||||
is_swing_low = True
|
||||
for j in range(i - self.min_pivot_distance, i + self.min_pivot_distance + 1):
|
||||
if j != i and df.iloc[j]['low'] <= current_low:
|
||||
is_swing_low = False
|
||||
break
|
||||
|
||||
if is_swing_low:
|
||||
pivot = PivotPoint(
|
||||
timestamp=current_timestamp,
|
||||
price=current_low,
|
||||
pivot_type='low',
|
||||
level=1,
|
||||
index=i,
|
||||
strength=self._calculate_pivot_strength(df, i, 'low'),
|
||||
confirmed=True
|
||||
)
|
||||
pivots.append(pivot)
|
||||
|
||||
logger.debug(f"Level 1: Found {len(pivots)} pivot points")
|
||||
return pivots
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error calculating Level 1 pivots: {e}")
|
||||
return []
|
||||
|
||||
def _calculate_higher_level_pivots(self, level: int) -> List[PivotPoint]:
|
||||
"""
|
||||
Calculate higher level pivot points using previous level's pivots as "candles"
|
||||
|
||||
This is the recursive part of Williams Market Structure where we treat
|
||||
pivot points from the previous level as if they were OHLCV candles
|
||||
"""
|
||||
if level - 1 not in self.pivot_levels:
|
||||
return []
|
||||
|
||||
previous_level_pivots = self.pivot_levels[level - 1].pivot_points
|
||||
if len(previous_level_pivots) < self.min_pivot_distance * 2 + 1:
|
||||
return []
|
||||
|
||||
pivots = []
|
||||
|
||||
try:
|
||||
# Group pivots by type to find swing points
|
||||
highs = [p for p in previous_level_pivots if p.pivot_type == 'high']
|
||||
lows = [p for p in previous_level_pivots if p.pivot_type == 'low']
|
||||
|
||||
# Find swing highs among the high pivots
|
||||
for i in range(self.min_pivot_distance, len(highs) - self.min_pivot_distance):
|
||||
current_pivot = highs[i]
|
||||
|
||||
# Check if this high is surrounded by lower highs
|
||||
is_swing_high = True
|
||||
for j in range(i - self.min_pivot_distance, i + self.min_pivot_distance + 1):
|
||||
if j != i and j < len(highs) and highs[j].price >= current_pivot.price:
|
||||
is_swing_high = False
|
||||
break
|
||||
|
||||
if is_swing_high:
|
||||
pivot = PivotPoint(
|
||||
timestamp=current_pivot.timestamp,
|
||||
price=current_pivot.price,
|
||||
pivot_type='high',
|
||||
level=level,
|
||||
index=current_pivot.index,
|
||||
strength=current_pivot.strength * 0.8, # Reduce strength at higher levels
|
||||
confirmed=True
|
||||
)
|
||||
pivots.append(pivot)
|
||||
|
||||
# Find swing lows among the low pivots
|
||||
for i in range(self.min_pivot_distance, len(lows) - self.min_pivot_distance):
|
||||
current_pivot = lows[i]
|
||||
|
||||
# Check if this low is surrounded by higher lows
|
||||
is_swing_low = True
|
||||
for j in range(i - self.min_pivot_distance, i + self.min_pivot_distance + 1):
|
||||
if j != i and j < len(lows) and lows[j].price <= current_pivot.price:
|
||||
is_swing_low = False
|
||||
break
|
||||
|
||||
if is_swing_low:
|
||||
pivot = PivotPoint(
|
||||
timestamp=current_pivot.timestamp,
|
||||
price=current_pivot.price,
|
||||
pivot_type='low',
|
||||
level=level,
|
||||
index=current_pivot.index,
|
||||
strength=current_pivot.strength * 0.8, # Reduce strength at higher levels
|
||||
confirmed=True
|
||||
)
|
||||
pivots.append(pivot)
|
||||
|
||||
# Sort pivots by timestamp
|
||||
pivots.sort(key=lambda x: x.timestamp)
|
||||
|
||||
logger.debug(f"Level {level}: Found {len(pivots)} pivot points")
|
||||
return pivots
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error calculating Level {level} pivots: {e}")
|
||||
return []
|
||||
|
||||
def _calculate_pivot_strength(self, df: pd.DataFrame, index: int, pivot_type: str) -> float:
|
||||
"""
|
||||
Calculate the strength of a pivot point based on surrounding price action
|
||||
|
||||
Strength is determined by:
|
||||
- Distance from surrounding highs/lows
|
||||
- Volume at the pivot point
|
||||
- Duration of the pivot formation
|
||||
"""
|
||||
try:
|
||||
if pivot_type == 'high':
|
||||
current_price = df.iloc[index]['high']
|
||||
# Calculate average of surrounding highs
|
||||
surrounding_prices = []
|
||||
for i in range(max(0, index - self.min_pivot_distance),
|
||||
min(len(df), index + self.min_pivot_distance + 1)):
|
||||
if i != index:
|
||||
surrounding_prices.append(df.iloc[i]['high'])
|
||||
|
||||
if surrounding_prices:
|
||||
avg_surrounding = np.mean(surrounding_prices)
|
||||
strength = min(1.0, (current_price - avg_surrounding) / avg_surrounding * 10)
|
||||
else:
|
||||
strength = 0.5
|
||||
else: # pivot_type == 'low'
|
||||
current_price = df.iloc[index]['low']
|
||||
# Calculate average of surrounding lows
|
||||
surrounding_prices = []
|
||||
for i in range(max(0, index - self.min_pivot_distance),
|
||||
min(len(df), index + self.min_pivot_distance + 1)):
|
||||
if i != index:
|
||||
surrounding_prices.append(df.iloc[i]['low'])
|
||||
|
||||
if surrounding_prices:
|
||||
avg_surrounding = np.mean(surrounding_prices)
|
||||
strength = min(1.0, (avg_surrounding - current_price) / avg_surrounding * 10)
|
||||
else:
|
||||
strength = 0.5
|
||||
|
||||
# Factor in volume if available
|
||||
if 'volume' in df.columns and df.iloc[index]['volume'] > 0:
|
||||
avg_volume = df['volume'].rolling(window=20, center=True).mean().iloc[index]
|
||||
if avg_volume > 0:
|
||||
volume_factor = min(2.0, df.iloc[index]['volume'] / avg_volume)
|
||||
strength *= volume_factor
|
||||
|
||||
return max(0.0, min(1.0, strength))
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error calculating pivot strength: {e}")
|
||||
return 0.5
|
||||
|
||||
def _determine_trend_direction(self, pivots: List[PivotPoint]) -> str:
|
||||
"""
|
||||
Determine the overall trend direction based on pivot points
|
||||
|
||||
Trend is determined by comparing recent highs and lows:
|
||||
- Uptrend: Higher highs and higher lows
|
||||
- Downtrend: Lower highs and lower lows
|
||||
- Sideways: Mixed or insufficient data
|
||||
"""
|
||||
if len(pivots) < 4:
|
||||
return 'sideways'
|
||||
|
||||
try:
|
||||
# Get recent pivots (last 10 or all if less than 10)
|
||||
recent_pivots = pivots[-10:] if len(pivots) >= 10 else pivots
|
||||
|
||||
highs = [p for p in recent_pivots if p.pivot_type == 'high']
|
||||
lows = [p for p in recent_pivots if p.pivot_type == 'low']
|
||||
|
||||
if len(highs) < 2 or len(lows) < 2:
|
||||
return 'sideways'
|
||||
|
||||
# Sort by timestamp
|
||||
highs.sort(key=lambda x: x.timestamp)
|
||||
lows.sort(key=lambda x: x.timestamp)
|
||||
|
||||
# Check for higher highs and higher lows (uptrend)
|
||||
higher_highs = highs[-1].price > highs[-2].price if len(highs) >= 2 else False
|
||||
higher_lows = lows[-1].price > lows[-2].price if len(lows) >= 2 else False
|
||||
|
||||
# Check for lower highs and lower lows (downtrend)
|
||||
lower_highs = highs[-1].price < highs[-2].price if len(highs) >= 2 else False
|
||||
lower_lows = lows[-1].price < lows[-2].price if len(lows) >= 2 else False
|
||||
|
||||
if higher_highs and higher_lows:
|
||||
return 'up'
|
||||
elif lower_highs and lower_lows:
|
||||
return 'down'
|
||||
else:
|
||||
return 'sideways'
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error determining trend direction: {e}")
|
||||
return 'sideways'
|
||||
|
||||
def _calculate_trend_strength(self, pivots: List[PivotPoint]) -> float:
|
||||
"""
|
||||
Calculate the strength of the current trend
|
||||
|
||||
Strength is based on:
|
||||
- Consistency of pivot point progression
|
||||
- Average strength of individual pivots
|
||||
- Number of confirming pivots
|
||||
"""
|
||||
if not pivots:
|
||||
return 0.0
|
||||
|
||||
try:
|
||||
# Average individual pivot strengths
|
||||
avg_pivot_strength = np.mean([p.strength for p in pivots])
|
||||
|
||||
# Factor in number of pivots (more pivots = stronger trend)
|
||||
pivot_count_factor = min(1.0, len(pivots) / 10.0)
|
||||
|
||||
# Calculate consistency (how well pivots follow the trend)
|
||||
trend_direction = self._determine_trend_direction(pivots)
|
||||
consistency_score = self._calculate_trend_consistency(pivots, trend_direction)
|
||||
|
||||
# Combine factors
|
||||
trend_strength = (avg_pivot_strength * 0.4 +
|
||||
pivot_count_factor * 0.3 +
|
||||
consistency_score * 0.3)
|
||||
|
||||
return max(0.0, min(1.0, trend_strength))
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error calculating trend strength: {e}")
|
||||
return 0.0
|
||||
|
||||
def _calculate_trend_consistency(self, pivots: List[PivotPoint], trend_direction: str) -> float:
|
||||
"""
|
||||
Calculate how consistently the pivots follow the expected trend direction
|
||||
"""
|
||||
if len(pivots) < 4 or trend_direction == 'sideways':
|
||||
return 0.5
|
||||
|
||||
try:
|
||||
highs = [p for p in pivots if p.pivot_type == 'high']
|
||||
lows = [p for p in pivots if p.pivot_type == 'low']
|
||||
|
||||
if len(highs) < 2 or len(lows) < 2:
|
||||
return 0.5
|
||||
|
||||
# Sort by timestamp
|
||||
highs.sort(key=lambda x: x.timestamp)
|
||||
lows.sort(key=lambda x: x.timestamp)
|
||||
|
||||
consistent_moves = 0
|
||||
total_moves = 0
|
||||
|
||||
# Check high-to-high moves
|
||||
for i in range(1, len(highs)):
|
||||
total_moves += 1
|
||||
if trend_direction == 'up' and highs[i].price > highs[i-1].price:
|
||||
consistent_moves += 1
|
||||
elif trend_direction == 'down' and highs[i].price < highs[i-1].price:
|
||||
consistent_moves += 1
|
||||
|
||||
# Check low-to-low moves
|
||||
for i in range(1, len(lows)):
|
||||
total_moves += 1
|
||||
if trend_direction == 'up' and lows[i].price > lows[i-1].price:
|
||||
consistent_moves += 1
|
||||
elif trend_direction == 'down' and lows[i].price < lows[i-1].price:
|
||||
consistent_moves += 1
|
||||
|
||||
if total_moves == 0:
|
||||
return 0.5
|
||||
|
||||
return consistent_moves / total_moves
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error calculating trend consistency: {e}")
|
||||
return 0.5
|
||||
|
||||
def get_pivot_features_for_ml(self, symbol: str = "ETH/USDT") -> np.ndarray:
|
||||
"""
|
||||
Extract pivot point features for machine learning models
|
||||
|
||||
Returns a feature vector containing:
|
||||
- Recent pivot points (price, strength, type)
|
||||
- Trend direction and strength for each level
|
||||
- Time since last pivot for each level
|
||||
|
||||
Total features: 250 (50 features per level * 5 levels)
|
||||
"""
|
||||
features = []
|
||||
|
||||
try:
|
||||
for level in range(1, self.max_levels + 1):
|
||||
level_features = []
|
||||
|
||||
if level in self.pivot_levels:
|
||||
trend_level = self.pivot_levels[level]
|
||||
pivots = trend_level.pivot_points
|
||||
|
||||
# Get last 5 pivots for this level
|
||||
recent_pivots = pivots[-5:] if len(pivots) >= 5 else pivots
|
||||
|
||||
# Pad with zeros if we have fewer than 5 pivots
|
||||
while len(recent_pivots) < 5:
|
||||
recent_pivots.insert(0, PivotPoint(
|
||||
timestamp=datetime.now(),
|
||||
price=0.0,
|
||||
pivot_type='high',
|
||||
level=level,
|
||||
index=0,
|
||||
strength=0.0
|
||||
))
|
||||
|
||||
# Extract features for each pivot (8 features per pivot)
|
||||
for pivot in recent_pivots:
|
||||
level_features.extend([
|
||||
pivot.price,
|
||||
pivot.strength,
|
||||
1.0 if pivot.pivot_type == 'high' else 0.0, # Pivot type
|
||||
float(pivot.level),
|
||||
1.0 if pivot.confirmed else 0.0, # Confirmation status
|
||||
float((datetime.now() - pivot.timestamp).total_seconds() / 3600), # Hours since pivot
|
||||
float(pivot.index), # Position in data
|
||||
0.0 # Reserved for future use
|
||||
])
|
||||
|
||||
# Add trend features (10 features)
|
||||
trend_direction_encoded = {
|
||||
'up': [1.0, 0.0, 0.0],
|
||||
'down': [0.0, 1.0, 0.0],
|
||||
'sideways': [0.0, 0.0, 1.0]
|
||||
}.get(trend_level.trend_direction, [0.0, 0.0, 1.0])
|
||||
|
||||
level_features.extend(trend_direction_encoded)
|
||||
level_features.append(trend_level.trend_strength)
|
||||
level_features.extend([0.0] * 6) # Reserved for future use
|
||||
|
||||
else:
|
||||
# No data for this level, fill with zeros
|
||||
level_features = [0.0] * 50
|
||||
|
||||
features.extend(level_features)
|
||||
|
||||
return np.array(features, dtype=np.float32)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error extracting pivot features for ML: {e}")
|
||||
return np.zeros(250, dtype=np.float32)
|
||||
|
||||
def get_current_market_structure(self) -> Dict[str, Any]:
|
||||
"""
|
||||
Get current market structure summary for dashboard display
|
||||
"""
|
||||
try:
|
||||
structure = {
|
||||
'levels': {},
|
||||
'overall_trend': 'sideways',
|
||||
'overall_strength': 0.0,
|
||||
'last_update': datetime.now().isoformat()
|
||||
}
|
||||
|
||||
# Aggregate information from all levels
|
||||
trend_votes = {'up': 0, 'down': 0, 'sideways': 0}
|
||||
total_strength = 0.0
|
||||
active_levels = 0
|
||||
|
||||
for level, trend_level in self.pivot_levels.items():
|
||||
structure['levels'][level] = {
|
||||
'trend_direction': trend_level.trend_direction,
|
||||
'trend_strength': trend_level.trend_strength,
|
||||
'pivot_count': len(trend_level.pivot_points),
|
||||
'last_pivot': {
|
||||
'timestamp': trend_level.pivot_points[-1].timestamp.isoformat() if trend_level.pivot_points else None,
|
||||
'price': trend_level.pivot_points[-1].price if trend_level.pivot_points else 0.0,
|
||||
'type': trend_level.pivot_points[-1].pivot_type if trend_level.pivot_points else 'none'
|
||||
} if trend_level.pivot_points else None
|
||||
}
|
||||
|
||||
# Vote for overall trend
|
||||
trend_votes[trend_level.trend_direction] += trend_level.trend_strength
|
||||
total_strength += trend_level.trend_strength
|
||||
active_levels += 1
|
||||
|
||||
# Determine overall trend
|
||||
if active_levels > 0:
|
||||
structure['overall_trend'] = max(trend_votes, key=trend_votes.get)
|
||||
structure['overall_strength'] = total_strength / active_levels
|
||||
|
||||
return structure
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting current market structure: {e}")
|
||||
return {
|
||||
'levels': {},
|
||||
'overall_trend': 'sideways',
|
||||
'overall_strength': 0.0,
|
||||
'last_update': datetime.now().isoformat(),
|
||||
'error': str(e)
|
||||
}
|
Reference in New Issue
Block a user