gogo2/training/williams_market_structure.py
"""
Williams Market Structure Implementation for RL Training
This module implements Larry Williams market structure analysis methodology for
RL training enhancement with:
- Swing high/low detection with configurable strength
- 5 levels of recursive pivot point calculation
- Trend analysis (higher highs/lows vs lower highs/lows)
- Market bias determination across multiple timeframes
- Feature extraction for RL training (250 features)
Based on Larry Williams' teachings on market structure:
- Markets move in swings between support and resistance
- Higher timeframe structure determines lower timeframe bias
- Recursive analysis reveals fractal patterns
- Trend direction determined by swing point relationships
"""
import numpy as np
import pandas as pd
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple, Any
from dataclasses import dataclass
from enum import Enum
logger = logging.getLogger(__name__)
class TrendDirection(Enum):
UP = "up"
DOWN = "down"
SIDEWAYS = "sideways"
UNKNOWN = "unknown"
class SwingType(Enum):
SWING_HIGH = "swing_high"
SWING_LOW = "swing_low"
@dataclass
class SwingPoint:
"""Represents a swing high or low point"""
timestamp: datetime
price: float
index: int
swing_type: SwingType
strength: int # Number of bars on each side that confirm the swing
volume: float = 0.0
@dataclass
class TrendAnalysis:
"""Trend analysis results"""
direction: TrendDirection
strength: float # 0.0 to 1.0
confidence: float # 0.0 to 1.0
swing_count: int
last_swing_high: Optional[SwingPoint]
last_swing_low: Optional[SwingPoint]
higher_highs: int
higher_lows: int
lower_highs: int
lower_lows: int
@dataclass
class MarketStructureLevel:
"""Market structure analysis for one recursive level"""
level: int
swing_points: List[SwingPoint]
trend_analysis: TrendAnalysis
support_levels: List[float]
resistance_levels: List[float]
current_bias: TrendDirection
structure_breaks: List[Dict[str, Any]]
class WilliamsMarketStructure:
"""
Implementation of Larry Williams market structure methodology
Features:
- Multi-strength swing detection (2, 3, 5, 8, 13 bar strengths)
- 5 levels of recursive analysis
- Trend direction determination
- Support/resistance level identification
- Market bias calculation
- Structure break detection
"""
    def __init__(self, swing_strengths: Optional[List[int]] = None):
"""
Initialize Williams market structure analyzer
Args:
swing_strengths: List of swing detection strengths (bars on each side)
"""
self.swing_strengths = swing_strengths or [2, 3, 5, 8, 13] # Fibonacci-based strengths
self.max_levels = 5
self.min_swings_for_trend = 3
# Cache for performance
self.swing_cache = {}
self.trend_cache = {}
logger.info(f"Williams Market Structure initialized with strengths: {self.swing_strengths}")
def calculate_recursive_pivot_points(self, ohlcv_data: np.ndarray) -> Dict[str, MarketStructureLevel]:
"""
Calculate 5 levels of recursive pivot points using TRUE recursion
Level 1: Calculated from 1s OHLCV data
Level 2: Calculated from Level 1 pivot points treated as individual price bars
Level 3: Calculated from Level 2 pivot points treated as individual price bars
etc.
Args:
ohlcv_data: OHLCV data array with columns [timestamp, open, high, low, close, volume]
Returns:
Dict of market structure levels with swing points and trend analysis
"""
if len(ohlcv_data) < 20:
logger.warning("Insufficient data for Williams structure analysis")
return self._create_empty_structure()
levels = {}
current_price_points = ohlcv_data.copy() # Start with raw price data
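        # Recursion data flow (illustrative sketch): level 0 detects swings on
        # the raw 1s bars; each later level re-runs swing detection on the
        # previous level's pivots, which _convert_pivots_to_price_points turns
        # into synthetic bars with open = high = low = close = pivot price.
        # E.g. 600 one-second bars -> up to 20 level_0 swings (capped by
        # _filter_significant_swings) -> 20 synthetic bars -> fewer level_1
        # swings, and so on while enough pivots remain.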
for level in range(self.max_levels):
logger.debug(f"Analyzing level {level} with {len(current_price_points)} data points")
if level == 0:
# Level 0 (Level 1): Calculate from raw OHLCV data
swing_points = self._find_swing_points_multi_strength(current_price_points)
else:
# Level 1+ (Level 2+): Calculate from previous level's pivot points
# Treat pivot points as individual price bars
swing_points = self._find_pivot_points_from_pivot_points(current_price_points, level)
if len(swing_points) < self.min_swings_for_trend:
logger.debug(f"Not enough swings at level {level}: {len(swing_points)}")
# Fill remaining levels with empty data
for remaining_level in range(level, self.max_levels):
levels[f'level_{remaining_level}'] = self._create_empty_level(remaining_level)
break
# Analyze trend for this level
trend_analysis = self._analyze_trend_from_swings(swing_points)
# Find support/resistance levels
support_levels, resistance_levels = self._find_support_resistance(
swing_points, current_price_points if level == 0 else None
)
# Determine current market bias
current_bias = self._determine_market_bias(swing_points, trend_analysis)
# Detect structure breaks
structure_breaks = self._detect_structure_breaks(swing_points, current_price_points if level == 0 else None)
# Create level data
levels[f'level_{level}'] = MarketStructureLevel(
level=level,
swing_points=swing_points,
trend_analysis=trend_analysis,
support_levels=support_levels,
resistance_levels=resistance_levels,
current_bias=current_bias,
structure_breaks=structure_breaks
)
# Prepare data for next level: convert swing points to "price points"
if len(swing_points) >= 5:
current_price_points = self._convert_pivots_to_price_points(swing_points)
if len(current_price_points) < 10:
logger.debug(f"Insufficient pivot data for level {level + 1}")
break
else:
logger.debug(f"Not enough swings to continue to level {level + 1}")
break
# Fill any remaining empty levels
for remaining_level in range(len(levels), self.max_levels):
levels[f'level_{remaining_level}'] = self._create_empty_level(remaining_level)
return levels
def _find_swing_points_multi_strength(self, ohlcv_data: np.ndarray) -> List[SwingPoint]:
"""Find swing points using multiple strength criteria"""
all_swings = []
for strength in self.swing_strengths:
swings = self._find_swing_points_single_strength(ohlcv_data, strength)
for swing in swings:
# Avoid duplicates (swings at same index)
if not any(existing.index == swing.index for existing in all_swings):
all_swings.append(swing)
# Sort by timestamp/index
all_swings.sort(key=lambda x: x.index)
# Filter to get the most significant swings
return self._filter_significant_swings(all_swings)
def _find_swing_points_single_strength(self, ohlcv_data: np.ndarray, strength: int) -> List[SwingPoint]:
"""Find swing points with specific strength requirement"""
swings = []
if len(ohlcv_data) < (strength * 2 + 1):
return swings
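        # Strength criterion (illustrative): with strength=2, bar i is a swing
        # high only if its high is strictly greater than the highs of bars
        # i-2, i-1, i+1 and i+2; a swing low requires its low to be strictly
        # lower than the lows over the same window.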
for i in range(strength, len(ohlcv_data) - strength):
current_high = ohlcv_data[i, 2] # High price
current_low = ohlcv_data[i, 3] # Low price
current_volume = ohlcv_data[i, 5] if ohlcv_data.shape[1] > 5 else 0.0
# Check for swing high (higher than surrounding bars)
is_swing_high = True
for j in range(i - strength, i + strength + 1):
if j != i and ohlcv_data[j, 2] >= current_high:
is_swing_high = False
break
if is_swing_high:
swings.append(SwingPoint(
timestamp=datetime.fromtimestamp(ohlcv_data[i, 0]) if ohlcv_data[i, 0] > 1e9 else datetime.now(),
price=current_high,
index=i,
swing_type=SwingType.SWING_HIGH,
strength=strength,
volume=current_volume
))
# Check for swing low (lower than surrounding bars)
is_swing_low = True
for j in range(i - strength, i + strength + 1):
if j != i and ohlcv_data[j, 3] <= current_low:
is_swing_low = False
break
if is_swing_low:
swings.append(SwingPoint(
timestamp=datetime.fromtimestamp(ohlcv_data[i, 0]) if ohlcv_data[i, 0] > 1e9 else datetime.now(),
price=current_low,
index=i,
swing_type=SwingType.SWING_LOW,
strength=strength,
volume=current_volume
))
return swings
def _filter_significant_swings(self, swings: List[SwingPoint]) -> List[SwingPoint]:
"""Filter to keep only the most significant swings"""
if len(swings) <= 20:
return swings
# Sort by strength (higher strength = more significant)
swings_by_strength = sorted(swings, key=lambda x: x.strength, reverse=True)
# Take top swings but ensure we have alternating highs and lows
significant_swings = []
last_type = None
for swing in swings_by_strength:
if len(significant_swings) >= 20:
break
# Prefer alternating swing types for better structure
if last_type is None or swing.swing_type != last_type:
significant_swings.append(swing)
last_type = swing.swing_type
elif len(significant_swings) < 10: # Still add if we need more swings
significant_swings.append(swing)
# Sort by index again
significant_swings.sort(key=lambda x: x.index)
return significant_swings
def _analyze_trend_from_swings(self, swing_points: List[SwingPoint]) -> TrendAnalysis:
"""Analyze trend direction from swing points"""
if len(swing_points) < 2:
return TrendAnalysis(
direction=TrendDirection.UNKNOWN,
strength=0.0,
confidence=0.0,
swing_count=0,
last_swing_high=None,
last_swing_low=None,
higher_highs=0,
higher_lows=0,
lower_highs=0,
lower_lows=0
)
# Separate highs and lows
highs = [s for s in swing_points if s.swing_type == SwingType.SWING_HIGH]
lows = [s for s in swing_points if s.swing_type == SwingType.SWING_LOW]
# Count higher highs, higher lows, lower highs, lower lows
higher_highs = self._count_higher_highs(highs)
higher_lows = self._count_higher_lows(lows)
lower_highs = self._count_lower_highs(highs)
lower_lows = self._count_lower_lows(lows)
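        # Worked example (illustrative): with higher_highs=3, higher_lows=2,
        # lower_highs=1, lower_lows=0, the direction is UP and
        # strength = confidence = (3 + 2) / 6 ~ 0.83.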
# Determine trend direction
if higher_highs > 0 and higher_lows > 0:
direction = TrendDirection.UP
elif lower_highs > 0 and lower_lows > 0:
direction = TrendDirection.DOWN
else:
direction = TrendDirection.SIDEWAYS
# Calculate trend strength
total_moves = higher_highs + higher_lows + lower_highs + lower_lows
if direction == TrendDirection.UP:
strength = (higher_highs + higher_lows) / max(total_moves, 1)
elif direction == TrendDirection.DOWN:
strength = (lower_highs + lower_lows) / max(total_moves, 1)
else:
strength = 0.5 # Neutral for sideways
# Calculate confidence based on consistency
if total_moves > 0:
if direction == TrendDirection.UP:
confidence = (higher_highs + higher_lows) / total_moves
elif direction == TrendDirection.DOWN:
confidence = (lower_highs + lower_lows) / total_moves
else:
# For sideways, confidence is based on how balanced it is
up_moves = higher_highs + higher_lows
down_moves = lower_highs + lower_lows
balance = 1.0 - abs(up_moves - down_moves) / total_moves
confidence = balance
else:
confidence = 0.0
return TrendAnalysis(
direction=direction,
strength=min(strength, 1.0),
confidence=min(confidence, 1.0),
swing_count=len(swing_points),
last_swing_high=highs[-1] if highs else None,
last_swing_low=lows[-1] if lows else None,
higher_highs=higher_highs,
higher_lows=higher_lows,
lower_highs=lower_highs,
lower_lows=lower_lows
)
def _count_higher_highs(self, highs: List[SwingPoint]) -> int:
"""Count higher highs in sequence"""
if len(highs) < 2:
return 0
count = 0
for i in range(1, len(highs)):
if highs[i].price > highs[i-1].price:
count += 1
return count
def _count_higher_lows(self, lows: List[SwingPoint]) -> int:
"""Count higher lows in sequence"""
if len(lows) < 2:
return 0
count = 0
for i in range(1, len(lows)):
if lows[i].price > lows[i-1].price:
count += 1
return count
def _count_lower_highs(self, highs: List[SwingPoint]) -> int:
"""Count lower highs in sequence"""
if len(highs) < 2:
return 0
count = 0
for i in range(1, len(highs)):
if highs[i].price < highs[i-1].price:
count += 1
return count
def _count_lower_lows(self, lows: List[SwingPoint]) -> int:
"""Count lower lows in sequence"""
if len(lows) < 2:
return 0
count = 0
for i in range(1, len(lows)):
if lows[i].price < lows[i-1].price:
count += 1
return count
    def _find_support_resistance(self, swing_points: List[SwingPoint],
                                 ohlcv_data: Optional[np.ndarray]) -> Tuple[List[float], List[float]]:
"""Find support and resistance levels from swing points"""
highs = [s.price for s in swing_points if s.swing_type == SwingType.SWING_HIGH]
lows = [s.price for s in swing_points if s.swing_type == SwingType.SWING_LOW]
# Cluster similar levels
support_levels = self._cluster_price_levels(lows) if lows else []
resistance_levels = self._cluster_price_levels(highs) if highs else []
return support_levels, resistance_levels
def _cluster_price_levels(self, prices: List[float], tolerance: float = 0.02) -> List[float]:
"""Cluster similar price levels together"""
if not prices:
return []
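        # Worked example (illustrative): prices [100.0, 101.5, 110.0] with the
        # default 2% tolerance yield two clusters, [100.75, 110.0]: 101.5 is
        # within 2% of the running cluster average (100.0), 110.0 is not.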
sorted_prices = sorted(prices)
clusters = []
current_cluster = [sorted_prices[0]]
for price in sorted_prices[1:]:
# If price is within tolerance of cluster average, add to cluster
cluster_avg = np.mean(current_cluster)
if abs(price - cluster_avg) / cluster_avg <= tolerance:
current_cluster.append(price)
else:
# Start new cluster
clusters.append(np.mean(current_cluster))
current_cluster = [price]
# Add last cluster
if current_cluster:
clusters.append(np.mean(current_cluster))
return clusters
def _determine_market_bias(self, swing_points: List[SwingPoint],
trend_analysis: TrendAnalysis) -> TrendDirection:
"""Determine current market bias"""
if not swing_points:
return TrendDirection.UNKNOWN
# Use trend analysis as primary indicator
if trend_analysis.confidence > 0.6:
return trend_analysis.direction
# Look at most recent swings for bias
recent_swings = swing_points[-6:] if len(swing_points) >= 6 else swing_points
if len(recent_swings) >= 2:
first_price = recent_swings[0].price
last_price = recent_swings[-1].price
price_change = (last_price - first_price) / first_price
if price_change > 0.01: # 1% threshold
return TrendDirection.UP
elif price_change < -0.01:
return TrendDirection.DOWN
else:
return TrendDirection.SIDEWAYS
return TrendDirection.UNKNOWN
    def _detect_structure_breaks(self, swing_points: List[SwingPoint],
                                 ohlcv_data: Optional[np.ndarray]) -> List[Dict[str, Any]]:
"""Detect structure breaks (trend changes)"""
structure_breaks = []
if len(swing_points) < 4:
return structure_breaks
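        # Each detected break is reported as a dict (illustrative shape):
        # {'type': 'break_of_structure_high', 'timestamp': ..., 'price': 101.2,
        #  'previous_high': 103.5, 'significance': 0.022}
        # where significance is the relative distance to the previous swing.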
# Look for pattern breaks
highs = [s for s in swing_points if s.swing_type == SwingType.SWING_HIGH]
lows = [s for s in swing_points if s.swing_type == SwingType.SWING_LOW]
# Check for break of structure in highs (lower high after higher highs)
if len(highs) >= 3:
for i in range(2, len(highs)):
if (highs[i-2].price < highs[i-1].price and # Previous was higher high
highs[i-1].price > highs[i].price): # Current is lower high
structure_breaks.append({
'type': 'break_of_structure_high',
'timestamp': highs[i].timestamp,
'price': highs[i].price,
'previous_high': highs[i-1].price,
'significance': abs(highs[i].price - highs[i-1].price) / highs[i-1].price
})
# Check for break of structure in lows (higher low after lower lows)
if len(lows) >= 3:
for i in range(2, len(lows)):
if (lows[i-2].price > lows[i-1].price and # Previous was lower low
lows[i-1].price < lows[i].price): # Current is higher low
structure_breaks.append({
'type': 'break_of_structure_low',
'timestamp': lows[i].timestamp,
'price': lows[i].price,
'previous_low': lows[i-1].price,
'significance': abs(lows[i].price - lows[i-1].price) / lows[i-1].price
})
return structure_breaks
def _find_pivot_points_from_pivot_points(self, pivot_array: np.ndarray, level: int) -> List[SwingPoint]:
"""
Find pivot points from previous level's pivot points
For Level 2+: A Level N low pivot is when a Level N-1 pivot low is surrounded
by higher Level N-1 pivot lows (and vice versa for highs)
Args:
pivot_array: Array of pivot points as [timestamp, price, price, price, price, 0] format
level: Current level being calculated
"""
swings = []
if len(pivot_array) < 5:
return swings
# Use configurable strength for higher levels (more conservative)
strength = min(2 + level, 5) # Level 1: 3 bars, Level 2: 4 bars, Level 3+: 5 bars
for i in range(strength, len(pivot_array) - strength):
current_price = pivot_array[i, 1] # Use the price from pivot point
current_timestamp = pivot_array[i, 0]
# Check for swing high (pivot high surrounded by lower pivot highs)
is_swing_high = True
for j in range(i - strength, i + strength + 1):
if j != i and pivot_array[j, 1] >= current_price:
is_swing_high = False
break
if is_swing_high:
swings.append(SwingPoint(
timestamp=datetime.fromtimestamp(current_timestamp) if current_timestamp > 1e9 else datetime.now(),
price=current_price,
index=i,
swing_type=SwingType.SWING_HIGH,
strength=strength,
volume=0.0 # Pivot points don't have volume
))
# Check for swing low (pivot low surrounded by higher pivot lows)
is_swing_low = True
for j in range(i - strength, i + strength + 1):
if j != i and pivot_array[j, 1] <= current_price:
is_swing_low = False
break
if is_swing_low:
swings.append(SwingPoint(
timestamp=datetime.fromtimestamp(current_timestamp) if current_timestamp > 1e9 else datetime.now(),
price=current_price,
index=i,
swing_type=SwingType.SWING_LOW,
strength=strength,
volume=0.0 # Pivot points don't have volume
))
return swings
def _convert_pivots_to_price_points(self, swing_points: List[SwingPoint]) -> np.ndarray:
"""
Convert swing points to price point array for next level calculation
Each swing point becomes a "price bar" where OHLC = pivot price
This allows the next level to treat pivot points as individual price data
"""
if len(swing_points) < 2:
return np.array([])
price_points = []
for swing in swing_points:
# Each pivot point becomes a price point where OHLC = pivot price
price_points.append([
swing.timestamp.timestamp(),
swing.price, # Open = pivot price
swing.price, # High = pivot price
swing.price, # Low = pivot price
swing.price, # Close = pivot price
0.0 # Volume = 0 (not applicable for pivot points)
])
return np.array(price_points)
def _create_empty_structure(self) -> Dict[str, MarketStructureLevel]:
"""Create empty structure when insufficient data"""
return {f'level_{i}': self._create_empty_level(i) for i in range(self.max_levels)}
def _create_empty_level(self, level: int) -> MarketStructureLevel:
"""Create empty market structure level"""
return MarketStructureLevel(
level=level,
swing_points=[],
trend_analysis=TrendAnalysis(
direction=TrendDirection.UNKNOWN,
strength=0.0,
confidence=0.0,
swing_count=0,
last_swing_high=None,
last_swing_low=None,
higher_highs=0,
higher_lows=0,
lower_highs=0,
lower_lows=0
),
support_levels=[],
resistance_levels=[],
current_bias=TrendDirection.UNKNOWN,
structure_breaks=[]
)
def extract_features_for_rl(self, structure_levels: Dict[str, MarketStructureLevel]) -> List[float]:
"""
Extract features from Williams structure for RL training
        Returns 250 features total:
        - 50 features per level (5 levels)
"""
features = []
for level in range(self.max_levels):
level_key = f'level_{level}'
if level_key in structure_levels:
level_data = structure_levels[level_key]
level_features = self._extract_level_features(level_data)
else:
level_features = [0.0] * 50 # 50 features per level
features.extend(level_features)
return features[:250] # Ensure exactly 250 features
def _extract_level_features(self, level: MarketStructureLevel) -> List[float]:
"""Extract features from a single structure level"""
features = []
# Trend features (10 features)
features.extend([
1.0 if level.trend_analysis.direction == TrendDirection.UP else 0.0,
1.0 if level.trend_analysis.direction == TrendDirection.DOWN else 0.0,
1.0 if level.trend_analysis.direction == TrendDirection.SIDEWAYS else 0.0,
level.trend_analysis.strength,
level.trend_analysis.confidence,
level.trend_analysis.higher_highs,
level.trend_analysis.higher_lows,
level.trend_analysis.lower_highs,
level.trend_analysis.lower_lows,
len(level.swing_points)
])
# Current bias features (4 features)
features.extend([
1.0 if level.current_bias == TrendDirection.UP else 0.0,
1.0 if level.current_bias == TrendDirection.DOWN else 0.0,
1.0 if level.current_bias == TrendDirection.SIDEWAYS else 0.0,
1.0 if level.current_bias == TrendDirection.UNKNOWN else 0.0
])
# Swing point features (20 features - last 10 swings * 2 features each)
        recent_swings = list(level.swing_points[-10:])
        for swing in recent_swings:
            features.extend([
                swing.price,
                1.0 if swing.swing_type == SwingType.SWING_HIGH else 0.0
            ])
        # Pad to 10 swings (2 features each) without mutating level.swing_points
        for _ in range(10 - len(recent_swings)):
            features.extend([0.0, 0.0])
# Support/resistance levels (10 features - 5 support + 5 resistance)
        support_levels = list(level.support_levels[:5])
        while len(support_levels) < 5:
            support_levels.append(0.0)
        features.extend(support_levels)
        resistance_levels = list(level.resistance_levels[:5])
        while len(resistance_levels) < 5:
            resistance_levels.append(0.0)
        features.extend(resistance_levels)
# Structure break features (6 features)
        recent_breaks = list(level.structure_breaks[-3:])
        for break_info in recent_breaks:
            features.extend([
                break_info.get('significance', 0.0),
                1.0 if break_info.get('type', '').endswith('_high') else 0.0
            ])
        # Pad to 3 breaks (2 features each) without mutating level.structure_breaks
        for _ in range(3 - len(recent_breaks)):
            features.extend([0.0, 0.0])
return features[:50] # Ensure exactly 50 features per level
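# Illustrative smoke test (a minimal sketch, not a definitive harness): builds
# a synthetic 1s random-walk OHLCV array in the [timestamp, open, high, low,
# close, volume] layout expected above, runs the recursive analysis, and prints
# a per-level summary plus the RL feature count. Seed and values are arbitrary.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    rng = np.random.default_rng(42)
    n_bars = 600
    start_ts = datetime(2025, 1, 1).timestamp()

    closes = 100.0 + np.cumsum(rng.normal(0.0, 0.1, n_bars))
    opens = np.concatenate(([closes[0]], closes[:-1]))
    highs = np.maximum(opens, closes) + rng.uniform(0.0, 0.05, n_bars)
    lows = np.minimum(opens, closes) - rng.uniform(0.0, 0.05, n_bars)
    volumes = rng.uniform(1.0, 10.0, n_bars)
    timestamps = start_ts + np.arange(n_bars)

    ohlcv = np.column_stack([timestamps, opens, highs, lows, closes, volumes])

    williams = WilliamsMarketStructure()
    structure = williams.calculate_recursive_pivot_points(ohlcv)
    for level_key, level_data in structure.items():
        print(f"{level_key}: swings={len(level_data.swing_points)}, "
              f"trend={level_data.trend_analysis.direction.value}, "
              f"bias={level_data.current_bias.value}")

    features = williams.extract_features_for_rl(structure)
    print(f"RL feature vector length: {len(features)}")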