try to fix input dimentions

This commit is contained in:
Dobromir Popov
2025-07-13 23:41:47 +03:00
parent bcc13a5db3
commit ebf65494a8
7 changed files with 358 additions and 145 deletions

View File

@ -56,6 +56,7 @@ class EnhancedRealtimeTrainingSystem:
self.performance_history = {
'dqn_losses': deque(maxlen=1000),
'cnn_losses': deque(maxlen=1000),
'cob_rl_losses': deque(maxlen=1000), # Added COB RL loss tracking
'prediction_accuracy': deque(maxlen=500),
'trading_performance': deque(maxlen=200),
'validation_scores': deque(maxlen=100)
@ -553,18 +554,33 @@ class EnhancedRealtimeTrainingSystem:
# Statistical features across time for each aggregated dimension
for feature_idx in range(agg_matrix.shape[1]):
feature_series = agg_matrix[:, feature_idx]
combined_features.extend([
np.mean(feature_series),
np.std(feature_series),
np.min(feature_series),
np.max(feature_series),
feature_series[-1] - feature_series[0] if len(feature_series) > 1 else 0, # Total change
np.mean(np.diff(feature_series)) if len(feature_series) > 1 else 0, # Average momentum
np.std(np.diff(feature_series)) if len(feature_series) > 2 else 0, # Momentum volatility
np.percentile(feature_series, 25), # 25th percentile
np.percentile(feature_series, 75), # 75th percentile
len([x for x in np.diff(feature_series) if x > 0]) / max(len(feature_series) - 1, 1) if len(feature_series) > 1 else 0.5 # Positive change ratio
])
# Clean feature series to prevent division warnings
feature_series_clean = feature_series[np.isfinite(feature_series)]
if len(feature_series_clean) > 0:
# Safe percentile calculation
try:
percentile_25 = np.percentile(feature_series_clean, 25)
percentile_75 = np.percentile(feature_series_clean, 75)
except (ValueError, RuntimeWarning):
percentile_25 = np.median(feature_series_clean) if len(feature_series_clean) > 0 else 0
percentile_75 = np.median(feature_series_clean) if len(feature_series_clean) > 0 else 0
combined_features.extend([
np.mean(feature_series_clean),
np.std(feature_series_clean),
np.min(feature_series_clean),
np.max(feature_series_clean),
feature_series_clean[-1] - feature_series_clean[0] if len(feature_series_clean) > 1 else 0, # Total change
np.mean(np.diff(feature_series_clean)) if len(feature_series_clean) > 1 else 0, # Average momentum
np.std(np.diff(feature_series_clean)) if len(feature_series_clean) > 2 else 0, # Momentum volatility
percentile_25, # 25th percentile
percentile_75, # 75th percentile
len([x for x in np.diff(feature_series_clean) if x > 0]) / max(len(feature_series_clean) - 1, 1) if len(feature_series_clean) > 1 else 0.5 # Positive change ratio
])
else:
# All values are NaN or inf, use zeros
combined_features.extend([0.0] * 10)
else:
combined_features.extend([0.0] * (15 * 10)) # 15 features * 10 statistics
@ -702,13 +718,14 @@ class EnhancedRealtimeTrainingSystem:
lows = np.array([bar['low'] for bar in self.real_time_data['ohlcv_1m']])
# Update indicators
price_mean = np.mean(prices[-20:])
self.technical_indicators = {
'sma_10': np.mean(prices[-10:]),
'sma_20': np.mean(prices[-20:]),
'rsi': self._calculate_rsi(prices, 14),
'volatility': np.std(prices[-20:]) / np.mean(prices[-20:]),
'volatility': np.std(prices[-20:]) / price_mean if price_mean > 0 else 0,
'volume_sma': np.mean(volumes[-10:]),
'price_momentum': (prices[-1] - prices[-5]) / prices[-5] if len(prices) >= 5 else 0,
'price_momentum': (prices[-1] - prices[-5]) / prices[-5] if len(prices) >= 5 and prices[-5] > 0 else 0,
'atr': np.mean(highs[-14:] - lows[-14:]) if len(prices) >= 14 else 0
}
@ -724,8 +741,8 @@ class EnhancedRealtimeTrainingSystem:
current_time = time.time()
current_bar = self.real_time_data['ohlcv_1m'][-1]
# Create comprehensive state features
state_features = self._build_comprehensive_state()
# Create comprehensive state features with default dimensions
state_features = self._build_comprehensive_state(100) # Use default 100 for general experiences
# Create experience with proper reward calculation
experience = {
@ -748,8 +765,8 @@ class EnhancedRealtimeTrainingSystem:
except Exception as e:
logger.debug(f"Error creating training experiences: {e}")
def _build_comprehensive_state(self) -> np.ndarray:
"""Build comprehensive state vector for RL training"""
def _build_comprehensive_state(self, target_dimensions: int = 100) -> np.ndarray:
"""Build comprehensive state vector for RL training with adaptive dimensions"""
try:
state_features = []
@ -792,15 +809,138 @@ class EnhancedRealtimeTrainingSystem:
state_features.append(np.cos(2 * np.pi * now.hour / 24))
state_features.append(now.weekday() / 6.0) # Day of week
# Pad to fixed size (100 features)
while len(state_features) < 100:
# Current count: 10 (prices) + 7 (indicators) + 1 (volume) + 5 (COB) + 3 (time) = 26 base features
# 6. Enhanced features for larger dimensions
if target_dimensions > 50:
# Add more price history
if len(self.real_time_data['ohlcv_1m']) >= 20:
extended_prices = [bar['close'] for bar in list(self.real_time_data['ohlcv_1m'])[-20:]]
base_price = extended_prices[0]
extended_normalized = [(p - base_price) / base_price for p in extended_prices[10:]] # Additional 10
state_features.extend(extended_normalized)
else:
state_features.extend([0.0] * 10)
# Add volume history
if len(self.real_time_data['ohlcv_1m']) >= 10:
volume_history = [bar['volume'] for bar in list(self.real_time_data['ohlcv_1m'])[-10:]]
avg_vol = np.mean(volume_history) if volume_history else 1.0
# Prevent division by zero
if avg_vol == 0:
avg_vol = 1.0
normalized_volumes = [v / avg_vol for v in volume_history]
state_features.extend(normalized_volumes)
else:
state_features.extend([0.0] * 10)
# Add extended COB features
extended_cob = self._extract_cob_features()
state_features.extend(extended_cob[5:]) # Remaining COB features
# Add 5m timeframe data if available
if len(self.real_time_data['ohlcv_5m']) >= 5:
tf_5m_prices = [bar['close'] for bar in list(self.real_time_data['ohlcv_5m'])[-5:]]
if tf_5m_prices:
base_5m = tf_5m_prices[0]
# Prevent division by zero
if base_5m == 0:
base_5m = 1.0
normalized_5m = [(p - base_5m) / base_5m for p in tf_5m_prices]
state_features.extend(normalized_5m)
else:
state_features.extend([0.0] * 5)
else:
state_features.extend([0.0] * 5)
# 7. Adaptive padding/truncation based on target dimensions
current_length = len(state_features)
if target_dimensions > current_length:
# Pad with additional engineered features
remaining = target_dimensions - current_length
# Add statistical features if we have data
if len(self.real_time_data['ohlcv_1m']) >= 20:
all_prices = [bar['close'] for bar in list(self.real_time_data['ohlcv_1m'])[-20:]]
all_volumes = [bar['volume'] for bar in list(self.real_time_data['ohlcv_1m'])[-20:]]
# Statistical features
additional_features = [
np.std(all_prices) / np.mean(all_prices) if np.mean(all_prices) > 0 else 0, # Price CV
np.std(all_volumes) / np.mean(all_volumes) if np.mean(all_volumes) > 0 else 0, # Volume CV
(max(all_prices) - min(all_prices)) / np.mean(all_prices) if np.mean(all_prices) > 0 else 0, # Price range
# Safe correlation calculation
np.corrcoef(all_prices, all_volumes)[0, 1] if (len(all_prices) == len(all_volumes) and len(all_prices) > 1 and
np.std(all_prices) > 0 and np.std(all_volumes) > 0) else 0, # Price-volume correlation
]
# Add momentum features
for window in [3, 5, 10]:
if len(all_prices) >= window:
momentum = (all_prices[-1] - all_prices[-window]) / all_prices[-window] if all_prices[-window] > 0 else 0
additional_features.append(momentum)
else:
additional_features.append(0.0)
# Extend to fill remaining space
while len(additional_features) < remaining and len(additional_features) < 50:
additional_features.extend([
np.sin(len(additional_features) * 0.1), # Sine waves for variety
np.cos(len(additional_features) * 0.1),
np.tanh(len(additional_features) * 0.01)
])
state_features.extend(additional_features[:remaining])
else:
# Fill with structured zeros/patterns if no data
pattern_features = []
for i in range(remaining):
pattern_features.append(np.sin(i * 0.01)) # Small oscillating pattern
state_features.extend(pattern_features)
# Ensure exact target dimension
state_features = state_features[:target_dimensions]
while len(state_features) < target_dimensions:
state_features.append(0.0)
return np.array(state_features[:100])
return np.array(state_features)
except Exception as e:
logger.error(f"Error building state: {e}")
return np.zeros(100)
return np.zeros(target_dimensions)
def _get_model_expected_dimensions(self, model_type: str) -> int:
"""Get expected input dimensions for different model types"""
try:
if model_type == 'dqn':
# Try to get DQN expected dimensions from model
if (self.orchestrator and hasattr(self.orchestrator, 'rl_agent')
and self.orchestrator.rl_agent and hasattr(self.orchestrator.rl_agent, 'policy_net')):
# Get first layer input size
first_layer = list(self.orchestrator.rl_agent.policy_net.children())[0]
if hasattr(first_layer, 'in_features'):
return first_layer.in_features
return 403 # Default for DQN based on error logs
elif model_type == 'cnn':
# CNN might have different input expectations
if (self.orchestrator and hasattr(self.orchestrator, 'cnn_model')
and self.orchestrator.cnn_model):
# Try to get CNN input size
if hasattr(self.orchestrator.cnn_model, 'input_shape'):
return self.orchestrator.cnn_model.input_shape
return 300 # Default for CNN based on error logs
elif model_type == 'cob_rl':
return 2000 # COB RL expects 2000 features
else:
return 100 # Default
except Exception as e:
logger.debug(f"Error getting model dimensions for {model_type}: {e}")
return 100 # Fallback
def _extract_cob_features(self) -> List[float]:
"""Extract features from COB data"""
@ -964,6 +1104,18 @@ class EnhancedRealtimeTrainingSystem:
aggregated_matrix = self.get_cob_training_matrix(symbol, '1s_aggregated')
if combined_features is not None:
# Ensure features are exactly 2000 dimensions
if len(combined_features) != 2000:
logger.warning(f"COB features wrong size: {len(combined_features)}, padding/truncating to 2000")
if len(combined_features) < 2000:
# Pad with zeros
padded_features = np.zeros(2000, dtype=np.float32)
padded_features[:len(combined_features)] = combined_features
combined_features = padded_features
else:
# Truncate to 2000
combined_features = combined_features[:2000]
# Create enhanced COB training experience
current_price = self._get_current_price_from_data(symbol)
if current_price:
@ -973,7 +1125,7 @@ class EnhancedRealtimeTrainingSystem:
# Calculate reward based on COB prediction accuracy
reward = self._calculate_cob_reward(symbol, action, combined_features)
# Create comprehensive state vector for COB RL
# Create comprehensive state vector for COB RL (exactly 2000 dimensions)
state = combined_features # 2000-dimensional state
# Store experience in COB RL agent
@ -1268,16 +1420,29 @@ class EnhancedRealtimeTrainingSystem:
# Moving averages
if len(prev_prices) >= 5:
ma5 = sum(prev_prices[-5:]) / 5
tech_features.append((current_price - ma5) / ma5)
# Prevent division by zero
if ma5 != 0:
tech_features.append((current_price - ma5) / ma5)
else:
tech_features.append(0.0)
if len(prev_prices) >= 10:
ma10 = sum(prev_prices[-10:]) / 10
tech_features.append((current_price - ma10) / ma10)
# Prevent division by zero
if ma10 != 0:
tech_features.append((current_price - ma10) / ma10)
else:
tech_features.append(0.0)
# Volatility measure
if len(prev_prices) >= 5:
volatility = np.std(prev_prices[-5:]) / np.mean(prev_prices[-5:])
tech_features.append(volatility)
price_mean = np.mean(prev_prices[-5:])
# Prevent division by zero
if price_mean != 0:
volatility = np.std(prev_prices[-5:]) / price_mean
tech_features.append(volatility)
else:
tech_features.append(0.0)
# Pad technical features to 200
while len(tech_features) < 200:
@ -1978,8 +2143,9 @@ class EnhancedRealtimeTrainingSystem:
def _generate_forward_dqn_prediction(self, symbol: str, current_time: float):
"""Generate a DQN prediction for future price movement"""
try:
# Get current market state (only historical data)
current_state = self._build_comprehensive_state()
# Get current market state with DQN-specific dimensions
target_dims = self._get_model_expected_dimensions('dqn')
current_state = self._build_comprehensive_state(target_dims)
current_price = self._get_current_price_from_data(symbol)
# SKIP prediction if price is invalid
@ -2051,7 +2217,7 @@ class EnhancedRealtimeTrainingSystem:
self.last_prediction_time[symbol] = int(current_time)
logger.info(f"Forward DQN prediction: {symbol} action={['BUY','SELL','HOLD'][action]} confidence={confidence:.2f} price=${current_price:.2f} target={target_time.strftime('%H:%M:%S')}")
logger.info(f"Forward DQN prediction: {symbol} action={['BUY','SELL','HOLD'][action]} confidence={confidence:.2f} price=${current_price:.2f} target={target_time.strftime('%H:%M:%S')} dims={len(current_state)}")
except Exception as e:
logger.error(f"Error generating forward DQN prediction: {e}")