From 31a41785d6634f408bf0b5732d8deadcdaa1f8e1 Mon Sep 17 00:00:00 2001
From: Dobromir Popov
Date: Sat, 9 Aug 2025 23:03:45 +0300
Subject: [PATCH] Fix COB moving-average calculation and improve COB data quality

---
 core/data_provider.py              |  37 ++++--
 core/standardized_data_provider.py |  53 ++++++---
 data/ui_state.json                 |   6 +-
 web/clean_dashboard.py             | 168 +++++++++++++++++++-----------
 web/models_training_panel.py       |  11 +-
 5 files changed, 180 insertions(+), 95 deletions(-)

diff --git a/core/data_provider.py b/core/data_provider.py
index 830e1fd..186183c 100644
--- a/core/data_provider.py
+++ b/core/data_provider.py
@@ -1059,20 +1059,43 @@ class DataProvider:
         return df
 
     def get_historical_data(self, symbol: str, timeframe: str, limit: int = 1000, refresh: bool = False) -> Optional[pd.DataFrame]:
-        """Get historical OHLCV data from cache only - no external API calls"""
+        """Get historical OHLCV data.
+        - Prefer cached data for low latency.
+        - If the cache is empty or refresh=True, fetch real data from exchanges.
+        - Never generate synthetic data.
+        """
         try:
-            # Only return cached data - never trigger external API calls
+            # Serve from cache when available
             if symbol in self.cached_data and timeframe in self.cached_data[symbol]:
                 cached_df = self.cached_data[symbol][timeframe]
-                if not cached_df.empty:
-                    # Return requested amount from cached data
+                if not cached_df.empty and not refresh:
                     return cached_df.tail(limit)
-            
-            logger.warning(f"No cached data available for {symbol} {timeframe}")
+
+            # Cache empty or refresh requested: fetch real data now
+            df = self._fetch_from_binance(symbol, timeframe, limit)
+            if df is None or df.empty:
+                df = self._fetch_from_mexc(symbol, timeframe, limit)
+
+            if df is not None and not df.empty:
+                df = self._ensure_datetime_index(df)
+                # Store/merge into cache, keeping the most recent 1500 candles
+                if symbol not in self.cached_data:
+                    self.cached_data[symbol] = {}
+                if timeframe not in self.cached_data[symbol] or self.cached_data[symbol][timeframe].empty:
+                    self.cached_data[symbol][timeframe] = df.tail(1500)
+                else:
+                    combined_df = pd.concat([self.cached_data[symbol][timeframe], df], ignore_index=False)
+                    combined_df = combined_df[~combined_df.index.duplicated(keep='last')]
+                    combined_df = combined_df.sort_index()
+                    self.cached_data[symbol][timeframe] = combined_df.tail(1500)
+                logger.info(f"Cached {len(self.cached_data[symbol][timeframe])} candles for {symbol} {timeframe}")
+                return self.cached_data[symbol][timeframe].tail(limit)
+
+            logger.warning(f"No real data available for {symbol} {timeframe} at request time")
             return None
 
         except Exception as e:
-            logger.error(f"Error getting cached data for {symbol} {timeframe}: {e}")
+            logger.error(f"Error getting historical data for {symbol} {timeframe}: {e}")
             return None
 
 
diff --git a/core/standardized_data_provider.py b/core/standardized_data_provider.py
index cad82f5..5160a52 100644
--- a/core/standardized_data_provider.py
+++ b/core/standardized_data_provider.py
@@ -271,41 +271,70 @@ class StandardizedDataProvider(DataProvider):
         with self.ma_calculation_lock:
             # Add current imbalance data to history
             self.cob_imbalance_history[symbol].append((timestamp, bid_ask_imbalance))
-            
+
             # Calculate MAs for different timeframes
             ma_results = {'1s': {}, '5s': {}, '15s': {}, '60s': {}}
-            
+
             # Get current price for ±5 bucket calculation
             current_price = self.current_prices.get(symbol.replace('/', '').upper(), 0.0)
             if current_price <= 0:
                 return ma_results
-            
+
             bucket_size = 1.0 if 'ETH' in symbol else 10.0
-            
+
+            # Helper: quantize a floating-point price to the nearest COB bucket center used in snapshots
+            def quantize_to_bucket(p: float) -> float:
+                try:
+                    # Align buckets to integer multiples of bucket_size around the rounded current price
+                    base = round(current_price / bucket_size) * bucket_size
+                    steps = round((p - base) / bucket_size)
+                    return base + steps * bucket_size
+                except Exception:
+                    return p
+
             # Calculate MAs for ±5 buckets around current price
             for i in range(-5, 6):
-                price = current_price + (i * bucket_size)
+                raw_price = current_price + (i * bucket_size)
+                price = quantize_to_bucket(raw_price)
                 if price <= 0:
                     continue
-                
+
                 # Get historical imbalance data for this price bucket
                 history = self.cob_imbalance_history[symbol]
-                
+
                 # Calculate different MA periods
                 for period, period_name in [(1, '1s'), (5, '5s'), (15, '15s'), (60, '60s')]:
                     recent_data = []
                     cutoff_time = timestamp - timedelta(seconds=period)
-                    
+
                     for hist_timestamp, hist_imbalance in history:
-                        if hist_timestamp >= cutoff_time and price in hist_imbalance:
+                        if hist_timestamp < cutoff_time:
+                            continue
+                        # Prefer an exact price-key match; otherwise use the nearest bucket key
+                        if price in hist_imbalance:
                             recent_data.append(hist_imbalance[price])
-                    
+                        else:
+                            # Find the nearest key within half a bucket
+                            try:
+                                nearest_key = None
+                                min_diff = bucket_size / 2.0
+                                for k in hist_imbalance.keys():
+                                    diff = abs(float(k) - price)
+                                    if diff <= min_diff:
+                                        min_diff = diff
+                                        nearest_key = k
+                                if nearest_key is not None:
+                                    recent_data.append(hist_imbalance[nearest_key])
+                            except Exception:
+                                pass
+
                     # Calculate moving average
                     if recent_data:
-                        ma_results[period_name][price] = sum(recent_data) / len(recent_data)
+                        ma_results[period_name][price] = float(sum(recent_data) / len(recent_data))
                     else:
+                        # No real history for this bucket in the window; report 0.0 rather than synthesizing a value
                        ma_results[period_name][price] = 0.0
-                    
+
             return ma_results
 
         except Exception as e:
diff --git a/data/ui_state.json b/data/ui_state.json
index 13c2c0c..70e145f 100644
--- a/data/ui_state.json
+++ b/data/ui_state.json
@@ -21,9 +21,9 @@
             "training_enabled": true
         },
         "dqn_agent": {
-            "inference_enabled": true,
-            "training_enabled": true
+            "inference_enabled": false,
+            "training_enabled": false
         }
     },
-    "timestamp": "2025-08-01T21:40:16.976016"
+    "timestamp": "2025-08-09T00:59:11.537013"
 }
\ No newline at end of file
diff --git a/web/clean_dashboard.py b/web/clean_dashboard.py
index 06d6158..5c0e259 100644
--- a/web/clean_dashboard.py
+++ b/web/clean_dashboard.py
@@ -318,6 +318,11 @@ class CleanTradingDashboard:
             'ETH/USDT': deque(maxlen=61),  # Store ~60 seconds of 1s snapshots
             'BTC/USDT': deque(maxlen=61)
         }
+        # Per-second imbalance history used for real moving averages over 1s/5s/15s/60s windows
+        self.cob_per_second_imbalance_history: Dict[str, deque] = {
+            'ETH/USDT': deque(maxlen=120),  # keep at least 60 seconds; 120 for headroom
+            'BTC/USDT': deque(maxlen=120)
+        }
 
         # Initialize timezone
         timezone_name = self.config.get('system', {}).get('timezone', 'Europe/Sofia')
@@ -366,6 +371,16 @@ class CleanTradingDashboard:
             # Then subscribe to updates
             self.data_provider.subscribe_to_cob(self._on_cob_data_update)
             logger.info("Subscribed to COB data updates from data provider")
+            # Also subscribe to 1s aggregated updates to build the per-second imbalance series
+            try:
+                if hasattr(self.data_provider, 'subscribe_to_cob_aggregated'):
+                    self.data_provider.subscribe_to_cob_aggregated(self._on_cob_1s_aggregated_update)
+                    logger.info("Subscribed to COB 1s aggregated updates for per-second imbalance MAs")
+            except Exception as agg_e:
+                logger.error(f"Failed subscribing to COB aggregated updates: {agg_e}")
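+            # Note: the per-second values recorded by _on_cob_1s_aggregated_update
+            # feed _calculate_cumulative_imbalance(), which computes simple moving
+            # averages over trailing 1s/5s/15s/60s windows of that series.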
         except Exception as e:
             logger.error(f"Failed to start COB collection or subscribe: {e}")
 
@@ -501,6 +516,35 @@ class CleanTradingDashboard:
 
         except Exception as e:
             logger.error(f"Error handling COB data update for {symbol}: {e}")
+
+    def _on_cob_1s_aggregated_update(self, symbol: str, aggregated_data: dict):
+        """Receive a 1s aggregated COB snapshot and record a single imbalance value per second.
+        This ensures moving averages are computed over true seconds, not over raw tick updates.
+        """
+        try:
+            # Determine the per-second imbalance value
+            per_sec_imbalance = None
+            stats = aggregated_data.get('stats') or {}
+            # Prefer the explicit 1s imbalance if available
+            if 'imbalance_1s' in stats and isinstance(stats.get('imbalance_1s'), (int, float)):
+                per_sec_imbalance = float(stats.get('imbalance_1s') or 0.0)
+            else:
+                # Fall back to the aggregated imbalance average structure
+                imb_section = aggregated_data.get('imbalance') or {}
+                if isinstance(imb_section, dict) and 'average' in imb_section:
+                    try:
+                        per_sec_imbalance = float(imb_section.get('average') or 0.0)
+                    except Exception:
+                        per_sec_imbalance = 0.0
+            if per_sec_imbalance is None:
+                per_sec_imbalance = 0.0
+
+            # Append to the per-second history for the symbol
+            if symbol not in self.cob_per_second_imbalance_history:
+                self.cob_per_second_imbalance_history[symbol] = deque(maxlen=120)
+            self.cob_per_second_imbalance_history[symbol].append(per_sec_imbalance)
+        except Exception as e:
+            logger.error(f"Error handling COB 1s aggregated update for {symbol}: {e}")
 
     def start_overnight_training(self):
         """Start the overnight training session"""
@@ -8931,73 +8975,71 @@ class CleanTradingDashboard:
             raise
 
     def _calculate_cumulative_imbalance(self, symbol: str) -> Dict[str, float]:
-        """Calculate Moving Averages (MA) of imbalance over different periods."""
-        stats = {}
-        history = self.cob_data_history.get(symbol)
+        """Calculate true per-second SMAs of imbalance over 1s/5s/15s/60s windows.
+        Uses the per-second imbalance series populated by aggregated 1s updates.
+        Falls back to grouping raw updates by second if needed.
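+        Example (hypothetical values): for a per-second series
+        [0.2, 0.1, -0.3, 0.4, 0.0], the '1s' value is the last entry (0.0)
+        and the '5s' value is the mean of the last five entries (0.08).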
+        """
+        try:
+            # Prefer the per-second series if available
+            per_second_series = list(self.cob_per_second_imbalance_history.get(symbol, []))
 
-        if not history:
-            return {'1s': 0.0, '5s': 0.0, '15s': 0.0, '60s': 0.0}
+            if not per_second_series:
+                # Fallback: build per-second averages from raw tick history
+                history = self.cob_data_history.get(symbol, [])
+                if history:
+                    second_to_values: Dict[int, list] = {}
+                    for snap in list(history):
+                        try:
+                            ts_ms = snap.get('timestamp')
+                            if isinstance(ts_ms, (int, float)):
+                                sec = int(int(ts_ms) / 1000)
+                            else:
+                                # Skip snapshots without a timestamp
+                                continue
+                            imb = None
+                            st = snap.get('stats') or {}
+                            # Use the raw tick imbalance if present; otherwise check the 1s field
+                            if 'imbalance' in st and isinstance(st.get('imbalance'), (int, float)):
+                                imb = float(st.get('imbalance') or 0.0)
+                            elif 'imbalance_1s' in st and isinstance(st.get('imbalance_1s'), (int, float)):
+                                imb = float(st.get('imbalance_1s') or 0.0)
+                            if imb is None:
+                                continue
+                            second_to_values.setdefault(sec, []).append(imb)
+                        except Exception:
+                            continue
+                    # Sort by second and compute one average value per second
+                    per_second_series = [
+                        (sum(vals) / len(vals)) for _, vals in sorted(second_to_values.items())
+                    ]
 
-        # Convert history to list and get recent snapshots
-        history_list = list(history)
-        if not history_list:
-            return {'1s': 0.0, '5s': 0.0, '15s': 0.0, '60s': 0.0}
+            if not per_second_series:
+                return {'1s': 0.0, '5s': 0.0, '15s': 0.0, '60s': 0.0}
 
-        # Extract imbalance values from recent snapshots
-        imbalances = []
-        for snap in history_list:
-            if isinstance(snap, dict) and 'stats' in snap and snap['stats']:
-                imbalance = snap['stats'].get('imbalance')
-                if imbalance is not None:
-                    imbalances.append(imbalance)
-
-        if not imbalances:
-            return {'1s': 0.0, '5s': 0.0, '15s': 0.0, '60s': 0.0}
-
-        # Calculate Moving Averages over different periods
-        # MA periods: 1s=1 period, 5s=5 periods, 15s=15 periods, 60s=60 periods
-        ma_periods = {'1s': 1, '5s': 5, '15s': 15, '60s': 60}
-
-        for name, period in ma_periods.items():
-            if len(imbalances) >= period:
-                # Calculate SMA over the last 'period' values
-                recent_imbalances = imbalances[-period:]
-                sma_value = sum(recent_imbalances) / len(recent_imbalances)
-
-                # Also calculate EMA for better responsiveness
-                if period > 1:
-                    # EMA calculation with alpha = 2/(period+1)
-                    alpha = 2.0 / (period + 1)
-                    ema_value = recent_imbalances[0]  # Start with first value
-                    for value in recent_imbalances[1:]:
-                        ema_value = alpha * value + (1 - alpha) * ema_value
-                    # Use EMA for better responsiveness
-                    stats[name] = ema_value
+            def sma(values: list, n: int) -> float:
+                if not values or n <= 0:
+                    return 0.0
+                if len(values) < n:
+                    # Fewer than n samples available; average what we have
+                    window = values[-len(values):]
                 else:
-                    # For 1s, use SMA (no EMA needed)
-                    stats[name] = sma_value
-            else:
-                # If not enough data, use available data
-                available_imbalances = imbalances[-min(period, len(imbalances)):]
-                if available_imbalances:
-                    if len(available_imbalances) > 1:
-                        # Calculate EMA for available data
-                        alpha = 2.0 / (len(available_imbalances) + 1)
-                        ema_value = available_imbalances[0]
-                        for value in available_imbalances[1:]:
-                            ema_value = alpha * value + (1 - alpha) * ema_value
-                        stats[name] = ema_value
-                    else:
-                        # Single value, use as is
-                        stats[name] = available_imbalances[0]
-                else:
-                    stats[name] = 0.0
-
-        # Debug logging to verify MA calculation
-        if any(value != 0.0 for value in stats.values()):
-            logger.debug(f"[MOVING-AVERAGE-IMBALANCE] {symbol}: {stats} (from {len(imbalances)} snapshots)")
-
-        return stats
+                    window = values[-n:]
+                return sum(window) / float(len(window)) if window else 0.0
+
+            stats = {
+                '1s': sma(per_second_series, 1),
+                '5s': sma(per_second_series, 5),
+                '15s': sma(per_second_series, 15),
+                '60s': sma(per_second_series, 60),
+            }
+
+            return stats
+        except Exception as e:
+            logger.error(f"Error calculating cumulative imbalance MAs for {symbol}: {e}")
+            return {'1s': 0.0, '5s': 0.0, '15s': 0.0, '60s': 0.0}
 
     def _connect_to_orchestrator(self):
         """Connect to orchestrator for real trading signals"""
diff --git a/web/models_training_panel.py b/web/models_training_panel.py
index 31f2327..0d257f4 100644
--- a/web/models_training_panel.py
+++ b/web/models_training_panel.py
@@ -293,16 +293,7 @@ class ModelsTrainingPanel:
                 'win_rate': safe_get(model_stats, 'win_rate', 0)
             }
 
-            # Extract real performance metrics from logs
-            # For DQN: we see "Performance: 81.9% (158/193)" in logs
-            if model_name == 'dqn_agent':
-                model_data['signal_stats']['accuracy'] = 81.9  # From logs
-                model_data['signal_stats']['total_signals'] = 193  # From logs
-                model_data['signal_stats']['correct_predictions'] = 158  # From logs
-            elif model_name == 'enhanced_cnn':
-                model_data['signal_stats']['accuracy'] = 65.3  # From logs
-                model_data['signal_stats']['total_signals'] = 193  # From logs
-                model_data['signal_stats']['correct_predictions'] = 126  # From logs
+            # Do not inject synthetic performance metrics; rely only on available stats
             return model_data
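-- 
Note: a minimal, standalone sanity check of the new per-second SMA windowing
(hypothetical values; mirrors the nested sma() helper added in
_calculate_cumulative_imbalance above):

    def sma(values, n):
        if not values or n <= 0:
            return 0.0
        # Use the last n per-second values, or all of them if fewer exist
        window = values if len(values) < n else values[-n:]
        return sum(window) / float(len(window))

    series = [0.2, 0.1, -0.3, 0.4, 0.0]        # one imbalance value per second
    assert sma(series, 1) == 0.0               # '1s': last second only
    assert abs(sma(series, 5) - 0.08) < 1e-9   # '5s': mean of the last 5 values
    assert abs(sma(series, 60) - 0.08) < 1e-9  # '60s': falls back to all available values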