#!/usr/bin/env python3
"""
Williams Market Structure Implementation

Recursive pivot point detection for nested market structure analysis.
"""

import numpy as np
import pandas as pd
from typing import Dict, List, Any, Optional, Tuple
from dataclasses import dataclass
import logging

logger = logging.getLogger(__name__)


@dataclass
class SwingPoint:
    """Represents a swing high or low point.

    Attributes:
        price: Price of the swing (the bar's high for a swing high, the
            bar's low for a swing low).
        timestamp: Timestamp of the bar, truncated to an integer.
        index: Row index of the bar within the array that was scanned.
        swing_type: Either 'high' or 'low'.
    """
    price: float
    timestamp: int
    index: int
    swing_type: str  # 'high' or 'low'


@dataclass
class PivotLevel:
    """Represents a complete pivot level with swing points and analysis.

    Attributes:
        swing_points: Swing points detected at this level, in time order.
        support_levels: Sorted unique prices of the swing lows.
        resistance_levels: Sorted unique prices of the swing highs.
        trend_direction: 'UPTREND', 'DOWNTREND', or 'SIDEWAYS'.
        trend_strength: Value in [0.0, 1.0]; higher means fewer reversals.
    """
    swing_points: List[SwingPoint]
    support_levels: List[float]
    resistance_levels: List[float]
    trend_direction: str
    trend_strength: float


class WilliamsMarketStructure:
    """Implementation of Larry Williams market structure analysis with recursive pivot detection"""

    # Maximum number of nested pivot levels produced ('level_0' .. 'level_4').
    MAX_LEVELS = 5

    def __init__(self, swing_strengths: Optional[List[int]] = None, enable_cnn_feature: bool = False):
        """
        Initialize Williams Market Structure analyzer

        Args:
            swing_strengths: List of swing strengths to detect (e.g., [2, 3, 5, 8]).
                Entry ``i`` is used for level ``i``; the last entry is reused
                for any deeper level. Defaults to [2, 3, 5, 8].
            enable_cnn_feature: Whether to enable CNN training features.
                Only stored here; nothing in this class reads it.
        """
        # Copy so later changes to the caller's list cannot affect this instance.
        self.swing_strengths = list(swing_strengths) if swing_strengths else [2, 3, 5, 8]
        self.enable_cnn_feature = enable_cnn_feature
        self.min_swing_points = 5  # Minimum points needed for recursive analysis

    def calculate_recursive_pivot_points(self, ohlcv_data: np.ndarray) -> Dict[str, PivotLevel]:
        """
        Calculate up to 5 levels of recursive pivot points using Williams Market Structure

        Level 0 is computed on the raw candles. Each following level is
        computed on the previous level's swing points re-expressed as flat
        OHLCV bars. Recursion stops early as soon as a level yields fewer
        than ``self.min_swing_points`` swing points.

        Args:
            ohlcv_data: OHLCV data as numpy array with columns
                [timestamp, open, high, low, close, volume]

        Returns:
            Dict with keys 'level_0' through (at most) 'level_4' containing
            PivotLevel objects. Empty dict if an error occurs.
        """
        try:
            logger.info("Starting recursive pivot analysis on %d candles", len(ohlcv_data))

            levels: Dict[str, PivotLevel] = {}
            current_data = np.array(ohlcv_data)  # private copy; also accepts array-likes
            last_level = self.MAX_LEVELS - 1

            for level in range(self.MAX_LEVELS):
                logger.debug("Processing level %d with %d data points", level, len(current_data))

                # Pick this level's strength, reusing the last one when the
                # configured list is shorter than the number of levels.
                strength = self.swing_strengths[min(level, len(self.swing_strengths) - 1)]
                swing_points = self._find_swing_points(current_data, strength=strength) or []

                if len(swing_points) < self.min_swing_points:
                    logger.warning(
                        "Insufficient swing points at level %d (%d), stopping recursion",
                        level, len(swing_points),
                    )
                    break

                support_levels, resistance_levels = self._extract_support_resistance(swing_points)

                levels[f'level_{level}'] = PivotLevel(
                    swing_points=swing_points,
                    support_levels=support_levels,
                    resistance_levels=resistance_levels,
                    trend_direction=self._determine_trend_direction(swing_points),
                    trend_strength=self._calculate_trend_strength(swing_points),
                )

                if level == last_level:
                    break

                # Prepare data for next level (convert swing points back to OHLCV format)
                current_data = self._convert_swings_to_ohlcv(swing_points)

            logger.info("Completed recursive pivot analysis, generated %d levels", len(levels))
            return levels

        except Exception as e:
            # Best-effort API: callers receive an empty result instead of an exception.
            logger.error(f"Error in recursive pivot calculation: {e}")
            return {}

    def _find_swing_points(self, ohlcv_data: np.ndarray, strength: int = 3) -> List[SwingPoint]:
        """
        Find swing high and low points using the specified strength

        A bar is a swing high when its high is strictly greater than the
        highs of the ``strength`` bars on each side; a swing low is the
        mirror condition on lows. One bar may be both.

        Args:
            ohlcv_data: OHLCV data array (column 0 = timestamp, 2 = high, 3 = low)
            strength: Number of candles on each side to compare
                (higher = more significant swings). Must be >= 1.

        Returns:
            List of SwingPoint objects sorted by timestamp. Empty when there
            is not enough data, ``strength`` is invalid, or an error occurs.
        """
        try:
            if strength < 1:
                # With no neighbours to compare against, every bar would
                # trivially qualify as both a swing high and a swing low.
                logger.warning(f"Invalid swing strength {strength}; must be >= 1")
                return []

            if len(ohlcv_data) < strength * 2 + 1:
                return []

            swing_points: List[SwingPoint] = []
            highs = ohlcv_data[:, 2]  # High prices
            lows = ohlcv_data[:, 3]   # Low prices
            timestamps = ohlcv_data[:, 0].astype(int)

            for i in range(strength, len(ohlcv_data) - strength):
                left = slice(i - strength, i)
                right = slice(i + 1, i + strength + 1)

                # Swing high: no neighbour's high reaches this bar's high.
                pivot_high = highs[i]
                if not (np.any(pivot_high <= highs[left]) or np.any(pivot_high <= highs[right])):
                    swing_points.append(SwingPoint(
                        price=float(pivot_high),
                        timestamp=int(timestamps[i]),
                        index=i,
                        swing_type='high'
                    ))

                # Swing low: no neighbour's low reaches down to this bar's low.
                pivot_low = lows[i]
                if not (np.any(pivot_low >= lows[left]) or np.any(pivot_low >= lows[right])):
                    swing_points.append(SwingPoint(
                        price=float(pivot_low),
                        timestamp=int(timestamps[i]),
                        index=i,
                        swing_type='low'
                    ))

            # Stable sort: a bar that is both keeps its high before its low.
            swing_points.sort(key=lambda x: x.timestamp)

            logger.debug(f"Found {len(swing_points)} swing points with strength {strength}")
            return swing_points

        except Exception as e:
            logger.error(f"Error finding swing points: {e}")
            return []

    def _determine_trend_direction(self, swing_points: List[SwingPoint]) -> str:
        """
        Determine overall trend direction from swing points

        Looks at the (up to) three most recent swing highs and the three
        most recent swing lows, in chronological order, and compares the
        newest of each with the oldest of each.

        Args:
            swing_points: Swing points of one level.

        Returns:
            'UPTREND' (higher highs and higher lows), 'DOWNTREND' (lower
            highs and lower lows), or 'SIDEWAYS' (anything else, too little
            data, or an error).
        """
        try:
            if len(swing_points) < 3:
                return 'SIDEWAYS'

            # Work on a chronologically ordered copy; never mutate the input.
            ordered = sorted(swing_points, key=lambda sp: sp.timestamp)
            highs = [sp for sp in ordered if sp.swing_type == 'high']
            lows = [sp for sp in ordered if sp.swing_type == 'low']

            if len(highs) < 2 or len(lows) < 2:
                return 'SIDEWAYS'

            # Keep time order here. Sorting these windows by price would make
            # "last > first" true whenever prices differ, so every market
            # would be classified as an uptrend.
            recent_highs = highs[-3:]
            recent_lows = lows[-3:]
            high_change = recent_highs[-1].price - recent_highs[0].price
            low_change = recent_lows[-1].price - recent_lows[0].price

            if high_change > 0 and low_change > 0:
                return 'UPTREND'

            if high_change < 0 and low_change < 0:
                return 'DOWNTREND'

            return 'SIDEWAYS'

        except Exception as e:
            logger.error(f"Error determining trend direction: {e}")
            return 'SIDEWAYS'

    def _calculate_trend_strength(self, swing_points: List[SwingPoint]) -> float:
        """
        Calculate trend strength based on swing point consistency

        Counts how often the sign of the price change flips between
        consecutive swing points and returns ``1 - flips / (n - 2)``.

        Args:
            swing_points: Swing points of one level, in time order.

        Returns:
            Float between 0.0 and 1.0 indicating trend strength; 0.0 when
            fewer than 5 points are given or an error occurs.
        """
        try:
            if len(swing_points) < 5:
                return 0.0

            # NOTE(review): highs and lows are mixed in one price series, so a
            # normally alternating high/low sequence flips direction on nearly
            # every step and scores close to 0. Confirm this is intended.
            prices = [sp.price for sp in swing_points]

            direction_changes = 0
            for i in range(2, len(prices)):
                prev_diff = prices[i - 1] - prices[i - 2]
                curr_diff = prices[i] - prices[i - 1]

                if (prev_diff > 0 and curr_diff < 0) or (prev_diff < 0 and curr_diff > 0):
                    direction_changes += 1

            # Lower direction changes = stronger trend
            consistency = 1.0 - (direction_changes / max(1, len(prices) - 2))
            return max(0.0, min(1.0, consistency))

        except Exception as e:
            logger.error(f"Error calculating trend strength: {e}")
            return 0.0

    def _extract_support_resistance(self, swing_points: List[SwingPoint]) -> Tuple[List[float], List[float]]:
        """
        Extract support and resistance levels from swing points

        Args:
            swing_points: Swing points of one level.

        Returns:
            Tuple of (support_levels, resistance_levels): ascending unique
            swing-low prices and ascending unique swing-high prices.
            ``([], [])`` if an error occurs.
        """
        try:
            highs = {sp.price for sp in swing_points if sp.swing_type == 'high'}
            lows = {sp.price for sp in swing_points if sp.swing_type == 'low'}

            return sorted(lows), sorted(highs)

        except Exception as e:
            logger.error(f"Error extracting support/resistance: {e}")
            return [], []

    def _convert_swings_to_ohlcv(self, swing_points: List[SwingPoint]) -> np.ndarray:
        """
        Convert swing points back to OHLCV format for next level analysis

        Each swing point becomes one flat bar whose open, high, low and
        close all equal the swing price, with zero volume.

        Args:
            swing_points: List of swing points from current level. Not modified.

        Returns:
            float64 OHLCV array (one row per swing point, time-ordered) for
            next level processing; an empty array when fewer than two
            points are given or an error occurs.
        """
        try:
            if len(swing_points) < 2:
                return np.array([])

            # sorted() rather than list.sort(): the caller's list is also
            # stored on a PivotLevel and must not be reordered here.
            ordered = sorted(swing_points, key=lambda x: x.timestamp)

            ohlcv_list = [
                [
                    swing.timestamp,  # timestamp
                    swing.price,      # open
                    swing.price,      # high
                    swing.price,      # low
                    swing.price,      # close
                    0.0,              # volume (not applicable for swing points)
                ]
                for swing in ordered
            ]

            return np.array(ohlcv_list, dtype=np.float64)

        except Exception as e:
            logger.error(f"Error converting swings to OHLCV: {e}")
            return np.array([])

    def analyze_pivot_context(self, current_price: float, pivot_levels: Dict[str, PivotLevel]) -> Dict[str, Any]:
        """
        Analyze current price position relative to pivot levels

        Args:
            current_price: Current market price
            pivot_levels: Dictionary of pivot levels

        Returns:
            Dict with keys:
                'current_price': the input price.
                'nearest_support' / 'support_distance': closest support
                    strictly below the price and its distance
                    (None / inf when there is none).
                'nearest_resistance' / 'resistance_distance': closest
                    resistance strictly above the price and its distance
                    (None / inf when there is none).
                'pivot_context': 'NEAR_RESISTANCE', 'NEAR_SUPPORT' or
                    'MID_RANGE' when both sides exist, otherwise 'NEUTRAL'.
                'nested_level': always None; this method never sets it.
            On error, the dict is returned as far as it was filled in.
        """
        # Built outside the try block so the except path can always return it.
        analysis: Dict[str, Any] = {
            'current_price': current_price,
            'nearest_support': None,
            'nearest_resistance': None,
            'support_distance': float('inf'),
            'resistance_distance': float('inf'),
            'pivot_context': 'NEUTRAL',
            'nested_level': None
        }

        try:
            all_supports: List[float] = []
            all_resistances: List[float] = []

            # Collect all pivot levels
            for level_data in pivot_levels.values():
                all_supports.extend(level_data.support_levels)
                all_resistances.extend(level_data.resistance_levels)

            # Nearest support = highest support strictly below the price.
            nearest_support = max((s for s in all_supports if current_price - s > 0), default=None)
            if nearest_support is not None:
                analysis['nearest_support'] = nearest_support
                analysis['support_distance'] = current_price - nearest_support

            # Nearest resistance = lowest resistance strictly above the price.
            nearest_resistance = min((r for r in all_resistances if r - current_price > 0), default=None)
            if nearest_resistance is not None:
                analysis['nearest_resistance'] = nearest_resistance
                analysis['resistance_distance'] = nearest_resistance - current_price

            # Explicit None checks: a level at price 0.0 is valid but falsy.
            if nearest_resistance is not None and nearest_support is not None:
                resistance_dist = analysis['resistance_distance']
                support_dist = analysis['support_distance']

                if resistance_dist < support_dist * 0.5:
                    analysis['pivot_context'] = 'NEAR_RESISTANCE'
                elif support_dist < resistance_dist * 0.5:
                    analysis['pivot_context'] = 'NEAR_SUPPORT'
                else:
                    analysis['pivot_context'] = 'MID_RANGE'

            return analysis

        except Exception as e:
            logger.error(f"Error analyzing pivot context: {e}")
            return analysis