order flow WIP, chart broken

This commit is contained in:
Dobromir Popov
2025-06-18 13:51:08 +03:00
parent 5bce17a21a
commit e238ce374b
16 changed files with 1768 additions and 1333 deletions

View File

@ -1,378 +0,0 @@
#!/usr/bin/env python3
"""
Chart Data Provider Core Module
This module handles all chart data preparation and market data simulation,
separated from the web UI layer.
"""
import logging
import numpy as np
import pandas as pd
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional, Tuple
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from .cnn_pivot_predictor import CNNPivotPredictor, PivotPrediction
from .pivot_detector import WilliamsPivotDetector, DetectedPivot
# Setup logging with ASCII-only output
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
class ChartDataProvider:
"""Core chart data provider with market simulation and chart preparation"""
def __init__(self, config: Optional[Dict] = None):
self.config = config or self._default_config()
# Initialize core components
self.cnn_predictor = CNNPivotPredictor()
self.pivot_detector = WilliamsPivotDetector()
# Market data
self.current_price = 3500.0 # Starting ETH price
self.price_history: List[Dict] = []
# Initialize with sample data
self._generate_initial_data()
logger.info("Chart Data Provider initialized")
def _default_config(self) -> Dict:
"""Default configuration"""
return {
'initial_history_hours': 2,
'price_volatility': 5.0,
'volume_range': (100, 1000),
'chart_height': 600,
'subplots': True
}
def _generate_initial_data(self) -> None:
"""Generate initial price history for demonstration"""
base_time = datetime.now() - timedelta(hours=self.config['initial_history_hours'])
for i in range(120): # 2 hours of minute data
# Simulate realistic price movement
change = np.random.normal(0, self.config['price_volatility'])
self.current_price += change
# Ensure price doesn't go negative
self.current_price = max(self.current_price, 100.0)
timestamp = base_time + timedelta(minutes=i)
# Generate OHLC data
open_price = self.current_price - np.random.uniform(-2, 2)
high_price = max(open_price, self.current_price) + np.random.uniform(0, 8)
low_price = min(open_price, self.current_price) - np.random.uniform(0, 8)
close_price = self.current_price
volume = np.random.uniform(*self.config['volume_range'])
candle = {
'timestamp': timestamp,
'open': open_price,
'high': high_price,
'low': low_price,
'close': close_price,
'volume': volume
}
self.price_history.append(candle)
logger.info(f"Generated {len(self.price_history)} initial price candles")
def simulate_price_update(self) -> Dict:
"""Simulate real-time price update"""
try:
# Generate new price movement
change = np.random.normal(0, self.config['price_volatility'])
self.current_price += change
self.current_price = max(self.current_price, 100.0)
# Create new candle
timestamp = datetime.now()
open_price = self.price_history[-1]['close'] if self.price_history else self.current_price
high_price = max(open_price, self.current_price) + np.random.uniform(0, 5)
low_price = min(open_price, self.current_price) - np.random.uniform(0, 5)
close_price = self.current_price
volume = np.random.uniform(*self.config['volume_range'])
new_candle = {
'timestamp': timestamp,
'open': open_price,
'high': high_price,
'low': low_price,
'close': close_price,
'volume': volume
}
self.price_history.append(new_candle)
# Keep only last 200 candles to prevent memory growth
if len(self.price_history) > 200:
self.price_history = self.price_history[-200:]
return new_candle
except Exception as e:
logger.error(f"Error simulating price update: {e}")
return {}
def get_market_data_df(self) -> pd.DataFrame:
"""Convert price history to pandas DataFrame"""
try:
if not self.price_history:
return pd.DataFrame()
df = pd.DataFrame(self.price_history)
df['timestamp'] = pd.to_datetime(df['timestamp'])
return df
except Exception as e:
logger.error(f"Error creating DataFrame: {e}")
return pd.DataFrame()
def update_predictions_and_pivots(self) -> Tuple[List[PivotPrediction], List[DetectedPivot]]:
"""Update CNN predictions and detect new pivots"""
try:
market_df = self.get_market_data_df()
if market_df.empty:
return [], []
# Update CNN predictions
predictions = self.cnn_predictor.update_predictions(market_df, self.current_price)
# Detect pivots
detected_pivots = self.pivot_detector.detect_pivots(market_df)
# Capture training data if new pivots are found
for pivot in detected_pivots:
if pivot.confirmed:
actual_pivot = type('ActualPivot', (), {
'type': pivot.type,
'price': pivot.price,
'timestamp': pivot.timestamp,
'strength': pivot.strength
})()
self.cnn_predictor.capture_training_data(actual_pivot)
return predictions, detected_pivots
except Exception as e:
logger.error(f"Error updating predictions and pivots: {e}")
return [], []
def create_price_chart(self) -> go.Figure:
"""Create main price chart with candlesticks and volume"""
try:
market_df = self.get_market_data_df()
if market_df.empty:
return go.Figure()
# Create subplots
if self.config['subplots']:
fig = make_subplots(
rows=2, cols=1,
shared_xaxes=True,
vertical_spacing=0.05,
subplot_titles=('Price', 'Volume'),
row_width=[0.7, 0.3]
)
else:
fig = go.Figure()
# Add candlestick chart
candlestick = go.Candlestick(
x=market_df['timestamp'],
open=market_df['open'],
high=market_df['high'],
low=market_df['low'],
close=market_df['close'],
name='ETH/USDT',
increasing_line_color='#00ff88',
decreasing_line_color='#ff4444'
)
if self.config['subplots']:
fig.add_trace(candlestick, row=1, col=1)
else:
fig.add_trace(candlestick)
# Add volume bars if subplots enabled
if self.config['subplots']:
volume_colors = ['#00ff88' if close >= open else '#ff4444'
for close, open in zip(market_df['close'], market_df['open'])]
volume_bar = go.Bar(
x=market_df['timestamp'],
y=market_df['volume'],
name='Volume',
marker_color=volume_colors,
opacity=0.7
)
fig.add_trace(volume_bar, row=2, col=1)
# Update layout
fig.update_layout(
title='ETH/USDT Price Chart with CNN Predictions',
xaxis_title='Time',
yaxis_title='Price (USDT)',
height=self.config['chart_height'],
showlegend=True,
xaxis_rangeslider_visible=False
)
return fig
except Exception as e:
logger.error(f"Error creating price chart: {e}")
return go.Figure()
def add_cnn_predictions_to_chart(self, fig: go.Figure, predictions: List[PivotPrediction]) -> go.Figure:
"""Add CNN predictions as hollow circles to the chart"""
try:
if not predictions:
return fig
# Separate HIGH and LOW predictions
high_predictions = [p for p in predictions if p.type == 'HIGH']
low_predictions = [p for p in predictions if p.type == 'LOW']
# Add HIGH predictions (red hollow circles)
if high_predictions:
high_x = [p.timestamp for p in high_predictions]
high_y = [p.predicted_price for p in high_predictions]
high_sizes = [max(8, min(20, p.confidence * 25)) for p in high_predictions]
high_text = [f"HIGH Prediction<br>Price: ${p.predicted_price:.2f}<br>Confidence: {p.confidence:.1%}<br>Level: {p.level}"
for p in high_predictions]
fig.add_trace(go.Scatter(
x=high_x,
y=high_y,
mode='markers',
marker=dict(
symbol='circle-open',
size=high_sizes,
color='red',
line=dict(width=2)
),
name='CNN HIGH Predictions',
text=high_text,
hovertemplate='%{text}<extra></extra>'
))
# Add LOW predictions (green hollow circles)
if low_predictions:
low_x = [p.timestamp for p in low_predictions]
low_y = [p.predicted_price for p in low_predictions]
low_sizes = [max(8, min(20, p.confidence * 25)) for p in low_predictions]
low_text = [f"LOW Prediction<br>Price: ${p.predicted_price:.2f}<br>Confidence: {p.confidence:.1%}<br>Level: {p.level}"
for p in low_predictions]
fig.add_trace(go.Scatter(
x=low_x,
y=low_y,
mode='markers',
marker=dict(
symbol='circle-open',
size=low_sizes,
color='green',
line=dict(width=2)
),
name='CNN LOW Predictions',
text=low_text,
hovertemplate='%{text}<extra></extra>'
))
return fig
except Exception as e:
logger.error(f"Error adding CNN predictions to chart: {e}")
return fig
def add_actual_pivots_to_chart(self, fig: go.Figure, pivots: List[DetectedPivot]) -> go.Figure:
"""Add actual detected pivots as solid triangles to the chart"""
try:
if not pivots:
return fig
# Separate HIGH and LOW pivots
high_pivots = [p for p in pivots if p.type == 'HIGH']
low_pivots = [p for p in pivots if p.type == 'LOW']
# Add HIGH pivots (red triangles pointing down)
if high_pivots:
high_x = [p.timestamp for p in high_pivots]
high_y = [p.price for p in high_pivots]
high_sizes = [max(10, min(25, p.strength * 5)) for p in high_pivots]
high_text = [f"HIGH Pivot<br>Price: ${p.price:.2f}<br>Strength: {p.strength}<br>Confirmed: {p.confirmed}"
for p in high_pivots]
fig.add_trace(go.Scatter(
x=high_x,
y=high_y,
mode='markers',
marker=dict(
symbol='triangle-down',
size=high_sizes,
color='darkred',
line=dict(width=1, color='white')
),
name='Actual HIGH Pivots',
text=high_text,
hovertemplate='%{text}<extra></extra>'
))
# Add LOW pivots (green triangles pointing up)
if low_pivots:
low_x = [p.timestamp for p in low_pivots]
low_y = [p.price for p in low_pivots]
low_sizes = [max(10, min(25, p.strength * 5)) for p in low_pivots]
low_text = [f"LOW Pivot<br>Price: ${p.price:.2f}<br>Strength: {p.strength}<br>Confirmed: {p.confirmed}"
for p in low_pivots]
fig.add_trace(go.Scatter(
x=low_x,
y=low_y,
mode='markers',
marker=dict(
symbol='triangle-up',
size=low_sizes,
color='darkgreen',
line=dict(width=1, color='white')
),
name='Actual LOW Pivots',
text=low_text,
hovertemplate='%{text}<extra></extra>'
))
return fig
except Exception as e:
logger.error(f"Error adding actual pivots to chart: {e}")
return fig
def get_current_status(self) -> Dict:
"""Get current system status for dashboard display"""
try:
prediction_stats = self.cnn_predictor.get_prediction_stats()
pivot_stats = self.pivot_detector.get_statistics()
training_stats = self.cnn_predictor.get_training_stats()
return {
'current_price': self.current_price,
'total_candles': len(self.price_history),
'last_update': datetime.now().strftime('%H:%M:%S'),
'predictions': prediction_stats,
'pivots': pivot_stats,
'training': training_stats
}
except Exception as e:
logger.error(f"Error getting current status: {e}")
return {}

View File

@ -1,285 +0,0 @@
#!/usr/bin/env python3
"""
CNN Pivot Predictor Core Module
This module handles all CNN-based pivot prediction logic, separated from the web UI.
"""
import logging
import time
import numpy as np
import pandas as pd
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional, Tuple
import json
import os
from dataclasses import dataclass
# Setup logging with ASCII-only output
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
@dataclass
class PivotPrediction:
"""Dataclass for CNN pivot predictions"""
level: int
type: str # 'HIGH' or 'LOW'
predicted_price: float
confidence: float
timestamp: datetime
current_price: float
model_inputs: Optional[Dict] = None
@dataclass
class ActualPivot:
"""Dataclass for actual detected pivots"""
type: str # 'HIGH' or 'LOW'
price: float
timestamp: datetime
strength: int
confirmed: bool = False
@dataclass
class TrainingDataPoint:
"""Dataclass for capturing training comparison data"""
prediction: PivotPrediction
actual_pivot: Optional[ActualPivot]
prediction_accuracy: Optional[float]
time_accuracy: Optional[float]
captured_at: datetime
class CNNPivotPredictor:
"""Core CNN pivot prediction engine"""
def __init__(self, config: Optional[Dict] = None):
self.config = config or self._default_config()
self.current_predictions: List[PivotPrediction] = []
self.training_data: List[TrainingDataPoint] = []
self.model_available = False
# Initialize data storage paths
self.training_data_dir = "data/cnn_training"
os.makedirs(self.training_data_dir, exist_ok=True)
logger.info("CNN Pivot Predictor initialized")
def _default_config(self) -> Dict:
"""Default configuration for CNN predictor"""
return {
'prediction_levels': 5, # Williams Market Structure levels
'confidence_threshold': 0.3,
'model_timesteps': 900,
'model_features': 50,
'prediction_horizon_minutes': 30
}
def generate_predictions(self, market_data: pd.DataFrame, current_price: float) -> List[PivotPrediction]:
"""
Generate CNN pivot predictions based on current market data
Args:
market_data: DataFrame with OHLCV data
current_price: Current market price
Returns:
List of pivot predictions
"""
try:
current_time = datetime.now()
predictions = []
# For demo purposes, generate sample predictions
# In production, this would use the actual CNN model
for level in range(1, self.config['prediction_levels'] + 1):
# HIGH pivot prediction
high_confidence = np.random.uniform(0.4, 0.9)
if high_confidence > self.config['confidence_threshold']:
high_price = current_price + np.random.uniform(10, 50)
high_prediction = PivotPrediction(
level=level,
type='HIGH',
predicted_price=high_price,
confidence=high_confidence,
timestamp=current_time + timedelta(minutes=level*5),
current_price=current_price,
model_inputs=self._prepare_model_inputs(market_data)
)
predictions.append(high_prediction)
# LOW pivot prediction
low_confidence = np.random.uniform(0.3, 0.8)
if low_confidence > self.config['confidence_threshold']:
low_price = current_price - np.random.uniform(15, 40)
low_prediction = PivotPrediction(
level=level,
type='LOW',
predicted_price=low_price,
confidence=low_confidence,
timestamp=current_time + timedelta(minutes=level*7),
current_price=current_price,
model_inputs=self._prepare_model_inputs(market_data)
)
predictions.append(low_prediction)
self.current_predictions = predictions
logger.info(f"Generated {len(predictions)} CNN pivot predictions")
return predictions
except Exception as e:
logger.error(f"Error generating CNN predictions: {e}")
return []
def _prepare_model_inputs(self, market_data: pd.DataFrame) -> Dict:
"""Prepare model inputs for CNN prediction"""
if len(market_data) < self.config['model_timesteps']:
return {'insufficient_data': True}
# Extract last 900 timesteps with 50 features
recent_data = market_data.tail(self.config['model_timesteps'])
return {
'timesteps': len(recent_data),
'features': self.config['model_features'],
'price_range': (recent_data['low'].min(), recent_data['high'].max()),
'volume_avg': recent_data['volume'].mean(),
'timestamp': datetime.now().isoformat()
}
def update_predictions(self, market_data: pd.DataFrame, current_price: float) -> List[PivotPrediction]:
"""Update existing predictions or generate new ones"""
# Remove expired predictions
current_time = datetime.now()
self.current_predictions = [
pred for pred in self.current_predictions
if pred.timestamp > current_time - timedelta(minutes=60)
]
# Generate new predictions if needed
if len(self.current_predictions) < 5:
new_predictions = self.generate_predictions(market_data, current_price)
return new_predictions
return self.current_predictions
def capture_training_data(self, actual_pivot: ActualPivot) -> None:
"""
Capture training data by comparing predictions with actual pivots
Args:
actual_pivot: Detected actual pivot point
"""
try:
current_time = datetime.now()
# Find matching predictions within time window
matching_predictions = [
pred for pred in self.current_predictions
if (pred.type == actual_pivot.type and
abs((pred.timestamp - actual_pivot.timestamp).total_seconds()) < 1800) # 30 min window
]
for prediction in matching_predictions:
# Calculate accuracy metrics
price_accuracy = self._calculate_price_accuracy(prediction, actual_pivot)
time_accuracy = self._calculate_time_accuracy(prediction, actual_pivot)
training_point = TrainingDataPoint(
prediction=prediction,
actual_pivot=actual_pivot,
prediction_accuracy=price_accuracy,
time_accuracy=time_accuracy,
captured_at=current_time
)
self.training_data.append(training_point)
logger.info(f"Captured training data point: {prediction.type} pivot with {price_accuracy:.2%} accuracy")
# Save training data periodically
if len(self.training_data) % 5 == 0:
self._save_training_data()
except Exception as e:
logger.error(f"Error capturing training data: {e}")
def _calculate_price_accuracy(self, prediction: PivotPrediction, actual: ActualPivot) -> float:
"""Calculate price prediction accuracy"""
if actual.price == 0:
return 0.0
price_diff = abs(prediction.predicted_price - actual.price)
accuracy = max(0.0, 1.0 - (price_diff / actual.price))
return accuracy
def _calculate_time_accuracy(self, prediction: PivotPrediction, actual: ActualPivot) -> float:
"""Calculate timing prediction accuracy"""
time_diff_seconds = abs((prediction.timestamp - actual.timestamp).total_seconds())
max_acceptable_diff = 1800 # 30 minutes
accuracy = max(0.0, 1.0 - (time_diff_seconds / max_acceptable_diff))
return accuracy
def _save_training_data(self) -> None:
"""Save training data to JSON file"""
try:
filename = f"cnn_training_data_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
filepath = os.path.join(self.training_data_dir, filename)
# Convert to serializable format
data_to_save = []
for point in self.training_data:
data_to_save.append({
'prediction': {
'level': point.prediction.level,
'type': point.prediction.type,
'predicted_price': point.prediction.predicted_price,
'confidence': point.prediction.confidence,
'timestamp': point.prediction.timestamp.isoformat(),
'current_price': point.prediction.current_price,
'model_inputs': point.prediction.model_inputs
},
'actual_pivot': {
'type': point.actual_pivot.type,
'price': point.actual_pivot.price,
'timestamp': point.actual_pivot.timestamp.isoformat(),
'strength': point.actual_pivot.strength
} if point.actual_pivot else None,
'prediction_accuracy': point.prediction_accuracy,
'time_accuracy': point.time_accuracy,
'captured_at': point.captured_at.isoformat()
})
with open(filepath, 'w') as f:
json.dump(data_to_save, f, indent=2)
logger.info(f"Saved {len(data_to_save)} training data points to {filepath}")
# Clear processed data
self.training_data = []
except Exception as e:
logger.error(f"Error saving training data: {e}")
def get_prediction_stats(self) -> Dict:
"""Get current prediction statistics"""
if not self.current_predictions:
return {'active_predictions': 0, 'high_confidence': 0, 'low_confidence': 0}
high_conf = len([p for p in self.current_predictions if p.confidence > 0.7])
low_conf = len([p for p in self.current_predictions if p.confidence <= 0.5])
return {
'active_predictions': len(self.current_predictions),
'high_confidence': high_conf,
'medium_confidence': len(self.current_predictions) - high_conf - low_conf,
'low_confidence': low_conf,
'avg_confidence': np.mean([p.confidence for p in self.current_predictions])
}
def get_training_stats(self) -> Dict:
"""Get training data capture statistics"""
return {
'captured_points': len(self.training_data),
'avg_price_accuracy': np.mean([p.prediction_accuracy for p in self.training_data if p.prediction_accuracy]) if self.training_data else 0,
'avg_time_accuracy': np.mean([p.time_accuracy for p in self.training_data if p.time_accuracy]) if self.training_data else 0
}

View File

@ -1,296 +0,0 @@
#!/usr/bin/env python3
"""
Pivot Detector Core Module
This module handles Williams Market Structure pivot detection logic.
"""
import logging
import numpy as np
import pandas as pd
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional, Tuple
from dataclasses import dataclass
# Setup logging with ASCII-only output
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
@dataclass
class DetectedPivot:
"""Dataclass for detected pivot points"""
type: str # 'HIGH' or 'LOW'
price: float
timestamp: datetime
strength: int
index: int
confirmed: bool = False
williams_level: int = 1
class WilliamsPivotDetector:
"""Williams Market Structure Pivot Detection Engine"""
def __init__(self, config: Optional[Dict] = None):
self.config = config or self._default_config()
self.detected_pivots: List[DetectedPivot] = []
logger.info("Williams Pivot Detector initialized")
def _default_config(self) -> Dict:
"""Default configuration for pivot detection"""
return {
'lookback_periods': 5,
'confirmation_periods': 2,
'min_pivot_distance': 3,
'strength_levels': 5,
'price_threshold_pct': 0.1
}
def detect_pivots(self, data: pd.DataFrame) -> List[DetectedPivot]:
"""
Detect pivot points in OHLCV data using Williams Market Structure
Args:
data: DataFrame with OHLCV columns
Returns:
List of detected pivot points
"""
try:
if len(data) < self.config['lookback_periods'] * 2 + 1:
return []
pivots = []
# Detect HIGH pivots
high_pivots = self._detect_high_pivots(data)
pivots.extend(high_pivots)
# Detect LOW pivots
low_pivots = self._detect_low_pivots(data)
pivots.extend(low_pivots)
# Sort by timestamp
pivots.sort(key=lambda x: x.timestamp)
# Filter by minimum distance
filtered_pivots = self._filter_by_distance(pivots)
# Update internal storage
self.detected_pivots = filtered_pivots
logger.info(f"Detected {len(filtered_pivots)} pivot points")
return filtered_pivots
except Exception as e:
logger.error(f"Error detecting pivots: {e}")
return []
def _detect_high_pivots(self, data: pd.DataFrame) -> List[DetectedPivot]:
"""Detect HIGH pivot points"""
pivots = []
lookback = self.config['lookback_periods']
for i in range(lookback, len(data) - lookback):
current_high = data.iloc[i]['high']
# Check if current high is higher than surrounding highs
is_pivot = True
for j in range(i - lookback, i + lookback + 1):
if j != i and data.iloc[j]['high'] >= current_high:
is_pivot = False
break
if is_pivot:
# Calculate pivot strength
strength = self._calculate_pivot_strength(data, i, 'HIGH')
pivot = DetectedPivot(
type='HIGH',
price=current_high,
timestamp=data.iloc[i]['timestamp'] if 'timestamp' in data.columns else datetime.now(),
strength=strength,
index=i,
confirmed=i < len(data) - self.config['confirmation_periods'],
williams_level=min(strength, 5)
)
pivots.append(pivot)
return pivots
def _detect_low_pivots(self, data: pd.DataFrame) -> List[DetectedPivot]:
"""Detect LOW pivot points"""
pivots = []
lookback = self.config['lookback_periods']
for i in range(lookback, len(data) - lookback):
current_low = data.iloc[i]['low']
# Check if current low is lower than surrounding lows
is_pivot = True
for j in range(i - lookback, i + lookback + 1):
if j != i and data.iloc[j]['low'] <= current_low:
is_pivot = False
break
if is_pivot:
# Calculate pivot strength
strength = self._calculate_pivot_strength(data, i, 'LOW')
pivot = DetectedPivot(
type='LOW',
price=current_low,
timestamp=data.iloc[i]['timestamp'] if 'timestamp' in data.columns else datetime.now(),
strength=strength,
index=i,
confirmed=i < len(data) - self.config['confirmation_periods'],
williams_level=min(strength, 5)
)
pivots.append(pivot)
return pivots
def _calculate_pivot_strength(self, data: pd.DataFrame, pivot_index: int, pivot_type: str) -> int:
"""Calculate the strength of a pivot point (1-5 scale)"""
try:
if pivot_type == 'HIGH':
pivot_price = data.iloc[pivot_index]['high']
price_column = 'high'
else:
pivot_price = data.iloc[pivot_index]['low']
price_column = 'low'
strength = 1
# Check increasing ranges around the pivot
for range_size in [3, 5, 8, 13, 21]: # Fibonacci-like sequence
if pivot_index >= range_size and pivot_index < len(data) - range_size:
is_extreme = True
for i in range(pivot_index - range_size, pivot_index + range_size + 1):
if i != pivot_index:
if pivot_type == 'HIGH' and data.iloc[i][price_column] >= pivot_price:
is_extreme = False
break
elif pivot_type == 'LOW' and data.iloc[i][price_column] <= pivot_price:
is_extreme = False
break
if is_extreme:
strength += 1
else:
break
return min(strength, 5)
except Exception as e:
logger.error(f"Error calculating pivot strength: {e}")
return 1
def _filter_by_distance(self, pivots: List[DetectedPivot]) -> List[DetectedPivot]:
"""Filter pivots that are too close to each other"""
if not pivots:
return []
filtered = [pivots[0]]
min_distance = self.config['min_pivot_distance']
for pivot in pivots[1:]:
# Check distance from all previously added pivots
too_close = False
for existing_pivot in filtered:
if abs(pivot.index - existing_pivot.index) < min_distance:
# Keep the stronger pivot
if pivot.strength > existing_pivot.strength:
filtered.remove(existing_pivot)
filtered.append(pivot)
too_close = True
break
if not too_close:
filtered.append(pivot)
return sorted(filtered, key=lambda x: x.timestamp)
def get_recent_pivots(self, hours: int = 24) -> List[DetectedPivot]:
"""Get pivots detected in the last N hours"""
cutoff_time = datetime.now() - timedelta(hours=hours)
return [pivot for pivot in self.detected_pivots if pivot.timestamp > cutoff_time]
def get_pivot_levels(self) -> Dict[int, List[DetectedPivot]]:
"""Group pivots by Williams strength levels"""
levels = {}
for pivot in self.detected_pivots:
level = pivot.williams_level
if level not in levels:
levels[level] = []
levels[level].append(pivot)
return levels
def is_potential_pivot(self, data: pd.DataFrame, current_index: int) -> Optional[Dict]:
"""Check if current position might be a pivot (for real-time detection)"""
try:
if current_index < self.config['lookback_periods']:
return None
lookback = self.config['lookback_periods']
current_high = data.iloc[current_index]['high']
current_low = data.iloc[current_index]['low']
# Check for potential HIGH pivot
is_high_pivot = True
for i in range(current_index - lookback, current_index):
if data.iloc[i]['high'] >= current_high:
is_high_pivot = False
break
# Check for potential LOW pivot
is_low_pivot = True
for i in range(current_index - lookback, current_index):
if data.iloc[i]['low'] <= current_low:
is_low_pivot = False
break
result = {}
if is_high_pivot:
result['HIGH'] = {
'price': current_high,
'confidence': 0.7, # Unconfirmed
'strength': self._calculate_pivot_strength(data, current_index, 'HIGH')
}
if is_low_pivot:
result['LOW'] = {
'price': current_low,
'confidence': 0.7, # Unconfirmed
'strength': self._calculate_pivot_strength(data, current_index, 'LOW')
}
return result if result else None
except Exception as e:
logger.error(f"Error checking potential pivot: {e}")
return None
def get_statistics(self) -> Dict:
"""Get pivot detection statistics"""
if not self.detected_pivots:
return {'total_pivots': 0, 'high_pivots': 0, 'low_pivots': 0}
high_count = len([p for p in self.detected_pivots if p.type == 'HIGH'])
low_count = len([p for p in self.detected_pivots if p.type == 'LOW'])
confirmed_count = len([p for p in self.detected_pivots if p.confirmed])
avg_strength = np.mean([p.strength for p in self.detected_pivots])
return {
'total_pivots': len(self.detected_pivots),
'high_pivots': high_count,
'low_pivots': low_count,
'confirmed_pivots': confirmed_count,
'average_strength': avg_strength,
'strength_distribution': {
i: len([p for p in self.detected_pivots if p.strength == i])
for i in range(1, 6)
}
}