Dobromir Popov
2025-12-09 00:34:51 +02:00
parent 8c3dc5423e
commit d6ada4b416
5 changed files with 534 additions and 429 deletions


@@ -570,62 +570,6 @@ class RealTrainingAdapter:
             except Exception as e:
                 logger.debug(f" {timeframe}: Replay failed: {e}")
 
-            # CRITICAL FIX: For 1s timeframe, if we still don't have enough data, generate from 1m candles
-            if timeframe == '1s' and (df is None or df.empty or len(df) < min_required_candles):
-                try:
-                    df_1m = None
-
-                    # First, try to get 1m data from already fetched timeframes
-                    if '1m' in fetched_timeframes and '1m' in market_state.get('timeframes', {}):
-                        # Convert dict format to DataFrame
-                        tf_1m = market_state['timeframes']['1m']
-                        try:
-                            df_1m = pd.DataFrame({
-                                'open': tf_1m['open'],
-                                'high': tf_1m['high'],
-                                'low': tf_1m['low'],
-                                'close': tf_1m['close'],
-                                'volume': tf_1m['volume']
-                            }, index=pd.to_datetime(tf_1m['timestamps'], utc=True))
-                        except Exception as e:
-                            logger.debug(f" {timeframe}: Could not convert 1m dict to DataFrame: {e}")
-
-                    # If we don't have 1m data yet, fetch it
-                    if df_1m is None or df_1m.empty:
-                        logger.debug(f" {timeframe}: Fetching 1m data to generate 1s candles...")
-                        if duckdb_storage:
-                            try:
-                                df_1m = duckdb_storage.get_ohlcv_data(
-                                    symbol=symbol,
-                                    timeframe='1m',
-                                    start_time=tf_start_time,
-                                    end_time=tf_end_time,
-                                    limit=limit,
-                                    direction='before'
-                                )
-                            except Exception as e:
-                                logger.debug(f" {timeframe}: Could not get 1m from DuckDB: {e}")
-
-                        if df_1m is None or df_1m.empty:
-                            # Try API for 1m
-                            df_1m = self._fetch_historical_from_api(symbol, '1m', tf_start_time, tf_end_time, limit)
-
-                    # Generate 1s candles from 1m if we have 1m data
-                    if df_1m is not None and not df_1m.empty:
-                        generated_1s = self._generate_1s_from_1m(df_1m, min_required_candles)
-                        if generated_1s is not None and len(generated_1s) >= min_required_candles:
-                            df = generated_1s
-                            logger.info(f" {timeframe}: Generated {len(df)} candles from {len(df_1m)} 1m candles")
-                        else:
-                            logger.warning(f" {timeframe}: Generated only {len(generated_1s) if generated_1s is not None else 0} candles from 1m (need {min_required_candles})")
-                    else:
-                        logger.debug(f" {timeframe}: No 1m data available to generate 1s candles")
-                except Exception as e:
-                    logger.warning(f" {timeframe}: Failed to generate from 1m: {e}")
-                    import traceback
-                    logger.debug(traceback.format_exc())
 
             # Validate data quality before storing
             if df is not None and not df.empty:
                 # Check minimum candle count
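Read as control flow, the deleted fallback is a three-step chain: reuse 1m data already in market_state, else load 1m from DuckDB, else fetch 1m from the exchange API, and only then synthesize 1s candles from whatever 1m frame was found. A minimal standalone sketch of that chain (resolve_1m_for_1s is a hypothetical helper; the market_state dict-of-lists shape and the duckdb_storage call are taken from the diff above):

import pandas as pd
from typing import Optional

def resolve_1m_for_1s(adapter, symbol, market_state, duckdb_storage,
                      start, end, limit) -> Optional[pd.DataFrame]:
    """Fallback chain from the deleted block: cached dict -> DuckDB -> API."""
    tf = market_state.get('timeframes', {}).get('1m')
    if tf:
        # 1) Reuse already-fetched 1m arrays (dict-of-lists shape assumed)
        try:
            return pd.DataFrame(
                {k: tf[k] for k in ('open', 'high', 'low', 'close', 'volume')},
                index=pd.to_datetime(tf['timestamps'], utc=True))
        except (KeyError, ValueError):
            pass
    if duckdb_storage:
        # 2) Local storage
        df = duckdb_storage.get_ohlcv_data(symbol=symbol, timeframe='1m',
                                           start_time=start, end_time=end,
                                           limit=limit, direction='before')
        if df is not None and not df.empty:
            return df
    # 3) Last resort: exchange API
    return adapter._fetch_historical_from_api(symbol, '1m', start, end, limit)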
@@ -1446,122 +1390,6 @@ class RealTrainingAdapter:
             state_size = agent.state_size if hasattr(agent, 'state_size') else 100
             return [0.0] * state_size
 
-    def _generate_1s_from_1m(self, df_1m: pd.DataFrame, min_candles: int) -> Optional[pd.DataFrame]:
-        """
-        Generate 1s candles from 1m candles by splitting each 1m candle into 60 1s candles.
-
-        Args:
-            df_1m: DataFrame with 1m OHLCV data
-            min_candles: Minimum number of 1s candles to generate
-
-        Returns:
-            DataFrame with 1s OHLCV data or None if generation fails
-        """
-        import pandas as pd
-        from datetime import timedelta
-
-        try:
-            if df_1m is None or df_1m.empty:
-                return None
-
-            # Ensure we have required columns
-            required_cols = ['open', 'high', 'low', 'close', 'volume']
-            if not all(col in df_1m.columns for col in required_cols):
-                logger.warning("1m DataFrame missing required columns for 1s generation")
-                return None
-
-            # Each 1m candle becomes 60 1s candles
-            candles_1s = []
-            for idx, row in df_1m.iterrows():
-                # Get timestamp (handle both index and column)
-                if isinstance(df_1m.index, pd.DatetimeIndex):
-                    timestamp = idx
-                elif 'timestamp' in df_1m.columns:
-                    timestamp = pd.to_datetime(row['timestamp'])
-                else:
-                    logger.warning("Cannot determine timestamp for 1m candle")
-                    continue
-
-                # Extract OHLCV values
-                open_price = float(row['open'])
-                high_price = float(row['high'])
-                low_price = float(row['low'])
-                close_price = float(row['close'])
-                volume = float(row['volume'])
-
-                # Price change per second (linear interpolation from open to close)
-                price_change = close_price - open_price
-                price_per_second = price_change / 60.0
-
-                # Volume per second (distributed evenly)
-                volume_per_second = volume / 60.0
-
-                # Generate 60 1s candles from this 1m candle
-                for second in range(60):
-                    candle_time = timestamp + timedelta(seconds=second)
-
-                    # Interpolate price (linear from open to close)
-                    if second == 0:
-                        candle_open = open_price
-                        candle_close = open_price + price_per_second
-                    elif second == 59:
-                        candle_open = open_price + (price_per_second * 59)
-                        candle_close = close_price
-                    else:
-                        candle_open = open_price + (price_per_second * second)
-                        candle_close = open_price + (price_per_second * (second + 1))
-
-                    # High/low from the interpolated prices, clamped to the 1m candle's range
-                    candle_high = max(candle_open, candle_close)
-                    candle_low = min(candle_open, candle_close)
-                    candle_high = min(candle_high, high_price)
-                    candle_low = max(candle_low, low_price)
-
-                    # If high == low, add a small spread
-                    if candle_high == candle_low:
-                        spread = (high_price - low_price) / 60.0
-                        candle_high = candle_high + (spread * 0.5)
-                        candle_low = candle_low - (spread * 0.5)
-
-                    candles_1s.append({
-                        'timestamp': candle_time,
-                        'open': candle_open,
-                        'high': candle_high,
-                        'low': candle_low,
-                        'close': candle_close,
-                        'volume': volume_per_second
-                    })
-
-            if not candles_1s:
-                return None
-
-            # Convert to DataFrame indexed and sorted by timestamp
-            df_1s = pd.DataFrame(candles_1s)
-            df_1s['timestamp'] = pd.to_datetime(df_1s['timestamp'])
-            df_1s = df_1s.set_index('timestamp')
-            df_1s = df_1s.sort_index()
-
-            # Keep only the most recent candles up to the requested minimum
-            if len(df_1s) > min_candles:
-                df_1s = df_1s.tail(min_candles)
-
-            logger.debug(f"Generated {len(df_1s)} 1s candles from {len(df_1m)} 1m candles")
-            return df_1s
-
-        except Exception as e:
-            logger.warning(f"Error generating 1s candles from 1m: {e}")
-            import traceback
-            logger.debug(traceback.format_exc())
-            return None
-
     def _fetch_historical_from_api(self, symbol: str, timeframe: str, start_time: datetime, end_time: datetime, limit: int) -> Optional[pd.DataFrame]:
         """
         Fetch historical OHLCV data from exchange APIs for a specific time range
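The deleted per-row loop encodes a simple rule: within each 1m candle, the k-th 1s candle opens at open + k·(close − open)/60 and closes one step later, with high/low clamped to the parent candle's range and volume split evenly. A vectorized sketch of the same rule (hypothetical helper, not from the commit; assumes a naive-UTC DatetimeIndex and plain OHLCV columns):

import numpy as np
import pandas as pd

def generate_1s_from_1m_vectorized(df_1m: pd.DataFrame) -> pd.DataFrame:
    """Split each 1m candle into 60 1s candles via linear open->close interpolation."""
    n = len(df_1m)
    k = np.tile(np.arange(60), n)                # second offset within each minute
    base = np.repeat(df_1m.index.values, 60)     # each 1m timestamp repeated 60x
    ts = base + k * np.timedelta64(1, 's')

    o = np.repeat(df_1m['open'].to_numpy(), 60)
    c = np.repeat(df_1m['close'].to_numpy(), 60)
    step = (c - o) / 60.0
    open_1s = o + step * k
    close_1s = o + step * (k + 1)

    # Clamp high/low to the parent 1m candle's range
    high_1s = np.minimum(np.maximum(open_1s, close_1s),
                         np.repeat(df_1m['high'].to_numpy(), 60))
    low_1s = np.maximum(np.minimum(open_1s, close_1s),
                        np.repeat(df_1m['low'].to_numpy(), 60))
    vol_1s = np.repeat(df_1m['volume'].to_numpy(), 60) / 60.0

    return pd.DataFrame({'open': open_1s, 'high': high_1s, 'low': low_1s,
                         'close': close_1s, 'volume': vol_1s},
                        index=pd.DatetimeIndex(ts, name='timestamp'))

Clamping to the parent's high/low keeps the synthetic candles inside the real 1m range, at the cost of flat intrabar variance; either way the output is interpolated, not observed, market data.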
@@ -3288,34 +3116,13 @@ class RealTrainingAdapter:
logger.info(f"Stopped real-time inference: {inference_id}")
def get_latest_signals(self, limit: int = 50) -> List[Dict]:
"""Get latest inference signals from orchestrator predictions and active sessions"""
"""Get latest inference signals from all active sessions"""
if not hasattr(self, 'inference_sessions'):
return []
all_signals = []
# CRITICAL FIX: Get signals from orchestrator's stored predictions (primary source)
if self.orchestrator and hasattr(self.orchestrator, 'recent_transformer_predictions'):
# Get predictions for all symbols
for symbol, predictions in self.orchestrator.recent_transformer_predictions.items():
if predictions:
# Convert predictions to signal format
for pred in list(predictions)[-limit:]:
signal = {
'timestamp': pred.get('timestamp', datetime.now(timezone.utc).isoformat()),
'action': pred.get('action', 'HOLD'),
'confidence': pred.get('confidence', 0.0),
'current_price': pred.get('current_price', 0.0),
'price': pred.get('current_price', 0.0), # Alias for compatibility
'predicted_price': pred.get('predicted_price', pred.get('current_price', 0.0)),
'price_change': pred.get('price_change', 0.0),
'model': 'transformer',
'predicted_candle': pred.get('predicted_candle', {}),
'source': pred.get('source', 'live_inference')
}
all_signals.append(signal)
# Also get signals from active inference sessions (secondary source)
if hasattr(self, 'inference_sessions'):
for session in self.inference_sessions.values():
all_signals.extend(session.get('signals', []))
for session in self.inference_sessions.values():
all_signals.extend(session.get('signals', []))
# Sort by timestamp and return latest
all_signals.sort(key=lambda x: x.get('timestamp', ''), reverse=True)
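The final string sort relies on ISO-8601 timestamps comparing lexicographically in chronological order, which holds only while every producer emits the same format and offset (the deleted block defaulted to datetime.now(timezone.utc).isoformat()). A small illustration of where that assumption breaks:

# Fixed-offset ISO-8601 strings sort chronologically as plain strings:
same = "2025-12-08T22:34:51+00:00"
late = "2025-12-08T22:34:51.999+00:00"   # 999 ms after `same`
assert same < late                        # string order matches time order

# Mixing formats breaks the guarantee: 'Z' compares above '.' and '+',
# so the Z-suffixed form sorts after `late` despite being the earlier instant.
zfmt = "2025-12-08T22:34:51Z"             # same instant as `same`
assert zfmt > late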