try fixing COB MA and COB data quality

Dobromir Popov
2025-08-09 23:03:45 +03:00
parent 87193f3d6f
commit 31a41785d6
5 changed files with 174 additions and 95 deletions


@@ -318,6 +318,11 @@ class CleanTradingDashboard:
             'ETH/USDT': deque(maxlen=61),  # Store ~60 seconds of 1s snapshots
             'BTC/USDT': deque(maxlen=61)
         }
+        # Per-second imbalance history used for real moving averages over 1s/5s/15s/60s windows
+        self.cob_per_second_imbalance_history: Dict[str, deque] = {
+            'ETH/USDT': deque(maxlen=120),  # keep at least 60 seconds; 120 for headroom
+            'BTC/USDT': deque(maxlen=120)
+        }
 
         # Initialize timezone
         timezone_name = self.config.get('system', {}).get('timezone', 'Europe/Sofia')
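
A bounded deque keeps only its newest maxlen entries, so 120 slots comfortably cover the longest 60-second SMA window computed further down. A minimal standalone sketch (the names below are illustrative, not the dashboard's attributes):

from collections import deque

per_second_imbalance = deque(maxlen=120)    # hypothetical standalone series
for i in range(300):                        # simulate 300 seconds of 1s aggregated updates
    per_second_imbalance.append(i * 0.01)   # exactly one imbalance value per second

window = list(per_second_imbalance)[-60:]   # the newest 60 seconds
sma_60s = sum(window) / len(window)
print(len(per_second_imbalance), sma_60s)   # 120, ~2.695 (mean of seconds 240-299)
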
@@ -366,6 +371,13 @@ class CleanTradingDashboard:
                 # Then subscribe to updates
                 self.data_provider.subscribe_to_cob(self._on_cob_data_update)
                 logger.info("Subscribed to COB data updates from data provider")
+
+                # Also subscribe to 1s aggregated updates to build per-second imbalance series
+                try:
+                    if hasattr(self.data_provider, 'subscribe_to_cob_aggregated'):
+                        self.data_provider.subscribe_to_cob_aggregated(self._on_cob_1s_aggregated_update)
+                        logger.info("Subscribed to COB 1s aggregated updates for per-second imbalance MAs")
+                except Exception as agg_e:
+                    logger.error(f"Failed subscribing to COB aggregated updates: {agg_e}")
             except Exception as e:
                 logger.error(f"Failed to start COB collection or subscribe: {e}")
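
A minimal, self-contained sketch of this guarded subscription pattern and of the payload shapes involved, assuming a simple callback-based provider; FakeCOBProvider and on_aggregated below are hypothetical stand-ins, not the repository's real data provider or handler (the real handler, _on_cob_1s_aggregated_update, is added further down):

from typing import Callable, List

class FakeCOBProvider:
    """Hypothetical stand-in for the real data provider."""
    def __init__(self) -> None:
        self._aggregated_subs: List[Callable[[str, dict], None]] = []

    def subscribe_to_cob_aggregated(self, callback: Callable[[str, dict], None]) -> None:
        self._aggregated_subs.append(callback)

    def publish_aggregated(self, symbol: str, snapshot: dict) -> None:
        # Called once per second with an aggregated order-book snapshot
        for cb in self._aggregated_subs:
            cb(symbol, snapshot)

def on_aggregated(symbol: str, data: dict) -> None:
    # Mirrors the handler's priority order: prefer stats['imbalance_1s'],
    # otherwise fall back to imbalance['average']
    stats = data.get('stats') or {}
    value = stats.get('imbalance_1s')
    if not isinstance(value, (int, float)):
        value = (data.get('imbalance') or {}).get('average', 0.0)
    print(symbol, float(value))

provider = FakeCOBProvider()
# The hasattr guard above means providers without the aggregated feed are skipped quietly
if hasattr(provider, 'subscribe_to_cob_aggregated'):
    provider.subscribe_to_cob_aggregated(on_aggregated)

provider.publish_aggregated('ETH/USDT', {'stats': {'imbalance_1s': 0.18}})               # explicit 1s field
provider.publish_aggregated('ETH/USDT', {'stats': {}, 'imbalance': {'average': -0.05}})  # fallback shape
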
@@ -501,6 +513,35 @@ class CleanTradingDashboard:
         except Exception as e:
             logger.error(f"Error handling COB data update for {symbol}: {e}")
 
+    def _on_cob_1s_aggregated_update(self, symbol: str, aggregated_data: dict):
+        """Receive 1s aggregated COB snapshot and record a single imbalance value per second.
+        This ensures moving averages are computed over true seconds, not over raw tick updates.
+        """
+        try:
+            # Determine the per-second imbalance value
+            per_sec_imbalance = None
+            stats = aggregated_data.get('stats') or {}
+            # Prefer explicit 1s imbalance if available
+            if 'imbalance_1s' in stats and isinstance(stats.get('imbalance_1s'), (int, float)):
+                per_sec_imbalance = float(stats.get('imbalance_1s') or 0.0)
+            else:
+                # Fallback to aggregated imbalance average structure
+                imb_section = aggregated_data.get('imbalance') or {}
+                if isinstance(imb_section, dict) and 'average' in imb_section:
+                    try:
+                        per_sec_imbalance = float(imb_section.get('average') or 0.0)
+                    except Exception:
+                        per_sec_imbalance = 0.0
+            if per_sec_imbalance is None:
+                per_sec_imbalance = 0.0
+            # Append to per-second history for the symbol
+            if symbol not in self.cob_per_second_imbalance_history:
+                self.cob_per_second_imbalance_history[symbol] = deque(maxlen=120)
+            self.cob_per_second_imbalance_history[symbol].append(per_sec_imbalance)
+        except Exception as e:
+            logger.error(f"Error handling COB 1s aggregated update for {symbol}: {e}")
+
     def start_overnight_training(self):
         """Start the overnight training session"""
@@ -8931,73 +8972,68 @@ class CleanTradingDashboard:
             raise
 
     def _calculate_cumulative_imbalance(self, symbol: str) -> Dict[str, float]:
-        """Calculate Moving Averages (MA) of imbalance over different periods."""
-        stats = {}
-        history = self.cob_data_history.get(symbol)
-
-        if not history:
-            return {'1s': 0.0, '5s': 0.0, '15s': 0.0, '60s': 0.0}
-
-        # Convert history to list and get recent snapshots
-        history_list = list(history)
-        if not history_list:
-            return {'1s': 0.0, '5s': 0.0, '15s': 0.0, '60s': 0.0}
-
-        # Extract imbalance values from recent snapshots
-        imbalances = []
-        for snap in history_list:
-            if isinstance(snap, dict) and 'stats' in snap and snap['stats']:
-                imbalance = snap['stats'].get('imbalance')
-                if imbalance is not None:
-                    imbalances.append(imbalance)
-
-        if not imbalances:
-            return {'1s': 0.0, '5s': 0.0, '15s': 0.0, '60s': 0.0}
-
-        # Calculate Moving Averages over different periods
-        # MA periods: 1s=1 period, 5s=5 periods, 15s=15 periods, 60s=60 periods
-        ma_periods = {'1s': 1, '5s': 5, '15s': 15, '60s': 60}
-        for name, period in ma_periods.items():
-            if len(imbalances) >= period:
-                # Calculate SMA over the last 'period' values
-                recent_imbalances = imbalances[-period:]
-                sma_value = sum(recent_imbalances) / len(recent_imbalances)
-                # Also calculate EMA for better responsiveness
-                if period > 1:
-                    # EMA calculation with alpha = 2/(period+1)
-                    alpha = 2.0 / (period + 1)
-                    ema_value = recent_imbalances[0]  # Start with first value
-                    for value in recent_imbalances[1:]:
-                        ema_value = alpha * value + (1 - alpha) * ema_value
-                    # Use EMA for better responsiveness
-                    stats[name] = ema_value
-                else:
-                    # For 1s, use SMA (no EMA needed)
-                    stats[name] = sma_value
-            else:
-                # If not enough data, use available data
-                available_imbalances = imbalances[-min(period, len(imbalances)):]
-                if available_imbalances:
-                    if len(available_imbalances) > 1:
-                        # Calculate EMA for available data
-                        alpha = 2.0 / (len(available_imbalances) + 1)
-                        ema_value = available_imbalances[0]
-                        for value in available_imbalances[1:]:
-                            ema_value = alpha * value + (1 - alpha) * ema_value
-                        stats[name] = ema_value
-                    else:
-                        # Single value, use as is
-                        stats[name] = available_imbalances[0]
-                else:
-                    stats[name] = 0.0
-
-        # Debug logging to verify MA calculation
-        if any(value != 0.0 for value in stats.values()):
-            logger.debug(f"[MOVING-AVERAGE-IMBALANCE] {symbol}: {stats} (from {len(imbalances)} snapshots)")
-
-        return stats
+        """Calculate true per-second SMA of imbalance over 1s/5s/15s/60s windows.
+        Uses the per-second imbalance series populated by aggregated 1s updates.
+        Falls back to grouping raw updates by second if needed.
+        """
+        try:
+            # Prefer per-second series if available
+            per_second_series = list(self.cob_per_second_imbalance_history.get(symbol, []))
+
+            if not per_second_series:
+                # Fallback: build per-second averages from raw tick history
+                history = self.cob_data_history.get(symbol, [])
+                if history:
+                    second_to_values: Dict[int, list] = {}
+                    for snap in list(history):
+                        try:
+                            ts_ms = snap.get('timestamp')
+                            if isinstance(ts_ms, (int, float)):
+                                sec = int(int(ts_ms) / 1000)
+                            else:
+                                # If missing timestamp, skip
+                                continue
+                            imb = None
+                            st = snap.get('stats') or {}
+                            # Use raw tick imbalance if present; otherwise check 1s field
+                            if 'imbalance' in st and isinstance(st.get('imbalance'), (int, float)):
+                                imb = float(st.get('imbalance') or 0.0)
+                            elif 'imbalance_1s' in st and isinstance(st.get('imbalance_1s'), (int, float)):
+                                imb = float(st.get('imbalance_1s') or 0.0)
+                            if imb is None:
+                                continue
+                            second_to_values.setdefault(sec, []).append(imb)
+                        except Exception:
+                            continue
+                    # Sort by second and compute one value per second
+                    per_second_series = [
+                        (sum(vals) / len(vals)) for _, vals in sorted(second_to_values.items())
+                    ]
+
+            if not per_second_series:
+                return {'1s': 0.0, '5s': 0.0, '15s': 0.0, '60s': 0.0}
+
+            def sma(values: list, n: int) -> float:
+                if not values or n <= 0:
+                    return 0.0
+                if len(values) < n:
+                    # average available values
+                    window = values[-len(values):]
+                else:
+                    window = values[-n:]
+                return sum(window) / float(len(window)) if window else 0.0
+
+            stats = {
+                '1s': sma(per_second_series, 1),
+                '5s': sma(per_second_series, 5),
+                '15s': sma(per_second_series, 15),
+                '60s': sma(per_second_series, 60),
+            }
+            return stats
+        except Exception as e:
+            logger.error(f"Error calculating cumulative imbalance MAs for {symbol}: {e}")
+            return {'1s': 0.0, '5s': 0.0, '15s': 0.0, '60s': 0.0}
 
     def _connect_to_orchestrator(self):
         """Connect to orchestrator for real trading signals"""

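A standalone worked example of the per-second windowed SMA introduced above; the helper is a condensed restatement of the sma function in the diff and the series values are invented. With fewer than n samples available, the window simply averages everything it has instead of returning 0:

from typing import Dict, List

def sma(values: List[float], n: int) -> float:
    # Average the newest n values; fall back to all available values
    if not values or n <= 0:
        return 0.0
    window = values[-n:] if len(values) >= n else values[:]
    return sum(window) / float(len(window))

series = [0.25, 0.5, -0.25, 0.75, 0.0, 0.5, -0.5]   # 7 seconds of invented imbalance values
mas: Dict[str, float] = {
    '1s': sma(series, 1),    # -0.5, just the newest second
    '5s': sma(series, 5),    # mean of the newest 5 values = 0.1
    '15s': sma(series, 15),  # only 7 samples exist, so the mean of all 7 (~0.179)
    '60s': sma(series, 60),  # same until a full minute of history has accumulated
}
print(mas)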

@@ -293,16 +293,7 @@ class ModelsTrainingPanel:
                 'win_rate': safe_get(model_stats, 'win_rate', 0)
             }
 
-            # Extract real performance metrics from logs
-            # For DQN: we see "Performance: 81.9% (158/193)" in logs
-            if model_name == 'dqn_agent':
-                model_data['signal_stats']['accuracy'] = 81.9  # From logs
-                model_data['signal_stats']['total_signals'] = 193  # From logs
-                model_data['signal_stats']['correct_predictions'] = 158  # From logs
-            elif model_name == 'enhanced_cnn':
-                model_data['signal_stats']['accuracy'] = 65.3  # From logs
-                model_data['signal_stats']['total_signals'] = 193  # From logs
-                model_data['signal_stats']['correct_predictions'] = 126  # From logs
+            # Do not inject synthetic performance metrics; rely only on available stats
 
             return model_data
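
The panel change above leans entirely on safe_get defaults instead of hard-coded log numbers. The panel's real safe_get is not shown in this diff; a sketch under the assumption that it simply reads a field with a default from dict-like or attribute-style stats:

from typing import Any

def safe_get(stats: Any, key: str, default: Any = 0) -> Any:
    # Hypothetical reimplementation for illustration only
    if stats is None:
        return default
    if isinstance(stats, dict):
        return stats.get(key, default)
    return getattr(stats, key, default)

print(safe_get({'win_rate': 0.62}, 'win_rate', 0))  # 0.62
print(safe_get({}, 'accuracy', 0))                  # 0, no synthetic accuracy injected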