"""
Williams Market Structure Implementation

This module implements Larry Williams' market structure analysis with
recursive pivot points. The system identifies swing highs and swing lows,
then uses these pivot points to determine higher-level trends recursively.

Key Features:
- Recursive pivot point calculation (5 levels)
- Swing high/low identification
- Trend direction and strength analysis
- Integration with CNN model for pivot prediction
"""

import logging
from collections import deque
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple, Any

import numpy as np
import pandas as pd

logger = logging.getLogger(__name__)


@dataclass
class PivotPoint:
    """Represents a pivot point in the market structure."""

    timestamp: datetime
    price: float
    pivot_type: str  # 'high' or 'low'
    level: int  # Pivot level (1-5)
    index: int  # Index in the original data
    strength: float = 0.0  # Strength of the pivot (0.0 to 1.0)
    confirmed: bool = False  # Whether the pivot is confirmed


@dataclass
class TrendLevel:
    """Represents a trend level in the Williams Market Structure."""

    level: int
    pivot_points: List[PivotPoint]
    trend_direction: str  # 'up', 'down', 'sideways'
    trend_strength: float  # 0.0 to 1.0
    last_pivot_high: Optional[PivotPoint] = None
    last_pivot_low: Optional[PivotPoint] = None


class WilliamsMarketStructure:
    """
    Implementation of Larry Williams Market Structure Analysis.

    This class implements the recursive pivot point calculation system where:
    1. Level 1: Direct swing highs/lows from the OHLCV bars
    2. Level 2-5: Recursive analysis using the previous level's pivot points
       as "candles"

    Results of the most recent successful calculation are kept in
    ``self.pivot_levels`` and are read by the feature/summary accessors.
    """

    # Layout of the ML feature vector produced by get_pivot_features_for_ml.
    _PIVOTS_PER_LEVEL = 5
    _FEATURES_PER_LEVEL = 50  # 5 pivots * 8 features + 10 trend features
    _RESERVED_TREND_FEATURES = 6

    def __init__(self, min_pivot_distance: int = 3):
        """
        Initialize Williams Market Structure analyzer.

        Args:
            min_pivot_distance: Number of neighbours on each side a bar (or
                lower-level pivot) must dominate to count as a swing point.
        """
        self.min_pivot_distance = min_pivot_distance
        self.pivot_levels: Dict[int, TrendLevel] = {}
        self.max_levels = 5

        logger.info("Williams Market Structure initialized with %s levels", self.max_levels)

    def calculate_recursive_pivot_points(self, ohlcv_data: np.ndarray) -> Dict[int, TrendLevel]:
        """
        Calculate recursive pivot points following Williams Market Structure
        methodology.

        Args:
            ohlcv_data: OHLCV data array with shape (N, 6)
                [timestamp, O, H, L, C, V]; timestamps are interpreted as
                milliseconds since the epoch.

        Returns:
            Dictionary of trend levels keyed by level number. Empty when there
            are fewer than ``2 * min_pivot_distance + 1`` bars or when an
            error occurs (the error is logged, not raised).
        """
        try:
            if len(ohlcv_data) < self.min_pivot_distance * 2 + 1:
                logger.warning("Insufficient data for pivot calculation: %s bars", len(ohlcv_data))
                return {}

            # Convert to DataFrame for easier processing
            df = pd.DataFrame(
                ohlcv_data,
                columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'],
            )
            df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')

            # Start from a clean slate so stale levels never leak into the result
            self.pivot_levels = {}

            # Level 1: pivots straight from the raw bars
            level_1_pivots = self._calculate_level_1_pivots(df)
            if level_1_pivots:
                self.pivot_levels[1] = self._build_trend_level(1, level_1_pivots)

            # Levels 2..max: each level is derived from the one below it
            for level in range(2, self.max_levels + 1):
                higher_level_pivots = self._calculate_higher_level_pivots(level)
                if not higher_level_pivots:
                    break  # No more higher level pivots possible
                self.pivot_levels[level] = self._build_trend_level(level, higher_level_pivots)

            logger.debug("Calculated %s pivot levels", len(self.pivot_levels))
            return self.pivot_levels

        except Exception as e:
            logger.error("Error calculating recursive pivot points: %s", e)
            return {}

    def _build_trend_level(self, level: int, pivots: List[PivotPoint]) -> TrendLevel:
        """Wrap a pivot list in a TrendLevel with its direction and strength."""
        return TrendLevel(
            level=level,
            pivot_points=pivots,
            trend_direction=self._determine_trend_direction(pivots),
            trend_strength=self._calculate_trend_strength(pivots),
        )

    @staticmethod
    def _is_swing_point(values: np.ndarray, position: int, distance: int, pivot_type: str) -> bool:
        """
        Tell whether ``values[position]`` strictly dominates its neighbours.

        For ``pivot_type == 'high'`` every value within ``distance`` positions
        on either side must be strictly lower; otherwise every neighbour must
        be strictly higher. Callers guarantee the window lies inside ``values``.
        """
        centre = values[position]
        neighbours = np.concatenate((
            values[position - distance:position],
            values[position + 1:position + distance + 1],
        ))
        if pivot_type == 'high':
            return not np.any(neighbours >= centre)
        return not np.any(neighbours <= centre)

    def _calculate_level_1_pivots(self, df: pd.DataFrame) -> List[PivotPoint]:
        """
        Calculate Level 1 pivot points from raw OHLCV data.

        A swing high is a candle with lower highs on both sides.
        A swing low is a candle with higher lows on both sides.
        A bar that qualifies as a swing high is never also recorded as a low.

        Returns:
            Pivots in bar order; an empty list if an error occurs.
        """
        pivots: List[PivotPoint] = []

        try:
            distance = self.min_pivot_distance
            # Column arrays avoid building a row Series for every comparison
            highs = df['high'].to_numpy()
            lows = df['low'].to_numpy()
            timestamps = df['timestamp']

            for i in range(distance, len(df) - distance):
                if self._is_swing_point(highs, i, distance, 'high'):
                    pivot_type, price = 'high', highs[i]
                elif self._is_swing_point(lows, i, distance, 'low'):
                    pivot_type, price = 'low', lows[i]
                else:
                    continue

                pivots.append(PivotPoint(
                    timestamp=timestamps.iloc[i],
                    price=float(price),
                    pivot_type=pivot_type,
                    level=1,
                    index=i,
                    strength=self._calculate_pivot_strength(df, i, pivot_type),
                    confirmed=True,
                ))

            logger.debug("Level 1: Found %s pivot points", len(pivots))
            return pivots

        except Exception as e:
            logger.error("Error calculating Level 1 pivots: %s", e)
            return []

    def _calculate_higher_level_pivots(self, level: int) -> List[PivotPoint]:
        """
        Calculate higher level pivot points using the previous level's pivots
        as "candles".

        This is the recursive part of Williams Market Structure: the highs of
        the previous level are scanned for swing highs, the lows for swing
        lows, using the same neighbour distance as level 1.

        Returns:
            New pivots sorted by timestamp; an empty list when the previous
            level is missing, too short, or an error occurs.
        """
        previous = self.pivot_levels.get(level - 1)
        if previous is None:
            return []

        distance = self.min_pivot_distance
        if len(previous.pivot_points) < distance * 2 + 1:
            return []

        try:
            pivots: List[PivotPoint] = []

            # Highs are only compared with highs, lows only with lows
            for pivot_type in ('high', 'low'):
                candidates = [p for p in previous.pivot_points if p.pivot_type == pivot_type]
                prices = np.array([p.price for p in candidates], dtype=float)

                for i in range(distance, len(candidates) - distance):
                    if not self._is_swing_point(prices, i, distance, pivot_type):
                        continue
                    source = candidates[i]
                    pivots.append(PivotPoint(
                        timestamp=source.timestamp,
                        price=source.price,
                        pivot_type=pivot_type,
                        level=level,
                        index=source.index,
                        strength=source.strength * 0.8,  # Reduce strength at higher levels
                        confirmed=True,
                    ))

            # Interleave highs and lows chronologically
            pivots.sort(key=lambda p: p.timestamp)

            logger.debug("Level %s: Found %s pivot points", level, len(pivots))
            return pivots

        except Exception as e:
            logger.error("Error calculating Level %s pivots: %s", level, e)
            return []

    def _calculate_pivot_strength(self, df: pd.DataFrame, index: int, pivot_type: str) -> float:
        """
        Calculate the strength of a pivot point based on surrounding price
        action.

        Strength is determined by:
        - Relative distance of the pivot price from the mean of the
          surrounding highs (for a high) or lows (for a low), scaled by 10
        - Volume at the pivot relative to a centred 20-bar rolling mean
          (factor capped at 2.0), when that mean is available and positive

        Returns:
            Strength clamped to [0.0, 1.0]; 0.5 if an error occurs.
        """
        try:
            is_high = pivot_type == 'high'
            prices = df['high' if is_high else 'low'].to_numpy()
            current_price = prices[index]

            window_start = max(0, index - self.min_pivot_distance)
            window_stop = min(len(df), index + self.min_pivot_distance + 1)
            neighbour_positions = [i for i in range(window_start, window_stop) if i != index]

            if neighbour_positions:
                avg_surrounding = np.mean(prices[neighbour_positions])
                if is_high:
                    gap = current_price - avg_surrounding
                else:
                    gap = avg_surrounding - current_price
                strength = min(1.0, gap / avg_surrounding * 10)
            else:
                strength = 0.5

            # Factor in volume if available
            if 'volume' in df.columns:
                pivot_volume = df['volume'].iloc[index]
                if pivot_volume > 0:
                    # NaN near the edges of the data; the `> 0` test then skips it
                    avg_volume = df['volume'].rolling(window=20, center=True).mean().iloc[index]
                    if avg_volume > 0:
                        strength *= min(2.0, pivot_volume / avg_volume)

            return float(max(0.0, min(1.0, strength)))

        except Exception as e:
            logger.error("Error calculating pivot strength: %s", e)
            return 0.5

    def _determine_trend_direction(self, pivots: List[PivotPoint]) -> str:
        """
        Determine the overall trend direction based on pivot points.

        Only the last 10 pivots are considered, and within them only the two
        most recent highs and the two most recent lows:
        - Uptrend: Higher high and higher low
        - Downtrend: Lower high and lower low
        - Sideways: Mixed or insufficient data

        Returns:
            'up', 'down' or 'sideways' ('sideways' also on error).
        """
        if len(pivots) < 4:
            return 'sideways'

        try:
            recent_pivots = pivots[-10:]

            highs = sorted(
                (p for p in recent_pivots if p.pivot_type == 'high'),
                key=lambda p: p.timestamp,
            )
            lows = sorted(
                (p for p in recent_pivots if p.pivot_type == 'low'),
                key=lambda p: p.timestamp,
            )

            if len(highs) < 2 or len(lows) < 2:
                return 'sideways'

            previous_high, latest_high = highs[-2].price, highs[-1].price
            previous_low, latest_low = lows[-2].price, lows[-1].price

            if latest_high > previous_high and latest_low > previous_low:
                return 'up'
            if latest_high < previous_high and latest_low < previous_low:
                return 'down'
            return 'sideways'

        except Exception as e:
            logger.error("Error determining trend direction: %s", e)
            return 'sideways'

    def _calculate_trend_strength(self, pivots: List[PivotPoint]) -> float:
        """
        Calculate the strength of the current trend.

        Strength is a weighted blend of:
        - Average strength of individual pivots (weight 0.4)
        - Number of pivots, saturating at 10 (weight 0.3)
        - Consistency of pivot progression with the trend (weight 0.3)

        Returns:
            Strength clamped to [0.0, 1.0]; 0.0 for no pivots or on error.
        """
        if not pivots:
            return 0.0

        try:
            avg_pivot_strength = np.mean([p.strength for p in pivots])

            # More pivots = stronger trend, up to 10 pivots
            pivot_count_factor = min(1.0, len(pivots) / 10.0)

            trend_direction = self._determine_trend_direction(pivots)
            consistency_score = self._calculate_trend_consistency(pivots, trend_direction)

            trend_strength = (
                avg_pivot_strength * 0.4
                + pivot_count_factor * 0.3
                + consistency_score * 0.3
            )

            return float(max(0.0, min(1.0, trend_strength)))

        except Exception as e:
            logger.error("Error calculating trend strength: %s", e)
            return 0.0

    def _calculate_trend_consistency(self, pivots: List[PivotPoint], trend_direction: str) -> float:
        """
        Calculate how consistently the pivots follow the expected trend
        direction.

        Every consecutive high-to-high and low-to-low move counts as one move;
        the score is the fraction of moves that go in ``trend_direction``.

        Returns:
            Fraction in [0.0, 1.0]; 0.5 when the trend is sideways, there is
            too little data, or an error occurs.
        """
        if len(pivots) < 4 or trend_direction == 'sideways':
            return 0.5

        try:
            highs = sorted(
                (p for p in pivots if p.pivot_type == 'high'),
                key=lambda p: p.timestamp,
            )
            lows = sorted(
                (p for p in pivots if p.pivot_type == 'low'),
                key=lambda p: p.timestamp,
            )

            if len(highs) < 2 or len(lows) < 2:
                return 0.5

            consistent_moves = 0
            total_moves = 0

            for series in (highs, lows):
                for earlier, later in zip(series, series[1:]):
                    total_moves += 1
                    if trend_direction == 'up' and later.price > earlier.price:
                        consistent_moves += 1
                    elif trend_direction == 'down' and later.price < earlier.price:
                        consistent_moves += 1

            if total_moves == 0:
                return 0.5

            return consistent_moves / total_moves

        except Exception as e:
            logger.error("Error calculating trend consistency: %s", e)
            return 0.5

    def get_pivot_features_for_ml(self, symbol: str = "ETH/USDT") -> np.ndarray:
        """
        Extract pivot point features for machine learning models.

        Returns a float32 feature vector containing, for each level:
        - The last 5 pivots, 8 features each (price, strength, is-high flag,
          level, confirmed flag, hours since pivot, bar index, reserved 0.0);
          missing pivots are front-padded with zero-price placeholders
        - One-hot trend direction (up, down, sideways), trend strength and
          6 reserved zeros

        Total features: 250 (50 features per level * 5 levels). Levels with no
        data contribute 50 zeros. The stored pivot levels are not modified.

        Args:
            symbol: Currently unused; kept for interface compatibility.

        Returns:
            Feature vector; all zeros if an error occurs.
        """
        try:
            # One clock reading so all pivot ages in a vector share a reference
            now = datetime.now()
            features: List[float] = []

            for level in range(1, self.max_levels + 1):
                trend_level = self.pivot_levels.get(level)
                if trend_level is None:
                    # No data for this level, fill with zeros
                    features.extend([0.0] * self._FEATURES_PER_LEVEL)
                    continue

                # Slicing copies, so padding below never touches the stored list
                recent_pivots = trend_level.pivot_points[-self._PIVOTS_PER_LEVEL:]
                missing = self._PIVOTS_PER_LEVEL - len(recent_pivots)
                if missing > 0:
                    placeholder = PivotPoint(
                        timestamp=now,
                        price=0.0,
                        pivot_type='high',
                        level=level,
                        index=0,
                        strength=0.0,
                    )
                    recent_pivots = [placeholder] * missing + recent_pivots

                # Extract features for each pivot (8 features per pivot)
                for pivot in recent_pivots:
                    features.extend([
                        pivot.price,
                        pivot.strength,
                        1.0 if pivot.pivot_type == 'high' else 0.0,  # Pivot type
                        float(pivot.level),
                        1.0 if pivot.confirmed else 0.0,  # Confirmation status
                        float((now - pivot.timestamp).total_seconds() / 3600),  # Hours since pivot
                        float(pivot.index),  # Position in data
                        0.0,  # Reserved for future use
                    ])

                # Add trend features (10 features)
                trend_direction_encoded = {
                    'up': [1.0, 0.0, 0.0],
                    'down': [0.0, 1.0, 0.0],
                    'sideways': [0.0, 0.0, 1.0],
                }.get(trend_level.trend_direction, [0.0, 0.0, 1.0])

                features.extend(trend_direction_encoded)
                features.append(trend_level.trend_strength)
                features.extend([0.0] * self._RESERVED_TREND_FEATURES)  # Reserved for future use

            return np.array(features, dtype=np.float32)

        except Exception as e:
            logger.error("Error extracting pivot features for ML: %s", e)
            return np.zeros(self.max_levels * self._FEATURES_PER_LEVEL, dtype=np.float32)

    def get_current_market_structure(self) -> Dict[str, Any]:
        """
        Get current market structure summary for dashboard display.

        Returns:
            Dict with per-level summaries under 'levels', an 'overall_trend'
            chosen by strength-weighted vote across levels, the mean
            'overall_strength', and an ISO 'last_update' timestamp. On error
            a neutral summary with an additional 'error' message is returned.
        """
        try:
            structure: Dict[str, Any] = {
                'levels': {},
                'overall_trend': 'sideways',
                'overall_strength': 0.0,
                'last_update': datetime.now().isoformat(),
            }

            # Each level votes for its direction with its strength as weight
            trend_votes = {'up': 0, 'down': 0, 'sideways': 0}
            total_strength = 0.0
            active_levels = 0

            for level, trend_level in self.pivot_levels.items():
                pivots = trend_level.pivot_points
                last_pivot = pivots[-1] if pivots else None

                structure['levels'][level] = {
                    'trend_direction': trend_level.trend_direction,
                    'trend_strength': trend_level.trend_strength,
                    'pivot_count': len(pivots),
                    'last_pivot': None if last_pivot is None else {
                        'timestamp': last_pivot.timestamp.isoformat(),
                        'price': last_pivot.price,
                        'type': last_pivot.pivot_type,
                    },
                }

                trend_votes[trend_level.trend_direction] += trend_level.trend_strength
                total_strength += trend_level.trend_strength
                active_levels += 1

            if active_levels > 0:
                structure['overall_trend'] = max(trend_votes, key=trend_votes.get)
                structure['overall_strength'] = total_strength / active_levels

            return structure

        except Exception as e:
            logger.error("Error getting current market structure: %s", e)
            return {
                'levels': {},
                'overall_trend': 'sideways',
                'overall_strength': 0.0,
                'last_update': datetime.now().isoformat(),
                'error': str(e),
            }