main cleanup

This commit is contained in:
Dobromir Popov
2025-09-30 23:56:36 +03:00
parent 468a2c2a66
commit 608da8233f
52 changed files with 5308 additions and 9985 deletions

View File

@@ -0,0 +1,351 @@
#!/usr/bin/env python3
"""
Williams Market Structure Implementation
Recursive pivot point detection for nested market structure analysis
"""
import numpy as np
import pandas as pd
from typing import Dict, List, Any, Optional, Tuple
from dataclasses import dataclass
import logging
logger = logging.getLogger(__name__)
@dataclass
class SwingPoint:
"""Represents a swing high or low point"""
price: float
timestamp: int
index: int
swing_type: str # 'high' or 'low'
@dataclass
class PivotLevel:
"""Represents a complete pivot level with swing points and analysis"""
swing_points: List[SwingPoint]
support_levels: List[float]
resistance_levels: List[float]
trend_direction: str
trend_strength: float
class WilliamsMarketStructure:
"""Implementation of Larry Williams market structure analysis with recursive pivot detection"""
def __init__(self, swing_strengths: List[int] = None, enable_cnn_feature: bool = False):
"""
Initialize Williams Market Structure analyzer
Args:
swing_strengths: List of swing strengths to detect (e.g., [2, 3, 5, 8])
enable_cnn_feature: Whether to enable CNN training features
"""
self.swing_strengths = swing_strengths or [2, 3, 5, 8]
self.enable_cnn_feature = enable_cnn_feature
self.min_swing_points = 5 # Minimum points needed for recursive analysis
def calculate_recursive_pivot_points(self, ohlcv_data: np.ndarray) -> Dict[str, PivotLevel]:
"""
Calculate 5 levels of recursive pivot points using Williams Market Structure
Args:
ohlcv_data: OHLCV data as numpy array with columns [timestamp, open, high, low, close, volume]
Returns:
Dict with keys 'level_0' through 'level_4' containing PivotLevel objects
"""
try:
logger.info(f"Starting recursive pivot analysis on {len(ohlcv_data)} candles")
levels = {}
current_data = ohlcv_data.copy()
for level in range(5):
logger.debug(f"Processing level {level} with {len(current_data)} data points")
# Find swing points for this level
swing_points = self._find_swing_points(current_data, strength=self.swing_strengths[min(level, len(self.swing_strengths)-1)])
if not swing_points or len(swing_points) < self.min_swing_points:
logger.warning(f"Insufficient swing points at level {level} ({len(swing_points) if swing_points else 0}), stopping recursion")
break
# Determine trend direction and strength
trend_direction = self._determine_trend_direction(swing_points)
trend_strength = self._calculate_trend_strength(swing_points)
# Extract support and resistance levels
support_levels, resistance_levels = self._extract_support_resistance(swing_points)
# Create pivot level
pivot_level = PivotLevel(
swing_points=swing_points,
support_levels=support_levels,
resistance_levels=resistance_levels,
trend_direction=trend_direction,
trend_strength=trend_strength
)
levels[f'level_{level}'] = pivot_level
# Prepare data for next level (convert swing points back to OHLCV format)
if level < 4 and len(swing_points) >= self.min_swing_points:
current_data = self._convert_swings_to_ohlcv(swing_points)
else:
break
logger.info(f"Completed recursive pivot analysis, generated {len(levels)} levels")
return levels
except Exception as e:
logger.error(f"Error in recursive pivot calculation: {e}")
return {}
def _find_swing_points(self, ohlcv_data: np.ndarray, strength: int = 3) -> List[SwingPoint]:
"""
Find swing high and low points using the specified strength
Args:
ohlcv_data: OHLCV data array
strength: Number of candles on each side to compare (higher = more significant swings)
Returns:
List of SwingPoint objects
"""
try:
if len(ohlcv_data) < strength * 2 + 1:
return []
swing_points = []
highs = ohlcv_data[:, 2] # High prices
lows = ohlcv_data[:, 3] # Low prices
timestamps = ohlcv_data[:, 0].astype(int)
for i in range(strength, len(ohlcv_data) - strength):
# Check for swing high
is_swing_high = True
for j in range(1, strength + 1):
if highs[i] <= highs[i - j] or highs[i] <= highs[i + j]:
is_swing_high = False
break
if is_swing_high:
swing_points.append(SwingPoint(
price=float(highs[i]),
timestamp=int(timestamps[i]),
index=i,
swing_type='high'
))
# Check for swing low
is_swing_low = True
for j in range(1, strength + 1):
if lows[i] >= lows[i - j] or lows[i] >= lows[i + j]:
is_swing_low = False
break
if is_swing_low:
swing_points.append(SwingPoint(
price=float(lows[i]),
timestamp=int(timestamps[i]),
index=i,
swing_type='low'
))
# Sort by timestamp
swing_points.sort(key=lambda x: x.timestamp)
logger.debug(f"Found {len(swing_points)} swing points with strength {strength}")
return swing_points
except Exception as e:
logger.error(f"Error finding swing points: {e}")
return []
def _determine_trend_direction(self, swing_points: List[SwingPoint]) -> str:
"""
Determine overall trend direction from swing points
Returns:
'UPTREND', 'DOWNTREND', or 'SIDEWAYS'
"""
try:
if len(swing_points) < 3:
return 'SIDEWAYS'
# Analyze the sequence of highs and lows
highs = [sp for sp in swing_points if sp.swing_type == 'high']
lows = [sp for sp in swing_points if sp.swing_type == 'low']
if len(highs) < 2 or len(lows) < 2:
return 'SIDEWAYS'
# Check if higher highs and higher lows (uptrend)
recent_highs = sorted(highs[-3:], key=lambda x: x.price)
recent_lows = sorted(lows[-3:], key=lambda x: x.price)
if (recent_highs[-1].price > recent_highs[0].price and
recent_lows[-1].price > recent_lows[0].price):
return 'UPTREND'
# Check if lower highs and lower lows (downtrend)
if (recent_highs[-1].price < recent_highs[0].price and
recent_lows[-1].price < recent_lows[0].price):
return 'DOWNTREND'
return 'SIDEWAYS'
except Exception as e:
logger.error(f"Error determining trend direction: {e}")
return 'SIDEWAYS'
def _calculate_trend_strength(self, swing_points: List[SwingPoint]) -> float:
"""
Calculate trend strength based on swing point consistency
Returns:
Float between 0.0 and 1.0 indicating trend strength
"""
try:
if len(swing_points) < 5:
return 0.0
# Calculate price movement consistency
prices = [sp.price for sp in swing_points]
direction_changes = 0
for i in range(2, len(prices)):
prev_diff = prices[i-1] - prices[i-2]
curr_diff = prices[i] - prices[i-1]
if (prev_diff > 0 and curr_diff < 0) or (prev_diff < 0 and curr_diff > 0):
direction_changes += 1
# Lower direction changes = stronger trend
consistency = 1.0 - (direction_changes / max(1, len(prices) - 2))
return max(0.0, min(1.0, consistency))
except Exception as e:
logger.error(f"Error calculating trend strength: {e}")
return 0.0
def _extract_support_resistance(self, swing_points: List[SwingPoint]) -> Tuple[List[float], List[float]]:
"""
Extract support and resistance levels from swing points
Returns:
Tuple of (support_levels, resistance_levels)
"""
try:
highs = [sp.price for sp in swing_points if sp.swing_type == 'high']
lows = [sp.price for sp in swing_points if sp.swing_type == 'low']
# Remove duplicates and sort
support_levels = sorted(list(set(lows)))
resistance_levels = sorted(list(set(highs)))
return support_levels, resistance_levels
except Exception as e:
logger.error(f"Error extracting support/resistance: {e}")
return [], []
def _convert_swings_to_ohlcv(self, swing_points: List[SwingPoint]) -> np.ndarray:
"""
Convert swing points back to OHLCV format for next level analysis
Args:
swing_points: List of swing points from current level
Returns:
OHLCV array for next level processing
"""
try:
if len(swing_points) < 2:
return np.array([])
# Sort by timestamp
swing_points.sort(key=lambda x: x.timestamp)
ohlcv_list = []
for i, swing in enumerate(swing_points):
# Create OHLCV bar from swing point
# Use swing price for O, H, L, C
ohlcv_bar = [
swing.timestamp, # timestamp
swing.price, # open
swing.price, # high
swing.price, # low
swing.price, # close
0.0 # volume (not applicable for swing points)
]
ohlcv_list.append(ohlcv_bar)
return np.array(ohlcv_list, dtype=np.float64)
except Exception as e:
logger.error(f"Error converting swings to OHLCV: {e}")
return np.array([])
def analyze_pivot_context(self, current_price: float, pivot_levels: Dict[str, PivotLevel]) -> Dict[str, Any]:
"""
Analyze current price position relative to pivot levels
Args:
current_price: Current market price
pivot_levels: Dictionary of pivot levels
Returns:
Analysis results including nearest supports/resistances and context
"""
try:
analysis = {
'current_price': current_price,
'nearest_support': None,
'nearest_resistance': None,
'support_distance': float('inf'),
'resistance_distance': float('inf'),
'pivot_context': 'NEUTRAL',
'nested_level': None
}
all_supports = []
all_resistances = []
# Collect all pivot levels
for level_name, level_data in pivot_levels.items():
all_supports.extend(level_data.support_levels)
all_resistances.extend(level_data.resistance_levels)
# Find nearest support
for support in sorted(set(all_supports)):
distance = current_price - support
if distance > 0 and distance < analysis['support_distance']:
analysis['nearest_support'] = support
analysis['support_distance'] = distance
# Find nearest resistance
for resistance in sorted(set(all_resistances)):
distance = resistance - current_price
if distance > 0 and distance < analysis['resistance_distance']:
analysis['nearest_resistance'] = resistance
analysis['resistance_distance'] = distance
# Determine pivot context
if analysis['nearest_resistance'] and analysis['nearest_support']:
resistance_dist = analysis['resistance_distance']
support_dist = analysis['support_distance']
if resistance_dist < support_dist * 0.5:
analysis['pivot_context'] = 'NEAR_RESISTANCE'
elif support_dist < resistance_dist * 0.5:
analysis['pivot_context'] = 'NEAR_SUPPORT'
else:
analysis['pivot_context'] = 'MID_RANGE'
return analysis
except Exception as e:
logger.error(f"Error analyzing pivot context: {e}")
return analysis