Files
gogo2/core/pivot_detector.py
2025-06-13 11:48:03 +03:00

296 lines
11 KiB
Python

#!/usr/bin/env python3
"""
Pivot Detector Core Module
This module handles Williams Market Structure pivot detection logic.
"""
import logging
import numpy as np
import pandas as pd
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional, Tuple
from dataclasses import dataclass
# Setup logging with ASCII-only output
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
@dataclass
class DetectedPivot:
"""Dataclass for detected pivot points"""
type: str # 'HIGH' or 'LOW'
price: float
timestamp: datetime
strength: int
index: int
confirmed: bool = False
williams_level: int = 1
class WilliamsPivotDetector:
"""Williams Market Structure Pivot Detection Engine"""
def __init__(self, config: Optional[Dict] = None):
self.config = config or self._default_config()
self.detected_pivots: List[DetectedPivot] = []
logger.info("Williams Pivot Detector initialized")
def _default_config(self) -> Dict:
"""Default configuration for pivot detection"""
return {
'lookback_periods': 5,
'confirmation_periods': 2,
'min_pivot_distance': 3,
'strength_levels': 5,
'price_threshold_pct': 0.1
}
def detect_pivots(self, data: pd.DataFrame) -> List[DetectedPivot]:
"""
Detect pivot points in OHLCV data using Williams Market Structure
Args:
data: DataFrame with OHLCV columns
Returns:
List of detected pivot points
"""
try:
if len(data) < self.config['lookback_periods'] * 2 + 1:
return []
pivots = []
# Detect HIGH pivots
high_pivots = self._detect_high_pivots(data)
pivots.extend(high_pivots)
# Detect LOW pivots
low_pivots = self._detect_low_pivots(data)
pivots.extend(low_pivots)
# Sort by timestamp
pivots.sort(key=lambda x: x.timestamp)
# Filter by minimum distance
filtered_pivots = self._filter_by_distance(pivots)
# Update internal storage
self.detected_pivots = filtered_pivots
logger.info(f"Detected {len(filtered_pivots)} pivot points")
return filtered_pivots
except Exception as e:
logger.error(f"Error detecting pivots: {e}")
return []
def _detect_high_pivots(self, data: pd.DataFrame) -> List[DetectedPivot]:
"""Detect HIGH pivot points"""
pivots = []
lookback = self.config['lookback_periods']
for i in range(lookback, len(data) - lookback):
current_high = data.iloc[i]['high']
# Check if current high is higher than surrounding highs
is_pivot = True
for j in range(i - lookback, i + lookback + 1):
if j != i and data.iloc[j]['high'] >= current_high:
is_pivot = False
break
if is_pivot:
# Calculate pivot strength
strength = self._calculate_pivot_strength(data, i, 'HIGH')
pivot = DetectedPivot(
type='HIGH',
price=current_high,
timestamp=data.iloc[i]['timestamp'] if 'timestamp' in data.columns else datetime.now(),
strength=strength,
index=i,
confirmed=i < len(data) - self.config['confirmation_periods'],
williams_level=min(strength, 5)
)
pivots.append(pivot)
return pivots
def _detect_low_pivots(self, data: pd.DataFrame) -> List[DetectedPivot]:
"""Detect LOW pivot points"""
pivots = []
lookback = self.config['lookback_periods']
for i in range(lookback, len(data) - lookback):
current_low = data.iloc[i]['low']
# Check if current low is lower than surrounding lows
is_pivot = True
for j in range(i - lookback, i + lookback + 1):
if j != i and data.iloc[j]['low'] <= current_low:
is_pivot = False
break
if is_pivot:
# Calculate pivot strength
strength = self._calculate_pivot_strength(data, i, 'LOW')
pivot = DetectedPivot(
type='LOW',
price=current_low,
timestamp=data.iloc[i]['timestamp'] if 'timestamp' in data.columns else datetime.now(),
strength=strength,
index=i,
confirmed=i < len(data) - self.config['confirmation_periods'],
williams_level=min(strength, 5)
)
pivots.append(pivot)
return pivots
def _calculate_pivot_strength(self, data: pd.DataFrame, pivot_index: int, pivot_type: str) -> int:
"""Calculate the strength of a pivot point (1-5 scale)"""
try:
if pivot_type == 'HIGH':
pivot_price = data.iloc[pivot_index]['high']
price_column = 'high'
else:
pivot_price = data.iloc[pivot_index]['low']
price_column = 'low'
strength = 1
# Check increasing ranges around the pivot
for range_size in [3, 5, 8, 13, 21]: # Fibonacci-like sequence
if pivot_index >= range_size and pivot_index < len(data) - range_size:
is_extreme = True
for i in range(pivot_index - range_size, pivot_index + range_size + 1):
if i != pivot_index:
if pivot_type == 'HIGH' and data.iloc[i][price_column] >= pivot_price:
is_extreme = False
break
elif pivot_type == 'LOW' and data.iloc[i][price_column] <= pivot_price:
is_extreme = False
break
if is_extreme:
strength += 1
else:
break
return min(strength, 5)
except Exception as e:
logger.error(f"Error calculating pivot strength: {e}")
return 1
def _filter_by_distance(self, pivots: List[DetectedPivot]) -> List[DetectedPivot]:
"""Filter pivots that are too close to each other"""
if not pivots:
return []
filtered = [pivots[0]]
min_distance = self.config['min_pivot_distance']
for pivot in pivots[1:]:
# Check distance from all previously added pivots
too_close = False
for existing_pivot in filtered:
if abs(pivot.index - existing_pivot.index) < min_distance:
# Keep the stronger pivot
if pivot.strength > existing_pivot.strength:
filtered.remove(existing_pivot)
filtered.append(pivot)
too_close = True
break
if not too_close:
filtered.append(pivot)
return sorted(filtered, key=lambda x: x.timestamp)
def get_recent_pivots(self, hours: int = 24) -> List[DetectedPivot]:
"""Get pivots detected in the last N hours"""
cutoff_time = datetime.now() - timedelta(hours=hours)
return [pivot for pivot in self.detected_pivots if pivot.timestamp > cutoff_time]
def get_pivot_levels(self) -> Dict[int, List[DetectedPivot]]:
"""Group pivots by Williams strength levels"""
levels = {}
for pivot in self.detected_pivots:
level = pivot.williams_level
if level not in levels:
levels[level] = []
levels[level].append(pivot)
return levels
def is_potential_pivot(self, data: pd.DataFrame, current_index: int) -> Optional[Dict]:
"""Check if current position might be a pivot (for real-time detection)"""
try:
if current_index < self.config['lookback_periods']:
return None
lookback = self.config['lookback_periods']
current_high = data.iloc[current_index]['high']
current_low = data.iloc[current_index]['low']
# Check for potential HIGH pivot
is_high_pivot = True
for i in range(current_index - lookback, current_index):
if data.iloc[i]['high'] >= current_high:
is_high_pivot = False
break
# Check for potential LOW pivot
is_low_pivot = True
for i in range(current_index - lookback, current_index):
if data.iloc[i]['low'] <= current_low:
is_low_pivot = False
break
result = {}
if is_high_pivot:
result['HIGH'] = {
'price': current_high,
'confidence': 0.7, # Unconfirmed
'strength': self._calculate_pivot_strength(data, current_index, 'HIGH')
}
if is_low_pivot:
result['LOW'] = {
'price': current_low,
'confidence': 0.7, # Unconfirmed
'strength': self._calculate_pivot_strength(data, current_index, 'LOW')
}
return result if result else None
except Exception as e:
logger.error(f"Error checking potential pivot: {e}")
return None
def get_statistics(self) -> Dict:
"""Get pivot detection statistics"""
if not self.detected_pivots:
return {'total_pivots': 0, 'high_pivots': 0, 'low_pivots': 0}
high_count = len([p for p in self.detected_pivots if p.type == 'HIGH'])
low_count = len([p for p in self.detected_pivots if p.type == 'LOW'])
confirmed_count = len([p for p in self.detected_pivots if p.confirmed])
avg_strength = np.mean([p.strength for p in self.detected_pivots])
return {
'total_pivots': len(self.detected_pivots),
'high_pivots': high_count,
'low_pivots': low_count,
'confirmed_pivots': confirmed_count,
'average_strength': avg_strength,
'strength_distribution': {
i: len([p for p in self.detected_pivots if p.strength == i])
for i in range(1, 6)
}
}