"""
|
|
Williams Market Structure Implementation for RL Training
|
|
|
|
This module implements Larry Williams market structure analysis methodology for
|
|
RL training enhancement with:
|
|
|
|
**SINGLE TIMEFRAME RECURSIVE APPROACH:**
|
|
- Level 0: 1s OHLCV data → swing points using configurable strength [2, 3, 5]
|
|
- Level 1: Level 0 swing points treated as "price bars" → higher-level swing points
|
|
- Level 2: Level 1 swing points treated as "price bars" → even higher-level swing points
|
|
- Level 3: Level 2 swing points treated as "price bars" → top-level swing points
|
|
- Level 4: Level 3 swing points treated as "price bars" → highest-level swing points
|
|
|
|
**RECURSIVE METHODOLOGY:**
|
|
Each level uses the previous level's swing points as input "price data", where:
|
|
- Each swing point becomes a "price bar" with OHLC = swing point price
|
|
- Swing strength detection applied to find patterns in swing point sequences
|
|
- This creates fractal market structure analysis across 5 recursive levels
|
|
|
|
**NOT MULTI-TIMEFRAME:**
|
|
Williams structure uses ONLY 1s data and builds recursively.
|
|
Multi-timeframe data (1m, 1h) is used separately for CNN feature enhancement.
|
|
|
|
Based on Larry Williams' teachings on market structure:
|
|
- Markets move in swings between support and resistance
|
|
- Higher recursive levels reveal longer-term structure patterns
|
|
- Recursive analysis reveals fractal patterns within market movements
|
|
- Trend direction determined by swing point relationships across levels
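
Example (illustrative sketch; assumes a float array of shape (N, 6) holding
[timestamp, open, high, low, close, volume] 1s bars):

    analyzer = WilliamsMarketStructure(swing_strengths=[2, 3, 5])
    levels = analyzer.calculate_recursive_pivot_points(ohlcv_1s)
    print(levels['level_0'].trend_analysis.direction)
    rl_features = analyzer.extract_features_for_rl(levels)  # 250 floats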
"""

import numpy as np
import pandas as pd
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple, Any
from dataclasses import dataclass
from enum import Enum

# Setup logger immediately after logging import
logger = logging.getLogger(__name__)

try:
    from NN.models.cnn_model import CNNModel
except ImportError:
    try:
        # Fallback import path
        import sys
        import os
        project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
        sys.path.append(project_root)
        from NN.models.cnn_model import CNNModel
    except ImportError:
        # Create a fallback CNN model for development/testing
        class CNNModel:
            def __init__(self, input_shape=(900, 50), output_size=10):
                self.input_shape = input_shape
                self.output_size = output_size
                self.model = None
                logger.info(f"Fallback CNN Model initialized: input_shape={input_shape}, output_size={output_size}")

            def build_model(self, **kwargs):
                logger.info("Fallback CNN Model build_model called - using dummy model")
                return self

            def predict(self, X):
                # Return dummy predictions for testing
                batch_size = X.shape[0] if hasattr(X, 'shape') else 1
                if self.output_size == 1:
                    pred_class = np.random.choice([0, 1], size=batch_size)
                    pred_proba = np.random.random(batch_size)
                else:
                    pred_class = np.random.randint(0, self.output_size, size=batch_size)
                    pred_proba = np.random.random((batch_size, self.output_size))
                logger.debug(f"Fallback CNN prediction: class={pred_class}, proba_shape={np.array(pred_proba).shape}")
                return pred_class, pred_proba

            def fit(self, X, y, **kwargs):
                logger.info(f"Fallback CNN training: X_shape={X.shape}, y_shape={y.shape}")
                return self

        logger.warning("Using fallback CNN model - CNN training will work but with dummy predictions")

try:
    from core.unified_data_stream import TrainingDataPacket
except ImportError:
    TrainingDataPacket = None
    logger.warning("TrainingDataPacket could not be imported. Using fallback interface.")


class TrendDirection(Enum):
    UP = "up"
    DOWN = "down"
    SIDEWAYS = "sideways"
    UNKNOWN = "unknown"


class SwingType(Enum):
    SWING_HIGH = "swing_high"
    SWING_LOW = "swing_low"


@dataclass
class SwingPoint:
    """Represents a swing high or low point"""
    timestamp: datetime
    price: float
    index: int
    swing_type: SwingType
    strength: int  # Number of bars on each side that confirm the swing
    volume: float = 0.0


@dataclass
class TrendAnalysis:
    """Trend analysis results"""
    direction: TrendDirection
    strength: float  # 0.0 to 1.0
    confidence: float  # 0.0 to 1.0
    swing_count: int
    last_swing_high: Optional[SwingPoint]
    last_swing_low: Optional[SwingPoint]
    higher_highs: int
    higher_lows: int
    lower_highs: int
    lower_lows: int


@dataclass
class MarketStructureLevel:
    """Market structure analysis for one recursive level"""
    level: int
    swing_points: List[SwingPoint]
    trend_analysis: TrendAnalysis
    support_levels: List[float]
    resistance_levels: List[float]
    current_bias: TrendDirection
    structure_breaks: List[Dict[str, Any]]
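
# Example (illustrative): a strength-3 swing high confirmed at bar index 42
#   SwingPoint(timestamp=datetime(2024, 1, 1, 0, 0, 42), price=2500.0,
#              index=42, swing_type=SwingType.SWING_HIGH, strength=3)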


class WilliamsMarketStructure:
    """
    Implementation of Larry Williams' market structure methodology

    Features:
    - Multi-strength swing detection (default strengths: 2, 3, 5 bars)
    - 5 levels of recursive analysis
    - Trend direction determination
    - Support/resistance level identification
    - Market bias calculation
    - Structure break detection
    """

    def __init__(self,
                 swing_strengths: List[int] = None,
                 cnn_input_shape: Optional[Tuple[int, int]] = (900, 50),  # 900 timesteps (1s), 50 features
                 cnn_output_size: Optional[int] = 10,  # 5 levels * (type + price) = 10 outputs
                 cnn_model_config: Optional[Dict[str, Any]] = None,  # build_model params (filters, learning_rate, ...)
                 cnn_model_path: Optional[str] = None,
                 enable_cnn_feature: bool = True,  # Master switch for this feature
                 training_data_provider: Optional[Any] = None):  # Provider for TrainingDataPacket access
        """
        Initialize Williams market structure analyzer

        Args:
            swing_strengths: List of swing detection strengths (bars on each side)
            cnn_input_shape: Shape of input data for CNN (sequence_length, features)
            cnn_output_size: Number of output classes for CNN (10 = 5 levels * 2 outputs each)
            cnn_model_config: Dictionary with parameters for CNNModel.build_model()
            cnn_model_path: Path to a pre-trained Keras CNN model (.h5 file)
            enable_cnn_feature: If True, enables CNN prediction and training at pivots
            training_data_provider: Provider/stream for accessing TrainingDataPacket
        """
        self.swing_strengths = swing_strengths or [2, 3, 5]  # Simplified strengths for better performance
        self.max_levels = 5
        self.min_swings_for_trend = 3

        # Cache for performance
        self.swing_cache = {}
        self.trend_cache = {}

        self.enable_cnn_feature = enable_cnn_feature and CNNModel is not None
        # Force-enable CNN during development: the fallback CNNModel above guarantees
        # the class always exists, so this currently overrides the constructor flag.
        self.enable_cnn_feature = True
        self.cnn_model: Optional[CNNModel] = None
        self.previous_pivot_details_for_cnn: Optional[Dict[str, Any]] = None  # Stores {'features': X, 'pivot': SwingPoint}
        self.training_data_provider = training_data_provider  # Access to TrainingDataPacket

        if self.enable_cnn_feature:
            try:
                logger.info(f"Initializing CNN for multi-timeframe pivot prediction. Input: {cnn_input_shape}, Output: {cnn_output_size}")
                logger.info("CNN will predict next pivot (type + price) for all 5 Williams levels")

                self.cnn_model = CNNModel(input_shape=cnn_input_shape, output_size=cnn_output_size)
                if cnn_model_path:
                    logger.info(f"Loading pre-trained CNN model from: {cnn_model_path}")
                    self.cnn_model.load(cnn_model_path)
                else:
                    logger.info("Building new CNN model.")
                    # Use provided config or defaults for build_model
                    build_params = cnn_model_config or {}
                    self.cnn_model.build_model(**build_params)
                logger.info("CNN Model initialized successfully.")
            except Exception as e:
                logger.error(f"Failed to initialize or load CNN model: {e}. Disabling CNN feature.", exc_info=True)
                self.enable_cnn_feature = False
        else:
            logger.info("CNN feature for pivot prediction/training is disabled.")

        logger.info(f"Williams Market Structure initialized with strengths: {self.swing_strengths}")

    def calculate_recursive_pivot_points(self, ohlcv_data: np.ndarray) -> Dict[str, MarketStructureLevel]:
        """
        Calculate 5 levels of recursive pivot points using SINGLE-TIMEFRAME (1s) data

        **RECURSIVE STRUCTURE:**
        - Level 0: raw 1s OHLCV data → swing points (strengths 2, 3, 5)
        - Level 1: Level 0 swing points treated as "price bars" → higher-level swing points
        - Level 2: Level 1 swing points treated as "price bars" → even higher-level swing points
        - Level 3: Level 2 swing points treated as "price bars" → top-level swing points
        - Level 4: Level 3 swing points treated as "price bars" → highest-level swing points

        **HOW RECURSION WORKS:**
        1. Start with 1s OHLCV data (timestamp, open, high, low, close, volume)
        2. Find Level 0 swing points using the configured strengths [2, 3, 5]
        3. Convert Level 0 swing points to "price bar" format where OHLC = swing price
        4. Apply swing detection to these "price bars" to find Level 1 swing points
        5. Repeat: Level N swing points → "price bars" → Level N+1 swing points

        This creates a fractal analysis where each level reveals longer-term structure patterns
        within the same 1s timeframe data, NOT across different timeframes.
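
        Example of the recursive hand-off (illustrative):

            # Level N swings -> synthetic "price bars" for Level N+1
            bars = self._convert_pivots_to_price_points(level_n_swings)
            # bars[:, 1:5] all equal the swing price; bars[:, 0] is the timestamp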

        Args:
            ohlcv_data: 1s OHLCV data array [timestamp, open, high, low, close, volume]

        Returns:
            Dict of 5 market structure levels with recursive swing points and analysis
        """
        if len(ohlcv_data) < 20:
            logger.warning("Insufficient data for Williams structure analysis")
            return self._create_empty_structure()

        levels = {}
        current_price_points = ohlcv_data.copy()  # Start with raw 1s OHLCV data

        for level in range(self.max_levels):
            logger.debug(f"Analyzing level {level} with {len(current_price_points)} data points")

            if level == 0:
                # Level 0: Calculate swing points from raw 1s OHLCV data
                swing_points = self._find_swing_points_multi_strength(current_price_points)
            else:
                # Levels 1-4: Calculate swing points from the previous level's swing
                # points, which are treated as "price bars"
                swing_points = self._find_pivot_points_from_pivot_points(current_price_points, level)

            if len(swing_points) < self.min_swings_for_trend:
                logger.debug(f"Not enough swings at level {level}: {len(swing_points)}")
                # Fill remaining levels with empty data
                for remaining_level in range(level, self.max_levels):
                    levels[f'level_{remaining_level}'] = self._create_empty_level(remaining_level)
                break

            # Analyze trend for this level
            trend_analysis = self._analyze_trend_from_swings(swing_points)

            # Find support/resistance levels
            support_levels, resistance_levels = self._find_support_resistance(
                swing_points, current_price_points if level == 0 else None
            )

            # Determine current market bias
            current_bias = self._determine_market_bias(swing_points, trend_analysis)

            # Detect structure breaks
            structure_breaks = self._detect_structure_breaks(swing_points, current_price_points if level == 0 else None)

            # Create level data
            levels[f'level_{level}'] = MarketStructureLevel(
                level=level,
                swing_points=swing_points,
                trend_analysis=trend_analysis,
                support_levels=support_levels,
                resistance_levels=resistance_levels,
                current_bias=current_bias,
                structure_breaks=structure_breaks
            )

            # Prepare data for the next level: convert swing points to "price points"
            if len(swing_points) >= 5:
                current_price_points = self._convert_pivots_to_price_points(swing_points)
                if len(current_price_points) < 10:
                    logger.debug(f"Insufficient pivot data for level {level + 1}")
                    break
            else:
                logger.debug(f"Not enough swings to continue to level {level + 1}")
                break

        # Fill any remaining empty levels
        for remaining_level in range(len(levels), self.max_levels):
            levels[f'level_{remaining_level}'] = self._create_empty_level(remaining_level)

        return levels

    def _find_swing_points_multi_strength(self, ohlcv_data: np.ndarray) -> List[SwingPoint]:
        """Find swing points using multiple strength criteria"""
        all_swings = []

        for strength in self.swing_strengths:
            swings_at_strength = self._find_swing_points_single_strength(ohlcv_data, strength)
            for swing in swings_at_strength:
                # Avoid duplicates (swings at the same index)
                if not any(existing.index == swing.index for existing in all_swings):
                    all_swings.append(swing)

        # Sort by timestamp/index
        all_swings.sort(key=lambda x: x.index)

        # Filter to get the most significant swings
        return self._filter_significant_swings(all_swings)

    def _find_swing_points_single_strength(self, ohlcv_data: np.ndarray, strength: int) -> List[SwingPoint]:
        """Find swing points with a specific strength requirement"""
        identified_swings_in_this_call = []  # Swings found in this specific call

        if len(ohlcv_data) < (strength * 2 + 1):
            return identified_swings_in_this_call

        for i in range(strength, len(ohlcv_data) - strength):
            current_high = ohlcv_data[i, 2]  # High price
            current_low = ohlcv_data[i, 3]  # Low price
            current_volume = ohlcv_data[i, 5] if ohlcv_data.shape[1] > 5 else 0.0

            # Check for swing high (higher than surrounding bars)
            is_swing_high = True
            for j in range(i - strength, i + strength + 1):
                if j != i and ohlcv_data[j, 2] >= current_high:
                    is_swing_high = False
                    break

            if is_swing_high:
                new_pivot = SwingPoint(
                    timestamp=datetime.fromtimestamp(ohlcv_data[i, 0]) if ohlcv_data[i, 0] > 1e9 else datetime.now(),
                    price=current_high,
                    index=i,
                    swing_type=SwingType.SWING_HIGH,
                    strength=strength,
                    volume=current_volume
                )
                identified_swings_in_this_call.append(new_pivot)
                self._handle_cnn_at_pivot(new_pivot, ohlcv_data)  # CNN logic call

            # Check for swing low (lower than surrounding bars)
            is_swing_low = True
            for j in range(i - strength, i + strength + 1):
                if j != i and ohlcv_data[j, 3] <= current_low:
                    is_swing_low = False
                    break

            if is_swing_low:
                new_pivot = SwingPoint(
                    timestamp=datetime.fromtimestamp(ohlcv_data[i, 0]) if ohlcv_data[i, 0] > 1e9 else datetime.now(),
                    price=current_low,
                    index=i,
                    swing_type=SwingType.SWING_LOW,
                    strength=strength,
                    volume=current_volume
                )
                identified_swings_in_this_call.append(new_pivot)
                self._handle_cnn_at_pivot(new_pivot, ohlcv_data)  # CNN logic call

        return identified_swings_in_this_call

    def _filter_significant_swings(self, swings: List[SwingPoint]) -> List[SwingPoint]:
        """Filter to keep only the most significant swings"""
        if len(swings) <= 20:
            return swings

        # Sort by strength (higher strength = more significant)
        swings_by_strength = sorted(swings, key=lambda x: x.strength, reverse=True)

        # Take the top swings but prefer alternating highs and lows
        significant_swings = []
        last_type = None

        for swing in swings_by_strength:
            if len(significant_swings) >= 20:
                break

            # Prefer alternating swing types for better structure
            if last_type is None or swing.swing_type != last_type:
                significant_swings.append(swing)
                last_type = swing.swing_type
            elif len(significant_swings) < 10:  # Still add if we need more swings
                significant_swings.append(swing)

        # Sort by index again
        significant_swings.sort(key=lambda x: x.index)
        return significant_swings

    def _analyze_trend_from_swings(self, swing_points: List[SwingPoint]) -> TrendAnalysis:
        """Analyze trend direction from swing points"""
        if len(swing_points) < 2:
            return TrendAnalysis(
                direction=TrendDirection.UNKNOWN,
                strength=0.0,
                confidence=0.0,
                swing_count=0,
                last_swing_high=None,
                last_swing_low=None,
                higher_highs=0,
                higher_lows=0,
                lower_highs=0,
                lower_lows=0
            )

        # Separate highs and lows
        highs = [s for s in swing_points if s.swing_type == SwingType.SWING_HIGH]
        lows = [s for s in swing_points if s.swing_type == SwingType.SWING_LOW]

        # Count higher highs, higher lows, lower highs, lower lows
        higher_highs = self._count_higher_highs(highs)
        higher_lows = self._count_higher_lows(lows)
        lower_highs = self._count_lower_highs(highs)
        lower_lows = self._count_lower_lows(lows)

        # Determine trend direction
        if higher_highs > 0 and higher_lows > 0:
            direction = TrendDirection.UP
        elif lower_highs > 0 and lower_lows > 0:
            direction = TrendDirection.DOWN
        else:
            direction = TrendDirection.SIDEWAYS

        # Calculate trend strength
        total_moves = higher_highs + higher_lows + lower_highs + lower_lows
        if direction == TrendDirection.UP:
            strength = (higher_highs + higher_lows) / max(total_moves, 1)
        elif direction == TrendDirection.DOWN:
            strength = (lower_highs + lower_lows) / max(total_moves, 1)
        else:
            strength = 0.5  # Neutral for sideways

        # Calculate confidence based on consistency
        if total_moves > 0:
            if direction == TrendDirection.UP:
                confidence = (higher_highs + higher_lows) / total_moves
            elif direction == TrendDirection.DOWN:
                confidence = (lower_highs + lower_lows) / total_moves
            else:
                # For sideways, confidence reflects how balanced the moves are
                up_moves = higher_highs + higher_lows
                down_moves = lower_highs + lower_lows
                balance = 1.0 - abs(up_moves - down_moves) / total_moves
                confidence = balance
        else:
            confidence = 0.0

        return TrendAnalysis(
            direction=direction,
            strength=min(strength, 1.0),
            confidence=min(confidence, 1.0),
            swing_count=len(swing_points),
            last_swing_high=highs[-1] if highs else None,
            last_swing_low=lows[-1] if lows else None,
            higher_highs=higher_highs,
            higher_lows=higher_lows,
            lower_highs=lower_highs,
            lower_lows=lower_lows
        )

    def _count_higher_highs(self, highs: List[SwingPoint]) -> int:
        """Count higher highs in sequence"""
        if len(highs) < 2:
            return 0

        count = 0
        for i in range(1, len(highs)):
            if highs[i].price > highs[i-1].price:
                count += 1

        return count

    def _count_higher_lows(self, lows: List[SwingPoint]) -> int:
        """Count higher lows in sequence"""
        if len(lows) < 2:
            return 0

        count = 0
        for i in range(1, len(lows)):
            if lows[i].price > lows[i-1].price:
                count += 1

        return count

    def _count_lower_highs(self, highs: List[SwingPoint]) -> int:
        """Count lower highs in sequence"""
        if len(highs) < 2:
            return 0

        count = 0
        for i in range(1, len(highs)):
            if highs[i].price < highs[i-1].price:
                count += 1

        return count

    def _count_lower_lows(self, lows: List[SwingPoint]) -> int:
        """Count lower lows in sequence"""
        if len(lows) < 2:
            return 0

        count = 0
        for i in range(1, len(lows)):
            if lows[i].price < lows[i-1].price:
                count += 1

        return count

    def _find_support_resistance(self, swing_points: List[SwingPoint],
                                 ohlcv_data: Optional[np.ndarray]) -> Tuple[List[float], List[float]]:
        """Find support and resistance levels from swing points (ohlcv_data is currently unused)"""
        highs = [s.price for s in swing_points if s.swing_type == SwingType.SWING_HIGH]
        lows = [s.price for s in swing_points if s.swing_type == SwingType.SWING_LOW]

        # Cluster similar levels
        support_levels = self._cluster_price_levels(lows) if lows else []
        resistance_levels = self._cluster_price_levels(highs) if highs else []

        return support_levels, resistance_levels

    def _cluster_price_levels(self, prices: List[float], tolerance: float = 0.02) -> List[float]:
        """Cluster similar price levels together"""
        if not prices:
            return []

        sorted_prices = sorted(prices)
        clusters = []
        current_cluster = [sorted_prices[0]]

        for price in sorted_prices[1:]:
            # If the price is within tolerance of the cluster average, add it to the cluster
            cluster_avg = np.mean(current_cluster)
            if abs(price - cluster_avg) / cluster_avg <= tolerance:
                current_cluster.append(price)
            else:
                # Start a new cluster
                clusters.append(np.mean(current_cluster))
                current_cluster = [price]

        # Add the last cluster
        if current_cluster:
            clusters.append(np.mean(current_cluster))

        return clusters

    def _determine_market_bias(self, swing_points: List[SwingPoint],
                               trend_analysis: TrendAnalysis) -> TrendDirection:
        """Determine current market bias"""
        if not swing_points:
            return TrendDirection.UNKNOWN

        # Use trend analysis as the primary indicator
        if trend_analysis.confidence > 0.6:
            return trend_analysis.direction

        # Otherwise look at the most recent swings for bias
        recent_swings = swing_points[-6:] if len(swing_points) >= 6 else swing_points

        if len(recent_swings) >= 2:
            first_price = recent_swings[0].price
            last_price = recent_swings[-1].price

            price_change = (last_price - first_price) / first_price

            if price_change > 0.01:  # 1% threshold
                return TrendDirection.UP
            elif price_change < -0.01:
                return TrendDirection.DOWN
            else:
                return TrendDirection.SIDEWAYS

        return TrendDirection.UNKNOWN

    def _detect_structure_breaks(self, swing_points: List[SwingPoint],
                                 ohlcv_data: Optional[np.ndarray]) -> List[Dict[str, Any]]:
        """Detect structure breaks (trend changes)"""
        structure_breaks = []

        if len(swing_points) < 4:
            return structure_breaks

        # Look for pattern breaks
        highs = [s for s in swing_points if s.swing_type == SwingType.SWING_HIGH]
        lows = [s for s in swing_points if s.swing_type == SwingType.SWING_LOW]

        # Check for break of structure in highs (a lower high after a higher high)
        if len(highs) >= 3:
            for i in range(2, len(highs)):
                if (highs[i-2].price < highs[i-1].price and  # Previous was a higher high
                        highs[i-1].price > highs[i].price):  # Current is a lower high
                    structure_breaks.append({
                        'type': 'break_of_structure_high',
                        'timestamp': highs[i].timestamp,
                        'price': highs[i].price,
                        'previous_high': highs[i-1].price,
                        'significance': abs(highs[i].price - highs[i-1].price) / highs[i-1].price
                    })

        # Check for break of structure in lows (a higher low after a lower low)
        if len(lows) >= 3:
            for i in range(2, len(lows)):
                if (lows[i-2].price > lows[i-1].price and  # Previous was a lower low
                        lows[i-1].price < lows[i].price):  # Current is a higher low
                    structure_breaks.append({
                        'type': 'break_of_structure_low',
                        'timestamp': lows[i].timestamp,
                        'price': lows[i].price,
                        'previous_low': lows[i-1].price,
                        'significance': abs(lows[i].price - lows[i-1].price) / lows[i-1].price
                    })

        return structure_breaks

    def _find_pivot_points_from_pivot_points(self, pivot_array: np.ndarray, level: int) -> List[SwingPoint]:
        """
        Find swing points from the previous level's swing points (RECURSIVE APPROACH)

        **RECURSIVE SWING DETECTION:**
        For Level N (N > 0): a Level N swing high occurs when a Level N-1 swing point
        is higher than the surrounding Level N-1 swing points (and vice versa for lows).

        This is NOT multi-timeframe analysis; it is recursive fractal analysis where:
        - Level 1 finds patterns in Level 0 swing sequences (from 1s data)
        - Level 2 finds patterns in Level 1 swing sequences
        - Level 3 finds patterns in Level 2 swing sequences
        - Level 4 finds patterns in Level 3 swing sequences

        All based on the original 1s timeframe data, recursively analyzed.

        Args:
            pivot_array: Array of Level N-1 swing points formatted as "price bars"
                         in [timestamp, price, price, price, price, 0] format
            level: Current recursive level being calculated (1, 2, 3, or 4)
        """
        identified_swings_in_this_call = []

        if len(pivot_array) < 5:  # Min bars for even the smallest strength (strength 2 needs 2+1+2=5)
            return identified_swings_in_this_call

        # Use a more conservative strength for higher levels
        strength = min(2 + level, 5)  # Level 1: 3 bars, Level 2: 4 bars, Level 3+: 5 bars per side

        for i in range(strength, len(pivot_array) - strength):
            current_price = pivot_array[i, 1]  # Use the price from the pivot point
            current_timestamp = pivot_array[i, 0]

            # Check for swing high (pivot high surrounded by lower pivot highs)
            is_swing_high = True
            for j in range(i - strength, i + strength + 1):
                if j != i and pivot_array[j, 1] >= current_price:  # Compare with prices of other pivots
                    is_swing_high = False
                    break

            if is_swing_high:
                new_pivot = SwingPoint(
                    timestamp=datetime.fromtimestamp(current_timestamp) if current_timestamp > 1e9 else datetime.now(),
                    price=current_price,
                    index=i,
                    swing_type=SwingType.SWING_HIGH,
                    strength=strength,  # Strength here is derived from the level: min(2 + level, 5)
                    volume=0.0  # Pivot points don't have volume
                )
                identified_swings_in_this_call.append(new_pivot)
                self._handle_cnn_at_pivot(new_pivot, pivot_array)  # CNN logic call

            # Check for swing low (pivot low surrounded by higher pivot lows)
            is_swing_low = True
            for j in range(i - strength, i + strength + 1):
                if j != i and pivot_array[j, 1] <= current_price:  # Compare with prices of other pivots
                    is_swing_low = False
                    break

            if is_swing_low:
                new_pivot = SwingPoint(
                    timestamp=datetime.fromtimestamp(current_timestamp) if current_timestamp > 1e9 else datetime.now(),
                    price=current_price,
                    index=i,
                    swing_type=SwingType.SWING_LOW,
                    strength=strength,  # Strength here is derived from the level
                    volume=0.0  # Pivot points don't have volume
                )
                identified_swings_in_this_call.append(new_pivot)
                self._handle_cnn_at_pivot(new_pivot, pivot_array)  # CNN logic call

        return identified_swings_in_this_call

    def _convert_pivots_to_price_points(self, swing_points: List[SwingPoint]) -> np.ndarray:
        """
        Convert swing points to "price bar" format for the next recursive level

        **RECURSIVE CONVERSION PROCESS:**
        Each swing point from Level N becomes a "price bar" for the Level N+1 calculation:
        - Timestamp = swing point timestamp
        - Open = High = Low = Close = swing point price (it is a single point)
        - Volume = 0 (not applicable to swing points)

        This allows Level N+1 to treat Level N swing points as if they were regular
        OHLCV price bars, so the same swing detection algorithm can find
        higher-level patterns in the swing point sequences.

        Example:
        - Level 0: 1000 x 1s bars → 50 swing points
        - Level 1: 50 "price bars" (from Level 0 swings) → 10 swing points
        - Level 2: 10 "price bars" (from Level 1 swings) → 3 swing points
        """
        if len(swing_points) < 2:
            return np.array([])

        price_points = []

        for swing in swing_points:
            # Each pivot point becomes a price point where OHLC = pivot price
            price_points.append([
                swing.timestamp.timestamp(),
                swing.price,  # Open = pivot price
                swing.price,  # High = pivot price
                swing.price,  # Low = pivot price
                swing.price,  # Close = pivot price
                0.0           # Volume = 0 (not applicable to pivot points)
            ])

        return np.array(price_points)

    def _create_empty_structure(self) -> Dict[str, MarketStructureLevel]:
        """Create an empty structure when there is insufficient data"""
        return {f'level_{i}': self._create_empty_level(i) for i in range(self.max_levels)}

    def _create_empty_level(self, level: int) -> MarketStructureLevel:
        """Create an empty market structure level"""
        return MarketStructureLevel(
            level=level,
            swing_points=[],
            trend_analysis=TrendAnalysis(
                direction=TrendDirection.UNKNOWN,
                strength=0.0,
                confidence=0.0,
                swing_count=0,
                last_swing_high=None,
                last_swing_low=None,
                higher_highs=0,
                higher_lows=0,
                lower_highs=0,
                lower_lows=0
            ),
            support_levels=[],
            resistance_levels=[],
            current_bias=TrendDirection.UNKNOWN,
            structure_breaks=[]
        )

    def extract_features_for_rl(self, structure_levels: Dict[str, MarketStructureLevel]) -> List[float]:
        """
        Extract features from the Williams structure for RL training

        Returns exactly 250 features: 50 features per level across 5 levels
        """
        features = []

        for level in range(self.max_levels):
            level_key = f'level_{level}'
            if level_key in structure_levels:
                level_data = structure_levels[level_key]
                level_features = self._extract_level_features(level_data)
            else:
                level_features = [0.0] * 50  # 50 features per level

            features.extend(level_features)

        return features[:250]  # Ensure exactly 250 features

    def _extract_level_features(self, level: MarketStructureLevel) -> List[float]:
        """Extract features from a single structure level"""
        features = []

        # Trend features (10 features)
        features.extend([
            1.0 if level.trend_analysis.direction == TrendDirection.UP else 0.0,
            1.0 if level.trend_analysis.direction == TrendDirection.DOWN else 0.0,
            1.0 if level.trend_analysis.direction == TrendDirection.SIDEWAYS else 0.0,
            level.trend_analysis.strength,
            level.trend_analysis.confidence,
            level.trend_analysis.higher_highs,
            level.trend_analysis.higher_lows,
            level.trend_analysis.lower_highs,
            level.trend_analysis.lower_lows,
            len(level.swing_points)
        ])

        # Current bias features (4 features)
        features.extend([
            1.0 if level.current_bias == TrendDirection.UP else 0.0,
            1.0 if level.current_bias == TrendDirection.DOWN else 0.0,
            1.0 if level.current_bias == TrendDirection.SIDEWAYS else 0.0,
            1.0 if level.current_bias == TrendDirection.UNKNOWN else 0.0
        ])

        # Swing point features (20 features - last 10 swings * 2 features each)
        # Copy the slices below so padding never mutates the dataclass's own lists
        recent_swings = list(level.swing_points[-10:])
        for swing in recent_swings:
            features.extend([
                swing.price,
                1.0 if swing.swing_type == SwingType.SWING_HIGH else 0.0
            ])

        # Pad if fewer than 10 swings
        for _ in range(10 - len(recent_swings)):
            features.extend([0.0, 0.0])

        # Support/resistance levels (10 features - 5 support + 5 resistance)
        support_levels = list(level.support_levels[:5])
        while len(support_levels) < 5:
            support_levels.append(0.0)
        features.extend(support_levels)

        resistance_levels = list(level.resistance_levels[:5])
        while len(resistance_levels) < 5:
            resistance_levels.append(0.0)
        features.extend(resistance_levels)

        # Structure break features (6 features)
        recent_breaks = list(level.structure_breaks[-3:])
        for break_info in recent_breaks:
            features.extend([
                break_info.get('significance', 0.0),
                1.0 if break_info.get('type', '').endswith('_high') else 0.0
            ])

        # Pad if fewer than 3 breaks
        for _ in range(3 - len(recent_breaks)):
            features.extend([0.0, 0.0])

        return features[:50]  # Ensure exactly 50 features per level

    def _handle_cnn_at_pivot(self,
                             newly_identified_pivot: SwingPoint,
                             ohlcv_data_context: np.ndarray):
        """
        Handle CNN training for the previous pivot and prediction for the next pivot.
        Called when a new pivot point is identified.

        Args:
            newly_identified_pivot: The SwingPoint object for the just-formed pivot.
            ohlcv_data_context: The OHLCV data (or pivot array for higher levels)
                                relevant to this pivot's formation.
        """
        if not self.enable_cnn_feature or self.cnn_model is None:
            return

        # 1. Train the model on the *previous* pivot's prediction against the *current* actual outcome
        if self.previous_pivot_details_for_cnn:
            try:
                logger.debug(f"CNN Training: Previous pivot at idx {self.previous_pivot_details_for_cnn['pivot'].index}, "
                             f"Current pivot (ground truth) at idx {newly_identified_pivot.index}")

                X_train = self.previous_pivot_details_for_cnn['features']
                # previous_pivot_details_for_cnn['pivot'] is the SwingPoint object of pivot N-1
                y_train = self._get_cnn_ground_truth(self.previous_pivot_details_for_cnn, newly_identified_pivot)

                if X_train is not None and X_train.size > 0 and y_train is not None and y_train.size > 0:
                    # Reshape X_train if it is a single sample and the model expects a batch
                    if len(X_train.shape) == len(self.cnn_model.input_shape) and X_train.shape == self.cnn_model.input_shape:
                        X_train_batch = np.expand_dims(X_train, axis=0)
                    else:  # Should already be correctly shaped by _prepare_cnn_input
                        X_train_batch = X_train

                    # Reshape y_train if needed
                    if self.cnn_model.output_size > 1 and len(y_train.shape) == 1:  # e.g. [0., 1.] but model needs [[0., 1.]]
                        y_train_batch = np.expand_dims(y_train, axis=0)
                    elif self.cnn_model.output_size == 1 and not isinstance(y_train, (list, np.ndarray)):  # e.g. plain 0 or 1
                        y_train_batch = np.array([[y_train]], dtype=np.float32)
                    elif self.cnn_model.output_size == 1 and isinstance(y_train, np.ndarray) and y_train.ndim == 1:
                        y_train_batch = y_train.reshape(-1, 1)  # ensure [[0.]] for a single binary output
                    else:
                        y_train_batch = y_train

                    logger.info(f"CNN Training with X_shape: {X_train_batch.shape}, y_shape: {y_train_batch.shape}")
                    # Perform a single online-learning step with minimal callbacks.
                    # The fallback CNNModel has no underlying Keras model (model is None),
                    # so use its own fit() in that case.
                    if getattr(self.cnn_model, 'model', None) is not None:
                        self.cnn_model.model.fit(X_train_batch, y_train_batch, batch_size=1, epochs=1, verbose=0, callbacks=[])
                    else:
                        self.cnn_model.fit(X_train_batch, y_train_batch, batch_size=1, epochs=1)
                    logger.info(f"CNN online training step completed for pivot at index {self.previous_pivot_details_for_cnn['pivot'].index}.")
                else:
                    logger.warning("CNN Training: Skipping due to invalid X_train or y_train.")

            except Exception as e:
                logger.error(f"Error during CNN online training: {e}", exc_info=True)

        # 2. Predict the *next* pivot based on the *current* newly_identified_pivot
        try:
            logger.debug(f"CNN Prediction: Preparing input for current pivot at idx {newly_identified_pivot.index}")

            # self.previous_pivot_details_for_cnn still refers to pivot N-1 here (the
            # pivot whose prediction was just trained on above); if this is the first
            # pivot ever, it is None.
            X_predict = self._prepare_cnn_input(newly_identified_pivot,
                                                ohlcv_data_context,
                                                self.previous_pivot_details_for_cnn)

            if X_predict is not None and X_predict.size > 0:
                # Reshape X_predict if it is a single sample and the model expects a batch
                if len(X_predict.shape) == len(self.cnn_model.input_shape) and X_predict.shape == self.cnn_model.input_shape:
                    X_predict_batch = np.expand_dims(X_predict, axis=0)
                else:
                    X_predict_batch = X_predict

                pred_class, pred_proba = self.cnn_model.predict(X_predict_batch)  # predict expects a batch

                # For batch_size=1, take the first element of the returned arrays
                final_pred_class = pred_class[0] if isinstance(pred_class, np.ndarray) and pred_class.ndim > 0 else pred_class
                final_pred_proba = pred_proba[0] if isinstance(pred_proba, np.ndarray) and pred_proba.ndim > 0 else pred_proba

                logger.info(f"CNN Prediction for pivot after index {newly_identified_pivot.index}: Class={final_pred_class}, Proba/Val={final_pred_proba}")

                # Cache the features (X_predict) and the pivot itself for the next training cycle
                self.previous_pivot_details_for_cnn = {'features': X_predict, 'pivot': newly_identified_pivot}
            else:
                logger.warning("CNN Prediction: Skipping due to invalid X_predict.")
                # Clear stale details so no training happens next round unless a new prediction is made
                self.previous_pivot_details_for_cnn = None

        except Exception as e:
            logger.error(f"Error during CNN prediction: {e}", exc_info=True)
            self.previous_pivot_details_for_cnn = None  # Clear on error to prevent bad training

    def _prepare_cnn_input(self,
                           current_pivot: SwingPoint,
                           ohlcv_data_context: np.ndarray,
                           previous_pivot_details: Optional[Dict[str, Any]]) -> np.ndarray:
        """
        Prepare multi-timeframe, multi-symbol input features for the CNN using TrainingDataPacket.

        Feature layout per timestep:
        - ETH: 1s OHLCV + indicators (10 features)
        - ETH: 1m OHLCV + indicators (10 features)
        - ETH: 1h OHLCV + indicators (10 features)
        - ETH: tick-derived 1s aggregates from ~5 min of ticks (4 features)
        - BTC: tick-derived 1s reference aggregates (4 features)
        - Current pivot characteristics (3 features)
        - Chart labels for data identification (3 features)

        Total: 44 features per timestep over 900 timesteps, zero-padded to 50.
        Data is normalized using the 1h min/max to preserve cross-timeframe relationships.

        Args:
            current_pivot: The newly identified SwingPoint
            ohlcv_data_context: The OHLCV data from the Williams calculation (not used directly)
            previous_pivot_details: Previous pivot info for context

        Returns:
            A numpy array of shape (900, 50) with normalized features
        """
        if self.cnn_model is None or not self.training_data_provider:
            logger.warning("CNN model or training data provider not available")
            return np.zeros(self.cnn_model.input_shape if self.cnn_model else (900, 50), dtype=np.float32)

        sequence_length, num_features = self.cnn_model.input_shape

        try:
            # Get the latest TrainingDataPacket from the provider
            training_packet = self._get_latest_training_data()
            if not training_packet:
                logger.warning("No TrainingDataPacket available for CNN input")
                return np.zeros((sequence_length, num_features), dtype=np.float32)

            logger.debug(f"CNN Input: Preparing features for pivot at {current_pivot.timestamp}")

            # Prepare feature components (still in actual values)
            eth_features = self._prepare_eth_features(training_packet, sequence_length)
            btc_features = self._prepare_btc_reference_features(training_packet, sequence_length)
            pivot_features = self._prepare_pivot_features(training_packet, current_pivot, sequence_length)
            chart_labels = self._prepare_chart_labels(sequence_length)

            # Combine all features (still in actual values)
            combined_features = np.concatenate([
                eth_features,    # 34 features (10 + 10 + 10 + 4)
                btc_features,    # 4 features
                pivot_features,  # 3 features
                chart_labels     # 3 features
            ], axis=1)

            # Ensure we match the expected feature count
            if combined_features.shape[1] > num_features:
                combined_features = combined_features[:, :num_features]
            elif combined_features.shape[1] < num_features:
                padding = np.zeros((sequence_length, num_features - combined_features.shape[1]))
                combined_features = np.concatenate([combined_features, padding], axis=1)

            # NORMALIZATION: apply the 1h timeframe min/max to preserve relationships
            normalized_features = self._normalize_features_by_1h_range(combined_features, training_packet)

            logger.debug(f"CNN Input prepared: shape {normalized_features.shape}, "
                         f"min: {normalized_features.min():.4f}, max: {normalized_features.max():.4f}")

            return normalized_features.astype(np.float32)

        except Exception as e:
            logger.error(f"Error preparing CNN input: {e}", exc_info=True)
            return np.zeros((sequence_length, num_features), dtype=np.float32)

    def _get_latest_training_data(self):
        """Get the latest TrainingDataPacket from the provider"""
        try:
            if hasattr(self.training_data_provider, 'get_latest_training_data'):
                return self.training_data_provider.get_latest_training_data()
            elif hasattr(self.training_data_provider, 'training_data_buffer'):
                return self.training_data_provider.training_data_buffer[-1] if self.training_data_provider.training_data_buffer else None
            else:
                logger.warning("Training data provider does not have the expected interface")
                return None
        except Exception as e:
            logger.error(f"Error getting training data: {e}")
            return None

    def _prepare_eth_features(self, training_packet, sequence_length: int) -> np.ndarray:
        """
        Prepare ETH multi-timeframe features (kept in actual values):
        - 1s bars with indicators (10 features)
        - 1m bars with indicators (10 features)
        - 1h bars with indicators (10 features)
        - Tick-derived 1s features (4 features)
        Total: 34 features per timestep
        """
        features = []

        # ETH 1s data with indicators
        eth_1s_features = self._extract_timeframe_features(
            training_packet.multi_timeframe_data.get('ETH/USDT', {}).get('1s', []),
            sequence_length, 'ETH_1s'
        )
        features.append(eth_1s_features)

        # ETH 1m data with indicators
        eth_1m_features = self._extract_timeframe_features(
            training_packet.multi_timeframe_data.get('ETH/USDT', {}).get('1m', []),
            sequence_length, 'ETH_1m'
        )
        features.append(eth_1m_features)

        # ETH 1h data with indicators
        eth_1h_features = self._extract_timeframe_features(
            training_packet.multi_timeframe_data.get('ETH/USDT', {}).get('1h', []),
            sequence_length, 'ETH_1h'
        )
        features.append(eth_1h_features)

        # ETH tick-derived features (~5 min of ticks aggregated to 1s to match sequence_length)
        eth_tick_features = self._extract_tick_features(
            training_packet.tick_cache, 'ETH/USDT', sequence_length
        )
        features.append(eth_tick_features)

        return np.concatenate(features, axis=1)

    def _prepare_btc_reference_features(self, training_packet, sequence_length: int) -> np.ndarray:
        """
        Prepare BTC reference features (kept in actual values):
        - Tick-derived features for correlation analysis
        Total: 4 features per timestep
        """
        return self._extract_tick_features(
            training_packet.tick_cache, 'BTC/USDT', sequence_length
        )

    def _prepare_pivot_features(self, training_packet, current_pivot: SwingPoint, sequence_length: int) -> np.ndarray:
        """
        Prepare pivot point features for the current pivot:
        - Price, swing type, and strength of the newly formed pivot
        Total: 3 features per timestep (repeated across the sequence)
        """
        if hasattr(training_packet, 'universal_stream') and training_packet.universal_stream:
            pivot_feature_vector = [
                current_pivot.price,
                1.0 if current_pivot.swing_type == SwingType.SWING_HIGH else 0.0,
                float(current_pivot.strength)
            ]
        else:
            pivot_feature_vector = [0.0, 0.0, 0.0]

        # Repeat the pivot features for all timesteps in the sequence
        return np.tile(pivot_feature_vector, (sequence_length, 1))

    def _prepare_chart_labels(self, sequence_length: int) -> np.ndarray:
        """
        Prepare chart identification labels:
        - Symbol identifiers
        - Timeframe identifiers
        Total: 3 features per timestep
        """
        # Simple encoding: [is_eth, is_btc, timeframe_mix]
        chart_labels = [1.0, 1.0, 1.0]  # Mixed multi-timeframe ETH+BTC data
        return np.tile(chart_labels, (sequence_length, 1))

    def _extract_timeframe_features(self, ohlcv_data: List[Dict], sequence_length: int, timeframe_label: str) -> np.ndarray:
        """
        Extract OHLCV + indicator features from timeframe data (kept in actual values).
        Returns 10 features per bar: OHLC + volume + 5 indicator placeholders
        """
        if not ohlcv_data:
            return np.zeros((sequence_length, 10))

        # Take the last sequence_length bars, or all bars if there are fewer
        data_to_use = ohlcv_data[-sequence_length:] if len(ohlcv_data) >= sequence_length else ohlcv_data

        features = []
        for bar in data_to_use:
            bar_features = [
                bar.get('open', 0.0),
                bar.get('high', 0.0),
                bar.get('low', 0.0),
                bar.get('close', 0.0),
                bar.get('volume', 0.0),
                # TODO: Add 5 calculated indicators (SMA, EMA, RSI, MACD, etc.)
                bar.get('sma_20', bar.get('close', 0.0)),   # Placeholder
                bar.get('ema_20', bar.get('close', 0.0)),   # Placeholder
                bar.get('rsi_14', 50.0),                    # Placeholder
                bar.get('macd', 0.0),                       # Placeholder
                bar.get('bb_upper', bar.get('high', 0.0))   # Placeholder
            ]
            features.append(bar_features)

        # Pad at the front with the earliest bar if there is insufficient data
        while len(features) < sequence_length:
            features.insert(0, features[0] if features else [0.0] * 10)

        return np.array(features, dtype=np.float32)

    def _extract_tick_features(self, tick_cache: List[Dict], symbol: str, sequence_length: int) -> np.ndarray:
        """
        Extract tick-derived features aggregated to 1s intervals (kept in actual values).
        Returns 4 features per second: tick_count, total_volume, vwap, price_volatility
        """
        # Filter ticks for the symbol over roughly the last 5 minutes (assumes ~5 ticks/sec)
        symbol_ticks = [t for t in tick_cache[-1500:] if t.get('symbol') == symbol]

        if not symbol_ticks:
            return np.zeros((sequence_length, 4))

        # Group ticks by second and calculate features
        tick_features = []
        current_time = datetime.now()

        for i in range(sequence_length):
            second_start = current_time - timedelta(seconds=sequence_length - i)
            second_end = second_start + timedelta(seconds=1)

            second_ticks = [
                t for t in symbol_ticks
                if second_start <= t.get('timestamp', datetime.min) < second_end
            ]

            if second_ticks:
                prices = [t.get('price', 0.0) for t in second_ticks]
                volumes = [t.get('volume', 0.0) for t in second_ticks]
                total_volume = sum(volumes)

                tick_count = len(second_ticks)
                vwap = sum(p * v for p, v in zip(prices, volumes)) / total_volume if total_volume > 0 else 0.0
                price_volatility = np.std(prices) if len(prices) > 1 else 0.0

                second_features = [tick_count, total_volume, vwap, price_volatility]
            else:
                second_features = [0.0, 0.0, 0.0, 0.0]

            tick_features.append(second_features)

        return np.array(tick_features, dtype=np.float32)

    def _normalize_features_by_1h_range(self, features: np.ndarray, training_packet) -> np.ndarray:
        """
        Normalize all features using the 1h timeframe min/max to preserve cross-timeframe relationships.
        This is the final normalization step before feeding the CNN.
        """
        try:
            # Get 1h ETH data as the normalization reference
            eth_1h_data = training_packet.multi_timeframe_data.get('ETH/USDT', {}).get('1h', [])

            if not eth_1h_data:
                logger.warning("No 1h data available for normalization, using feature-wise normalization")
                # Fallback: normalize each feature independently
                feature_min = np.min(features, axis=0, keepdims=True)
                feature_max = np.max(features, axis=0, keepdims=True)
                feature_range = feature_max - feature_min
                feature_range[feature_range == 0] = 1.0  # Avoid division by zero
                return (features - feature_min) / feature_range

            # Extract the 1h price range for primary normalization
            h1_prices = []
            for bar in eth_1h_data[-24:]:  # Last 24 hours for a robust range
                h1_prices.extend([
                    bar.get('open', 0.0),
                    bar.get('high', 0.0),
                    bar.get('low', 0.0),
                    bar.get('close', 0.0)
                ])

            if h1_prices:
                h1_min = min(h1_prices)
                h1_max = max(h1_prices)
                h1_range = h1_max - h1_min

                if h1_range > 0:
                    logger.debug(f"Normalizing features using 1h range: {h1_min:.2f} - {h1_max:.2f}")

                    # Apply 1h-based normalization to the price-related features (first ~30 columns)
                    normalized_features = features.copy()
                    price_feature_count = min(30, features.shape[1])

                    # Normalize price-related features with the 1h range
                    normalized_features[:, :price_feature_count] = (
                        (features[:, :price_feature_count] - h1_min) / h1_range
                    )

                    # For non-price features (indicators, counts, etc.), use feature-wise normalization
                    if features.shape[1] > price_feature_count:
                        remaining_features = features[:, price_feature_count:]
                        feature_min = np.min(remaining_features, axis=0, keepdims=True)
                        feature_max = np.max(remaining_features, axis=0, keepdims=True)
                        feature_range = feature_max - feature_min
                        feature_range[feature_range == 0] = 1.0

                        normalized_features[:, price_feature_count:] = (
                            (remaining_features - feature_min) / feature_range
                        )

                    return normalized_features

            # Fallback normalization if the 1h range calculation fails
            logger.warning("1h range calculation failed, using min-max normalization")
            feature_min = np.min(features, axis=0, keepdims=True)
            feature_max = np.max(features, axis=0, keepdims=True)
            feature_range = feature_max - feature_min
            feature_range[feature_range == 0] = 1.0
            return (features - feature_min) / feature_range

        except Exception as e:
            logger.error(f"Error in normalization: {e}", exc_info=True)
            # Emergency fallback: scale roughly into [-1, 1]
            return np.clip(features / (np.max(np.abs(features)) + 1e-8), -1.0, 1.0)

    def _get_cnn_ground_truth(self,
                              previous_pivot_info: Dict[str, Any],  # Contains 'pivot': SwingPoint of pivot N-1
                              actual_current_pivot: SwingPoint  # This is pivot N
                              ) -> np.ndarray:
        """
        Determine the ground truth for the CNN prediction made at the previous pivot.

        Returns the prediction target for the next pivot across ALL 5 LEVELS:
        - For each level: [type (0=LOW, 1=HIGH), normalized_price_target]
        - Total output: 10 values (5 levels * 2 outputs each)

        Args:
            previous_pivot_info: Dict with 'pivot' = SwingPoint of pivot N-1
            actual_current_pivot: SwingPoint of pivot N (the actual outcome)

        Returns:
            A numpy array of shape (10,) with the ground truth for all levels
        """
        if self.cnn_model is None:
            return np.array([])

        # Initialize the ground truth array for all 5 levels
        ground_truth = np.zeros(10, dtype=np.float32)  # 5 levels * 2 outputs

        try:
            # For Level 0 (the current pivot's level), we have actual data
            level_0_type = 1.0 if actual_current_pivot.swing_type == SwingType.SWING_HIGH else 0.0
            level_0_price = actual_current_pivot.price

            # Normalize the price (placeholder - proper normalization should use market
            # context, i.e. the same 1h range normalization applied to the input features)
            normalized_price = level_0_price / 10000.0  # Rough normalization for ETH prices

            ground_truth[0] = level_0_type      # Level 0 type
            ground_truth[1] = normalized_price  # Level 0 price

            # For higher levels (1-4), we would need to calculate what the next pivot
            # would be, which requires access to the higher-level Williams calculations.
            # For now, use placeholder logic based on the current pivot's characteristics.
            for level in range(1, 5):
                # Placeholder: higher levels follow a similar pattern with reduced confidence
                confidence_factor = 1.0 / (level + 1)

                ground_truth[level * 2] = level_0_type * confidence_factor          # Level N type
                ground_truth[level * 2 + 1] = normalized_price * confidence_factor  # Level N price

            logger.debug(f"CNN Ground Truth: Level 0 = [{level_0_type}, {normalized_price:.4f}], "
                         f"Current pivot = {actual_current_pivot.swing_type.name} @ {actual_current_pivot.price}")

            return ground_truth

        except Exception as e:
            logger.error(f"Error calculating CNN ground truth: {e}", exc_info=True)
            return np.zeros(10, dtype=np.float32)
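

# Minimal smoke-test sketch (illustrative). Assumes no TrainingDataPacket provider,
# so CNN inputs fall back to zeros; with the fallback CNNModel, predictions are dummies.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    # Build 600 seconds of synthetic 1s bars: a noisy sine wave around 2500
    n = 600
    ts = np.arange(n, dtype=np.float64) + datetime.now().timestamp()
    close = 2500.0 + 50.0 * np.sin(np.arange(n) / 30.0) + np.random.randn(n)
    ohlcv = np.column_stack([
        ts,
        close,             # open
        close + 1.0,       # high
        close - 1.0,       # low
        close,             # close
        np.full(n, 10.0),  # volume
    ])

    analyzer = WilliamsMarketStructure(swing_strengths=[2, 3, 5])
    structure = analyzer.calculate_recursive_pivot_points(ohlcv)
    for key, lvl in structure.items():
        print(key, len(lvl.swing_points), lvl.current_bias.value)

    rl_features = analyzer.extract_features_for_rl(structure)
    print(f"RL feature vector length: {len(rl_features)}")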