"""
Williams Market Structure Implementation

This module implements Larry Williams' market structure analysis with recursive pivot points.
The system identifies swing highs and swing lows, then uses these pivot points to determine
higher-level trends recursively.

Key Features:
- Recursive pivot point calculation (5 levels)
- Swing high/low identification
- Trend direction and strength analysis
- Integration with CNN model for pivot prediction
"""

# Standard library
import logging
from collections import deque
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Tuple

# Third-party
import numpy as np
import pandas as pd

# Module-level logger, named after the importing module.
logger = logging.getLogger(__name__)


@dataclass
class PivotPoint:
    """Represents a pivot point (swing high or swing low) in the market structure"""

    timestamp: datetime      # Time of the bar / lower-level pivot the pivot sits on
    price: float             # Price at the pivot
    pivot_type: str          # 'high' or 'low'
    level: int               # Pivot level (1-5)
    index: int               # Index in the original data
    strength: float = 0.0    # Strength of the pivot (0.0 to 1.0)
    confirmed: bool = False  # Whether the pivot is confirmed


@dataclass
class TrendLevel:
    """Represents a trend level in the Williams Market Structure"""

    level: int                                       # Level number (1 = raw bars, higher = recursive)
    pivot_points: List["PivotPoint"]                 # Pivots found at this level
    trend_direction: str                             # 'up', 'down', 'sideways'
    trend_strength: float                            # 0.0 to 1.0
    last_pivot_high: Optional["PivotPoint"] = None   # Most recent 'high' pivot, if recorded
    last_pivot_low: Optional["PivotPoint"] = None    # Most recent 'low' pivot, if recorded


class WilliamsMarketStructure:
    """
    Implementation of Larry Williams Market Structure Analysis

    Pivot points are computed recursively:
    1. Level 1: swing highs/lows taken directly from the OHLCV bars
    2. Levels 2-5: swing highs/lows among the previous level's pivot points,
       which are treated as if they were "candles"

    The result of the latest calculation is stored in ``pivot_levels`` and is
    what ``get_pivot_features_for_ml`` and ``get_current_market_structure`` read.
    """

    # Layout of the feature vector built by get_pivot_features_for_ml:
    # 5 pivots * 8 features + 10 trend features = 50 features per level.
    _PIVOTS_PER_LEVEL = 5
    _FEATURES_PER_PIVOT = 8
    _TREND_FEATURES = 10

    # Number of bars in the centred volume window used by _calculate_pivot_strength.
    _VOLUME_WINDOW = 20

    def __init__(self, min_pivot_distance: int = 3):
        """
        Initialize Williams Market Structure analyzer

        Args:
            min_pivot_distance: Number of neighbours on EACH side that a
                candidate must beat to qualify as a swing high/low
        """
        self.min_pivot_distance = min_pivot_distance
        self.pivot_levels: Dict[int, "TrendLevel"] = {}
        self.max_levels = 5

        logger.info(f"Williams Market Structure initialized with {self.max_levels} levels")

    def calculate_recursive_pivot_points(self, ohlcv_data: np.ndarray) -> Dict[int, "TrendLevel"]:
        """
        Calculate recursive pivot points following Williams Market Structure methodology

        Args:
            ohlcv_data: OHLCV data array with shape (N, 6) [timestamp, O, H, L, C, V];
                the timestamp column is parsed as epoch milliseconds

        Returns:
            Dictionary mapping level number to its TrendLevel. Empty when there
            are fewer than ``2 * min_pivot_distance + 1`` bars or when an error
            occurs. NOTE: in the too-few-bars case the previously stored
            ``pivot_levels`` are left untouched.
        """
        try:
            if len(ohlcv_data) < self.min_pivot_distance * 2 + 1:
                logger.warning(f"Insufficient data for pivot calculation: {len(ohlcv_data)} bars")
                return {}

            # Convert to DataFrame for easier processing
            df = pd.DataFrame(ohlcv_data, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
            df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')

            # Start from a clean slate for this calculation
            self.pivot_levels = {}

            # Level 1: pivot points from raw OHLCV data
            level_1_pivots = self._calculate_level_1_pivots(df)
            if level_1_pivots:
                self.pivot_levels[1] = self._build_trend_level(1, level_1_pivots)

            # Levels 2-5: recursive calculation using the previous level's pivots
            for level in range(2, self.max_levels + 1):
                higher_level_pivots = self._calculate_higher_level_pivots(level)
                if not higher_level_pivots:
                    break  # No more higher level pivots possible
                self.pivot_levels[level] = self._build_trend_level(level, higher_level_pivots)

            logger.debug(f"Calculated {len(self.pivot_levels)} pivot levels")
            return self.pivot_levels

        except Exception as e:
            logger.error(f"Error calculating recursive pivot points: {e}")
            return {}

    def _build_trend_level(self, level: int, pivots: List["PivotPoint"]) -> "TrendLevel":
        """Wrap chronologically ordered *pivots* in a TrendLevel, including its last high/low."""
        last_high = next((p for p in reversed(pivots) if p.pivot_type == 'high'), None)
        last_low = next((p for p in reversed(pivots) if p.pivot_type == 'low'), None)
        return TrendLevel(
            level=level,
            pivot_points=pivots,
            trend_direction=self._determine_trend_direction(pivots),
            trend_strength=self._calculate_trend_strength(pivots),
            last_pivot_high=last_high,
            last_pivot_low=last_low,
        )

    def _calculate_level_1_pivots(self, df: pd.DataFrame) -> List["PivotPoint"]:
        """
        Calculate Level 1 pivot points from raw OHLCV data

        A swing high is a bar whose high is strictly above the highs of the
        ``min_pivot_distance`` bars on each side; a swing low is a bar whose low
        is strictly below the lows of those bars. A bar that qualifies as both
        is recorded as a swing high only.

        Returns:
            Pivots in bar order; an empty list if an error occurs.
        """
        pivots = []

        try:
            span = self.min_pivot_distance
            # Read the columns once; per-cell df.iloc access inside the scan
            # is far slower than indexing plain arrays.
            highs = df['high'].to_numpy()
            lows = df['low'].to_numpy()
            timestamps = df['timestamp']

            for i in range(span, len(df) - span):
                before = slice(i - span, i)
                after = slice(i + 1, i + span + 1)
                high, low = highs[i], lows[i]

                if not ((highs[before] >= high).any() or (highs[after] >= high).any()):
                    pivot_type, price = 'high', high
                elif not ((lows[before] <= low).any() or (lows[after] <= low).any()):
                    pivot_type, price = 'low', low
                else:
                    continue

                pivots.append(PivotPoint(
                    timestamp=timestamps.iloc[i],
                    price=float(price),
                    pivot_type=pivot_type,
                    level=1,
                    index=i,
                    strength=self._calculate_pivot_strength(df, i, pivot_type),
                    confirmed=True,
                ))

            logger.debug(f"Level 1: Found {len(pivots)} pivot points")
            return pivots

        except Exception as e:
            logger.error(f"Error calculating Level 1 pivots: {e}")
            return []

    def _calculate_higher_level_pivots(self, level: int) -> List["PivotPoint"]:
        """
        Calculate higher level pivot points using previous level's pivots as "candles"

        The previous level's 'high' pivots are scanned for swing highs and its
        'low' pivots for swing lows, each series on its own.

        Returns:
            The new pivots sorted by timestamp. Empty when the previous level is
            missing, has fewer than ``2 * min_pivot_distance + 1`` pivots, or an
            error occurs.
        """
        if level - 1 not in self.pivot_levels:
            return []

        previous_level_pivots = self.pivot_levels[level - 1].pivot_points
        if len(previous_level_pivots) < self.min_pivot_distance * 2 + 1:
            return []

        try:
            highs = [p for p in previous_level_pivots if p.pivot_type == 'high']
            lows = [p for p in previous_level_pivots if p.pivot_type == 'low']

            pivots = (self._find_swing_pivots(highs, level, 'high')
                      + self._find_swing_pivots(lows, level, 'low'))

            # Highs and lows were scanned separately, so restore time order
            pivots.sort(key=lambda x: x.timestamp)

            logger.debug(f"Level {level}: Found {len(pivots)} pivot points")
            return pivots

        except Exception as e:
            logger.error(f"Error calculating Level {level} pivots: {e}")
            return []

    def _find_swing_pivots(self, candidates: List["PivotPoint"], level: int,
                           pivot_type: str) -> List["PivotPoint"]:
        """Return the *candidates* that strictly beat ``min_pivot_distance`` neighbours on each side."""
        span = self.min_pivot_distance
        found = []

        for i in range(span, len(candidates) - span):
            current = candidates[i]
            neighbours = candidates[i - span:i] + candidates[i + 1:i + span + 1]

            if pivot_type == 'high':
                beaten = any(p.price >= current.price for p in neighbours)
            else:
                beaten = any(p.price <= current.price for p in neighbours)
            if beaten:
                continue

            found.append(PivotPoint(
                timestamp=current.timestamp,
                price=current.price,
                pivot_type=pivot_type,
                level=level,
                index=current.index,
                strength=current.strength * 0.8,  # Reduce strength at higher levels
                confirmed=True,
            ))

        return found

    def _calculate_pivot_strength(self, df: pd.DataFrame, index: int, pivot_type: str) -> float:
        """
        Calculate the strength of a pivot point based on surrounding price action

        Strength is the relative distance between the pivot price and the mean
        of the neighbouring highs (for 'high') or lows (otherwise), times 10 and
        capped at 1.0. When a 'volume' column is present, the pivot bar has
        positive volume and a full centred 20-bar window fits inside the data,
        the value is multiplied by ``min(2.0, volume / window mean)``.

        Returns:
            Strength clamped to [0.0, 1.0]. 0.5 is used when the price part
            cannot be computed (no neighbours, zero or non-finite average) and
            when an error occurs.
        """
        try:
            column = 'high' if pivot_type == 'high' else 'low'
            prices = df[column].to_numpy()
            current_price = float(prices[index])

            lo = max(0, index - self.min_pivot_distance)
            hi = min(len(df), index + self.min_pivot_distance + 1)
            neighbours = np.concatenate((prices[lo:index], prices[index + 1:hi]))

            strength = 0.5
            if neighbours.size:
                avg_surrounding = float(np.mean(neighbours))
                # A zero average would divide by zero; keep the neutral value instead
                if avg_surrounding != 0.0:
                    if pivot_type == 'high':
                        relative = (current_price - avg_surrounding) / avg_surrounding
                    else:
                        relative = (avg_surrounding - current_price) / avg_surrounding
                    # NaN/inf inputs must not be turned into a "perfect" pivot by min()
                    if np.isfinite(relative):
                        strength = min(1.0, relative * 10)

            # Factor in volume if available
            if 'volume' in df.columns:
                volumes = df['volume'].to_numpy()
                pivot_volume = volumes[index]
                if pivot_volume > 0:
                    # Same bars as rolling(window=20, center=True) at this row,
                    # computed locally instead of over the whole series per pivot.
                    end = index + 1 + (self._VOLUME_WINDOW - 1) // 2
                    start = end - self._VOLUME_WINDOW
                    if start >= 0 and end <= len(volumes):
                        avg_volume = np.mean(volumes[start:end])
                        if avg_volume > 0:  # False for NaN as well
                            strength *= min(2.0, pivot_volume / avg_volume)

            return float(max(0.0, min(1.0, strength)))

        except Exception as e:
            logger.error(f"Error calculating pivot strength: {e}")
            return 0.5

    def _determine_trend_direction(self, pivots: List["PivotPoint"]) -> str:
        """
        Determine the overall trend direction based on pivot points

        Only the last 10 pivots are considered, and within them the two most
        recent highs and the two most recent lows are compared:
        - 'up': higher high and higher low
        - 'down': lower high and lower low
        - 'sideways': anything else, fewer than 4 pivots, fewer than 2 highs
          or 2 lows, or an error
        """
        if len(pivots) < 4:
            return 'sideways'

        try:
            recent_pivots = pivots[-10:]

            highs = sorted((p for p in recent_pivots if p.pivot_type == 'high'),
                           key=lambda x: x.timestamp)
            lows = sorted((p for p in recent_pivots if p.pivot_type == 'low'),
                          key=lambda x: x.timestamp)

            if len(highs) < 2 or len(lows) < 2:
                return 'sideways'

            previous_high, last_high = highs[-2].price, highs[-1].price
            previous_low, last_low = lows[-2].price, lows[-1].price

            if last_high > previous_high and last_low > previous_low:
                return 'up'
            if last_high < previous_high and last_low < previous_low:
                return 'down'
            return 'sideways'

        except Exception as e:
            logger.error(f"Error determining trend direction: {e}")
            return 'sideways'

    def _calculate_trend_strength(self, pivots: List["PivotPoint"]) -> float:
        """
        Calculate the strength of the current trend

        Weighted sum of:
        - mean strength of the individual pivots (weight 0.4)
        - pivot count, saturating at 10 pivots (weight 0.3)
        - consistency of the pivots with the detected direction (weight 0.3)

        Returns:
            Value clamped to [0.0, 1.0]; 0.0 for no pivots or on error.
        """
        if not pivots:
            return 0.0

        try:
            avg_pivot_strength = np.mean([p.strength for p in pivots])
            pivot_count_factor = min(1.0, len(pivots) / 10.0)

            trend_direction = self._determine_trend_direction(pivots)
            consistency_score = self._calculate_trend_consistency(pivots, trend_direction)

            trend_strength = (avg_pivot_strength * 0.4 +
                              pivot_count_factor * 0.3 +
                              consistency_score * 0.3)

            return float(max(0.0, min(1.0, trend_strength)))

        except Exception as e:
            logger.error(f"Error calculating trend strength: {e}")
            return 0.0

    def _calculate_trend_consistency(self, pivots: List["PivotPoint"], trend_direction: str) -> float:
        """
        Calculate how consistently the pivots follow the expected trend direction

        Returns:
            Fraction of consecutive high-to-high and low-to-low moves that go
            in *trend_direction*. 0.5 when the direction is 'sideways', there
            are fewer than 4 pivots, fewer than 2 highs or 2 lows, or on error.
        """
        if len(pivots) < 4 or trend_direction == 'sideways':
            return 0.5

        try:
            highs = sorted((p for p in pivots if p.pivot_type == 'high'),
                           key=lambda x: x.timestamp)
            lows = sorted((p for p in pivots if p.pivot_type == 'low'),
                          key=lambda x: x.timestamp)

            if len(highs) < 2 or len(lows) < 2:
                return 0.5

            consistent_moves = 0
            total_moves = 0

            # Highs and lows are each compared with their own predecessor
            for series in (highs, lows):
                for earlier, later in zip(series, series[1:]):
                    total_moves += 1
                    if trend_direction == 'up' and later.price > earlier.price:
                        consistent_moves += 1
                    elif trend_direction == 'down' and later.price < earlier.price:
                        consistent_moves += 1

            if total_moves == 0:
                return 0.5

            return consistent_moves / total_moves

        except Exception as e:
            logger.error(f"Error calculating trend consistency: {e}")
            return 0.5

    def get_pivot_features_for_ml(self, symbol: str = "ETH/USDT") -> np.ndarray:
        """
        Extract pivot point features for machine learning models

        Per level (levels 1..max_levels), 50 float32 values:
        - 5 pivot slots * 8 features, oldest first: price, strength,
          is-high flag, level, confirmed flag, hours since pivot, data index,
          reserved 0.0. Levels with fewer than 5 pivots are left-padded with
          placeholder slots (price 0, strength 0, is-high 1, unconfirmed).
        - 10 trend features: one-hot direction (up, down, sideways), trend
          strength, 6 reserved zeros.
        Levels with no stored data contribute 50 zeros.

        Total features: 250 (50 features per level * 5 levels)

        Args:
            symbol: Not used by this method.

        Returns:
            1-D float32 array; all zeros (length 250) if an error occurs.
        """
        # Local import: only this method needs it
        from datetime import timezone

        features_per_level = (self._PIVOTS_PER_LEVEL * self._FEATURES_PER_PIVOT
                              + self._TREND_FEATURES)
        features = []

        try:
            # Pivot timestamps produced by calculate_recursive_pivot_points are
            # naive UTC (parsed from epoch ms), so the reference time must be
            # naive UTC too; local wall-clock time would skew the result by the
            # UTC offset. NOTE(review): assumes externally supplied pivots
            # follow the same convention.
            now = datetime.now(timezone.utc).replace(tzinfo=None)

            for level in range(1, self.max_levels + 1):
                if level not in self.pivot_levels:
                    # No data for this level, fill with zeros
                    features.extend([0.0] * features_per_level)
                    continue

                trend_level = self.pivot_levels[level]
                # Slicing always copies, so the stored pivot list is never modified
                recent_pivots = trend_level.pivot_points[-self._PIVOTS_PER_LEVEL:]

                level_features = []

                # Left-pad with placeholder slots when fewer than 5 pivots exist
                for _ in range(self._PIVOTS_PER_LEVEL - len(recent_pivots)):
                    level_features.extend([0.0, 0.0, 1.0, float(level), 0.0, 0.0, 0.0, 0.0])

                # Extract features for each pivot (8 features per pivot)
                for pivot in recent_pivots:
                    level_features.extend([
                        pivot.price,
                        pivot.strength,
                        1.0 if pivot.pivot_type == 'high' else 0.0,  # Pivot type
                        float(pivot.level),
                        1.0 if pivot.confirmed else 0.0,  # Confirmation status
                        float((now - pivot.timestamp).total_seconds() / 3600),  # Hours since pivot
                        float(pivot.index),  # Position in data
                        0.0,  # Reserved for future use
                    ])

                # Add trend features (10 features)
                trend_direction_encoded = {
                    'up': [1.0, 0.0, 0.0],
                    'down': [0.0, 1.0, 0.0],
                    'sideways': [0.0, 0.0, 1.0],
                }.get(trend_level.trend_direction, [0.0, 0.0, 1.0])

                level_features.extend(trend_direction_encoded)
                level_features.append(trend_level.trend_strength)
                level_features.extend([0.0] * 6)  # Reserved for future use

                features.extend(level_features)

            return np.array(features, dtype=np.float32)

        except Exception as e:
            logger.error(f"Error extracting pivot features for ML: {e}")
            return np.zeros(250, dtype=np.float32)

    def get_current_market_structure(self) -> Dict[str, Any]:
        """
        Get current market structure summary for dashboard display

        Returns:
            Dict with:
            - 'levels': per level, its trend direction/strength, pivot count and
              last pivot (timestamp, price, type), or None for 'last_pivot' when
              the level has no pivots
            - 'overall_trend': direction with the largest summed trend strength
              across levels ('sideways' when no level is stored)
            - 'overall_strength': mean trend strength across levels
            - 'last_update': ISO timestamp of this call
            On error the same shape with empty/neutral values plus an 'error' key.
        """
        try:
            structure = {
                'levels': {},
                'overall_trend': 'sideways',
                'overall_strength': 0.0,
                'last_update': datetime.now().isoformat()
            }

            # Each level votes for its direction, weighted by its strength
            trend_votes = {'up': 0, 'down': 0, 'sideways': 0}
            total_strength = 0.0
            active_levels = 0

            for level, trend_level in self.pivot_levels.items():
                pivots = trend_level.pivot_points

                last_pivot = None
                if pivots:
                    latest = pivots[-1]
                    last_pivot = {
                        'timestamp': latest.timestamp.isoformat(),
                        'price': latest.price,
                        'type': latest.pivot_type
                    }

                structure['levels'][level] = {
                    'trend_direction': trend_level.trend_direction,
                    'trend_strength': trend_level.trend_strength,
                    'pivot_count': len(pivots),
                    'last_pivot': last_pivot
                }

                trend_votes[trend_level.trend_direction] += trend_level.trend_strength
                total_strength += trend_level.trend_strength
                active_levels += 1

            if active_levels > 0:
                structure['overall_trend'] = max(trend_votes, key=trend_votes.get)
                structure['overall_strength'] = total_strength / active_levels

            return structure

        except Exception as e:
            logger.error(f"Error getting current market structure: {e}")
            return {
                'levels': {},
                'overall_trend': 'sideways',
                'overall_strength': 0.0,
                'last_update': datetime.now().isoformat(),
                'error': str(e)
            }