Shared Pattern Encoder

fix Transformer training
Dobromir Popov
2025-11-06 14:27:52 +02:00
parent 07d97100c0
commit 738c7cb854
5 changed files with 1276 additions and 180 deletions


@@ -1086,18 +1086,92 @@ class RealTrainingAdapter:
state_size = agent.state_size if hasattr(agent, 'state_size') else 100
return [0.0] * state_size
+    def _extract_timeframe_data(self, tf_data: Dict, target_seq_len: int = 600) -> Optional['torch.Tensor']:
+        """
+        Extract and normalize OHLCV data from a single timeframe
+        Args:
+            tf_data: Timeframe data dictionary with 'open', 'high', 'low', 'close', 'volume'
+            target_seq_len: Target sequence length (default 600)
+        Returns:
+            Tensor of shape [1, seq_len, 5] or None if no data
+        """
+        import torch
+        import numpy as np
+        try:
+            # Extract OHLCV arrays
+            opens = np.array(tf_data.get('open', []), dtype=np.float32)
+            highs = np.array(tf_data.get('high', []), dtype=np.float32)
+            lows = np.array(tf_data.get('low', []), dtype=np.float32)
+            closes = np.array(tf_data.get('close', []), dtype=np.float32)
+            volumes = np.array(tf_data.get('volume', []), dtype=np.float32)
+            if len(closes) == 0:
+                return None
+            # Take last target_seq_len candles or pad if needed
+            if len(closes) >= target_seq_len:
+                # Truncate to target length
+                opens = opens[-target_seq_len:]
+                highs = highs[-target_seq_len:]
+                lows = lows[-target_seq_len:]
+                closes = closes[-target_seq_len:]
+                volumes = volumes[-target_seq_len:]
+            else:
+                # Pad with last candle
+                pad_len = target_seq_len - len(closes)
+                last_open = opens[-1] if len(opens) > 0 else 0.0
+                last_high = highs[-1] if len(highs) > 0 else 0.0
+                last_low = lows[-1] if len(lows) > 0 else 0.0
+                last_close = closes[-1] if len(closes) > 0 else 0.0
+                last_volume = volumes[-1] if len(volumes) > 0 else 0.0
+                opens = np.pad(opens, (0, pad_len), constant_values=last_open)
+                highs = np.pad(highs, (0, pad_len), constant_values=last_high)
+                lows = np.pad(lows, (0, pad_len), constant_values=last_low)
+                closes = np.pad(closes, (0, pad_len), constant_values=last_close)
+                volumes = np.pad(volumes, (0, pad_len), constant_values=last_volume)
+            # Stack OHLCV [seq_len, 5]
+            ohlcv = np.stack([opens, highs, lows, closes, volumes], axis=-1)
+            # Normalize prices to [0, 1] range
+            price_min = np.min(ohlcv[:, :4])  # Min of OHLC
+            price_max = np.max(ohlcv[:, :4])  # Max of OHLC
+            if price_max > price_min:
+                ohlcv[:, :4] = (ohlcv[:, :4] - price_min) / (price_max - price_min)
+            # Normalize volume to [0, 1] range
+            volume_min = np.min(ohlcv[:, 4])
+            volume_max = np.max(ohlcv[:, 4])
+            if volume_max > volume_min:
+                ohlcv[:, 4] = (ohlcv[:, 4] - volume_min) / (volume_max - volume_min)
+            # Convert to tensor and add batch dimension [1, seq_len, 5]
+            return torch.tensor(ohlcv, dtype=torch.float32).unsqueeze(0)
+        except Exception as e:
+            logger.error(f"Error extracting timeframe data: {e}")
+            return None
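
A minimal usage sketch for the new helper (the adapter instance and sample values are illustrative, not part of the commit):

    # Ten synthetic candles; the helper right-pads them to 600 rows with the last candle
    tf_data = {
        'open':   [100.0 + 0.1 * i for i in range(10)],
        'high':   [100.5 + 0.1 * i for i in range(10)],
        'low':    [99.5 + 0.1 * i for i in range(10)],
        'close':  [100.2 + 0.1 * i for i in range(10)],
        'volume': [1000.0 + 10.0 * i for i in range(10)],
    }
    tensor = adapter._extract_timeframe_data(tf_data, target_seq_len=600)  # adapter: a RealTrainingAdapter
    assert tensor is not None and tuple(tensor.shape) == (1, 600, 5)  # [batch, seq_len, OHLCV]
    assert float(tensor.max()) <= 1.0  # prices and volume min-max scaled to [0, 1]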
def _convert_annotation_to_transformer_batch(self, training_sample: Dict) -> Dict[str, 'torch.Tensor']:
"""
-        Convert annotation training sample to transformer model input format
+        Convert annotation training sample to multi-timeframe transformer input
-        The transformer expects:
-        - price_data: [batch, seq_len, features] - OHLCV sequences
-        - cob_data: [batch, seq_len, cob_features] - COB (Consolidated Order Book) data
-        - tech_data: [batch, features] - Technical indicators
-        - market_data: [batch, features] - Market context
-        - actions: [batch] - Target actions (0=HOLD, 1=BUY, 2=SELL)
-        - future_prices: [batch] - Future price targets
-        - trade_success: [batch] - Whether trade was successful
+        The transformer now expects:
+        - price_data_1s, price_data_1m, price_data_1h, price_data_1d: [batch, 600, 5]
+        - btc_data_1m: [batch, 600, 5]
+        - cob_data: [batch, 600, 100]
+        - tech_data: [batch, 40]
+        - market_data: [batch, 30]
+        - position_state: [batch, 5]
+        - actions: [batch]
+        - future_prices: [batch]
+        - trade_success: [batch, 1]
"""
import torch
import numpy as np
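
For reference, a dummy batch with the shapes the new docstring promises (zeros stand in for real features; a sketch, not code from the commit):

    import torch
    dummy_batch = {
        'price_data_1m': torch.zeros(1, 600, 5),      # OHLCV sequence (1s/1h/1d share this shape)
        'btc_data_1m': torch.zeros(1, 600, 5),        # BTC/USDT reference timeframe
        'cob_data': torch.zeros(1, 600, 100),         # COB features (placeholder zeros)
        'tech_data': torch.zeros(1, 40),              # technical indicators
        'market_data': torch.zeros(1, 30),            # market context
        'position_state': torch.zeros(1, 5),          # position tracking
        'actions': torch.zeros(1, dtype=torch.long),  # 0=HOLD, 1=BUY, 2=SELL
        'future_prices': torch.zeros(1),              # future price target
        'trade_success': torch.zeros(1, 1),           # 1.0 if profit_loss_pct > 0
    }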
@@ -1105,106 +1179,54 @@ class RealTrainingAdapter:
try:
market_state = training_sample.get('market_state', {})
-            # Extract OHLCV data from ALL timeframes
+            # Extract ALL timeframes
timeframes = market_state.get('timeframes', {})
+            secondary_timeframes = market_state.get('secondary_timeframes', {})
-            # Collect data from all available timeframes
-            all_price_data = []
-            timeframe_order = ['1s', '1m', '1h', '1d']  # Process in order
+            # Target sequence length for all timeframes
+            target_seq_len = 600
-            for tf in timeframe_order:
-                if tf not in timeframes:
-                    continue
-                tf_data = timeframes[tf]
-                # Convert to numpy arrays
-                opens = np.array(tf_data.get('open', []), dtype=np.float32)
-                highs = np.array(tf_data.get('high', []), dtype=np.float32)
-                lows = np.array(tf_data.get('low', []), dtype=np.float32)
-                closes = np.array(tf_data.get('close', []), dtype=np.float32)
-                volumes = np.array(tf_data.get('volume', []), dtype=np.float32)
-                if len(closes) > 0:
-                    # Stack OHLCV for this timeframe [seq_len, 5]
-                    tf_price_data = np.stack([opens, highs, lows, closes, volumes], axis=-1)
-                    all_price_data.append(tf_price_data)
+            # Extract each timeframe (returns None if not available)
+            price_data_1s = self._extract_timeframe_data(timeframes.get('1s', {}), target_seq_len) if '1s' in timeframes else None
+            price_data_1m = self._extract_timeframe_data(timeframes.get('1m', {}), target_seq_len) if '1m' in timeframes else None
+            price_data_1h = self._extract_timeframe_data(timeframes.get('1h', {}), target_seq_len) if '1h' in timeframes else None
+            price_data_1d = self._extract_timeframe_data(timeframes.get('1d', {}), target_seq_len) if '1d' in timeframes else None
-            if not all_price_data:
-                logger.warning("No price data in any timeframe")
+            # Extract BTC reference data
+            btc_data_1m = None
+            if 'BTC/USDT' in secondary_timeframes and '1m' in secondary_timeframes['BTC/USDT']:
+                btc_data_1m = self._extract_timeframe_data(secondary_timeframes['BTC/USDT']['1m'], target_seq_len)
+            # Ensure at least one timeframe is available
+            # Check if all are None (can't use any() with tensors)
+            if price_data_1s is None and price_data_1m is None and price_data_1h is None and price_data_1d is None:
+                logger.warning("No price data available in any timeframe")
+                return None
-            # Use only the primary timeframe (1m) for transformer training
-            # The transformer expects a fixed sequence length of 150
-            primary_tf = '1m' if '1m' in timeframes else timeframe_order[0]
+            # Get reference timeframe for other features (prefer 1m, fallback to any available)
+            ref_data = price_data_1m if price_data_1m is not None else (
+                price_data_1h if price_data_1h is not None else (
+                    price_data_1d if price_data_1d is not None else price_data_1s
+                )
+            )
-            if primary_tf not in timeframes:
-                logger.warning(f"Primary timeframe {primary_tf} not available")
-                return None
-            # Get primary timeframe data
-            primary_data = timeframes[primary_tf]
-            closes = np.array(primary_data.get('close', []), dtype=np.float32)
+            # Get closes from reference timeframe for technical indicators
+            ref_tf = '1m' if '1m' in timeframes else ('1h' if '1h' in timeframes else ('1d' if '1d' in timeframes else '1s'))
+            closes = np.array(timeframes[ref_tf].get('close', []), dtype=np.float32)
if len(closes) == 0:
logger.warning("No data in primary timeframe")
logger.warning("No data in reference timeframe")
return None
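
The nested conditional above just scans timeframes in preference order; an equivalent sketch (the helper name is hypothetical):

    def _first_available(candidates):
        """Return the first non-None tensor, scanning 1m > 1h > 1d > 1s."""
        for tensor in candidates:
            if tensor is not None:
                return tensor
        return None

    # ref_data = _first_available([price_data_1m, price_data_1h, price_data_1d, price_data_1s])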
-            # Use the last 150 candles (or pad/truncate to exactly 150)
-            target_seq_len = 150  # Transformer expects exactly 150 sequence length
-            if len(closes) >= target_seq_len:
-                # Take the last 150 candles
-                price_data = np.stack([
-                    np.array(primary_data.get('open', [])[-target_seq_len:], dtype=np.float32),
-                    np.array(primary_data.get('high', [])[-target_seq_len:], dtype=np.float32),
-                    np.array(primary_data.get('low', [])[-target_seq_len:], dtype=np.float32),
-                    np.array(primary_data.get('close', [])[-target_seq_len:], dtype=np.float32),
-                    np.array(primary_data.get('volume', [])[-target_seq_len:], dtype=np.float32)
-                ], axis=-1)
-            else:
-                # Pad with the last available candle
-                last_open = primary_data.get('open', [0])[-1] if primary_data.get('open') else 0
-                last_high = primary_data.get('high', [0])[-1] if primary_data.get('high') else 0
-                last_low = primary_data.get('low', [0])[-1] if primary_data.get('low') else 0
-                last_close = primary_data.get('close', [0])[-1] if primary_data.get('close') else 0
-                last_volume = primary_data.get('volume', [0])[-1] if primary_data.get('volume') else 0
-                # Pad arrays to target length
-                opens = np.array(primary_data.get('open', []), dtype=np.float32)
-                highs = np.array(primary_data.get('high', []), dtype=np.float32)
-                lows = np.array(primary_data.get('low', []), dtype=np.float32)
-                closes = np.array(primary_data.get('close', []), dtype=np.float32)
-                volumes = np.array(primary_data.get('volume', []), dtype=np.float32)
-                # Pad with last values
-                while len(opens) < target_seq_len:
-                    opens = np.append(opens, last_open)
-                    highs = np.append(highs, last_high)
-                    lows = np.append(lows, last_low)
-                    closes = np.append(closes, last_close)
-                    volumes = np.append(volumes, last_volume)
-                price_data = np.stack([opens, highs, lows, closes, volumes], axis=-1)
-            # Add batch dimension [1, 150, 5]
-            price_data = torch.tensor(price_data, dtype=torch.float32).unsqueeze(0)
-            # Sequence length is now exactly 150
-            total_seq_len = 150
            # Create placeholder COB data (zeros if not available)
-            # COB data shape: [1, 150, cob_features]
-            # MUST match the total sequence length from price_data (150)
-            # Transformer expects 100 COB features (as defined in TransformerConfig)
-            cob_data = torch.zeros(1, 150, 100, dtype=torch.float32)  # Match price seq_len (150)
+            # COB data shape: [1, 600, 100] to match new sequence length
+            cob_data = torch.zeros(1, 600, 100, dtype=torch.float32)
-            # Create technical indicators (simple ones for now)
-            # tech_data shape: [1, features]
+            # Create technical indicators from reference timeframe
tech_features = []
-            # Use the closes data from the price_data we just created
-            closes_for_tech = price_data[0, :, 3].numpy()  # Close prices from OHLCV data
+            # Use closes from reference timeframe
+            closes_for_tech = closes[-600:] if len(closes) >= 600 else closes
# Add simple technical indicators
if len(closes_for_tech) >= 20:
@@ -1236,17 +1258,17 @@ class RealTrainingAdapter:
# market_data shape: [1, features]
market_features = []
-            # Add volume profile
-            volumes_for_tech = price_data[0, :, 4].numpy()  # Volume from OHLCV data
+            # Add volume profile from reference timeframe
+            volumes_for_tech = np.array(timeframes[ref_tf].get('volume', []), dtype=np.float32)
if len(volumes_for_tech) >= 20:
vol_ratio = volumes_for_tech[-1] / np.mean(volumes_for_tech[-20:])
market_features.append(vol_ratio)
else:
market_features.append(1.0)
-            # Add price range
-            highs_for_tech = price_data[0, :, 1].numpy()  # High from OHLCV data
-            lows_for_tech = price_data[0, :, 2].numpy()  # Low from OHLCV data
+            # Add price range from reference timeframe
+            highs_for_tech = np.array(timeframes[ref_tf].get('high', []), dtype=np.float32)
+            lows_for_tech = np.array(timeframes[ref_tf].get('low', []), dtype=np.float32)
if len(highs_for_tech) >= 20 and len(lows_for_tech) >= 20:
price_range = (np.max(highs_for_tech[-20:]) - np.min(lows_for_tech[-20:])) / closes_for_tech[-1]
market_features.append(price_range)
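
A worked example of the two market features with illustrative numbers:

    import numpy as np
    volumes = np.array([1000.0] * 19 + [1500.0], dtype=np.float32)
    vol_ratio = volumes[-1] / np.mean(volumes[-20:])  # 1500 / 1025 ≈ 1.46, i.e. above-average volume
    highs = np.full(20, 101.0, dtype=np.float32)
    lows = np.full(20, 99.0, dtype=np.float32)
    closes = np.full(20, 100.0, dtype=np.float32)
    price_range = (np.max(highs) - np.min(lows)) / closes[-1]  # (101 - 99) / 100 = 0.02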
@@ -1386,16 +1408,28 @@ class RealTrainingAdapter:
profit_loss_pct = training_sample.get('profit_loss_pct', 0.0)
trade_success = torch.tensor([[1.0 if profit_loss_pct > 0 else 0.0]], dtype=torch.float32)
-            # Return batch dictionary with position state
+            # Return batch dictionary with ALL timeframes
batch = {
-                'price_data': price_data,
-                'cob_data': cob_data,
-                'tech_data': tech_data,
-                'market_data': market_data,
-                'actions': actions,
-                'future_prices': future_prices,
-                'trade_success': trade_success,
-                'position_state': position_state  # NEW: Position tracking for loss minimization
+                # Multi-timeframe price data
+                'price_data_1s': price_data_1s,  # [1, 600, 5] or None
+                'price_data_1m': price_data_1m,  # [1, 600, 5] or None
+                'price_data_1h': price_data_1h,  # [1, 600, 5] or None
+                'price_data_1d': price_data_1d,  # [1, 600, 5] or None
+                'btc_data_1m': btc_data_1m,  # [1, 600, 5] or None
+                # Other features
+                'cob_data': cob_data,  # [1, 600, 100]
+                'tech_data': tech_data,  # [1, 40]
+                'market_data': market_data,  # [1, 30]
+                'position_state': position_state,  # [1, 5]
+                # Training targets
+                'actions': actions,  # [1]
+                'future_prices': future_prices,  # [1]
+                'trade_success': trade_success,  # [1, 1]
+                # Legacy support (use 1m as default)
+                'price_data': price_data_1m if price_data_1m is not None else ref_data
}
return batch
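
Because absent timeframes come through as None rather than zero tensors, downstream code has to branch on them; a sketch of the expected handling:

    for key in ('price_data_1s', 'price_data_1m', 'price_data_1h', 'price_data_1d', 'btc_data_1m'):
        value = batch[key]
        if value is None:
            continue  # timeframe absent from this annotation
        assert tuple(value.shape) == (1, 600, 5)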
@@ -1461,7 +1495,11 @@ class RealTrainingAdapter:
combined: Dict[str, 'torch.Tensor'] = {}
keys = batch_list[0].keys()
for key in keys:
-                tensors = [b[key] for b in batch_list]
+                tensors = [b[key] for b in batch_list if b[key] is not None]
+                # Skip keys where all values are None
+                if not tensors:
+                    combined[key] = None
+                    continue
try:
combined[key] = torch.cat(tensors, dim=0)
except RuntimeError as concat_error:
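
The None-aware collation rule above, restated as a self-contained sketch (the function name is illustrative):

    import torch

    def collate_key(batch_list, key):
        """Concatenate one key across samples; None if no sample carries it."""
        tensors = [b[key] for b in batch_list if b[key] is not None]
        if not tensors:
            return None
        return torch.cat(tensors, dim=0)  # stack along the batch dimension

Note that if only some samples carry a given timeframe, that key's batch dimension ends up smaller than the others'.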
@@ -1506,11 +1544,18 @@ class RealTrainingAdapter:
logger.info(f" Batch {i + 1}/{len(converted_batches)}, Loss: {batch_loss:.6f}, Accuracy: {batch_accuracy:.4f}")
else:
logger.warning(f" Batch {i + 1} returned None result - skipping")
+                # Clear CUDA cache periodically to prevent memory leak
+                if torch.cuda.is_available() and (i + 1) % 5 == 0:
+                    torch.cuda.empty_cache()
except Exception as e:
logger.error(f" Error in batch {i + 1}: {e}")
import traceback
logger.error(traceback.format_exc())
+                # Clear CUDA cache after error
+                if torch.cuda.is_available():
+                    torch.cuda.empty_cache()
continue
avg_loss = epoch_loss / num_batches if num_batches > 0 else 0.0
@@ -1518,6 +1563,10 @@ class RealTrainingAdapter:
session.current_epoch = epoch + 1
session.current_loss = avg_loss
+            # Clear CUDA cache after each epoch
+            if torch.cuda.is_available():
+                torch.cuda.empty_cache()
logger.info(f" Epoch {epoch + 1}/{session.total_epochs}, Loss: {avg_loss:.6f}, Accuracy: {avg_accuracy:.2%} ({num_batches} batches)")
session.final_loss = session.current_loss
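
The cache clearing added in this commit follows a single pattern; a condensed sketch (the helper name is hypothetical):

    import torch

    def maybe_empty_cache(step: int, every: int = 1) -> None:
        """Release unused cached GPU memory back to the driver every `every` steps.

        torch.cuda.empty_cache() only frees cached, unused blocks, never live
        tensors, so it limits allocator growth without touching model state.
        """
        if torch.cuda.is_available() and step % every == 0:
            torch.cuda.empty_cache()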